redis的数据类型和使用

发布时间:2026/6/6 10:38:04

redis的数据类型和使用 一、redis的常用数据类型对比数据类型像什么适合场景不适合String一个值缓存、计数器、状态值、分布式锁多字段结构、大量消息队列Hash一个对象/字典用户信息、设备状态、配置对象有顺序的消息流List简单队列简单先进先出任务队列多消费者组、消息确认、失败恢复Set不重复集合去重、标签、集合交并差排序、时间顺序消息Sorted Set带分数排序集合排行榜、延迟队列、按时间排序复杂可靠消息消费Pub/Sub广播通知在线实时通知、即时广播消息可靠性、离线补消费Stream消息日志/可靠队列事件流、任务同步、消费者组、可回放消息超大规模 Kafka 级场景二、redisStream的消费者组代码示例import redis import time r redis.Redis( hostlocalhost, port6379, db0, decode_responsesTrue ) stream_key sync_stream group_name sync_group consumer_name consumer_1 # 创建消费者组 # 如果已经存在会报错所以这里捕获异常 try: r.xgroup_create( namestream_key, groupnamegroup_name, id0, mkstreamTrue ) print(消费者组创建成功) except redis.exceptions.ResponseError as e: if BUSYGROUP in str(e): print(消费者组已存在) else: raise while True: # 使用消费者组读取消息 # 表示只读取还没有投递给任何消费者的新消息 result r.xreadgroup( groupnamegroup_name, consumernameconsumer_name, streams{stream_key: }, count1, block5000 ) if not result: print(暂无新消息) continue for stream, messages in result: for message_id, data in messages: print(收到消息:, message_id, data) try: # 这里模拟你的业务处理 # 比如 # 1. 根据 edge_bucket / edge_object_key 下载文件 # 2. 上传到中心站 # 3. 中心站返回 success print(开始处理任务:, data[sync_id]) time.sleep(1) print(处理成功:, data[sync_id]) # 处理成功后 ACK r.xack(stream_key, group_name, message_id) print(ACK 成功:, message_id) except Exception as e: # 如果处理失败不 ACK # 这条消息会留在 pending 里 print(处理失败不 ACK:, e)

相关新闻