Python socket客户端接收消息和发送心跳包
程序员文章站
2022-03-22 14:36:40
Python socket客户端接收消息和发送心跳包十分想念顺店杂可。。。根据接口文档信息,接收消息接口定义Topfit服务端提供Socket端口侦听,以TCP方式提供数据接口。推送场景为:异动系统中编辑的异动公式触发结果。Socket消息的数据格式为基于JSON格式的字符串,在向Socket发送时应以UTF8格式编码后序列化为二进制字节流。心跳测试:系统接收JSON格式心跳数据并做出响应,当客户端5分钟(可设置)无任何请求时,链接自动断开。...
Python socket客户端接收消息和发送心跳包
十分想念顺店杂可。。。
根据接口文档信息,接收消息
接口定义
Topfit服务端提供Socket端口侦听,以TCP方式提供数据接口。
推送场景为:异动系统中编辑的异动公式触发结果。
Socket消息的数据格式为基于JSON格式的字符串,在向Socket发送时应以UTF8格式编码后序列化为二进制字节流。
心跳测试:系统接收JSON格式心跳数据并做出响应,当客户端5分钟(可设置)无任何请求时,链接自动断开。
为防止TCP粘包现象,所有数据在发送前,字符串添加表头(0x1b,0x1b)字符和表尾(0x1c,0x1c),作为记录分割符。
下面为代码,仅供参考
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time : 2020/11/19 13:50
# @Author : 甄超锋
import socket
import time
import threading
import json
class SocketTest(object):
def __init__(self, size, ip_port, login_dict):
"""
:param size: 接收报文大小
:param ip_port: (host,port)
:param login_dict: 登录报文
"""
self.sk = None
self.size = size
self.ip_port = ip_port
self.login_dict = login_dict
# 建立socket连接
def connect(self):
self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sk.connect(self.ip_port)
except Exception as e:
print(e)
# 重新连接 5s/次
def reconnect(self):
while True:
try:
self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sk.connect(self.ip_port)
self.login_send()
print('-----------------------------------------')
print('client start connect to host/port:{}'.format(self.ip_port))
print('-----------------------------------------')
break
except ConnectionRefusedError:
print('socket server refused or not started, reconnect to server in 5s .... host/port:{}'.format(
self.ip_port))
time.sleep(5)
except Exception as e:
print('do connect error:{}'.format(str(e)))
time.sleep(5)
# 间隔30s发送心跳
def heartbeats(self):
while True:
msg = str(time.strftime("%Y-%m-%d %H:%M:%S"))
try:
heartbeat = {
"msgType": 3,
"time": msg
}
heartbeat = json.dumps(heartbeat)
heartbeat = "0x1b,0x1b" + heartbeat + "0x1c,0x1c"
self.sk.send(heartbeat.encode("utf8"))
print(msg + " 发送心跳{}".format(heartbeat))
except socket.error:
print(f'\r\nsocket error,do reconnect:{msg}')
time.sleep(5)
self.reconnect()
except Exception as e:
print(f'\r\nother error occur:{msg}\n{e}')
time.sleep(5)
self.reconnect()
time.sleep(30)
# 发送登录验证
def login_send(self):
msg = str(time.strftime("%Y-%m-%d %H:%M:%S"))
try:
login_dict = json.dumps(self.login_dict)
login_dict = "0x1b,0x1b" + login_dict + "0x1c,0x1c"
self.sk.send(login_dict.encode("utf-8"))
except socket.error:
print(f'\r\nsocket error,do reconnect:{msg}\n{login_dict}')
time.sleep(5)
except Exception as e:
print(f'\r\nother error occur:{msg}\n{login_dict}\n{e}')
time.sleep(5)
# 接收数据
def rec(self):
while True:
msg = str(time.strftime("%Y-%m-%d %H:%M:%S"))
try:
data = self.sk.recv(self.size)
print(msg + " 接收到消息: {}".format(data.decode()))
if not data.decode():
self.reconnect()
# TODO 后续业务处理
except socket.error:
print(f'\r\nsocket error,do reconnect:{msg}\n{data}')
time.sleep(5)
self.reconnect()
except Exception as e:
print(f'\r\nother error occur:{msg}\n{data}\n{e}')
time.sleep(5)
self.reconnect()
if __name__ == '__main__':
host = "127.0.0.1"
port = 9785
ip_port = (host, port)
SIZE = 1024
login_dict = {
"msgType": 1,
"msgLength": 29,
"content": {
"userName": "",
"pwd": ""
}
}
socket1 = SocketTest(
size=SIZE,
ip_port=ip_port,
login_dict=login_dict
)
print("连接客户端")
socket1.connect()
print("登录login")
socket1.login_send()
print("开始接收")
t1 = threading.Thread(target=socket1.rec) # 接收线程
t2 = threading.Thread(target=socket1.heartbeats) # 心跳线程
t1.start()
t2.start()
t1.join()
t2.join()
pythonQQ交流群:785239887
本文地址:https://blog.csdn.net/Mr_Zhen/article/details/109815932
上一篇: Python_XXBJ基本图形绘制