欢迎您访问程序员文章站,本站旨在为大家分享程序员计算机编程知识!
您现在的位置是: 首页

rabbitmq批量处理

程序员文章站 2022-07-13 08:58:24
...

我们通过spring-amqp操作rabbitmq是极其简单的,消息的生产者和消费者只需要如下配置:

客户端(生产者):connectionFactory、queue、exchange、messageConverter、RabbitTemplate。

服务端(消费者):connectionFactory、queue、exchange、messageConverter、listenerContainer。

 

如果消息堆积严重,我们可以通过两种方式来处理消息,一种是在服务端开启监听多线程服务(concurrency="10"),另一种是让消息批量出队列。

 

开启多线程的配置示例如下:

	<!-- Listener container for the multi-threaded option: messages from queue "queue_trade_repay"
	     are handed to the "listen" method of the "tradeListener" bean. concurrency="10" requests
	     ten concurrent consumers; acknowledge="auto" leaves acking to the container;
	     requeue-rejected="false" is presumably there so failed messages are not redelivered
	     in a loop (confirm against the Spring AMQP version in use). -->
	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" requeue-rejected="false"   
	      concurrency="10"   message-converter="jsonMessageConverter" > 
    	    <rabbit:listener ref="tradeListener" method="listen"  queues="queue_trade_repay" />
	</rabbit:listener-container>

 

批量出队列的示例如下:

客户端(消息生产者)

import java.math.BigDecimal;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.rd.account.domain.AccountLog;
import com.rd.ifaes.mq.producer.RabbitProducer;
import com.rd.ifaes.web.BaseTest;

/**
 * Message producer: a Spring-backed JUnit test that publishes a fixed number of
 * {@code AccountLog} messages to the batch queue so the batch consumer has
 * something to drain.
 *
 * @author lihua
 * @since 2018-04-08
 */
public class Producer extends BaseTest {

	/** Target queue / routing key (MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH in the real project). */
	private static final String QUEUE_NAME = "ACCOUNT_LOG_BATCH";

	/** How many messages a single run publishes. */
	private static final int MESSAGE_COUNT = 512;

	// RabbitProducer is a thin wrapper around RabbitTemplate; an autowired RabbitTemplate
	// with convertAndSend(QUEUE_NAME, payload) can be used directly instead.
	@Autowired
	private RabbitProducer rabbitProducer;

	/**
	 * Publishes {@link #MESSAGE_COUNT} account-log messages, numbered from 0, whose
	 * amount and remark carry the sequence number.
	 */
	@Test
	public void main() {
		int seq = 0;
		while (seq < MESSAGE_COUNT) {
			BigDecimal amount = BigDecimal.valueOf(seq);
			String remark = "remark" + seq;
			rabbitProducer.send(QUEUE_NAME, new AccountLog("001", "001", "asdf", amount, remark));
			seq++;
		}
	}

}

 

服务端(消息消费者)

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Message consumer: a Spring-backed JUnit test that polls the batch queue and
 * persists the received {@code AccountLog} messages in batches.
 *
 * <p>Each poll opens a channel through {@link RabbitTemplate#execute}, pulls up to
 * {@link #BATCH_SIZE} messages, saves them, and only then acknowledges them, so a
 * failed save leaves the messages on the broker for redelivery (at-least-once).
 *
 * @author lihua
 * @since 2018-04-08
 */
public class Consumer extends BaseTest{
	
	private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
	
	/** Queue to drain (MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH in the real project). */
	private static final String QUEUE_NAME = "ACCOUNT_LOG_BATCH";
	/** Maximum number of messages fetched and saved per poll; also used as the prefetch count. */
	private static final int BATCH_SIZE = 100;
	/** How long to wait for the next delivery before treating the batch as complete. */
	private static final long DELIVERY_TIMEOUT_MS = 100L;
	/** Pause between two polls. */
	private static final long POLL_INTERVAL_MS = 5000L;
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private AccountLogService accountLogService;
	
	/**
	 * Polls the queue until the running thread is interrupted. A failure inside a
	 * poll propagates out of {@code rabbitTemplate.execute} and fails the test.
	 */
	@Test
	public void consumer() {
		while (!Thread.currentThread().isInterrupted()) {
			rabbitTemplate.execute(new ChannelCallback<String>() {
				@Override
				public String doInRabbit(Channel channel) throws Exception {
					pollBatch(channel);
					return null;
				}
			});
			
			try {
				Thread.sleep(POLL_INTERVAL_MS);
			} catch (InterruptedException e) {
				// Restore the flag so the loop condition ends the test instead of spinning forever.
				Thread.currentThread().interrupt();
			}
		}
	}
	
	/**
	 * Runs one poll on the given channel: checks the queue depth and, if there is
	 * anything to read, consumes, saves and acknowledges one batch.
	 *
	 * @param channel channel supplied by {@code RabbitTemplate.execute}
	 * @throws Exception any broker, parsing or persistence failure; nothing is acked in that case
	 */
	private void pollBatch(Channel channel) throws Exception {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		try {
			final AMQP.Queue.DeclareOk ok = channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			int messageCount = ok.getMessageCount();
			LOGGER.info("run consumer {}, msg count {}", sdf.format(new Date()), messageCount);
			if (messageCount == 0) {
				return;
			}
			channel.basicQos(BATCH_SIZE);
			QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
			LOGGER.info("channel id {}", Integer.toHexString(System.identityHashCode(channel)));
			final String inConsumerTag = "test consumer" + sdf.format(new Date());
			channel.basicConsume(QUEUE_NAME, false, inConsumerTag, queueingConsumer);
			try {
				List<AccountLog> list = new ArrayList<>(BATCH_SIZE);
				long lastDeliveryTag = -1;
				while (list.size() < BATCH_SIZE) {
					QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(DELIVERY_TIMEOUT_MS);
					if (delivery == null) {
						// Nothing arrived within the timeout: the batch is as full as it will get.
						break;
					}
					// Decode explicitly as UTF-8 rather than with the platform default charset.
					String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
					list.add(JSONObject.parseObject(msg, AccountLog.class));
					lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
					LOGGER.info("get message {} delivery id {}", msg, lastDeliveryTag);
				}
				if (!list.isEmpty()) {
					// Persist first, ack second: acking before the save would drop the batch
					// whenever the save fails. A failure between save and ack causes redelivery,
					// so saveBatch should tolerate duplicates. NOTE(review): confirm it does.
					accountLogService.saveBatch(list);
					channel.basicAck(lastDeliveryTag, true);
					LOGGER.info("last to ack message id =>{}", lastDeliveryTag);
				}
			} catch (InterruptedException e) {
				// Stop quietly; unacked messages go back to the queue when the channel closes.
				Thread.currentThread().interrupt();
			} finally {
				// Always close the channel once a consumer was registered, otherwise the consumer
				// (and its prefetched, unacked messages) would stay attached to the channel.
				channel.abort();
			}
		} finally {
			LOGGER.info("consumer done {}", sdf.format(new Date()));
		}
	}

}

 

import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Common parent for the tests in this example: runs them with the Spring JUnit 4
 * runner and loads the application context from {@code classpath:spring-context.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-context.xml")
public abstract class BaseTest {
}

 

补一个服务端真实案例:

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;


@Component
@Lazy(value=false)
public class AccountLogBatchListener {
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private AccountLogService accountLogService;
	
	private static final Logger LOGGER = LoggerFactory.getLogger(AccountLogBatchListener.class);
	private static final String QUEUE_NAME = MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH;
	private static final ExecutorService executor = Executors.newFixedThreadPool(1);
	private static final int BATCH_SIZE = 100;
	
	@PostConstruct
	public void init(){		
		executor.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				execute();
				return null;
			}			
		});
	}
	
	private void execute(){		
		while (true) {
			rabbitTemplate.execute(new ChannelCallback<String>() {
				@Override
				public String doInRabbit(Channel channel) throws Exception {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
					try {		        	
						final AMQP.Queue.DeclareOk ok = channel.queueDeclare(QUEUE_NAME, true, false, false, null);
						int messageCount = ok.getMessageCount();
						LOGGER.debug("accountLogBatchListener {}, msg count {}", sdf.format(new Date()), messageCount);
						if (messageCount == 0) {
							return null;
						}
						List<AccountLog> list = new ArrayList<>();
						channel.basicQos(BATCH_SIZE);
						QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
						LOGGER.debug("channel id {}", Integer.toHexString(System.identityHashCode(channel)));
						final String inConsumerTag = "accountLogBatchListener {}" + sdf.format(new Date());
						channel.basicConsume(QUEUE_NAME, false, inConsumerTag, queueingConsumer);
						long messageId = -1;
						int dealedCount = 0;
						int i = BATCH_SIZE;
						while (i-- > 0) {
							QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(BATCH_SIZE);
							if (delivery == null) {
								break;
							}
							String msg = new String(delivery.getBody());
							AccountLog log = JSONObject.parseObject(msg, AccountLog.class);
							list.add(log);
							messageId = delivery.getEnvelope().getDeliveryTag();
							LOGGER.info(" userId {}, delivery id {}", log.getUserId(), messageId);
							dealedCount++;
							if (dealedCount % 5 == 0) {
								channel.basicAck(messageId, true);
								LOGGER.debug("batch ack message id =>{}", messageId);
								messageId = -1;
							}
						}
						if (messageId > 0) {
							channel.basicAck(messageId, true);
							LOGGER.debug("last to ack message id =>{}", messageId);
						}
						
						// 日志入库
						accountLogService.saveBatch(list);
						
						
					} finally {
						LOGGER.info("accountLogBatchListener done {}", sdf.format(new Date()));
					}
					channel.abort();
					return null;
				}
			});
			try {
				Thread.sleep(10000);
			} catch (InterruptedException e) {
			}
		}
	}

}

 

相关标签: java rabbitmq