Android使用okHttp实现WebSocket源码分析
程序员文章站
2022-06-23 22:34:00
项目中需要实现一个类似心跳包的内容,语音通话期间,需要不断的请求服务器发送数据,确保正在连接中,开始想到的是使用service开启子线程,定时请求接口发送数据,但数据量很大时,http请求头部太大,每次请求需要发送重复的头数据,非常消耗流量,于是考虑使用长连接的方式。Android中实现WebSocket有很多种方式,这里说一下okHttp的方式。先来大致了解一下websocket。WebSocket属于双向通信协议,Http是单向的webSocket是需要握手进行建立连接的,只是在建立握手时,数据...
项目中需要实现一个类似心跳包的内容,语音通话期间,需要不断的请求服务器发送数据,确保正在连接中,开始想到的是使用service开启子线程,定时请求接口发送数据,但数据量很大时,http请求头部太大,每次请求需要发送重复的头数据,非常消耗流量,于是考虑使用长连接的方式。Android中实现WebSocket有很多种方式,这里说一下okHttp的方式。先来大致了解一下websocket。
- WebSocket属于双向通信协议,Http是单向的
- webSocket是需要握手进行建立连接的,只是在建立握手时,数据是通过HTTP传输的,但建立之后传输数据不需要http协议
- websocket的请求链接不是http://这种,而是以ws://开头
- websocket的Connection链接类型是 Upgrade:websocket,表示将该请求升级为websocket。
- websocket的头部响应码是101,表示本次链接的http协议即将被更改为upgrade:websocket 指定的协议。
使用okhttp封装websocketdemo
public class WebSocketDemo {
private final String TAG = WebSocketDemo.class.getSimpleName();
private OkHttpClient CLIENT;
private WebSocket mWebSocket;
private static WebSocketDemo ourInstance;
public static WebSocketDemo getDefault() {
if (ourInstance == null) {
synchronized (WebSocketDemo.class) {
if (ourInstance == null) {
ourInstance = new WebSocketDemo();
}
}
}
return ourInstance;
}
private WebSocketDemo() {
CLIENT = new OkHttpClient.Builder()
.writeTimeout(5, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS)
.connectTimeout(5, TimeUnit.SECONDS)
.build();
}
public void connect(String url) {
if (mWebSocket != null) {
mWebSocket.cancel();
}
Request request = new Request.Builder()
.url(url)
.build();
mWebSocket = CLIENT.newWebSocket(request, new SocketListener());
}
public void sendMessage(String message) {
mWebSocket.send(message);
}
public void sendMessage(byte... data) {
ByteString bs = ByteString.of(data);
mWebSocket.send(bs);
}
public void close(int code, String reason) {
mWebSocket.close(code, reason);
}
class SocketListener extends WebSocketListener {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
Log.i(TAG, "onOpen response=" + response);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
Log.i(TAG, "onMessage text=" + text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
Log.i(TAG, "onMessage bytes=" + bytes);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
Log.i(TAG, "onClosing code=" + code);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
Log.i(TAG, "onClosed code=" + code);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
super.onFailure(webSocket, t, response);
Log.i(TAG, "onFailure t=" + t.getMessage());
}
}
}
源码分析
- 懒汉式单例获取,保证唯一且线程安全,并创造okhttpclient实例
- 连接方法,通过client.newWebSocket()创建WebSocket对象,方法中创建RealWebSocke,并开始连接
/**
* Uses {@code request} to connect a new web socket.
*/
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
webSocket.connect(this);
return webSocket;
}
public RealWebSocket(Request request, WebSocketListener listener, Random random,
long pingIntervalMillis) {
if (!"GET".equals(request.method())) {
throw new IllegalArgumentException("Request must be GET: " + request.method());
}
this.originalRequest = request;
this.listener = listener;
this.random = random;
this.pingIntervalMillis = pingIntervalMillis;
byte[] nonce = new byte[16];
random.nextBytes(nonce);
this.key = ByteString.of(nonce).base64();
this.writerRunnable = new Runnable() {
@Override public void run() {
try {
while (writeOneFrame()) {
}
} catch (IOException e) {
failWebSocket(e, null);
}
}
};
}
- 通过RealWebSocket的创建内容,分析出请求必须是get请求方式,原因分析过,websocket协议需要发送一个http的get请求进行握手;接下来有一些默认赋值,最后实例了一个wirterRunnable,方法里主要是while循环不断的从队列中取出数据,发送到服务器。
- 接下来就是真正的connect方法
public void connect(OkHttpClient client) {
client = client.newBuilder()
.eventListener(EventListener.NONE)
.protocols(ONLY_HTTP1)
.build();
final Request request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build();
call = Internal.instance.newWebSocketCall(client, request);
call.enqueue(new Callback() {
@Override public void onResponse(Call call, Response response) {
try {
checkResponse(response);
} catch (ProtocolException e) {
failWebSocket(e, response);
closeQuietly(response);
return;
}
// Promote the HTTP streams into web socket streams.
StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
streamAllocation.noNewStreams(); // Prevent connection pooling!
Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
// Process all web socket messages.
try {
listener.onOpen(RealWebSocket.this, response);
String name = "OkHttp WebSocket " + request.url().redact();
initReaderAndWriter(name, streams);
streamAllocation.connection().socket().setSoTimeout(0);
loopReader();
} catch (Exception e) {
failWebSocket(e, null);
}
}
@Override public void onFailure(Call call, IOException e) {
failWebSocket(e, null);
}
});
}
- 通过该方法可以看到upgrade-websocket、connection-upgrade设置好头信息,构建符合websocket协议握手规范的http请求;enqueue()方法中,有个checkResponse()方法,其主要功能是校验回应值相关信息,包括code值,connection和upgrade等内容。
void checkResponse(Response response) throws ProtocolException {
if (response.code() != 101) {
throw new ProtocolException("Expected HTTP 101 response but was '"
+ response.code() + " " + response.message() + "'");
}
String headerConnection = response.header("Connection");
if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
throw new ProtocolException("Expected 'Connection' header value 'Upgrade' but was '"
+ headerConnection + "'");
}
String headerUpgrade = response.header("Upgrade");
if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
throw new ProtocolException(
"Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
}
String headerAccept = response.header("Sec-WebSocket-Accept");
String acceptExpected = ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC)
.sha1().base64();
if (!acceptExpected.equals(headerAccept)) {
throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
+ acceptExpected + "' but was '" + headerAccept + "'");
}
}
- 链接并校验通过后,通过流来传递数据,链接过程中设置超时时间为0,也就是永不超时,维持这个链接。最后调用loopReader()循环读取服务器消息。
/** Receive frames until there are no more. Invoked only by the reader thread. */
public void loopReader() throws IOException {
while (receivedCloseCode == -1) {
// This method call results in one or more onRead* methods being called on this thread.
reader.processNextFrame();
}
}
void processNextFrame() throws IOException {
readHeader();
if (isControlFrame) {
readControlFrame();
} else {
readMessageFrame();
}
}
private void readHeader() throws IOException {
if (closed) throw new IOException("closed");
...
int b1 = source.readByte() & 0xff;
...
}
- readHeader()方法主要关注一下readByte()方法,最后调用的是输入流的read方法,这里也是IO阻塞模型,等待接收消息。
至此websocket的源码分析已解析完,希望能帮助各位朋友们,有哪块说的不对的,欢迎指出
本文地址:https://blog.csdn.net/lxm20819/article/details/107173077
上一篇: 高可用+可伸缩存储方案
推荐阅读
-
详解Android使用OKHttp3实现下载(断点续传、显示进度)
-
android使用OkHttp实现下载的进度监听和断点续传
-
android使用OkHttp实现下载的进度监听和断点续传
-
Android使用WebSocket实现多人游戏
-
Android 使用Gallery实现3D相册(附效果图+Demo源码)
-
使用Android WebSocket实现即时通讯功能
-
Android使用WebSocket实现多人游戏
-
Android 使用Gallery实现3D相册(附效果图+Demo源码)
-
LeakCannary使用方法及实现原理探究(二)—— LeakCannary实现原理及源码分析
-
Android使用OKhttp3实现登录注册功能+springboot搭建后端的详细过程