RabbitMQ使用延迟插件,代码量直接减半?_队列_消息_安装

什么是rabbitMQ?

RabbitMQ是用Erlang语言开发的AMQP(高级消息队列协议)的开源实现。

RabbitMQ轻量级且易于部署,并且可以支持多种消息协议。

RabbitMQ可以部署在分布式和联合配置中,以满足大规模的高可用性需求。

具体功能包括:

可靠性:RabbitMQ使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认等。

灵活的路由:消息在进入队列之前通过Exchange进行路由。对于典型的路由功能,RabbitMQ提供了几个内置的交换来实现。对于更复杂的路由功能,可以连接多个exchange,也可以通过插件机制实现自己的exchange。

集群:多个RabbitMQ服务器可以组成一个集群,形成一个逻辑代理。

高可用队列:队列可以在集群中的计算机上进行镜像,因此即使某些节点发生故障,队列仍然可用。

多协议:RabbitMQ支持STOMP、MQTT等多种消息队列协议。

Multilingualclient(多客户端):RabbitMQ支持几乎所有常见的语言,如Java、.NET、Ruby等。

管理UI:RabbitMQ提供了一个易于使用的用户界面,允许用户监控和管理消息代理的许多方面。

Tracing:如果消息异常,RabbitMQ提供了消息追踪机制,让用户可以及时发现发生了什么。

插件机制(plug-insystem):RabbitMQ提供了多种插件进行多种方式的扩展,你也可以编写自己的插件。

展开全文

RabbitMQ管理模型

为什么要使用rabbitMQ?

下面我将以商人(花店)的角色给大家举个例子:

异步地

在顾客来店里下单之前,我会先让顾客等一会,同时让店员准备好订单,等到可以送货到店时,顾客才能离开。顾客现在客户给我打电话:“我要买xxx,地址是:xxx,发给我,我拿个小本子,记下:客户a,电话:xxx,地址:xxx

当员工有空时准备好订单并发货

解耦

以前有新订单时,我亲自去找每个文员(负责准备花束、会计、送花等),告诉他们有新订单,我有空就去处理.

有官员参加工作,我再通知一个人;当我下班时,我通知的人少了(我保留了一份需要通知的人的名单)

现在,当有新订单时,我只是把它记在一个小本子上,店员有空的时候就会看一下。

剪裁

去年七夕节我接到了很多电话。我把每一个订单都告诉了店员,但是店员太忙了,顾客不停地打电话提醒我。

最后,这位官员受够了,罢工了

今年七夕我学会了做人。当电话打进来时,我告诉客户,“我知道了,我会尽快安排修好”,然后我把它记在一本小本子上。当店员有空时,他会按顺序完成订单。

另一个思路是引导客户不要在七夕节开始下单,而是提前购买(淘宝双十一预售就是基于这个topidea)

以上就是rabbitMQ解决的主要问题。

如何使用rabbitMQ?

基本安装方法

MAC页面

酿造安装rabbitmq

复制代码

窗边

安装Erlang,下载地址:erlang.org/download/ot...

安装RabbitMQ,下载地址:dl.bintray.com/rabbitmq/al...

安装完成后,进入RabbitMQ安装目录下的sbin目录

在地址栏中输入cmd并回车启动命令提示符,然后输入以下命令启动管理功能:

Králikmq-plugins启用králikmq_management

复制代码

CentOS页面

安装二郎

#rabbitmq依赖erlang,需要自己下载

cd/path/to/erlang-sound-code&&./configure--prefix=/usr/local/erlang

做&&安装

vim/etc/配置文件

#添加

导出PATH=$PATH:/usr/local/erlang/bin资源/etc/profile

#输入erl,会显示版本信息,表示安装成功

复制代码

安装rabbitmq

#下载abbitmq_server-3.8.16并移动到/usr/local/

vim/etc/配置文件

#添加

导出PATH=$PATH:/usr/local/rabbitmq_server-3.8.16/sbin

资源/etc/profile

cd/usr/local/rabbitmq_server-3.8.16/sbin

#开始

./rabbitmq-server启动

复制代码

函数的实现

RabbitMQ有两种实现延迟消息的方式,一种是使用死叶队列,另一种是使用延迟插件。

网上有很多死信队列的实现。本文介绍使用延迟插件(mac环境,java版本)的更简单实现。

其他安装方式(推荐使用这一种)

首先准备好需要用到的安装文件和插件(rabbitmq_delayed_message_exchange)。

版本必须匹配。不一致的版本可能无法安装或可能导致兼容性问题。

我用的是erl_25.0和rabbitMQ-3.10.0(可以去官网下载或者私信作者)。使用这种安装方式的好处是本地安装和服务器安装过程完全一样,但是服务器需要根据情况开启安全端口5672和15672,一般建议测试环境开启,关闭生产环境。

安装erl和rabbitMQ,具体步骤省略(这个应该不行,逃~)。

将插件文件复制到RabbitMQ安装目录的plugins目录下,执行如下命令重启rabbitMQ:

Králikmq-plugins启用králikmq_delayed_message_exchange

复制代码

实施延迟消息

以一个真实的业务场景为例:当客服状态为在线,且客户留言3分钟内未得到回复时,即时聊天机器人会自动重启接管会话。这是使用延迟消息的常见场景。

首先在pom.xml文件中添加AMQP相关依赖

♬org.springframework.boot

spring-boot-starter-amqp

复制代码在application.yml文件中添加RabbitMQ相关配置

春天:

兔子:

host:localhost#Rabbitmq连接地址

port:5672#Rabbitmq连接端口号

virtual-host:/mall#rabbitmq的虚拟主机

用户名:im#用户名rabbitmq

密码:xxxxxx#密码rabbitmq

publisher-confirms:true#如果需要异步消息的回调,必须设置为true

复制代码

接下来创建RabbitMQjava配置,主要用于配置交换机、队列、绑定

*消息队列的配置

@配置

公共类RabbitMqConfig{

*Bot管理重启插件消息队列绑定的开关

@豆

CustomExchangechatPluginDirect(){

//创建一个可以发送延迟消息的自定义开关

Mapargs=newHashMap<>();

参数。

put("x-delayed-type","direct");

returnnewCustomExchange(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(),"x-delayed-message",true,false,args);

*机器人管理重启插件队列

@豆

公共队列chatPluginQueue(){

返回新队列(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getName());

*绑定bot重启插件队列替换*/

@豆

publicBindingchatPluginBinding(CustomExchangechatPluginDirect,QueuechatPluginQueue){

返回绑定生成器

.bind(聊天插件队列)

.to(chatPluginDirect)

.with(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey())

.noargs();

复制代码

创建一个消息发送器,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间

*机器人重新启动发送者队列

@零件

@Slf4j

公共类ChatQueueSender{

私有静态记录器LOGGER=LoggerFactory。

getLogger(ChatQueueSender.class);

@Autowired

私有AmqpTemplateamqpTemplate;

publicvoidsendMessageToChat(Longcmid,finallongdelayTimes){

//发送消息到延迟队列

amqpTemplate.convertAndSend(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(),QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey(),cmid,newMessagePostProcessor(){

@覆盖publicMessagepostProcessMessage(Messagemessage)抛出AmqpException{

//以毫秒为单位设置消息延迟

message.getMessageProperties().setHeader("x-delay",delayTimes);

回馈;

复制代码

创建一个消息接收器来处理延迟插件队列中的消息。

*机器人重启队列处理

@零件

@Slf4j

@RabbitListener(queues="im.chat.cancel")

公共类ChatQueueReceiver{

@Autowired

私人聊天重启机器人服务聊天重启机器人服务;

@RabbitHandler

publicvoidhandleOnChat(Longcmid){

//log.info("机器人会话重启");

chatRestartRobotService.restartRobot(cmid);

复制代码

最后调用到合适的位置:

完毕!

作者:端卢衣裳

特别声明

本文仅代表作者观点,不代表本站立场,本站仅提供信息存储服务。

分享:

扫一扫在手机阅读、分享本文