Redis Streams 是 Redis 5.0 引入的一种新的数据类型,它提供了一种强大的日志结构化数据存储方式。Streams 类型非常适合用于构建消息队列、事件日志以及其他需要持久化和高效处理时间序列数据的应用场景。
向指定的流中添加一条新消息,ID 可以是 *(表示自动生成)或指定的时间戳和序列号。
1 2 |
127.0.0.1:6379> xadd mystream * sensor_id 123 temmperature 22.5 "1729306027171-0" |
返回的结构可以分为两部分:
COUNT 指定返回的最大条目数。BLOCK 指定在没有新消息时阻塞的时间(毫秒)。STREAMS 指定要读取的 Stream 和起始 ID。
1 2 3 4 5 6 7 |
127.0.0.1:6379> xread count 2 streams mystream 0-0 1) 1) "mystream" 2) 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5" |
1 2 3 4 5 6 |
127.0.0.1:6379> xrange mystream - + 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5" |
返回指定 ID 范围内的条目,但按逆序排列。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
127.0.0.1:6379> xadd mystream * sensor_id 234 temmperature 23.5 "1729329067777-0" 127.0.0.1:6379> xadd mystream * sensor_id 345 "1729329079135-0" 127.0.0.1:6379> xrevrange mystream + - count 2 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 2) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5" |
1 2 |
127.0.0.1:6379> xgroup create mystream mygroup 0 OK |
1 2 3 4 5 6 7 8 9 10 11 12 |
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream > 1) 1) "mystream" 2) 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5" 2) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5" |
确认已处理的消息。XACK 命令用于确认消费者组中的消息已经被成功处理。当你使用 XACK 命令时,Redis 会将指定的消息从“待处理”状态转换为“已确认”状态,并从消费者的待处理列表中移除。
1 2 3 4 |
127.0.0.1:6379> xack mystream mygroup 1729329067777-0 (integer) 1 127.0.0.1:6379> xack mystream mygroup 1729329079135-0 (integer) 0 |
当 XACK 命令成功确认一条消息时,返回值为 1,表示该消息已经被确认并且从待处理列表中移除。例如,如果消息 1729329067777-0 是由 consumer1 处理的,并且现在调用 XACK 确认它,那么这条消息将不再出现在 consumer1 的待处理列表中。
查看待处理的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
127.0.0.1:6379> xpending mystream mygroup 1) (integer) 1 2) "1729306027171-0" 3) "1729306027171-0" 4) 1) 1) "consumer1" 2) "1" 127.0.0.1:6379> xack mystream mygroup 1729306027171-0 (integer) 1 127.0.0.1:6379> xpending mystream mygroup 1) (integer) 0 2) (nil) 3) (nil) 4) (nil) |
用于将一个或多个消息从一个消费者转移到另一个消费者。这个命令通常用于处理消息超时或重新分配消息的情况。XCLAIM 允许你手动将消息从一个消费者的待处理列表移动到另一个消费者的待处理列表。
1 2 3 4 5 6 7 8 9 |
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream > 1) 1) "mystream" 2) 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xclaim mystream mygroup consumer2 10000 1729329079135-0 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" |
获取 Stream 或消费者组的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
127.0.0.1:6379> xinfo stream mystream 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "groups" 8) (integer) 1 9) "last-generated-id" 10) "1729329079135-0" 11) "first-entry" 12) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5" 13) "last-entry" 14) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xinfo groups mystream 1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 2 5) "pending" 6) (integer) 1 7) "last-delivered-id" 8) "1729329079135-0" 127.0.0.1:6379> xinfo consumers mystream mygroup 1) 1) "name" 2) "consumer1" 3) "pending" 4) (integer) 0 5) "idle" 6) (integer) 255317 2) 1) "name" 2) "consumer2" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 191940 |
XINFO STREAM mystream
length:
流中的消息总数:3 条。
radix-tree-keys:
用于存储流数据的 radix tree 中的键的数量:1 个。
radix-tree-nodes:
用于存储流数据的 radix tree 中的节点数量:2 个。
groups:
与该流关联的消费者组数量:1 个。
last-generated-id:
流中最后生成的消息 ID:1729329079135-0。
first-entry:
流中的第一条消息: 消息 ID: 1729306027171-0消息内容: sensor_id: 123temmperature: 22.5
last-entry:
流中的最后一条消息: 消息 ID: 1729329079135-0消息内容: sensor_id: 345
XINFO GROUPS mystream
name:
消费者组的名称:mygroup。
consumers:
该组中的消费者数量:2 个。
pending:
该组中待处理的消息数量:1 条。
last-delivered-id:
该组中最后一个被交付的消息 ID:1729329079135-0。
XINFO CONSUMERS mystream mygroup
第一个消费者:
第一个消费者:
第二个消费者:
从 Stream 中删除一个或多个条目。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
127.0.0.1:6379> xdel mystream 1729306027171-0 (integer) 1 127.0.0.1:6379> xrange mystrea - + (empty list or set) 127.0.0.1:6379> xrange mystream - + 1) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5" 2) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" |
修剪 Stream,保留最多 len 个条目,~ 表示近似长度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
127.0.0.1:6379> xrange mystream - + 1) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5" 2) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xtrim mystream maxlen 1 (integer) 1 127.0.0.1:6379> xrange mystream - + 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" |
更多命令请参考:Commands | Docs