案例引入
某银行A与某互联网公司B达成了企业级的合做。互联网公司A与银行B有着一大部分重合的用户,A有着客户上网行为等特征信息。B有着客户的存贷状况等特征信息以及客户的标签信息——客户的还贷状况(Y)。B但愿可以将他所独有的特征信息与A所独有的特征信息相结合,训练出一个更强大的识别客户信用风险的模型,但因为不一样行业之间的行政手续,用户数据隐私安全等因素,企业A,B没法直接互通数据,联邦学习应运而生。python
联邦学习概述
联邦学习的定义
联邦学习旨在创建一个基于分布数据集的联邦学习模型。在模型训练的过程当中,模型相关的信息可以在各方之间交换(或者是以加密形式交换),但原始数据不能。这一交换不会暴露每一个站点上数据的任何受保护的隐私部分。已训练好的联邦学习模型能够置于联邦学习系统的各参与方,也能够在多方之间共享。 设有N位参与方协做使用各自的训练数据集
来训练机器学习模型。传统的方法是将全部的数据
收集起来而且存储在同一个地方,例如存储在某一台云端数据服务器上,从而在该服务器上使用集中后的数据集训练获得一个机器学习模型
。在传统方法的训练过程当中,任何一位参与方会将本身的数据暴露给服务器甚至其余参与方。联邦学习是一种不须要收集各参与方全部的数据便能协做训练一个模型
的机器学习过程。 设
和
分别为集中型模型
和联邦型模型
的性能度量。在使用安全的联邦学习在分布式数据源上构建机器学习模型时,咱们容许在保护用户隐私的状况下,联邦学习模型的性能略低于集中型模型的性能。
其中
即为容许的性能损失。算法
联邦学习的分类
根据联邦学习所使用数据在各参与方的不一样分布状况,咱们能够将联邦学习划分为三类:横向联邦学习(Horizontal Federated Learning, HFL)、纵向联邦学习(Vertical Federated Learning, VFL)和联邦迁移学习(Federated Transfer Learning, FTL)。下面是这三种类型联邦学习所针对的不一样数据分布状况:安全
- 横向联邦学习:不一样参与方的数据有较大的特征的重叠(横向),但数据样本(纵向),即特征所属的样本的重叠度不高。例如,联邦学习的参与方是两家服务于不一样区域市场的银行,他们所服务的客户群体差异较大,但客户的特征可能会由于类似的商业模式而重叠度较高。
- 纵向联邦学习:不一样参与方的数据样本有较大的重叠,但样本特征的重叠度不高。例如,两家公司(银行和电子商务公司)向客户提供不一样的服务,拥有客户不一样方面的数据,但他们所服务的客户群体有较大的重叠。
- 联邦迁移学习:不一样参与方的数据在特征和样本维度重叠度都不是很是高。
纵向联邦学习算法
纵向联邦学习算法有利于各企业之间创建合做,使用各自的特有数据,共同创建更增强大的模型。本篇将着重介绍一种基于加法同态加密的纵向联邦学习算法。服务器
应用情景
细化开头的案例,企业B 有特征X3 和Y(标签),可独立建模,企业A 有特征X一、X2,缺少Y,没法独立建模,如今企业A,B 合做,创建联合模型,显然效果会超过企业B单边数据建模。 但两方之间如何合做来共同训练一个模型呢?以逻辑回归为例,一个经典的逻辑回归的损失函数和梯度公式以下所示:
app
能够看到,梯度的计算离不开特征数据(x)和标签数据(y)。所以,一种最直接的数据交互方向就是其中一方将本身独有的数据直接以明文的方式发送给对方,由对方计算出梯度后再返回。但这样的交互方式会产生信息的泄露,其中一方会得到所有的信息,这显然是不符合规范的。 既然明文的传输不行,一种解决思路就是将须要的数据以密文的形式发送,但这又会产生另外一个问题,其中一方得到另外一方的密文数据后没法解密,又如何进行计算呢?这时就须要引入同态加密算法。
dom
同态加密算法简介
因为篇幅所限,这里将只介绍同态加密算法的做用,而不介绍其具体细节。 同态加密(Homomorphic Encryption)是一种特殊的加密方法,容许对密文进行处理获得仍然是加密的结果,即对密文直接进行处理,跟对明文进行处理后再对处理结果加密,获得的结果相同。从抽象代数的角度讲,保持了同态性。 假设存在两个数x、y,OP(x,y)表示x与y之间的一种操做运算(加、减、乘、除、指数……)。E(x)表示对x的加密操做,D(x)表示对x的解密操做,则当某种加密算法对某个操做OP知足同态性时,表达式以下: 或
根据算法所能支持的操做运算的范围和次数的大小,能够将同态加密算法分为部分同态加密算法(PHE)、些许同态加密算法(SHE)和全同态加密算法(FHE),其支持的运算范围与次数依次扩大。本文以后的纵向联邦学习算法将基于Paillier算法实现,它是一种部分同态加密算法,支持加法以及与常数的乘法运算。下面我将基于Python的phe库演示Paillier算法的做用。机器学习
#phe库须要安装 from phe import paillier #生成公钥与私钥 public_key, private_key = paillier.generate_paillier_keypair() #须要加密的数据 secret_number_list = [3.141592653, 300, -4.6e-12] #公钥加密 encrypted_number_list = [public_key.encrypt(x) for x in secret_number_list] #私钥解密 [private_key.decrypt(x) for x in encrypted_number_list]
支持加减法以及与常数的乘除法分布式
a, b, c = encrypted_number_list a_plus_5 = a + 5 #= a + 5 print("a + 5 =",private_key.decrypt(a_plus_5)) a_plus_b = a + b #= a + b print("a + b =",private_key.decrypt(a_plus_b)) a_times_3_5 = a * 3.5 #= a * 3.5 print("a * 3.5 =",private_key.decrypt(a_times_3_5)) a_minus_1 = a - 1 #= a + (-1) print("a - 1=",private_key.decrypt(a_minus_1)) a_div_minus_3_1 = a / -3.1 #= a * (-1/3.1) print("a / -3.1 =",private_key.decrypt(a_div_minus_3_1)) a_minus_b = a - b #= a + (b*-1) print("a - b =",private_key.decrypt(a_minus_b))
若一些函数内部的逻辑是加法或者是与常数的乘法,一样支持。函数
import numpy as np enc_mean = np.mean(encrypted_number_list) enc_dot = np.dot(encrypted_number_list, [2, -400.1, 5318008]) print("enc_mean:", private_key.decrypt(enc_mean)) print("enc_dot:", private_key.decrypt(enc_dot))
算法流程
逻辑回归的损失和梯度的公式中包含着指数运算,所以,若是要用Paillier算法进行加密,须要对原公式进行必定的改造,使其仅用加法和乘法来表示。将指数运算改造为加法与乘法运算的一个经常使用方法就是用泰勒展开来进行近似。 最终获得的转化后的梯度矩阵的上半部分就是参与方A更新其参数须要的梯度(其中包含了正则项),下半部分对应B。咱们的目标是但愿参与方A、B可以尽可能地进行单独的计算,再经过加密信息的交互得到各自的梯度计算结果,所以咱们须要对计算的任务进行必定的划分,能够采用如下的一种设计流程。 在每一轮参数更新中,各参与方须要按序进行以下的计算和交互:性能
- 参与方A和B各自初始化本身的参数,参与方C生成秘钥对并分发公钥给A和B。
- 参与方A计算
,使用公钥加密后发送给B。参与方B计算
,使用公钥加密后发送给A。
- 此时A和B能各自计算
以及
([[x]]表示x的同态加密形式)。
- A和B须要加密的梯度发送给C来进行解密,但为了不C直接得到梯度信息,A和B能够将梯度加上一个随机数
与
再发送给C。C得到加密梯度进行后进行解密再返还A和B。
- A和B只须要再减去之间加的随机数就能得到真实的梯度,更新其参数。
代码实现
下面咱们将基于Python代码来实现这整个算法流程。为了更清晰地展示算法的流程,将极度简化交互流程的实现。
导入所需模块
import math import numpy as np from phe import paillier import pandas as pd from sklearn import datasets from sklearn.datasets import load_diabetes from sklearn.preprocessing import StandardScaler from sklearn.datasets import load_breast_cancer from sklearn.model_selection import train_test_split from sklearn.utils import shuffle
各参与方的定义
设置参与方的父类,各参与方都须要保存模型的参数、一些中间计算结果以及与其余参与方的链接情况。
class Client: def __init__(self, config): ## 模型参数 self.config = config ## 中间计算结果 self.data = {} ## 与其余节点的链接情况 self.other_client = {} ## 与其余参与方创建链接 def connect(self, client_name, target_client): self.other_client[client_name] = target_client ## 向特定参与方发送数据 def send_data(self, data, target_client): target_client.data.update(data)
参与方A在训练过程当中仅提供特征数据。
class ClientA(Client): def __init__(self, X, config): super().__init__(config) self.X = X self.weights = np.zeros(X.shape[1]) def compute_z_a(self): z_a = np.dot(self.X, self.weights) return z_a ## 加密梯度的计算,对应step4 def compute_encrypted_dJ_a(self, encrypted_u): encrypted_dJ_a = self.X.T.dot(encrypted_u) + self.config['lambda'] * self.weights return encrypted_dJ_a ##参数的更新 def update_weight(self, dJ_a): self.weights = self.weights - self.config["lr"] * dJ_a / len(self.X) return ## A: step2 def task_1(self, client_B_name): dt = self.data assert "public_key" in dt.keys(), "Error: 'public_key' from C in step 1 not successfully received." public_key = dt['public_key'] z_a = self.compute_z_a() u_a = 0.25 * z_a z_a_square = z_a ** 2 encrypted_u_a = np.asarray([public_key.encrypt(x) for x in u_a]) encrypted_z_a_square = np.asarray([public_key.encrypt(x) for x in z_a_square]) dt.update({"encrypted_u_a": encrypted_u_a}) data_to_B = {"encrypted_u_a": encrypted_u_a, "encrypted_z_a_square": encrypted_z_a_square} self.send_data(data_to_B, self.other_client[client_B_name]) ## A: step三、4 def task_2(self, client_C_name): dt = self.data assert "encrypted_u_b" in dt.keys(), "Error: 'encrypted_u_b' from B in step 1 not successfully received." encrypted_u_b = dt['encrypted_u_b'] encrypted_u = encrypted_u_b + dt['encrypted_u_a'] encrypted_dJ_a = self.compute_encrypted_dJ_a(encrypted_u) mask = np.random.rand(len(encrypted_dJ_a)) encrypted_masked_dJ_a = encrypted_dJ_a + mask dt.update({"mask": mask}) data_to_C = {'encrypted_masked_dJ_a': encrypted_masked_dJ_a} self.send_data(data_to_C, self.other_client[client_C_name]) ## A: step6 def task_3(self): dt = self.data assert "masked_dJ_a" in dt.keys(), "Error: 'masked_dJ_a' from C in step 2 not successfully received." masked_dJ_a = dt['masked_dJ_a'] dJ_a = masked_dJ_a - dt['mask'] self.update_weight(dJ_a) print(f"A weight: {self.weights}") return
参与方B在训练过程当中既提供特征数据,又提供标签数据。
class ClientB(Client): def __init__(self, X, y, config): super().__init__(config) self.X = X self.y = y self.weights = np.zeros(X.shape[1]) self.data = {} def compute_u_b(self): z_b = np.dot(self.X, self.weights) u_b = 0.25 * z_b - self.y + 0.5 return z_b, u_b def compute_encrypted_dJ_b(self, encrypted_u): encrypted_dJ_b = self.X.T.dot(encrypted_u) + self.config['lambda'] * self.weights return encrypted_dJ_b def update_weight(self, dJ_b): self.weights = self.weights - self.config["lr"] * dJ_b / len(self.X) ## B: step2 def task_1(self, client_A_name): try: dt = self.data assert "public_key" in dt.keys(), "Error: 'public_key' from C in step 1 not successfully received." public_key = dt['public_key'] except Exception as e: print("B step 1 exception: %s" % e) try: z_b, u_b = self.compute_u_b() encrypted_u_b = np.asarray([public_key.encrypt(x) for x in u_b]) dt.update({"encrypted_u_b": encrypted_u_b}) dt.update({"z_b": z_b}) except Exception as e: print("Wrong 1 in B: %s" % e) data_to_A= {"encrypted_u_b": encrypted_u_b} self.send_data(data_to_A, self.other_client[client_A_name]) ## B: step三、4 def task_2(self,client_C_name): try: dt = self.data assert "encrypted_u_a" in dt.keys(), "Error: 'encrypt_u_a' from A in step 1 not successfully received." encrypted_u_a = dt['encrypted_u_a'] encrypted_u = encrypted_u_a + dt['encrypted_u_b'] encrypted_dJ_b = self.compute_encrypted_dJ_b(encrypted_u) mask = np.random.rand(len(encrypted_dJ_b)) encrypted_masked_dJ_b = encrypted_dJ_b + mask dt.update({"mask": mask}) except Exception as e: print("B step 2 exception: %s" % e) try: assert "encrypted_z_a_square" in dt.keys(), "Error: 'encrypted_z_a_square' from A in step 1 not successfully received." encrypted_z = 4*encrypted_u_a + dt['z_b'] encrypted_loss = np.sum((0.5-self.y)*encrypted_z + 0.125*dt["encrypted_z_a_square"] + 0.125*dt["z_b"] * (encrypted_z+4*encrypted_u_a)) except Exception as e: print("B step 2 exception: %s" % e) data_to_C = {"encrypted_masked_dJ_b": encrypted_masked_dJ_b, "encrypted_loss": encrypted_loss} self.send_data(data_to_C, self.other_client[client_C_name]) ## B: step6 def task_3(self): try: dt = self.data assert "masked_dJ_b" in dt.keys(), "Error: 'masked_dJ_b' from C in step 2 not successfully received." masked_dJ_b = dt['masked_dJ_b'] dJ_b = masked_dJ_b - dt['mask'] self.update_weight(dJ_b) except Exception as e: print("A step 3 exception: %s" % e) print(f"B weight: {self.weights}") return
参与方C在整个训练过程当中主要的做用就是分发秘钥,以及最后的对A和B加密梯度的解密。
class ClientC(Client): """ Client C as trusted dealer. """ def __init__(self, A_d_shape, B_d_shape, config): super().__init__(config) self.A_data_shape = A_d_shape self.B_data_shape = B_d_shape self.public_key = None self.private_key = None ## 保存训练中的损失值(泰展开近似) self.loss = [] ## C: step1 def task_1(self, client_A_name, client_B_name): try: public_key, private_key = paillier.generate_paillier_keypair() self.public_key = public_key self.private_key = private_key except Exception as e: print("C step 1 error 1: %s" % e) data_to_AB = {"public_key": public_key} self.send_data(data_to_AB, self.other_client[client_A_name]) self.send_data(data_to_AB, self.other_client[client_B_name]) return ## C: step5 def task_2(self, client_A_name, client_B_name): try: dt = self.data assert "encrypted_masked_dJ_a" in dt.keys() and "encrypted_masked_dJ_b" in dt.keys(), "Error: 'masked_dJ_a' from A or 'masked_dJ_b' from B in step 2 not successfully received." encrypted_masked_dJ_a = dt['encrypted_masked_dJ_a'] encrypted_masked_dJ_b = dt['encrypted_masked_dJ_b'] masked_dJ_a = np.asarray([self.private_key.decrypt(x) for x in encrypted_masked_dJ_a]) masked_dJ_b = np.asarray([self.private_key.decrypt(x) for x in encrypted_masked_dJ_b]) except Exception as e: print("C step 2 exception: %s" % e) try: assert "encrypted_loss" in dt.keys(), "Error: 'encrypted_loss' from B in step 2 not successfully received." encrypted_loss = dt['encrypted_loss'] loss = self.private_key.decrypt(encrypted_loss) / self.A_data_shape[0] + math.log(2) print("******loss: ", loss, "******") self.loss.append(loss) except Exception as e: print("C step 2 exception: %s" % e) data_to_A = {"masked_dJ_a": masked_dJ_a} data_to_B = {"masked_dJ_b": masked_dJ_b} self.send_data(data_to_A, self.other_client[client_A_name]) self.send_data(data_to_B, self.other_client[client_B_name]) return
模拟数据的生成
这里将基于sklearn中的乳腺癌数据集生成一组模拟数据,参与方A得到部分特征数据,参与方B得到部分特征数据与标签数据。
def load_data(): # 加载数据 breast = load_breast_cancer() # 数据拆分 X_train, X_test, y_train, y_test = train_test_split(breast.data, breast.target, random_state=1) # 数据标准化 std = StandardScaler() X_train = std.fit_transform(X_train) X_test = std.transform(X_test) return X_train, y_train, X_test, y_test ## 将特征分配给A和B def vertically_partition_data(X, X_test, A_idx, B_idx): """ Vertically partition feature for party A and B :param X: train feature :param X_test: test feature :param A_idx: feature index of party A :param B_idx: feature index of party B :return: train data for A, B; test data for A, B """ XA = X[:, A_idx] XB = X[:, B_idx] XB = np.c_[np.ones(X.shape[0]), XB] XA_test = X_test[:, A_idx] XB_test = X_test[:, B_idx] XB_test = np.c_[np.ones(XB_test.shape[0]), XB_test] return XA, XB, XA_test, XB_test
训练流程的实现
def vertical_logistic_regression(X, y, X_test, y_test, config): """ Start the processes of the three clients: A, B and C. :param X: features of the training dataset :param y: labels of the training dataset :param X_test: features of the test dataset :param y_test: labels of the test dataset :param config: the config dict :return: True """ ## 获取数据 XA, XB, XA_test, XB_test = vertically_partition_data(X, X_test, config['A_idx'], config['B_idx']) print('XA:',XA.shape, ' XB:',XB.shape) ## 各参与方的初始化 client_A = ClientA(XA, config) print("Client_A successfully initialized.") client_B = ClientB(XB, y, config) print("Client_B successfully initialized.") client_C = ClientC(XA.shape, XB.shape, config) print("Client_C successfully initialized.") ## 各参与方之间链接的创建 client_A.connect("B", client_B) client_A.connect("C", client_C) client_B.connect("A", client_A) client_B.connect("C", client_C) client_C.connect("A", client_A) client_C.connect("B", client_B) ## 训练 for i in range(config['n_iter']): client_C.task_1("A", "B") client_A.task_1("B") client_B.task_1("A") client_A.task_2("C") client_B.task_2("C") client_C.task_2("A", "B") client_A.task_3() client_B.task_3() print("All process done.") return True config = { 'n_iter': 100, 'lambda': 10, 'lr': 0.05, 'A_idx': [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], 'B_idx': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], } X, y, X_test, y_test = load_data() vertical_logistic_regression(X, y, X_test, y_test, config)
训练效果
为测试该纵向联邦学习算法的训练效果。能够设置普通的集中式训练的逻辑回归算法做为对照组,基于乳腺癌数据集,使用相同的训练集数据及相同的逻辑回归模型来进行训练,观察其损失值的降低曲线以及在相同测试集上的预测准确率。 如下是两种状况下,训练的损失值的降低状况: 各曲线表明的情形: Logistic: 普通逻辑回归的损失值变化曲线,使用的是正常的损失函数 Taylor_Logistic: 普通逻辑回归的损失值变化曲线,使用的是泰勒展开拟合的损失函数 Taylor_Taylor:纵向逻辑回归的损失值变化曲线,使用的是泰勒展开拟合的损失函数
如下是在sklearn中不一样数据集上,普通逻辑回归与纵向逻辑回归的训练结果的正确率及AUC的差别,其中rows表明样本数量,feat表明特征数量,logistic表明集中式逻辑回归的训练结果,Vertical表明纵向联邦学习算法的训练效果。 由训练结果的比较能够看到,与普通的逻辑回归相比,该纵向逻辑回归算法在保证各方数据隐私性的同时,在实验数据集上可以达到不错的训练效果。
参考文献
[1] Yang Q , Liu Y , Chen T , et al. Federated Machine Learning: Concept and Applications[J]. ACM Transactions on Intelligent Systems and Technology, 2019, 10(2):1-19. [2] Hardy S , Henecka W , Ivey-Law H , et al. Private federated learning on vertically partitioned data via entity resolution and additively homomorphic encryption[J]. 2017. [3] https://zhuanlan.zhihu.com/p/94105330