欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

e3mall项目:商品增删改查同步索引库(ActiveMQ相关)

程序员文章站 2022-04-02 11:54:20
...

e3mall项目:商品增删改查同步索引库

准备工作:在e3-search-service和e3-manager-service的pom文件中,新增ActiveMQ的依赖

<!--ActiveMQ消息中间件-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>


一、在商品的增删改查时,发送需要同步索引库的消息。(消息发送端:e3-manager-service)

(1)在e3-manager-service中配置ActiveMQ。(在spring目录中,新建配置文件:applicationContext-activemq.xml)

e3mall项目:商品增删改查同步索引库(ActiveMQ相关)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">


    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activemq.url}" />  //这里的activemq.url在resouce.properties中,你也可以直接写在这里
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!-- 商品数据更新消息 -->
    <bean id="itemUpdate" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="itemUpdate" />
    </bean>
</beans>

(2)在完成商品数据修改之前,发送消息同步索引库。(我这里对发送消息进行了封装,只需要在商品操作完成之前,调用即可)

封装的方法如下:

    /**
     * 发送消息的方法
     * @auther: xushuai
     * @date: 2018/5/26 15:26
     */
    private void sendMessage(final long itemId, Destination destination) {
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                //将商品ID发送
                return session.createTextMessage(itemId + "");
            }
        });
    }

调用:

e3mall项目:商品增删改查同步索引库(ActiveMQ相关)

e3mall项目:商品增删改查同步索引库(ActiveMQ相关)


注意:需要注入JmsTemplate对象和Destination对象。(即spring容器中配置好的)

二、同步索引库操作。(消息接收端:e3-search-service)

(1)书写消息接收监听器类(ItemUpdateMessageLinstener)
package cn.e3mall.search.message;

import cn.e3mall.search.service.SearchService;
import org.springframework.beans.factory.annotation.Autowired;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 商品数据更新消息监听器
 * Author: xushuai
 * Date: 2018/5/26
 * Time: 15:37
 * Description:
 */
public class ItemUpdateMessageLinstener implements MessageListener{

    @Autowired
    private SearchService searchService;

    @Override
    public void onMessage(Message message) {
        try {
            //等待一秒,防止消息发送端事务没有提交,发生不必要的错误
            Thread.sleep(1000);
            //执行更新索引库操作
            TextMessage textMessage = (TextMessage) message;
            Long itemId = Long.valueOf(textMessage.getText());
            searchService.syncIndex(itemId);
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

(2)在e3-search-service中配置ActiveMQ。(在spring目录中,新建applicationContext-activemq.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">


    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activemq.url}" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!-- 商品数据更新消息 -->
    <bean id="itemUpdate" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="itemUpdate" />  //这里的值,需要和消息发送方的值一致
    </bean>

    <!--配置消息接收监听器-->
    <bean id="itemUpdateMessageLinstener" class="cn.e3mall.search.message.ItemUpdateMessageLinstener"/>
    <!--商品数据更新消息监听器容器-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="itemUpdate" />
        <property name="messageListener" ref="itemUpdateMessageLinstener" />
    </bean>

</beans>

(3)在SearchService和SearchServiceImpl中,实现同步索引库操作(抽象方法和实现)

    /**
     * 同步索引库
     * @auther: xushuai
     * @date: 2018/5/26 15:40
     */
    void syncIndex(Long itemId);
    @Override
    public void syncIndex(Long itemId) {
        //根据ID查询索引结果
        SearchResult searchResult = searchMapper.getItemById(itemId);
        if(searchResult != null){
            //说明商品为正常状态,更新其在索引库中的数据
            try {
                write(searchResult);  //这个方法在之前的一键导入时,就已经封装好了。可以在之前的一键导入索引库文章中查看该方法
            } catch (Exception e) {
                e.printStackTrace();
            }
        }else{
            try {
                //说明商品状态为下架或者删除,执行删除索引库操作
                solrServer.deleteById(itemId.toString());
                //提交删除操作
                solrServer.commit();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


(4)书写Mapper接口和映射文件(mybatis相关)

    /**
     * 根据ID查询商品
     * @auther: xushuai
     * @date: 2018/5/26 15:41
     * @return: 索引库结果对象
     */
    SearchResult getItemById(Long itemId);
	<!--按查询商品,返回索引结果对象-->
	<select id="getItemById" parameterType="long" resultType="cn.e3mall.common.entity.SearchResult">
		SELECT
			a.id,
			a.title,
			a.sell_point,
			a.price,
			a.image,
			b.`name` category_name
		FROM
			tb_item a
		LEFT JOIN tb_item_cat b ON a.cid = b.id
		WHERE
		a. `status` = 1 AND a.id=#{itemId}
	</select>