什么是rabbitMQ?
RabbitMQ是用Erlang语言开发的AMQP(高级消息队列协议)的开源实现。
RabbitMQ轻量级且易于部署,并且可以支持多种消息协议。
RabbitMQ可以部署在分布式和联合配置中,以满足大规模的高可用性需求。
具体功能包括:
可靠性:RabbitMQ使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认等。
灵活的路由:消息在进入队列之前通过Exchange进行路由。对于典型的路由功能,RabbitMQ提供了几个内置的交换来实现。对于更复杂的路由功能,可以连接多个exchange,也可以通过插件机制实现自己的exchange。
集群:多个RabbitMQ服务器可以组成一个集群,形成一个逻辑代理。
高可用队列:队列可以在集群中的计算机上进行镜像,因此即使某些节点发生故障,队列仍然可用。
多协议:RabbitMQ支持STOMP、MQTT等多种消息队列协议。
Multilingualclient(多客户端):RabbitMQ支持几乎所有常见的语言,如Java、.NET、Ruby等。
管理UI:RabbitMQ提供了一个易于使用的用户界面,允许用户监控和管理消息代理的许多方面。
Tracing:如果消息异常,RabbitMQ提供了消息追踪机制,让用户可以及时发现发生了什么。
插件机制(plug-insystem):RabbitMQ提供了多种插件进行多种方式的扩展,你也可以编写自己的插件。
展开全文
RabbitMQ管理模型
为什么要使用rabbitMQ?
下面我将以商人(花店)的角色给大家举个例子:
异步地
在顾客来店里下单之前,我会先让顾客等一会,同时让店员准备好订单,等到可以送货到店时,顾客才能离开。顾客现在客户给我打电话:“我要买xxx,地址是:xxx,发给我,我拿个小本子,记下:客户a,电话:xxx,地址:xxx
当员工有空时准备好订单并发货
解耦
以前有新订单时,我亲自去找每个文员(负责准备花束、会计、送花等),告诉他们有新订单,我有空就去处理.
有官员参加工作,我再通知一个人;当我下班时,我通知的人少了(我保留了一份需要通知的人的名单)
现在,当有新订单时,我只是把它记在一个小本子上,店员有空的时候就会看一下。
剪裁
去年七夕节我接到了很多电话。我把每一个订单都告诉了店员,但是店员太忙了,顾客不停地打电话提醒我。
最后,这位官员受够了,罢工了
今年七夕我学会了做人。当电话打进来时,我告诉客户,“我知道了,我会尽快安排修好”,然后我把它记在一本小本子上。当店员有空时,他会按顺序完成订单。
另一个思路是引导客户不要在七夕节开始下单,而是提前购买(淘宝双十一预售就是基于这个topidea)
以上就是rabbitMQ解决的主要问题。
如何使用rabbitMQ?
基本安装方法
MAC页面
酿造安装rabbitmq
复制代码
窗边
安装Erlang,下载地址:erlang.org/download/ot...
安装RabbitMQ,下载地址:dl.bintray.com/rabbitmq/al...
安装完成后,进入RabbitMQ安装目录下的sbin目录
在地址栏中输入cmd并回车启动命令提示符,然后输入以下命令启动管理功能:
Králikmq-plugins启用králikmq_management
复制代码
CentOS页面
安装二郎
#rabbitmq依赖erlang,需要自己下载
cd/path/to/erlang-sound-code&&./configure--prefix=/usr/local/erlang
做&&安装
vim/etc/配置文件
#添加
导出PATH=$PATH:/usr/local/erlang/bin资源/etc/profile
#输入erl,会显示版本信息,表示安装成功
复制代码
安装rabbitmq
#下载abbitmq_server-3.8.16并移动到/usr/local/
vim/etc/配置文件
#添加
导出PATH=$PATH:/usr/local/rabbitmq_server-3.8.16/sbin
资源/etc/profile
cd/usr/local/rabbitmq_server-3.8.16/sbin
#开始
./rabbitmq-server启动
复制代码
函数的实现
RabbitMQ有两种实现延迟消息的方式,一种是使用死叶队列,另一种是使用延迟插件。
网上有很多死信队列的实现。本文介绍使用延迟插件(mac环境,java版本)的更简单实现。
其他安装方式(推荐使用这一种)
首先准备好需要用到的安装文件和插件(rabbitmq_delayed_message_exchange)。
版本必须匹配。不一致的版本可能无法安装或可能导致兼容性问题。
我用的是erl_25.0和rabbitMQ-3.10.0(可以去官网下载或者私信作者)。使用这种安装方式的好处是本地安装和服务器安装过程完全一样,但是服务器需要根据情况开启安全端口5672和15672,一般建议测试环境开启,关闭生产环境。
安装erl和rabbitMQ,具体步骤省略(这个应该不行,逃~)。
将插件文件复制到RabbitMQ安装目录的plugins目录下,执行如下命令重启rabbitMQ:
Králikmq-plugins启用králikmq_delayed_message_exchange
复制代码
实施延迟消息
以一个真实的业务场景为例:当客服状态为在线,且客户留言3分钟内未得到回复时,即时聊天机器人会自动重启接管会话。这是使用延迟消息的常见场景。
首先在pom.xml文件中添加AMQP相关依赖
♬org.springframework.boot
spring-boot-starter-amqp
复制代码在application.yml文件中添加RabbitMQ相关配置
春天:
兔子:
host:localhost#Rabbitmq连接地址
port:5672#Rabbitmq连接端口号
virtual-host:/mall#rabbitmq的虚拟主机
用户名:im#用户名rabbitmq
密码:xxxxxx#密码rabbitmq
publisher-confirms:true#如果需要异步消息的回调,必须设置为true
复制代码
接下来创建RabbitMQjava配置,主要用于配置交换机、队列、绑定
*消息队列的配置
@配置
公共类RabbitMqConfig{
*Bot管理重启插件消息队列绑定的开关
@豆
CustomExchangechatPluginDirect(){
//创建一个可以发送延迟消息的自定义开关
Mapargs=newHashMap<>();
参数。
put("x-delayed-type","direct");
returnnewCustomExchange(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(),"x-delayed-message",true,false,args);
*机器人管理重启插件队列
@豆
公共队列chatPluginQueue(){
返回新队列(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getName());
*绑定bot重启插件队列替换*/
@豆
publicBindingchatPluginBinding(CustomExchangechatPluginDirect,QueuechatPluginQueue){
返回绑定生成器
.bind(聊天插件队列)
.to(chatPluginDirect)
.with(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey())
.noargs();
复制代码
创建一个消息发送器,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
*机器人重新启动发送者队列
@零件
@Slf4j
公共类ChatQueueSender{
私有静态记录器LOGGER=LoggerFactory。
getLogger(ChatQueueSender.class);
@Autowired
私有AmqpTemplateamqpTemplate;
publicvoidsendMessageToChat(Longcmid,finallongdelayTimes){
//发送消息到延迟队列
amqpTemplate.convertAndSend(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(),QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey(),cmid,newMessagePostProcessor(){
@覆盖publicMessagepostProcessMessage(Messagemessage)抛出AmqpException{
//以毫秒为单位设置消息延迟
message.getMessageProperties().setHeader("x-delay",delayTimes);
回馈;
复制代码
创建一个消息接收器来处理延迟插件队列中的消息。
*机器人重启队列处理
@零件
@Slf4j
@RabbitListener(queues="im.chat.cancel")
公共类ChatQueueReceiver{
@Autowired
私人聊天重启机器人服务聊天重启机器人服务;
@RabbitHandler
publicvoidhandleOnChat(Longcmid){
//log.info("机器人会话重启");
chatRestartRobotService.restartRobot(cmid);
复制代码
最后调用到合适的位置:
完毕!
作者:端卢衣裳
特别声明
本文仅代表作者观点,不代表本站立场,本站仅提供信息存储服务。