rabbitmq 使用
API模块接收请求,推送到消息队列
router模块消费消息,分发到各个模块
每个模块消费消息,在推回API模块,因为api模块需要知道最终执行结果
API模块配置:
spring:
cloud:
stream:
bindings:
outbound-agent-state-list.destination: outbound.agent-state-list #生产
agent-state-list-reply-channel: #消费回调回来的消息
destination: outbound.agent-state-list-reply
group: ${nodeNo:0}
durableSubscription: false
consumer.maxPriority: 10
rabbit.bindings:
outbound-agent-state-list.producer.routing-key-expression: '''router'''
agent-state-list-reply-channel.consumer.durableSubscription: false
@Component
public interface OutboundOutputChannels {
String OUTBOUND_AGENT_STATE_LIST = "outbound-agent-state-list";
@Output(value = OUTBOUND_AGENT_STATE_LIST)
MessageChannel agentStateListOutput();
}
@Component
public interface OutboundInputChannels {
String OUTBOUND_AGENT_STATE_LIST_REPLY = "agent-state-list-reply-channel";
@Input(value = OUTBOUND_AGENT_STATE_LIST_REPLY)
SubscribableChannel agentStateListReply();
}
接收回调消息并出来业务逻辑
@Slf4j
@RequiredArgsConstructor
@EnableBinding(OutboundInputChannels.class)
public class OutboundReplyMonitor extends DestroyableMonitor {
@StreamListener(OutboundInputChannels.OUTBOUND_AGENT_STATE_LIST_REPLY)
public void agentStateListReply(AgentStateListReplyDTO payload) {
countUp();
agentStateService.processAgentStateListReply(payload);
countDown();
}
}
API接口入口
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api")
@Api(value = "外呼接口", tags = "精准智能请求人工及人工接起")
public class AgentStateController {
private final AgentStateService agentStateService;
/**
* 5.9 精准智能请求人工及人工接起
* 调用商路通服务,获取所有坐席首页数据
*
* @param agentStateRequestDTO 请求参数
* @return 返回ResponseServer
*/
@ApiOperation(value = "用商路通服务,获取所有坐席首页数据", httpMethod = "POST")
@PostMapping("/agentStateList")
public ApiResponse agentStateList(@Validated @RequestBody AgentStateListRequestDTO agentStateRequestDTO, @RequestHeader(value = WebConstant.PRIORITY, required = false, defaultValue = "2") Integer priority) throws Exception {
log.info("请求:调用商路通服务,获取所有坐席首页数据,param:{}", agentStateRequestDTO.toString());
return agentStateService.agentStateList(agentStateRequestDTO, priority);
}
}
API接收请求后,数据校验,发送请求到消息队列,并等待消息的响应
@Slf4j
@Service
@RequiredArgsConstructor
public class AgentStateService {
private final MessageService messageService;
private final CallSupplierTypeRelationService callSupplierTypeRelationService;
private final CallSupplierService callSupplierService;
private final CallTypeService callTypeService;
protected final ApplicationProperties properties;
private final Map<String, CompletableFuture<ApiResponse>> queryFutureMap = new ConcurrentHashMap<>();
public ApiResponse agentStateList(AgentStateListRequestDTO queue, Integer priority) throws Exception {
long startTime = System.currentTimeMillis();
String businessId = SltBusinessEnum.getNameByCallType(queue.getCallType());
if (StringUtils.isEmpty(businessId)) {
throw new BusinessException("商路通不支持的外呼类型:" + queue.getCallType());
}
CallSupplierPO supplierPO = callSupplierService.findByCode(queue.getSupplier());
if (supplierPO == null) {
throw new BusinessException("供应商不存在:" + queue.getSupplier());
}
CallTypePO callTypePO = callTypeService.findByCode(queue.getCallType());
if (callTypePO == null) {
throw new BusinessException("外呼类型不存在:" + queue.getCallType());
}
CallSupplierTypeRelationPO callSupplierTypeRelationPo = callSupplierTypeRelationService.findBySupplierIdAndCallTypeId(supplierPO.getId(), callTypePO.getId());
if (null == callSupplierTypeRelationPo) {
throw new BusinessException("供应商外呼类型关系不存在");
}
queue.setCallType(callTypePO.getCode());
queue.setSupplierCode(supplierPO.getCode());
queue.setQueueId(StringCheckUtil.uuid());
messageService.sendToAgentStateListChannel(queue, priority);
CompletableFuture<ApiResponse> future = new CompletableFuture<>();
queryFutureMap.put(queue.getQueueId(), future);
long maxWaitMillis = properties.getMaxWaitMillis().toMillis();
Duration duration = properties.getMaxWaitMillis().minusMillis(System.currentTimeMillis() - startTime);
if (duration.isNegative() || duration.isZero()) {
log.info("调用商路通服务,获取所有坐席首页数据,已超过最大等待时间{}ms,param:{}", properties.getMaxWaitMillis().toMillis(), queue.toString());
throw new RequestTimeoutException(maxWaitMillis, queue.getQueueId());
}
log.debug("等待返回调用商路通服务,获取所有坐席首页数据请求结果,允许等待时间{}ms", duration.toMillis());
try {
return future.get(duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new RequestTimeoutException(maxWaitMillis, queue.getQueueId());
} finally {
queryFutureMap.remove(queue.getQueueId());
}
}
public void processAgentStateListReply(AgentStateListReplyDTO payload) {
String queueId = payload.getQueueId();
log.info("API模块-接收-调用商路通服务,获取所有坐席首页数据结果,{}", payload.toString());
if (queryFutureMap.containsKey(queueId)) {
queryFutureMap.get(queueId).complete(ApiResponse.success(payload.getRows()));
} else {
log.debug("未找到queueId记录:{}", queueId);
}
}
}
停止服务前,检查是否有消息正在处理
@Slf4j
public class DestroyableMonitor {
private static final Integer MAX_WAIT_COUNT = 20;
private AtomicLong messageCount = new AtomicLong();
protected Long countUp() {
return messageCount.incrementAndGet();
}
protected Long countDown() {
return messageCount.decrementAndGet();
}
@PreDestroy
private void tearDown() throws InterruptedException {
int waitCount = 0;
while (messageCount.get() > 0 && waitCount++ < MAX_WAIT_COUNT) {
log.info("正在关闭消息监听程序{},等待3秒[{}/{}]...", this.getClass().getCanonicalName(), waitCount, MAX_WAIT_COUNT);
Thread.sleep(3000L);
}
if (messageCount.get() > 0) {
log.warn("应用非安全关闭,当前仍有{}条正在处理的消息", messageCount.get());
}
}
}
router模块队列配置:
spring.cloud.stream:
bindings:
agent-state-list-input-channel:
destination: outbound.agent-state-list
group: router
consumer:
maxAttempts: 1
concurrency: 10
agent-state-list-slt-acv-channel.destination: outbound.agent-state-list
agent-state-list-slt-ivr-channel.destination: outbound.agent-state-list
rabbit.bindings:
agent-state-list-input-channel.consumer.bindingRoutingKey: router
agent-state-list-slt-acv-channel.producer.routing-key-expression: '''slt-acv'''
agent-state-list-slt-ivr-channel.producer.routing-key-expression: '''slt-ivr'''
@Component
public interface RouterInputChannels {
String AGENT_STATE_LIST_INPUT_CHANNEL = "agent-state-list-input-channel";
@Input(value = AGENT_STATE_LIST_INPUT_CHANNEL)
SubscribableChannel agentStateListInput();
}
@Component
public interface RouterOutputChannels {
String AGENT_STATE_LIST_SLT_ACV = "agent-state-list-slt-acv-channel";
String AGENT_STATE_LIST_SLT_IVR = "agent-state-list-slt-ivr-channel";
@Output(AGENT_STATE_LIST_SLT_ACV)
MessageChannel agentStateListSltACR();
@Output(AGENT_STATE_LIST_SLT_IVR)
MessageChannel agentStateListSltIVR();
}
router 模块监听
@Slf4j
@Component
@EnableBinding(RouterInputChannels.class)
@RequiredArgsConstructor
public class AgentStatusListMonitor extends DestroyableMonitor {
private final AgentStateListService agentStateListService;
@StreamListener(RouterInputChannels.AGENT_STATE_LIST_INPUT_CHANNEL)
public void onMessage(AgentStateListRequestDTO req) {
countUp();
try {
log.info("AgentStatusListMonitor收到请求,调用商路通服务,获取所有坐席首页数据:查询参数:{}", req.toString());
agentStateListService.process(req);
} finally {
countDown();
}
}
}
router业务处理和消息分发
@Slf4j
@Service
@RequiredArgsConstructor
public class AgentStateListService {
private final AgentStateListMessageService agentStateListMessageService;
private final RouterOutputChannels routerOutputChannels;
public void process(AgentStateListRequestDTO payload) {
agentStateListMessageService.send(payload);
Message<AgentStateListRequestDTO> message = MessageBuilder.withPayload(payload).build();
switch (payload.getSupplier()) {
case OutboundSupplierConstant.SLT:
switch (payload.getCallType()) {
case OutboundTypeConstant.IVR:
routerOutputChannels.agentStateListSltIVR().send(message);
return;
case OutboundTypeConstant.ACV:
routerOutputChannels.agentStateListSltACR().send(message);
return;
default:
routerOutputChannels.agentStateListSltACR().send(message);
routerOutputChannels.agentStateListSltIVR().send(message);
return;
}
default:
log.error("调用商路通服务,获取所有坐席首页数据的消息转发失败,供应商不支持,[payload:{}]", payload);
}
}
}
分发后的模块
base模块
application-slt.yml
spring.cloud.stream:
bindings:
agent-state-list-reply-channel.destination: outbound.agent-state-list-reply
@Component
public interface SltBoundInputChannels {
String AGENT_STATE_LIST_CHANNEL = "agent-state-list-channel";
@Input(value = AGENT_STATE_LIST_CHANNEL)
SubscribableChannel agentStateListSltAcvInput();
}
@Component
public interface SltBoundOutputChannels {
String AGENT_STATE_LIST_REPLY_CHANNEL = "agent-state-list-reply-channel";
@Output(AGENT_STATE_LIST_REPLY_CHANNEL)
MessageChannel agentStateListReplyOutput();
}
base模块的monitor
@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(SltBoundInputChannels.class)
public class SltAgentInfoMonitor extends DestroyableMonitor {
private final SltAgentStateListService sltAgentStateListService;
@StreamListener(SltBoundInputChannels.AGENT_STATE_LIST_CHANNEL)
public void process(Message<AgentStateListRequestDTO> message) {
log.info("外呼平台:SLT,代理模块收到消息,调用商路通服务,获取所有坐席首页数据, params: {}", message.getPayload());
countUp();
sltAgentStateListService.process(message.getPayload());
countDown();
}
}
service实现了一些slt请求的方法
@Slf4j
@Service
public class SltAgentStateListService extends AbstractSltRequestServiceTemplate<AgentStateListRequestDTO, AgentStateListResponseDTO> {
private final SltBoundOutputChannels sltBoundOutputChannels;
public SltAgentStateListService(ApplicationProperties properties, ObjectMapper objectMapper, RestTemplateService restTemplateService, SltBoundOutputChannels sltBoundOutputChannels) {
super(properties, objectMapper, restTemplateService);
this.sltBoundOutputChannels = sltBoundOutputChannels;
}
@Override
protected void handleException(Exception exception, AgentStateListRequestDTO payload) {
log.info("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-指令,调用异常, params: {}", payload, exception);
}
@Override
public RequestDTO generateRequest(AgentStateListRequestDTO payload) {
log.info("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-指令,创建请求,params:{}", payload);
SupplierRequestInfoDTO requestInfo = properties.getRequestInfo(payload.getSupplierCode());
RequestDTO request = new RequestDTO();
request.setAction(OUTBOUND_AGENT_STATE_LIST_ACTION);
request.setStartTime(payload.getStartTime());
request.setEndTime(payload.getEndTime());
request.setBusinessID(SltBusinessEnum.getNameByCallType(payload.getCallType()));
request.setBaseUrl(requestInfo.getBaseUrl());
request.setUrl(requestInfo.getBaseUrl().concat(properties.getRequestUrlMap().get(OUTBOUND_AGENT_STATE_LIST_SERVLET)));
request.setLoginUser(requestInfo.getUser());
request.setLoginPwd(requestInfo.getPassword());
return request;
}
@Override
public void handleResponse(AgentStateListResponseDTO responseDTO, AgentStateListRequestDTO payload) {
if (SltResponseCode.RESPONSE_SUCCESS_STR.equals(responseDTO.getReturnCode())) {
processReply(responseDTO, payload);
} else {
log.warn("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-处理响应,获取失败,params:{}, result:{}", payload, responseDTO);
}
}
@Override
protected TypeReference<AgentStateListResponseDTO> instanceReference() {
return new TypeReference<>() {
};
}
//消息写会API模块
private void processReply(AgentStateListResponseDTO responseDTO, AgentStateListRequestDTO payload) {
List<AgentStateListJsonTO> rowsJsonObject = responseDTO.getRows();
List<AgentStateListDTO> rowsList = new ArrayList<>();
for (AgentStateListJsonTO e : rowsJsonObject) {
AgentStateListDTO agentStateListDTO = new AgentStateListDTO();
BeanUtils.copyProperties(e, agentStateListDTO);
rowsList.add(agentStateListDTO);
}
AgentStateListReplyDTO reply = new AgentStateListReplyDTO();
reply.setReturnCode(responseDTO.getReturnCode());
reply.setMessage(responseDTO.getReturnMessage());
reply.setRows(rowsList);
reply.setQueueId(payload.getQueueId());
reply.setTotal(responseDTO.getTotal());
sltBoundOutputChannels.agentStateListReplyOutput().send(MessageBuilder.withPayload(reply).build());
}
}
@Slf4j
public abstract class AbstractSltRequestServiceTemplate<T extends BaseQueue, K extends ResponseDTO> extends SltSendRequestService<K>
implements IEncapsulationRequestEntityInterface<T, RequestDTO>, IParsingResponseBodyInterface<T, K> {
public AbstractSltRequestServiceTemplate(ApplicationProperties properties, ObjectMapper objectMapper, RestTemplateService restTemplateService) {
super(properties, objectMapper, restTemplateService);
}
public void process(T payload) {
try {
RequestDTO requestDTO = generateRequest(payload);
K response = sendRequest(requestDTO);
handleResponse(response, payload);
} catch (Exception e) {
handleException(e, payload);
}
}
/**
* 处理异常
*
* @param exception 异常对象
* @param payload 传递数据
*/
protected abstract void handleException(Exception exception, T payload);
}
下面几个类是对SLT的所有请求的封装
@Slf4j
@Component
@RequiredArgsConstructor
public class SltSendRequestService<T extends ResponseDTO> implements ISendRequestInterface<RequestDTO, T> {
protected final ApplicationProperties properties;
protected final ObjectMapper objectMapper;
protected final RestTemplateService restTemplateService;
ThreadLocal<Boolean> checkLogin = new ThreadLocal<>();
/**
* 发送请求
*
* @param request 请求参数
* @return 响应实体
* @throws Exception 异常
*/
@Override
public T sendRequest(RequestDTO request) throws Exception {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
String requestParam = "param=".concat(objectMapper.writeValueAsString(request));
HttpEntity<String> requestEntity = new HttpEntity<>(requestParam, headers);
long startTime = System.currentTimeMillis();
ResponseEntity<String> responseEntity = null;
log.info("请求:[{}],header: {}", requestParam, headers.toString());
try {
responseEntity = restTemplateService.post(request.getUrl(), requestEntity, properties.getRequestTimeoutMaximum(), String.class);
Assert.hasText(responseEntity.getBody(), "接口调用异常:商路通接口响应体为空");
if (responseEntity.getStatusCode().equals(HttpStatus.OK)) {
T response = objectMapper.readValue(responseEntity.getBody(), instanceReference());
return checkResponse(response, request);
} else {
log.error("商路通外呼请求:接口调用失败 [statusCode:{},body:{}]", responseEntity.getStatusCodeValue(), responseEntity.getBody());
throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(), "商路通外呼请求:接口调用失败");
}
} finally {
checkLogin.remove();
long elapsedTime = System.currentTimeMillis() - startTime;
if (responseEntity == null) {
log.error("商路通外呼请求详情:[请求地址:[{}],请求体:[{}],响应体:null]", request.getUrl(), requestParam);
} else {
log.info("商路通外呼请求详情:[请求地址:[{}],请求时间:{},请求体:[{}],响应体:[{}]]",
request.getUrl(), elapsedTime, requestParam, responseEntity.getBody());
}
}
}
protected TypeReference<T> instanceReference() {
return new TypeReference<T>() {
};
}
/**
* 校验响应体是否合法
*
* @param response 响应体对象
* @param request 请求体对象
* @return 响应体对象
* @throws Exception 异常
*/
T checkResponse(T response, RequestDTO request) throws Exception {
if (StringUtils.equals(SltResponseCode.RESPONSE_TIME_OUT_STR, response.getReturnCode()) &&
StringUtils.equals("超时", response.getReturnMessage())) {
log.info("商路通外呼请求:接口请求响应码超时:[{}],调用登录接口", response.toString());
return login(request);
} else {
return response;
}
}
/**
* 商路通登陆操作
*
* @param request 请求参数对象
* @return 响应体对象
* @throws Exception 异常
*/
private T login(RequestDTO request) throws Exception {
checkLogin();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
String loginParam = "userId=".concat(request.getLoginUser()).concat("&md5pwd=").concat(request.getLoginPwd());
HttpEntity<String> requestEntity = new HttpEntity<>(loginParam, headers);
ResponseEntity<String> loginResponseEntity = restTemplateService.post(request.getBaseUrl().concat(properties.getRequestUrlMap().get(OUTBOUND_LOGIN)), requestEntity);
log.info("商路通外呼请求:登陆接口响应:[body:{}]", loginResponseEntity.getBody());
Assert.hasText(loginResponseEntity.getBody(), "接口调用异常:商路通登录接口响应体为空");
LoginResponseDTO loginResponse = objectMapper.readValue(loginResponseEntity.getBody(), LoginResponseDTO.class);
if (SltResponseCode.RESPONSE_SUCCESS.equals(loginResponse.getReturnCode())) {
return sendRequest(request);
} else {
throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(),
"商路通外呼请求:登录接口调用失败:".concat(Objects.requireNonNull(loginResponseEntity.getBody())));
}
}
/**
* 校验请求而否二次登陆
*/
private void checkLogin() {
if (checkLogin.get() == null || !checkLogin.get()) {
checkLogin.set(true);
} else {
throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(), "操作失败: 商路通登录接口已经调用过");
}
}
}
public interface ISendRequestInterface<T extends BaseRequestEntity,K extends BaseResponseEntity> {
/**
* 发送请求
*
* @param request 请求实体
* @return 响应实体
* @throws Exception 异常
*/
K sendRequest(T request) throws Exception;
}
public interface IEncapsulationRequestEntityInterface<Q extends BaseQueue,T extends BaseRequestEntity> {
/**
* 初始化请求对象
*
* @param payload 消息队列传递请求相关参数信息
* @return 请求参数
*/
T generateRequest(Q payload) throws Exception;
}
@Data
public class BaseRequestEntity {
}
下面是IVR模块的示例,ACV模块省略
配置队列:
spring.cloud.stream:
bindings:
agent-state-list-channel:
destination: outbound.agent-state-list
group: slt-ivr
consumer:
maxAttempts: 1
concurrency: 10
rabbit.bindings:
agent-state-list-channel.consumer.bindingRoutingKey: slt-ivr