欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

Android使用okHttp实现WebSocket源码分析

程序员文章站 2022-03-27 10:40:58
项目中需要实现一个类似心跳包的内容,语音通话期间,需要不断的请求服务器发送数据,确保正在连接中,开始想到的是使用service开启子线程,定时请求接口发送数据,但数据量很大时,http请求头部太大,每次请求需要发送重复的头数据,非常消耗流量,于是考虑使用长连接的方式。Android中实现WebSocket有很多种方式,这里说一下okHttp的方式。先来大致了解一下websocket。WebSocket属于双向通信协议,Http是单向的webSocket是需要握手进行建立连接的,只是在建立握手时,数据...

项目中需要实现一个类似心跳包的内容,语音通话期间,需要不断的请求服务器发送数据,确保正在连接中,开始想到的是使用service开启子线程,定时请求接口发送数据,但数据量很大时,http请求头部太大,每次请求需要发送重复的头数据,非常消耗流量,于是考虑使用长连接的方式。Android中实现WebSocket有很多种方式,这里说一下okHttp的方式。先来大致了解一下websocket。

  • WebSocket属于双向通信协议,Http是单向的
  • webSocket是需要握手进行建立连接的,只是在建立握手时,数据是通过HTTP传输的,但建立之后传输数据不需要http协议
  • websocket的请求链接不是http://这种,而是以ws://开头
  • websocket的Connection链接类型是 Upgrade:websocket,表示将该请求升级为websocket。
  • websocket的头部响应码是101,表示本次链接的http协议即将被更改为upgrade:websocket 指定的协议。

使用okhttp封装websocketdemo

public class WebSocketDemo {

    private final String TAG = WebSocketDemo.class.getSimpleName();

    private OkHttpClient CLIENT;
    private WebSocket mWebSocket;

    private static WebSocketDemo ourInstance;

    public static WebSocketDemo getDefault() {
        if (ourInstance == null) {
            synchronized (WebSocketDemo.class) {
                if (ourInstance == null) {
                    ourInstance = new WebSocketDemo();
                }
            }
        }
        return ourInstance;
    }

    private WebSocketDemo() {
        CLIENT = new OkHttpClient.Builder()
                .writeTimeout(5, TimeUnit.SECONDS)
                .readTimeout(5, TimeUnit.SECONDS)
                .connectTimeout(5, TimeUnit.SECONDS)
                .build();
    }

    public void connect(String url) {
        if (mWebSocket != null) {
            mWebSocket.cancel();
        }
        Request request = new Request.Builder()
                .url(url)
                .build();
        mWebSocket = CLIENT.newWebSocket(request, new SocketListener());
    }

    public void sendMessage(String message) {
        mWebSocket.send(message);
    }

    public void sendMessage(byte... data) {
        ByteString bs = ByteString.of(data);
        mWebSocket.send(bs);
    }

    public void close(int code, String reason) {
        mWebSocket.close(code, reason);
    }

    class SocketListener extends WebSocketListener {

        @Override
        public void onOpen(WebSocket webSocket, Response response) {
            super.onOpen(webSocket, response);
            Log.i(TAG, "onOpen response=" + response);
        }

        @Override
        public void onMessage(WebSocket webSocket, String text) {
            super.onMessage(webSocket, text);
            Log.i(TAG, "onMessage text=" + text);
        }

        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes) {
            super.onMessage(webSocket, bytes);
            Log.i(TAG, "onMessage bytes=" + bytes);
        }

        @Override
        public void onClosing(WebSocket webSocket, int code, String reason) {
            super.onClosing(webSocket, code, reason);
            Log.i(TAG, "onClosing code=" + code);
        }

        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
            super.onClosed(webSocket, code, reason);
            Log.i(TAG, "onClosed code=" + code);
        }

        @Override
        public void onFailure(WebSocket webSocket, Throwable t, Response response) {
            super.onFailure(webSocket, t, response);
            Log.i(TAG, "onFailure t=" + t.getMessage());
        }
    }

}

源码分析

  • 懒汉式单例获取,保证唯一且线程安全,并创造okhttpclient实例
  • 连接方法,通过client.newWebSocket()创建WebSocket对象,方法中创建RealWebSocke,并开始连接
  /**
   * Uses {@code request} to connect a new web socket.
   */
  @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
    RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
    webSocket.connect(this);
    return webSocket;
  }
  public RealWebSocket(Request request, WebSocketListener listener, Random random,
      long pingIntervalMillis) {
    if (!"GET".equals(request.method())) {
      throw new IllegalArgumentException("Request must be GET: " + request.method());
    }
    this.originalRequest = request;
    this.listener = listener;
    this.random = random;
    this.pingIntervalMillis = pingIntervalMillis;

    byte[] nonce = new byte[16];
    random.nextBytes(nonce);
    this.key = ByteString.of(nonce).base64();

    this.writerRunnable = new Runnable() {
      @Override public void run() {
        try {
          while (writeOneFrame()) {
          }
        } catch (IOException e) {
          failWebSocket(e, null);
        }
      }
    };
  }
  • 通过RealWebSocket的创建内容,分析出请求必须是get请求方式,原因分析过,websocket协议需要发送一个http的get请求进行握手;接下来有一些默认赋值,最后实例了一个wirterRunnable,方法里主要是while循环不断的从队列中取出数据,发送到服务器。
  • 接下来就是真正的connect方法
  public void connect(OkHttpClient client) {
    client = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build();
    final Request request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build();
    call = Internal.instance.newWebSocketCall(client, request);
    call.enqueue(new Callback() {
      @Override public void onResponse(Call call, Response response) {
        try {
          checkResponse(response);
        } catch (ProtocolException e) {
          failWebSocket(e, response);
          closeQuietly(response);
          return;
        }

        // Promote the HTTP streams into web socket streams.
        StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
        streamAllocation.noNewStreams(); // Prevent connection pooling!
        Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

        // Process all web socket messages.
        try {
          listener.onOpen(RealWebSocket.this, response);
          String name = "OkHttp WebSocket " + request.url().redact();
          initReaderAndWriter(name, streams);
          streamAllocation.connection().socket().setSoTimeout(0);
          loopReader();
        } catch (Exception e) {
          failWebSocket(e, null);
        }
      }

      @Override public void onFailure(Call call, IOException e) {
        failWebSocket(e, null);
      }
    });
  }
  • 通过该方法可以看到upgrade-websocket、connection-upgrade设置好头信息,构建符合websocket协议握手规范的http请求;enqueue()方法中,有个checkResponse()方法,其主要功能是校验回应值相关信息,包括code值,connection和upgrade等内容。
void checkResponse(Response response) throws ProtocolException {
    if (response.code() != 101) {
      throw new ProtocolException("Expected HTTP 101 response but was '"
          + response.code() + " " + response.message() + "'");
    }

    String headerConnection = response.header("Connection");
    if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
      throw new ProtocolException("Expected 'Connection' header value 'Upgrade' but was '"
          + headerConnection + "'");
    }

    String headerUpgrade = response.header("Upgrade");
    if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
      throw new ProtocolException(
          "Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
    }

    String headerAccept = response.header("Sec-WebSocket-Accept");
    String acceptExpected = ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC)
        .sha1().base64();
    if (!acceptExpected.equals(headerAccept)) {
      throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
          + acceptExpected + "' but was '" + headerAccept + "'");
    }
  }
  • 链接并校验通过后,通过流来传递数据,链接过程中设置超时时间为0,也就是永不超时,维持这个链接。最后调用loopReader()循环读取服务器消息。
  /** Receive frames until there are no more. Invoked only by the reader thread. */
  public void loopReader() throws IOException {
    while (receivedCloseCode == -1) {
      // This method call results in one or more onRead* methods being called on this thread.
      reader.processNextFrame();
    }
  }
  void processNextFrame() throws IOException {
    readHeader();
    if (isControlFrame) {
      readControlFrame();
    } else {
      readMessageFrame();
    }
  }
  private void readHeader() throws IOException {
    if (closed) throw new IOException("closed");

    ...

    int b1 = source.readByte() & 0xff;

    ...
  }
  • readHeader()方法主要关注一下readByte()方法,最后调用的是输入流的read方法,这里也是IO阻塞模型,等待接收消息。

至此websocket的源码分析已解析完,希望能帮助各位朋友们,有哪块说的不对的,欢迎指出

本文地址:https://blog.csdn.net/lxm20819/article/details/107173077