三、wss连接B站弹幕
程序员文章站
2022-06-05 22:06:17
环境 一、websocket协议 1. 先建立连接 wss://broadcastlv.chat.bilibili.com/sub 2. 发送登录包 { "uid": 0表示未登录,否则为用户ID, "roomid": 房间ID, "protover": 1, "platform": "web", ......
环境
安装依赖: pip install ws4py
导入客户端: from ws4py.client.threadedclient import WebSocketClient
一、websocket协议
- 先建立连接 wss://broadcastlv.chat.bilibili.com/sub
- 发送登录包
{
"uid": 0 表示未登录,否则为用户 ID,
"roomid": 房间 ID,
"protover": 1,
"platform": "web",
"clientver": "1.4.0"
}
- 每隔一段时间发送心跳包
- 接收响应
响应由头部和数据组成
- 解析响应得到数据
- 注册事件分发事件
二、工具层 utils.py
- 定时器类
可取消
- 事件类
注册事件 (可重复)
分发事件
取消事件
import threading


class timer():
    """Repeating timer that calls ``fun`` every ``delay`` seconds until cancelled.

    The first call happens ``delay`` seconds after construction. Each firing
    runs on a fresh ``threading.Timer`` thread and then re-arms itself.

    Args:
        delay: Interval between calls, in seconds.
        fun: Zero-argument callable to invoke; a falsy value means "no-op".
    """

    def __init__(self, delay, fun):
        self.delay, self.f = delay, fun
        # Guards the cancelled flag and the re-arm step so that cancel()
        # cannot interleave with fun() and leave a live timer behind.
        self._lock = threading.Lock()
        self._cancelled = False
        self.t = threading.Timer(self.delay, self.fun)
        self.t.start()

    def fun(self):
        """Run the callback once and schedule the next firing."""
        if self._cancelled:
            return
        if self.f:
            self.f()
        with self._lock:
            # cancel() may have been called while the callback was running.
            if self._cancelled:
                return
            self.t = threading.Timer(self.delay, self.fun)
            self.t.start()

    def cancel(self):
        """Stop the timer; no further callbacks are scheduled."""
        with self._lock:
            self._cancelled = True
            self.t.cancel()
        print("threading cancel")


class event():
    """Minimal event registry: register, dispatch and remove handlers by key.

    ``keys`` holds the registered keys in registration order and ``map`` holds,
    at the same position, a ``{"key": key, "funs": [handlers...]}`` record.
    The same handler may be registered several times for one key.
    """

    def __init__(self):
        self.map = []
        self.keys = []

    def index(self, k):
        """Return the position of key ``k`` in ``keys``, or -1 if absent."""
        try:
            return self.keys.index(k)
        except ValueError:
            return -1

    def on(self, key, fun):
        """Register ``fun`` as a handler for ``key``."""
        i = self.index(key)
        if i == -1:
            self.map.append({"key": key, "funs": [fun]})
            self.keys.append(key)
        else:
            self.map[i]["funs"].append(fun)

    def emit(self, key, data=None):
        """Call every handler registered for ``key`` with ``data``."""
        i = self.index(key)
        if i == -1:
            print("no regist event:" + str(key))
            return
        # Iterate over a snapshot so handlers may call on()/rm() safely.
        for f in list(self.map[i]["funs"]):
            f(data)

    def rm(self, key, fun):
        """Remove every registration of ``fun`` for ``key``."""
        i = self.index(key)
        if i == -1:
            print("no regist event:" + str(key))
            return
        # Build a new list instead of mutating in place: an emit() that is
        # currently iterating the old list must not see entries change.
        self.map[i]["funs"] = [f for f in self.map[i]["funs"] if f != fun]
三、服务层 danmuws.py
import threading
import json
import struct

from ws4py.client.threadedclient import WebSocketClient

from utils import event, timer

# Module-wide event registry shared by every danmuwebsocket instance.
# (Rebinding the imported class name to its instance keeps the public
# module attribute ``event`` that the rest of the code refers to.)
event = event()


class danmuwebsocket(WebSocketClient):
    """WebSocket client for the Bilibili live danmaku (bullet chat) server.

    After the connection opens it sends a login packet, then a heartbeat
    packet every 20 seconds. Incoming frames are split into packets and
    dispatched through the module-level ``event`` registry as ``'heartbeat'``,
    ``'cmd'`` or ``'login'`` events.

    Every packet starts with a 16-byte big-endian header:
    total length (4) + header length (2) + protocol version (2)
    + operation (4) + sequence (4).

    Args:
        info: Dict with the keys ``uid``, ``roomid``, ``protover``,
            ``platform`` and ``clientver`` used to build the login packet.
        serveraddress: WebSocket URL of the danmaku server.
    """

    # Kept as class attributes so they are available without an instance.
    event = event
    headerlength = 16

    def __init__(self, info, serveraddress='wss://broadcastlv.chat.bilibili.com/sub'):
        self.serveraddress = serveraddress
        WebSocketClient.__init__(self, serveraddress)
        self.info = info

    def opened(self):
        """Log in, send the first heartbeat and start the 20 s heartbeat timer."""
        self.sendloginpacket(
            self.info['uid'],
            self.info['roomid'],
            self.info['protover'],
            self.info['platform'],
            self.info['clientver'],
        )
        self.sendheartbeatpacket()
        self.heartbeathandler = timer(20, self.sendheartbeatpacket)
        print("opened")

    def delay_close(self):
        """Create a replacement client and hand it to 'reconnect' listeners.

        The new client is not connected here; that is left to the listener.
        """
        dws = danmuwebsocket(self.info, self.serveraddress)
        event.emit('reconnect', dws)

    def closed(self, code, reason=None):
        """Stop the heartbeat and, unless the close was normal, schedule a reconnect."""
        print("closed", code, reason)
        if hasattr(self, "heartbeathandler"):
            self.heartbeathandler.cancel()
        if code == 1000:
            # 1000 is the normal-closure status code: nothing to recover from.
            return
        threading.Timer(5, self.delay_close).start()
        print("closed", code, reason)

    def received_message(self, message):
        """Split one WebSocket frame into packets and emit an event for each.

        Operation 3 carries a 4-byte unsigned integer after the header and is
        emitted as ``'heartbeat'``; operation 5 carries a JSON body and is
        emitted as ``'cmd'``; any other operation is emitted as ``'login'``.
        """
        payload = message.data
        total = len(payload)
        header_size = danmuwebsocket.headerlength
        position = 0
        # Require a complete header so a truncated tail is never unpacked.
        while position + header_size <= total:
            length_pack, header_len, _protover, operation, _sequence = struct.unpack_from(
                ">IHHII", payload, position
            )
            if length_pack < header_size:
                # A malformed length would stall or rewind the cursor and
                # loop forever; drop the rest of the frame instead.
                break
            if operation == 3:
                num = struct.unpack_from(">I", payload, position + header_len)[0]
                event.emit('heartbeat', num)
            elif operation == 5:
                data = json.loads(payload[position + header_size:position + length_pack])
                event.emit('cmd', data)
            else:
                event.emit('login')
            position += length_pack

    def senddata(self, data, protover=1, operation=2, sequence=1):
        """Prefix ``data`` with the 16-byte packet header and send it.

        Args:
            data: ``dict`` (serialized as JSON), ``str`` (encoded) or ``bytes``.
            protover: Protocol version written into the header.
            operation: Operation code written into the header.
            sequence: Sequence number written into the header.
        """
        if isinstance(data, dict):
            data = json.dumps(data).encode()
        elif isinstance(data, str):
            data = data.encode()
        header = struct.pack(
            ">IHHII",
            danmuwebsocket.headerlength + len(data),
            danmuwebsocket.headerlength,
            protover,
            operation,
            sequence,
        )
        self.send(header + data)

    def sendloginpacket(self, uid, roomid, protover=1, platform='web', clientver='1.4.6'):
        """Send the login packet (operation 7) for the given user and room.

        Header layout: uint(4byte) + 00 10 + 00 01 + 00 00 00 07 + 00 00 00 01 + data.
        """
        data = {
            'uid': int(uid),
            'roomid': int(roomid),
            'protover': protover,
            'platform': platform,
            'clientver': clientver,
        }
        print("sendloginpacket")
        # Compact separators drop the structural spaces only; stripping every
        # space from the dumped text would also corrupt values containing one.
        data = json.dumps(data, separators=(',', ':'))
        self.senddata(data.encode(), 1, 7, 1)

    def sendheartbeatpacket(self):
        """Send a heartbeat packet (operation 2).

        Header layout: uint(4byte) + 00 10 + 00 01 + 00 00 00 02 + 00 00 00 01 + data.
        """
        self.senddata(b'[object object]', 1, 2, 1)

    def bind(self, onreconnect=None, onlogin=None, onheartbeat=None, oncmd=None, onreceive=None):
        """Register the given callables on the shared event registry.

        Does nothing if a ``"cmd"`` handler is already registered, so repeated
        calls do not register duplicate handlers. Non-callable arguments are
        ignored.
        """
        if "cmd" in event.keys:
            return
        handlers = (
            ("reconnect", onreconnect),
            ("login", onlogin),
            ("heartbeat", onheartbeat),
            ("cmd", oncmd),
            ("receive", onreceive),
        )
        for key, handler in handlers:
            if callable(handler):
                event.on(key, handler)
四、测试代码
from server import login
from danmuws import danmuwebsocket

headers = {
    'user-agent': 'mozilla/5.0 (windows nt 10.0; win64; x64; rv:62.0) gecko/20100101 firefox/62.0',
    'accept': 'application/json, text/plain, */*',
    'accept-language': 'zh-cn,zh;q=0.8,zh-tw;q=0.7,zh-hk;q=0.5,en-us;q=0.3,en;q=0.2',
    'accept-encoding': 'gzip, deflate, br',
    'referer': 'https://live.bilibili.com/',
    'origin': 'https://live.bilibili.com',
    'connection': 'keep-alive',
}

# NOTE(review): `session` is not imported anywhere in this listing; it
# presumably comes from the project's `server` module -- confirm and import it.
s = session(headers, 'cookie.txt')

# Use a separate name for the instance so the imported `login` class is
# not shadowed by its own object.
login_client = login(s)
while not login_client.islogin():
    login_client.get_vdcode()
    login_client.loop_vdcode()

# Login packet fields expected by danmuwebsocket.
info = {
    "uid": login_client.info['uid'],
    "roomid": 7603080,
    "protover": 1,
    "platform": "web",
    "clientver": "1.4.0",
}


def oncmd(data):
    """Print every command packet received from the server."""
    cmd = data["cmd"]
    if cmd == "sys_msg":
        print(data)
    elif cmd == "special_gift":
        print(data)
    else:
        print(data)


def onlogin(data):
    """Report that the server answered the login packet."""
    print("login success")


def onreconnect(dws):
    """Replace the global client with the freshly created one.

    Not registered by the bind() call below, which passes None in the
    reconnect slot.
    """
    global ws
    ws = dws


def onheartbeat(num):
    """Print the number carried by a heartbeat reply."""
    print(num)


# Build the client outside the try block: if construction fails there is
# nothing to close, and the real error should surface.
ws = danmuwebsocket(info, 'wss://broadcastlv.chat.bilibili.com/sub')
# Register handlers before connecting so that packets arriving right after
# the handshake are not dropped for lack of a listener.
ws.bind(None, onlogin, onheartbeat, oncmd)
try:
    ws.connect()
    ws.run_forever()
except KeyboardInterrupt:
    # Ctrl-C: shut the connection down quietly.
    ws.close()
except Exception as exc:
    # Best-effort cleanup as before, but report what went wrong.
    print("websocket error:", exc)
    ws.close()