本文作者:陈进坚
个人博客:https://jian1098.github.io
CSDN博客:https://blog.csdn.net/c_jian
简书:https://www.jianshu.com/u/8ba9ac5706b6
联系方式:jian1098@qq.com
安装
windows
下载安装ErLang
到https://www.erlang.org/downloads
下载安装程序,一直按提示安装即可
配置环境变量
1.在用变量新建变量名为ERLANG_HOME
,变量值为C:\Program Files\erl-23.1
的变量,变量值根据自己的安装路径填
2.在PATH
中添加%ERLANG_HOME%\bin
,保存
3.在cmd命令行中输入erl
并回车,看到版本号说明erlang
安装成功
1 | $ erl |
下载安装RabbitMQ
到https://www.rabbitmq.com/download.html
下载安装程序,一直按提示安装即可
安装RabbitMQ-Plugins
进入RabbitMQ安装目录/sbin
目录下,打开cmd命令行输入下面命令并回车
1 | rabbitmq-plugins enable rabbitmq_management |
然后再输入下面点命令并回车,会看到一堆信息
1 | rabbitmqctl status |
启动RabbitMQ
进入RabbitMQ安装目录/sbin
目录下,双击rabbitmq-server.bat
启动,然后在浏览器打开http://localhost:15672
即可打开web管理界面,默认用户名和密码都是guest
。在开始菜单也有RabbitMQ Service - start
启动和RabbitMQ Service - stop
结束快捷方式,RabbitMQ默认运行在本机的 5672
端口。
Linux
略
消息队列
创建队列
先创建连接,然后设置队列属性创建队列,golang代码如下:
1 | conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") |
生产消息
定义字符数组,然后调用Publish()方法发布消息,需要设置消息的具体参数,其中Body
为消息内容,golang代码如下:
1 | body := "Hello World!" |
消费消息
队列中的消息由消费者进行消费,同样需要设置消费参数然后调用Consume()方法,golang代码如下:
1 | msgs, err := ch.Consume( |
消息的遍历
1 | go func() { |
消息确认
当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。
在golang代码中实现
1 | msgs, err := ch.Consume( |
这里代码里将auto-ack
设置为false
,并且在业务结束后调用d.Ack(false)
手动确认即可,工作者(worker)挂掉之后,重启RabbitMQ所有没有响应的消息都会重新发送。如果忘记确认RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经消息的消息 为了排除这种错误,你可以使用rabbitmqctl
命令,输出messages_unacknowledged
字段:
1 | sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged |
Windows上执行:
1 | rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged |
数据持久化
如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
队列持久化
在golang代码中实现
1 | q, err := ch.QueueDeclare( |
这里设置durable
参数为true
,此时,已经确保即使RabbitMQ重新启动,task_queue
队列也不会丢失。如果已经存在一个名为”hello”的非持久化队列,重新改为持久化的话会报错,必须修改name
或者清空队列。
消息持久化
在golang代码中实现
1 | err = ch.Publish( |
通过设置amqp.Publishing
的amqp.Persistent
属性即可。
注意:将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果您需要更强的保证,那么您可以使用使用后面的发布订阅功能。
公平调度
如果有多个消费者,并且有的消费者处理消息比较繁忙,有的处理消息比较轻松,可以设置计数器,让RabbitMQ一次只向一个worker发送一条消息。换句话说,在处理并确认前一个消息之前,不要向正在工作人员发送新消息。
在golang代码中实现
1 | err = ch.Qos( |
这里置预取计数值为1即可。
订阅发布
创建交换机
发布者(producer)不会直接发送任何消息给队列,只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。有几个可供选择的交换机类型:direct
, topic
, headers
和fanout
。
显示交换机列表
1 | sudo rabbitmqctl list_exchanges |
在golang代码中创建交换机
1 | err = ch.ExchangeDeclare( |
name
为交换机名称,type
为交换机类型。前面的教程中我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串(“”)的匿名交换机。如果要发送消息到指定队列中,在调用Publish()生产消息时指定exchange
字段即可,不过该指定的交换机必须是创建好的。
绑定交换机
我们已经创建了一个fanout
交换机和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换机和队列之间的联系我们称之为绑定(binding)。golang代码实现:
1 | err = ch.QueueBind( |
绑定好交易机之后生产者将消息存放在指定队列,在调用Consume()方法消费时只要指定queue
队列名就能获取指定队列消息。
路由
在上面绑定交换机的设置中可以看到有一个参数routing key
是空的,这个参数就是路由
绑定路由
绑定(binding)是指交换机(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。绑定的时候可以带上一个额外的routing_key
参数。为了避免与Channel.Publish
的参数混淆,我们把它叫做绑定键binding key
。绑定键的意义取决于交换机(exchange)的类型。我们之前使用过fanout
交换机会忽略这个值,使用direct
交换机来代替这个值会生效。
每个队列可以有多个绑定键,每个交换机可以设置相同的或多个绑定键到各个队列中。原理如下图
其中P为生产者,X为交换机,amqp为队列,C为消费者
订阅路由
绑定好路由之后就可以订阅指定路由的队列,在消费队列的队列绑定QueueBind()方法中指定routing key
的名称即可,golang代码实现:
1 | for _, s := range os.Args[1:] { |
主题交换机
发送到topic
交换机的消息不可以携带随意routing_key
,它的routing_key必须是一个由.
分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。
topic
交换机背后的逻辑跟direct
交换机很相似 —— 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列。但是它的binding key和routing_key有两个特殊应用方式:
*
(星号) 用来表示一个单词.#
(井号) 用来表示任意数量(零个或多个)单词。
下边用图说明:
这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.
分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: <celerity>.<colour>.<species>
。
我们创建了三个绑定:Q1的绑定键为 *.orange.*
,Q2的绑定键为 *.*.rabbit
和 lazy.#
。
这三个绑定键被可以总结为:
- Q1 对所有的桔黄色动物都感兴趣。
- Q2 则是对所有的兔子和所有懒惰的动物感兴趣。
一个携带有 quick.orange.rabbit
的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant
的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox
的消息会投递给第一个队列,携带有 lazy.brown.fox
的消息会投递给第二个队列。携带有 lazy.pink.rabbit
的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox
的消息不会投递给任何一个队列。
如果我们违反约定,发送了一个携带有一个单词或者四个单词("orange"
or "quick.orange.male.rabbit"
)的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。
但是另一方面,即使 "lazy.orange.male.rabbit"
有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。