SpringBoot webSocket实现发送广播、点对点消息和Android接收
1、springboot websocket
Spring Boot 的 WebSocket 消息支持并不是直接在原生 WebSocket 上收发自定义报文,而是在标准 WebSocket 协议之上使用名为 STOMP 的子协议。
1.1 stomp协议说明
STOMP(Simple Text Oriented Messaging Protocol,也称 Streaming Text Oriented Messaging Protocol),即简单(流)文本定向消息协议,是一种为 MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。
它提供了一个可互操作的连接格式,允许stomp客户端与任意stomp消息代理(broker)进行交互,类似于openwire(一种二进制协议)。
由于其设计简单,很容易开发客户端,因此在多种语言和多种平台上得到广泛应用。其中最流行的stomp消息代理是apache activemq。
1.2 搭建
本人使用 IntelliJ IDEA 搭建 Spring Boot WebSocket 项目,并未采用熟悉的 Gradle,而是采用了 Maven 方式搭建。
项目结构如下
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Spring Boot STOMP/WebSocket demo server.
  Element names, the POM namespace URI and version qualifiers are case-sensitive.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.drawthink</groupId>
    <artifactId>websocketdemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>websocketdemo</name>
    <description>websocketdemo project for spring boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <!-- Provides the STOMP-over-WebSocket messaging support used by the demo. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application:
package com.drawthink;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the WebSocket demo server.
 *
 * <p>NOTE(review): the listing was case-folded; the class name casing is restored by
 * convention and should be confirmed against the actual source file name.
 */
@SpringBootApplication
public class WebsocketdemoApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(WebsocketdemoApplication.class, args);
    }
}
websocketconfig
package com.drawthink.websocket; import org.springframework.context.annotation.configuration; import org.springframework.messaging.simp.config.messagebrokerregistry; import org.springframework.web.socket.config.annotation.abstractwebsocketmessagebrokerconfigurer; import org.springframework.web.socket.config.annotation.enablewebsocketmessagebroker; import org.springframework.web.socket.config.annotation.stompendpointregistry; /** * created by lincoln on 16-10-25 */ @configuration @enablewebsocketmessagebroker public class websocketconfig extends abstractwebsocketmessagebrokerconfigurer { @override public void registerstompendpoints(stompendpointregistry stompendpointregistry) { //允许使用socketjs方式访问,访问点为hello,允许跨域 stompendpointregistry.addendpoint("/hello").setallowedorigins("*").withsockjs(); } @override public void configuremessagebroker(messagebrokerregistry registry) { //订阅broker名称 registry.enablesimplebroker("/topic","/user"); //全局使用的订阅前缀(客户端订阅路径上会体现出来) registry.setapplicationdestinationprefixes("/app/"); //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/ //registry.setuserdestinationprefix("/user/"); } }
websocketcontroller
package com.drawthink.websocket.controller; import com.drawthink.message.clientmessage; import com.drawthink.message.servermessage; import com.drawthink.message.tousermessage; import org.springframework.beans.factory.annotation.autowired; import org.springframework.messaging.handler.annotation.messagemapping; import org.springframework.messaging.handler.annotation.sendto; import org.springframework.messaging.simp.simpmessagingtemplate; import org.springframework.stereotype.controller; /** * created by lincoln on 16-10-25 */ @controller public class websocketcontroller { @messagemapping("/welcome") //sendto 发送至 broker 下的指定订阅路径 @sendto("/topic/getresponse") public servermessage say(clientmessage clientmessage){ //方法用于广播测试 system.out.println("clientmessage.getname() = " + clientmessage.getname()); return new servermessage("welcome , "+clientmessage.getname()+" !"); } //注入simpmessagingtemplate 用于点对点消息发送 @autowired private simpmessagingtemplate messagingtemplate; @messagemapping("/cheat") // 发送的订阅路径为/user/{userid}/message // /user/路径是默认的一个,如果想要改变,必须在config 中setuserdestinationprefix public void cheatto(tousermessage tousermessage){ //方法用于点对点测试 system.out.println("tousermessage.getmessage() = " + tousermessage.getmessage()); system.out.println("tousermessage.getuserid() = " + tousermessage.getuserid()); messagingtemplate.convertandsendtouser(tousermessage.getuserid(),"/message",tousermessage.getmessage()); } }
vo
package com.drawthink.message; /** * created by lincoln on 16-10-25 */ public class clientmessage { private string name; public string getname() { return name; } public void setname(string name) { this.name = name; } }
package com.drawthink.message; /** * created by lincoln on 16-10-25 */ public class servermessage { private string responsemessage; public servermessage(string responsemessage) { this.responsemessage = responsemessage; } public string getresponsemessage() { return responsemessage; } public void setresponsemessage(string responsemessage) { this.responsemessage = responsemessage; } }
package com.drawthink.message; /** * created by lincoln on 16-10-25 */ public class tousermessage { private string userid; private string message; public string getuserid() { return userid; } public void setuserid(string userid) { this.userid = userid; } public string getmessage() { return message; } public void setmessage(string message) { this.message = message; } }
android 客户端
stomp协议在android系统中没有默认实现,必须自行去实现。不过好消息是,开源大神们已经完成了android上使用stomp协议的实现,所以我们只需要使用就好了。
地址:stompprotocolandroid_jb51.rar
搭建
build.gradle(app)
// Module-level build script for the Android demo client.
apply plugin: 'com.android.application'

android {
    compileSdkVersion 24
    buildToolsVersion "24.0.3"

    defaultConfig {
        applicationId "com.drawthink.websocket"
        minSdkVersion 16
        targetSdkVersion 24
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
    }

    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
}

dependencies {
    compile fileTree(include: ['*.jar'], dir: 'libs')
    androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
        exclude group: 'com.android.support', module: 'support-annotations'
    })
    compile 'com.android.support:appcompat-v7:24.2.1'
    testCompile 'junit:junit:4.12'
    // Android implementation of the STOMP protocol.
    compile 'com.github.NaikSoftware:StompProtocolAndroid:1.1.1'
    // StompProtocolAndroid depends on this standard WebSocket implementation.
    compile 'org.java-websocket:Java-WebSocket:1.3.0'
}
接收广播实例:
package com.drawthink.websocket; import android.content.intent; import android.os.bundle; import android.support.v7.app.appcompatactivity; import android.util.log; import android.view.view; import android.widget.button; import android.widget.edittext; import android.widget.textview; import android.widget.toast; import org.java_websocket.websocket; import rx.subscriber; import rx.functions.action1; import ua.naiksoftware.stomp.lifecycleevent; import ua.naiksoftware.stomp.stomp; import ua.naiksoftware.stomp.client.stompclient; import ua.naiksoftware.stomp.client.stompmessage; import static android.content.contentvalues.tag; public class mainactivity extends appcompatactivity { private textview servermessage; private button start; private button stop; private button send; private edittext edittext; private stompclient mstompclient; private button cheat; @override protected void oncreate(bundle savedinstancestate) { super.oncreate(savedinstancestate); setcontentview(r.layout.activity_main); bindview(); start.setonclicklistener(new view.onclicklistener() { @override public void onclick(view v) { //创建client 实例 createstompclient(); //订阅消息 registerstomptopic(); } }); send.setonclicklistener(new view.onclicklistener() { @override public void onclick(view v) { mstompclient.send("/app/welcome","{\"name\":\""+edittext.gettext()+"\"}") .subscribe(new subscriber<void>() { @override public void oncompleted() { toast("发送成功"); } @override public void onerror(throwable e) { e.printstacktrace(); toast("发送错误"); } @override public void onnext(void avoid) { } }); } }); stop.setonclicklistener(new view.onclicklistener() { @override public void onclick(view v) { mstompclient.disconnect(); } }); cheat.setonclicklistener(new view.onclicklistener() { @override public void onclick(view v) { startactivity(new intent(mainactivity.this,cheatactivity.class)); if(mstompclient != null) { mstompclient.disconnect(); } finish(); } }); } private void showmessage(final stompmessage stompmessage) { 
runonuithread(new runnable() { @override public void run() { servermessage.settext("stomp command is --->"+stompmessage.getstompcommand() +" body is --->"+stompmessage.getpayload()); } }); } //创建client 实例 private void createstompclient() { mstompclient = stomp.over(websocket.class, "ws://192.168.0.46:8080/hello/websocket"); mstompclient.connect(); toast.maketext(mainactivity.this,"开始连接 192.168.0.46:8080",toast.length_short).show(); mstompclient.lifecycle().subscribe(new action1<lifecycleevent>() { @override public void call(lifecycleevent lifecycleevent) { switch (lifecycleevent.gettype()) { case opened: log.d(tag, "stomp connection opened"); toast("连接已开启"); break; case error: log.e(tag, "stomp error", lifecycleevent.getexception()); toast("连接出错"); break; case closed: log.d(tag, "stomp connection closed"); toast("连接关闭"); break; } } }); } //订阅消息 private void registerstomptopic() { mstompclient.topic("/topic/getresponse").subscribe(new action1<stompmessage>() { @override public void call(stompmessage stompmessage) { log.e(tag, "call: " +stompmessage.getpayload() ); showmessage(stompmessage); } }); } private void toast(final string message) { runonuithread(new runnable() { @override public void run() { toast.maketext(mainactivity.this,message,toast.length_short).show(); } }); } private void bindview() { servermessage = (textview) findviewbyid(r.id.servermessage); start = (button) findviewbyid(r.id.start); stop = (button) findviewbyid(r.id.stop); send = (button) findviewbyid(r.id.send); edittext = (edittext) findviewbyid(r.id.clientmessage); cheat = (button) findviewbyid(r.id.cheat); } }
点对点
package com.drawthink.websocket; import android.os.bundle; import android.support.v7.app.appcompatactivity; import android.util.log; import android.view.view; import android.widget.button; import android.widget.edittext; import android.widget.linearlayout; import android.widget.textview; import android.widget.toast; import org.java_websocket.websocket; import rx.subscriber; import rx.functions.action1; import ua.naiksoftware.stomp.lifecycleevent; import ua.naiksoftware.stomp.stomp; import ua.naiksoftware.stomp.client.stompclient; import ua.naiksoftware.stomp.client.stompmessage; import static android.content.contentvalues.tag; public class cheatactivity extends appcompatactivity { private edittext cheat; private button send; private linearlayout message; private stompclient mstompclient; @override protected void oncreate(bundle savedinstancestate) { super.oncreate(savedinstancestate); setcontentview(r.layout.activity_cheat); bindview(); createstompclient(); registerstomptopic(); send.setonclicklistener(new view.onclicklistener() { @override public void onclick(view v) { // 向/app/cheat发送json数据 mstompclient.send("/app/cheat","{\"userid\":\"lincoln\",\"message\":\""+cheat.gettext()+"\"}") .subscribe(new subscriber<void>() { @override public void oncompleted() { toast("发送成功"); } @override public void onerror(throwable e) { e.printstacktrace(); toast("发送错误"); } @override public void onnext(void avoid) { } }); } }); } private void bindview() { cheat = (edittext) findviewbyid(r.id.cheat); send = (button) findviewbyid(r.id.send); message = (linearlayout) findviewbyid(r.id.message); } private void createstompclient() { mstompclient = stomp.over(websocket.class, "ws://192.168.0.46:8080/hello/websocket"); mstompclient.connect(); toast.maketext(cheatactivity.this,"开始连接 192.168.0.46:8080",toast.length_short).show(); mstompclient.lifecycle().subscribe(new action1<lifecycleevent>() { @override public void call(lifecycleevent lifecycleevent) { switch (lifecycleevent.gettype()) { 
case opened: log.d(tag, "stomp connection opened"); toast("连接已开启"); break; case error: log.e(tag, "stomp error", lifecycleevent.getexception()); toast("连接出错"); break; case closed: log.d(tag, "stomp connection closed"); toast("连接关闭"); break; } } }); } // 接收/user/xiaoli/message路径发布的消息 private void registerstomptopic() { mstompclient.topic("/user/xiaoli/message").subscribe(new action1<stompmessage>() { @override public void call(stompmessage stompmessage) { log.e(tag, "call: " +stompmessage.getpayload() ); showmessage(stompmessage); } }); } private void showmessage(final stompmessage stompmessage) { runonuithread(new runnable() { @override public void run() { textview text = new textview(cheatactivity.this); text.setlayoutparams(new linearlayout.layoutparams(linearlayout.layoutparams.match_parent, linearlayout.layoutparams.wrap_content)); text.settext(system.currenttimemillis() +" body is --->"+stompmessage.getpayload()); message.addview(text); } }); } private void toast(final string message) { runonuithread(new runnable() { @override public void run() { toast.maketext(cheatactivity.this,message,toast.length_short).show(); } }); } }
代码比较乱,说明一下。
1、stomp 使用的时候,关键是发布订阅的关系,使用过消息队列,例如rabbitmq的应该很容易理解。
服务器端 websocketconfig.java文件控制的就是订阅发布的路径关系。
2、WebSocket 的路径说明:本例中连接的是 ws://192.168.0.46:8080/hello/websocket 路径,其中 /hello 是在 WebSocketConfig 的 stompEndpointRegistry.addEndpoint("/hello").setAllowedOrigins("*").withSockJS(); 中确定的,末尾的 /websocket 是启用 SockJS 后提供给原生 WebSocket 客户端的传输路径。如果有多个 endpoint,这个地方的路径也会随之变化。
3、发布路径
发布信息的路径是由 WebSocketConfig 中的 setApplicationDestinationPrefixes("/app/"); 和 Controller 中的 @MessageMapping("/welcome") 组合确定的。
例如发广播消息,路径为/app/welcome
例如发点对点消息,路径为/app/cheat
4、消息订阅路径
订阅的 broker 源自 WebSocketConfig 中的 registry.enableSimpleBroker("/topic","/user");此处开放了两个 broker,具体的订阅路径由 Controller 中的 @SendTo("/topic/getresponse") 或调用 SimpMessagingTemplate 时传入的目的地给定。(注:此处,服务器和客户端须约定订阅路径)
5、关于心跳
订阅发布模型的心跳很简单:客户端向一个指定的心跳路径发送心跳,服务器处理后,再通过指定的订阅路径向客户端回发心跳即可。因为应用层不直接操作底层 socket,只需要记录是否连通的状态,重连由客户端来做就好了。
本人菜鸟,肯定有些地方没有搞清楚,如果有误,请大神斧正。
代码下载地址:blogrepository_jb51.rar
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。