欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

rabbitmq 使用

程序员文章站 2022-03-04 09:36:05
...

API模块接收请求,推送到消息队列

router模块消费消息,分发到各个模块

每个模块消费消息后,再推回API模块,因为API模块需要知道最终执行结果

 

 

API模块配置:

spring:
  cloud:
    stream:
      bindings:
        # Producer: requests published by the API module.
        outbound-agent-state-list.destination: outbound.agent-state-list
        # Consumer: replies that the downstream modules send back to the API module.
        agent-state-list-reply-channel:
          destination: outbound.agent-state-list-reply
          group: ${nodeNo:0}
      rabbit.bindings:
        outbound-agent-state-list.producer.routing-key-expression: '''router'''
        # durableSubscription and maxPriority are RabbitMQ-binder consumer options, so they
        # are declared here rather than under the generic `bindings` node.
        agent-state-list-reply-channel.consumer:
          durableSubscription: false
          maxPriority: 10

 

@Component

public interface OutboundOutputChannels {

String OUTBOUND_AGENT_STATE_LIST = "outbound-agent-state-list";

 

@Output(value = OUTBOUND_AGENT_STATE_LIST)

    MessageChannel agentStateListOutput();

}

 

@Component

public interface OutboundInputChannels {

 

String OUTBOUND_AGENT_STATE_LIST_REPLY = "agent-state-list-reply-channel";

 

@Input(value = OUTBOUND_AGENT_STATE_LIST_REPLY)

    SubscribableChannel agentStateListReply();

}

 

接收回调消息并处理业务逻辑

@Slf4j

@RequiredArgsConstructor

@EnableBinding(OutboundInputChannels.class)

public class OutboundReplyMonitor extends DestroyableMonitor {

 

@StreamListener(OutboundInputChannels.OUTBOUND_AGENT_STATE_LIST_REPLY)

    public void agentStateListReply(AgentStateListReplyDTO payload) {

        countUp();

        agentStateService.processAgentStateListReply(payload);

        countDown();

    }

}

 

API接口入口

@Slf4j

@RestController

@RequiredArgsConstructor

@RequestMapping("/api")

@Api(value = "外呼接口", tags = "精准智能请求人工及人工接起")

public class AgentStateController {

 

    private final AgentStateService agentStateService;

 

    /**

     * 5.9 精准智能请求人工及人工接起

     * 调用商路通服务,获取所有坐席首页数据

     *

     * @param agentStateRequestDTO 请求参数

     * @return 返回ResponseServer

     */

    @ApiOperation(value = "用商路通服务,获取所有坐席首页数据", httpMethod = "POST")

    @PostMapping("/agentStateList")

    public ApiResponse agentStateList(@Validated @RequestBody AgentStateListRequestDTO agentStateRequestDTO, @RequestHeader(value = WebConstant.PRIORITY, required = false, defaultValue = "2") Integer priority) throws Exception {

        log.info("请求:调用商路通服务,获取所有坐席首页数据,param:{}", agentStateRequestDTO.toString());

        return agentStateService.agentStateList(agentStateRequestDTO, priority);

    }

 

}

 

API接收请求后,数据校验,发送请求到消息队列,并等待消息的响应

@Slf4j

@Service

@RequiredArgsConstructor

public class AgentStateService {

 

    private final MessageService messageService;

 

    private final CallSupplierTypeRelationService callSupplierTypeRelationService;

 

    private final CallSupplierService callSupplierService;

 

    private final CallTypeService callTypeService;

 

    protected final ApplicationProperties properties;

 

    private final Map<String, CompletableFuture<ApiResponse>> queryFutureMap = new ConcurrentHashMap<>();

 

    public ApiResponse agentStateList(AgentStateListRequestDTO queue, Integer priority) throws Exception {

        long startTime = System.currentTimeMillis();

 

        String businessId = SltBusinessEnum.getNameByCallType(queue.getCallType());

        if (StringUtils.isEmpty(businessId)) {

            throw new BusinessException("商路通不支持的外呼类型:" + queue.getCallType());

        }

 

        CallSupplierPO supplierPO = callSupplierService.findByCode(queue.getSupplier());

        if (supplierPO == null) {

            throw new BusinessException("供应商不存在:" + queue.getSupplier());

        }

 

        CallTypePO callTypePO = callTypeService.findByCode(queue.getCallType());

        if (callTypePO == null) {

            throw new BusinessException("外呼类型不存在:" + queue.getCallType());

        }

 

        CallSupplierTypeRelationPO callSupplierTypeRelationPo = callSupplierTypeRelationService.findBySupplierIdAndCallTypeId(supplierPO.getId(), callTypePO.getId());

        if (null == callSupplierTypeRelationPo) {

            throw new BusinessException("供应商外呼类型关系不存在");

        }

        queue.setCallType(callTypePO.getCode());

        queue.setSupplierCode(supplierPO.getCode());

        queue.setQueueId(StringCheckUtil.uuid());

        messageService.sendToAgentStateListChannel(queue, priority);

        CompletableFuture<ApiResponse> future = new CompletableFuture<>();

        queryFutureMap.put(queue.getQueueId(), future);

 

        long maxWaitMillis = properties.getMaxWaitMillis().toMillis();

        Duration duration = properties.getMaxWaitMillis().minusMillis(System.currentTimeMillis() - startTime);

        if (duration.isNegative() || duration.isZero()) {

            log.info("调用商路通服务,获取所有坐席首页数据,已超过最大等待时间{}ms,param:{}", properties.getMaxWaitMillis().toMillis(), queue.toString());

            throw new RequestTimeoutException(maxWaitMillis, queue.getQueueId());

        }

        log.debug("等待返回调用商路通服务,获取所有坐席首页数据请求结果,允许等待时间{}ms", duration.toMillis());

        try {

            return future.get(duration.toMillis(), TimeUnit.MILLISECONDS);

        } catch (TimeoutException e) {

            throw new RequestTimeoutException(maxWaitMillis, queue.getQueueId());

        } finally {

            queryFutureMap.remove(queue.getQueueId());

        }

    }

 

    public void processAgentStateListReply(AgentStateListReplyDTO payload) {

        String queueId = payload.getQueueId();

        log.info("API模块-接收-调用商路通服务,获取所有坐席首页数据结果,{}", payload.toString());

        if (queryFutureMap.containsKey(queueId)) {

            queryFutureMap.get(queueId).complete(ApiResponse.success(payload.getRows()));

        } else {

            log.debug("未找到queueId记录:{}", queueId);

        }

    }

}

 

 

停止服务前,检查是否有消息正在处理

@Slf4j

public class DestroyableMonitor {

 

    private static final Integer MAX_WAIT_COUNT = 20;

 

    private AtomicLong messageCount = new AtomicLong();

 

    protected Long countUp() {

        return messageCount.incrementAndGet();

    }

 

    protected Long countDown() {

        return messageCount.decrementAndGet();

    }

 

    @PreDestroy

    private void tearDown() throws InterruptedException {

        int waitCount = 0;

        while (messageCount.get() > 0 && waitCount++ < MAX_WAIT_COUNT) {

            log.info("正在关闭消息监听程序{},等待3秒[{}/{}]...", this.getClass().getCanonicalName(), waitCount, MAX_WAIT_COUNT);

            Thread.sleep(3000L);

        }

        if (messageCount.get() > 0) {

            log.warn("应用非安全关闭,当前仍有{}条正在处理的消息", messageCount.get());

        }

    }

}

 

 

 

router模块队列配置:

spring.cloud.stream:
  bindings:
    # Input: requests addressed to the router.
    agent-state-list-input-channel:
      destination: outbound.agent-state-list
      group: router
      consumer:
        maxAttempts: 1
        concurrency: 10
    # Outputs: the same destination, told apart by routing key below.
    agent-state-list-slt-acv-channel:
      destination: outbound.agent-state-list
    agent-state-list-slt-ivr-channel:
      destination: outbound.agent-state-list
  rabbit.bindings:
    agent-state-list-input-channel:
      consumer:
        bindingRoutingKey: router
    agent-state-list-slt-acv-channel:
      producer:
        routing-key-expression: "'slt-acv'"
    agent-state-list-slt-ivr-channel:
      producer:
        routing-key-expression: "'slt-ivr'"

 

 

@Component

public interface RouterInputChannels {

String AGENT_STATE_LIST_INPUT_CHANNEL = "agent-state-list-input-channel";

 

@Input(value = AGENT_STATE_LIST_INPUT_CHANNEL)

    SubscribableChannel agentStateListInput();

}

 

@Component

public interface RouterOutputChannels {

String AGENT_STATE_LIST_SLT_ACV = "agent-state-list-slt-acv-channel";

    String AGENT_STATE_LIST_SLT_IVR = "agent-state-list-slt-ivr-channel";

 

    @Output(AGENT_STATE_LIST_SLT_ACV)

    MessageChannel agentStateListSltACR();

 

    @Output(AGENT_STATE_LIST_SLT_IVR)

    MessageChannel agentStateListSltIVR();

}

 

router 模块监听

@Slf4j

@Component

@EnableBinding(RouterInputChannels.class)

@RequiredArgsConstructor

public class AgentStatusListMonitor extends DestroyableMonitor {

    private final AgentStateListService agentStateListService;

 

    @StreamListener(RouterInputChannels.AGENT_STATE_LIST_INPUT_CHANNEL)

    public void onMessage(AgentStateListRequestDTO req) {

        countUp();

        try {

            log.info("AgentStatusListMonitor收到请求,调用商路通服务,获取所有坐席首页数据:查询参数:{}", req.toString());

            agentStateListService.process(req);

        } finally {

            countDown();

        }

    }

}

 

router业务处理和消息分发

@Slf4j

@Service

@RequiredArgsConstructor

public class AgentStateListService {

 

    private final AgentStateListMessageService agentStateListMessageService;

 

    private final RouterOutputChannels routerOutputChannels;

 

    public void process(AgentStateListRequestDTO payload) {

        agentStateListMessageService.send(payload);

 

        Message<AgentStateListRequestDTO> message = MessageBuilder.withPayload(payload).build();

        switch (payload.getSupplier()) {

            case OutboundSupplierConstant.SLT:

                switch (payload.getCallType()) {

                    case OutboundTypeConstant.IVR:

                        routerOutputChannels.agentStateListSltIVR().send(message);

                        return;

                    case OutboundTypeConstant.ACV:

                        routerOutputChannels.agentStateListSltACR().send(message);

                        return;

                    default:

                        routerOutputChannels.agentStateListSltACR().send(message);

                        routerOutputChannels.agentStateListSltIVR().send(message);

                        return;

                }

            default:

                log.error("调用商路通服务,获取所有坐席首页数据的消息转发失败,供应商不支持,[payload:{}]", payload);

        }

    }

}

 

分发后的模块

base模块

application-slt.yml

spring.cloud.stream:
  bindings:
    # Output binding used to send replies back.
    agent-state-list-reply-channel:
      destination: outbound.agent-state-list-reply

 

@Component

public interface SltBoundInputChannels {

String AGENT_STATE_LIST_CHANNEL = "agent-state-list-channel";

 

@Input(value = AGENT_STATE_LIST_CHANNEL)

    SubscribableChannel agentStateListSltAcvInput();

}

 

@Component

public interface SltBoundOutputChannels {

String AGENT_STATE_LIST_REPLY_CHANNEL = "agent-state-list-reply-channel";

 

@Output(AGENT_STATE_LIST_REPLY_CHANNEL)

    MessageChannel agentStateListReplyOutput();

}

 

base模块的monitor

@Slf4j

@Component

@RequiredArgsConstructor

@EnableBinding(SltBoundInputChannels.class)

public class SltAgentInfoMonitor extends DestroyableMonitor {

 

    private final SltAgentStateListService sltAgentStateListService;

 

    @StreamListener(SltBoundInputChannels.AGENT_STATE_LIST_CHANNEL)

    public void process(Message<AgentStateListRequestDTO> message) {

        log.info("外呼平台:SLT,代理模块收到消息,调用商路通服务,获取所有坐席首页数据, params: {}", message.getPayload());

        countUp();

        sltAgentStateListService.process(message.getPayload());

        countDown();

    }

}

 

service实现了一些slt请求的方法

@Slf4j

@Service

public class SltAgentStateListService extends AbstractSltRequestServiceTemplate<AgentStateListRequestDTO, AgentStateListResponseDTO> {

 

    private final SltBoundOutputChannels sltBoundOutputChannels;

 

    public SltAgentStateListService(ApplicationProperties properties, ObjectMapper objectMapper, RestTemplateService restTemplateService, SltBoundOutputChannels sltBoundOutputChannels) {

        super(properties, objectMapper, restTemplateService);

        this.sltBoundOutputChannels = sltBoundOutputChannels;

    }

 

    @Override

    protected void handleException(Exception exception, AgentStateListRequestDTO payload) {

        log.info("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-指令,调用异常, params: {}", payload, exception);

    }

 

    @Override

    public RequestDTO generateRequest(AgentStateListRequestDTO payload) {

        log.info("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-指令,创建请求,params:{}", payload);

        SupplierRequestInfoDTO requestInfo = properties.getRequestInfo(payload.getSupplierCode());

        RequestDTO request = new RequestDTO();

        request.setAction(OUTBOUND_AGENT_STATE_LIST_ACTION);

        request.setStartTime(payload.getStartTime());

        request.setEndTime(payload.getEndTime());

        request.setBusinessID(SltBusinessEnum.getNameByCallType(payload.getCallType()));

        request.setBaseUrl(requestInfo.getBaseUrl());

        request.setUrl(requestInfo.getBaseUrl().concat(properties.getRequestUrlMap().get(OUTBOUND_AGENT_STATE_LIST_SERVLET)));

        request.setLoginUser(requestInfo.getUser());

        request.setLoginPwd(requestInfo.getPassword());

        return request;

    }

 

 

    @Override

    public void handleResponse(AgentStateListResponseDTO responseDTO, AgentStateListRequestDTO payload) {

        if (SltResponseCode.RESPONSE_SUCCESS_STR.equals(responseDTO.getReturnCode())) {

            processReply(responseDTO, payload);

        } else {

            log.warn("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-处理响应,获取失败,params:{}, result:{}", payload, responseDTO);

        }

    }

 

    @Override

    protected TypeReference<AgentStateListResponseDTO> instanceReference() {

        return new TypeReference<>() {

        };

    }

    

//消息写会API模块

    private void processReply(AgentStateListResponseDTO responseDTO, AgentStateListRequestDTO payload) {

        List<AgentStateListJsonTO> rowsJsonObject = responseDTO.getRows();

        List<AgentStateListDTO> rowsList = new ArrayList<>();

        for (AgentStateListJsonTO e : rowsJsonObject) {

            AgentStateListDTO agentStateListDTO = new AgentStateListDTO();

            BeanUtils.copyProperties(e, agentStateListDTO);

            rowsList.add(agentStateListDTO);

        }

        AgentStateListReplyDTO reply = new AgentStateListReplyDTO();

        reply.setReturnCode(responseDTO.getReturnCode());

        reply.setMessage(responseDTO.getReturnMessage());

        reply.setRows(rowsList);

        reply.setQueueId(payload.getQueueId());

        reply.setTotal(responseDTO.getTotal());

        sltBoundOutputChannels.agentStateListReplyOutput().send(MessageBuilder.withPayload(reply).build());

    }

 

}

 

@Slf4j

public abstract class AbstractSltRequestServiceTemplate<T extends BaseQueue, K extends ResponseDTO> extends SltSendRequestService<K>

        implements IEncapsulationRequestEntityInterface<T, RequestDTO>, IParsingResponseBodyInterface<T, K> {

 

 

    public AbstractSltRequestServiceTemplate(ApplicationProperties properties, ObjectMapper objectMapper, RestTemplateService restTemplateService) {

        super(properties, objectMapper, restTemplateService);

    }

 

    public void process(T payload) {

        try {

            RequestDTO requestDTO = generateRequest(payload);

            K response = sendRequest(requestDTO);

            handleResponse(response, payload);

        } catch (Exception e) {

            handleException(e, payload);

        }

    }

 

    /**

     * 处理异常

     *

     * @param exception 异常对象

     * @param payload   传递数据

     */

    protected abstract void handleException(Exception exception, T payload);

}

 

下面几个类是对SLT的所有请求的封装

@Slf4j

@Component

@RequiredArgsConstructor

public class SltSendRequestService<T extends ResponseDTO> implements ISendRequestInterface<RequestDTO, T> {

 

    protected final ApplicationProperties properties;

    protected final ObjectMapper objectMapper;

    protected final RestTemplateService restTemplateService;

 

    ThreadLocal<Boolean> checkLogin = new ThreadLocal<>();

 

    /**

     * 发送请求

     *

     * @param request 请求参数

     * @return 响应实体

     * @throws Exception 异常

     */

    @Override

    public T sendRequest(RequestDTO request) throws Exception {

        HttpHeaders headers = new HttpHeaders();

        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);

        String requestParam = "param=".concat(objectMapper.writeValueAsString(request));

        HttpEntity<String> requestEntity = new HttpEntity<>(requestParam, headers);

        long startTime = System.currentTimeMillis();

        ResponseEntity<String> responseEntity = null;

        log.info("请求:[{}],header: {}", requestParam, headers.toString());

        try {

            responseEntity = restTemplateService.post(request.getUrl(), requestEntity, properties.getRequestTimeoutMaximum(), String.class);

            Assert.hasText(responseEntity.getBody(), "接口调用异常:商路通接口响应体为空");

            if (responseEntity.getStatusCode().equals(HttpStatus.OK)) {

                T response = objectMapper.readValue(responseEntity.getBody(), instanceReference());

                return checkResponse(response, request);

            } else {

                log.error("商路通外呼请求:接口调用失败 [statusCode:{},body:{}]", responseEntity.getStatusCodeValue(), responseEntity.getBody());

                throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(), "商路通外呼请求:接口调用失败");

            }

        } finally {

            checkLogin.remove();

            long elapsedTime = System.currentTimeMillis() - startTime;

            if (responseEntity == null) {

                log.error("商路通外呼请求详情:[请求地址:[{}],请求体:[{}],响应体:null]", request.getUrl(), requestParam);

            } else {

                log.info("商路通外呼请求详情:[请求地址:[{}],请求时间:{},请求体:[{}],响应体:[{}]]",

                        request.getUrl(), elapsedTime, requestParam, responseEntity.getBody());

            }

        }

    }

 

    protected TypeReference<T> instanceReference() {

        return new TypeReference<T>() {

        };

    }

 

    /**

     * 校验响应体是否合法

     *

     * @param response 响应体对象

     * @param request  请求体对象

     * @return 响应体对象

     * @throws Exception 异常

     */

    T checkResponse(T response, RequestDTO request) throws Exception {

        if (StringUtils.equals(SltResponseCode.RESPONSE_TIME_OUT_STR, response.getReturnCode()) &&

                StringUtils.equals("超时", response.getReturnMessage())) {

            log.info("商路通外呼请求:接口请求响应码超时:[{}],调用登录接口", response.toString());

            return login(request);

        } else {

            return response;

        }

    }

 

    /**

     * 商路通登陆操作

     *

     * @param request 请求参数对象

     * @return 响应体对象

     * @throws Exception 异常

     */

    private T login(RequestDTO request) throws Exception {

        checkLogin();

        HttpHeaders headers = new HttpHeaders();

        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);

        String loginParam = "userId=".concat(request.getLoginUser()).concat("&md5pwd=").concat(request.getLoginPwd());

        HttpEntity<String> requestEntity = new HttpEntity<>(loginParam, headers);

        ResponseEntity<String> loginResponseEntity = restTemplateService.post(request.getBaseUrl().concat(properties.getRequestUrlMap().get(OUTBOUND_LOGIN)), requestEntity);

        log.info("商路通外呼请求:登陆接口响应:[body:{}]", loginResponseEntity.getBody());

        Assert.hasText(loginResponseEntity.getBody(), "接口调用异常:商路通登录接口响应体为空");

        LoginResponseDTO loginResponse = objectMapper.readValue(loginResponseEntity.getBody(), LoginResponseDTO.class);

        if (SltResponseCode.RESPONSE_SUCCESS.equals(loginResponse.getReturnCode())) {

            return sendRequest(request);

        } else {

            throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(),

                    "商路通外呼请求:登录接口调用失败:".concat(Objects.requireNonNull(loginResponseEntity.getBody())));

        }

    }

 

    /**

     * 校验请求而否二次登陆

     */

    private void checkLogin() {

        if (checkLogin.get() == null || !checkLogin.get()) {

            checkLogin.set(true);

        } else {

            throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(), "操作失败: 商路通登录接口已经调用过");

        }

    }

 

}

 

public interface ISendRequestInterface<T extends BaseRequestEntity,K extends BaseResponseEntity> {

 

 

    /**

     * 发送请求

     *

     * @param request 请求实体

     * @return 响应实体

     * @throws Exception  异常

     */

    K sendRequest(T request) throws Exception;

 

}

 

public interface IEncapsulationRequestEntityInterface<Q extends BaseQueue,T extends BaseRequestEntity> {

 

    /**

     * 初始化请求对象

     *

     * @param payload 消息队列传递请求相关参数信息

     * @return 请求参数

     */

     T generateRequest(Q payload) throws Exception;

 

}

 

/**
 * Base type for request entities; declares no fields of its own.
 */
@Data
public class BaseRequestEntity {
}

 

 

下面是IVR模块的示例,ACV模块省略

配置队列:

spring.cloud.stream:
  bindings:
    # Input: requests routed with the slt-ivr routing key.
    agent-state-list-channel:
      destination: outbound.agent-state-list
      group: slt-ivr
      consumer:
        maxAttempts: 1
        concurrency: 10
  rabbit.bindings:
    agent-state-list-channel:
      consumer:
        bindingRoutingKey: slt-ivr

相关标签: spring