go语言实现RabbitMQ

GO语言实现RabbitMQ

利用Go语言来实现RabbitMQ的几种工作模式

RabbitMQ基础代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
const MQURL = "amqp://用户名:密码@host:端口/vhost"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
//队列名称
QueueName string
//交换机
Exchange string
//key
Key string
//连接信息
MqUrl string
//互斥锁
sync.Mutex
}
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ{
rabbitMQ := &RabbitMQ{
QueueName: queueName,
Exchange: exchange,
Key: key,
MqUrl:MQURL,
}
var err error
rabbitMQ.conn,err = amqp.Dial(rabbitMQ.MqUrl)
if err != nil {
rabbitMQ.failOnErr(err,"创建连接错误")
}
rabbitMQ.channel,err = rabbitMQ.conn.Channel()
if err != nil {
rabbitMQ.failOnErr(err,"获取Channel失败")
}
return rabbitMQ
}
//断开channel conn
func (r *RabbitMQ) Destroy(){
r.channel.Close()
r.conn.Close()
}
//错误处理函数
func (r *RabbitMQ) failOnErr(err error,msg string) {
if err != nil {
log.Fatal("%s:%s",msg,err)
panic(fmt.Sprintf("%s:%s",msg,err))
}
}

简单模式(点对点模式)

应用场景:发送验证码,

一个生产者,一个消费者。生产者将消息放入队列,消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理就已经从队列中消失了,造成消息的丢失)

简单模式下生产者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
//创建RabbitMQ实例
rabbitmq := NewRabbitMQ(queueName, "", "")
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabb"+
"itmq!")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//直接模式队列生产
func (r *RabbitMQ) PublishSimple(message string) error {
//加锁
r.Lock()
//执行结束解锁
defer r.Unlock()
//1.申请队列,如果队列不存在会自动创建,存在则跳过创建
_, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
false,
//是否自动删除
false,
//是否具有排他性
false,
//是否阻塞处理
false,
//额外的属性
nil,
)
if err != nil {
return err
}
//调用channel 发送消息到队列中
r.channel.Publish(
r.Exchange,
r.QueueName,
//如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false,
//如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
return nil
}

对于高并发来说,防止资源产生竞争关系,我在生成端添加了互斥锁,要保证信息已经成功的被添加到队列中;队列创建好,我们就把产生的消息发送出去。相关参数介绍我已经在代码里做了备注。

接下来我们来实现,消费端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//实现简单模式下消费者代码
func (r *RabbitMQ) ConsumeSimple(orderService service.IOrderService,
productService service.ProductService){
//申请队列,如果不存在会自动创建,存在跳过创建,保证队列存在,消息能发送到队列中
q,err := r.channel.QueueDeclare(
r.QueueName,
//控制消息是否持久化,true开启
false,
//是否为自动删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外属性
nil,
)
if err != nil{
fmt.Println(err)
}
//消费者流控,防止暴库
r.channel.Qos(
//每次只接受一个消息进行消费
1,
//服务器传递的最大容量(以8位字节为单位)
0,
//true对全局可用,false只对当前channel可用
false,
)
//接收消息
messages,err := r.channel.Consume(
q.Name,
//用来区分多个消费者
"",
//是否自动应答
false,
//是否具有排他性
false,
//如果设置为true,表示不能将同一个connection中发送的消息
//传递给同一个connection的消费者
false,
//是否为阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理消息
go func() {
for d := range messages{
//实现我们的处理逻辑函数
log.Printf("Received a message : %s",d.Body)
message := &datamodels.Message{}
err :=json.Unmarshal([]byte(d.Body),message)
if err !=nil {
fmt.Println(err)
}
//业务代码
...
//业务代码
//如果为true表示确认所有未确认的消息,false为当前消息
d.Ack(false)
}
}()
log.Printf("[*] Waiting for messages,To exit press CTRAL+C")
<-forever
}

消费端同样首先判断队列是否生成成功,接下来要注意的地方就是,RabbitMQ的限流,等消费者消费完一个信息,才去接收下一个信息,防止消费端在处理类似于Mysql数据库的时候发生暴库,具体实现如下:

1
2
3
4
5
6
7
8
9
//消费者流控,防止暴库
r.channel.Qos(
//每次只接受一个消息进行消费
1,
//服务器传递的最大容量(以8位字节为单位)
0,
//true对全局可用,false只对当前channel可用
false,
)

工作模式

应用场景: 抢包红;大项目中的资源调度;秒杀系统的订单处理服务等

一个生产者,多个消费者,每个消费者获取到的消息唯一。工作模式对于简单模式的代码来说是一样的,只是消费端的个数增加了,这种情况适用于生产者生成消息过于快,消费端来不及消费,才会采用多个消费端进行消费。
如图

订阅模式

短信群发;发送广播,公告消息等

一个生产者发送的消息会被多个消费者获取。
如图
实例订阅模式的代码如下:

1
2
3
4
//实现订阅模式
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
return NewRabbitMQ("",exchangeName ,"")
}

订阅模式下的生产端的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//实现订阅模式下生产者代码
func (r *RabbitMQ) PublishPub(message string) error{
r.Lock()
defer r.Unlock()
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout", //设置交换机的类型,在订阅模式下交换机的类型为广播类型
true,
false,
false,//true表示这个参数exchange不可以被client用来推送信息,仅用来exchange和exchange之间的绑定
false,
nil,
)
if err != nil {
r.failOnErr(err,"创建交换机异常")
}
//发送消息到队列中
r.channel.Publish(
r.Exchange,
"",
//如果为true,根据exchange类型和routekey类型,如果无法找到符合条件的队列,name会把发送的信息返回给发送者
false,
//如果为true,当exchange发送到消息队列后发现队列上没有绑定的消费者,则会将消息返还给发送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
return nil
}

订阅模式的消费端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//订阅模式下的消费端代码
func (r *RabbitMQ) ConsumePub(){
//1.试探创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout", //设置交换机的类型,在订阅模式下交换机的类型为广播类型
true,
false,
false,//true表示这个参数exchange不可以被client用来推送信息,仅用来exchange和exchange之间的绑定
false,
nil,
)
if err != nil {
r.failOnErr(err,"创建交换机异常")
}
//2.申请队列,如果不存在会自动创建,存在跳过创建,保证队列存在,消息能发送到队列中
q,err := r.channel.QueueDeclare(
"", //随机生成队列名称留空
//控制消息是否持久化,true开启
false,
//是否为自动删除
false,
//是否具有排他性
true,
//是否阻塞
false,
//额外属性
nil,
)
if err != nil{
r.failOnErr(err,"生产队列异常")
}
//绑定队列到Exchange中
err = r.channel.QueueBind(
q.Name,
"",//在订阅模式下这个参数必须为空,
r.Exchange,
false,
nil,
)
//接收消息
messages,err := r.channel.Consume(
q.Name,
//用来区分多个消费者
"",
//是否自动应答
true,
//是否具有排他性
false,
//如果设置为true,表示不能将同一个connection中发送的消息
//传递给同一个connection的消费者
false,
//是否为阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理消息
go func() {
for d := range messages{
//实现我们的处理逻辑函数
log.Printf("Received a message : %s",d.Body)
}
}()
log.Printf("[*] Waiting for messages,To exit press CTRAL+C")
<-forever
}

路由模式

发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
如图
路由模式的实例代码如下:

1
2
3
4
//实现simple模式
func NewRabbitMQRouting(exchangeName string, routingKey string) *RabbitMQ {
return NewRabbitMQ("",exchangeName ,routingKey )
}

注意路由模式和订阅模式的区别在于,将广播模式改为direct模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (r *RabbitMQ) PublishRouting(message string ) {
//尝试chuangjian
err := r.channel.ExchangeDeclare(
r.Exchange,
"direct", //广播模式设置direct模式
true,
false,
false,//true表示这个参数exchange不可以被client用来推送信息,仅用来exchange和exchange之间的绑定
false,
nil,
)
if err != nil {
r.failOnErr(err,"创建direct异常")
}
//发送消息到队列中
r.channel.Publish(
r.Exchange,
r.Key,
//如果为true,根据exchange类型和routekey类型,如果无法找到符合条件的队列,name会把发送的信息返回给发送者
false,
//如果为true,当exchange发送到消息队列后发现队列上没有绑定的消费者,则会将消息返还给发送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}

消费端代码实现:

在声明交换机的时候,将广播模式改为direct模式。绑定队列的时候,要将key参数进行添加,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func (r *RabbitMQ) RecieveRouting() {
//1.试探性创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
//交换机类型
"direct",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "Failed to declare an exch"+
"ange")
//2.试探性创建队列,这里注意队列名称不要写
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//绑定队列到 exchange 中
err = r.channel.QueueBind(
q.Name,
r.Key,
r.Exchange,
false,
nil,
)
//消费消息
messges, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range messges {
log.Printf("Received a message: %s", d.Body)
}
}()
fmt.Println("退出请按 CTRL+C\n")
<-forever
}

话题模式

将路由键和某模式进行匹配,此时队列需要绑定在一个模式上。“”和“#”代表通配符,“#”匹配一个词或多个词,“”只匹配一个词。

消息产生者产生消息,把消息交给交换机,交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息进行消费
如图
和路由模式的区别在于,将交换机的模式改变为topic模式,其他的不变。