/**
 * Options accepted when creating a queue.
 *
 * Defaults are documented with `@default` tags because interface members
 * cannot carry initializers.
 */
interface QueueOptions {
  /**
   * Optional factory returning the Redis client (or cluster) to use for the
   * requested connection `type`.
   */
  createClient?(
    type: 'client' | 'subscriber' | 'bclient',
    config?: Redis.RedisOptions,
  ): Redis.Redis | Redis.Cluster;

  /** Rate limiter configuration (see `RateLimiter`). */
  limiter?: RateLimiter;

  /** Redis options (see `RedisOpts`). */
  redis?: RedisOpts;

  /**
   * Prefix for all queue keys.
   * @default 'bull'
   */
  prefix?: string;

  /** Configure metrics. */
  metrics?: MetricsOpts;

  /** Job options (see `JobOpts`) used as defaults. */
  defaultJobOptions?: JobOpts;

  /** Advanced queue settings (see `AdvancedSettings`). */
  settings?: AdvancedSettings;
}
/**
 * Rate limiting configuration: at most `max` jobs are processed per
 * `duration` milliseconds.
 */
interface RateLimiter {
  /** Max number of jobs processed... */
  max: number;

  /** ...per duration in milliseconds. */
  duration: number;

  /**
   * When jobs get rate limited, they stay in the waiting queue and are not
   * moved to the delayed queue.
   * @default false
   */
  bounceBack?: boolean;

  /**
   * Allows grouping of jobs with the specified key from the data object
   * passed to `Queue#add` (e.g. "network.handle").
   */
  groupKey?: string;
}
/**
 * Advanced queue settings.
 *
 * Defaults are documented with `@default` tags because interface members
 * cannot carry initializers. Durations are presumably milliseconds, matching
 * the other interfaces — confirm against the implementation.
 */
interface AdvancedSettings {
  /**
   * Key expiration time for job locks.
   * @default 30000
   */
  lockDuration: number;

  /**
   * Interval on which to acquire the job lock.
   * @default 15000
   */
  lockRenewTime: number;

  /**
   * How often to check for stalled jobs (use 0 for never checking).
   * @default 30000
   */
  stalledInterval: number;

  /**
   * Max amount of times a stalled job will be re-processed.
   * @default 1
   */
  maxStalledCount: number;

  /**
   * Poll interval for delayed jobs and added jobs.
   * @default 5000
   */
  guardInterval: number;

  /**
   * Delay before processing the next job in case of internal error.
   * @default 5000
   */
  retryProcessDelay: number;

  /** A set of custom backoff strategies keyed by name. */
  backoffStrategies: {};

  /**
   * A timeout for when the queue is in drained state (empty, waiting for jobs).
   * @default 5
   */
  drainDelay: number;

  /**
   * Enables multiple queues on the same instance of child pool to share the
   * same instance.
   * @default false
   */
  isSharedChildPool: boolean;
}
/**
 * For each named processor, concurrency stacks up, so any of these three
 * process functions can run with a concurrency of 125 (100 + 25 + 0).
 * To avoid this behaviour, create a dedicated queue for each process function.
 */
const loadBalancerQueue = new Queue('loadbalancer');
loadBalancerQueue.process('requestProfile', 100, requestProfile);
loadBalancerQueue.process('sendEmail', 25, sendEmail);
loadBalancerQueue.process('sendInvitation', 0, sendInvite);

// One queue per processor keeps each concurrency limit independent.
const profileQueue = new Queue('profile');
// Max concurrency for requestProfile is 100
profileQueue.process('requestProfile', 100, requestProfile);

const emailQueue = new Queue('email');
// Max concurrency for sendEmail is 25
emailQueue.process('sendEmail', 25, sendEmail);
/**
 * Per-job options.
 */
interface JobOpts {
  /**
   * Optional priority value. Ranges from 1 (highest priority) to MAX_INT
   * (lowest priority). Note that using priorities has a slight impact on
   * performance, so do not use it if not required.
   */
  priority: number;

  /**
   * An amount of milliseconds to wait until this job can be processed.
   * Note that for accurate delays, both server and clients should have their
   * clocks synchronized. [optional]
   */
  delay: number;

  /** The total number of attempts to try the job until it completes. */
  attempts: number;

  /** Repeat job according to a cron specification (see `RepeatOpts`). */
  repeat: RepeatOpts;

  /**
   * Backoff setting for automatic retries if the job fails,
   * default strategy: `fixed`. Needs `attempts` to be set.
   */
  backoff: number | BackoffOpts;

  /**
   * If true, adds the job to the right of the queue instead of the left
   * (default false).
   */
  lifo: boolean;

  /**
   * The number of milliseconds after which the job should fail with a
   * timeout error. [optional]
   */
  timeout: number;

  /**
   * Override the job ID - by default, the job ID is a unique integer, but you
   * can use this setting to override it. If you use this option, it is up to
   * you to ensure the jobId is unique. If you attempt to add a job with an id
   * that already exists, it will not be added (repeatable jobs are a caveat
   * to this rule).
   */
  jobId: number | string;

  /**
   * If true, removes the job when it successfully completes. A number
   * specifies the amount of jobs to keep. Default behavior is to keep the job
   * in the completed set. See `KeepJobs` if using that interface instead.
   */
  removeOnComplete: boolean | number | KeepJobs;

  /**
   * If true, removes the job when it fails after all attempts. A number
   * specifies the amount of jobs to keep; see `KeepJobs` if using that
   * interface instead. Default behavior is to keep the job in the failed set.
   */
  removeOnFail: boolean | number | KeepJobs;

  /**
   * Limits the amount of stack trace lines that will be recorded in the
   * stacktrace.
   */
  stackTraceLimit: number;
}
/**
 * KeepJobs
 *
 * Specify which jobs to keep after finishing.
 * If both age and count are specified, then the jobs kept will be the ones
 * that satisfy both properties.
 */
export interface KeepJobs {
  /**
   * Maximum age in *seconds* for job to be kept.
   */
  age?: number;

  /**
   * Maximum count of jobs to be kept.
   */
  count?: number;
}
/**
 * Options describing how a job repeats.
 */
interface RepeatOpts {
  /** Cron string. */
  cron?: string;

  /** Timezone. */
  tz?: string;

  /** Start date when the repeat job should start repeating (only with cron). */
  startDate?: Date | string | number;

  /** End date when the repeat job should stop repeating. */
  endDate?: Date | string | number;

  /** Number of times the job should repeat at max. */
  limit?: number;

  /** Repeat every millis (cron setting cannot be used together with this setting). */
  every?: number;

  /** The start value for the repeat iteration count. */
  count?: number;

  /** The key for the repeatable job metadata in Redis. */
  readonly key: string;
}
// How a custom jobId interacts with repeatable and regular jobs.
await queue.add({}, { jobId: 'example', repeat: { every: 5 * 1000 } });
await queue.add({}, { jobId: 'example', repeat: { every: 5 * 1000 } }); // Will not be created, same repeat configuration
await queue.add({}, { jobId: 'example', repeat: { every: 10 * 1000 } }); // Will be created, different repeat configuration
await queue.add({}, { jobId: 'example' }); // Will be created, no regular job with this id
await queue.add({}, { jobId: 'example' }); // Will not be created, conflicts with previous regular job
/**
 * Backoff configuration for automatic retries.
 */
interface BackoffOpts {
  /**
   * Backoff type, which can be either `fixed` or `exponential`.
   * A custom backoff strategy can also be specified in `backoffStrategies`
   * on the queue settings.
   */
  type: string;

  /** Backoff delay, in milliseconds. */
  delay: number;
}
// Log every batch of jobs removed by a clean operation.
queue.on('cleaned', function (jobs, type) {
  console.log('Cleaned %s %s jobs', jobs.length, type);
});

// Cleans all jobs that completed over 5 seconds ago.
await queue.clean(5000);

// Cleans all jobs that failed over 10 seconds ago.
await queue.clean(10000, 'failed');
// Events emitted by a queue. The listeners are chained on a single receiver,
// so no statement terminator may appear between two `.on(...)` calls.
queue
  .on('error', function (error) {
    // An error occurred.
  })
  .on('waiting', function (jobId) {
    // A Job is waiting to be processed as soon as a worker is idling.
  })
  .on('active', function (job, jobPromise) {
    // A job has started.
    // You can use `jobPromise.cancel()` to abort it.
  })
  .on('stalled', function (job) {
    // A job has been marked as stalled.
    // This is useful for debugging job workers that crash or pause the event loop.
  })
  .on('lock-extension-failed', function (job, err) {
    // A job failed to extend lock.
    // This will be useful to debug redis connection issues and jobs getting
    // restarted because workers are not able to extend locks.
  })
  .on('progress', function (job, progress) {
    // A job's progress was updated!
  })
  .on('completed', function (job, result) {
    // A job successfully completed with a `result`.
  })
  .on('failed', function (job, err) {
    // A job failed with reason `err`!
  })
  .on('paused', function () {
    // The queue has been paused.
  })
  .on('resumed', function (job) {
    // The queue has been resumed.
  })
  .on('cleaned', function (jobs, type) {
    // Old jobs have been cleaned from the queue. `jobs` is an array of cleaned
    // jobs, and `type` is the type of jobs cleaned.
  })
  .on('drained', function () {
    // Emitted every time the queue has processed all the waiting jobs
    // (even if there can be some delayed jobs not yet processed).
  })
  .on('removed', function (job) {
    // A job successfully removed.
  });
// Will listen locally, just to this queue...
queue.on('completed', listener);

// Will listen globally, to instances of this queue...
queue.on('global:completed', listener);
// Local events pass the job instance...
queue.on('progress', function (job, progress) {
  console.log(`Job ${job.id} is ${progress*100}% ready!`);
});

queue.on('completed', function (job, result) {
  console.log(`Job ${job.id} completed! Result: ${result}`);
  job.remove();
});

// ...whereas global events only pass the job ID:
queue.on('global:progress', function (jobId, progress) {
  console.log(`Job ${jobId} is ${progress*100}% ready!`);
});

queue.on('global:completed', function (jobId, result) {
  console.log(`Job ${jobId} completed! Result: ${result}`);
  // Only the ID is available here, so the job has to be looked up before removal.
  queue.getJob(jobId).then(function (job) {
    job.remove();
  });
});