欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

PHP使用消息队列ActiveMQ

程序员文章站 2022-03-10 08:24:24
...

消息队列中间件是分布式系统中的重要组件,主要解决应用耦合、异步消息、流量削锋等问题。可帮助实现高性能,高可用,可伸缩和最终一致性的架构。

消息队列应用场景

异步任务

假设场景:现在很多网站或App注册时都采用了验证码的机制,因此,当服务器收到客户端发起获取验证码的请求,有以下处理方式

  1. 在当前线程中立即发送短信(会阻塞当前线程一小会儿)
  2. 新建立一个线程发送短信
  3. 交由其他的服务来处理这个任务(让消息队列处理)

那么,哪种方式更好呢?

第一种:实时性肯定更好,收到请求立即处理,但它阻塞了当前线程,会造成其他客户端的请求被阻塞(请求少的时候我们可能根本感觉不到);
第二种:在当前进程中建立一个线程来处理,实时性不如第一种,但它不会阻塞其他客户端的请求。不过一个进程中能创建的线程数量有限,因此也有瓶颈;
第三种:使用其他特定场景的服务,这种实时性最差(但如果服务器配置好,我们也不一定能感觉到差异),但其是使用的最多的,并且其上线后效果是最好的(稳定性、可伸缩性)

因此,如果是正式上线的版本(比如项目初期用于验证市场的版本,往往会为了速度而不考虑架构,这时可能会选择第一种或第二种方案),且峰值较高的服务,选用第三种方案无疑是最好的。因为对于上线的服务,稳定性是非常重要的。对于发送短信这样的任务(对实时性要求不是那么高),使用消息队列是非常合适的。将任务交由消息队列之后,发送短信具体要做的事情主服务就不需要干涉了。

消息服务

现如今的微服务、分布式集群等,各个节点之间的通信,就可以使用消息队列来处理。具体使用什么方式,可更具场景从以下两种选择

  • P2P(Point to Point)点对点模式
  • Publish/Subscribe(Pub/Sub) 发布订阅模式

ActiveMQ

ActiveMQ 是Apache出品,流行的能力强劲的开源消息总线。

P2P模式案例

P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。

P2P的特点:

  • 每条消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。
  • 每条消息只有一个消费者,即一旦被消费,消息就会被移除消息队列,在运行了多个消费者之后,一条消息只会有一个消费者收到,其他的消费者是不可以收到的;
  • 接收者在成功接收消息之后需向队列应答成功:可以通过指定应答模式来更改,默认是自动应答模式

生产者向ActiveMQ发送消息(queue)

启动生产者 /opt/php/bin/php /mnt/d/server/www/activemq/consumer.php
require __DIR__.'/vendor/autoload.php'; //引入自动加载的文件
try {
    $stomp = new \FuseSource\Stomp\Stomp('tcp://127.0.0.1:61613');
    $stomp->connect();

    $data['username'] = 'root';
    $data['password'] = '123456';
    $result = $stomp->send('/queue/userReg', json_encode($data));
    var_dump($result);

} catch(StompException $e) {
    die('Connection failed: ' . $e->getMessage());
}

消费者订阅/监听队列消息

启动消费者 /opt/php/bin/php /mnt/d/server/www/activemq/consumer.php
require __DIR__.'/vendor/autoload.php'; //引入自动加载的文件

try {
    $stomp = new \FuseSource\Stomp\Stomp('tcp://127.0.0.1:61613');
    $stomp->connect();
    $stomp->subscribe('/queue/userReg');

    while (true) {
        if ($stomp->hasFrameToRead()) {
            $frame = $stomp->readFrame();
            $data = json_decode($frame->body, true);
            var_dump($frame);
            $stomp->ack($frame);
        }
    }

} catch(StompException $e) {
    die('Connection failed: ' . $e->getMessage());
}

运行结果,每条消息只有一个消费者;如果没有启动消费者监听,队列保留着消息直到被消费;
PHP使用消息队列ActiveMQ

Pub/Sub模式

Pub/Sub模式:包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber)。多个发布者将消息发送到Topic, 系统将这些消息传递给多个订阅者,可以认为生产者与消费者之间是多对多的关系

Pub/Sub的特点

  • 每条消息可以有多个消费者
  • 为了消费消息,订阅者必须保持运行的状态
  • 为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样即使订阅者没有运行,在运行之后它也能接收到发布者的消息。

在 PHP中,它与 P2P 的使用区别不大,将queue改成topic即可

$result = $stomp->send('/topic/userReg', json_encode($data));

运行结果,每个订阅了topic的消费者都能收到消息,一般情况下需要有消费者正在运行,生产者产生的topic才会被接收,否则丢失。

PHP使用消息队列ActiveMQ