欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Springboot之整合Socket连接案例

程序员文章站 2022-04-06 22:04:42
socket连接与硬件通信一、如何让socket随着springboot项目一起启动springboot中commandlinerunner的作用:平常开发中有可能需要实现在项目启动后执行的功能,sp...

socket连接与硬件通信

一、如何让socket随着springboot项目一起启动

springboot中commandlinerunner的作用:平常开发中有可能需要实现在项目启动后执行的功能,springboot提供的一种简单的实现方案就是添加一个model并实现commandlinerunner接口,实现功能的代码放在实现的run方法中

具体实现

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.commandlinerunner;
import org.springframework.stereotype.component;
import java.net.serversocket;
import java.net.socket;
import java.util.concurrent.arrayblockingqueue;
import java.util.concurrent.threadpoolexecutor;
import java.util.concurrent.timeunit;
/**
 * @author 易水●墨龙吟
 * @description
 * @create 2019-04-14 23:40
 */
@component
public class testrunner implements commandlinerunner {
  @autowired
  private socketproperties properties;
  @override
  public void run(string... args) throws exception {
    serversocket server = null;
    socket socket = null;
    server = new serversocket(properties.getport());
    system.out.println("设备服务器已经开启, 监听端口:" + properties.getport());
    threadpoolexecutor pool = new threadpoolexecutor(
        properties.getpoolcore(),
        properties.getpoolmax(),
        properties.getpoolkeep(),
        timeunit.seconds,
        new arrayblockingqueue<runnable>(properties.getpoolqueueinit()),
        new threadpoolexecutor.discardoldestpolicy()
    );
    while (true) {
      socket = server.accept();
      pool.execute(new serverconfig(socket));
    }
  }
}

此处使用了自定义的线程池,提高对于socket的客户端处理能力。

二、自定义配置并使用

此处将socket的端口和线程池的一些配置放到 application.yml中使用,方便使用和修改

# socket配置
socket:
 # 监听端口 2323
 port: 2323
 # 线程池 - 保持线程数 20
 pool-keep: 20
 # 线程池 - 核心线程数 10
 pool-core: 10
 # 线程池 - 最大线程数 20
 pool-max: 30
 # 线程队列容量 10
 pool-queue-init: 10
import lombok.getter;
import lombok.setter;
import lombok.tostring;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.configuration;
import org.springframework.context.annotation.propertysource;
import org.springframework.stereotype.component;
/**
 * @author 易水●墨龙吟
 * @description
 * @create 2019-04-18 22:35
 */
@setter
@getter
@tostring
@component
@configuration
@propertysource("classpath:application.yml")
@configurationproperties(prefix = "socket")
public class socketproperties {
  private integer port;
  private integer poolkeep;
  private integer poolcore;
  private integer poolmax;
  private integer poolqueueinit;
}

三、socket对于客户端发来的信息的处理和重发机制

当客户端端连接之后发送信息,如果超时未发送,将会关闭,发送数据有异常将会返回给客户端一个error,让客户端在发送一次数据。

import com.farm.config.socket.resolve.messagechain;
import com.farm.service.environmentservice;
import com.farm.service.impl.environmentserviceimpl;
import java.io.*;
import java.net.socket;
import java.net.socketexception;
import java.net.sockettimeoutexception;
import java.util.map;
/**
 * @author 易水●墨龙吟
 * @description
 * @create 2019-04-14 23:21
 */
public class serverconfig extends thread {
  private socket socket;
  public serverconfig(socket socket) {
    this.socket = socket;
  }
	// 获取spring容器管理的类,可以获取到sevrice的类
  private environmentservice service = springutil.getbean(environmentserviceimpl.class);
  private string handle(inputstream inputstream) throws ioexception, dataformexception {
    byte[] bytes = new byte[1024];
    int len = inputstream.read(bytes);
    if (len != -1) {
      stringbuffer request = new stringbuffer();
      request.append(new string(bytes, 0, len, "utf-8"));
      system.out.println("接受的数据: " + request);
      system.out.println("from client ... " + request + "当前线程" + thread.currentthread().getname());
      map<string, string> map = messagechain.out(request.tostring());
      system.out.println("处理的数据" + map);
      integer res = service.addenvironment(map);
      if (res == 1) {
        return "ok";
      } else {
        throw new dataformexception("数据处理异常");
      }
    } else {
      throw new dataformexception("数据处理异常");
    }
  }
  @override
  public void run() {
    bufferedwriter writer = null;
    try {
      // 设置连接超时9秒
      socket.setsotimeout(9000);
      system.out.println("客户 - " + socket.getremotesocketaddress() + " -> 机连接成功");
      inputstream inputstream = socket.getinputstream();
      writer = new bufferedwriter(new outputstreamwriter(socket.getoutputstream()));
      string result = null;
      try {
        result = handle(inputstream);
        writer.write(result);
        writer.newline();
        writer.flush();
      } catch (ioexception | dataformexception | illegalargumentexception e) {
        writer.write("error");
        writer.newline();
        writer.flush();
        system.out.println("发生异常");
        try {
          system.out.println("再次接受!");
          result = handle(inputstream);
          writer.write(result);
          writer.newline();
          writer.flush();
        } catch (dataformexception | sockettimeoutexception ex) {
          system.out.println("再次接受, 发生异常,连接关闭");
        }
      }
    } catch (socketexception socketexception) {
      socketexception.printstacktrace();
      try {
        writer.close();
      } catch (ioexception ioexception) {
        ioexception.printstacktrace();
      }
    } catch (ioexception e) {
      e.printstacktrace();
    } finally {
      try {
        writer.close();
      } catch (ioexception e) {
        e.printstacktrace();
      }
    }
  }
}

在此处有一个坑,如果客户端是用c/c++编写的,必须使用如下方法:

byte[] bytes = new byte[1024];
int len = inputstream.read(bytes);

如果使用readline或者 datainputstream datainputstream =new datainputstream(socket.getinputstream())这样会出现使用tcp连接助手,客户端发送数据收不到。

四、如何在普通类中使用spring注入类

这里需要使用一个工具类。

import org.springframework.beans.beansexception;
import org.springframework.context.applicationcontext;
import org.springframework.context.applicationcontextaware;
import org.springframework.stereotype.component;
/**
 * @author 易水●墨龙吟
 * @description
 * @create 2019-04-15 0:01
 */
@component
public class springutil implements applicationcontextaware {
  private static applicationcontext applicationcontext;
  @override
  public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
    if (springutil.applicationcontext == null) {
      springutil.applicationcontext = applicationcontext;
    }
  }
  /**
   * 获取applicationcontext
   * @return
   */
  public static applicationcontext getapplicationcontext() {
    return applicationcontext;
  }
  /**
   * 通过name获取 bean.
   * @param name
   * @return
   */
  public static object getbean(string name){
    return getapplicationcontext().getbean(name);
  }
  /**
   * 通过class获取bean.
   * @param clazz
   * @param <t>
   * @return
   */
  public static <t> t getbean(class<t> clazz){
    return getapplicationcontext().getbean(clazz);
  }
  /**
   * 通过name,以及clazz返回指定的bean
   * @param name
   * @param clazz
   * @param <t>
   * @return
   */
  public static <t> t getbean(string name,class<t> clazz){
    return getapplicationcontext().getbean(name, clazz);
  }
}

补充:springboot下websocket前台后端数据长连接

首先导入依赖

 <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-websocket</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.security</groupid>
      <artifactid>spring-security-messaging</artifactid>
    </dependency>

spring-security-messaging 是后面继承 abstractsecuritywebsocketmessagebrokerconfigurer需要用到的依赖

websocketconfig

@configuration
@enablewebsocketmessagebroker //此注解表示使用stomp协议来传输基于消息代理的消息,此时可以在@controller类中使用@messagemapping 
public class websocketconfig implements websocketmessagebrokerconfigurer {
  @override
  public void registerstompendpoints(stompendpointregistry registry) {
     /**
     * 注册 stomp的端点
     * addendpoint:添加stomp协议的端点。这个http url是供websocket或sockjs客户端访问的地址
     * withsockjs:指定端点使用sockjs协议
     */
    registry.addendpoint("/websocket/tracker")   //物流消息通道,
      .setallowedorigins("*")   //允许跨域,里面路径可以设定
      .withsockjs()   //指定协议
      .setinterceptors(httpsessionhandshakeinterceptor()) ;    //设置拦截器()
  }
  @override
  public void configuremessagebroker(messagebrokerregistry registry) {
     /**
     * 配置消息代理
     * 启动简单broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker
     */
    registry.enablesimplebroker("/topic","/user");
  }
 //拦截器
 @bean
  public handshakeinterceptor httpsessionhandshakeinterceptor() {
    return new handshakeinterceptor() {
      @override
      public boolean beforehandshake(serverhttprequest request, serverhttpresponse response, websockethandler wshandler, map<string, object> attributes) throws exception {
        //可以在这里先判断登录是否合法
        return true;
      }
      @override
      public void afterhandshake(serverhttprequest request, serverhttpresponse response, websockethandler wshandler, exception exception) {
  //握手成功后,
      }
    };
  }
}

websocketsecurityconfiguration

@configuration
public class websocketsecurityconfiguration extends abstractsecuritywebsocketmessagebrokerconfigurer {
  @override
  protected void configureinbound(messagesecuritymetadatasourceregistry messages) {
    messages
      .nulldestmatcher().authenticated()
      .simpdestmatchers("/topic/**").authenticated()
      .simpdestmatchers("/user/**").authenticated()
      .simptypematchers(simpmessagetype.message, simpmessagetype.subscribe).denyall()
      // catch all
      .anymessage().denyall();
  }
  /**
   * disables csrf for websockets.
   */
  @override
  protected boolean sameorigindisabled() {
    return true;
  }
}

websocketresource

package com.gleam.shopmall.web.rest;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.applicationlistener;
import org.springframework.messaging.handler.annotation.messagemapping;
import org.springframework.messaging.handler.annotation.sendto;
import org.springframework.messaging.simp.simpmessageheaderaccessor;
import org.springframework.messaging.simp.simpmessagemappinginfo;
import org.springframework.messaging.simp.simpmessagesendingoperations;
import org.springframework.stereotype.controller;
import org.springframework.web.socket.messaging.sessiondisconnectevent;
@controller
public class websocketresource {
  private static final logger log = loggerfactory.getlogger(websocketresource.class);
  @autowired
  simpmessagesendingoperations messagingtemplate;
 //此方法适用于网页聊天室,从前端接收数据,返回订阅者(前端)
  @messagemapping("/welcome") //指定要接收消息的地址,类似@requestmapping
  @sendto("/topic/getresponse")  //默认消息将被发送到与传入消息相同的目的地,但是目的地前面附加前缀(默认情况下为“/topic”}
  public string say(string message) throws exception {
    return message;
  }
 //发送指定用户(直接从后端发送数据到前端)
  public void sendtouser(string login,string channel, string info) {
    log.debug("[touser]websocket发送消息, username={}, info={}", login, info);
    this.messagingtemplate.convertandsendtouser(login, channel, info);
    log.debug("[touser]websocket发送消息:完成");
  }
 //发送所有订阅的(直接从后端发送数据到前端)
  public void send(string channel, string info) {
    log.debug("[toall]websocket发送消息, info={}", info);
    // this.messagingtemplate.convertandsend(channel, info);
    this.messagingtemplate.convertandsend("/topic/getresponse", "接收到了吗?");
    log.debug("[toall]websocket发送消息:完成");
  }
}

前端html

<!doctype html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
  <meta charset="utf-8" />
  <script src="http://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
  <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
  <script src="http://code.jquery.com/jquery-1.7.2.min.js"></script>
  <script src="http://pv.sohu.com/cityjson?ie=utf-8"></script>
  <title>spring boot+websocket+广播式</title>
  <script type="text/javascript">
    var stompclient = null;
    function setconnected(connected) {
      document.getelementbyid('connect').disabled = connected;
      document.getelementbyid('disconnect').disabled = !connected;
      document.getelementbyid('conversationdiv').style.visibility = connected ? 'visible' : 'hidden';
      $('#response').html();
    }
    function connect() {
     // websocket的连接地址,此值等于websocketconfig中registry.addendpoint("/websocket/tracker").withsockjs()配置的地址,
     //这里如果是微服务或者远端,需要全路径
      var socket = new sockjs('/websocket/tracker'); //1
      stompclient = stomp.over(socket);//2
      stompclient.connect({}, function(frame) {//3
        setconnected(true);
        console.log('开始进行连接connected: ' + frame);
        // 客户端订阅消息的目的地址:此值等于websocketresource中@sendto("/topic/getresponse")注解的里配置的值
        stompclient.subscribe('/topic/getresponse', function(respnose){ //4
          showresponse(respnose.body);
        });
      });
    }
    function disconnect() {
      if (stompclient != null) {
        stompclient.disconnect();
      }
      setconnected(false);
      console.log("disconnected");
    }
    function sendname() {
      var name = $('#name').val();
      stompclient.send("/welcome", {}, returncitysn['cip'] +":"+name);// json.stringify(name)
    }
    function showresponse(message) {
      var response = $("#response");
      response.html(message+"<br>" + response.html());
    }
  </script>
</head>
<body onload="disconnect()">
<noscript><h2 style="color: red">貌似你的浏览器不支持websocket</h2></noscript>
<div>
  <div>
    <button id="connect" onclick="connect();" style="color: red">连接</button>
    <button id="disconnect" disabled="disabled" onclick="disconnect();">断开连接</button>
  </div>
  <div id="conversationdiv">
    <label>输入内容</label><input type="text" id="name" />
    <button id="sendname" onclick="sendname();">发送</button>
    <p id="response"></p>
  </div>
</div>
</body>
</html>```

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。