小坚的技术博客

RabbitMQ学习笔记

本文作者:陈进坚
个人博客:https://jian1098.github.io
CSDN博客:https://blog.csdn.net/c_jian
简书:https://www.jianshu.com/u/8ba9ac5706b6
联系方式:jian1098@qq.com

安装


windows

下载安装ErLang

https://www.erlang.org/downloads下载安装程序,一直按提示安装即可

配置环境变量

1.在用变量新建变量名为ERLANG_HOME,变量值为C:\Program Files\erl-23.1的变量,变量值根据自己的安装路径填

2.在PATH中添加%ERLANG_HOME%\bin,保存

3.在cmd命令行中输入erl并回车,看到版本号说明erlang安装成功

1
2
$ erl
Eshell V11.1 (abort with ^G)
下载安装RabbitMQ

https://www.rabbitmq.com/download.html下载安装程序,一直按提示安装即可

安装RabbitMQ-Plugins

进入RabbitMQ安装目录/sbin目录下,打开cmd命令行输入下面命令并回车

1
rabbitmq-plugins enable rabbitmq_management

然后再输入下面点命令并回车,会看到一堆信息

1
rabbitmqctl status
启动RabbitMQ

进入RabbitMQ安装目录/sbin目录下,双击rabbitmq-server.bat启动,然后在浏览器打开http://localhost:15672即可打开web管理界面,默认用户名和密码都是guest。在开始菜单也有RabbitMQ Service - start启动和RabbitMQ Service - stop结束快捷方式,RabbitMQ默认运行在本机的 5672端口。

Linux

消息队列


创建队列

先创建连接,然后设置队列属性创建队列,golang代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"hello", // name
false, // durable ,RabbitMQ重启后数据会消失,要持久化必须开启这个参数
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

生产消息

定义字符数组,然后调用Publish()方法发布消息,需要设置消息的具体参数,其中Body为消息内容,golang代码如下:

1
2
3
4
5
6
7
8
9
10
11
body := "Hello World!"
err = ch.Publish(
"", // exchange 交换机名称
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

消费消息

队列中的消息由消费者进行消费,同样需要设置消费参数然后调用Consume()方法,golang代码如下:

1
2
3
4
5
6
7
8
9
10
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack 自动确认消息,如果改为false,在下面逻辑业务结束后需要调用d.Ack(false)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError2(err, "Failed to register a consumer")

消息的遍历

1
2
3
4
5
6
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
//d.Ack(false) //手动消息确认时需要调用
}
}()

消息确认


当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。

在golang代码中实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack 修改这个值
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false) //手动确认
}
}()

这里代码里将auto-ack设置为false,并且在业务结束后调用d.Ack(false)手动确认即可,工作者(worker)挂掉之后,重启RabbitMQ所有没有响应的消息都会重新发送。如果忘记确认RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经消息的消息 为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:

1
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Windows上执行:

1
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

数据持久化


如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

队列持久化

在golang代码中实现

1
2
3
4
5
6
7
8
9
q, err := ch.QueueDeclare(
"hello", // name
true, // durable 持久化设置
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

这里设置durable参数为true,此时,已经确保即使RabbitMQ重新启动,task_queue队列也不会丢失。如果已经存在一个名为”hello”的非持久化队列,重新改为持久化的话会报错,必须修改name或者清空队列。

消息持久化

在golang代码中实现

1
2
3
4
5
6
7
8
9
10
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})

通过设置amqp.Publishingamqp.Persistent属性即可。

注意:将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果您需要更强的保证,那么您可以使用使用后面的发布订阅功能。

公平调度


如果有多个消费者,并且有的消费者处理消息比较繁忙,有的处理消息比较轻松,可以设置计数器,让RabbitMQ一次只向一个worker发送一条消息。换句话说,在处理并确认前一个消息之前,不要向正在工作人员发送新消息。

在golang代码中实现

1
2
3
4
5
6
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")

这里置预取计数值为1即可。

订阅发布


创建交换机

发布者(producer)不会直接发送任何消息给队列,只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。有几个可供选择的交换机类型:direct, topic, headersfanout

显示交换机列表

1
sudo rabbitmqctl list_exchanges

在golang代码中创建交换机

1
2
3
4
5
6
7
8
9
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)

name为交换机名称,type为交换机类型。前面的教程中我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串(“”)的匿名交换机。如果要发送消息到指定队列中,在调用Publish()生产消息时指定exchange字段即可,不过该指定的交换机必须是创建好的。

绑定交换机

我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换机和队列之间的联系我们称之为绑定(binding)。golang代码实现:

1
2
3
4
5
6
7
err = ch.QueueBind(
q.Name, // queue name 队列名
"", // routing key
"logs", // exchange 交换机名
false,
nil,
)

绑定好交易机之后生产者将消息存放在指定队列,在调用Consume()方法消费时只要指定queue队列名就能获取指定队列消息。

路由


在上面绑定交换机的设置中可以看到有一个参数routing key是空的,这个参数就是路由

绑定路由

绑定(binding)是指交换机(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。绑定的时候可以带上一个额外的routing_key参数。为了避免与Channel.Publish的参数混淆,我们把它叫做绑定键binding key。绑定键的意义取决于交换机(exchange)的类型。我们之前使用过fanout 交换机会忽略这个值,使用direct 交换机来代替这个值会生效。

每个队列可以有多个绑定键,每个交换机可以设置相同的或多个绑定键到各个队列中。原理如下图

其中P为生产者,X为交换机,amqp为队列,C为消费者

订阅路由

绑定好路由之后就可以订阅指定路由的队列,在消费队列的队列绑定QueueBind()方法中指定routing key的名称即可,golang代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key 指定路由名称
"logs_direct", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

主题交换机

发送到topic交换机的消息不可以携带随意routing_key,它的routing_key必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。

topic交换机背后的逻辑跟direct交换机很相似 —— 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列。但是它的binding key和routing_key有两个特殊应用方式:

  • * (星号) 用来表示一个单词.
  • # (井号) 用来表示任意数量(零个或多个)单词。

下边用图说明:
None

这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: <celerity>.<colour>.<species>

我们创建了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 *.*.rabbitlazy.#

这三个绑定键被可以总结为:

  • Q1 对所有的桔黄色动物都感兴趣。
  • Q2 则是对所有的兔子所有懒惰的动物感兴趣。

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

如果我们违反约定,发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。

但是另一方面,即使 "lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

参考文档

http://rabbitmq.mr-ping.com/tutorials_with_golang

-------------本文结束感谢您的阅读-------------
🐶 您的支持将鼓励我继续创作 🐶