美文网首页
基于NATS实现分布式通信模式

基于NATS实现分布式通信模式

作者: Go语言由浅入深 | 来源:发表于2022-05-04 13:41 被阅读0次

在分布式应用中经常需要实现服务间的通信,本文我们使用NATS消息中间件来实现服务间的不同通信方式。

准备工作

首先创建一个Go项目。注意:本文所介绍的例子运行在Linux/MacOS操作系统环境,但NATS也支持windows系统。

go mod init example

安装nats包:

go get  github.com/nats-io/nats.go/@latest

我们将使用以下目录结构:

.
├── cmd
│   ├── publish-subscribe
│   │   └── main.go
│   ├── request-reply
│   │   └── main.go
│   └── queue-groups
│       └── main.go
├── go.mod
└── go.sum

启动本地nats服务:

docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats

发布订阅模式

发布订阅模式

NATS实现了消息的发布和订阅一对多模式。发布者在一个主题上发送消息,在该主题上的任何订阅者都可以收到消息。这种1:N一对多模式也称为:fan-out。
订阅者还可以在主题中使用通配符,优点类似正则表达式。例如:

  • foo.*可以匹配foo.bar和foo.baz。
  • fo o.*.bar匹配foo.a.bar和foo.b.bar。
  • foo.>匹配上面所有主题。
    消息大小有限制(在nats服务的max_payload配置参数中设置)。默认是1MB,但可以设置最大为64MB。但NATS开发团队推荐最大值设置小点比如8MB。

为什么需要这种通信模式?

发布订阅是很常见的使用场景,可以用于发送消息到不同的服务。

代码

我们在cmd/publish-subscribe/main.go文件中写该模式代码,首先初始化NATS客户端。

nc, err := nats.Connect(nats.DEFAULT_ENCODER)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

使用3个订阅者订阅foo主题,可以实现一个fan-out模式。

nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 1:", string(msg.Data))
    })

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 2:", string(msg.Data))
    })

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 3:", string(msg.Data))
    })

向foo主题发布消息并等待。

if err := nc.Publish("foo", []byte("Here's some stuff")); err != nil {
        log.Fatal(err)
    }
    time.Sleep(2 * time.Second)

完整例子如下,NATS发布消息非常简单。

package main

import (
    "github.com/nats-io/nats.go"
    "log"
    "time"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 1:", string(msg.Data))
    })

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 2:", string(msg.Data))
    })

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 3:", string(msg.Data))
    })

    if err := nc.Publish("foo", []byte("Here's some stuff")); err != nil {
        log.Fatal(err)
    }
    time.Sleep(2 * time.Second)
}

执行结果

如你所见,消息被发送到所有的订阅者。

go run cmd/publish-subscribe/main.go
2022/05/04 12:07:44 Subscribe 3: Here's some stuff
2022/05/04 12:07:44 Subscribe 1: Here's some stuff
2022/05/04 12:07:44 Subscribe 2: Here's some stuff

请求应答模式

请求应答模式

请求应答(Request-Reply)在分布式系统中也是很常见的通信模式。客户端发送一个请求,会在一定时间内异步等待接收应答消息。
NATS使请求-应答变得简单而强大,并支持一些强大的特性,比如位置透明、扩和缩容、可观察性等等。

为什么需要这种模式?

有时服务间需要一对一的通信,请求应答就非常适合。

代码

在cmd/reques-reply/main.go文件中写该模式的代码,还是以初始化NATS客户端代码开始:

nc, err := nats.Connect(nats.DefaultURL)

if err != nil {
    log.Fatalln(err)
}

defer nc.Close()

订阅foo主题,添加一些日志并对接收到消息时提供应答:

nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Request receive:", string(msg.Data))

        msg.Respond([]byte("Here you go"))
    })

我们还可以使用不同的应答主题,客户端可以向只响应特定请求者的服务发出请求,创建1对1的关系。
下面使用NATS客户端的Request方法。包含三个参数:主题、请求内容(字节数组)、请求超时时间。
以下是完整代码:

package main

import (
    "github.com/nats-io/nats.go"
    "log"
    "time"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Request receive:", string(msg.Data))

        msg.Respond([]byte("Here you go"))
    })

    reply, err := nc.Request("foo", []byte("Give me data"), 10*time.Second)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Got reply:", string(reply.Data))
}

执行结果

正如预期的那样,我们的请求收到了,订阅者用一些数据响应了请求。

$ go run cmd/request-reply/main.go
2022/05/04 12:25:21 Request receive: Give me data
2022/05/04 12:25:21 Got reply: Here you go
队列订阅模式
队列订阅模式

NATS提供了一种称为分布式队列功能内置负载平衡。使用队列订阅者将在一组订阅者之间平衡消息发送,这组订阅者可用于提供应用程序容错和扩展工作负载。

为什么需要这种模式

队列订阅是扩展服务的理想选择。扩展就和运行一个新应用程序一样简单,缩容可以向正在运行的应用发送信号来停止服务。这种灵活性和无需任何配置更改特点使NATS成为一种优秀的服务通信技术,可以与所有平台技术一起使用。NATS的一个重要特性是,队列组由应用程序及其队列订阅者组成,而不是在服务器端配置。

代码

要创建一个订阅队列,订阅者需要注册一个队列名。所有包含相同队列名的订阅者组成一个组。无需任何配置。当发布已注册主题的消息时,NATS服务从订阅组中随机选择一个成员接收消息。尽管队列组有多个订阅者,但每个消息只由一个订阅者消费。

我们在cmd/queue-groups/main.go文件中写代码,和前面例子一样先初始化NATS客户端。

nc, err := nats.Connect(nats.DefaultURL)

if err != nil {
    log.Fatalln(err)
}

defer nc.Close()

接下来创建主题为foo的3个队列订阅者,队列名为:queue.foo

nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 1:", string(msg.Data))
    })

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 2:", string(msg.Data))
    })

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 3:", string(msg.Data))
    })

最后,创建一个循环像foo主题发布不同的消息,可以看出订阅者是如何消费消息的。

for i:=1; i <= 3; i++{
        message := fmt.Sprintf("Message %d", i)

        if err := nc.Publish("foo", []byte(message)); err != nil {
            log.Fatal(err)
        }
    }

以下是完整代码:

package main

import (
    "fmt"
    "github.com/nats-io/nats.go"
    "log"
    "time"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 1:", string(msg.Data))
    })

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 2:", string(msg.Data))
    })

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 3:", string(msg.Data))
    })

    for i:=1; i <= 3; i++{
        message := fmt.Sprintf("Message %d", i)

        if err := nc.Publish("foo", []byte(message)); err != nil {
            log.Fatal(err)
        }
    }

    time.Sleep(2 * time.Second)
}

执行结果:

可以看到消息被随机地发送到不同的订阅者。因此,在某种程度上,NATS可以作为服务的7层负载均衡器。

$ go run cmd/queue-groups/main.go
2022/05/04 12:46:06 Subscribe 3: Message 1
2022/05/04 12:46:06 Subscribe 1: Message 3
2022/05/04 12:46:06 Subscribe 2: Message 2

总结

在本文中,我们研究了不同的通信模式,展示了NATS的实时分布式消息传递功能。此外,JetStream可以与这些模式结合使用,实现持久化消息传递和消息至少一次消费策略。

相关文章

网友评论

      本文标题:基于NATS实现分布式通信模式

      本文链接:https://www.haomeiwen.com/subject/vkmwyrtx.html