You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
10 KiB
10 KiB
RabbitMQ
#go安装依赖
go get github.com/rabbitmq/qmqp091-go
#docker 安装延迟插件(从系统copy)
docker cp rabbitmq-delayed-message-exchange-4.1.0.ez rabbitmq:/plugins
#启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exch
#docker启动
docker start my-rabbit # 容器名
创建连接
conn, err := amqp.Dial("amqp://url") //连接mq
if err != nil {
panic(err)
}
chl, err := conn.Channel() //获取管道
if err != nil {
panic(err)
}
defer chl.Close()
声明队列
queue, err := chl.QueueDeclare(
"queue1", //队列名
true, //是否持久化
false, //是否自动删除
false, //是否排他
false, //是否阻塞
amqp.Table{ //队列选项
amqp.QueueTypeArg:amqp.QueueTypeQuorum,
},
)
队列选项
"x-message-ttl" //消息过期时间
"x-max-length" //队列最大长度
"x-max-priority" //优先级
"x-dead-letter-exchange" //死信交换机
"x-dead-letter-routing-key" //死信路由键
//还有几个枚举:
amqp.QueueTypeClassic//经典队列
amqp.QueueTypeQuorum//仲裁队列(分布式)
amqp.QueueTypeStream//流式队列(抄袭kafka)
//抛弃策略(有点像连接池啊)
QueueOVerflowDropHead//喜新厌旧
QueueOVerflowRejectPublish//拒收新的
QueueOVerflowRejectPublishDLX//新的扔进死信队列
发送消息
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//上下文控制消息超时时间
defer cancel()
body := "测试信息"
chl.PublishWithContext(ctx,
"",//交换机名
queue.Name,//路由键
false,//强制性设置
false,//及时性
amqp.Publishing{
ContentType: "text/plain",//格式
Body: []byte(body),//发送的消息内容
})
消费消息
func consumeMsg1() {
//为防止创建消费者时队列还没被创建,应该在创建消费者前也创建一遍队列
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
chl, err := conn.Channel()
if err != nil {
panic(err)
}
defer chl.Close()
queue, err := chl.QueueDeclare(
"test.queue1",
true,
false,
false,
false,
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeQuorum,
},
)
msgs, err := chl.Consume(
queue.Name,//队列名
"test.consumer1",//消费者标识,留空则自动生成
true,//auto-ack
false,//是否排他
false,//no-local 设置为true则不允许同一连接中的消费者消费发布者的消息
false,//是否不等待服务器响应
nil,//amqp.Table
)//返回值是一个 Delievery管道
go func() {
for msg := range msgs {
println("收到信息:", string(msg.Body))
}
}()
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-ch
println("关闭线程")
close(ch)
}
工作队列
//工作队列只要设置多个消费者消费同一个队列即可,默认使用轮询消费
//测试代码:
//生产者
func sendBatchMsg1() {
conn, chl := createConn()
defer conn.Close()
defer chl.Close()
queue, err := chl.QueueDeclare(
"test.queue1",
true,
false,
false,
false,
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeQuorum,
},
)
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for i := 0; i < 10; i++ {
body := "发送消息第" + strconv.Itoa(i+1) + "条"
chl.PublishWithContext(ctx, "", queue.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
time.Sleep(1 * time.Second)
}
}
//消费者
func WorkReceiver() {
conn, chl := createConn()
defer conn.Close()
defer chl.Close()
queue, err := chl.QueueDeclare(
"test.queue1",
true,
false,
false,
false,
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeQuorum,
},
)
if err != nil {
panic(err)
}
for i := 0; i < 3; i++ {//开三个消费者
go func(i1 int) {
msgs, err2 := chl.Consume(
queue.Name,
"worker"+strconv.Itoa(i1),
true,
false,
false,
false,
amqp.Table{},
)
if err2 != nil {
panic(err2)
}
for msg := range msgs {
println("worker "+strconv.Itoa(i1)+"receive a msg:", string(msg.Body))
}
}(i)
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-ch
}
发布订阅模式
//测试代码:
//公共代码:
func createSubscribeQueue(chl *amqp.Channel) {
err := chl.ExchangeDeclare(
"sub.genshinImpact",
"fanout",
false,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
err = chl.ExchangeDeclare(
"sub.wutheringWaves",
"fanout",
false,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
chl.QueueDeclare(
"test.genshinImpact",
true,
false,
false,
false,
nil,
)
chl.QueueBind(//绑定队列和交换机
"test.genshinImpact",//队列名
"",//路由key会被忽略
"sub.genshinImpact",//交换机名
false,
nil,
)
chl.QueueDeclare(
"test.wutheringWaves",
true,
false,
false,
false,
nil,
)
chl.QueueBind(
"test.wutheringWaves",
"",
"sub.wutheringWaves",
false,
nil,
)
chl.QueueDeclare(
"test.both",
true,
false,
false,
false,
nil,
)
chl.QueueBind(
"test.both",
"",
"sub.wutheringWaves",
false,
nil,
)
chl.QueueBind(
"test.both",
"",
"sub.genshinImpact",
false,
nil,
)
}
//生产者
func subscribeMsg1() {
conn, chl := createConn()
defer conn.Close()
defer chl.Close()
createSubscribeQueue(chl)
ctx1, cancel1 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel1()
body1 := "原神牛逼"
err := chl.PublishWithContext(ctx1,
"sub.genshinImpact",
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body1),
},
)
if err != nil {
panic(err)
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
body2 := "鸣潮牛逼"
err = chl.PublishWithContext(ctx2,
"sub.wutheringWaves",
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body2),
},
)
if err != nil {
panic(err)
}
}
//消费者
func subscriber() {
conn, chl := createConn()
defer conn.Close()
defer chl.Close()
createSubscribeQueue(chl)
msgs, err := chl.Consume(
"test.genshinImpact",
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
go func() {
for msg := range msgs {
fmt.Printf("subscriber1 receive a msg: %s \n", string(msg.Body))
}
}()
msgs1, err := chl.Consume(
"test.wutheringWaves",
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
go func() {
for msg := range msgs1 {
fmt.Printf("subscriber2 receive a msg: %s \n", string(msg.Body))
}
}()
msgs2, err := chl.Consume(
"test.both",
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
go func() {
for msg := range msgs2 {
fmt.Printf("subscriber3 receive a msg: %s \n", string(msg.Body))
}
}()
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-ch
}
#测试结果:
PS D:\Prj\learn_prj\gf_demo_02\internal\service\rabbit> go test -v -run TestSubscriber
=== RUN TestSubscriber
subscriber3 receive a msg: 原神牛逼
subscriber1 receive a msg: 原神牛逼
subscriber2 receive a msg: 鸣潮牛逼
subscriber3 receive a msg: 鸣潮牛逼
--- PASS: TestSubscriber (108.40s)
PASS
ok gf_demo_02/internal/service/rabbit 109.396s
路由模式
//和发布订阅大同小异,代码略
//区别:
1.交换机类型是direct
2.Bind和Publisher需要指定路由key
主题模式
主题模式用topic名替代路由Key
可用通配符订阅,*代表单级通配符,#代表递归通配符
//测试代码
//通用代码:
func topicQueueMaker(chl *amqp.Channel) (q1 amqp.Queue, q2 amqp.Queue, q3 amqp.Queue) {
chl.ExchangeDeclare(
"top.ex1",
"topic",//类型是topic
true,
false,
false,
false,
nil,
)
q1, _ = chl.QueueDeclare(
"",
true,
false,
false,
false,
nil,
)
q2, _ = chl.QueueDeclare(
"",
true,
false,
false,
false,
nil,
)
q3, _ = chl.QueueDeclare(
"",
true,
false,
false,
false,
nil,
)
chl.QueueBind(q1.Name,
"top.gen",
"top.ex1",
false,
nil,
)
chl.QueueBind(q2.Name,
"top.wave",
"top.ex1",
false,
nil,
)
chl.QueueBind(q3.Name,
"top.*",//通配符订阅
"top.ex1",
false,
nil,
)
return
}
//发布者
func topicSender() {
conn, chl := createConn()
defer conn.Close()
defer chl.Close()
topicQueueMaker(chl)
ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel1()
chl.PublishWithContext(ctx,
"top.ex1",
"top.gen",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("原神牛批"),
},
)
chl.PublishWithContext(ctx,
"top.ex1",
"top.wave",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("鸣潮牛批"),
},
)
}
//消费者
func topicReceiver() {
conn, chl := createConn()
defer conn.Close()
defer chl.Close()
q1, q2, q3 := topicQueueMaker(chl)
msgs1, _ := chl.Consume(
q1.Name,
"",
true,
false,
false,
false,
nil,
)
msgs2, _ := chl.Consume(
q2.Name,
"",
true,
false,
false,
false,
nil,
)
msgs3, _ := chl.Consume(
q3.Name,
"",
true,
false,
false,
false,
nil,
)
go func() {
for msg := range msgs1 {
fmt.Printf("topicReceiver1 receive a msg: %s \n", string(msg.Body))
}
}()
go func() {
for msg := range msgs2 {
fmt.Printf("topicReceiver2 receive a msg: %s \n", string(msg.Body))
}
}()
go func() {
for msg := range msgs3 {
fmt.Printf("topicReceiver3 receive a msg: %s \n", string(msg.Body))
}
}()
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-ch
}
#测试输出结果:
PS D:\Prj\learn_prj\gf_demo_02\internal\service\rabbit> go test -v -run TestTopicReceiver
=== RUN TestTopicReceiver
topicReceiver3 receive a msg: 原神牛批
topicReceiver1 receive a msg: 原神牛批
topicReceiver3 receive a msg: 鸣潮牛批
topicReceiver2 receive a msg: 鸣潮牛批
--- PASS: TestTopicReceiver (234.84s)
PASS
ok gf_demo_02/internal/service/rabbit 235.972s