MonkeyCode实现分布式锁:从Redis到ZooKeeper的完整实战

发布时间:2026/6/30 5:07:06

MonkeyCode实现分布式锁:从Redis到ZooKeeper的完整实战 为什么需要分布式锁场景1库存超卖# ❌ 没有锁两个请求同时扣库存结果变负数 async def deduct_inventory(sku: str, qty: int): stock await db.fetch_val(SELECT stock FROM inventory WHERE sku ?, sku) if stock qty: await db.execute(UPDATE inventory SET stock stock - ? WHERE sku ?, qty, sku) return True return False # 并发请求同时读到stock10都通过判断最终stock-10超卖场景2定时任务重复执行两台服务器同时跑Cron → 客户收到2封日报邮件Redis分布式锁最常用方案最简单的锁SET NX EXimport redis.asyncio as redis import uuid class RedisDistributedLock: def __init__(self, redis_urlredis://localhost:6379): self.redis_url redis_url self._r None async def _get_conn(self): if not self._r: self._r await redis.from_url(self.redis_url) return self._r async def acquire(self, lock_key: str, timeout: int 30, retry_times: int 3, retry_delay: float 0.1) - str | None: 获取锁返回唯一标识用于安全释放失败返回None r await self._get_conn() identifier str(uuid.uuid4()) for _ in range(retry_times): # SET key value NX EX timeout —— 原子操作 result await r.set(lock_key, identifier, nxTrue, extimeout) if result: return identifier await asyncio.sleep(retry_delay) return None async def release(self, lock_key: str, identifier: str) - bool: 安全释放锁用Lua脚本保证「检查删除」原子性 r await self._get_conn() # Lua脚本只有value匹配才删除防止误解别人的锁 lua_script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end result await r.eval(lua_script, 1, lock_key, identifier) return result 1 async def extend(self, lock_key: str, identifier: str, additional_time: int 30) - bool: 续期锁长时间任务需要 r await self._get_conn() lua_script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(expire, KEYS[1], ARGV[2]) else return 0 end result await r.eval(lua_script, 1, lock_key, identifier, str(additional_time)) return result 1实战用锁防止库存超卖lock RedisDistributedLock() async def safe_deduct_inventory(sku: str, qty: int) - bool: lock_key finventory_lock:{sku} identifier await lock.acquire(lock_key, timeout10) if not identifier: raise Exception(f获取库存锁失败: {sku}) try: stock await db.fetch_val(SELECT stock FROM inventory WHERE sku ?, sku) if stock qty: return False await db.execute(UPDATE inventory SET stock stock - ? WHERE sku ?, qty, sku) return True finally: await lock.release(lock_key, identifier)实战防止定时任务重复执行import asyncio from datetime import datetime async def run_daily_report(): 每天0点执行多实例只运行一次 today datetime.now().strftime(%Y-%m-%d) lock_key fcron:daily_report:{today} identifier await lock.acquire(lock_key, timeout3600) # 1小时超时 if not identifier: print(今日报告已在其他实例执行) return try: await generate_daily_report() await send_report_email() finally: await lock.release(lock_key, identifier)长时间任务自动续期async def long_running_task_with_lock(lock_key: str): identifier await lock.acquire(lock_key, timeout30) if not identifier: raise Exception(获取锁失败) async def renew_loop(): 后台协程每10秒续期一次 while True: await asyncio.sleep(10) extended await lock.extend(lock_key, identifier, 30) if not extended: print(续期失败锁可能已被其他人获取) break renew_task asyncio.create_task(renew_loop()) try: await do_long_work() # 可能运行几分钟 finally: renew_task.cancel() await lock.release(lock_key, identifier)Redlock算法Redis集群场景单节点Redis有单点故障风险。Redlock用多节点投票保证可靠性import time class RedLock: def __init__(self, redis_nodes: list[str], quorum: int None): redis_nodes: 多个Redis实例地址 quorum: 多数派数量默认 (len(nodes) // 2) 1 self.nodes redis_nodes self.quorum quorum or (len(redis_nodes) // 2 1) async def acquire(self, lock_key: str, timeout: int 30) - str | None: identifier str(uuid.uuid4()) start_time time.time() acquired 0 for node_url in self.nodes: try: r await redis.from_url(node_url) result await r.set(lock_key, identifier, nxTrue, pxtimeout * 1000) if result: acquired 1 except Exception: continue # 检查是否获得多数派 elapsed_ms (time.time() - start_time) * 1000 if acquired self.quorum and elapsed_ms timeout * 1000: return identifier # 获取失败释放所有已获得的锁 for node_url in self.nodes: try: r await redis.from_url(node_url) lua if redis.call(get,KEYS[1])ARGV[1] then return redis.call(del,KEYS[1]) else return 0 end await r.eval(lua, 1, lock_key, identifier) except Exception: continue return NoneZooKeeper分布式锁最严谨方案ZooKeeper通过临时顺序节点实现公平锁from kazoo.client import KazooClient class ZooKeeperDistributedLock: def __init__(self, hostslocalhost:2181, lock_path/locks): self.zk KazooClient(hostshosts) self.lock_path lock_path self.lock None def connect(self): self.zk.start() def acquire(self, lock_name: str, timeout: float None) - bool: 获取锁阻塞等待 lock_path f{self.lock_path}/{lock_name} self.lock self.zk.Lock(lock_path) return self.lock.acquire(timeouttimeout) def release(self): 释放锁 if self.lock: self.lock.release() self.lock None def close(self): self.zk.stop() # 使用 zk_lock ZooKeeperDistributedLock() zk_lock.connect() if zk_lock.acquire(inventory_sku001): try: deduct_inventory(SKU001, 5) finally: zk_lock.release()三种方案对比

相关新闻