跳转至

Redis Streams教程

如果你是流的新手,请查看Redis streams 介绍。要了解更全面的教程,请继续阅读。

简介

Redis 5.0 中引入了 Redis 流数据类型。 Streams 对日志数据结构建模,但也实现了一些操作,以克服典型的仅追加日志的一些限制。 这包括 O(1)时间内的随机访问和复杂的消费策略,如消费集群。

流基础知识

流是一种只能追加的数据结构。名为XADD的基本写命令将一个新条目追加到指定的流。

每个流条目由一个或多个字段值对组成,有点像记录或 Redis 散列:

Text Only
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

上述对XADD命令的调用使用自动生成的条目 ID(即命令返回的条目 ID)将条目sensor-id: 1234, temperature: 19.8添加到键mystream处的流中,具体为1518951480106-0。 它的第一个参数是键名mystream,第二个参数是标识流中每个条目的条目 ID。 然而,在本例中,我们传递了*,因为我们希望服务器为我们生成一个新的 ID。每个新 ID 都是单调增加的,所以更简单地说,与所有过去的条目相比,每个新添加的条目将具有更高的 ID。 由服务器自动生成 ID 几乎总是您想要的,而显式指定 ID 的原因很少。 我们稍后会详细讨论这个问题。 每个 Stream 条目都有一个 ID,这是与日志文件的另一个相似之处,其中可以使用行号或文件中的字节偏移量来标识给定的条目。 回到我们的XADD示例,在键名和 ID 之后,下一个参数是组成流条目的字段值对。

可以使用XLEN命令来获取流中项目的数量:

Text Only
> XLEN mystream
(integer) 1

条目 ID

XADD命令返回的条目 ID,以及唯一标识给定流中的每个条目,由两部分组成:

Text Only
<millisecondsTime>-<sequenceNumber>

毫秒时间部分实际上是生成流 ID 的本地 Redis 节点中的本地时间,但是,如果当前的毫秒时间恰好小于之前的输入时间,则使用之前的输入时间,因此,如果时钟向后跳转,单调递增的 ID 属性仍然保持。 序列号用于在同一毫秒内创建的条目。 由于序列号是 64 位宽的,实际上在同一毫秒内可以生成的条目的数量没有限制。

这种 ID 的格式乍一看可能很奇怪,温和的读者可能会想,为什么时间是 ID 的一部分。 原因是 Redis 流支持 ID 范围查询。 因为 ID 与条目生成的时间相关,这提供了查询时间范围的能力,基本上是免费的。我们将很快在介绍XRANGE命令时看到这一点。

如果由于某种原因,用户需要增量 ID,这些 ID 与时间无关,但实际上与另一个外部系统 ID 相关,如前所述,XADD命令可以采用显式 ID,而不是触发自动生成的*通配符 ID,如以下示例所示:

Text Only
> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

注意,在这种情况下,最小 ID 是 0-1,命令不接受等于或小于前一个 ID 的 ID:

Text Only
> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

如果你运行的是 Redis 7 或更高版本,你还可以提供一个由毫秒部分组成的显式 ID。 在这种情况下,ID 的序列部分将自动生成。要做到这一点,使用以下语法:

Text Only
> XADD somestream 0-* baz qux
0-3

从流中获取数据

现在我们终于能够通过XADD在流中添加条目了。 然而,虽然向流追加数据是很明显的,但为了提取数据而查询流的方法却不那么明显。 如果我们继续以日志文件为例,一个明显的方法是模仿我们通常使用 Unix 命令tail -f所做的事情,也就是说,我们可以开始监听,以便获得附加到流中的新消息。 注意,与 Redis 的阻塞列表操作不同,在 Redis 中,一个给定的元素将到达一个客户端,该客户端在pop style操作中阻塞,如BLPOP,对于流,我们希望多个消费者看到新消息追加到流中(与许多tail -f进程可以看到添加到日志中的内容相同)。 使用传统术语,我们希望流能够将消息扇出到多个客户端。

然而,这只是一种潜在的访问模式。我们还可以以一种完全不同的方式来看待流:不是作为一个消息传递系统,而是作为一个时间序列存储。 在这种情况下,可能附加新消息也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以增量地检查所有历史记录。 这绝对是另一种有用的访问模式。

最后,如果我们从消费者的角度来看一个流,我们可能希望以另一种方式访问该流,也就是说,将其作为一个消息流,该消息流可以被划分为多个正在处理此类消息的消费者,以便消费组只能看到到达单个流中的消息的子集。 通过这种方式,可以跨不同的使用者扩展消息处理,而不需要单个使用者处理所有消息:每个使用者只需要处理不同的消息。 这基本上就是 Kafka (TM)对消费组所做的。 通过消费组阅读信息是另一种从 Redis 流中阅读的有趣模式。

Redis Streams 通过不同的命令支持上述所有三种查询模式。 下一节将展示所有这些查询,从最简单、最直接的用法开始:范围查询。

按范围查询:XRANGE 和 XREVRANGE

要按范围查询流,我们只需要指定两个 id, startend。 返回的范围将包括起始或结束为 ID 的元素,因此范围是包含的。 两个特殊 ID -+ 分别表示可能的最小 ID 和最大 ID。

Text Only
> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

返回的每个条目都是由两个项组成的数组:ID 和字段值对列表。 我们已经说过,条目 id 与时间有关系,因为-字符左侧的部分是创建流条目的本地节点在条目被创建时的 Unix 时间(以毫秒为单位)(但是请注意,流是通过完全指定的XADD命令复制的,因此副本将具有与主节点相同的 id)。 这意味着我可以使用XRANGE查询一个时间范围。 然而,为了做到这一点,我可能想要省略 ID 的序列部分:如果省略,在范围的开始部分它将被假设为 0,而在结束部分它将被假设为可用的最大序列号。 通过这种方式,只需使用 Unix 时间的两毫秒查询,我们就可以以一种包容的方式获得在这段时间内生成的所有条目。 例如,如果我想查询一个 2 毫秒的周期,我可以使用:

Text Only
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

在这个范围内我只有一个条目,但是在真实的数据集中,我可以查询小时的范围,或者在仅仅两毫秒内有许多项,返回的结果可能非常大。 因此,XRANGE在末尾支持可选的COUNT选项。 通过指定一个计数,我可以只获取前N项。如果我想要更多,我可以返回最后一个 ID,将序列部分增加 1,然后再次查询。 让我们在下面的例子中看看。 我们开始用XADD添加 10 个条目(我不会展示这个,假设流mystream填充了 10 个条目)。 为了开始我的迭代,每个命令获得 2 个项目,我从完整的范围开始,但计数为 2。

Text Only
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

为了继续迭代接下来的两项,我必须选择返回的最后一个 ID,即1519073279157-0,并在它前面添加前缀(。 产生的独占范围间隔,在本例中是(1519073279157-0,现在可以用作下一个XRANGE调用的新的start参数:

Text Only
> XRANGE mystream (1519073279157-0 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

等等。由于XRANGE的复杂度是O(log(N))来寻找,然后O(M)来返回 M 个元素,因此在数量较少的情况下,该命令的时间复杂度为对数,这意味着迭代的每一步都是快速的。 因此XRANGE实际上也是 streams iterator ,不需要XSCAN命令。

命令XREVRANGE相当于XRANGE,但按倒序返回元素,因此XREVRANGE的实际用途是检查流中的最后一项是什么:

Text Only
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

注意,XREVRANGE命令以相反的顺序接受startstop参数。

用 XREAD 监听新项

当我们不想在流中按范围访问项时,通常我们想要的是订阅到达流的新项。 这个概念可能出现与 Redis Pub/Sub 相关,在那里你订阅一个频道,或 Redis 阻塞列表,在那里你等待一个键获得新的元素来获取,但在你消费流的方式有基本的区别:

  1. 一个流可以有多个客户端(消费者)等待数据。 默认情况下,每个新项目将被交付给正在等待给定流中的数据的 每个消费者。 这种行为不同于阻塞列表,在阻塞列表中,每个消费者将获得不同的元素。 然而,向多个消费者展开的能力类似于 Pub/Sub。
  2. 在 Pub/Sub 模式下,消息被触发后就会被忘记,而且无论如何都不会被存储,而在使用阻塞列表时,当客户端接收到消息时就会从列表中弹出(有效地删除),流以一种完全不同的方式工作。 所有消息都无限期地追加到流中(除非用户显式地要求删除条目):不同的消费者将通过记住所接收到的最后一条消息的 ID 从其角度知道什么是新消息。
  3. 流消费组提供了 Pub/Sub 或阻塞列表无法实现的控制级别,通过对同一流的不同组、已处理项目的明确确认、检查待处理项目的能力、声明未处理消息的能力以及每个客户机的一致历史可见性(仅能查看消息的私有过去历史)。

提供监听到达流的新消息能力的命令称为XREAD。 它比XRANGE稍微复杂一点,因此我们将开始展示简单的表单,稍后将提供整个命令布局。

Bash
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

以上是XREAD的非阻塞形式。 注意,COUNT选项不是必选的,实际上该命令的唯一必选选项是STREAMS选项,它指定键的列表以及调用方已经看到的每个流的相应最大 ID,因此该命令将只向客户端提供 ID 大于我们指定的消息。

在上面的命令中,我们写了STREAMS mystream 0,所以我们希望mystream中的所有消息的 ID 大于0-0。 正如您在上面的示例中所看到的,该命令返回键名,因为实际上可以使用多个键调用此命令,以便同时从不同的流中读取数据。 我可以写,例如:STREAMS mystream otherstream 0 0。 注意,在STREAMS选项之后,我们需要提供键名,然后是 id。 因此,STREAMS选项必须总是最后一个。

除了XREAD可以一次访问多个流,并且我们能够指定我们拥有的最后一个 ID 来获取更新的消息之外,在这种简单的形式中,命令所做的事情与XRANGE并没有太大的不同。 然而,有趣的是,我们可以通过指定BLOCK参数,轻松地将XREAD转换为blocking 命令:

Bash
> XREAD BLOCK 0 STREAMS mystream $

注意,在上面的例子中,除了删除COUNT之外,我还指定了新的BLOCK选项,其超时时间为 0 毫秒(这意味着永不超时)。 此外,我没有为流mystream传递普通 ID,而是传递了特殊 ID$。 这个特殊的 ID 意味着XREAD应该使用mystream流中已经存储的最大 ID 作为最后一个 ID,这样我们将只从开始监听的时间开始接收 消息。 这在某种程度上类似于 Unix 的tail -f命令。

Note

注意,当使用BLOCK选项时,我们不必使用特殊 ID $。我们可以使用任何有效的 ID。 如果命令能够立即处理我们的请求而不阻塞,它就会这样做,否则就会阻塞。 通常,如果我们想从新的条目开始使用流,我们从 ID $开始,然后继续使用接收到的最后一个消息的 ID 进行下一个调用,依此类推。

阻塞形式的XREAD也能够监听多个流,只需要指定多个键名。 如果因为至少有一个流的元素大于我们指定的对应 ID,所以可以同步服务请求,那么它将返回结果。 否则,该命令将阻塞并返回获取新数据的第一个流的项(根据指定的 ID)。

与阻塞列表操作类似,从等待数据的客户机的角度来看,阻塞流读取是公平的,因为语义是 FIFO 风格的。 当有新项可用时,为给定流阻塞的第一个客户端将最先被解除阻塞。

XREAD除了COUNTBLOCK之外没有其他选项,因此它是一个非常基本的命令,具有将消费者附加到一个或多个流的特定目的。 使用消费组 API 可以使用更强大的功能来消费流,但是通过消费组读取是由另一个名为XREADGROUP的命令实现的,这将在本指南的下一节中介绍。

消费组

当手头的任务是使用来自不同客户机的相同流时,那么XREAD已经提供了一种fan-out到 N 个客户机的方法,可能还使用副本以提供更多的读取可伸缩性。 然而,在某些问题中,我们希望做的不是向多个客户端提供相同的消息流,而是向多个客户端提供来自同一消息流的不同的消息子集。 一个很明显的例子是处理速度较慢的消息:让 N 个不同的工作者接收流的不同部分的能力允许我们通过将不同的消息路由到准备做更多工作的不同工作者来扩展消息处理。

在实践中,如果我们想象有三个消费者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流,那么我们想要的是按照下面的图表提供消息:

Text Only
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现这个目标,Redis 使用了一个叫做 消费组 的概念。 从实现的角度来看,Redis 消费组与 Kafka (TM)消费组没有任何关系,理解这一点非常重要。 然而它们在功能上是相似的,所以我决定保留 Kafka (TM)的术语,因为它最初普及了这个想法。

一个消费组就像一个从流中获取数据的 伪消费者,实际上服务于多个消费者,提供一定的保证:

  1. 每个消息都提供给不同的使用者,因此不可能将相同的消息传递给多个使用者。
  2. 在消费组中,消费者是通过名称来标识的,这是一个区分大小写的字符串,客户端实现消费者必须选择该字符串。 这意味着即使在断开连接之后,流消费组仍然保留所有的状态,因为客户机将再次声明自己是同一个消费者。 然而,这也意味着由客户机提供唯一的标识符。
  3. 每个消费组都有第一个 ID 从未使用的概念,因此,当消费者请求新消息时,它可以只提供以前没有传递的消息。
  4. 然而,使用消息需要使用特定命令显式确认。 Redis 将确认解释为:此消息已被正确处理,因此可以将其从消费组中删除。
  5. 一个消费组跟踪当前挂起的所有消息,即交付给消费组中的某个消费者但尚未被确认为已处理的消息。 由于这个特性,在访问流的消息历史记录时,每个消费者 将只看到交付给它的消息

在某种程度上,消费组可以被想象成流的 某种状态:

Text Only
+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

如果您从这个角度来看这个问题,那么就很容易理解一个消费组可以做什么,它如何能够只向消费者提供他们的待定消息的历史,以及消费者请求新消息时如何只得到大于last_delivered_id的消息 id。 同时,如果您将消费组视为 Redis 流的辅助数据结构,很明显,单个流可以有多个消费组,这些消费组具有不同的消费者集。 实际上,同一个流甚至可以通过XREAD让客户端在没有消费组的情况下进行阅读,并让客户端在不同消费组中通过XREADGROUP进行阅读。

现在是时候放大看看基本的消费组命令了。它们如下:

  • XGROUP 用于创建、销毁和管理消费组。
  • XREADGROUP 用于通过消费组从流中读取。
  • XACK 允许使用者将挂起的消息标记为已正确处理的命令。

创建消费组

假设我有一个流类型的mystream键已经存在,为了创建一个消费组,我只需要做以下:

Text Only
> XGROUP CREATE mystream mygroup $
OK

正如你在上面的命令中看到的,当创建消费组时,我们必须指定一个 ID,在这个例子中就是$。 这是必需的,因为在其他状态中,消费组必须知道在第一个消费者连接时下一步要服务什么消息,也就是说,在刚创建组时,最后一个消息 ID 是什么。 如果我们像以前那样提供$,那么从现在开始,只有到达流的新消息才会提供给组中的消费者。 如果我们指定0,消费组将使用流历史记录中的所有消息。 当然,您可以指定任何其他有效 ID。 您所知道的是,消费组将开始传递大于您指定的 ID 的消息。 因为$意味着流中当前最大的 ID,因此指定$将只产生使用新消息的效果。

XGROUP CREATE也支持自动创建流,如果它不存在,使用可选的MKSTREAM子命令作为最后一个参数:

Text Only
> XGROUP CREATE newstream mygroup $ MKSTREAM
OK

既然已经创建了消费组,我们可以立即尝试使用XREADGROUP命令通过消费组读取消息。 我们将读取来自消费者的信息,我们将调用 Alice 和 Bob,以查看系统将如何向 Alice 或 Bob 返回不同的消息。

XREADGROUP非常类似于XREAD,并提供相同的BLOCK选项,否则它是一个同步命令。 但是,有一个必须始终指定的强制选项,即GROUP,它有两个参数:消费组的名称和试图读取的消费组的名称。 选项COUNT也被支持,它与XREAD中的选项相同。

在从流中读取之前,让我们在里面放入一些消息:

Text Only
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Note

这里 message 是字段名,fruit 是相关值,记住流项是小型字典。

是时候试着用消费群来阅读一些东西了:

Text Only
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

XREADGROUP的回复就像XREAD的回复。 但是请注意上面提供的GROUP <group-name> <consumer-name>。 它声明我想使用消费组mygroup从流中读取,我是消费组Alice。 每当使用者对消费组执行操作时,它必须指定它的名称,在组中唯一标识这个使用者。

在上面的命令行中还有一个非常重要的细节,在强制性的STREAMS选项之后,键mystream请求的 ID 是特殊 ID >。 这个特殊 ID 只在消费组的上下文中有效,它的意思是:消息迄今为止从未传递给其他消费者

这几乎总是您想要的,但是也可以指定一个真实的 ID,例如0或任何其他有效的 ID,然而,在这种情况下,发生的情况是我们从XREADGROUP请求只向我们提供挂起消息历史,在这种情况下,将永远不会在组中看到新消息。所以基本上XREADGROUP基于我们指定的 ID 有以下行为:

  • 如果 ID 是特殊 ID >,那么该命令将只返回迄今为止从未交付给其他消费者的新消息,并且作为副作用,将更新消费组的最后一个 ID
  • 如果该 ID 是任何其他有效的数字 ID,则该命令将允许我们访问挂起消息的历史记录。 也就是说,传递到指定消费者(通过提供的名称标识)的消息集,到目前为止,XACK 从未确认过这些消息。

我们可以立即测试此行为,指定 ID 为 0,不带任何COUNT选项:我们只会看到唯一的待处理消息,也就是关于苹果的消息:

Text Only
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   1) 1) 1) 1526569495631-0
         1) 1) "message"
            1) "apple"

然而,如果我们确认消息已被处理,它将不再是待处理消息历史的一部分,因此系统将不再报告任何内容:

Text Only
> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

如果你还不知道XACK是如何工作的,不要担心,它的思想只是处理过的消息不再是我们可以访问的历史记录的一部分。

现在轮到鲍勃读了:

Text Only
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob 最多请求两条消息,并且正在通过同一个组mygroup进行阅读。 所以 Redis 只报告新消息。 正如您所看到的,apple消息没有被传递,因为它已经被传递给了 Alice,所以 Bob 得到了橘子和草莓,以此类推。

这样,Alice、Bob 和组中的任何其他消费者都能够从同一流中读取不同的消息,读取尚未处理消息的历史记录,或者将消息标记为已处理。 这允许创建不同的拓扑和语义来使用来自流的消息。

有几件事需要记住:

  • 消费者在第一次被提及时就会自动创建,不需要显式创建。
  • 即使使用XREADGROUP,您也可以同时从多个键中读取,但是要实现这一点,您需要在每个流中创建具有相同名称的消费组。这不是常见的需求,但值得一提的是,该特性在技术上是可用的。
  • XREADGROUP是一个写命令,因为即使它从流中读取,作为读取的副作用,消费组也会被修改,因此只能在主实例上调用它。

下面是一个使用 Ruby 语言编写的使用消费组的消费者实现示例。 Ruby 代码的目标是让几乎所有有经验的程序员都能读懂,即使他们不懂 Ruby:

Ruby
require`redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid =`0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumed our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid =`>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end

下面是一个使用 Ruby 语言编写的使用消费组的消费者实现示例。 Ruby 代码的目标是让几乎所有有经验的程序员都能读懂,即使他们不懂 Ruby:

一旦使用了历史记录,我们得到了一个空的消息列表,我们可以切换到使用>特殊 ID 来使用新消息。

从永久失败中恢复

上面的示例允许我们编写参与同一消费组的消费者,每个消费者都有一个消息子集要处理,当从失败中恢复时,重新读取仅交付给他们的挂起消息。然而,在现实世界中,消费者可能永远失败,永远无法恢复。消费者的挂起的消息在因任何原因停止后从未恢复,会发生什么?

Redis 消费组提供了在这些情况下使用的特性,以声明给定消费者的待处理消息,以便这些消息将改变所有权并将被重新分配给不同的消费者。这个功能非常明确。使用者必须检查挂起的消息列表,并必须使用特殊命令声明特定的消息,否则服务器将永远保留挂起的消息并将其分配给旧的使用者。通过这种方式,不同的应用程序可以选择是否使用这种功能,以及具体如何使用它。

此过程的第一步只是一个命令,该命令提供消费组中挂起条目的可观测性,称为XPENDING。 这是一个只读命令,调用它总是安全的,不会改变任何消息的所有权。 在最简单的形式中,调用该命令时使用两个参数,一个是流的名称,另一个是消费组的名称。

Text Only
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

当以这种方式调用时,该命令输出消费组中待定消息的总数(本例中为两个)、待定消息中较低和较高的消息 ID,最后输出消费者的列表和他们拥有的待定消息的数量。 我们只有 Bob 和两个挂起的消息,因为 Alice 请求的单个消息是使用XACK确认的。

我们可以通过给XPENDING更多的参数来请求更多的信息,因为完整的命令签名如下:

Text Only
XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

通过提供一个开始和结束 ID(可以是-+,就像XRANGE中那样)和一个计数来控制命令返回的信息量,我们能够了解更多关于挂起消息的信息。如果希望将输出限制为仅为给定消费者的待处理消息,则使用可选的最终参数消费者名,但在下面的示例中不使用此特性。

Text Only
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

现在我们有了每条消息的详细信息:ID、消费者名、idle time(以毫秒为单位),这是自消息最后一次被传递给某个消费者以来经过了多少毫秒,最后是给定消息被传递的次数。 我们有两个来自 Bob 的消息,它们空闲时间为 74170458 毫秒,大约 20 小时。

注意,没有人阻止我们通过使用XRANGE来检查第一个消息内容是什么。

Text Only
> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

我们只需要在参数中重复相同的 ID 两次。既然我们已经有了一些想法,那么 Alice 可能会认为,在 20 小时不处理消息之后,Bob 可能无法及时恢复,现在是时候请求处理这些消息并代替 Bob 恢复处理了。为此,我们使用XCLAIM命令。

这个命令非常复杂,它的完整形式充满了选项,因为它用于复制消费组更改,但是我们将只使用通常需要的参数。在这种情况下,它很简单:

Text Only
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

基本上,我们说,对于这个特定的键和组,我希望指定的消息 id 将改变所有权,并将被分配给指定的消费者名<consumer>。但是,我们还提供了一个最小空闲时间,这样,只有当提到的消息的空闲时间大于指定的空闲时间时,操作才会工作。这很有用,因为可能有两个客户端同时试图重新声明一条消息:

Text Only
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

然而,作为一个副作用,声明消息将重置其空闲时间并增加其交付计数器的数量,因此第二个客户机将无法声明它。通过这种方式,我们避免了琐碎的消息再处理(即使在一般情况下,您不能精确地获得一次处理)。

命令执行的结果如下:

Text Only
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alice 成功地认领了消息,她现在可以处理消息并确认它,并向前推进,即使原始消费者没有恢复。

从上面的示例可以清楚地看到,作为成功声明给定消息的副作用,XCLAIM命令也会返回该消息。然而,这不是强制性的。可以使用JUSTID选项仅返回成功认领的消息的 id。如果您希望减少客户端和服务器之间使用的带宽(以及命令的性能),并且您对消息不感兴趣,因为您的使用者的实现方式将不时重新扫描挂起的消息的历史,那么这是很有用的。

认领也可以由单独的流程实现:该流程只检查挂起的消息列表,并将空闲消息分配给看起来活跃的使用者。使用 Redis 流的可观测性特征之一可以获得活动消费者。这是下一节的主题。

自动认领

在 Redis 6.2 中添加的XAUTOCLAIM命令实现了我们上面描述的认领过程。 XPENDINGXCLAIM为不同类型的恢复机制提供了基本的构建块。 该命令通过让 Redis 管理通用流程来优化它,并为大多数恢复需求提供了一个简单的解决方案。

XAUTOCLAIM识别空闲的挂起消息并将其所有权转移给使用者。 命令的签名如下所示:

`XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT COUNT] [JUSTID]'

所以,在上面的例子中,我可以使用自动声明来声明一条消息,就像这样:

Text Only
> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
1) 1526569498055-0
2) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

XCLAIM类似,该命令用声明的消息数组进行响应,但它也返回一个流 ID,允许迭代挂起的条目。 流 ID 是一个游标,我可以在下一个调用中使用它来继续声明空闲的挂起消息:

Text Only
> XAUTOCLAIM mystream mygroup Lora 3600000 1526569498055-0 COUNT 1
1) 0-0
2) 1) 1526569506935-0
   2) 1) "message"
      2) "strawberry"

XAUTOCLAIM返回0-0流 ID 作为游标时,这意味着它到达了消费组待处理条目列表的末尾。 这并不意味着没有新的空闲挂起消息,因此流程从流的开始调用XAUTOCLAIM继续。

认领和交付计数

您在XPENDING输出中观测到的计数器是每条消息的交付数量。计数器以两种方式递增:当通过XCLAIM成功声明消息时,或当使用XREADGROUP调用以访问挂起消息的历史记录时。

当出现失败时,通常会多次传递消息,但最终它们通常会得到处理和确认。然而,在处理某些特定消息时可能会出现问题,因为它被损坏或以触发处理代码中的错误的方式制作。在这种情况下,发生的情况是消费者将不断地无法处理这条特定的消息。因为我们有传递尝试的计数器,所以我们可以使用该计数器来检测由于某种原因不可处理的消息。因此,一旦交付计数器达到您所选择的一个给定的大数字,可能更明智的做法是将这些消息放在另一个流中,并向系统管理员发送通知。这基本上是 Redis Streams 实现dead letter概念的方式。

流可观测性

缺乏可观测性的消息传递系统非常难以使用。 不知道谁在使用消息、什么消息挂起、给定流中活动的消费组,这使得一切都变得不透明。 因此,Redis Streams 和消费组有不同的观测方式。 我们已经讨论了XPENDING,它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和传递数量。

然而,我们可能想做更多的事情,XINFO命令是一个可观测性接口,可以与子命令一起使用,以获取关于流或消费组的信息。

这个命令使用子命令来显示关于流及其消费组状态的不同信息。例如,XINFO STREAM 报告流本身的信息。

Text Only
> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1638125141232-0"
 9) "max-deleted-entryid"
10) "0-0"
11) "entries-added"
12) (integer) 2
13) "groups"
14) (integer) 1
15) "first-entry"
16) 1) "1638125133432-0"
    2) 1) "message"
       2) "apple"
17) "last-entry"
18) 1) "1638125141232-0"
    2) 1) "message"
       2) "banana"

输出显示了关于流内部如何编码的信息,还显示了流中的第一条和最后一条消息。 另一个可用的信息是与此流相关联的消费组的数量。 我们可以进一步调查,询问更多关于消费组的信息。

Text Only
> XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 2
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1638126030001-0"
    9) "entries-read"
   10) (integer) 2
   11) "lag"
   12) (integer) 0
2)  1) "name"
    2) "some-other-group"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1638126028070-0"
    9) "entries-read"
   10) (integer) 1
   11) "lag"
   12) (integer) 1

正如您在此输出和前面的输出中所看到的,XINFO命令输出一系列字段值项。 因为它是一个可观测性命令,这允许人类用户立即了解报告了什么信息,并允许该命令在未来通过添加更多字段来报告更多的信息,而不破坏与旧客户机的兼容性。 其他一些带宽效率更高的命令,比如XPENDING,只报告没有字段名的信息。

上面的示例(其中使用了GROUPS子命令)的输出应该很清楚,可以观测字段名。 通过检查在组中注册的消费者,我们可以更详细地检查特定消费组的状态。

Text Only
> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

如果你不记得命令的语法,就向命令本身寻求帮助:

Text Only
> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Prints this help.

与 Kafka (TM)分区的区别

Redis 流中的消费组可能在某些方面类似于基于 Kafka (TM)分区的消费组,但是请注意,Redis 流在实际中是非常不同的。 分区只有logical,消息只是放在一个单独的 Redis 键中,因此不同客户机的服务方式取决于谁准备处理新消息,而不是客户机从哪个分区读取。 例如,如果消费者 C3 在某个时刻永久失效,Redis 将继续为到达的所有新消息提供 C1 和 C2 服务,就像现在只有两个logical分区一样。

类似地,如果一个给定的消费者处理消息的速度比其他消费者快得多,那么这个消费者在相同的时间单位内将按比例收到更多的消息。 这是可能的,因为 Redis 显式地跟踪所有未确认的消息,并记住谁收到了哪条消息,以及第一条消息的 ID 从未交付给任何消费者。

然而,这也意味着在 Redis 中,如果你真的想将同一流中的消息划分到多个 Redis 实例中,你必须使用多个键和一些分片系统,如 Redis Cluster 或其他特定于应用程序的分片系统。 单个 Redis 流不会自动分区到多个实例。

我们可以简单地说以下是正确的:

  • 如果您使用 1 个流 -> 1 个消费者,则您将按顺序处理消息。
  • 如果使用 N 个流和 N 个消费者,那么只有一个给定的消费者触及 N 个流的子集,您可以扩展上面的 1 个流模型 -> 1 消费者。
  • 如果您使用 1 个流 -> N 个消费者,那么您将负载均衡到 N 个消费者,然而,在这种情况下,关于相同逻辑项的消息可能会被打乱顺序使用,因为给定的消费者处理消息 3 的速度可能比另一个消费者处理消息 4 的速度快。

因此,基本上 Kafka 分区更类似于使用 N 个不同的 Redis 键,而 Redis 消费组是一个服务器端负载平衡系统,将消息从给定流发送到 N 个不同的消费者。

限制流

许多应用程序不希望永远将数据收集到流中。 有时,在流中拥有最多给定数量的条目是有用的,其他时候,一旦达到了给定的大小,将数据从 Redis 移动到一个不在内存中、速度不那么快但适合存储历史的存储空间是有用的,可能是未来几十年。 Redis 流对此有一些支持。 一个是XADD命令的MAXLEN选项。这个选项使用起来非常简单:

Text Only
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

使用MAXLEN,当达到指定的长度时,旧的条目将被自动删除,因此流将保持恒定的大小。 目前还没有选项告诉流只保留不超过给定周期的项,因为这样的命令,为了一致地运行,可能会阻塞很长时间以驱逐项目。 想象一下,如果有一个插入高峰,然后是一个很长的暂停,然后是另一个插入,所有这些都有相同的最长时间,会发生什么。 流将阻塞以驱逐在暂停期间变得太旧的数据。 因此,用户需要做一些计划,并了解所需的最大流长度。 此外,尽管流的长度与所使用的内存成正比,但按时间进行修剪就不那么容易控制和预测了:它取决于插入率,而插入率通常会随时间变化(当它不变化时,那么仅按大小进行修剪是微不足道的)。

然而,使用MAXLEN进行修剪可能会非常昂贵:流通过宏节点表示为基树,以非常高效的内存。 改变由几十个元素组成的单个宏节点不是最优的。 因此,可以以以下特殊形式使用该命令:

Text Only
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

MAXLEN选项和实际计数之间的~参数意味着,我并不真的需要它恰好是 1000 项。它可以是 1000 或 1010 或 1030,只要确保至少保存 1000 个项目。 使用此参数,只有在可以删除整个节点时才执行修剪。 这使它更有效,这通常是您想要的。

还有XTRIM命令,它执行的操作与上面的MAXLEN选项非常相似,只不过它可以自己运行:

Text Only
> XTRIM mystream MAXLEN 10

或者,对于XADD选项:

Text Only
> XTRIM mystream MAXLEN ~ 10

然而,XTRIM被设计为接受不同的修剪策略。 另一种删除策略是MINID,它删除 id 低于指定值的项。

由于XTRIM是一个显式命令,用户应该知道不同的裁剪策略可能存在的缺点。

另一个有用的删除策略可能会在未来添加到XTRIM,是通过一系列 id 删除,以方便使用XRANGEXTRIM将数据从 Redis 移动到其他存储系统,如果需要的话。

流 API 中的特殊 id

您可能已经注意到,在 Redis API 中可以使用几个特殊的 id。 这里有一个简短的回顾,以便将来能更有意义。

前两个特殊 id 是-+,在使用XRANGE命令进行范围查询时使用。 这两个 ID 分别表示可能的最小 ID(基本上是0-1)和可能的最大 ID(即18446744073709551615-18446744073709551615)。 如你所见,写-+比写那些数字要简洁得多。

还有一些 api,我们想说,流中 ID 最大的项的 ID。 这就是$的含义。 因此,例如,如果我只想要有XREADGROUP的新条目,我使用这个 ID 来表示我已经有了所有现有的条目,而不是未来将插入的新条目。 类似地,当我创建或设置消费组的 ID 时,我可以将最后交付的项目设置为$,以便只向组中的消费者交付新条目。

正如你所看到的,$并不意味着+,它们是两个不同的东西,因为+是每一个可能的流中可能的最大 ID,而$是包含给定条目的给定流中的最大 ID。 此外,api 通常只理解+$,但避免加载具有多种含义的给定符号是有用的。

另一个特殊 ID 是>,这是一个仅与消费组相关的特殊含义,且仅当使用XREADGROUP命令时。 这个特殊 ID 意味着我们只想要到目前为止从未交付给其他消费者的条目。 所以基本上>ID 是消费组的 最后一个发送的 ID

最后,特殊 ID *只能与XADD命令一起使用,它意味着为新条目自动选择一个 ID。

所以我们有-+$>*,它们都有不同的含义,大多数情况下,可以在不同的上下文中使用。

持久性、复制和消息安全性

像任何其他 Redis 数据结构一样,流被异步复制到副本并持久保存到 AOF 和 RDB 文件中。 然而,可能不那么明显的是,消费组的完整状态也被传播到 AOF、RDB 和副本,因此,如果一条消息在主服务器中挂起,副本也将具有相同的信息。类似地,在重新启动之后,AOF 将恢复消费组的状态。

但是请注意,Redis 流和消费组是使用 Redis 默认复制持久化和复制的,所以:

  • 如果消息的持久性在你的应用中很重要,AOF 必须与强 fsync 策略一起使用。
  • 默认情况下,异步复制不会保证复制XADD命令或消费组状态更改:在故障转移之后,可能会丢失一些东西,这取决于副本从主服务器接收数据的能力。
  • 可以使用WAIT命令强制将更改传播到一组副本。 然而,请注意,虽然这使得数据不太可能丢失,但由 Sentinel 或 Redis Cluster 操作的 Redis 故障转移过程只执行 最大努力检查,以故障转移到更新最快的副本,在某些特定的故障条件下,可能会促使缺乏一些数据的副本。

因此,当使用 Redis 流和消费组设计应用程序时,确保理解应用程序在故障期间应该具有的语义属性,并相应地进行配置,评估它是否足够安全。

从流中删除单个项

流还有一个用于从流中间删除项的特殊命令,仅通过 ID。 通常,对于仅附加数据结构,这可能看起来像一个奇怪的特性,但它实际上对涉及(例如隐私规则)的应用程序很有用。 该命令被称为XDEL,接收流的名称和要删除的 id:

Text Only
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

然而,在当前实现中,直到宏节点完全为空时才真正回收内存,因此不应滥用此特性。

零长度流

流和其他 Redis 数据结构的不同之处在于,当其他数据结构不再有任何元素时,作为调用删除元素命令的副作用,键本身将被删除。 例如,当调用ZREM将删除排序集中的最后一个元素时,排序集将被完全删除。另一方面,流被允许保持零元素,这是使用计数为零的MAXLEN选项(XADDXTRIM命令)的结果,或者是因为调用了XDEL

存在这种不对称的原因是因为流可能有关联的消费组,而我们不希望仅仅因为流中不再有任何项目而失去消费组定义的状态。 目前,即使流没有关联的消费组,也不会被删除。

使用消息的总延迟

XRANGEXREADXREADGROUP这样没有 BLOCK 选项的非阻塞流命令像其他任何 Redis 命令一样是同步服务的,所以讨论此类命令的延迟是没有意义的:在 Redis 文档中检查命令的时间复杂度更有趣。 可以这样说,在提取范围时,流命令至少和排序的集合命令一样快,而XADD非常快,如果使用流,它可以很容易地在一台普通机器中每秒插入 50 万到 100 万项。

但是,如果我们想了解在阻塞消费组中的消费者的情况下,从通过XADD产生消息的时刻到由于XREADGROUP与消息一起返回而由消费者获取消息的时刻,延迟就成为一个有趣的参数。

如何服务被屏蔽的消费者

在提供所执行测试的结果之前,了解 Redis 使用什么模型来路由流消息是很有趣的(实际上,在一般情况下,任何等待数据的阻塞操作是如何管理的)。

  • 被阻塞的客户端在一个哈希表中被引用,该哈希表将至少有一个阻塞消费者的键映射到一个正在等待该键的消费者列表。 通过这种方式,给定一个接收数据的键,我们就可以解析正在等待此类数据的所有客户机。
  • 当写入发生时,在本例中,当XADD命令被调用时,它调用signalKeyAsReady()函数。 这个函数将把键放入需要处理的键列表中,因为这些键可能有被阻塞的使用者的新数据。 注意,这样的 就绪 键稍后将被处理,因此在同一个事件循环周期中,键可能会接收到其他写操作。
  • 最后,在返回到事件循环之前,最后处理 就绪 键。 对于每个键,将扫描等待数据的客户端列表,如果适用,这样的客户端将接收到达的新数据。 在流的情况下,数据是消费者请求的适用范围内的消息。

正如你所看到的,基本上,在返回到事件循环之前,调用XADD的客户端和被阻塞消费消息的客户端都将在输出缓冲区中有它们的回复,所以XADD的调用者应该在消费者将收到新消息的同时从 Redis 收到回复。

该模型是 基于推 的,因为向消费者缓冲区添加数据将由调用 XADD 的操作直接执行,因此延迟往往是相当可预测的。

延迟测试结果

为了检查这些延迟特征,我们使用多个 Ruby 程序实例来执行一个测试,这些 Ruby 程序推送的消息有一个附加字段,即计算机毫秒时间,Ruby 程序从消费组读取消息并处理它们。 消息处理步骤包括比较当前计算机时间和消息时间戳,以便了解总延迟。

结果:

Text Only
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

所以 99.9%的请求延迟<= 2 毫秒,异常值仍然非常接近平均值。

向流中添加几百万条未确认的消息并不会改变基准测试的要点,大多数查询仍然以非常短的延迟处理。

几点备注:

  • 这里我们每次迭代最多处理 10k 消息,这意味着XREADGROUPCOUNT参数被设置为 10000。 这增加了大量的延迟,但这是为了允许速度较慢的使用者能够跟上消息流而需要的。 所以现实世界的延迟会小得多。
  • 与今天的标准相比,用于此基准测试的系统非常慢。
回到页面顶部