跳转至

快速指南

基本用法

JavaScript
const Queue = require("bull");

const videoQueue = new Queue("video transcoding", "redis://127.0.0.1:6379");
const audioQueue = new Queue("audio transcoding", { redis: { port: 6379, host: "127.0.0.1", password: "foobared" } }); // Specify Redis connection using object
const imageQueue = new Queue("image transcoding");
const pdfQueue = new Queue("pdf transcoding");

videoQueue.process(function (job, done) {
  // job.data contains the custom data passed when the job was created
  // job.id contains id of this job.

  // transcode video asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, { framerate: 29.5 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

audioQueue.process(function (job, done) {
  // transcode audio asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, { samplerate: 48000 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

imageQueue.process(function (job, done) {
  // transcode image asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, { width: 1280, height: 720 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

pdfQueue.process(function (job) {
  // Processors can also return promises instead of using the done callback
  return pdfAsyncProcessor();
});

videoQueue.add({ video: "http://example.com/video1.mov" });
audioQueue.add({ audio: "http://example.com/audio1.mp3" });
imageQueue.add({ image: "http://example.com/image1.tiff" });

使用承诺

或者,你可以使用return promises来代替done回调:

JavaScript
videoQueue.process(function (job) {
  // don't forget to remove the done callback!
  // Simply return a promise
  return fetchVideo(job.data.url).then(transcodeVideo);

  // Handles promise rejection
  return Promise.reject(new Error("error transcoding"));

  // Passes the value the promise is resolved with to the "completed" event
  return Promise.resolve({ framerate: 29.5 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
  // same as
  return Promise.reject(new Error("some unexpected error"));
});

独立的进程

进程函数也可以在单独的进程中运行。这有几个好处:

  • 这个进程是沙箱化的,所以即使它崩溃了,也不会影响工作进程。
  • 您可以在不影响队列的情况下运行阻塞代码(作业不会停止)。
  • 更好地利用多核 cpu。
  • 减少与 redis 的连接。

为了使用这个特性,只需创建一个单独的处理器文件:

JavaScript
// processor.js
module.exports = function (job) {
  // Do some heavy work
  return Promise.resolve(result);
};

然后像这样定义处理器:

JavaScript
// Single process:
queue.process("/path/to/my/processor.js");

// You can use concurrency as well:
queue.process(5, "/path/to/my/processor.js");

// and named processors:
queue.process("my processor", 5, "/path/to/my/processor.js");

重复的工作

A job can be added to a queue and processed repeatedly according to a cron specification:

JavaScript
paymentsQueue.process(function (job) {
  // Check payments
});

// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, { repeat: { cron: "15 3 * * *" } });

As a tip, check your expressions here to verify they are correct: cron expression generator

暂停/恢复

队列可以全局暂停和恢复(传入true暂停处理只针对这个 worker):

JavaScript
queue.pause().then(function () {
  // queue is paused now
});

queue.resume().then(function () {
  // queue is resumed now
});

事件

队列会发出一些有用的事件,例如…

JavaScript
.on('completed', function (job, result) {
  // Job completed with output result!
})

有关事件的更多信息,包括触发事件的完整列表,请参阅 events 参考

队列性能

队列很容易,所以如果你需要很多队列,只需创建新的不同名称的队列:

JavaScript
const userJohn = new Queue('john');
const userLisa = new Queue('lisa');
.
.
.

然而,每个队列实例都需要新的 redis 连接,检查如何重用连接, 或者你也可以使用命名处理器来实现类似的结果。

集群的支持

NOTE: 从 3.2.0 及以上版本开始,建议使用线程处理器来代替。

队列是健壮的,可以在多个线程或进程中并行运行,没有任何危险或队列损坏的风险。查看下面这个简单的例子,使用集群来跨进程并行作业:

JavaScript
const Queue = require("bull");
const cluster = require("cluster");

const numWorkers = 8;
const queue = new Queue("test concurrent queue");

if (cluster.isMaster) {
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on("online", function (worker) {
    // Let's create a few jobs for the queue workers
    for (let i = 0; i < 500; i++) {
      queue.add({ foo: "bar" });
    }
  });

  cluster.on("exit", function (worker, code, signal) {
    console.log("worker " + worker.process.pid + " died");
  });
} else {
  queue.process(function (job, jobDone) {
    console.log("Job done by worker", cluster.worker.id, job.id);
    jobDone();
  });
}

Basic Usage

JavaScript
const Queue = require("bull");

const videoQueue = new Queue("video transcoding", "redis://127.0.0.1:6379");
const audioQueue = new Queue("audio transcoding", { redis: { port: 6379, host: "127.0.0.1", password: "foobared" } }); // Specify Redis connection using object
const imageQueue = new Queue("image transcoding");
const pdfQueue = new Queue("pdf transcoding");

videoQueue.process(function (job, done) {
  // job.data contains the custom data passed when the job was created
  // job.id contains id of this job.

  // transcode video asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, { framerate: 29.5 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

audioQueue.process(function (job, done) {
  // transcode audio asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, { samplerate: 48000 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

imageQueue.process(function (job, done) {
  // transcode image asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error("error transcoding"));

  // or pass it a result
  done(null, { width: 1280, height: 720 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
});

pdfQueue.process(function (job) {
  // Processors can also return promises instead of using the done callback
  return pdfAsyncProcessor();
});

videoQueue.add({ video: "http://example.com/video1.mov" });
audioQueue.add({ audio: "http://example.com/audio1.mp3" });
imageQueue.add({ image: "http://example.com/image1.tiff" });

Using promises

Alternatively, you can use return promises instead of using the done callback:

JavaScript
videoQueue.process(function (job) {
  // don't forget to remove the done callback!
  // Simply return a promise
  return fetchVideo(job.data.url).then(transcodeVideo);

  // Handles promise rejection
  return Promise.reject(new Error("error transcoding"));

  // Passes the value the promise is resolved with to the "completed" event
  return Promise.resolve({ framerate: 29.5 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error("some unexpected error");
  // same as
  return Promise.reject(new Error("some unexpected error"));
});

Separate processes

The process function can also be run in a separate process. This has several advantages:

  • The process is sandboxed so if it crashes it does not affect the worker.
  • You can run blocking code without affecting the queue (jobs will not stall).
  • Much better utilization of multi-core CPUs.
  • Less connections to redis.

In order to use this feature just create a separate file with the processor:

JavaScript
// processor.js
module.exports = function (job) {
  // Do some heavy work

  return Promise.resolve(result);
};

And define the processor like this:

JavaScript
// Single process:
queue.process("/path/to/my/processor.js");

// You can use concurrency as well:
queue.process(5, "/path/to/my/processor.js");

// and named processors:
queue.process("my processor", 5, "/path/to/my/processor.js");

Repeated jobs

A job can be added to a queue and processed repeatedly according to a cron specification:

JavaScript
paymentsQueue.process(function (job) {
  // Check payments
});

// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, { repeat: { cron: "15 3 * * *" } });

As a tip, check your expressions here to verify they are correct: cron expression generator

Pause / Resume

A queue can be paused and resumed globally (pass true to pause processing for just this worker):

JavaScript
queue.pause().then(function () {
  // queue is paused now
});

queue.resume().then(function () {
  // queue is resumed now
});

Events

A queue emits some useful events, for example...

JavaScript
.on('completed', function (job, result) {
  // Job completed with output result!
})

For more information on events, including the full list of events that are fired, check out the Events reference

Queues performance

Queues are cheap, so if you need many of them just create new ones with different names:

JavaScript
const userJohn = new Queue('john');
const userLisa = new Queue('lisa');
.
.
.

However every queue instance will require new redis connections, check how to reuse connections or you can also use named processors to achieve a similar result.

Cluster support

NOTE: From version 3.2.0 and above it is recommended to use threaded processors instead.

Queues are robust and can be run in parallel in several threads or processes without any risk of hazards or queue corruption. Check this simple example using cluster to parallelize jobs across processes:

JavaScript
const Queue = require("bull");
const cluster = require("cluster");

const numWorkers = 8;
const queue = new Queue("test concurrent queue");

if (cluster.isMaster) {
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on("online", function (worker) {
    // Let's create a few jobs for the queue workers
    for (let i = 0; i < 500; i++) {
      queue.add({ foo: "bar" });
    }
  });

  cluster.on("exit", function (worker, code, signal) {
    console.log("worker " + worker.process.pid + " died");
  });
} else {
  queue.process(function (job, jobDone) {
    console.log("Job done by worker", cluster.worker.id, job.id);
    jobDone();
  });
}

回到页面顶部