
IT策士 10余年一线大厂经验专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章助你少走弯路。上一篇我们学会了用redis-py连接 Redis、操作五大基础数据类型还掌握了连接池和 Pipeline 两大性能神器。但生产环境远比“读写键值”复杂多个命令需要原子执行怎么办怎样高效存储 Python 对象高并发异步项目如何接入代码中到处散落的 Redis 调用又该如何优雅封装本篇作为 Python 操作 Redis 的进阶篇将逐一攻克这些难题事务与 Lua 脚本的 Python 调用、对象序列化方案、异步 Redisredis.asyncio以及工具类封装。掌握它们你的 Redis 代码就能从“能用”跃升至“专业”。1. 事务让多个命令原子执行Redis 的事务通过MULTI/EXEC实现能将一组命令打包在执行期间不会被其他客户端的命令插队。redis-py中使用Pipeline(transactionTrue)即可开启事务模式。1.1 基础事务——转账模拟账户 A 给账户 B 转账 50 元需要扣减 A、增加 B。这两个操作必须原子化。importredis rredis.Redis(hostlocalhost,port6379,decode_responsesTrue)# 初始化余额r.set(account:A,100)r.set(account:B,200)def transfer(from_account, to_account, amount): piper.pipeline(transactionTrue)pipe.decrby(from_account, amount)pipe.incrby(to_account, amount)resultspipe.execute()returnresults print(转账前: A , r.get(account:A),B , r.get(account:B))transfer(account:A,account:B,50)print(转账后: A , r.get(account:A),B , r.get(account:B))输出转账前: A100B200转账后: A50B250pipe.execute()返回一个列表[50, 250]分别对应两个命令的执行结果。中间不会出现 A 扣了钱 B 没加的中间态。1.2 乐观锁——WATCH 实现 CAS事务不提供回滚但可以通过WATCH监听键若事务执行前键被修改则取消事务实现乐观锁。def safe_transfer(from_acc, to_acc, amount):whileTrue: r.watch(from_acc)balanceint(r.get(from_acc))ifbalanceamount: r.unwatch()raise ValueError(余额不足)piper.pipeline(transactionTrue)pipe.decrby(from_acc, amount)pipe.incrby(to_acc, amount)try: pipe.execute()breakexcept redis.WatchError: print(冲突重试...)continuer.unwatch()# 测试r.set(account:A,100)r.set(account:B,200)safe_transfer(account:A,account:B,30)print(A , r.get(account:A),B , r.get(account:B))若并发修改from_accexecute()会抛出WatchError循环重试即可。这适用于竞争不激烈的场景。 事务在EXEC执行时才真正运行MULTI期间的命令只是排队。如果原子性要求极高且逻辑复杂推荐直接使用 Lua 脚本。2. Lua 脚本原子性的终极武器Lua 脚本在 Redis 服务端执行天然原子且能减少网络往返。复杂的判断、循环、条件更新一个脚本搞定。2.1 库存扣减——防超卖需求扣减商品库存库存不足时返回失败。# Lua 脚本原子扣减库存lua_inventorylocalkeyKEYS[1]localdeltatonumber(ARGV[1])localcurrentredis.call(GET, key)ifcurrentfalsethenreturn-1-- 键不存在 end currenttonumber(current)ifcurrentdeltathenreturn0-- 库存不足 end redis.call(DECRBY, key, delta)return1-- 成功# 注册脚本缓存 SHAinventory_scriptr.register_script(lua_inventory)# 初始化库存r.set(product:1001:stock,10)# 模拟扣减resultinventory_script(keys[product:1001:stock],args[3])print(扣减结果:, result)# 1 成功print(剩余库存:, r.get(product:1001:stock))# 7# 尝试超量扣减resultinventory_script(keys[product:1001:stock],args[10])print(超量扣减:, result)# 0 库存不足# 不存在的商品resultinventory_script(keys[product:9999:stock],args[1])print(不存在:, result)# -1输出扣减结果:1剩余库存:7超量扣减:0不存在:-1register_script()会自动缓存脚本的 SHA 值后续调用使用EVALSHA避免重复传输脚本内容高效又简洁。2.2 Lua 脚本的传参细节KEYS键名列表通过keys关键字传入。ARGV参数列表通过args关键字传入。返回值Lua 中的return会直接返回给 Python支持数字、字符串、表转为列表。scriptlocalnameredis.call(GET, KEYS[1])localcountredis.call(INCR, KEYS[2])return{name, count} r.set(username,Alice)resultr.register_script(script)(keys[username,counter],args[])print(result)# [Alice, 1]⚠️ Lua 脚本执行期间会阻塞其他命令务必轻量快速避免死循环或长时间计算。3. 序列化方案如何优雅存储 Python 对象Redis 值只能存字符串或字节遇到 Python 对象就需要序列化。常见三大方案3.1 JSON 方案推荐优先使用importjson def cache_user_json(r, user_id, user_dict): r.set(fuser:{user_id}, json.dumps(user_dict,ensure_asciiFalse),ex300)def get_user_json(r, user_id): datar.get(fuser:{user_id})returnjson.loads(data)ifdataelseNone user{id:1001,name:Alice,tags:[redis,python]}cache_user_json(r,1001, user)cachedget_user_json(r,1001)print(cached)# {id: 1001, name: Alice, tags: [redis, python]}优点跨语言、可读、安全。缺点速度一般datetime等类型需手动处理。3.2 MessagePack 方案高性能场景安装pip install msgpackimportmsgpack def cache_user_msgpack(r, user_id, user_dict):# use_bin_typeTrue 保证 bytes 类型正确packedmsgpack.packb(user_dict,use_bin_typeTrue)r.set(fuser:{user_id}, packed,ex300)def get_user_msgpack(r, user_id): datar.get(fuser:{user_id})returnmsgpack.unpackb(data,rawFalse)ifdataelseNone cache_user_msgpack(r,1002, user)cachedget_user_msgpack(r,1002)print(cached)# {id: 1001, name: Alice, tags: [redis, python]}性能对比1000次序列化/反序列化粗略测试importtime, pickle data{id:1,name:Alice,tags:[redis,python]*10}# JSONstarttime.perf_counter()for_inrange(10000): packedjson.dumps(data)unpackedjson.loads(packed)print(fJSON: {time.perf_counter() - start:.3f}s)# Picklestarttime.perf_counter()for_inrange(10000): packedpickle.dumps(data)unpackedpickle.loads(packed)print(fPickle: {time.perf_counter() - start:.3f}s)# MessagePackstarttime.perf_counter()for_inrange(10000): packedmsgpack.packb(data)unpackedmsgpack.unpackb(packed)print(fMsgPack: {time.perf_counter() - start:.3f}s)输出示例JSON:0.045s Pickle:0.021s MsgPack:0.018sMessagePack 速度和体积都有优势适合高吞吐缓存。避免 PicklePickle 可执行任意代码攻击者若控制 Redis 写入恶意 Pickle 数据反序列化时可执行系统命令。且不同 Python 版本兼容性差。生产环境绝对禁用3.3 最佳实践默认使用JSON配合ensure_asciiFalse节省空间处理好datetime等特殊类型。对性能要求极高且数据量大使用MessagePack。序列化/反序列化统一封装方便后续切换方案。避免存储超大数据对象10MB会阻塞 Redis消耗带宽。4. 异步 Redisredis.asyncio 高并发利器Python 异步框架FastAPI、Sanic、asyncio日益流行使用同步redis-py会阻塞事件循环。redis-py4.2 内置了redis.asyncio模块API 与同步版高度一致。4.1 基本用法importasyncioimportredis.asyncio as aioredis async def main():# 创建异步客户端rawait aioredis.from_url(redis://localhost:6379/0,decode_responsesTrue)# 基本操作await r.set(async_key,Hello Async)valueawait r.get(async_key)print(value)# Hello Async# Pipelineasync with r.pipeline()as pipe: pipe.set(a,1)pipe.set(b,2)resultsawait pipe.execute()print(results)# [True, True]# 关闭连接await r.close()asyncio.run(main())注意所有命令都要awaitPipeline 使用async with上下文。4.2 并发请求对比模拟 100 个并发 GET 请求importtimeimportredisimportredis.asyncio as aioredis# 同步版本def sync_bench(): rredis.Redis(decode_responsesTrue)r.set(test,value)starttime.perf_counter()for_inrange(100): r.get(test)print(f同步耗时: {time.perf_counter() - start:.4f}s)# 异步版本async def async_bench(): rawait aioredis.from_url(redis://localhost,decode_responsesTrue)await r.set(test,value)starttime.perf_counter()tasks[r.get(test)for_inrange(100)]await asyncio.gather(*tasks)print(f异步并发耗时: {time.perf_counter() - start:.4f}s)await r.close()sync_bench()asyncio.run(async_bench())输出示例本地同步耗时:0.0382s 异步并发耗时:0.0091s并发场景下异步优势明显且不会阻塞主线程的其他任务。4.3 连接池管理异步连接池同样重要async def create_pool(): poolaioredis.ConnectionPool.from_url(redis://localhost,max_connections20,decode_responsesTrue)clientaioredis.Redis(connection_poolpool)returnclient# 使用async def worker(): rawait create_pool()# ... 操作 ...await r.close()from_url()方法实际上会创建默认连接池简单场景无需手动管理。但在多协程共享 client 时手动创建连接池并传入多个Redis实例会更清晰。5. 工具类封装生产级 RedisHelper项目中到处散落 Redis 调用不易维护。我们封装一个RedisClient集成连接池、序列化、缓存穿透保护、异常处理等。importjsonimportredis from redis.exceptionsimportRedisError from typingimportOptional, Any class RedisClient:生产级 Redis 工具类 def __init__(self,hostlocalhost,port6379,db0,passwordNone,max_connections20,decode_responsesFalse): self.poolredis.ConnectionPool(hosthost,portport,dbdb,passwordpassword,max_connectionsmax_connections,decode_responsesdecode_responses,socket_timeout5,socket_connect_timeout5,health_check_interval30)self.clientredis.Redis(connection_poolself.pool)def set_json(self, key: str, value: dict, ex: intNone):存储 JSON 对象 self.client.set(key, json.dumps(value,ensure_asciiFalse),exex)def get_json(self, key: str)-Optional[dict]:获取 JSON 对象不存在或异常返回 None try: dataself.client.get(key)returnjson.loads(data)ifdataelseNone except(RedisError, json.JSONDecodeError)as e: print(f[Redis] get_json error: {e})returnNone def get_or_set(self, key: str, func, ex: int60):缓存穿透保护存在直接返回不存在则调用 func 获取并缓存 cachedself.get_json(key)ifcached is not None:returncached# 查询数据源datafunc()ifdata is not None: self.set_json(key, data, ex)returndata def delete(self, key: str): self.client.delete(key)def pipeline(self):returnself.client.pipeline()def close(self): self.pool.disconnect()# ---------- 使用示例 ----------cacheRedisClient(decode_responsesFalse)# 存储用户信息user{id:1001,name:IT策士,score:100}cache.set_json(user:1001, user,ex120)# 获取print(cache.get_json(user:1001))# 利用 get_or_set 实现缓存读取模板def query_db():# 模拟从数据库查询print(查询数据库...)return{id:2001,name:Bob,score:85}print(cache.get_or_set(user:2001, query_db,ex60))# 第一次查库print(cache.get_or_set(user:2001, query_db,ex60))# 命中缓存输出{id:1001,name:IT策士,score:100}查询数据库...{id:2001,name:Bob,score:85}{id:2001,name:Bob,score:85}后续可根据需要扩展lua方法、incr_json_field等形成团队标准库。6. 动手试试Lua 原子操作用 Lua 实现“抢红包”逻辑随机生成红包金额确保总金额不超并将抢到的金额写入用户的 Hash。序列化性能分别用 JSON、Pickle仅供学习别上生产、MessagePack 存储一个包含 10 万条简单数据的列表记录写入耗时和MEMORY USAGE大小。异步多任务用redis.asyncio同时执行 50 个INCR操作用asyncio.gather收集结果验证原子性最后计数器值为 50。封装实战基于RedisClient扩展一个hash_get_or_set方法利用 Hash 做对象缓存支持部分字段更新。预期效果Lua 红包总额一分不差MsgPack 体积最小异步并发速度快工具类让后续代码量减少 50%。7. 总结本篇我们从工程化角度将 Python 操作 Redis 的能力提升了一个台阶事务用Pipeline(transactionTrue)实现原子批处理WATCH做乐观锁。Lua 脚本register_script让复杂逻辑在服务端原子执行性能与安全兼得。序列化JSON 通用首选MessagePack 高性能场景利器坚决避开 Pickle。异步redis.asyncio无缝接入异步生态并发优势显著。工具类封装统一连接管理、序列化和缓存模板提升代码复用与可维护性。至此Python 操作 Redis 的核心技能你已经掌握。下一章我们将跳出数据操作进入 Redis 的发布订阅与消息队列初探看看 Redis 如何扮演消息中间件的角色。想了解更多还可以去各个平台搜索「IT策士」一起升级 IT 思维