跳转至

bull

https://github.com/OptimalBits/bull

请在📻Twitter上关注我,了解重要的新闻和更新。 你可以在这个博客中找到教程和新闻: 🛠 教程

Bull 特性

  • [x] 最小的 CPU 使用率,由于无轮询设计。
  • [x] 基于 Redis 的稳健设计。
  • [x] 延迟的工作。
  • [x] 根据 cron 规范安排和重复作业。
  • [x] 对工作的率限制。
  • [x] 重试。
  • [x] 优先级。
  • [x] 并发性。
  • [x] 暂停/恢复-全局或本地。
  • [x] 每个队列有多个作业类型。
  • [x] 线程(沙盒)处理函数。
  • [x] 从进程崩溃中自动恢复。

接下来是路线图…

  • [ ] 作业完成确认(同时可以使用消息队列pattern)。
  • [ ] 父子的工作关系。

UIs

你可以使用一些第三方 ui 来进行监控:

BullMQ

Bull v3

Bull <= v2


监测和报警


特征比较

由于有一些作业队列解决方案,这里有一个表比较它们:

Feature Bullmq-Pro Bullmq Bull Kue Bee Agenda
后端 redis redis redis redis redis mongo
观察
组速率限制
集群支持
父/子依赖关系
优先级
并发性
演示工作
全局事件
速度限制器
暂停/恢复
沙箱工人
可重复的工作
原子操作
持久性
用户界面
优化了 Jobs / Messages Jobs / Messages Jobs / Messages Jobs Messages Jobs

安装

Bash
npm install bull --save

或者

Bash
yarn add bull

要求: Bull 需要大于或等于' 2.8.18 '的 Redis 版本。

Typescript 定义

Bash
npm install @types/bull --save-dev
Bash
yarn add --dev @types/bull

定义目前维护在DefinitelyTyped repo 中。

快速指南

基本用法

JavaScript
const Queue = require("bull");

const videoQueue = new Queue("video transcoding", "redis://127.0.0.1:6379");
const audioQueue = new Queue("audio transcoding", {
  redis: { port: 6379, host: "127.0.0.1", password: "foobared" },
}); // Specify Redis connection using object
const imageQueue = new Queue("image transcoding");
const pdfQueue = new Queue("pdf transcoding");

videoQueue.process(function (job, done) {
  // job.data contains the custom data passed when the job was created
  // job.id contains id of this job.

  // transcode video asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, {
    framerate: 29.5,
    /* etc...
     */
  });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

audioQueue.process(function (job, done) {
  // transcode audio asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, {
    samplerate: 48000,
    /* etc...
     */
  });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

imageQueue.process(function (job, done) {
  // transcode image asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, {
    width: 1280,
    height: 720,
    /* etc...
     */
  });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

pdfQueue.process(function (job) {
  // Processors can also return promises instead of using the done callback
  return pdfAsyncProcessor();
});

videoQueue.add({ video: "http://example.com/video1.mov" });
audioQueue.add({ audio: "http://example.com/audio1.mp3" });
imageQueue.add({ image: "http://example.com/image1.tiff" });

使用承诺

或者,你可以使用 return promises 来代替done回调:

JavaScript
videoQueue.process(function (job) {
  // 不要忘记删除done回调!
  // 简单地回报一个承诺
  return fetchVideo(job.data.url).then(transcodeVideo);

  // 处理承诺拒绝
  return Promise.reject(new Error("error transcoding"));

  // 将承诺解析的值传递给“completed”事件
  return Promise.resolve({
    framerate: 29.5,
    /* etc...
     */
  });

  // 如果作业抛出一个未处理的异常,它也会得到正确的处理
  throw new Error("some unexpected error");
  // 一样
  return Promise.reject(new Error("some unexpected error"));
});

独立的进程

进程函数也可以在单独的进程中运行。这有几个好处:

  • 这个进程是沙箱化的,所以即使它崩溃了,也不会影响工作进程。
  • 您可以在不影响队列的情况下运行阻塞代码(作业不会停止)。
  • 更好地利用多核 cpu。
  • 减少与 redis 的连接。

为了使用这个特性,只需创建一个单独的处理器文件:

JavaScript
// processor.js
module.exports = function (job) {
  // 做一些繁重的工作
  return Promise.resolve(result);
};

然后像这样定义处理器:

JavaScript
// 单流程:
queue.process("/path/to/my/processor.js");

// 你也可以使用并发:
queue.process(5, "/path/to/my/processor.js");

// 和指定的处理器:
queue.process("my processor", 5, "/path/to/my/processor.js");

重复的工作

作业可以被添加到队列中,并根据 cron 规范重复处理:

JavaScript
paymentsQueue.process(function (job) {
  // Check payments
});

// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, { repeat: { cron: "15 3 * * *" } });

作为提示,请检查这里的表达式,以验证它们是正确的:cron 表达式生成器

暂停/恢复

一个队列可以被全局暂停和恢复(传递 true 来暂停这个 worker 的处理):

JavaScript
queue.pause().then(function () {
  // queue is paused now
});

queue.resume().then(function () {
  // queue is resumed now
});

事件

队列会发出一些有用的事件,例如…

JavaScript
.on('completed', function (job, result) {
  // Job completed with output result!
})

有关事件的更多信息,包括所触发事件的完整列表,请参阅事件参考资料

队列性能

队列很便宜,所以如果你需要很多队列,只需创建新的不同名称的队列:

JavaScript
const userJohn = new Queue('john');
const userLisa = new Queue('lisa');
.
.
.

然而,每个队列实例将需要新的 redis 连接,检查如何重用连接,或者你也可以使用命名处理器来实现类似的结果。

集群的支持

NOTE: 从 3.2.0 及以上版本开始,建议使用线程处理器。

队列是健壮的,可以在几个线程或进程中并行运行,没有任何危险或队列损坏的风险。 检查这个简单的例子,使用 cluster 跨进程并行化任务:

JavaScript
const Queue = require("bull");
const cluster = require("cluster");

const numWorkers = 8;
const queue = new Queue("test concurrent queue");

if (cluster.isMaster) {
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on("online", function (worker) {
    // Let's create a few jobs for the queue workers
    for (let i = 0; i < 500; i++) {
      queue.add({ foo: "bar" });
    }
  });

  cluster.on("exit", function (worker, code, signal) {
    console.log("worker " + worker.process.pid + " died");
  });
} else {
  queue.process(function (job, jobDone) {
    console.log("Job done by worker", cluster.worker.id, job.id);
    jobDone();
  });
}

文档

要获取完整的文档,请查看参考和常用模式:

  • 指南 - 你使用 Bull 开发的起点。
  • 参考 - 包含所有可用对象和方法的引用文档。
  • 模式 - 一组常见模式的示例。
  • 许可证 - Bull 许可证-麻省理工学院。

如果你看到任何可以使用更多文档的东西,请提交一个 pull request!


重要的笔记

队列的目标是“至少一次”的工作策略。 这意味着在某些情况下,一个作业可能会被多次处理。 这种情况通常发生在一个 worker 在整个处理过程中没有为给定的作业保持锁的时候。

当一个工人正在处理一项工作时,它将使该工作保持“锁定”,以便其他工人不能处理它。

理解锁定是如何工作的,以防止您的作业失去锁- becoming stalled - 并因此重新启动,这一点很重要。 锁是通过在 lockRenewTime (通常是 lockDuration 的一半)上为 lockDuration 创建一个锁来实现的。 如果 lockDuration 在锁被更新之前过期,则该作业将被视为暂停并自动重启;它将被二次加工

这种情况可能发生在:

  1. 运行作业处理器的 Node 进程意外终止。
  2. 您的作业处理器 cpu 过于密集,导致 Node 事件循环停顿,结果,Bull 无法更新作业锁(请参阅#488了解如何更好地检测此问题)。 您可以通过将作业处理器分解为更小的部分来解决这个问题,这样单个部分就不会阻塞 Node 事件循环。 或者,您可以为 lockDuration 设置传递一个更大的值(代价是它将花费更长的时间来识别真正的暂停作业)。

因此,您应该始终侦听 stopped 事件并将其记录到错误监视系统中,因为这意味着您的作业可能会被重复处理。

作为一种安全措施,有问题的作业不会被无限期重启(例如,如果作业处理器总是崩溃它的 Node 进程),作业将从停止状态恢复,最大次数为 maxStalledCount (默认为 1)。

谁在使用

Bull 在大大小小的组织中都很受欢迎,比如以下这些组织:

Atlassian Autodesk Mozilla Nest Salesforce

BullMQ

如果你想开始使用完全用 Typescript 编写的下一个主要版本的 Bull,欢迎使用新的 repo这里. 否则,我们非常欢迎你仍然使用 Bull,这是一个安全的、经过战斗测试的代码库。