Java应用beanstalkd消息队列
应用场景
最近做一个项目,处理每一个从队列收到的消息都要去获取一个锁(使用Redis实现的分布式锁),如果没有获取到锁,也不能把这个消息给丢了,那可不可以把这个没有获取到锁的消息再发回队列?
如果是用Kafka的话,消息一发回队列,马上就消费,然而这时候还是获取不到锁,进入死循环了,影响性能。考虑把这些消息给存起来,但又很繁琐。
如图,每个消费者收到消息后准备进行业务处理时都会去获取分布式锁资源,获得锁就正常执行业务流程,如果获取不到锁就设置一个delay时间,将消息再次发回队列,
等到达delay时间之后再次消费消息。
beanstalkd正好满足这一需求,而且轻量级,没有Kafka那么重。
参考beanstalkd的状态机的变化,这里不再赘述。
开始使用beanstalkd
启动beanstalk服务:
beanstalkd -l 127.0.0.1 -p 11300
引入Java客户端Maven依赖,还有其他的Java client包,可自行选用:
<dependency>
<groupId>com.dinstone</groupId>
<artifactId>beanstalkc</artifactId>
<version>2.2.0</version>
</dependency>
生产者
public class BeanstalkProducer {
public static void main(String[] args) {
Configuration config = new Configuration();
config.setServiceHost("127.0.0.1");
config.setServicePort(11300);
BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
JobProducer producer = factory.createJobProducer("beanstalkd-demo");
String msg = "hello, beanstalkd";
// 返回job id
long p = producer.putJob(100, 0, 5, msg.getBytes());
System.out.println(p);
}
}
发送消息的代码putJob:
public long putJob(int priority, int delay, int ttr, byte[] data);
- priority:优先级
- delay:延迟多长时间开始执行,单位秒
- ttr: 单位秒,为consumer操作设置的reserve超时时间,如果consumer在这个ttr时间里没有完成job并将job delete掉,那这个job就会重新被迁回ready状态,再次供消费者执行
消费者
reserveJob
public class BeanstalkConsumer {
public static void main(String[] args) {
Configuration config = new Configuration();
config.setServiceHost("127.0.0.1");
config.setServicePort(11300);
BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
JobConsumer consumer = factory.createJobConsumer("beanstalkd-demo");
while (true) {
// reserveJob 有一个超时时间参数,单位是秒,表示获取消息最多花费多长时间
Job job = consumer.reserveJob(3);
if (Objects.isNull(job)) {
continue;
}
System.out.println(job.getId());
System.out.println(new String(job.getData()));
// consumer.deleteJob(job.getId());
}
}
}
consumer代码最后注释掉了 consumer.deleteJob(job.getId())
,没有将消息delete掉,这个job将会一直从reserve状态到ready状态,beanstalkd会认为consumer没有在ttr时间之内完成job,而且这个操作的频繁执行很耗性能。大量的这种操作会导致你的CPU使用率一下就上去了。 所以consumer完成了job之后,就将job delete掉,如果业务代码在完成job时出现异常,也要在try catch Exception
中将job给delete掉,然后就可以开始报警了 ^~^
releaseJob
public class BeanstalkConsumerRelease {
public static void main(String[] args) {
Configuration config = new Configuration();
config.setServiceHost("127.0.0.1");
config.setServicePort(11300);
BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
JobConsumer consumer = factory.createJobConsumer("beanstalkd-demo");
while (true) {
Job job = consumer.reserveJob(3);
if (Objects.isNull(job)) {
continue;
}
System.out.println(job.getId());
System.out.println(new String(job.getData()));
consumer.releaseJob(job.getId(), 99, 5);
}
}
}
consumer.releaseJob(long id, int priority, int delay)
,将消息从reserved状态迁移到delay状态,延迟(指定的延迟时间)之后job变成ready状态供消费者继续消费。 在我的项目中正好利用了这一特性解决了一开始描述的那个问题。