
Node Redlock5 API


这个方法在 duration 指定的持续时间内获取资源上的锁。


此方法通过提供的 duration 扩展有效锁。

import { randomBytes, createHash } from "crypto";
import { EventEmitter } from "events";

// AbortController became available as a global in node version 16.
// Once version 14 reaches its end-of-life, this can be removed.
import PolyfillAbortController from "node-abort-controller";

import { Redis as IORedisClient } from "ioredis";
type Client = IORedisClient;

// Define script constants.
  -- Return 0 if an entry already exists.
  for i, key in ipairs(KEYS) do
    if redis.call("exists", key) == 1 then
      return 0

  -- Create an entry for each provided key.
  for i, key in ipairs(KEYS) do
    redis.call("set", key, ARGV[1], "PX", ARGV[2])

  -- Return the number of entries added.
  return #KEYS

  -- Return 0 if an entry exists with a *different* lock value.
  for i, key in ipairs(KEYS) do
    if redis.call("get", key) ~= ARGV[1] then
      return 0

  -- Update the entry for each provided key.
  for i, key in ipairs(KEYS) do
    redis.call("set", key, ARGV[1], "PX", ARGV[2])

  -- Return the number of entries updated.
  return #KEYS

  local count = 0
  for i, key in ipairs(KEYS) do
    -- Only remove entries for *this* lock value.
    if redis.call("get", key) == ARGV[1] then
      redis.pcall("del", key)
      count = count + 1

  -- Return the number of entries removed.
  return count

export type ClientExecutionResult =
  | {
      client: Client;
      vote: "for";
      value: number;
  | {
      client: Client;
      vote: "against";
      error: Error;

 * This object contains a summary of results.
export type ExecutionStats = {
  readonly membershipSize: number;
  readonly quorumSize: number;
  readonly votesFor: Set<Client>;
  readonly votesAgainst: Map<Client, Error>;

 * This object contains a summary of results. Because the result of an attempt
 * can sometimes be determined before all requests are finished, each attempt
 * contains a Promise that will resolve ExecutionStats once all requests are
 * finished. A rejection of these promises should be considered undefined
 * behavior and should cause a crash.
export type ExecutionResult = {
  attempts: ReadonlyArray<Promise<ExecutionStats>>;

export interface Settings {
  readonly driftFactor: number;
  readonly retryCount: number;
  readonly retryDelay: number;
  readonly retryJitter: number;
  readonly automaticExtensionThreshold: number;

// Define default settings.
const defaultSettings: Readonly<Settings> = {
  driftFactor: 0.01,
  retryCount: 10,
  retryDelay: 200,
  retryJitter: 100,
  automaticExtensionThreshold: 500,

// Modifyng this object is forbidden.

 * This error indicates a failure due to the existence of another lock for one
 * or more of the requested resources.
export class ResourceLockedError extends Error {
  constructor(public readonly message: string) {
    this.name = "ResourceLockedError";

 * This error indicates a failure of an operation to pass with a quorum.
export class ExecutionError extends Error {
    public readonly message: string,
    public readonly attempts: ReadonlyArray<Promise<ExecutionStats>>
  ) {
    this.name = "ExecutionError";
 * An object of this type is returned when a resource is successfully locked. It
 * contains convenience methods `release` and `extend` which perform the
 * associated Redlock method on itself.
export class Lock {
    public readonly redlock: Redlock,
    public readonly resources: string[],
    public readonly value: string,
    public readonly attempts: ReadonlyArray<Promise<ExecutionStats>>,
    public expiration: number
  ) {}

  async release(): Promise<ExecutionResult> {
    return this.redlock.release(this);

  async extend(duration: number): Promise<Lock> {
    return this.redlock.extend(this, duration);

type RedlockAbortSignal = AbortSignal & { error?: Error };
 * A redlock object is instantiated with an array of at least one redis client
 * and an optional `options` object. Properties of the Redlock object should NOT
 * be changed after it is first used, as doing so could have unintended
 * consequences for live locks.
export default class Redlock extends EventEmitter {
  public readonly clients: Set<Client>;
  public readonly settings: Settings;
  public readonly scripts: {
    readonly acquireScript: { value: string; hash: string };
    readonly extendScript: { value: string; hash: string };
    readonly releaseScript: { value: string; hash: string };

  public constructor(
    clients: Iterable<Client>,
    settings: Partial<Settings> = {},
    scripts: {
      readonly acquireScript?: string | ((script: string) => string);
      readonly extendScript?: string | ((script: string) => string);
      readonly releaseScript?: string | ((script: string) => string);
    } = {}
  ) {

    // Prevent crashes on error events.
    this.on("error", () => {
      // Because redlock is designed for high availability, it does not care if
      // a minority of redis instances/clusters fail at an operation.
      // However, it can be helpful to monitor and log such cases. Redlock emits
      // an "error" event whenever it encounters an error, even if the error is
      // ignored in its normal operation.
      // This function serves to prevent node's default behavior of crashing
      // when an "error" event is emitted in the absence of listeners.

    // Create a new array of client, to ensure no accidental mutation.
    this.clients = new Set(clients);
    if (this.clients.size === 0) {
      throw new Error(
        "Redlock must be instantiated with at least one redis client."

    // Customize the settings for this instance.
    this.settings = {
        typeof settings.driftFactor === "number"
          ? settings.driftFactor
          : defaultSettings.driftFactor,
        typeof settings.retryCount === "number"
          ? settings.retryCount
          : defaultSettings.retryCount,
        typeof settings.retryDelay === "number"
          ? settings.retryDelay
          : defaultSettings.retryDelay,
        typeof settings.retryJitter === "number"
          ? settings.retryJitter
          : defaultSettings.retryJitter,
        typeof settings.automaticExtensionThreshold === "number"
          ? settings.automaticExtensionThreshold
          : defaultSettings.automaticExtensionThreshold,

    // Use custom scripts and script modifiers.
    const acquireScript =
      typeof scripts.acquireScript === "function"
        ? scripts.acquireScript(ACQUIRE_SCRIPT)
    const extendScript =
      typeof scripts.extendScript === "function"
        ? scripts.extendScript(EXTEND_SCRIPT)
        : EXTEND_SCRIPT;
    const releaseScript =
      typeof scripts.releaseScript === "function"
        ? scripts.releaseScript(RELEASE_SCRIPT)

    this.scripts = {
      acquireScript: {
        value: acquireScript,
        hash: this._hash(acquireScript),
      extendScript: {
        value: extendScript,
        hash: this._hash(extendScript),
      releaseScript: {
        value: releaseScript,
        hash: this._hash(releaseScript),

   * Generate a sha1 hash compatible with redis evalsha.
  private _hash(value: string): string {
    return createHash("sha1").update(value).digest("hex");

   * Generate a cryptographically random string.
  private _random(): string {
    return randomBytes(16).toString("hex");

   * This method runs `.quit()` on all client connections.
  public async quit(): Promise<void> {
    const results = [];
    for (const client of this.clients) {

    await Promise.all(results);

   * 这个方法在 `duration` 指定的持续时间内获取资源上的锁。
  public async acquire(
    resources: string[],
    duration: number,
    settings?: Settings
  ): Promise<Lock> {
    const start = Date.now();
    const value = this._random();

    try {
      const { attempts } = await this._execute(
        [value, duration],

      // Add 2 milliseconds to the drift to account for Redis expires precision,
      // which is 1 ms, plus the configured allowable drift factor.
      const drift =
          (settings?.driftFactor ?? this.settings.driftFactor) * duration
        ) + 2;

      return new Lock(
        start + duration - drift
    } catch (error) {
      // If there was an error acquiring the lock, release any partial lock
      // state that may exist on a minority of clients.
      await this._execute(this.scripts.releaseScript, resources, [value], {
        retryCount: 0,
      }).catch(() => {
        // Any error here will be ignored.

      throw error;

   * This method unlocks the provided lock from all servers still persisting it.
   * It will fail with an error if it is unable to release the lock on a quorum
   * of nodes, but will make no attempt to restore the lock in the case of a
   * failure to release. It is safe to re-attempt a release or to ignore the
   * error, as the lock will automatically expire after its timeout.
  public async release(
    lock: Lock,
    settings?: Partial<Settings>
  ): Promise<ExecutionResult> {
    // Immediately invalidate the lock.
    lock.expiration = 0;

    // Attempt to release the lock.
    return this._execute(

   * This method extends a valid lock by the provided `duration`.
  public async extend(
    existing: Lock,
    duration: number,
    settings?: Partial<Settings>
  ): Promise<Lock> {
    const start = Date.now();

    // The lock has already expired.
    if (existing.expiration < Date.now()) {
      throw new ExecutionError("Cannot extend an already-expired lock.", []);

    const { attempts } = await this._execute(
      [existing.value, duration],

    // Invalidate the existing lock.
    existing.expiration = 0;

    // Add 2 milliseconds to the drift to account for Redis expires precision,
    // which is 1 ms, plus the configured allowable drift factor.
    const drift =
        (settings?.driftFactor ?? this.settings.driftFactor) * duration
      ) + 2;

    const replacement = new Lock(
      start + duration - drift

    return replacement;

   * Execute a script on all clients. The resulting promise is resolved or
   * rejected as soon as this quorum is reached; the resolution or rejection
   * will contains a `stats` property that is resolved once all votes are in.
  private async _execute(
    script: { value: string; hash: string },
    keys: string[],
    args: (string | number)[],
    _settings?: Partial<Settings>
  ): Promise<ExecutionResult> {
    const settings = _settings
      ? {
      : this.settings;

    const maxAttempts = settings.retryCount + 1;
    const attempts: Promise<ExecutionStats>[] = [];

    while (true) {
      const { vote, stats } = await this._attemptOperation(script, keys, args);


      // The operation acheived a quorum in favor.
      if (vote === "for") {
        return { attempts };

      // Wait before reattempting.
      if (attempts.length < maxAttempts) {
        await new Promise((resolve) => {
              settings.retryDelay +
                Math.floor((Math.random() * 2 - 1) * settings.retryJitter)
      } else {
        throw new ExecutionError(
          "The operation was unable to acheive a quorum during its retry window.",

  private async _attemptOperation(
    script: { value: string; hash: string },
    keys: string[],
    args: (string | number)[]
  ): Promise<
    | { vote: "for"; stats: Promise<ExecutionStats> }
    | { vote: "against"; stats: Promise<ExecutionStats> }
  > {
    return await new Promise((resolve) => {
      const clientResults = [];
      for (const client of this.clients) {
          this._attemptOperationOnClient(client, script, keys, args)

      const stats: ExecutionStats = {
        membershipSize: clientResults.length,
        quorumSize: Math.floor(clientResults.length / 2) + 1,
        votesFor: new Set<Client>(),
        votesAgainst: new Map<Client, Error>(),

      let done: () => void;
      const statsPromise = new Promise<typeof stats>((resolve) => {
        done = () => resolve(stats);

      // This is the expected flow for all successful and unsuccessful requests.
      const onResultResolve = (clientResult: ClientExecutionResult): void => {
        switch (clientResult.vote) {
          case "for":
          case "against":
            stats.votesAgainst.set(clientResult.client, clientResult.error);

        // A quorum has determined a success.
        if (stats.votesFor.size === stats.quorumSize) {
            vote: "for",
            stats: statsPromise,

        // A quorum has determined a failure.
        if (stats.votesAgainst.size === stats.quorumSize) {
            vote: "against",
            stats: statsPromise,

        // All votes are in.
        if (
          stats.votesFor.size + stats.votesAgainst.size ===
        ) {

      // This is unexpected and should crash to prevent undefined behavior.
      const onResultReject = (error: Error): void => {
        throw error;

      for (const result of clientResults) {
        result.then(onResultResolve, onResultReject);

  private async _attemptOperationOnClient(
    client: Client,
    script: { value: string; hash: string },
    keys: string[],
    args: (string | number)[]
  ): Promise<ClientExecutionResult> {
    try {
      let result: number;
      try {
        // Attempt to evaluate the script by its hash.
        const shaResult = (await client.evalsha(script.hash, keys.length, [
        ])) as unknown;

        if (typeof shaResult !== "number") {
          throw new Error(
            `Unexpected result of type ${typeof shaResult} returned from redis.`

        result = shaResult;
      } catch (error) {
        // If the redis server does not already have the script cached,
        // reattempt the request with the script's raw text.
        if (
          !(error instanceof Error) ||
        ) {
          throw error;
        const rawResult = (await client.eval(script.value, keys.length, [
        ])) as unknown;

        if (typeof rawResult !== "number") {
          throw new Error(
            `Unexpected result of type ${typeof rawResult} returned from redis.`

        result = rawResult;

      // One or more of the resources was already locked.
      if (result !== keys.length) {
        throw new ResourceLockedError(
          `The operation was applied to: ${result} of the ${keys.length} requested resources.`

      return {
        vote: "for",
        value: result,
    } catch (error) {
      if (!(error instanceof Error)) {
        throw new Error(
          `Unexpected type ${typeof error} thrown with value: ${error}`

      // Emit the error on the redlock instance for observability.
      this.emit("error", error);

      return {
        vote: "against",

   * Wrap and execute a routine in the context of an auto-extending lock,
   * returning a promise of the routine's value. In the case that auto-extension
   * fails, an AbortSignal will be updated to indicate that abortion of the
   * routine is in order, and to pass along the encountered error.
   * @example
   * ```ts
   * await redlock.using([senderId, recipientId], 5000, { retryCount: 5 }, async (signal) => {
   *   const senderBalance = await getBalance(senderId);
   *   const recipientBalance = await getBalance(recipientId);
   *   if (senderBalance < amountToSend) {
   *     throw new Error("Insufficient balance.");
   *   }
   *   // The abort signal will be true if:
   *   // 1. the above took long enough that the lock needed to be extended
   *   // 2. redlock was unable to extend the lock
   *   //
   *   // In such a case, exclusivity can no longer be guaranteed for further
   *   // operations, and should be handled as an exceptional case.
   *   if (signal.aborted) {
   *     throw signal.error;
   *   }
   *   await setBalances([
   *     {id: senderId, balance: senderBalance - amountToSend},
   *     {id: recipientId, balance: recipientBalance + amountToSend},
   *   ]);
   * });
   * ```

  public async using<T>(
    resources: string[],
    duration: number,
    settings: Partial<Settings>,
    routine?: (signal: RedlockAbortSignal) => Promise<T>
  ): Promise<T>;

  public async using<T>(
    resources: string[],
    duration: number,
    routine: (signal: RedlockAbortSignal) => Promise<T>
  ): Promise<T>;

  public async using<T>(
    resources: string[],
    duration: number,
      | undefined
      | Partial<Settings>
      | ((signal: RedlockAbortSignal) => Promise<T>),
    optionalRoutine?: (signal: RedlockAbortSignal) => Promise<T>
  ): Promise<T> {
    const settings =
      settingsOrRoutine && typeof settingsOrRoutine !== "function"
        ? {
        : this.settings;

    const routine = optionalRoutine ?? settingsOrRoutine;
    if (typeof routine !== "function") {
      throw new Error("INVARIANT: routine is not a function.");

    if (settings.automaticExtensionThreshold > duration - 100) {
      throw new Error(
        "A lock `duration` must be at least 100ms greater than the `automaticExtensionThreshold` setting."

    // The AbortController/AbortSignal pattern allows the routine to be notified
    // of a failure to extend the lock, and subsequent expiration. In the event
    // of an abort, the error object will be made available at `signal.error`.
    const controller =
      typeof AbortController === "undefined"
        ? new PolyfillAbortController()
        : new AbortController();

    const signal = controller.signal as RedlockAbortSignal;

    function queue(): void {
      timeout = setTimeout(
        () => (extension = extend()),
        lock.expiration - Date.now() - settings.automaticExtensionThreshold

    async function extend(): Promise<void> {
      timeout = undefined;

      try {
        lock = await lock.extend(duration);
      } catch (error) {
        if (lock.expiration > Date.now()) {
          return (extension = extend());

        signal.error = error instanceof Error ? error : new Error(`${error}`);

    let timeout: undefined | NodeJS.Timeout;
    let extension: undefined | Promise<void>;
    let lock = await this.acquire(resources, duration, settings);

    try {
      return await routine(signal);
    } finally {
      // Clean up the timer.
      if (timeout) {
        timeout = undefined;

      // Wait for an in-flight extension to finish.
      if (extension) {
        await extension.catch(() => {
          // An error here doesn't matter at all, because the routine has
          // already completed, and a release will be attempted regardless. The
          // only reason for waiting here is to prevent possible contention
          // between the extension and release.

      await lock.release();