欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

springboot:amq

程序员文章站 2022-06-16 08:05:04
...
pom.xml
=======================================================
<!-- Spring Boot starter for ActiveMQ. No <version> is given here, so it is
     expected to be supplied by the project's dependency management. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
=======================================================
AmqConfigation.java
=======================================================
@Configuration
public class AmqConfigation {

    @Bean(name= "connectionFactory")
    public ActiveMQConnectionFactory connectionFactory () {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://172.16.30.57:61616");
        factory.setUseAsyncSend(true);
        factory.setUseCompression(true);
        factory.setMaxThreadPoolSize(100);
        return factory;
    }

    @Bean(name = "simpleMessageConverter")
    public SimpleMessageConverter getSimpleMessageConverter(){
        SimpleMessageConverter converter = new SimpleMessageConverter();
        return converter;
    }

    @Bean(name = "queue1")
    public ActiveMQQueue queue(){
        return new ActiveMQQueue("queue1");
    }

}
=======================================================
AmqConsumerConfiguration.java
=======================================================
@Configuration
public class AmqConsumerConfiguration {

//    public RedeliveryPolicyMap getRedeliveryPolicyMap(){
//        RedeliveryPolicyMap policyMap = new RedeliveryPolicyMap();
//        RedeliveryPolicy policy = new RedeliveryPolicy();
//        policy.set
//        policyMap.setDefaultEntry();
//    }

    //=================================one start
    @Bean(name = "messageConsumer1")
    public IMessageConsumer getMessageConsumer1(){
        IMessageConsumer cunsumer = new MessageConsumer1();
        return cunsumer;
    }

    @Bean(name = "messageListenerAdapter1")
    public MessageListenerAdapter getMessageListenerAdapter1(@Qualifier("messageConsumer1")IMessageConsumer consumer,
                                                            @Qualifier("simpleMessageConverter") SimpleMessageConverter converter){
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(consumer);
        listenerAdapter.setMessageConverter(converter);
        listenerAdapter.setDefaultListenerMethod("handleMessage");
        return listenerAdapter;
    }

    @Bean(name = "messageListenerContainer1")
    public SimpleMessageListenerContainer getSimpleMessageListenerContainer1(@Qualifier("connectionFactory") ActiveMQConnectionFactory connectionFactory,
                                                                             @Qualifier("queue1") ActiveMQQueue queue,
                                                                             @Qualifier("messageListenerAdapter1") MessageListenerAdapter listenerAdapter){
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setDestination(queue);
        listenerContainer.setMessageListener(listenerAdapter);
        listenerContainer.setConcurrency("5-10");
        listenerContainer.setSessionTransacted(true);
        return listenerContainer;
    }
    //=================================one end

}
=======================================================
AmqProducterConfiguration.java
=======================================================
@Configuration
public class AmqProducterConfiguration{


    @Bean(name = "messageProducter1")
    public JmsTemplate getTemplate(@Qualifier("connectionFactory") ActiveMQConnectionFactory connectionFactory,
                                   @Qualifier("simpleMessageConverter") SimpleMessageConverter converter,
                                   @Qualifier("queue1") ActiveMQQueue queue ){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setMessageConverter(converter);
        template.setDefaultDestination(queue);
        template.setExplicitQosEnabled(true);
        template.setDeliveryMode(2);
        template.setPubSubDomain(false);
        template.setSessionTransacted(true);
        template.setSessionAcknowledgeMode(1);
        return template;
    }

}
=======================================================
IMessageConsumer.java
=======================================================
/**
 * Contract for handlers of text messages. In this file the implementation is
 * wrapped by a {@code MessageListenerAdapter} whose default listener method is
 * {@code "handleMessage"}, so the method name below must stay in sync with it.
 */
public interface IMessageConsumer {

    /**
     * Handles one received message.
     *
     * @param message the message payload as text
     * @throws JMSException if the implementation fails while processing the message
     */
    void handleMessage(String message) throws JMSException;

}
=======================================================
MessageConsumer1.java
=======================================================
public class MessageConsumer1 implements IMessageConsumer {

    public void handleMessage(String message)throws JMSException{
        try{

            System.out.println("message = [" + message + "]");

        }catch (Exception e){
        }
    }

}
=======================================================
MessageProducter.java
=======================================================
/**
 * Contract for components that publish an entity as a message.
 *
 * @param <T> the entity type accepted by this producer; must extend {@code BaseEntity}
 */
public interface MessageProducter<T extends BaseEntity> {

    /**
     * Sends the given entity.
     *
     * @param t the entity to send
     */
    void send(T t);

}
=======================================================
MessageProducter1.java
=======================================================
@Component("test-messageSender1")
public class MessageProducter1 implements MessageProducter<City> {

    @Resource(name = "messageProducter1")
    JmsTemplate messageSender;

    @Override
    public void send(City data) {
        try {
            messageSender.convertAndSend("");
        }catch (Exception e){
        }
    }

}
=======================================================
test.java
=======================================================
@Resource(name = "test-messageSender1")
    MessageProducter messageProducter;

    public void send() {
        City city = new City();
        city.setCityName("测试");
        messageProducter.send(city);
    }

相关标签: springboot amq