欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot+RabbitMQ,发送邮件

程序员文章站 2022-03-21 07:54:02
RabbitMQ、邮箱配置# rabbitmqspring.rabbitmq.host=spring.rabbitmq.port=#spring.rabbitmq.virtual-host=spring.rabbitmq.username=spring.rabbitmq.password=# 开启confirms回调 P -> Exchangespring.rabbitmq.publisher-confirms=true# 开启returnedMessage回调 Exchange ....

SpringBoot+RabbitMQ,发送邮件

RabbitMQ、邮箱配置
# rabbitmq
spring.rabbitmq.host=
spring.rabbitmq.port=
#spring.rabbitmq.virtual-host=
spring.rabbitmq.username=
spring.rabbitmq.password=
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100

# mail
spring.mail.host=smtp.163.com
spring.mail.username=
spring.mail.password=
spring.mail.from=
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
发送邮件配置类
@Component
@Slf4j
public class MailUtil {

    @Value("${spring.mail.from}")
    private String from;

    @Autowired
    private JavaMailSender mailSender;

    /**
     * 发送简单邮件
     *
     * @param mail
     */
    public boolean send(Mail mail) {
        String to = mail.getTo();// 目标邮箱
        String title = mail.getTitle();// 邮件标题
        String content = mail.getContent();// 邮件正文

        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(from);
        message.setTo(to);
        message.setSubject(title);
        message.setText(content);

        try {
            mailSender.send(message);
            log.info("邮件发送成功");
            return true;
        } catch (MailException e) {
            log.error("邮件发送失败, to: {}, title: {}", to, title, e);
            return false;
        }
    }

    /**
     * 发送附件邮件
     *
     * @param mail 邮件
     * @param file 附件
     */
    public boolean sendAttachment(Mail mail, File file) {
        String to = mail.getTo();
        String title = mail.getTitle();
        String content = mail.getContent();

        MimeMessage message = mailSender.createMimeMessage();
        try {
            MimeMessageHelper helper = new MimeMessageHelper(message, true);
            helper.setFrom(from);
            helper.setTo(to);
            helper.setSubject(title);
            helper.setText(content);
            FileSystemResource resource = new FileSystemResource(file);
            String fileName = file.getName();
            helper.addAttachment(fileName, resource);
            mailSender.send(message);
            log.info("附件邮件发送成功");
            return true;
        } catch (Exception e) {
            log.error("附件邮件发送失败, to: {}, title: {}", to, title, e);
            return false;
        }
    }

}
JsonUtil
@Slf4j
public class JsonUtil {

    private static ObjectMapper objectMapper = new ObjectMapper();
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    static {
        // 对象的所有字段全部列入
        objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
        // 取消默认转换timestamps形式
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        // 忽略空bean转json的错误
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        // 统一日期格式
        objectMapper.setDateFormat(new SimpleDateFormat(DATE_FORMAT));
        // 忽略在json字符串中存在, 但在java对象中不存在对应属性的情况, 防止错误
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    public static <T> String objToStr(T obj) {
        if (null == obj) {
            return null;
        }

        try {
            return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);
        } catch (Exception e) {
            log.warn("objToStr error: ", e);
            return null;
        }
    }

    public static <T> T strToObj(String str, Class<T> clazz) {
        if (StringUtils.isBlank(str) || null == clazz) {
            return null;
        }

        try {
            return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);
        } catch (Exception e) {
            log.warn("strToObj error: ", e);
            return null;
        }
    }

    public static <T> T strToObj(String str, TypeReference<T> typeReference) {
        if (StringUtils.isBlank(str) || null == typeReference) {
            return null;
        }

        try {
            return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));
        } catch (Exception e) {
            log.error("strToObj error", e);
            return null;
        }
    }

}

MessageHelper
public class MessageHelper {

    public static Message objToMsg(Object obj){
        if (null == obj){
            return null;
        }
        Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
        message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
        return message;
    }

    public static <T> T msgToObj(Message message,Class<T> clazz){
        if (null == message || null == clazz){
            return null;
        }
        String str = new String(message.getBody());
        T obj = JsonUtil.strToObj(str, clazz);
        return obj;
    }
}

RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private MsgLogService msgLogService;

    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());

        //消息是否成功发送到 Exchange          CorrelationData
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
            if (ack){
                log.info("消息成功发送到Exchange");
                String msgId = correlationData.getId();
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
            } else {
                log.info("消息发送到Exchange失败, {}, cause: {}",correlationData,cause);
            }
        });

        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);
        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });

        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter converter(){
        return new Jackson2JsonMessageConverter();
    }


    public static final String MAIL_QUEUE_NAME = "mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    //创建queue
    @Bean(name = "mailQueue")
    public Queue mailQueue(){
        return new Queue(MAIL_QUEUE_NAME,true);
    }
    @Bean(name = "mailExchange")
    public DirectExchange mailExchange(){
        return new DirectExchange(MAIL_EXCHANGE_NAME,true,false);
    }
    //绑定队列到交换机声明使用的routingkey
/*    @Bean
    public Binding mailBinding(){
        Binding with = BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }*/
    //绑定队列道交换机
    @Bean
    public Binding queueExchange(@Qualifier("mailQueue")Queue queue, @Qualifier("mailExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(MAIL_ROUTING_KEY_NAME).noargs();
    }

}

生产消息
    public ServerResponse send(Mail mail) {
        String msgId = RandomUtil.UUID32();
        mail.setMsgId(msgId);
        MsgLog msgLog = new MsgLog(msgId,mail,RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME);
        //消息入库
        msgLogMapper.insert(msgLog);
        //mq队列
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail),correlationData);

        return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
    }
MailConsumer消费消息, 发送邮件
@Component
@Slf4j
public class SimpleMailConsumer {
    @Autowired
    private MsgLogService msgLogService;
    @Autowired
    private MailUtil mailUtil;

    @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
    public void consumer(Message message, Channel channel) throws IOException {
        Mail mail = MessageHelper.msgToObj(message, Mail.class);
        log.info("收到消息: {}",mail.toString());

        String msgId = mail.getMsgId();

        MsgLog msgLog = msgLogService.selectByMsgId(msgId);
        if (null ==msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)){
            log.info("重复消费,msgId: {}",msgId);
            return;
        }

        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();

        boolean success = mailUtil.send(mail);
        if (success){
            msgLogService.updateStatus(msgId,Constant.MsgLogStatus.CONSUMED_SUCCESS);
            channel.basicAck(tag,false);//消费确认
        }else {
            //未被ack的消息(unacked)会重新入队并被消费, 这样就保证了消息不会走丢。
            channel.basicNack(tag,false,true);
        }
    }
}

原作者地址
注:抄别人的,自己只做个记录

本文地址:https://blog.csdn.net/weixin_43176399/article/details/107395495

相关标签: RabbitMQ