springboot TCP socket通信远程监听采集数据.
记录下最近研究的成果,以免以后用到的时候忘记了.
socket建立长链接,双方(客户端--服务器)握手后一方不断掉信道就会一直存在.本次文章业务场景是模拟温度湿度传感器发送温度湿度数据通过网关传送到服务器,服务器接收消息和作出回应.
项目基于springboot2.1.3构建主要用到萝卜和commons-codec辅助(主要是java.nio包)
<!--字节转换-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
<scope>provided</scope>
</dependency>
具体思路:
一、首先启动服务端socket,创建 SocketServer
@Slf4j
@Data
@Component
@NoArgsConstructor
public class SocketServer {
private Integer port = 9090;
private boolean started;
private ServerSocket serverSocket;
// 防止重复创建socket线程链接对象浪费资源
private ExecutorService executorService = Executors.newCachedThreadPool();
public void start(){
start(null);
}
public void start(Integer port){
log.info("port: {}, {}", this.port, port);
try {
serverSocket = new ServerSocket(port == null ? this.port : port);
started = true;
log.info("Socket服务已启动,占用端口: {}", serverSocket.getLocalPort());
} catch (IOException e) {
log.error("端口冲突,异常信息:{}", e);
System.exit(0);
}
while (true){
try {
// 开启socket监听
Socket socket = serverSocket.accept();
ClientSocket register = register(socket);
// 在此判断是否重复创建socket对象线程
if (register != null){
executorService.submit(register);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
该类主要是启动socket监听端口,并开启监听创建ClientSocket对象
二、创建全局Map存放现以链接服务器的socket SocketPool
public class SocketPool {
private static final ConcurrentHashMap<String, ClientSocket> ONLINE_SOCKET_MAP = new ConcurrentHashMap<>();
public static void add(ClientSocket clientSocket){
if (clientSocket != null && !clientSocket.getKey().isEmpty())
ONLINE_SOCKET_MAP.put(clientSocket.getKey(), clientSocket);
}
public static void remove(String key){
if (!key.isEmpty())
ONLINE_SOCKET_MAP.remove(key);
}
}
三、封装socket对象 ClientSocket 每个链接都是一个ClientSocket对象.我们可以在此类中和客户端进行收到等操作达到监听效果.也可以将收到的数据保存到数据库中,以便后续业务需求.
@Slf4j
@Data
public class ClientSocket implements Runnable {
private Socket socket;
private DataInputStream inputStream;
private DataOutputStream outputStream;
private String key;
private String message;
@Override
public void run() {
while (true){
try {
onMessage(this);
log.info(LocalDateTime.now()+"当前设备:"+this.key+" 接收到数据: <<<<<<" + this.message);
} catch (Exception e) {
e.printStackTrace();
}
if (isSocketClosed(this)){
log.info("客户端已关闭,其Key值为:{}", this.getKey());
//关闭对应的服务端资源
close(this);
break;
}
}
}
}
四、创建socket任务处理器 SocketHandler,里面封装读写数据,销毁链接,等方法.
@Slf4j
public class SocketHandler {
/**
* 将连接的Socket注册到Socket池中
* @param socket
* @return
*/
public static ClientSocket register(Socket socket){
ClientSocket clientSocket = new ClientSocket();
clientSocket.setSocket(socket);
try {
clientSocket.setInputStream(new DataInputStream(socket.getInputStream()));
clientSocket.setOutputStream(new DataOutputStream(socket.getOutputStream()));
byte[] bytes = new byte[1024];
clientSocket.getInputStream().read(bytes);
clientSocket.setKey(new String(bytes, "utf-8"));
add(clientSocket);
return clientSocket;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 向指定客户端发送信息
* @param clientSocket
* @param message
*/
public static void sendMessage(ClientSocket clientSocket, String message){
try {
log.info("发送消息到客户端 : >>>>>" + message);
clientSocket.getOutputStream().write(message.getBytes("utf-8"));
//clientSocket.getOutputStream().writeUTF(message);
} catch (IOException e) {
log.error("发送信息异常:{}", e);
close(clientSocket);
}
}
/**
* 获取指定客户端的上传信息
* @param clientSocket
* @return
*/
public static void onMessage(ClientSocket clientSocket){
byte[] keyByte = new byte[1024];
byte[] msgByte = new byte[1];
try {
// 第一次先发送序列号
if(StringUtils.isEmpty(clientSocket.getKey())) {
clientSocket.getInputStream().read(keyByte);
clientSocket.setKey(new String(keyByte, "UTF-8"));
//return clientSocket.getKey();
// 以后发送数据
}else {
String info = "";
while (true) {
if (clientSocket.getInputStream().available() > 0) {
clientSocket.getInputStream().read(msgByte);
String tempStr = HexEcodeUtil.ByteArrayToHexStr(msgByte);
info += tempStr;
//已经读完
if (clientSocket.getInputStream().available() == 0) {
//重置,不然每次收到的数据都会累加起来
clientSocket.setMessage(info);
break;
}
}
}
//return clientSocket.getMessage();
}
} catch (IOException e) {
e.printStackTrace();
close(clientSocket);
}
//return null;
}
/**
* 指定Socket资源回收
* @param clientSocket
*/
public static void close(ClientSocket clientSocket){
log.info("进行资源回收");
if (clientSocket != null){
log.info("开始回收socket相关资源,其Key为{}", clientSocket.getKey());
remove(clientSocket.getKey());
Socket socket = clientSocket.getSocket();
try {
socket.shutdownInput();
socket.shutdownOutput();
} catch (IOException e) {
log.error("关闭输入输出流异常,{}", e);
}finally {
try {
socket.close();
} catch (IOException e) {
log.error("关闭socket异常{}", e);
}
}
}
}
/**
* 发送数据包,判断数据连接状态
* @param clientSocket
* @return
*/
public static boolean isSocketClosed(ClientSocket clientSocket){
try {
clientSocket.getSocket().sendUrgentData(1);
return false;
} catch (IOException e) {
return true;
}
}
}
最后一步将socket随spring容器启动类一同启动
public static void main(String[] args) {
ApplicationContext run = SpringApplication.run(CollectionApplication.class, args);
run.getBean(SocketServer.class).start();
}
至此demo基本完成,有人会奇怪前面pom导入的ommons-codec干嘛用的,这是遇到的坑,因为网关设备推送的数据是16进制码,String在读取数据转码的时候会乱码,而设备序列号却不会乱码....一度困扰我不知道怎么回事,后来查资料反应过来可能是编码的问题,于是又写了个转码的util(如果数据包不是16进制码完全用不到ommons-codec包)
下面是util
/**
* 16进制转换工具
*/
@Slf4j
public class HexEcodeUtil {
//16进制数字字符集
public static final String HEXMAXSTRING = "0123456789ABCDEF";
public static final String HEXMINSTRING = "0123456789abcdef";
/**
* byte[]转16进制Str
*
* @param byteArray
*/
public static String ByteArrayToHexStr(byte[] byteArray){
if (byteArray == null)
return null;
char[] hexArray = HEXMAXSTRING.toCharArray();
char[] hexChars = new char[byteArray.length * 2];
for (int i = 0; i < byteArray.length; i++){
int temp = byteArray[i] & 0xFF;
hexChars[i * 2] = hexArray[temp >>> 4];
hexChars[i * 2 + 1] = hexArray[temp & 0x0F];
}
return new String(hexChars);
}
/**
* Str转16进制Str
*
* @param str
* @return
*/
public static String StrToHexStr(String str) {
//根据默认编码获取字节数组
byte[] bytes = str.getBytes();
StringBuilder stringBuilder = new StringBuilder(bytes.length * 2);
//将字节数组中每个字节拆解成2位16进制整数
for (int i = 0; i < bytes.length; i++){
stringBuilder.append("0x");
stringBuilder.append(HEXMAXSTRING.charAt((bytes[i] & 0xf0) >> 4));
stringBuilder.append(HEXMAXSTRING.charAt((bytes[i] & 0x0f) >> 0));
//去掉末尾的逗号
if (i != bytes.length - 1)
stringBuilder.append(",");
}
return stringBuilder.toString();
}
/**
* 16进制Str转byte[]
*
* @param hexStr 不带空格、不带0x、不带逗号的16进制Str,如:06EEF7F1
* @return
*/
public static byte[] HexStrToByteArray(String hexStr){
byte[] byteArray = new byte[hexStr.length() / 2];
for (int i = 0; i < byteArray.length; i++){
String subStr = hexStr.substring(2 * i, 2 * i + 2);
byteArray[i] = ((byte) Integer.parseInt(subStr, 16));
}
return byteArray;
}
/**
* 温度湿度数据转换
*
*/
public static Map<String , String> HexToRead(String info){
HashMap<String, String> hashMap = new HashMap<>();
return hashMap;
}
/**
* 将16进制字符串转换为byte数组
* @param hexItr 16进制字符串
* @return
*/
public static byte[] hexItr2Arr(String hexItr) {
try {
return Hex.decodeHex(hexItr);
} catch (DecoderException e) {
log.info("16进制字符串转byte异常!");
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
//byte[] bytes = hexItr2Arr("454E383739465134563249393936394F");
byte[] bytes = hexItr2Arr("010300000002C40B010304012202585B5F");
try {
String s = new String(bytes, "UTF-8");
log.info(s);
} catch (Exception e) {
e.printStackTrace();
}
}
}
这次是真的完了...
本文地址:https://blog.csdn.net/liukangjie520/article/details/107408458
上一篇: java内存模型(Java Memory Model,JMM)
下一篇: Java入门,干货满满