diff --git a/周新忠学习笔记/4.10/4.10笔记.md b/周新忠学习笔记/4.10/4.10笔记.md new file mode 100644 index 0000000..a8f23bd --- /dev/null +++ b/周新忠学习笔记/4.10/4.10笔记.md @@ -0,0 +1,12 @@ +## GO读取配置文件 + + + +```go + +``` + + + +## 远程调用 + diff --git a/周新忠学习笔记/4.4/1.jpg b/周新忠学习笔记/4.4/1.jpg new file mode 100644 index 0000000..61b77de Binary files /dev/null and b/周新忠学习笔记/4.4/1.jpg differ diff --git a/周新忠学习笔记/4.4/2.jpg b/周新忠学习笔记/4.4/2.jpg new file mode 100644 index 0000000..c29d658 Binary files /dev/null and b/周新忠学习笔记/4.4/2.jpg differ diff --git a/周新忠学习笔记/4.4/3.jpg b/周新忠学习笔记/4.4/3.jpg new file mode 100644 index 0000000..7405c2d Binary files /dev/null and b/周新忠学习笔记/4.4/3.jpg differ diff --git a/周新忠学习笔记/4.4/4.jpg b/周新忠学习笔记/4.4/4.jpg new file mode 100644 index 0000000..7d040fb Binary files /dev/null and b/周新忠学习笔记/4.4/4.jpg differ diff --git a/周新忠学习笔记/4.9/4.8_4.9学习笔记.md b/周新忠学习笔记/4.9/4.8_4.9学习笔记.md new file mode 100644 index 0000000..e51677e --- /dev/null +++ b/周新忠学习笔记/4.9/4.8_4.9学习笔记.md @@ -0,0 +1,582 @@ + + +## RabbitMQ + +```shell +#go安装依赖 +go get github.com/rabbitmq/qmqp091-go +#docker 安装延迟插件(从系统copy) +docker cp rabbitmq-delayed-message-exchange-4.1.0.ez rabbitmq:/plugins +#启用延迟插件 +rabbitmq-plugins enable rabbitmq_delayed_message_exch +#docker启动 +docker start my-rabbit # 容器名 + +``` + +### 创建连接 + +```go +conn, err := amqp.Dial("amqp://url") //连接mq + if err != nil { + panic(err) + } +chl, err := conn.Channel() //获取管道 + if err != nil { + panic(err) + } + defer chl.Close() +``` + +### 声明队列 + +```go +queue, err := chl.QueueDeclare( + "queue1", //队列名 + true, //是否持久化 + false, //是否自动删除 + false, //是否排他 + false, //是否阻塞 + amqp.Table{ //队列选项 + amqp.QueueTypeArg:amqp.QueueTypeQuorum, + }, + ) +``` + +#### 队列选项 + +```go +"x-message-ttl" //消息过期时间 +"x-max-length" //队列最大长度 +"x-max-priority" //优先级 +"x-dead-letter-exchange" //死信交换机 +"x-dead-letter-routing-key" //死信路由键 +//还有几个枚举: +amqp.QueueTypeClassic//经典队列 +amqp.QueueTypeQuorum//仲裁队列(分布式) +amqp.QueueTypeStream//流式队列(抄袭kafka) +//抛弃策略(有点像连接池啊) +QueueOVerflowDropHead//喜新厌旧 +QueueOVerflowRejectPublish//拒收新的 +QueueOVerflowRejectPublishDLX//新的扔进死信队列 +``` + +### 发送消息 + +```go +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +//上下文控制消息超时时间 + defer cancel() + body := "测试信息" + chl.PublishWithContext(ctx, + "",//交换机名 + queue.Name,//路由键 + false,//强制性设置 + false,//及时性 + amqp.Publishing{ + ContentType: "text/plain",//格式 + Body: []byte(body),//发送的消息内容 + }) +``` + +### 消费消息 + +```go +func consumeMsg1() { + //为防止创建消费者时队列还没被创建,应该在创建消费者前也创建一遍队列 + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + panic(err) + } + defer conn.Close() + chl, err := conn.Channel() + if err != nil { + panic(err) + } + defer chl.Close() + queue, err := chl.QueueDeclare( + "test.queue1", + true, + false, + false, + false, + amqp.Table{ + amqp.QueueTypeArg: amqp.QueueTypeQuorum, + }, + ) + msgs, err := chl.Consume( + queue.Name,//队列名 + "test.consumer1",//消费者标识,留空则自动生成 + true,//auto-ack + false,//是否排他 + false,//no-local 设置为true则不允许同一连接中的消费者消费发布者的消息 + false,//是否不等待服务器响应 + nil,//amqp.Table + )//返回值是一个 Delievery管道 + go func() { + for msg := range msgs { + println("收到信息:", string(msg.Body)) + } + }() + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-ch + println("关闭线程") + close(ch) +} +``` + +### 工作队列 + +```go +//工作队列只要设置多个消费者消费同一个队列即可,默认使用轮询消费 +//测试代码: +//生产者 +func sendBatchMsg1() { + conn, chl := createConn() + defer conn.Close() + defer chl.Close() + queue, err := chl.QueueDeclare( + "test.queue1", + true, + false, + false, + false, + amqp.Table{ + amqp.QueueTypeArg: amqp.QueueTypeQuorum, + }, + ) + if err != nil { + panic(err) + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + for i := 0; i < 10; i++ { + body := "发送消息第" + strconv.Itoa(i+1) + "条" + chl.PublishWithContext(ctx, "", queue.Name, false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(body), + }) + time.Sleep(1 * time.Second) + } +} +//消费者 +func WorkReceiver() { + conn, chl := createConn() + defer conn.Close() + defer chl.Close() + queue, err := chl.QueueDeclare( + "test.queue1", + true, + false, + false, + false, + amqp.Table{ + amqp.QueueTypeArg: amqp.QueueTypeQuorum, + }, + ) + if err != nil { + panic(err) + } + for i := 0; i < 3; i++ {//开三个消费者 + go func(i1 int) { + msgs, err2 := chl.Consume( + queue.Name, + "worker"+strconv.Itoa(i1), + true, + false, + false, + false, + amqp.Table{}, + ) + if err2 != nil { + panic(err2) + } + for msg := range msgs { + println("worker "+strconv.Itoa(i1)+"receive a msg:", string(msg.Body)) + } + }(i) + } + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-ch +} +``` + + + +### 发布订阅模式 + +```go +//测试代码: +//公共代码: +func createSubscribeQueue(chl *amqp.Channel) { + err := chl.ExchangeDeclare( + "sub.genshinImpact", + "fanout", + false, + false, + false, + false, + nil, + ) + if err != nil { + panic(err) + } + err = chl.ExchangeDeclare( + "sub.wutheringWaves", + "fanout", + false, + false, + false, + false, + nil, + ) + if err != nil { + panic(err) + } + chl.QueueDeclare( + "test.genshinImpact", + true, + false, + false, + false, + nil, + ) + chl.QueueBind(//绑定队列和交换机 + "test.genshinImpact",//队列名 + "",//路由key会被忽略 + "sub.genshinImpact",//交换机名 + false, + nil, + ) + chl.QueueDeclare( + "test.wutheringWaves", + true, + false, + false, + false, + nil, + ) + chl.QueueBind( + "test.wutheringWaves", + "", + "sub.wutheringWaves", + false, + nil, + ) + chl.QueueDeclare( + "test.both", + true, + false, + false, + false, + nil, + ) + chl.QueueBind( + "test.both", + "", + "sub.wutheringWaves", + false, + nil, + ) + chl.QueueBind( + "test.both", + "", + "sub.genshinImpact", + false, + nil, + ) +} +//生产者 +func subscribeMsg1() { + conn, chl := createConn() + defer conn.Close() + defer chl.Close() + createSubscribeQueue(chl) + ctx1, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel1() + body1 := "原神牛逼" + err := chl.PublishWithContext(ctx1, + "sub.genshinImpact", + "", + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(body1), + }, + ) + if err != nil { + panic(err) + } + ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel2() + body2 := "鸣潮牛逼" + err = chl.PublishWithContext(ctx2, + "sub.wutheringWaves", + "", + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(body2), + }, + ) + if err != nil { + panic(err) + } +} +//消费者 +func subscriber() { + conn, chl := createConn() + defer conn.Close() + defer chl.Close() + createSubscribeQueue(chl) + msgs, err := chl.Consume( + "test.genshinImpact", + "", + true, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatal(err) + } + go func() { + for msg := range msgs { + fmt.Printf("subscriber1 receive a msg: %s \n", string(msg.Body)) + } + }() + msgs1, err := chl.Consume( + "test.wutheringWaves", + "", + true, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatal(err) + } + go func() { + for msg := range msgs1 { + fmt.Printf("subscriber2 receive a msg: %s \n", string(msg.Body)) + } + }() + msgs2, err := chl.Consume( + "test.both", + "", + true, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatal(err) + } + go func() { + for msg := range msgs2 { + fmt.Printf("subscriber3 receive a msg: %s \n", string(msg.Body)) + } + }() + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-ch +} + + +``` + +```powershell +#测试结果: +PS D:\Prj\learn_prj\gf_demo_02\internal\service\rabbit> go test -v -run TestSubscriber +=== RUN TestSubscriber +subscriber3 receive a msg: 原神牛逼 +subscriber1 receive a msg: 原神牛逼 +subscriber2 receive a msg: 鸣潮牛逼 +subscriber3 receive a msg: 鸣潮牛逼 +--- PASS: TestSubscriber (108.40s) +PASS +ok gf_demo_02/internal/service/rabbit 109.396s +``` + +### 路由模式 + +```go +//和发布订阅大同小异,代码略 +//区别: +1.交换机类型是direct +2.Bind和Publisher需要指定路由key +``` + + + +### 主题模式 + +主题模式用topic名替代路由Key + +可用通配符订阅,*代表单级通配符,#代表递归通配符 + +```go +//测试代码 +//通用代码: +func topicQueueMaker(chl *amqp.Channel) (q1 amqp.Queue, q2 amqp.Queue, q3 amqp.Queue) { + chl.ExchangeDeclare( + "top.ex1", + "topic",//类型是topic + true, + false, + false, + false, + nil, + ) + q1, _ = chl.QueueDeclare( + "", + true, + false, + false, + false, + nil, + ) + q2, _ = chl.QueueDeclare( + "", + true, + false, + false, + false, + nil, + ) + q3, _ = chl.QueueDeclare( + "", + true, + false, + false, + false, + nil, + ) + chl.QueueBind(q1.Name, + "top.gen", + "top.ex1", + false, + nil, + ) + chl.QueueBind(q2.Name, + "top.wave", + "top.ex1", + false, + nil, + ) + chl.QueueBind(q3.Name, + "top.*",//通配符订阅 + "top.ex1", + false, + nil, + ) + return +} +//发布者 +func topicSender() { + conn, chl := createConn() + defer conn.Close() + defer chl.Close() + topicQueueMaker(chl) + ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel1() + chl.PublishWithContext(ctx, + "top.ex1", + "top.gen", + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte("原神牛批"), + }, + ) + chl.PublishWithContext(ctx, + "top.ex1", + "top.wave", + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte("鸣潮牛批"), + }, + ) +} + +//消费者 +func topicReceiver() { + conn, chl := createConn() + defer conn.Close() + defer chl.Close() + q1, q2, q3 := topicQueueMaker(chl) + msgs1, _ := chl.Consume( + q1.Name, + "", + true, + false, + false, + false, + nil, + ) + msgs2, _ := chl.Consume( + q2.Name, + "", + true, + false, + false, + false, + nil, + ) + msgs3, _ := chl.Consume( + q3.Name, + "", + true, + false, + false, + false, + nil, + ) + go func() { + for msg := range msgs1 { + fmt.Printf("topicReceiver1 receive a msg: %s \n", string(msg.Body)) + } + }() + go func() { + for msg := range msgs2 { + fmt.Printf("topicReceiver2 receive a msg: %s \n", string(msg.Body)) + } + }() + go func() { + for msg := range msgs3 { + fmt.Printf("topicReceiver3 receive a msg: %s \n", string(msg.Body)) + } + }() + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-ch +} +``` + +```powershell +#测试输出结果: +PS D:\Prj\learn_prj\gf_demo_02\internal\service\rabbit> go test -v -run TestTopicReceiver +=== RUN TestTopicReceiver +topicReceiver3 receive a msg: 原神牛批 +topicReceiver1 receive a msg: 原神牛批 +topicReceiver3 receive a msg: 鸣潮牛批 +topicReceiver2 receive a msg: 鸣潮牛批 +--- PASS: TestTopicReceiver (234.84s) +PASS +ok gf_demo_02/internal/service/rabbit 235.972s +``` + + + +### 手动ack +