Java HTTP协议收发MQ 消息代码实例详解
程序员文章站
2024-02-29 22:04:46
1. 准备环境
在工程 pom 文件添加 http java 客户端的依赖。
or...
1. 准备环境
在工程 pom 文件添加 http java 客户端的依赖。
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-client</artifactId>
    <version>9.3.4.RC1</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.1.11</version>
</dependency>
2. 运行代码配置(user.properties)
您需要设置配置文件(user.properties)的相关内容,具体请参考“申请 MQ 资源”。
# 您在控制台创建的 Topic
topic=xxx
# 公测 URL(示例代码会在其后直接拼接 "message/...",因此需要以 "/" 结尾)
url=http://publictest-rest.ons.aliyun.com/
# 阿里云身份验证码
ak=xxx
# 阿里云身份验证密钥
sk=xxx
# MQ 控制台创建的 Producer ID
producerid=xxx
# MQ 控制台创建的 Consumer ID
consumerid=xxx
说明:URL 中的 key、tag 以及 POST 的 Content-Type 没有任何限制,只要确保 key 和 tag 相同且唯一即可,也可以把它们放在 user.properties 里面。
3. HTTP 发送消息示例代码
您可以按以下说明设置相应参数并测试 http 消息发送功能。 package com.aliyun.openservice.ons.http.demo; import java.nio.charset.charset; import java.util.date; import java.util.properties; import org.eclipse.jetty.client.httpclient; import org.eclipse.jetty.client.api.contentprovider; import org.eclipse.jetty.client.api.contentresponse; import org.eclipse.jetty.client.api.request; import org.eclipse.jetty.client.util.stringcontentprovider; import com.aliyun.openservices.ons.api.impl.authority.authutil; public class httpproducer { public static string signature="signature"; public static string num="num"; public static string consumerid="consumerid"; public static string producerid="producerid"; public static string timeout="timeout"; public static string topic="topic"; public static string ak="accesskey"; public static string body="body"; public static string msghandle="msghandle"; public static string time="time"; public static void main(string[] args) throws exception { httpclient httpclient=new httpclient(); httpclient.setmaxconnectionsperdestination(1); httpclient.start(); properties properties=new properties(); properties.load(httpproducer.class.getclassloader().getresourceasstream("user.properties")); string topic=properties.getproperty("topic"); //请在user.properties配置您的topic string url=properties.getproperty("url");//公测集群配置为http://publictest-rest.ons.aliyun.com/ string ak=properties.getproperty("ak");//请在user.properties配置您的ak string sk=properties.getproperty("sk");//请在user.properties配置您的sk string pid=properties.getproperty("producerid");//请在user.properties配置您的producer id string date=string.valueof(new date().gettime()); string sign=null; string body="hello ons http"; string newline="\n"; string signstring; for (int i = 0; i < 10; i++) { date=string.valueof(new date().gettime()); request req=httpclient.post(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http"); contentprovider content=new stringcontentprovider(body); req.content(content); 
signstring=topic+newline+pid+newline+md5.getinstance().getmd5string(body)+newline+date; system.out.println(signstring); sign=authutil.calsignature(signstring.getbytes(charset.forname("utf-8")), sk); req.header(signature, sign); req.header(ak, ak); req.header(producerid, pid); contentresponse response; response=req.send(); system.out.println("send msg:"+response.getstatus()+response.getcontentasstring()); } } }
4. HTTP 接收消息示例代码
请按以下说明设置相应参数并测试 http 消息接收功能。
package com.aliyun.openservice.ons.http.demo; import java.nio.charset.charset; import java.util.date; import java.util.list; import java.util.properties; import org.eclipse.jetty.client.httpclient; import org.eclipse.jetty.client.api.contentprovider; import org.eclipse.jetty.client.api.contentresponse; import org.eclipse.jetty.client.api.request; import org.eclipse.jetty.client.util.stringcontentprovider; import org.eclipse.jetty.http.httpmethod; import com.alibaba.fastjson.json; import com.aliyun.openservice.ons.mqtt.demo.mqttproducer; import com.aliyun.openservices.ons.api.impl.authority.authutil; public class httpconsumer { public static string signature="signature"; public static string num="num"; public static string consumerid="consumerid"; public static string producerid="producerid"; public static string timeout="timeout"; public static string topic="topic"; public static string ak="accesskey"; public static string body="body"; public static string msghandle="msghandle"; public static string time="time"; public static void main(string[] args) throws exception { httpclient httpclient=new httpclient(); httpclient.setmaxconnectionsperdestination(1); httpclient.start(); properties properties=new properties(); properties.load(httpconsumer.class.getclassloader().getresourceasstream("user.properties")); string topic=properties.getproperty("topic"); //请在user.properties配置您的topic string url=properties.getproperty("url");//公测集群配置为http://publictest-rest.ons.aliyun.com/ string ak=properties.getproperty("ak");//请在user.properties配置您的ak string sk=properties.getproperty("sk");//请在user.properties配置您的sk string cid=properties.getproperty("consumerid");//请在user.properties配置您的consumer id string date=string.valueof(new date().gettime()); string sign=null; string newline="\n"; string signstring; system.out.println(newline+newline); while (true) { try { date=string.valueof(new date().gettime()); request req=httpclient.post(url+"message/?topic="+topic+"&time="+date+"&num="+32); 
req.method(httpmethod.get); contentresponse response; signstring=topic+newline+cid+newline+date; sign=authutil.calsignature(signstring.getbytes(charset.forname("utf-8")), sk); req.header(signature, sign); req.header(ak, ak); req.header(consumerid, cid); long start=system.currenttimemillis(); response=req.send(); system.out.println("get cost:"+(system.currenttimemillis()-start)/1000 +" "+response.getstatus()+" "+response.getcontentasstring()); list<simplemessage> list = null; if (response.getcontentasstring()!=null&&!response.getcontentasstring().isempty()) { list=json.parsearray(response.getcontentasstring(), simplemessage.class); } if (list==null||list.size()==0) { thread.sleep(100); continue; } system.out.println("size is :"+list.size()); for (simplemessage simplemessage : list) { date=string.valueof(new date().gettime()); system.out.println("receive msg:"+simplemessage.getbody()+" born time "+simplemessage.getborntime()); req=httpclient.post(url+"message/?msghandle="+simplemessage.getmsghandle()+"&topic="+topic+"&time="+date); req.method(httpmethod.delete); signstring=topic+newline+cid+newline+simplemessage.getmsghandle()+newline+date; sign=authutil.calsignature(signstring.getbytes(charset.forname("utf-8")), sk); req.header(signature, sign); req.header(ak, ak); req.header(consumerid, cid); response=req.send(); system.out.println("delete msg:"+response.tostring()); } thread.sleep(100); } catch (exception e) { e.printstacktrace(); } } } }
5. HTTP 示例程序工具类
(1)消息封装类: simplemessage.java
package com.aliyun.openservice.ons.http.demo; public class simplemessage { private string body; private string msgid; private string borntime; private string msghandle; private int reconsumetimes; private string tag; public void settag(string tag) { this.tag = tag; } public string gettag() { return tag; } public int getreconsumetimes() { return reconsumetimes; } public void setreconsumetimes(int reconsumetimes) { this.reconsumetimes = reconsumetimes; } public void setmsghandle(string msghandle) { this.msghandle = msghandle; } public string getmsghandle() { return msghandle; } public string getbody() { return body; } public void setbody(string body) { this.body = body; } public string getmsgid() { return msgid; } public void setmsgid(string msgid) { this.msgid = msgid; } public string getborntime() { return borntime; } public void setborntime(string borntime) { this.borntime = borntime; } }
(2)字符串签名类: md5.java
package com.aliyun.openservice.ons.http.demo; import java.io.unsupportedencodingexception; import java.nio.charset.charset; import java.security.messagedigest; import java.sql.sqlexception; import java.util.date; import java.util.hashmap; import java.util.map; import java.util.concurrent.concurrenthashmap; import java.util.concurrent.locks.reentrantlock; import org.slf4j.loggerfactory; public class md5 { private static final org.slf4j.logger log = loggerfactory.getlogger(md5.class); private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; private static map<character, integer> rdigits = new hashmap<character, integer>(16); static { for (int i = 0; i < digits.length; ++i) { rdigits.put(digits[i], i); } } private static md5 me = new md5(); private messagedigest mhasher; private final reentrantlock oplock = new reentrantlock(); private md5() { try { this.mhasher = messagedigest.getinstance("md5"); } catch (exception e) { throw new runtimeexception(e); } } public static md5 getinstance() { return me; } public string getmd5string(string content) { return this.bytes2string(this.hash(content)); } public string getmd5string(byte[] content) { return this.bytes2string(this.hash(content)); } public byte[] getmd5bytes(byte[] content) { return this.hash(content); } public byte[] hash(string str) { this.oplock.lock(); try { byte[] bt = this.mhasher.digest(str.getbytes("utf-8")); if (null == bt || bt.length != 16) { throw new illegalargumentexception("md5 need"); } return bt; } catch (unsupportedencodingexception e) { throw new runtimeexception("unsupported utf-8 encoding", e); } finally { this.oplock.unlock(); } } public byte[] hash(byte[] data) { this.oplock.lock(); try { byte[] bt = this.mhasher.digest(data); if (null == bt || bt.length != 16) { throw new illegalargumentexception("md5 need"); } return bt; } finally { this.oplock.unlock(); } } public string bytes2string(byte[] bt) { int l = bt.length; char[] out = new 
char[l << 1]; for (int i = 0, j = 0; i < l; i++) { out[j++] = digits[(0xf0 & bt[i]) >>> 4]; out[j++] = digits[0x0f & bt[i]]; } if (log.isdebugenabled()) { log.debug("[hash]" + new string(out)); } return new string(out); } public byte[] string2bytes(string str) { if (null == str) { throw new nullpointerexception("argument is not allowed empty"); } if (str.length() != 32) { throw new illegalargumentexception("string length must equals 32"); } byte[] data = new byte[16]; char[] chs = str.tochararray(); for (int i = 0; i < 16; ++i) { int h = rdigits.get(chs[i * 2]).intvalue(); int l = rdigits.get(chs[i * 2 + 1]).intvalue(); data[i] = (byte) ((h & 0x0f) << 4 | l & 0x0f); } return data; } }
希望本篇文章对您有所帮助
下一篇: 详解Android中Drawable方法