2 Commits
7661c714a1
...
7ca828fc54
| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
7ca828fc54 |
Merge branch 'zhouxinzhong/feature-20260327150147-学习笔记' into milestone-20260325-学习笔记
|
5 days ago |
|
|
6f243d6d73 |
4.9提交笔记
|
5 days ago |
6 changed files with 594 additions and 0 deletions
-
12周新忠学习笔记/4.10/4.10笔记.md
-
BIN周新忠学习笔记/4.4/1.jpg
-
BIN周新忠学习笔记/4.4/2.jpg
-
BIN周新忠学习笔记/4.4/3.jpg
-
BIN周新忠学习笔记/4.4/4.jpg
-
582周新忠学习笔记/4.9/4.8_4.9学习笔记.md
@ -0,0 +1,12 @@ |
|||||
|
## GO读取配置文件 |
||||
|
|
||||
|
|
||||
|
|
||||
|
```go |
||||
|
|
||||
|
``` |
||||
|
|
||||
|
|
||||
|
|
||||
|
## 远程调用 |
||||
|
|
||||
|
After Width: 1080 | Height: 1920 | Size: 1.3 MiB |
|
After Width: 1080 | Height: 1920 | Size: 1.2 MiB |
|
After Width: 1080 | Height: 1920 | Size: 1.3 MiB |
|
After Width: 1080 | Height: 1920 | Size: 1.3 MiB |
@ -0,0 +1,582 @@ |
|||||
|
|
||||
|
|
||||
|
## RabbitMQ |
||||
|
|
||||
|
```shell |
||||
|
#go安装依赖 |
||||
|
go get github.com/rabbitmq/qmqp091-go |
||||
|
#docker 安装延迟插件(从系统copy) |
||||
|
docker cp rabbitmq-delayed-message-exchange-4.1.0.ez rabbitmq:/plugins |
||||
|
#启用延迟插件 |
||||
|
rabbitmq-plugins enable rabbitmq_delayed_message_exch |
||||
|
#docker启动 |
||||
|
docker start my-rabbit # 容器名 |
||||
|
|
||||
|
``` |
||||
|
|
||||
|
### 创建连接 |
||||
|
|
||||
|
```go |
||||
|
conn, err := amqp.Dial("amqp://url") //连接mq |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
chl, err := conn.Channel() //获取管道 |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
defer chl.Close() |
||||
|
``` |
||||
|
|
||||
|
### 声明队列 |
||||
|
|
||||
|
```go |
||||
|
queue, err := chl.QueueDeclare( |
||||
|
"queue1", //队列名 |
||||
|
true, //是否持久化 |
||||
|
false, //是否自动删除 |
||||
|
false, //是否排他 |
||||
|
false, //是否阻塞 |
||||
|
amqp.Table{ //队列选项 |
||||
|
amqp.QueueTypeArg:amqp.QueueTypeQuorum, |
||||
|
}, |
||||
|
) |
||||
|
``` |
||||
|
|
||||
|
#### 队列选项 |
||||
|
|
||||
|
```go |
||||
|
"x-message-ttl" //消息过期时间 |
||||
|
"x-max-length" //队列最大长度 |
||||
|
"x-max-priority" //优先级 |
||||
|
"x-dead-letter-exchange" //死信交换机 |
||||
|
"x-dead-letter-routing-key" //死信路由键 |
||||
|
//还有几个枚举: |
||||
|
amqp.QueueTypeClassic//经典队列 |
||||
|
amqp.QueueTypeQuorum//仲裁队列(分布式) |
||||
|
amqp.QueueTypeStream//流式队列(抄袭kafka) |
||||
|
//抛弃策略(有点像连接池啊) |
||||
|
QueueOVerflowDropHead//喜新厌旧 |
||||
|
QueueOVerflowRejectPublish//拒收新的 |
||||
|
QueueOVerflowRejectPublishDLX//新的扔进死信队列 |
||||
|
``` |
||||
|
|
||||
|
### 发送消息 |
||||
|
|
||||
|
```go |
||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
||||
|
//上下文控制消息超时时间 |
||||
|
defer cancel() |
||||
|
body := "测试信息" |
||||
|
chl.PublishWithContext(ctx, |
||||
|
"",//交换机名 |
||||
|
queue.Name,//路由键 |
||||
|
false,//强制性设置 |
||||
|
false,//及时性 |
||||
|
amqp.Publishing{ |
||||
|
ContentType: "text/plain",//格式 |
||||
|
Body: []byte(body),//发送的消息内容 |
||||
|
}) |
||||
|
``` |
||||
|
|
||||
|
### 消费消息 |
||||
|
|
||||
|
```go |
||||
|
func consumeMsg1() { |
||||
|
//为防止创建消费者时队列还没被创建,应该在创建消费者前也创建一遍队列 |
||||
|
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
defer conn.Close() |
||||
|
chl, err := conn.Channel() |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
defer chl.Close() |
||||
|
queue, err := chl.QueueDeclare( |
||||
|
"test.queue1", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
amqp.Table{ |
||||
|
amqp.QueueTypeArg: amqp.QueueTypeQuorum, |
||||
|
}, |
||||
|
) |
||||
|
msgs, err := chl.Consume( |
||||
|
queue.Name,//队列名 |
||||
|
"test.consumer1",//消费者标识,留空则自动生成 |
||||
|
true,//auto-ack |
||||
|
false,//是否排他 |
||||
|
false,//no-local 设置为true则不允许同一连接中的消费者消费发布者的消息 |
||||
|
false,//是否不等待服务器响应 |
||||
|
nil,//amqp.Table |
||||
|
)//返回值是一个 Delievery管道 |
||||
|
go func() { |
||||
|
for msg := range msgs { |
||||
|
println("收到信息:", string(msg.Body)) |
||||
|
} |
||||
|
}() |
||||
|
ch := make(chan os.Signal, 1) |
||||
|
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) |
||||
|
<-ch |
||||
|
println("关闭线程") |
||||
|
close(ch) |
||||
|
} |
||||
|
``` |
||||
|
|
||||
|
### 工作队列 |
||||
|
|
||||
|
```go |
||||
|
//工作队列只要设置多个消费者消费同一个队列即可,默认使用轮询消费 |
||||
|
//测试代码: |
||||
|
//生产者 |
||||
|
func sendBatchMsg1() { |
||||
|
conn, chl := createConn() |
||||
|
defer conn.Close() |
||||
|
defer chl.Close() |
||||
|
queue, err := chl.QueueDeclare( |
||||
|
"test.queue1", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
amqp.Table{ |
||||
|
amqp.QueueTypeArg: amqp.QueueTypeQuorum, |
||||
|
}, |
||||
|
) |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
||||
|
defer cancel() |
||||
|
for i := 0; i < 10; i++ { |
||||
|
body := "发送消息第" + strconv.Itoa(i+1) + "条" |
||||
|
chl.PublishWithContext(ctx, "", queue.Name, false, false, amqp.Publishing{ |
||||
|
ContentType: "text/plain", |
||||
|
Body: []byte(body), |
||||
|
}) |
||||
|
time.Sleep(1 * time.Second) |
||||
|
} |
||||
|
} |
||||
|
//消费者 |
||||
|
func WorkReceiver() { |
||||
|
conn, chl := createConn() |
||||
|
defer conn.Close() |
||||
|
defer chl.Close() |
||||
|
queue, err := chl.QueueDeclare( |
||||
|
"test.queue1", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
amqp.Table{ |
||||
|
amqp.QueueTypeArg: amqp.QueueTypeQuorum, |
||||
|
}, |
||||
|
) |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
for i := 0; i < 3; i++ {//开三个消费者 |
||||
|
go func(i1 int) { |
||||
|
msgs, err2 := chl.Consume( |
||||
|
queue.Name, |
||||
|
"worker"+strconv.Itoa(i1), |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
amqp.Table{}, |
||||
|
) |
||||
|
if err2 != nil { |
||||
|
panic(err2) |
||||
|
} |
||||
|
for msg := range msgs { |
||||
|
println("worker "+strconv.Itoa(i1)+"receive a msg:", string(msg.Body)) |
||||
|
} |
||||
|
}(i) |
||||
|
} |
||||
|
ch := make(chan os.Signal, 1) |
||||
|
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) |
||||
|
<-ch |
||||
|
} |
||||
|
``` |
||||
|
|
||||
|
|
||||
|
|
||||
|
### 发布订阅模式 |
||||
|
|
||||
|
```go |
||||
|
//测试代码: |
||||
|
//公共代码: |
||||
|
func createSubscribeQueue(chl *amqp.Channel) { |
||||
|
err := chl.ExchangeDeclare( |
||||
|
"sub.genshinImpact", |
||||
|
"fanout", |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
err = chl.ExchangeDeclare( |
||||
|
"sub.wutheringWaves", |
||||
|
"fanout", |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
chl.QueueDeclare( |
||||
|
"test.genshinImpact", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueBind(//绑定队列和交换机 |
||||
|
"test.genshinImpact",//队列名 |
||||
|
"",//路由key会被忽略 |
||||
|
"sub.genshinImpact",//交换机名 |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueDeclare( |
||||
|
"test.wutheringWaves", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueBind( |
||||
|
"test.wutheringWaves", |
||||
|
"", |
||||
|
"sub.wutheringWaves", |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueDeclare( |
||||
|
"test.both", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueBind( |
||||
|
"test.both", |
||||
|
"", |
||||
|
"sub.wutheringWaves", |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueBind( |
||||
|
"test.both", |
||||
|
"", |
||||
|
"sub.genshinImpact", |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
} |
||||
|
//生产者 |
||||
|
func subscribeMsg1() { |
||||
|
conn, chl := createConn() |
||||
|
defer conn.Close() |
||||
|
defer chl.Close() |
||||
|
createSubscribeQueue(chl) |
||||
|
ctx1, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) |
||||
|
defer cancel1() |
||||
|
body1 := "原神牛逼" |
||||
|
err := chl.PublishWithContext(ctx1, |
||||
|
"sub.genshinImpact", |
||||
|
"", |
||||
|
false, |
||||
|
false, |
||||
|
amqp.Publishing{ |
||||
|
ContentType: "text/plain", |
||||
|
Body: []byte(body1), |
||||
|
}, |
||||
|
) |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) |
||||
|
defer cancel2() |
||||
|
body2 := "鸣潮牛逼" |
||||
|
err = chl.PublishWithContext(ctx2, |
||||
|
"sub.wutheringWaves", |
||||
|
"", |
||||
|
false, |
||||
|
false, |
||||
|
amqp.Publishing{ |
||||
|
ContentType: "text/plain", |
||||
|
Body: []byte(body2), |
||||
|
}, |
||||
|
) |
||||
|
if err != nil { |
||||
|
panic(err) |
||||
|
} |
||||
|
} |
||||
|
//消费者 |
||||
|
func subscriber() { |
||||
|
conn, chl := createConn() |
||||
|
defer conn.Close() |
||||
|
defer chl.Close() |
||||
|
createSubscribeQueue(chl) |
||||
|
msgs, err := chl.Consume( |
||||
|
"test.genshinImpact", |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
if err != nil { |
||||
|
log.Fatal(err) |
||||
|
} |
||||
|
go func() { |
||||
|
for msg := range msgs { |
||||
|
fmt.Printf("subscriber1 receive a msg: %s \n", string(msg.Body)) |
||||
|
} |
||||
|
}() |
||||
|
msgs1, err := chl.Consume( |
||||
|
"test.wutheringWaves", |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
if err != nil { |
||||
|
log.Fatal(err) |
||||
|
} |
||||
|
go func() { |
||||
|
for msg := range msgs1 { |
||||
|
fmt.Printf("subscriber2 receive a msg: %s \n", string(msg.Body)) |
||||
|
} |
||||
|
}() |
||||
|
msgs2, err := chl.Consume( |
||||
|
"test.both", |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
if err != nil { |
||||
|
log.Fatal(err) |
||||
|
} |
||||
|
go func() { |
||||
|
for msg := range msgs2 { |
||||
|
fmt.Printf("subscriber3 receive a msg: %s \n", string(msg.Body)) |
||||
|
} |
||||
|
}() |
||||
|
ch := make(chan os.Signal, 1) |
||||
|
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) |
||||
|
<-ch |
||||
|
} |
||||
|
|
||||
|
|
||||
|
``` |
||||
|
|
||||
|
```powershell |
||||
|
#测试结果: |
||||
|
PS D:\Prj\learn_prj\gf_demo_02\internal\service\rabbit> go test -v -run TestSubscriber |
||||
|
=== RUN TestSubscriber |
||||
|
subscriber3 receive a msg: 原神牛逼 |
||||
|
subscriber1 receive a msg: 原神牛逼 |
||||
|
subscriber2 receive a msg: 鸣潮牛逼 |
||||
|
subscriber3 receive a msg: 鸣潮牛逼 |
||||
|
--- PASS: TestSubscriber (108.40s) |
||||
|
PASS |
||||
|
ok gf_demo_02/internal/service/rabbit 109.396s |
||||
|
``` |
||||
|
|
||||
|
### 路由模式 |
||||
|
|
||||
|
```go |
||||
|
//和发布订阅大同小异,代码略 |
||||
|
//区别: |
||||
|
1.交换机类型是direct |
||||
|
2.Bind和Publisher需要指定路由key |
||||
|
``` |
||||
|
|
||||
|
|
||||
|
|
||||
|
### 主题模式 |
||||
|
|
||||
|
主题模式用topic名替代路由Key |
||||
|
|
||||
|
可用通配符订阅,*代表单级通配符,#代表递归通配符 |
||||
|
|
||||
|
```go |
||||
|
//测试代码 |
||||
|
//通用代码: |
||||
|
func topicQueueMaker(chl *amqp.Channel) (q1 amqp.Queue, q2 amqp.Queue, q3 amqp.Queue) { |
||||
|
chl.ExchangeDeclare( |
||||
|
"top.ex1", |
||||
|
"topic",//类型是topic |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
q1, _ = chl.QueueDeclare( |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
q2, _ = chl.QueueDeclare( |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
q3, _ = chl.QueueDeclare( |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueBind(q1.Name, |
||||
|
"top.gen", |
||||
|
"top.ex1", |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueBind(q2.Name, |
||||
|
"top.wave", |
||||
|
"top.ex1", |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
chl.QueueBind(q3.Name, |
||||
|
"top.*",//通配符订阅 |
||||
|
"top.ex1", |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
return |
||||
|
} |
||||
|
//发布者 |
||||
|
func topicSender() { |
||||
|
conn, chl := createConn() |
||||
|
defer conn.Close() |
||||
|
defer chl.Close() |
||||
|
topicQueueMaker(chl) |
||||
|
ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) |
||||
|
defer cancel1() |
||||
|
chl.PublishWithContext(ctx, |
||||
|
"top.ex1", |
||||
|
"top.gen", |
||||
|
false, |
||||
|
false, |
||||
|
amqp.Publishing{ |
||||
|
ContentType: "text/plain", |
||||
|
Body: []byte("原神牛批"), |
||||
|
}, |
||||
|
) |
||||
|
chl.PublishWithContext(ctx, |
||||
|
"top.ex1", |
||||
|
"top.wave", |
||||
|
false, |
||||
|
false, |
||||
|
amqp.Publishing{ |
||||
|
ContentType: "text/plain", |
||||
|
Body: []byte("鸣潮牛批"), |
||||
|
}, |
||||
|
) |
||||
|
} |
||||
|
|
||||
|
//消费者 |
||||
|
func topicReceiver() { |
||||
|
conn, chl := createConn() |
||||
|
defer conn.Close() |
||||
|
defer chl.Close() |
||||
|
q1, q2, q3 := topicQueueMaker(chl) |
||||
|
msgs1, _ := chl.Consume( |
||||
|
q1.Name, |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
msgs2, _ := chl.Consume( |
||||
|
q2.Name, |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
msgs3, _ := chl.Consume( |
||||
|
q3.Name, |
||||
|
"", |
||||
|
true, |
||||
|
false, |
||||
|
false, |
||||
|
false, |
||||
|
nil, |
||||
|
) |
||||
|
go func() { |
||||
|
for msg := range msgs1 { |
||||
|
fmt.Printf("topicReceiver1 receive a msg: %s \n", string(msg.Body)) |
||||
|
} |
||||
|
}() |
||||
|
go func() { |
||||
|
for msg := range msgs2 { |
||||
|
fmt.Printf("topicReceiver2 receive a msg: %s \n", string(msg.Body)) |
||||
|
} |
||||
|
}() |
||||
|
go func() { |
||||
|
for msg := range msgs3 { |
||||
|
fmt.Printf("topicReceiver3 receive a msg: %s \n", string(msg.Body)) |
||||
|
} |
||||
|
}() |
||||
|
ch := make(chan os.Signal, 1) |
||||
|
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) |
||||
|
<-ch |
||||
|
} |
||||
|
``` |
||||
|
|
||||
|
```powershell |
||||
|
#测试输出结果: |
||||
|
PS D:\Prj\learn_prj\gf_demo_02\internal\service\rabbit> go test -v -run TestTopicReceiver |
||||
|
=== RUN TestTopicReceiver |
||||
|
topicReceiver3 receive a msg: 原神牛批 |
||||
|
topicReceiver1 receive a msg: 原神牛批 |
||||
|
topicReceiver3 receive a msg: 鸣潮牛批 |
||||
|
topicReceiver2 receive a msg: 鸣潮牛批 |
||||
|
--- PASS: TestTopicReceiver (234.84s) |
||||
|
PASS |
||||
|
ok gf_demo_02/internal/service/rabbit 235.972s |
||||
|
``` |
||||
|
|
||||
|
|
||||
|
|
||||
|
### 手动ack |
||||
|
|
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue