
Spring Boot WebSocket: Sending Broadcast and Point-to-Point Messages and Receiving Them on Android

程序员文章站 2024-03-03 20:40:22

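1. Spring Boot WebSocket

Spring Boot's WebSocket messaging support does not have clients exchange raw WebSocket messages directly. It runs a protocol called STOMP as a sub-protocol on top of the standard WebSocket connection.

1.1 About the STOMP protocol

STOMP (Simple, or Streaming, Text Oriented Messaging Protocol) is a simple text-based protocol designed for MOM (Message Oriented Middleware).

It provides an interoperable wire format that lets any STOMP client talk to any STOMP message broker, similar in purpose to OpenWire (a binary protocol).

Because the design is simple, clients are easy to write, and the protocol is widely used across many languages and platforms. Apache ActiveMQ is one of the most popular STOMP message brokers.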
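To make the text format concrete, here is a minimal sketch of how a STOMP frame is laid out: a command line, header lines, a blank line, the body, and a NUL terminator. The class `StompFrameSketch` and its methods are hypothetical helpers written for this illustration; they are not part of Spring or of any STOMP library, and header escaping and content-length handling are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: not part of Spring or any STOMP library.
final class StompFrameSketch {

    // Serializes a frame as: COMMAND, one "name:value" line per header,
    // a blank line, the body, and a terminating NUL character.
    static String build(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\u0000').toString();
    }

    // Returns the command, which is the first line of a serialized frame.
    static String commandOf(String frame) {
        return frame.substring(0, frame.indexOf('\n'));
    }

    // Returns the body: everything between the first blank line and the NUL terminator.
    static String bodyOf(String frame) {
        int start = frame.indexOf("\n\n") + 2;
        int end = frame.indexOf('\u0000', start);
        return frame.substring(start, end);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/app/welcome");
        String frame = build("SEND", headers, "{\"name\":\"lincoln\"}");
        System.out.println(commandOf(frame) + " -> " + bodyOf(frame));
    }
}
```

Because the whole frame is readable text, a client in almost any language can produce it with plain string handling, which is a large part of why STOMP clients are easy to write.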

1.2 Setup

I built the Spring Boot WebSocket project with IntelliJ IDEA, using Maven rather than the Gradle I am more familiar with.

The project structure is as follows:

[Image: project structure]

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.drawthink</groupId>
 <artifactId>websocketdemo</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>websocketdemo</name>
 <description>websocketdemo project for Spring Boot</description>

 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.3.6.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
 </parent>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <java.version>1.8</java.version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-thymeleaf</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>

  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
   </plugin>
  </plugins>
 </build>

</project>

Application:

package com.drawthink;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class WebSocketDemoApplication {

 public static void main(String[] args) {
  SpringApplication.run(WebSocketDemoApplication.class, args);
 }
}

WebSocketConfig

package com.drawthink.websocket;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

/**
 * Created by lincoln on 16-10-25
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
 @Override
 public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
  // Allow SockJS access at the /hello endpoint, with cross-origin requests permitted
  stompEndpointRegistry.addEndpoint("/hello").setAllowedOrigins("*").withSockJS();
 }

 @Override
 public void configureMessageBroker(MessageBrokerRegistry registry) {
  // Destination prefixes served by the simple broker (these are what clients subscribe to)
  registry.enableSimpleBroker("/topic","/user");
  // Global prefix for application destinations (it shows up in the paths clients send to)
  registry.setApplicationDestinationPrefixes("/app/");
  // Prefix for point-to-point destinations (it shows up in client subscription paths); defaults to /user/ if not set
  //registry.setUserDestinationPrefix("/user/");
 }
}

WebSocketController

package com.drawthink.websocket.controller;

import com.drawthink.message.ClientMessage;
import com.drawthink.message.ServerMessage;
import com.drawthink.message.ToUserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

/**
 * Created by lincoln on 16-10-25
 */
@Controller
public class WebSocketController {

 @MessageMapping("/welcome")
 // @SendTo publishes the return value to the given subscription path under the broker
 @SendTo("/topic/getresponse")
 public ServerMessage say(ClientMessage clientMessage){
  // Used for the broadcast test
  System.out.println("clientMessage.getName() = " + clientMessage.getName());
  return new ServerMessage("welcome , "+clientMessage.getName()+" !");
 }

 // Inject SimpMessagingTemplate for sending point-to-point messages
 @Autowired
 private SimpMessagingTemplate messagingTemplate;

 @MessageMapping("/cheat")
 // The subscription path messages are sent to is /user/{userId}/message
 // /user/ is the default prefix; to change it, call setUserDestinationPrefix in the config
 public void cheatTo(ToUserMessage toUserMessage){
  // Used for the point-to-point test
  System.out.println("toUserMessage.getMessage() = " + toUserMessage.getMessage());
  System.out.println("toUserMessage.getUserId() = " + toUserMessage.getUserId());
  messagingTemplate.convertAndSendToUser(toUserMessage.getUserId(),"/message",toUserMessage.getMessage());
 }
}

VO

package com.drawthink.message;

/**
 * Created by lincoln on 16-10-25
 */
public class ClientMessage {
 private String name;

 public String getName() {
  return name;
 }

 public void setName(String name) {
  this.name = name;
 }
}

package com.drawthink.message;

/**
 * Created by lincoln on 16-10-25
 */
public class ServerMessage {
 private String responseMessage;

 public ServerMessage(String responseMessage) {
  this.responseMessage = responseMessage;
 }

 public String getResponseMessage() {
  return responseMessage;
 }

 public void setResponseMessage(String responseMessage) {
  this.responseMessage = responseMessage;
 }
}

package com.drawthink.message;

/**
 * Created by lincoln on 16-10-25
 */
public class ToUserMessage {
 private String userId;
 private String message;

 public String getUserId() {
  return userId;
 }

 public void setUserId(String userId) {
  this.userId = userId;
 }

 public String getMessage() {
  return message;
 }

 public void setMessage(String message) {
  this.message = message;
 }
}

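Android client

Android has no built-in implementation of the STOMP protocol, so you would otherwise have to implement it yourself. The good news is that the open-source community has already implemented STOMP for Android, so we only need to use it.

Address: stompprotocolandroid_jb51.rar

Setup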

build.gradle(app)

apply plugin: 'com.android.application'

android {
 compileSdkVersion 24
 buildToolsVersion "24.0.3"
 defaultConfig {
  applicationId "com.drawthink.websocket"
  minSdkVersion 16
  targetSdkVersion 24
  versionCode 1
  versionName "1.0"
  testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
 }
 buildTypes {
  release {
   minifyEnabled false
   proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
  }
 }
}

dependencies {
 compile fileTree(include: ['*.jar'], dir: 'libs')
 androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
  exclude group: 'com.android.support', module: 'support-annotations'
 })
 compile 'com.android.support:appcompat-v7:24.2.1'
 testCompile 'junit:junit:4.12'
 // Android implementation of the STOMP protocol
 compile 'com.github.NaikSoftware:StompProtocolAndroid:1.1.1'
 // StompProtocolAndroid depends on a standard WebSocket implementation
 compile 'org.java-websocket:Java-WebSocket:1.3.0'
}

Receiving broadcasts:

package com.drawthink.websocket;

import android.content.Intent;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;
import android.widget.Toast;

import org.java_websocket.WebSocket;

import rx.Subscriber;
import rx.functions.Action1;
import ua.naiksoftware.stomp.LifecycleEvent;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.client.StompClient;
import ua.naiksoftware.stomp.client.StompMessage;

import static android.content.ContentValues.TAG;

public class MainActivity extends AppCompatActivity {

 private TextView serverMessage;
 private Button start;
 private Button stop;
 private Button send;
 private EditText editText;
 private StompClient mStompClient;
 private Button cheat;

 @Override
 protected void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  setContentView(R.layout.activity_main);
  bindView();
  start.setOnClickListener(new View.OnClickListener() {
   @Override
   public void onClick(View v) {
    // Create the client instance
    createStompClient();
    // Subscribe to messages
    registerStompTopic();
   }
  });

  send.setOnClickListener(new View.OnClickListener() {
   @Override
   public void onClick(View v) {
    // Guard against sending before "start" has created the client
    if (mStompClient == null) {
     toast("Not connected yet");
     return;
    }
    // Send JSON data to /app/welcome
    mStompClient.send("/app/welcome","{\"name\":\""+editText.getText()+"\"}")
      .subscribe(new Subscriber<Void>() {
     @Override
     public void onCompleted() {
      toast("Message sent");
     }

     @Override
     public void onError(Throwable e) {
      e.printStackTrace();
      toast("Send failed");
     }

     @Override
     public void onNext(Void aVoid) {

     }
    });
   }
  });

  stop.setOnClickListener(new View.OnClickListener() {
   @Override
   public void onClick(View v) {
    if (mStompClient != null) {
     mStompClient.disconnect();
    }
   }
  });

  cheat.setOnClickListener(new View.OnClickListener() {
   @Override
   public void onClick(View v) {
    startActivity(new Intent(MainActivity.this,CheatActivity.class));
    if(mStompClient != null) {
     mStompClient.disconnect();
    }
    finish();
   }
  });
 }

 private void showMessage(final StompMessage stompMessage) {
  runOnUiThread(new Runnable() {
   @Override
   public void run() {
    serverMessage.setText("stomp command is --->"+stompMessage.getStompCommand() +" body is --->"+stompMessage.getPayload());
   }
  });
 }

 // Create the client instance
 private void createStompClient() {
  mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");
  mStompClient.connect();
  Toast.makeText(MainActivity.this,"Connecting to 192.168.0.46:8080",Toast.LENGTH_SHORT).show();
  mStompClient.lifecycle().subscribe(new Action1<LifecycleEvent>() {
   @Override
   public void call(LifecycleEvent lifecycleEvent) {
    switch (lifecycleEvent.getType()) {
     case OPENED:
      Log.d(TAG, "stomp connection opened");
      toast("Connection opened");
      break;

     case ERROR:
      Log.e(TAG, "stomp error", lifecycleEvent.getException());
      toast("Connection error");
      break;
     case CLOSED:
      Log.d(TAG, "stomp connection closed");
      toast("Connection closed");
      break;
    }
   }
  });
 }

 // Subscribe to messages
 private void registerStompTopic() {
  mStompClient.topic("/topic/getresponse").subscribe(new Action1<StompMessage>() {
   @Override
   public void call(StompMessage stompMessage) {
    Log.e(TAG, "call: " +stompMessage.getPayload() );
    showMessage(stompMessage);
   }
  });

 }

 private void toast(final String message) {
  runOnUiThread(new Runnable() {
   @Override
   public void run() {
    Toast.makeText(MainActivity.this,message,Toast.LENGTH_SHORT).show();
   }
  });
 }

 private void bindView() {
  serverMessage = (TextView) findViewById(R.id.servermessage);
  start = (Button) findViewById(R.id.start);
  stop = (Button) findViewById(R.id.stop);
  send = (Button) findViewById(R.id.send);
  editText = (EditText) findViewById(R.id.clientmessage);
  cheat = (Button) findViewById(R.id.cheat);
 }
}

Point-to-point

package com.drawthink.websocket;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.LinearLayout;
import android.widget.TextView;
import android.widget.Toast;

import org.java_websocket.WebSocket;

import rx.Subscriber;
import rx.functions.Action1;
import ua.naiksoftware.stomp.LifecycleEvent;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.client.StompClient;
import ua.naiksoftware.stomp.client.StompMessage;

import static android.content.ContentValues.TAG;

public class CheatActivity extends AppCompatActivity {

 private EditText cheat;
 private Button send;
 private LinearLayout message;
 private StompClient mStompClient;

 @Override
 protected void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  setContentView(R.layout.activity_cheat);
  bindView();
  createStompClient();
  registerStompTopic();
  send.setOnClickListener(new View.OnClickListener() {
   @Override
   public void onClick(View v) {
    // Send JSON data to /app/cheat; the key must match the userId property of ToUserMessage
    mStompClient.send("/app/cheat","{\"userId\":\"lincoln\",\"message\":\""+cheat.getText()+"\"}")
      .subscribe(new Subscriber<Void>() {
       @Override
       public void onCompleted() {
        toast("Message sent");
       }

       @Override
       public void onError(Throwable e) {
        e.printStackTrace();
        toast("Send failed");
       }

       @Override
       public void onNext(Void aVoid) {

       }
      });
   }
  });
 }

 private void bindView() {
  cheat = (EditText) findViewById(R.id.cheat);
  send = (Button) findViewById(R.id.send);
  message = (LinearLayout) findViewById(R.id.message);
 }

 private void createStompClient() {
  mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");
  mStompClient.connect();
  Toast.makeText(CheatActivity.this,"Connecting to 192.168.0.46:8080",Toast.LENGTH_SHORT).show();
  mStompClient.lifecycle().subscribe(new Action1<LifecycleEvent>() {
   @Override
   public void call(LifecycleEvent lifecycleEvent) {
    switch (lifecycleEvent.getType()) {
     case OPENED:
      Log.d(TAG, "stomp connection opened");
      toast("Connection opened");
      break;

     case ERROR:
      Log.e(TAG, "stomp error", lifecycleEvent.getException());
      toast("Connection error");
      break;
     case CLOSED:
      Log.d(TAG, "stomp connection closed");
      toast("Connection closed");
      break;
    }
   }
  });
 }

 // Receive messages published to /user/xiaoli/message.
 // Note: the send handler above targets user "lincoln", so those messages arrive
 // only at a client subscribed to /user/lincoln/message.
 private void registerStompTopic() {
  mStompClient.topic("/user/xiaoli/message").subscribe(new Action1<StompMessage>() {
   @Override
   public void call(StompMessage stompMessage) {
    Log.e(TAG, "call: " +stompMessage.getPayload() );
    showMessage(stompMessage);
   }
  });
 }

 private void showMessage(final StompMessage stompMessage) {
  runOnUiThread(new Runnable() {
   @Override
   public void run() {
    TextView text = new TextView(CheatActivity.this);
    text.setLayoutParams(new LinearLayout.LayoutParams(LinearLayout.LayoutParams.MATCH_PARENT, LinearLayout.LayoutParams.WRAP_CONTENT));
    text.setText(System.currentTimeMillis() +" body is --->"+stompMessage.getPayload());
    message.addView(text);
   }
  });
 }

 private void toast(final String message) {
  runOnUiThread(new Runnable() {
   @Override
   public void run() {
    Toast.makeText(CheatActivity.this,message,Toast.LENGTH_SHORT).show();
   }
  });
 }
}

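The code is a bit messy, so here are some explanations.

1. The key to using STOMP is the publish/subscribe relationship. Anyone who has used a message queue such as RabbitMQ should find it easy to understand.

On the server side, WebSocketConfig.java is what controls the publish and subscribe paths.

2. The WebSocket path. This example connects to ws://192.168.0.46:8080/hello/websocket. The /hello part is determined by stompEndpointRegistry.addEndpoint("/hello").setAllowedOrigins("*").withSockJS(); in WebSocketConfig, and the trailing /websocket is the raw WebSocket transport path that SockJS exposes under the endpoint. If there are multiple endpoints, this part of the path changes accordingly.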
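As a small illustration of how that connection URL is put together, the sketch below joins host, port, and endpoint and appends the /websocket suffix. `EndpointUrlSketch` and `rawWebSocketUrl` are hypothetical names made up for this example; neither Spring nor StompProtocolAndroid provides such a helper.

```java
// Hypothetical helper, for illustration only.
final class EndpointUrlSketch {

    // Builds the raw WebSocket URL for a SockJS-enabled STOMP endpoint:
    // ws://host:port + endpoint path + "/websocket".
    static String rawWebSocketUrl(String host, int port, String endpoint) {
        // Normalize the endpoint so it has a leading slash and no trailing slash.
        String path = endpoint.startsWith("/") ? endpoint : "/" + endpoint;
        if (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        return "ws://" + host + ":" + port + path + "/websocket";
    }

    public static void main(String[] args) {
        // Same address as used by the Android client in this article.
        System.out.println(rawWebSocketUrl("192.168.0.46", 8080, "/hello"));
    }
}
```

Keeping the host and endpoint in one place like this makes it easier to switch servers or endpoints than hard-coding the full URL in each activity.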

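3. Publish paths

The path a client publishes to is the combination of setApplicationDestinationPrefixes("/app/") in WebSocketConfig and the @MessageMapping value in the controller, for example @MessageMapping("/welcome").

For example, a broadcast message is sent to /app/welcome.

For example, a point-to-point message is sent to /app/cheat.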
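The combination rule can be sketched as a simple string join. This is only an illustration of the resulting paths, not Spring's internal matching code, and `SendPathSketch` is a hypothetical name.

```java
// Hypothetical helper, for illustration only: it mirrors the resulting paths,
// not how Spring resolves destinations internally.
final class SendPathSketch {

    // Joins the application prefix and a @MessageMapping value with exactly one slash between them.
    static String sendPath(String appPrefix, String mapping) {
        String left = appPrefix.endsWith("/")
                ? appPrefix.substring(0, appPrefix.length() - 1)
                : appPrefix;
        String right = mapping.startsWith("/") ? mapping : "/" + mapping;
        return left + right;
    }

    public static void main(String[] args) {
        System.out.println(sendPath("/app/", "/welcome")); // broadcast test
        System.out.println(sendPath("/app/", "/cheat"));   // point-to-point test
    }
}
```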

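4. Subscription paths

The brokers available for subscription come from registry.enableSimpleBroker("/topic", "/user") in WebSocketConfig, which enables two broker prefixes. The concrete subscription path is given either by @SendTo("/topic/getresponse") in the controller or by the destination passed to SimpMessagingTemplate. (Note: the server and the client must agree on the subscription path.)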
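The sketch below illustrates the two ideas in this point: a destination is served by the simple broker only if it starts with an enabled prefix, and the point-to-point path in this article has the shape /user/{userId}/message. `BrokerPrefixSketch` is a hypothetical class for illustration; the prefix check is a simplified stand-in and should not be read as Spring's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only.
final class BrokerPrefixSketch {

    // The two prefixes enabled via enableSimpleBroker("/topic", "/user") in this article.
    static final List<String> BROKER_PREFIXES = Arrays.asList("/topic", "/user");

    // True if the destination starts with one of the enabled broker prefixes.
    static boolean handledByBroker(String destination) {
        for (String prefix : BROKER_PREFIXES) {
            if (destination.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    // Composes the point-to-point subscription path: /user/{userId}{suffix}.
    static String userDestination(String userId, String suffix) {
        return "/user/" + userId + suffix;
    }

    public static void main(String[] args) {
        System.out.println(handledByBroker("/topic/getresponse"));
        System.out.println(handledByBroker("/app/welcome"));
        System.out.println(userDestination("xiaoli", "/message"));
    }
}
```

Sharing a helper like `userDestination` between the sending and subscribing sides is one way to keep the agreed path from drifting apart.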

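5. Heartbeats

Heartbeats in the publish/subscribe model are simple: the client sends a heartbeat to a designated heartbeat path, the server handles it, and the server sends heartbeats back to the client over a designated subscription path. Since the application does not manage a raw socket itself, it only needs to record whether the connection is alive, and the client takes care of reconnecting.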
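The "record whether the connection is alive" part can be sketched as a small tracker that remembers when the last heartbeat arrived and reports when the client should reconnect. `HeartbeatSketch`, its timeout value, and the explicit timestamps are assumptions made for this example; wiring it to an actual heartbeat subscription is left out.

```java
// Hypothetical helper, for illustration only: timestamps are passed in explicitly
// so the logic is independent of any particular clock or STOMP client.
final class HeartbeatSketch {

    private final long timeoutMillis;
    private long lastSeenMillis;

    HeartbeatSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastSeenMillis = nowMillis;
    }

    // Call whenever a heartbeat message arrives on the subscribed heartbeat path.
    void onHeartbeat(long nowMillis) {
        lastSeenMillis = nowMillis;
    }

    // True if no heartbeat arrived within the timeout, meaning the client should reconnect.
    boolean shouldReconnect(long nowMillis) {
        return nowMillis - lastSeenMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        HeartbeatSketch tracker = new HeartbeatSketch(10_000, start);
        tracker.onHeartbeat(start + 3_000);
        System.out.println(tracker.shouldReconnect(start + 5_000));
        System.out.println(tracker.shouldReconnect(start + 20_000));
    }
}
```

On Android, the client would typically call `shouldReconnect` from a periodic task and, when it returns true, disconnect and create a fresh StompClient.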

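I am still a beginner, so there are surely things I have not fully understood. If anything here is wrong, corrections are welcome.

Code download: blogrepository_jb51.rar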
