
用Python实战模拟隐私计算中的Beaver Triple从零构建安全乘法协议隐私计算正在重塑数据协作的边界而算术秘密分享作为其基础构建块在保护原始数据的同时实现了分布式计算。许多开发者虽然理解加法运算的简洁性却在乘法运算的复杂协议前望而却步。本文将用Python带您完整实现Beaver Triple的生成与应用揭开安全多方计算中乘法运算的神秘面纱。1. 环境搭建与基础框架我们先构建一个模拟两方计算的基础环境。这个环境不需要真实网络通信但会完整模拟协议中的关键步骤。class Party: def __init__(self, id): self.id id # 0或1表示参与方 self.share {} # 存储各类值的秘密分享 self.communication [] # 模拟通信记录 def send(self, to_party, data): self.communication.append({ from: self.id, to: to_party.id, data: data }) to_party.receive(self.id, data) def receive(self, from_id, data): self.communication.append({ from: from_id, to: self.id, data: data }) # 初始化两个参与方 P0 Party(0) P1 Party(1)算术秘密分享的基本操作需要实现三个核心功能分享(Share)将原始值拆分为两个随机数之和重构(Reconstruct)合并两个分享恢复原始值本地运算支持对分享值的本地操作def share_secret(x, bit_length32): 将数值x拆分为两个分享值 max_val 2**bit_length r random.randint(0, max_val-1) share0 (x - r) % max_val share1 r return share0, share1 def reconstruct(share0, share1, bit_length32): 从两个分享值重构原始值 return (share0 share1) % (2**bit_length) # 示例用法 original 123 s0, s1 share_secret(original) assert reconstruct(s0, s1) original # 验证重构正确性2. Beaver Triple的原理与结构Beaver Triple是隐私计算中用于乘法运算的预处理材料由三个秘密分享的值(a, b, c)组成满足c a * b。这三个值在计算前就被生成并分发给各方。典型乘法运算流程各方计算本地差值e x - af y - b公开e和f因为不泄露原始值各方计算最终分享P0: z0 fa0 eb0 c0P1: z1 ef fa1 e*b1 c1class BeaverTriple: def __init__(self, a0, a1, b0, b1, c0, c1): self.a (a0, a1) self.b (b0, b1) self.c (c0, c1) classmethod def generate_random(cls, bit_length32): 生成随机Beaver Triple a random.randint(0, 2**bit_length-1) b random.randint(0, 2**bit_length-1) c a * b % (2**bit_length) a0, a1 share_secret(a, bit_length) b0, b1 share_secret(b, bit_length) c0, c1 share_secret(c, bit_length) return cls(a0, a1, b0, b1, c0, c1)乘法运算的实现def secure_multiply(P0, P1, x_share, y_share, triple, bit_length32): 安全乘法协议实现 # 本地计算差值 e0 (x_share[0] - triple.a[0]) % (2**bit_length) e1 (x_share[1] - triple.a[1]) % (2**bit_length) f0 (y_share[0] - triple.b[0]) % (2**bit_length) f1 (y_share[1] - triple.b[1]) % (2**bit_length) # 交换并重构e和f P0.send(P1, e0) P1.send(P0, e1) e (e0 e1) % (2**bit_length) P0.send(P1, f0) P1.send(P0, f1) f (f0 f1) % (2**bit_length) # 计算最终分享 z0 (f * triple.a[0] e * triple.b[0] triple.c[0]) % (2**bit_length) z1 (e * f f * triple.a[1] e * triple.b[1] triple.c[1]) % (2**bit_length) return z0, z13. 两种Beaver Triple生成方案3.1 基于同态加密的生成方案我们模拟Pailler同态加密的核心特性来实现Beaver Triple生成class HomomorphicEncryption: 模拟Pailler同态加密 def __init__(self, p101, q103): self.n p * q self.n_sq self.n * self.n self.g self.n 1 # 简化计算的特殊g值 def encrypt(self, m): 加密明文m r random.randint(1, self.n-1) return (pow(self.g, m, self.n_sq) * pow(r, self.n, self.n_sq)) % self.n_sq def decrypt(self, c): 解密密文c pass # 实际实现需要私钥这里仅模拟 def add_homomorphic(self, c1, c2): 同态加法 return (c1 * c2) % self.n_sq def mul_homomorphic(self, c, k): 同态标量乘法 return pow(c, k, self.n_sq) he HomomorphicEncryption() def generate_triple_he(P0, P1, bit_length32): 基于HE生成Beaver Triple # 各方生成自己的分享 a0, a1 share_secret(random.getrandbits(bit_length), bit_length) b0, b1 share_secret(random.getrandbits(bit_length), bit_length) # P0加密并发送自己的分享 enc_a0 he.encrypt(a0) enc_b0 he.encrypt(b0) P0.send(P1, (enc_a0, enc_b0)) # P1计算交叉项 r random.getrandbits(bit_length) d he.mul_homomorphic(enc_a0, b1) d he.add_homomorphic(d, he.mul_homomorphic(enc_b0, a1)) d he.add_homomorphic(d, he.encrypt(r)) P1.send(P0, d) # P0解密并计算c0 c0 (a0 * b0 he.decrypt(d)) % (2**bit_length) # P1计算c1 c1 (a1 * b1 - r) % (2**bit_length) return BeaverTriple(a0, a1, b0, b1, c0, c1)3.2 基于不经意传输的生成方案不经意传输(OT)是另一种生成Beaver Triple的有效方法def bit_decomposition(x, bit_length): 数值的二进制分解 return [(x i) 1 for i in range(bit_length)] def ot_sender(P_sender, P_receiver, choices, bit_length32): 模拟1-out-of-2 OT发送方 results [] for i in range(bit_length): b choices[i] # 在实际OT中发送方不知道b的值 s0 random.getrandbits(bit_length) s1 (P_sender.a_share * (1 i) - s0) % (2**bit_length) results.append((s0, s1)) return results def generate_triple_ot(P0, P1, bit_length32): 基于OT生成Beaver Triple # 生成随机a和b的分享 a random.getrandbits(bit_length) b random.getrandbits(bit_length) a0, a1 share_secret(a, bit_length) b0, b1 share_secret(b, bit_length) # 交叉项计算a0*b1 b1_bits bit_decomposition(b1, bit_length) ot_results ot_sender(P0, P1, b1_bits, bit_length) # 计算u0和u1 u0 sum(r[0] for r in ot_results) % (2**bit_length) u1 sum(r[b1_bits[i]] for i, r in enumerate(ot_results)) % (2**bit_length) # 同理计算另一个交叉项a1*b0 a1_bits bit_decomposition(a1, bit_length) ot_results ot_sender(P1, P0, a1_bits, bit_length) v0 sum(r[0] for r in ot_results) % (2**bit_length) v1 sum(r[a1_bits[i]] for i, r in enumerate(ot_results)) % (2**bit_length) # 计算c的分享 c0 (a0 * b0 u0 v0) % (2**bit_length) c1 (a1 * b1 u1 v1) % (2**bit_length) return BeaverTriple(a0, a1, b0, b1, c0, c1)4. 性能对比与优化策略不同生成方法在通信轮次、计算复杂度和适用场景上各有特点指标同态加密方案不经意传输方案通信轮次2轮O(l)轮(l为位宽)计算复杂度高(模幂运算)中等(位操作)预处理数据量较大较小适合场景高延迟网络低延迟网络实际测试对比import time def benchmark(): sizes [8, 16, 32, 64] results [] for size in sizes: # HE方案测试 start time.time() for _ in range(100): generate_triple_he(P0, P1, size) he_time time.time() - start # OT方案测试 start time.time() for _ in range(100): generate_triple_ot(P0, P1, size) ot_time time.time() - start results.append((size, he_time, ot_time)) return results优化建议批量生成一次性生成大量Beaver Triple可分摊通信成本参数选择根据网络条件选择合适方案位宽优化使用最小必要位宽减少计算量并行处理独立的三元组可并行生成def batch_generate(method, count, bit_length32): 批量生成Beaver Triple triples [] for _ in range(count): if method he: triples.append(generate_triple_he(P0, P1, bit_length)) else: triples.append(generate_triple_ot(P0, P1, bit_length)) return triples5. 实战案例隐私保护的线性回归让我们用实现的Beaver Triple构建一个简单的隐私保护线性回归模型class PrivateLinearRegression: def __init__(self, input_dim): self.weights [random.random() for _ in range(input_dim)] self.bias random.random() def fit(self, X_shared, y_shared, triples, learning_rate0.01, epochs10): 安全训练过程 for _ in range(epochs): for x_share, y_share in zip(X_shared, y_shared): # 计算预测值 pred_share self._secure_dot_product(x_share, triples) # 计算误差 error0 (pred_share[0] - y_share[0]) % (2**32) error1 (pred_share[1] - y_share[1]) % (2**32) error reconstruct(error0, error1) # 更新权重(简化版) for i in range(len(self.weights)): grad error * reconstruct(*x_share[i]) self.weights[i] - learning_rate * grad self.bias - learning_rate * error def _secure_dot_product(self, x_share, triples): 安全计算点积 result0, result1 0, 0 for i, (x, triple) in enumerate(zip(x_share, triples)): w_share share_secret(self.weights[i]) z0, z1 secure_multiply(P0, P1, x, w_share, triple) result0 (result0 z0) % (2**32) result1 (result1 z1) % (2**32) # 加上偏置项 b_share share_secret(self.bias) result0 (result0 b_share[0]) % (2**32) result1 (result1 b_share[1]) % (2**32) return result0, result1使用示例# 准备数据 X [[1, 2], [2, 3], [3, 4]] y [3, 5, 7] # 秘密分享数据 X_shared [[share_secret(val) for val in row] for row in X] y_shared [share_secret(val) for val in y] # 预生成足够的Beaver Triple triples batch_generate(he, countlen(X[0])*len(X)) # 训练模型 model PrivateLinearRegression(input_dim2) model.fit(X_shared, y_shared, triples)在实现过程中我发现Beaver Triple的生成效率直接影响整体协议性能。通过预计算和批量生成可以将计算密集型操作转移到非实时阶段显著提升在线计算效率。同时合理选择位宽参数能在精度和性能间取得良好平衡。