欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  神坛PHP

swiftmailer来对邮件的使用,打造多进程异步邮件发送

程序员文章站 2022-01-19 20:04:08
...

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

redis

rabbitmq

kafka

redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列

$redis->lpush($key, $data);

// 出列

$data = $redis->rpop($key);

// 阻塞出列

$data = $redis->brpop($key, 10);

邮件发送库选型

swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer


redis 会安装一个类库来使用

$redis = new \Redis();

if (!$redis->connect('127.0.0.1', 6379)) {

    throw new \Exception('Redis Connect Failure');

}

$redis->auth('');

$redis->select(0);

// 投递任务

$data = [

    'to'      => ['***@qq.com' => 'A name'],

    'body'    => 'Here is the message itself',

    'subject' => 'The title content',

];

$redis->lpush('queue:email', serialize($data));

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。


我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二种进程:

左进程:负责从消息队列取出任务数据,投放给中进程。

中进程:负责执行邮件发送任务。

<?php

namespace apps\daemon\commands;

use mix\console\ExitCode;

use mix\facades\Input;

use mix\facades\Redis;

use mix\task\CenterProcess;

use mix\task\LeftProcess;

use mix\task\TaskExecutor;

/**


 * 推送模式范例


 * @author 刘健 <coder.liu@qq.com>


 */


class PushCommand extends BaseCommand

{

    // 配置信息


    const HOST = 'smtpdm.aliyun.com';

    const PORT = 465;

    const SECURITY = 'ssl';

    const USERNAME = '****@email.***.com';

    const PASSWORD = '****';

    // 初始化事件


    public function onInitialize()

    {

        parent::onInitialize(); // TODO: Change the autogenerated stub

        // 获取程序名称

        $this->programName = Input::getCommandName();

        // 设置pidfile

        $this->pidFile = "/var/run/{$this->programName}.pid";

    }


 


    /**

     * 获取服务

     * @return TaskExecutor

     */

    public function getTaskService()

    {

        return create_object(

            [

                // 类路径

                'class'         => 'mix\task\TaskExecutor',

                // 服务名称

                'name'          => "mix-daemon: {$this->programName}",

                // 执行类型

                'type'          => \mix\task\TaskExecutor::TYPE_DAEMON,

                // 执行模式

                'mode'          => \mix\task\TaskExecutor::MODE_PUSH,

                // 左进程数

                'leftProcess'   => 1,

                // 中进程数

                'centerProcess' => 5,

                // 任务超时时间 (秒)

                'timeout'       => 5,

            ]

        );

    }


 


    // 启动


    public function actionStart()

    {

        // 预处理

        if (!parent::actionStart()) {

            return ExitCode::UNSPECIFIED_ERROR;

        }

        // 启动服务

        $service = $this->getTaskService();

        $service->on('LeftStart', [$this, 'onLeftStart']);

        $service->on('CenterStart', [$this, 'onCenterStart']);

        $service->start();

        // 返回退出码

        return ExitCode::OK;

    }


    // 左进程启动事件回调函数

    public function onLeftStart(LeftProcess $worker)

    {

        try {

            // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线

            $queueModel = Redis::getInstance();

            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出

            for ($j = 0; $j < 16000; $j++) {

                // 从消息队列中间件阻塞获取一条消息

                $data = $queueModel->brpop('queue:email', 10);

                if (empty($data)) {

                    continue;

                }

                list(, $data) = $data;

                // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)

                $worker->push($data, false);

            }

        } catch (\Exception $e) {

            // 休息一会,避免 CPU 出现 100%

            sleep(1);

            // 抛出错误

            throw $e;

        }

    }

    // 中进程启动事件回调函数

    public function onCenterStart(CenterProcess $worker)

    {

        // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出

        for ($j = 0; $j < 16000; $j++) {

            // 从进程消息队列中抢占一条消息

            $data = $worker->pop();

            if (empty($data)) {

                continue;

            }

            // 处理消息

            try {

                // 处理消息,比如:发送短信、发送邮件、微信推送

                var_dump($data);

                $ret = self::sendEmail($data);

                var_dump($ret);

            } catch (\Exception $e) {

                // 回退数据到消息队列

                $worker->rollback($data);

                // 休息一会,避免 CPU 出现 100%

                sleep(1);

                // 抛出错误

                throw $e;

            }

        }

    }

    // 发送邮件

    public static function sendEmail($data)

    {

        // Create the Transport

        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))

            ->setUsername(self::USERNAME)

            ->setPassword(self::PASSWORD);


        // Create the Mailer using your created Transport

        $mailer = new \Swift_Mailer($transport);


        // Create a message

        $message = (new \Swift_Message($data['subject']))

            ->setFrom([self::USERNAME => '**网'])

            ->setTo($data['to'])

            ->setBody($data['body']);

        // Send the message

        $result = $mailer->send($message);

        return $result;

    }

}


在 shell 中启动 push 常驻程序。

./mix-daemon push start

mix-daemon 'push' start successed.


相关标签: swiftmailer 邮件