欢迎您访问程序员文章站,本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java HTTP协议收发MQ 消息代码实例详解

程序员文章站 2024-02-29 22:04:46
1. 准备环境 在工程 pom 文件添加 http java 客户端的依赖。 or...

1. 准备环境

在工程 pom 文件添加 http java 客户端的依赖。

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-client</artifactId>
  <version>9.3.4.RC1</version>
</dependency>
<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.1.11</version>
</dependency>

2. 运行代码配置(user.properties)

您需要设置配置文件(user.properties)的相关内容,具体请参考“申请 MQ 资源”。

#您在控制台创建的topic
topic=xxx
#公测url
url=http://publictest-rest.ons.aliyun.com/
#阿里云身份验证码
ak=xxx
#阿里云身份验证密钥
sk=xxx
#mq控制台创建的producer id
producerid=xxx
#mq控制台创建的consumer id
consumerid=xxx

说明:URL 中的 key、tag 以及 POST 的 Content-Type 没有任何限制,只要确保 key 和 tag 相同唯一即可,可以放在 user.properties 里面。

3. http 发送消息示例代码

您可以按以下说明设置相应参数并测试 http 消息发送功能。

package com.aliyun.openservice.ons.http.demo;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Properties;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;

/**
 * Demo that sends ten messages to an MQ topic over HTTP.
 *
 * <p>Settings are read from {@code user.properties} on the classpath (keys
 * {@code topic}, {@code url}, {@code ak}, {@code sk}, {@code producerid}).
 * Each request is a POST to {@code <url>/message/} carrying the message body,
 * with the signature, access key and producer id sent as request headers.
 */
public class HttpProducer {
  // Request header / parameter names. They are upper-case so that they can
  // never be shadowed by the identically spelled locals in main(): with
  // lower-case names, req.header(ak, ak) used the access-key VALUE as the
  // header name.
  public static final String SIGNATURE = "signature";
  public static final String NUM = "num";
  public static final String CONSUMERID = "consumerid";
  public static final String PRODUCERID = "producerid";
  public static final String TIMEOUT = "timeout";
  public static final String TOPIC = "topic";
  public static final String AK = "accesskey";
  public static final String BODY = "body";
  public static final String MSGHANDLE = "msghandle";
  public static final String TIME = "time";

  /** Number of messages the demo sends before exiting. */
  private static final int MESSAGE_COUNT = 10;

  /**
   * Sends {@link #MESSAGE_COUNT} messages and prints the status code and
   * response body of each request.
   *
   * @param args unused
   * @throws Exception if the configuration cannot be loaded or a request fails
   */
  public static void main(String[] args) throws Exception {
    // Load and validate the configuration before starting the client so a
    // configuration mistake does not leave client threads running.
    Properties properties = loadProperties();
    String topic = requireProperty(properties, "topic"); // topic created in the console
    String url = normalizeBaseUrl(requireProperty(properties, "url")); // service endpoint
    String ak = requireProperty(properties, "ak"); // access key
    String sk = requireProperty(properties, "sk"); // secret key
    String pid = requireProperty(properties, "producerid"); // producer id created in the console

    String body = "hello ons http";
    String newline = "\n";
    Charset utf8 = Charset.forName("utf-8");

    HttpClient httpClient = new HttpClient();
    httpClient.setMaxConnectionsPerDestination(1);
    httpClient.start();
    try {
      for (int i = 0; i < MESSAGE_COUNT; i++) {
        // The same timestamp must appear in the query string and in the signed text.
        String date = String.valueOf(new Date().getTime());
        Request req = httpClient.POST(url + "message/?topic=" + topic + "&time=" + date + "&tag=http" + "&key=http");
        ContentProvider content = new StringContentProvider(body);
        req.content(content);

        // Signed text: topic, producer id, MD5 of the body and timestamp, newline separated.
        String signString = topic + newline + pid + newline + MD5.getInstance().getMD5String(body) + newline + date;
        System.out.println(signString);
        String sign = AuthUtil.calSignature(signString.getBytes(utf8), sk);

        req.header(SIGNATURE, sign);
        req.header(AK, ak);
        req.header(PRODUCERID, pid);

        ContentResponse response = req.send();
        System.out.println("send msg:" + response.getStatus() + response.getContentAsString());
      }
    } finally {
      // Always stop the client, even when a request throws.
      httpClient.stop();
    }
  }

  /** Loads user.properties from the classpath, closing the stream afterwards. */
  private static Properties loadProperties() throws Exception {
    Properties properties = new Properties();
    InputStream in = HttpProducer.class.getClassLoader().getResourceAsStream("user.properties");
    if (in == null) {
      throw new IllegalStateException("user.properties not found on the classpath");
    }
    try {
      properties.load(in);
    } finally {
      in.close();
    }
    return properties;
  }

  /** Returns the trimmed value of a mandatory property or fails with a clear message. */
  private static String requireProperty(Properties properties, String key) {
    String value = properties.getProperty(key);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalStateException("missing property '" + key + "' in user.properties");
    }
    return value.trim();
  }

  /** Ensures the base URL ends with '/' so that "message/" can be appended safely. */
  private static String normalizeBaseUrl(String url) {
    return url.endsWith("/") ? url : url + "/";
  }
}

4. http接收消息示例代码

请按以下说明设置相应参数并测试 http 消息接收功能。

package com.aliyun.openservice.ons.http.demo;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import com.alibaba.fastjson.JSON;
// NOTE(review): unused in this class; the class-name casing was restored by guess - confirm or drop.
import com.aliyun.openservice.ons.mqtt.demo.MqttProducer;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;

/**
 * Demo that polls an MQ topic over HTTP and deletes every message it receives.
 *
 * <p>Settings are read from {@code user.properties} on the classpath (keys
 * {@code topic}, {@code url}, {@code ak}, {@code sk}, {@code consumerid}).
 * Messages are fetched with a GET on {@code <url>/message/} and each one is
 * then removed with a DELETE that carries its message handle.
 */
public class HttpConsumer {
  // Request header / parameter names. They are upper-case so that they can
  // never be shadowed by the identically spelled locals in main(): with
  // lower-case names, req.header(ak, ak) used the access-key VALUE as the
  // header name.
  public static final String SIGNATURE = "signature";
  public static final String NUM = "num";
  public static final String CONSUMERID = "consumerid";
  public static final String PRODUCERID = "producerid";
  public static final String TIMEOUT = "timeout";
  public static final String TOPIC = "topic";
  public static final String AK = "accesskey";
  public static final String BODY = "body";
  public static final String MSGHANDLE = "msghandle";
  public static final String TIME = "time";

  /** Pause between two polls, in milliseconds. */
  private static final long POLL_INTERVAL_MILLIS = 100;
  /** Pause after a failed poll so a persistent error does not spin the CPU. */
  private static final long ERROR_BACKOFF_MILLIS = 1000;

  /**
   * Polls for messages until the thread is interrupted.
   *
   * @param args unused
   * @throws Exception if the configuration cannot be loaded or the client cannot be started
   */
  public static void main(String[] args) throws Exception {
    // Load and validate the configuration before starting the client so a
    // configuration mistake does not leave client threads running.
    Properties properties = loadProperties();
    String topic = requireProperty(properties, "topic"); // topic created in the console
    String url = normalizeBaseUrl(requireProperty(properties, "url")); // service endpoint
    String ak = requireProperty(properties, "ak"); // access key
    String sk = requireProperty(properties, "sk"); // secret key
    String cid = requireProperty(properties, "consumerid"); // consumer id created in the console

    String newline = "\n";
    Charset utf8 = Charset.forName("utf-8");

    HttpClient httpClient = new HttpClient();
    httpClient.setMaxConnectionsPerDestination(1);
    httpClient.start();
    System.out.println(newline + newline);
    try {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          // The same timestamp must appear in the query string and in the signed text.
          String date = String.valueOf(new Date().getTime());
          Request req = httpClient.newRequest(url + "message/?topic=" + topic + "&time=" + date + "&num=" + 32)
              .method(HttpMethod.GET);
          String signString = topic + newline + cid + newline + date;
          String sign = AuthUtil.calSignature(signString.getBytes(utf8), sk);
          req.header(SIGNATURE, sign);
          req.header(AK, ak);
          req.header(CONSUMERID, cid);

          long start = System.currentTimeMillis();
          ContentResponse response = req.send();
          // Read the body once; it is needed for logging and for parsing.
          String content = response.getContentAsString();
          System.out.println("get cost:" + (System.currentTimeMillis() - start) / 1000
              + "  " + response.getStatus() + "  " + content);

          List<SimpleMessage> list = null;
          if (content != null && !content.isEmpty()) {
            list = JSON.parseArray(content, SimpleMessage.class);
          }
          if (list == null || list.isEmpty()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            continue;
          }

          System.out.println("size is :" + list.size());
          for (SimpleMessage simpleMessage : list) {
            date = String.valueOf(new Date().getTime());
            System.out.println("receive msg:" + simpleMessage.getBody() + "  born time " + simpleMessage.getBornTime());
            // NOTE(review): the parameter name is kept exactly as published; the
            // service may expect "msgHandle" (camel case) - confirm against the API docs.
            req = httpClient.newRequest(url + "message/?msghandle=" + simpleMessage.getMsgHandle()
                + "&topic=" + topic + "&time=" + date)
                .method(HttpMethod.DELETE);
            // Signed text for a delete additionally contains the message handle.
            signString = topic + newline + cid + newline + simpleMessage.getMsgHandle() + newline + date;
            sign = AuthUtil.calSignature(signString.getBytes(utf8), sk);
            req.header(SIGNATURE, sign);
            req.header(AK, ak);
            req.header(CONSUMERID, cid);
            response = req.send();
            System.out.println("delete msg:" + response.toString());
          }
          Thread.sleep(POLL_INTERVAL_MILLIS);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and leave the loop instead of swallowing it.
          Thread.currentThread().interrupt();
          break;
        } catch (Exception e) {
          e.printStackTrace();
          // Back off before retrying so a persistent failure does not hot-loop.
          Thread.sleep(ERROR_BACKOFF_MILLIS);
        }
      }
    } finally {
      httpClient.stop();
    }
  }

  /** Loads user.properties from the classpath, closing the stream afterwards. */
  private static Properties loadProperties() throws Exception {
    Properties properties = new Properties();
    InputStream in = HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties");
    if (in == null) {
      throw new IllegalStateException("user.properties not found on the classpath");
    }
    try {
      properties.load(in);
    } finally {
      in.close();
    }
    return properties;
  }

  /** Returns the trimmed value of a mandatory property or fails with a clear message. */
  private static String requireProperty(Properties properties, String key) {
    String value = properties.getProperty(key);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalStateException("missing property '" + key + "' in user.properties");
    }
    return value.trim();
  }

  /** Ensures the base URL ends with '/' so that "message/" can be appended safely. */
  private static String normalizeBaseUrl(String url) {
    return url.endsWith("/") ? url : url + "/";
  }
}

5. http示例程序工具类

(1)消息封装类: simplemessage.java

package com.aliyun.openservice.ons.http.demo;
/**
 * Plain bean holding one message returned by the HTTP receive call.
 *
 * <p>It has a public no-argument constructor and a getter/setter pair for
 * every field so that a JSON library can populate it by property name.
 */
public class SimpleMessage {
  private String body;
  private String msgId;
  private String bornTime;
  private String msgHandle;
  private int reconsumeTimes;
  private String tag;

  /** Sets the message tag. */
  public void setTag(String tag) {
    this.tag = tag;
  }

  /** Returns the message tag, or {@code null} if none was set. */
  public String getTag() {
    return tag;
  }

  /** Returns the reconsume counter (0 unless set). */
  public int getReconsumeTimes() {
    return reconsumeTimes;
  }

  /** Sets the reconsume counter. */
  public void setReconsumeTimes(int reconsumeTimes) {
    this.reconsumeTimes = reconsumeTimes;
  }

  /** Sets the message handle. */
  public void setMsgHandle(String msgHandle) {
    this.msgHandle = msgHandle;
  }

  /** Returns the message handle, or {@code null} if none was set. */
  public String getMsgHandle() {
    return msgHandle;
  }

  /** Returns the message body, or {@code null} if none was set. */
  public String getBody() {
    return body;
  }

  /** Sets the message body. */
  public void setBody(String body) {
    this.body = body;
  }

  /** Returns the message id, or {@code null} if none was set. */
  public String getMsgId() {
    return msgId;
  }

  /** Sets the message id. */
  public void setMsgId(String msgId) {
    this.msgId = msgId;
  }

  /** Returns the born time as the string it was set with, or {@code null}. */
  public String getBornTime() {
    return bornTime;
  }

  /** Sets the born time. */
  public void setBornTime(String bornTime) {
    this.bornTime = bornTime;
  }
}

(2)字符串签名类: md5.java

package com.aliyun.openservice.ons.http.demo;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.LoggerFactory;

/**
 * Singleton helper that computes MD5 digests and converts them to and from
 * 32-character lower-case hexadecimal strings.
 *
 * <p>A single {@link MessageDigest} instance is shared; every digest
 * computation runs while holding {@code opLock}.
 */
public class MD5 {
  private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);
  private static final Charset UTF_8 = Charset.forName("utf-8");
  private static final char[] DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  /** Reverse lookup: hex character to its numeric value. */
  private static final Map<Character, Integer> R_DIGITS = new HashMap<Character, Integer>(16);
  static {
    for (int i = 0; i < DIGITS.length; ++i) {
      R_DIGITS.put(DIGITS[i], i);
    }
  }
  private static final MD5 me = new MD5();

  private final MessageDigest mHasher;
  private final ReentrantLock opLock = new ReentrantLock();

  private MD5() {
    try {
      this.mHasher = MessageDigest.getInstance("md5");
    } catch (NoSuchAlgorithmException e) {
      throw new RuntimeException(e);
    }
  }

  /** Returns the shared instance. */
  public static MD5 getInstance() {
    return me;
  }

  /**
   * Returns the MD5 of the UTF-8 bytes of {@code content} as 32 lower-case hex characters.
   *
   * @throws NullPointerException if {@code content} is null
   */
  public String getMD5String(String content) {
    return this.bytes2string(this.hash(content));
  }

  /** Returns the MD5 of {@code content} as 32 lower-case hex characters. */
  public String getMD5String(byte[] content) {
    return this.bytes2string(this.hash(content));
  }

  /** Returns the raw 16-byte MD5 of {@code content}. */
  public byte[] getMD5Bytes(byte[] content) {
    return this.hash(content);
  }

  /**
   * Returns the raw 16-byte MD5 of the UTF-8 bytes of {@code str}.
   *
   * @throws NullPointerException if {@code str} is null
   */
  public byte[] hash(String str) {
    // Encoding through a Charset object cannot fail, so no checked
    // UnsupportedEncodingException has to be handled here.
    return this.hash(str.getBytes(UTF_8));
  }

  /** Returns the raw 16-byte MD5 of {@code data}. */
  public byte[] hash(byte[] data) {
    this.opLock.lock();
    try {
      byte[] bt = this.mHasher.digest(data);
      if (null == bt || bt.length != 16) {
        throw new IllegalArgumentException("md5 need");
      }
      return bt;
    } finally {
      this.opLock.unlock();
    }
  }

  /** Encodes {@code bt} as lower-case hex, two characters per byte. */
  public String bytes2string(byte[] bt) {
    int l = bt.length;
    char[] out = new char[l << 1];
    for (int i = 0, j = 0; i < l; i++) {
      out[j++] = DIGITS[(0xf0 & bt[i]) >>> 4];
      out[j++] = DIGITS[0x0f & bt[i]];
    }
    String hex = new String(out);
    if (log.isDebugEnabled()) {
      log.debug("[hash]" + hex);
    }
    return hex;
  }

  /**
   * Decodes a 32-character hex string into 16 bytes. Both lower-case and
   * upper-case hex digits are accepted.
   *
   * @throws NullPointerException if {@code str} is null
   * @throws IllegalArgumentException if {@code str} is not exactly 32
   *     characters long or contains a character that is not a hex digit
   */
  public byte[] string2bytes(String str) {
    if (null == str) {
      throw new NullPointerException("argument is not allowed empty");
    }
    if (str.length() != 32) {
      throw new IllegalArgumentException("string length must equals 32");
    }
    byte[] data = new byte[16];
    char[] chs = str.toCharArray();
    for (int i = 0; i < 16; ++i) {
      int h = hexValue(chs[i * 2]);
      int l = hexValue(chs[i * 2 + 1]);
      data[i] = (byte) ((h & 0x0f) << 4 | l & 0x0f);
    }
    return data;
  }

  /**
   * Returns the numeric value of one hex digit; rejects anything else with an
   * IllegalArgumentException instead of failing on a null map lookup.
   */
  private static int hexValue(char c) {
    Integer value = R_DIGITS.get(Character.toLowerCase(c));
    if (value == null) {
      throw new IllegalArgumentException("not a hexadecimal digit: '" + c + "'");
    }
    return value.intValue();
  }
}

希望本篇文章对您有所帮助