简介

RabbitMQ是一个开源的消息代理软件,它使用高级消息队列协议(AMQP)来实现消息的发送和接收。RabbitMQ支持多种消息协议,包括STOMP、MQTT等,并且能够与多种编程语言和平台集成,如Java、.NET、Python等。

AMQP 即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

基本架构设计

图片来源:https://blog.csdn.net/cuierdan/article/details/123824300

基本概念

  • Message Broker:(消息代理服务器)是一个虚拟的概念,而RabbitMQ是Message Broker的一个实例。
  • Producer:(生产者)产生数据并将数据发送到消息代理服务器(Message Broker)的程序被称作消息的生产者。
  • Connection:(连接)生产者与消费者通过TCP协议与消息代理服务器(Message Broker)创建的连接。
  • Channel:(信道)创建在Connection中的虚拟连接,类似于连接数据库时的连接池的概念,生产者和消费者并不是直接与MQ通过Connection进行通讯的,而是通过Channel进行连接通讯的,数据的流动是在Channel中进行的。
  • VirtualHost:(虚拟消息服务器)就像mysql数据库中有数据库实例的概念,并且可以指定用户对库和表等操作的设置权限。也可以类别成LINUX系统中的不同用户,不同用户之间是相互独立的。每个VirtualHost相当于一个相对独立mini的RabbitMQ服务器。每个VirtutalHost之间是相互隔离的,exchange,queue,message等不能互通。
  • Exchange:(交换机)交换机直接与Channel(信道)连接,接收来自于消息生产者产生的数据,在由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ的交换机有fanout(扇出),direct(直接),topic(主题),headers(标题)四种类型,每种交换机类型都对应着不同的路由规则,根据不同的路由规则,交换机会将消息路由到不同的队列中。
  • Binding: (绑定)交换机与队列之间的虚拟连接,在这个绑定中可以设置Binding Key,一个绑定就是用一个Binding Key将交换器和队列连接起来,设置的Binding Key存在着一定的规则,Exchange会将消息中携带的Routing Key与Binding Key 中设置的规则进行匹配,将消息发送到相应的队列中。Binding信息被保存到Exchange中的查询表中,用于Exchange将消息分发到队列的依据。
  • Routing Key:(路由键)用于匹配路由规则的依据,生产者在将消息发送到Exchange时,一般会指定一个Routing Key,交换机会根据Routing Key 来匹配Binding中设置的路由规则,将符合规则的消息发送到指定的队列中。
  • Queue:(消息队列)RabbitMQ中的内部对象用于存放消息的容器,RabbitMQ会将消息按照RabbitMQ的六大模式中的一种将队列中的消息发送给消费者,RabbitMQ会根据选择模式的不同将队列中的消息发送给一个或多个消费者,在连接到消费者之前,消息一直在等待消费者到队列中将消息取走。
  • Consumer:(消费者)消息的消费者,表示一个从队列中取消息的应用程序。

特点

  1. 可靠性:RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
  2. 灵活的路由:在消息进入队列之前,通过交换器来路由消息。
  3. 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。
  4. 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。
  5. 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息 中间件协议。
  6. **支持多语言客户端:**RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。

主要功能

  1. 消息队列:允许应用程序将消息发送到队列中,然后由另一个应用程序从队列中取出并处理。
  2. 消息路由:支持将消息从发送者路由到一个或多个接收者。
  3. 消息持久化:确保消息在系统故障后不会丢失。
  4. 消息确认:确保消息被正确处理,如果处理失败,可以重新发送。
  5. 集群:支持在多个节点上运行,以提供高可用性和负载均衡。

安装

这里使用1Panel安装,1Panel 是一个现代化、开源的 Linux 服务器运维管理面板

应用商店

安装镜像

注:这里需要勾选【端口外部访问】,方便本地调试

镜像

安装成功后,就可以在镜像中找到已安装好的RabbitMQ镜像容器

外网可访问地址:http://{{你的公网ip}}:15672,如果是在云服务器,记得安全策略放开端口15672

  • RabbitMQ管理界面端口15672。是一个Web应用程序,用于管理和监控RabbitMQ消息代理
  • AMQP默认端口5672。是一种网络协议,用于在应用程序之间传递消息,通常用于消息队列系统。

控制台

登陆控制台,账号和密码都是rabbitmq

使用

这里使用webman插件RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67

非常感谢兔子大佬的插件贡献!🌻

非常感谢兔子大佬的插件贡献!🌻

非常感谢兔子大佬的插件贡献!🌻

插件特点

  • 支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;
  • 支持延迟队列(RabbitMQ须安装插件);
  • 异步无阻塞消费、异步无阻塞生产、同步阻塞生产

安装插件

通过composer包管理安装

composer require workbunny/webman-rabbitmq

注:安装该插件前,请确保你已经安装好webman框架,相关安装文档:https://www.workerman.net/doc/webman/install.html

配置

插件所有配置文件路径:config/plugin/workbunny/webman-rabbitmq/app.php

<?php
return [
    'enable' => true,
    'host'               => '120.120.120.74',
    'vhost'              => '/',
    'port'               => 5672,
    'username'           => 'rabbitmq',
    'password'           => 'rabbitmq',
    'mechanisms'         => 'AMQPLAIN',
    ...
];
  • host 修改为服务器公网ip
  • port 修改为15672

消费者

这里使用命令创建一个拥有单进程消费者的RestyBuilder

./webman workbunny:rabbitmq-builder resty --mode=queue

ℹ️ Config updated.
ℹ️ Builder created.
✅ Builder RestyBuilder created successfully.

创建完成后完整的消费者文件位置process/workbunny/rabbitmq/RestyBuilder.php

<?php declare(strict_types=1);

namespace process\workbunny\rabbitmq;

use Bunny\Channel as BunnyChannel;
use Bunny\Async\Client as BunnyClient;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Constants;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;

class RestyBuilder extends QueueBuilder
{
    /**
     * @var array = [
     *   'name'           => 'example',
     *   'delayed'        => false,
     *   'prefetch_count' => 1,
     *   'prefetch_size'  => 0,
     *   'is_global'      => false,
     *   'routing_key'    => '',
     * ]
     */
    protected array $queueConfig = [
        // 队列名称 ,默认由类名自动生成
        'name'           => 'process.workbunny.rabbitmq.RestyBuilder',
        // 是否延迟          
        'delayed'        => false,
        // QOS 数量
        'prefetch_count' => 0,
        // QOS size 
        'prefetch_size'  => 0,
        // QOS 全局
        'is_global'      => false,
        // 路由键
        'routing_key'    => '',
    ];
    
    /** @var string 交换机类型 */
    protected string $exchangeType = Constants::DIRECT;
    
    /** @var string|null 交换机名称,默认由类名自动生成 */
    protected ?string $exchangeName = 'process.workbunny.rabbitmq.RestyBuilder';
    
    /** @inheritDoc */
    public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string 
    {
        echo '[RabbitMQ][队列消费] Tag:' .$message->consumerTag. PHP_EOL;
        echo '[RabbitMQ][队列消费着] Content:' .$message->content. PHP_EOL;
        echo '[RabbitMQ][队列消费着] Exchange:' .$message->exchange. PHP_EOL;
        return Constants::ACK;
    }
}

启动webman

php start.php start
Workerman[start.php] start in DEBUG mode
----------------------------------------------------------------------- WORKERMAN -----------------------------------------------------------------------
Workerman version:4.1.15          PHP version:8.2.18           Event-Loop:\Workerman\Events\Event
------------------------------------------------------------------------ WORKERS ------------------------------------------------------------------------
proto   user            worker                                                                      listen                 processes    status           
tcp     root            webman                                                                      http://0.0.0.0:8217    2             [OK]            
tcp     root            monitor                                                                     none                   1             [OK]            
tcp     root            plugin.workbunny.webman-rabbitmq.process.workbunny.rabbitmq.RestyBuilder    none                   1             [OK]            
---------------------------------------------------------------------------------------------------------------------------------------------------------

生产者

use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\RestyBuilder;

sync_publish(RestyBuilder::instance(), '兔子大佬你好呀!'); # return bool

消费结果

[RabbitMQ][队列消费] Tag:process.workbunny.rabbitmq.RestyBuilder
[RabbitMQ][队列消费着] Content:开源技术小栈你好呀!
[RabbitMQ][队列消费着] Exchange:process.workbunny.rabbitmq.RestyBuilder

...

[RabbitMQ][队列消费] Tag:process.workbunny.rabbitmq.RestyBuilder
[RabbitMQ][队列消费着] Content:兔子大佬你好呀!
[RabbitMQ][队列消费着] Exchange:process.workbunny.rabbitmq.RestyBuilder

RabbitMQ管理界面

通过RabbitMQ管理界面端发送消息

消费者消费情况

Last Updated:
贡献者: Tinywan, 万少波