使用 NestJs 和 Redis 实现 WebSocket 集群

https://medium.com/@mohsenes/websocket-cluster-with-nestjs-and-redis-a18882d418ed

扩展是后端应用程序生活中不可避免的一部分,一旦你决定将你的应用程序扩展到多个实例, 你将面临处理多个客户端(手机、笔记本电脑等)的用户的问题,每个客户端都连接到你集群的一个随机实例。

在本文中,我们将定义这个问题,并使用NestJs和Redis围绕它制定解决方案。

要求

  • 有使用Nodejs和NestJs的经验

  • 安装Nodejs

  • 安装了NestJs CLI

  • 安装Redis

存在的问题

在WebSocket上发出的消息需要在连接到我们实例的接收方的每个设备上发送。

解决方案

我们将处理消息到多个实例使用Redis PubSub流。 为了在NestJs上完成这一点,我们将创建一个名为socket module的模块。 我们将设置一个网关来处理套接字客户端,并设置一个服务来进行发现和连接到Redis并分发消息。

我们将用循序渐进的指南来介绍这一点。 将NestJs CLI安装为一个全局包

# run with sudo if you are on ubuntu
npm i -g @nestjs/cli

创建新的NestJS项目,依赖项也将通过此命令通过向导安装

nest g socket-cluster-app

生成我们讨论的Socket模块

# go into project folder
cd socket-cluster-app/
# generate socket module
nest g module socket
# generate socket service
nest g service socket
# generate socket gateway
nest g gateway socket/socket

使用 nest g 命令会自动将你的服务和套接字添加到相应的模块中

安装WebSocket适配器

npm i @nestjs/platform-ws
npm i @nestjs/websockets

main.ts 文件中注册适配器

main.ts
 import { NestFactory } from '@nestjs/core';
 import { WsAdapter } from '@nestjs/platform-ws';
 import { AppModule } from './app.module';

 async function bootstrap() {
     const app = await NestFactory.create(AppModule);
     // register adapter
     app.useWebSocketAdapter(new WsAdapter(app) as any);
     await app.listen(parseInt(process.env['PORT'] , 10) || 3000);
 }
 bootstrap();

然后,我们将在 handleConnection 调用中标识每个套接字,并将 userId 属性放入每个客户机。 在本例中,我们将通过客户端发送的令牌 cookie 设置 userId。 在实际示例中,您需要验证令牌并通过查询数据库或某些身份验证服务将 userId 分配给客户机。

src/socket.gateway.ts
 import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway } from '@nestjs/websockets';

 @WebSocketGateway()
 export class SocketGateway implements OnGatewayConnection, OnGatewayDisconnect {

     public connectedSockets: { [key: string]: any[] } = {};

     async handleConnection(client: any, req: Request) {
         try {
             const token = req.headers['cookie']
                 .split(';')
                 .map(p => p.trim())
                 .find(p => p.split('=')[0] === 'token')
                 .split('=')[1];

             // for this example, we simply set userId by token
             client.userId = token;

             if (!this.connectedSockets[client.userId])
                 this.connectedSockets[client.userId] = [];

             this.connectedSockets[client.userId].push(client);
         } catch (error) {
             client.close(4403, 'set JWT cookie to authenticate');
         }
     }

     handleDisconnect(client: any) {
         this.connectedSockets[client.userId] = this.connectedSockets[client.userId].filter(p => p.id !== client.id);
     }
 }

现在我们需要实现套接字服务,我们需要一个Redis包来在实例之间分发消息。

npm i redis
npm i --save-dev @types/redis

套接字服务将有多个功能

  • constructor, 第0步是在构造函数方法中为我们的服务分配一个随机id,并注入我们在最后一步中实现的“SocketGateWay”。

    src/main.ts
    constructor(private readonly socketGateway: SocketGateway) {
       this.serviceId = 'SOCKET_CHANNEL_' + Math.random().toString(26).slice(2);
    }
    
  • onModuleInit: 此外,我们在套接字服务中实现 onModuleInit 功能,它将创建并连接到3个Redis客户端。

    • redisClient 用于通过发现通道更新服务密钥

    • subscriberClient 获取分布式消息

    • publisherClient 将消息分发到其他实例

    src/socket/socket.service.ts
    async onModuleInit() {
        this.redisClient = await this.newRedisClient();
        this.subscriberClient = await this.newRedisClient();
        this.publisherClient = await this.newRedisClient();
    
        this.subscriberClient.subscribe(this.serviceId);
    
        this.subscriberClient.on('message', (channel, message) => {
            const { userId, payload } = JSON.parse(message);
            this.sendMessage(userId, payload, true);
        });
    
        await this.channelDiscovery();
    }
    
    private async newRedisClient() {
        return createClient({
            host: 'localhost',
            port: 6379,
        });
    }
    

    备注

    createClient从“redis”包中导入

  • channelDiscovery: 将在Redis上保存其serviceId,过期时间为3秒。 它还将开始自重复超时,每2秒重新执行一次。 这样,所有实例都可以访问更新后的套接字服务列表,以便分发消息。 在测试此服务时,清除发现间隔超时是防止打开处理程序问题的好方法。

    src/socket/socket.service.ts
    private async channelDiscovery() {
        this.redisClient.setex(this.serviceId, 3, Date.now().toString());
        this.discoveryInterval = setTimeout(() => {
            this.channelDiscovery();
        }, 2000);
    }
    
    async onModuleDestroy() {
        this.discoveryInterval && clearTimeout(this.discoveryInterval);
    }
    
  • sendMessage 最后一步是向特定用户的每个连接的客户端发送消息。我们将消息发送到连接的客户端,并将此消息分发到其他实例。 if(!fromRedisChannel) 将阻止在消息已经被另一个实例分发的情况下分发。

    src/socket/socket.service.ts
    async sendMessage(userId: string, payload: string,fromRedisChannel: boolean) {
        this.socketGateway.connectedSockets[userId]?.forEach(socket =>
            socket.send(payload),
        );
        if (!fromRedisChannel) {
            this.redisClient.keys('SOCKET_CHANNEL_', (err, ids) => {
                ids.filter(p => p != this.serviceId).forEach(id => {
                    this.publisherClient.publish(
                        id,
                        JSON.stringify({
                            payload,
                            userId,
                        }),
                    );
                });
            });
        }
    }
    

测试场景

好了,我们完成了,现在我们可以设置我们的测试场景了。

首先,我们将创建一个简单的测试脚本,该脚本将连接到我们的一个实例并打印接收到的消息。

运行 npm i ws 安装 ws

const ws = require('ws');
const port = 3001;
const socket = new ws(`ws://localhost:${port}`, {
    headers: { Cookie: 'token=user1' },
});
socket.on('message', data => {
    console.log(`Received message`, data);
});
socket.on('open', data => {
    console.log(`Connected to port ${port}`);
});
socket.on('close', data => {
    console.log(`Disconnected from port ${port}`);
});

然后向套接字服务添加一个简单的间隔,用于向user1发送时间。

最后,依次运行以下命令

PORT=3001 npm start
PORT=3002 npm start
node test-script.js

测试脚本应该每3秒记录一个来自两个实例的消息。

# output
Received message 8:21:55 AM | from server on port 3001
Received message 8:21:57 AM | from server on port 3002

这表明,现在我们的服务能够将来自不同实例的WebSocket消息分发到特定的客户机。

我们在本文中所做的工作的完整示例可以在 https://github.com/m-esm/socket-cluster-app 上找到