You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

10 KiB

RabbitMQ

#go安装依赖
go get github.com/rabbitmq/amqp091-go
#docker 安装延迟插件(从系统copy)
docker cp rabbitmq-delayed-message-exchange-4.1.0.ez rabbitmq:/plugins
#启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#docker启动
docker start my-rabbit # 容器名

创建连接

// Connect to the RabbitMQ broker.
conn, err := amqp.Dial("amqp://url")
if err != nil {
	panic(err)
}
// Release the connection when the enclosing function returns; closing only
// the channel would leave the connection itself open.
defer conn.Close()

// Open a channel on that connection.
chl, err := conn.Channel()
if err != nil {
	panic(err)
}
defer chl.Close()

声明队列

queue, err := chl.QueueDeclare(
	"queue1", // queue name
	true,     // durable
	false,    // auto-delete
	false,    // exclusive
	false,    // noWait: when true, do not wait for the server's reply
	// Optional queue arguments; here the queue type is set to quorum.
	amqp.Table{
		amqp.QueueTypeArg: amqp.QueueTypeQuorum,
	},
)
if err != nil {
	// Do not continue with an unusable queue value.
	panic(err)
}

队列选项

"x-message-ttl" //消息过期时间
"x-max-length" //队列最大长度
"x-max-priority" //优先级
"x-dead-letter-exchange" //死信交换机
"x-dead-letter-routing-key" //死信路由键
//还有几个枚举:
amqp.QueueTypeClassic//经典队列
amqp.QueueTypeQuorum//仲裁队列(分布式)
amqp.QueueTypeStream//流式队列(抄袭kafka)
//抛弃策略(有点像连接池啊)
amqp.QueueOverflowDropHead//喜新厌旧
amqp.QueueOverflowRejectPublish//拒收新的
amqp.QueueOverflowRejectPublishDLX//新的扔进死信队列

发送消息

// ctx carries a 5-second deadline that is handed to the publish call.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

body := "测试信息"
if err := chl.PublishWithContext(ctx,
	"",         // exchange name (empty string)
	queue.Name, // routing key
	false,      // mandatory
	false,      // immediate
	amqp.Publishing{
		ContentType: "text/plain", // payload format
		Body:        []byte(body), // message payload
	},
); err != nil {
	// A failed publish must not go unnoticed.
	panic(err)
}

消费消息

// consumeMsg1 connects to the broker at localhost:5672, declares the quorum
// queue "test.queue1", and prints the body of every message delivered from it
// until the process receives SIGINT, SIGTERM, or SIGQUIT.
//
// The queue is declared here too, so the consumer does not depend on a
// producer having created it first. Any setup error panics.
func consumeMsg1() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	chl, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer chl.Close()

	queue, err := chl.QueueDeclare(
		"test.queue1",
		true,  // durable
		false, // auto-delete
		false, // exclusive
		false, // noWait
		amqp.Table{
			amqp.QueueTypeArg: amqp.QueueTypeQuorum,
		},
	)
	if err != nil {
		panic(err)
	}

	msgs, err := chl.Consume(
		queue.Name,       // queue name
		"test.consumer1", // consumer tag; an empty tag is generated automatically
		true,             // auto-ack
		false,            // exclusive
		false,            // no-local (NOTE(review): amqp091-go documents this flag as unsupported by RabbitMQ)
		false,            // noWait
		nil,              // extra arguments (amqp.Table)
	)
	if err != nil {
		// Without this check a failed Consume would leave msgs nil and the
		// goroutine below would block forever without reporting anything.
		panic(err)
	}

	// Consume returns a channel of deliveries; drain it in the background.
	go func() {
		for msg := range msgs {
			println("收到信息:", string(msg.Body))
		}
	}()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	// Unregister rather than close(ch): the signal package may still send on
	// a registered channel, and a send on a closed channel panics.
	defer signal.Stop(ch)

	<-ch
	println("关闭线程")
}

工作队列

//工作队列只要设置多个消费者消费同一个队列即可,默认使用轮询消费
//测试代码:
//生产者
// sendBatchMsg1 is the producer of the work-queue demo. It declares the
// quorum queue "test.queue1" and publishes ten text messages to it (empty
// exchange name, queue name as routing key), pausing one second after each.
// Any declare or publish error panics.
func sendBatchMsg1() {
	conn, chl := createConn()
	defer conn.Close()
	defer chl.Close()

	queue, err := chl.QueueDeclare(
		"test.queue1",
		true,  // durable
		false, // auto-delete
		false, // exclusive
		false, // noWait
		amqp.Table{
			amqp.QueueTypeArg: amqp.QueueTypeQuorum,
		},
	)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 10; i++ {
		body := "发送消息第" + strconv.Itoa(i+1) + "条"

		// Use a fresh 5-second deadline per message: the loop runs for about
		// ten seconds, so a single deadline created before the loop would
		// already have expired for the later publishes.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		err := chl.PublishWithContext(ctx, "", queue.Name, false, false, amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
		// Called inline rather than deferred so each context is released as
		// soon as its publish finishes, not at function return.
		cancel()
		if err != nil {
			panic(err)
		}

		time.Sleep(1 * time.Second)
	}
}
//消费者
// WorkReceiver is the consumer side of the work-queue demo. It declares the
// quorum queue "test.queue1" and starts three auto-ack consumers on the same
// channel, tagged "worker0" to "worker2", each printing the messages it
// receives. It then blocks until SIGINT, SIGTERM, or SIGQUIT arrives.
func WorkReceiver() {
	conn, chl := createConn()
	defer conn.Close()
	defer chl.Close()

	queueArgs := amqp.Table{amqp.QueueTypeArg: amqp.QueueTypeQuorum}
	queue, err := chl.QueueDeclare("test.queue1", true, false, false, false, queueArgs)
	if err != nil {
		panic(err)
	}

	// runWorker registers one consumer on the shared queue and prints every
	// delivery until the delivery channel is closed.
	runWorker := func(id string) {
		deliveries, consumeErr := chl.Consume(queue.Name, "worker"+id, true, false, false, false, amqp.Table{})
		if consumeErr != nil {
			panic(consumeErr)
		}
		for d := range deliveries {
			println("worker "+id+"receive a msg:", string(d.Body))
		}
	}

	const workerCount = 3
	for n := 0; n < workerCount; n++ {
		go runWorker(strconv.Itoa(n))
	}

	// Keep the consumers alive until the process is asked to stop.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-quit
}

发布订阅模式

//测试代码:
//公共代码:
// createSubscribeQueue sets up the topology for the publish/subscribe demo on
// chl:
//
//   - fanout exchanges "sub.genshinImpact" and "sub.wutheringWaves"
//     (non-durable);
//   - durable queue "test.genshinImpact" bound to "sub.genshinImpact";
//   - durable queue "test.wutheringWaves" bound to "sub.wutheringWaves";
//   - durable queue "test.both" bound to both exchanges.
//
// Every declare and bind error panics, so a partly built topology cannot go
// unnoticed.
func createSubscribeQueue(chl *amqp.Channel) {
	for _, exchange := range []string{"sub.genshinImpact", "sub.wutheringWaves"} {
		// Arguments after the kind: durable, auto-delete, internal, noWait, args.
		if err := chl.ExchangeDeclare(exchange, "fanout", false, false, false, false, nil); err != nil {
			panic(err)
		}
	}

	topology := []struct {
		queue     string
		exchanges []string
	}{
		{"test.genshinImpact", []string{"sub.genshinImpact"}},
		{"test.wutheringWaves", []string{"sub.wutheringWaves"}},
		{"test.both", []string{"sub.wutheringWaves", "sub.genshinImpact"}},
	}
	for _, t := range topology {
		// Arguments after the name: durable, auto-delete, exclusive, noWait, args.
		if _, err := chl.QueueDeclare(t.queue, true, false, false, false, nil); err != nil {
			panic(err)
		}
		for _, exchange := range t.exchanges {
			// The binding key is left empty, as in the notes' fanout example.
			if err := chl.QueueBind(t.queue, "", exchange, false, nil); err != nil {
				panic(err)
			}
		}
	}
}
//生产者
// subscribeMsg1 is the producer of the publish/subscribe demo. After making
// sure the exchanges and queues exist, it publishes one text message to
// "sub.genshinImpact" and one to "sub.wutheringWaves", each with its own
// 5-second context. A publish error panics.
func subscribeMsg1() {
	conn, chl := createConn()
	defer conn.Close()
	defer chl.Close()
	createSubscribeQueue(chl)

	outgoing := []struct{ exchange, text string }{
		{"sub.genshinImpact", "原神牛逼"},
		{"sub.wutheringWaves", "鸣潮牛逼"},
	}
	for _, m := range outgoing {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		// Deferred on purpose: both contexts are cancelled at function return.
		defer cancel()

		publishing := amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(m.text),
		}
		if err := chl.PublishWithContext(ctx, m.exchange, "", false, false, publishing); err != nil {
			panic(err)
		}
	}
}
//消费者
// subscriber is the consumer of the publish/subscribe demo. It starts one
// auto-ack consumer each on "test.genshinImpact", "test.wutheringWaves" and
// "test.both", printing deliveries as subscriber1, subscriber2 and
// subscriber3 respectively. It then blocks until SIGINT, SIGTERM, or SIGQUIT
// arrives. A Consume error terminates the process via log.Fatal.
func subscriber() {
	conn, chl := createConn()
	defer conn.Close()
	defer chl.Close()
	createSubscribeQueue(chl)

	sources := []struct{ queue, format string }{
		{"test.genshinImpact", "subscriber1 receive a msg: %s \n"},
		{"test.wutheringWaves", "subscriber2 receive a msg: %s \n"},
		{"test.both", "subscriber3 receive a msg: %s \n"},
	}
	for _, src := range sources {
		deliveries, err := chl.Consume(src.queue, "", true, false, false, false, nil)
		if err != nil {
			log.Fatal(err)
		}
		// Pass the per-iteration values as arguments so each goroutine keeps
		// its own delivery channel and format string.
		go func(in <-chan amqp.Delivery, format string) {
			for d := range in {
				fmt.Printf(format, string(d.Body))
			}
		}(deliveries, src.format)
	}

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-quit
}


#测试结果:
PS D:\Prj\learn_prj\gf_demo_02\internal\service\rabbit> go test -v -run TestSubscriber                                                                                                                                              
=== RUN   TestSubscriber
subscriber3 receive a msg: 原神牛逼 
subscriber1 receive a msg: 原神牛逼 
subscriber2 receive a msg: 鸣潮牛逼 
subscriber3 receive a msg: 鸣潮牛逼 
--- PASS: TestSubscriber (108.40s)
PASS
ok      gf_demo_02/internal/service/rabbit      109.396s

路由模式

//和发布订阅大同小异,代码略
//区别:
1.交换机类型是direct
2.Bind和Publish都需要指定路由key

主题模式

主题模式用topic名替代路由Key

可用通配符订阅,*代表单级通配符,#代表递归通配符

//测试代码
//通用代码:
// topicQueueMaker sets up the topology for the topic-exchange demo on chl and
// returns the three queues it declared:
//
//   - q1 is bound to exchange "top.ex1" with key "top.gen";
//   - q2 is bound with key "top.wave";
//   - q3 is bound with the wildcard key "top.*".
//
// The queues are declared with an empty name, so the Name field of each
// returned queue carries whatever name the declare call reported. Every
// declare and bind error panics; ignoring them would return zero-value queues.
//
// NOTE(review): the queues are durable and not auto-deleted, and every call
// declares three more of them; confirm whether they are meant to accumulate
// on the broker.
func topicQueueMaker(chl *amqp.Channel) (q1 amqp.Queue, q2 amqp.Queue, q3 amqp.Queue) {
	const exchange = "top.ex1"

	// Durable topic exchange.
	if err := chl.ExchangeDeclare(exchange, "topic", true, false, false, false, nil); err != nil {
		panic(err)
	}

	// declare creates one durable queue with an empty name.
	declare := func() amqp.Queue {
		q, err := chl.QueueDeclare("", true, false, false, false, nil)
		if err != nil {
			panic(err)
		}
		return q
	}
	q1 = declare()
	q2 = declare()
	q3 = declare()

	bindings := []struct{ queue, key string }{
		{q1.Name, "top.gen"},
		{q2.Name, "top.wave"},
		{q3.Name, "top.*"}, // wildcard subscription
	}
	for _, b := range bindings {
		if err := chl.QueueBind(b.queue, b.key, exchange, false, nil); err != nil {
			panic(err)
		}
	}
	return q1, q2, q3
}
//发布者
// topicSender is the publisher of the topic-exchange demo. After running
// topicQueueMaker to declare the exchange, it publishes one text message to
// "top.ex1" with routing key "top.gen" and one with "top.wave". Both
// publishes share a single 5-second context. A publish error panics.
func topicSender() {
	conn, chl := createConn()
	defer conn.Close()
	defer chl.Close()
	topicQueueMaker(chl)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	messages := []struct{ key, text string }{
		{"top.gen", "原神牛批"},
		{"top.wave", "鸣潮牛批"},
	}
	for _, m := range messages {
		err := chl.PublishWithContext(ctx,
			"top.ex1",
			m.key,
			false, // mandatory
			false, // immediate
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(m.text),
			},
		)
		if err != nil {
			// Surface the failure so a lost message is not silent.
			panic(err)
		}
	}
}

//消费者
// topicReceiver is the consumer of the topic-exchange demo. It obtains the
// three queues from topicQueueMaker, starts one auto-ack consumer per queue
// and prints each delivery as topicReceiver1, topicReceiver2 or
// topicReceiver3. It then blocks until SIGINT, SIGTERM, or SIGQUIT arrives.
// A Consume error panics.
func topicReceiver() {
	conn, chl := createConn()
	defer conn.Close()
	defer chl.Close()
	q1, q2, q3 := topicQueueMaker(chl)

	sources := []struct {
		queue  amqp.Queue
		format string
	}{
		{q1, "topicReceiver1 receive a msg: %s \n"},
		{q2, "topicReceiver2 receive a msg: %s \n"},
		{q3, "topicReceiver3 receive a msg: %s \n"},
	}
	for _, src := range sources {
		msgs, err := chl.Consume(
			src.queue.Name,
			"",    // consumer tag
			true,  // auto-ack
			false, // exclusive
			false, // no-local
			false, // noWait
			nil,
		)
		if err != nil {
			// A discarded error would leave msgs nil, and ranging over a nil
			// channel blocks forever without any output.
			panic(err)
		}
		// Pass the per-iteration values as arguments so each goroutine keeps
		// its own delivery channel and format string.
		go func(in <-chan amqp.Delivery, format string) {
			for msg := range in {
				fmt.Printf(format, string(msg.Body))
			}
		}(msgs, src.format)
	}

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-ch
}
#测试输出结果:
PS D:\Prj\learn_prj\gf_demo_02\internal\service\rabbit> go test -v -run TestTopicReceiver
=== RUN   TestTopicReceiver
topicReceiver3 receive a msg: 原神牛批
topicReceiver1 receive a msg: 原神牛批
topicReceiver3 receive a msg: 鸣潮牛批
topicReceiver2 receive a msg: 鸣潮牛批
--- PASS: TestTopicReceiver (234.84s)
PASS
ok      gf_demo_02/internal/service/rabbit      235.972s

手动ack