跳转至

NATS

NATS是一个简单、安全、高性能的开源消息传递系统,适用于云本地应用、物联网消息传递和微服务架构。 NATS 服务器是用 Go 编程语言编写的,但与服务器交互的客户端库可用于数十种主要的编程语言。 NATS 支持 At Most Once**和**At Least Once 交付。 它可以在任何地方运行,从大型服务器和云实例,到边缘网关,甚至是物联网设备。

安装

要开始构建基于 NATS 的微服务,首先要安装所需的包:

$ npm i --save nats

概述

要使用 NATS 传输器,将以下选项对象传递给createMicroservice()方法:

1
2
3
4
5
6
7
8
9
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.NATS,
    options: {
      servers: ['nats://localhost:4222'],
    },
  },
);
1
2
3
4
5
6
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.NATS,
  options: {
    servers: ['nats://localhost:4222'],
  },
});

Hint

Transport 枚举是从@nestjs/microservices包导入的。

选项

options对象特定于所选的传输器。 NATS 传输器公开了此处所描述的属性. 此外,有一个queue属性,它允许你指定你的服务器应该订阅的队列的名称(保留undefined忽略这个设置)。 阅读下面关于 NATS 队列组的更多信息

客户端

与其他微服务传输器一样,创建 NATS ClientProxy实例有几个选项

创建实例的一种方法是使用ClientsModule。 要使用ClientsModule创建一个客户端实例,请导入它并使用register()方法传递一个选项对象,该对象具有上面createMicroservice()方法中显示的相同属性,以及一个name属性,用于作为注入令牌。 点击这里阅读更多关于“ClientsModule”的信息

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.NATS,
        options: {
          servers: ['nats://localhost:4222'],
        }
      },
    ]),
  ]
  ...
})

也可以使用其他创建客户端的选项(ClientProxyFactory@Client())。 你可以在这里阅读。

请求-响应

对于 请求-响应 消息样式(阅读更多),NATS 传输器不使用 NATS 内置的请求-应答机制。 相反,使用publish()方法在给定的主题上发布一个“请求”,该方法具有惟一的应答主题名称,应答者监听该主题并向应答主题发送响应。 回复主题被动态地定向回请求者,而不管任何一方的位置。

基于事件的

对于 基于事件的 消息样式(阅读更多), NATS 传输器使用 NATS 内置的发布-订阅机制。 发布者发送关于主题的消息,任何监听该主题的活动订阅者都会接收该消息。 订阅者还可以注册对通配符主题的兴趣,它的工作方式有点像正则表达式。 这种一对多模式有时称为扇出。

队列组

NATS 提供了一个称为分布式队列的内置负载平衡特性。 要创建队列订阅,使用 queue 属性如下所示:

main.ts
1
2
3
4
5
6
7
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.NATS,
  options: {
    servers: ['nats://localhost:4222'],
    queue: 'cats_queue',
  },
});

上下文

在更复杂的场景中,您可能希望访问关于传入请求的更多信息。 当使用 NATS 传输器时,您可以访问 NatsContext 对象。

1
2
3
4
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`);
}
1
2
3
4
5
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(`Subject: ${context.getSubject()}`);
}

Hint

@Payload()@Ctx()NatsContext 是从 @nestjs/microservices 包导入的。

通配符

订阅可以是对显式主题的订阅,也可以包括通配符。

1
2
3
4
5
@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`); // e.g. "time.us.east"
  return new Date().toLocaleTimeString(...);
}
1
2
3
4
5
6
@Bind(Payload(), Ctx())
@MessagePattern('time.us.*')
getDate(data, context) {
  console.log(`Subject: ${context.getSubject()}`); // e.g. "time.us.east"
  return new Date().toLocaleTimeString(...);
}

记录构建

要配置消息选项,可以使用 NatsRecordBuilder 类(注意:这对基于事件的流也是可行的)。 例如,要添加 x-version 标头,使用 setHeaders 方法,如下所示:

1
2
3
4
5
6
7
8
import * as nats from 'nats';

// somewhere in your code
const headers = nats.headers();
headers.set('x-version', '1.0.0');

const record = new NatsRecordBuilder(':cat:').setHeaders(headers).build();
this.client.send('replace-emoji', record).subscribe(...);

Hint

NatsRecordBuilder 类从 @nestjs/microservices 包中导出。

你也可以在服务器端读取这些头文件,通过访问 NatsContext ,如下所示:

1
2
3
4
5
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: NatsContext): string {
  const headers = context.getHeaders();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
1
2
3
4
5
6
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
  const headers = context.getHeaders();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

在某些情况下,你可能想为多个请求配置头信息,你可以将这些信息作为选项传递给 ClientProxyFactory :

import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.NATS,
          options: {
            servers: ['nats://localhost:4222'],
            headers: { 'x-version': '1.0.0' },
          },
        }),
    },
  ],
})
export class ApiModule {}