RabbitMQ¶
RabbitMQ是一个开源的轻量级消息代理, 支持多种消息协议。 它可以部署在分布式和联合配置中,以满足大规模、高可用性需求。 此外,它是部署最广泛的消息代理,在全球范围内用于小型初创企业和大型企业。
安装¶
要开始构建基于 rabbitmq 的微服务,首先要安装所需的包:
概述¶
要使用 RabbitMQ 传输器,需要将以下选项对象传递给 createMicroservice() 方法:
Hint
Transport 枚举是从 @nestjs/microservices 包中导入的。
选项¶
options属性特定于所选的传输器。
RabbitMQ 传输器暴露了下面描述的属性。
urls |
连接url |
queue |
您的服务器将侦听的队列名称 |
prefetchCount |
设置通道的预取计数 |
isGlobalPrefetchCount |
启用每个通道预取 |
noAck |
如果false,则启用手动确认模式 |
queueOptions |
额外的队列选项(读取更多这里) |
socketOptions |
额外的套接字选项(读取更多这里) |
headers |
标题将随每条消息一起发送 |
客户端¶
像其他微服务传输器一样,创建 RabbitMQ ClientProxy 实例有几个选项。
创建实例的一种方法是使用ClientsModule。
要使用ClientsModule创建一个客户端实例,请导入它并使用register()方法传递一个选项对象,该对象具有上面createMicroservice()方法中显示的相同属性,以及一个name属性,用于作为注入令牌。
点击这里阅读更多关于 ClientsModule 的信息。
也可以使用其他创建客户端的选项(ClientProxyFactory或@Client())。
你可以在这里阅读。
上下文¶
在更复杂的场景中,您可能希望访问关于传入请求的更多信息。
当使用 RabbitMQ 传输器时,你可以访问RmqContext对象。
Hint
@Payload(), @Ctx() 和 RmqContext 从 @nestjs/microservices 包导入.
要访问原始的 RabbitMQ 消息 (with the properties, fields, and content), 使用RmqContext对象的getMessage()方法, 如下:
获取对 RabbitMQ channel的引用, 使用RmqContext对象的getChannelRef方法,如下所示:
消息确认¶
为了确保消息永远不会丢失,RabbitMQ 支持消息确认. 一个确认信息被消费者发送回 RabbitMQ,告诉 RabbitMQ 已经收到并处理了一个特定的消息,并且 RabbitMQ 可以自由删除它。 如果一个使用者死了(它的通道被关闭,连接被关闭,或者 TCP 连接丢失)而没有发送一个 ack, RabbitMQ 将会理解一个消息没有被完全处理,并将它重新排队。
要启用手动确认模式,请将noAck属性设置为false:
当手动使用者确认被打开时,我们必须从工作者发送一个适当的确认,以表明我们完成了一个任务。
记录构建¶
要配置消息选项,您可以使用RmqRecordBuilder类(注意:这对于基于事件的流也是可行的)。
例如,要设置headers和priority属性,使用setOptions方法,如下所示:
Hint
RmqRecordBuilder类从@nestjs/microservices包中导出。
你也可以在服务器端读取这些值,通过访问RmqContext,如下所示: