跳转至

RabbitMQ

RabbitMQ是一个开源的轻量级消息代理, 支持多种消息协议。 它可以部署在分布式和联合配置中,以满足大规模、高可用性需求。 此外,它是部署最广泛的消息代理,在全球范围内用于小型初创企业和大型企业。

安装

要开始构建基于 rabbitmq 的微服务,首先要安装所需的包:

$ npm i --save amqplib amqp-connection-manager

概述

要使用 RabbitMQ 传输器,需要将以下选项对象传递给 createMicroservice() 方法:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'cats_queue',
      queueOptions: {
        durable: false,
      },
    },
  },
);
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false,
    },
  },
});

Hint

Transport 枚举是从 @nestjs/microservices 包中导入的。

选项

options属性特定于所选的传输器。 RabbitMQ 传输器暴露了下面描述的属性。

urls 连接url
queue 您的服务器将侦听的队列名称
prefetchCount 设置通道的预取计数
isGlobalPrefetchCount 启用每个通道预取
noAck 如果false,则启用手动确认模式
queueOptions 额外的队列选项(读取更多这里)
socketOptions 额外的套接字选项(读取更多这里)
headers 标题将随每条消息一起发送

客户端

像其他微服务传输器一样,创建 RabbitMQ ClientProxy 实例有几个选项

创建实例的一种方法是使用ClientsModule。 要使用ClientsModule创建一个客户端实例,请导入它并使用register()方法传递一个选项对象,该对象具有上面createMicroservice()方法中显示的相同属性,以及一个name属性,用于作为注入令牌。 点击这里阅读更多关于 ClientsModule 的信息。

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          queueOptions: {
            durable: false
          },
        },
      },
    ]),
  ]
  ...
})

也可以使用其他创建客户端的选项(ClientProxyFactory@Client())。 你可以在这里阅读。

上下文

在更复杂的场景中,您可能希望访问关于传入请求的更多信息。 当使用 RabbitMQ 传输器时,你可以访问RmqContext对象。

1
2
3
4
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}
1
2
3
4
5
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(`Pattern: ${context.getPattern()}`);
}

Hint

@Payload(), @Ctx()RmqContext@nestjs/microservices 包导入.

要访问原始的 RabbitMQ 消息 (with the properties, fields, and content), 使用RmqContext对象的getMessage()方法, 如下:

1
2
3
4
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}
1
2
3
4
5
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(context.getMessage());
}

获取对 RabbitMQ channel的引用, 使用RmqContext对象的getChannelRef方法,如下所示:

1
2
3
4
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getChannelRef());
}
1
2
3
4
5
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(context.getChannelRef());
}

消息确认

为了确保消息永远不会丢失,RabbitMQ 支持消息确认. 一个确认信息被消费者发送回 RabbitMQ,告诉 RabbitMQ 已经收到并处理了一个特定的消息,并且 RabbitMQ 可以自由删除它。 如果一个使用者死了(它的通道被关闭,连接被关闭,或者 TCP 连接丢失)而没有发送一个 ack, RabbitMQ 将会理解一个消息没有被完全处理,并将它重新排队。

要启用手动确认模式,请将noAck属性设置为false:

1
2
3
4
5
6
7
8
options: {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  noAck: false,
  queueOptions: {
    durable: false
  },
},

当手动使用者确认被打开时,我们必须从工作者发送一个适当的确认,以表明我们完成了一个任务。

1
2
3
4
5
6
7
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();

  channel.ack(originalMsg);
}
1
2
3
4
5
6
7
8
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();

  channel.ack(originalMsg);
}

记录构建

要配置消息选项,您可以使用RmqRecordBuilder类(注意:这对于基于事件的流也是可行的)。 例如,要设置headerspriority属性,使用setOptions方法,如下所示:

const message = ':cat:';
const record = new RmqRecordBuilder(message)
  .setOptions({
    headers: {
      ['x-version']: '1.0.0',
    },
    priority: 3,
  })
  .build();

this.client.send('replace-emoji', record).subscribe(...);

Hint

RmqRecordBuilder类从@nestjs/microservices包中导出。

你也可以在服务器端读取这些值,通过访问RmqContext,如下所示:

1
2
3
4
5
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
  const { properties: { headers } } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
1
2
3
4
5
6
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
  const { properties: { headers } } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}