1.前言
算法工程师不只要搭建模型,还要对模型进行优化及相关线上部署。这里面涉及到不少方面:特征处理(独热编码、归一化)、自定义损失函数、自定义评价函数、超参调节、构建pipeline流程线上部署(100ms返回要求)等。html
2.跑模型前准备
2.1 独热编码
对于LR模型,进行独热编码(类别型)和归一化(数值型)是颇有必要的。好比某个类别型特征的枚举值是0到9(类别型再输入到模型前都会作label enconding,好比有三个枚举值:A、B、C,经过label enconding都变成0、一、2),某个数值型的分布范围是0-1000,那么输入到LR中,模型会认为这两个特征是同一类,就会给数值型特征一个相对小的权重(好比0.001),而给数值型特征一个相对大的权重,但这显然不是咱们想要的。所以须要把类别型和数值型特征都转换成0-1之间的数字。java
- 对类别型特征进行独热编码前,须要对该字段进行label enconding。
import pandas as pd df_raw=pd.DataFrame(['A','B','C','A'],columns=['col_raw']) #对类别型进行转换 from sklearn import preprocessing lbl = preprocessing.LabelEncoder() col='col_raw' df_raw['col_lab'] = lbl.fit_transform(df_raw[col].astype(str)) #保存label转换映射关系 import pickle save_lbl_path='./onehot_model/'+col+'.pkl' output = open(save_lbl_path, 'wb') pickle.dump(lbl, output) output.close() #读取和转换 pkl_path='./onehot_model/'+col+'.pkl' pkl_file = open(pkl_path, 'rb') le_departure = pickle.load(pkl_file) df_raw['col_t_raw'] = le_departure.inverse_transform(df_raw['col_lab'])
df_raw
- 对label enconding后的列进行独热编码
df_cate_tmp=pd.get_dummies(df_raw['col_lab'],prefix='col_lab') df_raw=pd.concat([df_raw, df_cate_tmp],axis=1) df_raw
2.2 归一化和标准化
归一化是把数值型特征映射到0-1值域区间,而标准化是把数值分布变成均值为0方差为1标准正态分布。python
- 归一化
nor_model=[] col_num='num_first' #赋值数值型 df_raw[col_num]=[0.2,4,22,8] tmp_min=df_raw[col_num].min() tmp_max=df_raw[col_num].max() nor_model.append((col_num,tmp_min,tmp_max)) #最大-最小归一化法 df_raw[col+'_nor']=df_raw[col_num].apply(lambda x:(x-tmp_min)/(tmp_max-tmp_min)) #保存对应的列名及最大值、最小值 with open("./nor_model/col_min_max.txt","w") as f: for i in nor_model: result=i[0]+','+str(i[1])+','+str(i[2])+'\n' f.write(str(result))
2.3 自定义损失函数
在不一样的业务场景中,python包提供的损失函数知足不了咱们的项目要求,这个时候就须要对自定义损失函数。(参考:https://cloud.tencent.com/developer/article/1357671)git
- 自定义一个MSE,使得它对正残差的惩罚是负残差的10倍
正如定义的那样,非对称MSE很好,由于它很容易计算梯度和hessian,以下图所示。注意,hessian在两个不一样的值上是常量,左边是2,右边是20,尽管在下面的图中很难看到这一点。github
LightGBM提供了一种直接实现定制训练和验证损失的方法。其余的梯度提高包,包括XGBoost和Catboost,也提供了这个选项。这里是一个Jupyter笔记本,展现了如何实现自定义培训和验证损失函数。细节在笔记本上,但在高层次上,实现略有不一样。算法
一、训练损失:在LightGBM中定制训练损失须要定义一个包含两个梯度数组的函数,目标和它们的预测。反过来,该函数应该返回梯度的两个梯度和每一个观测值的hessian数组。如上所述,咱们须要使用微积分来派生gradient和hessian,而后在Python中实现它。docker
二、验证丢失:在LightGBM中定制验证丢失须要定义一个函数,该函数接受相同的两个数组,但返回三个值: 要打印的名称为metric的字符串、损失自己以及关因而否更高更好的布尔值。flask
#---sklearn api def custom_asymmetric_train(y_true, y_pred): residual = (y_true - y_pred).astype("float") grad = np.where(residual<0, -2*10.0*residual, -2*residual) hess = np.where(residual<0, 2*10.0, 2.0) return grad, hess def custom_asymmetric_valid(y_true, y_pred): residual = (y_true - y_pred).astype("float") loss = np.where(residual < 0, (residual**2)*10.0, residual**2) return "custom_asymmetric_eval", np.mean(loss), False #---lgb api def custom_asymmetric_train(preds,dtrain): y_true=np.array(dtrain.get_label()) y_pred=np.argmax(preds.reshape(len(y_true),-1), axis=0) residual = np.array((y_pred-y_true)).astype("float") p=20#参数 tmpGrad=[] tmpHess=[] for i in residual: if i<0: tmpGrad.append(-i*p) tmpHess.append(p) elif (i>=0 and i<=12): tmpGrad.append(i*(p/10)) tmpHess.append(p/10) else: tmpGrad.append(i*p) tmpHess.append(p) grad=np.array(tmpGrad) hess=np.array(tmpHess) return grad, hess def custom_asymmetric_valid(preds,dtrain): p=20#参数 y_true=np.array(dtrain.get_label()) y_pred=np.argmax(preds.reshape(len(y_true),-1), axis=0) residual = np.array((y_pred-y_true)).astype("float") tmpLoss=[] for i in residual: if i<0: tmpLoss.append(-i*p) elif (i>=0 and i<=12): tmpLoss.append(i*(p/10)) else: tmpLoss.append(i*p) loss=np.array(tmpLoss) return "custom_asymmetric_eval", np.mean(loss), False
相应的调用代码(这里须要区分lgb和sklearn的lgb,两个模块的y_red、y是不一样的格式):
import lightgbm ********* Sklearn API ********** # default lightgbm model with sklearn api gbm = lightgbm.LGBMRegressor() # updating objective function to custom # default is "regression" # also adding metrics to check different scores gbm.set_params(**{'objective': custom_asymmetric_train}, metrics = ["mse", 'mae']) # fitting model gbm.fit( X_train, y_train, eval_set=[(X_valid, y_valid)], eval_metric=custom_asymmetric_valid, verbose=False, ) y_pred = gbm.predict(X_valid) ********* Python API ********** # create dataset for lightgbm # if you want to re-use data, remember to set free_raw_data=False lgb_train = lgb.Dataset(X_train, y_train, free_raw_data=False) lgb_eval = lgb.Dataset(X_valid, y_valid, reference=lgb_train, free_raw_data=False) # specify your configurations as a dict params = { 'objective': 'regression', 'verbose': 0 } gbm = lgb.train(params, lgb_train, num_boost_round=10, init_model=gbm, fobj=custom_asymmetric_train, feval=custom_asymmetric_valid, valid_sets=lgb_eval) y_pred = gbm.predict(X_valid)
2.4 自定义scoring
在评估模型预测结果时,若是python库中没有对应的指标(好比k-s值、GINI纯度等),就须要自定义scoring。---此处是针对sklearn的metrics。api
- sklearn中自带的评估函数:
from sklearn.metrics import * SCORERS.keys() #结果 dict_keys(['explained_variance', 'r2', 'neg_median_absolute_error', 'neg_mean_absolute_error', 'neg_mean_squared_error', 'neg_mean_squared_log_error', 'accuracy', 'roc_auc', 'balanced_accuracy', 'average_precision', 'neg_log_loss', 'brier_score_loss', 'adjusted_rand_score', 'homogeneity_score', 'completeness_score', 'v_measure_score', 'mutual_info_score', 'adjusted_mutual_info_score', 'normalized_mutual_info_score', 'fowlkes_mallows_score', 'precision', 'precision_macro', 'precision_micro', 'precision_samples', 'precision_weighted', 'recall', 'recall_macro', 'recall_micro', 'recall_samples', 'recall_weighted', 'f1', 'f1_macro', 'f1_micro', 'f1_samples', 'f1_weighted'])
- 自定义scoring
import numpy as np def ginic(actual, pred): actual = np.asarray(actual) # In case, someone passes Series or list n = len(actual) a_s = actual[np.argsort(pred)] a_c = a_s.cumsum() giniSum = a_c.sum() / a_s.sum() - (n + 1) / 2.0 return giniSum / n def gini_normalizedc(a, p): if p.ndim == 2: # Required for sklearn wrapper p = p[:,1] # If proba array contains proba for both 0 and 1 classes, just pick class 1 return ginic(a, p) / ginic(a, a) from sklearn import metrics gini_sklearn = metrics.make_scorer(gini_normalizedc, True, True) import numpy as np import pandas as pd from scipy.stats import ks_2samp from sklearn.metrics import make_scorer, roc_auc_score, log_loss,f1_score from sklearn.model_selection import GridSearchCV def ks_stat(y, yhat): y=np.array(y) yhat=np.array(yhat) try: ks =ks_2samp(yhat[y==1],yhat[y!=1]).statistic except: print(yhat.shape,y.shape) kmp=yhat[y==1] kmp2=yhat[y!=1] print(kmp.shape,kmp2.shape) ks=0 return ks ks_scorer = make_scorer(ks_stat, needs_proba=True, greater_is_better=True) log_scorer = make_scorer(log_loss, needs_proba=True, greater_is_better=False) roc_scorer = make_scorer(roc_auc_score, needs_proba=True) f1_scorer = make_scorer(f1_score, needs_proba=True, greater_is_better=True)
3.跑模型ing
3.1 GridSearchCV调参
模型有不少超参数,经过格点搜索进行(此处,若是设置并行n_jobs>1,则须要把自定义的scoring放在一个外部py文件中,由于:若是在声明函数以前声明池,则尝试并行使用它将引起此错误。颠倒顺序,它将再也不抛出此错误)。数组
from sklearn.ensemble import GradientBoostingClassifier #必须从外部导入scoring import multiprocessing #multiprocessing.set_start_method('spawn') from external import * from sklearn.model_selection import GridSearchCV tuned_param = {'learning_rate': [0.1, 0.2, 0.5, 0.8, 1], 'max_depth':[3,5,7,10], 'min_samples_split':[50,100,200], 'n_estimators':[50,70,100,150,200], 'subsample':[0.6,0.8]} # n=20 clf_gbdt=GridSearchCV(GradientBoostingClassifier(),tuned_param,cv=2,scoring={'auc': roc_scorer, 'k-s': ks_scorer}, refit='k-s',n_jobs=n, verbose=10) clf_gbdt.fit(x_train[col_gbdt],y_train) #获取最优超参组合 def print_best_score(gsearch,param_test): # 输出best score print("Best score: %0.3f" % gsearch.best_score_) print("Best parameters set:") # 输出最佳的分类器到底使用了怎样的参数 best_parameters = gsearch.best_estimator_.get_params() for param_name in sorted(param_test.keys()): print("\t%s: %r" % (param_name, best_parameters[param_name])) print_best_score(clf_gbdt,tuned_param)
3.2 k-fold验证模型稳定性
对于工业界算法模型,模型的稳定是首要!
from sklearn.model_selection import cross_val_score scores=cross_val_score(estimator=lr,X=x_train[col_lr],y=y_train,cv=5,n_jobs=10,scoring=ks_scorer,verbose=10) print('CV k-s scores: %s'%scores) print('CV k-s: %.3f +/- %.3f'%(np.mean(scores),np.std(scores)))
3.3 模型训练
from sklearn.ensemble import GradientBoostingClassifier gbdt = GradientBoostingClassifier(learning_rate=0.2,max_depth=10,min_samples_split=200,n_estimators=200, subsample=0.8,min_samples_leaf=50) gbdt.fit(x_train[col_gbdt], y_train) #模型保存 pickle.dump(gbdt, open('./ml_model/gbdt_model.pkl', 'wb')) #模型预测输出几率值 y_pred_gbdt=gbdt.predict_proba(x_valid[col_gbdt])
3.4 模型验证
- AUC
import seaborn as sns sns.set_style('darkgrid') import matplotlib.pyplot as plt plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False plt.rcParams.update({'font.size': 10}) plt.rcParams['savefig.dpi'] = 300 #图片像素 plt.rcParams['figure.dpi'] = 300 #分辨率 # 计算AUC fpr_lr,tpr_lr,thresholds = roc_curve(y_valid,y_pred_lr[:,1],pos_label=1) roc_auc_lr = auc(fpr_lr, tpr_lr) # 绘制roc plt.rcParams['figure.figsize']=(8,5) plt.figure() plt.plot(fpr_lr, tpr_lr, color='darkorange', label='ROC curve (area = %0.2f)' % roc_auc_lr) plt.plot([0, 1], [0, 1], color='navy', linestyle='--') plt.xlim([0.0, 1.0]) plt.ylim([0.0, 1.05]) plt.xlabel('False Positive Rate') plt.ylabel('True Positive Rate') plt.title('ROC曲线-LR') plt.legend(loc="lower right")
- K-S
# 绘制K-S曲线 import numpy as np import pandas as pd def PlotKS(preds, labels, n, asc): # preds is score: asc=1 # preds is prob: asc=0 pred = preds # 预测值 bad = labels # 取1为bad, 0为good ksds = pd.DataFrame({'bad': bad, 'pred': pred}) ksds['good'] = 1 - ksds.bad if asc == 1: ksds1 = ksds.sort_values(by=['pred', 'bad'], ascending=[True, True]) elif asc == 0: ksds1 = ksds.sort_values(by=['pred', 'bad'], ascending=[False, True]) ksds1.index = range(len(ksds1.pred)) ksds1['cumsum_good1'] = 1.0*ksds1.good.cumsum()/sum(ksds1.good) ksds1['cumsum_bad1'] = 1.0*ksds1.bad.cumsum()/sum(ksds1.bad) if asc == 1: ksds2 = ksds.sort_values(by=['pred', 'bad'], ascending=[True, False]) elif asc == 0: ksds2 = ksds.sort_values(by=['pred', 'bad'], ascending=[False, False]) ksds2.index = range(len(ksds2.pred)) ksds2['cumsum_good2'] = 1.0*ksds2.good.cumsum()/sum(ksds2.good) ksds2['cumsum_bad2'] = 1.0*ksds2.bad.cumsum()/sum(ksds2.bad) # ksds1 ksds2 -> average ksds = ksds1[['cumsum_good1', 'cumsum_bad1']] ksds['cumsum_good2'] = ksds2['cumsum_good2'] ksds['cumsum_bad2'] = ksds2['cumsum_bad2'] ksds['cumsum_good'] = (ksds['cumsum_good1'] + ksds['cumsum_good2'])/2 ksds['cumsum_bad'] = (ksds['cumsum_bad1'] + ksds['cumsum_bad2'])/2 # ks ksds['ks'] = ksds['cumsum_bad'] - ksds['cumsum_good'] ksds['tile0'] = range(1, len(ksds.ks) + 1) ksds['tile'] = 1.0*ksds['tile0']/len(ksds['tile0']) qe = list(np.arange(0, 1, 1.0/n)) qe.append(1) qe = qe[1:] ks_index = pd.Series(ksds.index) ks_index = ks_index.quantile(q = qe) ks_index = np.ceil(ks_index).astype(int) ks_index = list(ks_index) ksds = ksds.loc[ks_index] ksds = ksds[['tile', 'cumsum_good', 'cumsum_bad', 'ks']] ksds0 = np.array([[0, 0, 0, 0]]) ksds = np.concatenate([ksds0, ksds], axis=0) ksds = pd.DataFrame(ksds, columns=['tile', 'cumsum_good', 'cumsum_bad', 'ks']) ks_value = ksds.ks.max() ks_pop = ksds.tile[ksds.ks.idxmax()] tmp_str='ks_value is ' + str(np.round(ks_value, 4)) + ' at pop = ' + str(np.round(ks_pop, 4)) # chart # chart plt.plot(ksds.tile, ksds.cumsum_good, label='cum_good', color='blue', linestyle='-', linewidth=2) plt.plot(ksds.tile, ksds.cumsum_bad, label='cum_bad', color='red', linestyle='-', linewidth=2) plt.plot(ksds.tile, ksds.ks, label='ks', color='green', linestyle='-', linewidth=2) plt.axvline(ks_pop, color='gray', linestyle='--') plt.axhline(ks_value, color='green', linestyle='--') plt.axhline(ksds.loc[ksds.ks.idxmax(), 'cumsum_good'], color='blue', linestyle='--') plt.axhline(ksds.loc[ksds.ks.idxmax(),'cumsum_bad'], color='red', linestyle='--') plt.title('KS=%s ' %np.round(ks_value, 4) + 'at Pop=%s' %np.round(ks_pop, 4), fontsize=15) return tmp_str
#调用
PlotKS(y_valid,y_pred_lr[:,1], n=10, asc=0)
- 其余指标计算
#根据上述可知,在0.2的时候区分度最好,所以认为大于0.2的就是1,小于则为0 y_pred_lr_new=[] for i in y_pred_lr[:,1]: if i<=0.2: y_pred_lr_new.append(0) else: y_pred_lr_new.append(1) y_pred_gbdt_new=[] for i in y_pred_gbdt[:,1]: if i<=0.2: y_pred_gbdt_new.append(0) else: y_pred_gbdt_new.append(1) y_pred_lr_gbdt_new=[] for i in y_pred_gbdt_lr[:,1]: if i<=0.2: y_pred_lr_gbdt_new.append(0) else: y_pred_lr_gbdt_new.append(1) # gbdt acc=accuracy_score(y_valid, y_pred_gbdt_new) p = precision_score(y_valid, y_pred_gbdt_new, average='binary') r = recall_score(y_valid, y_pred_gbdt_new, average='binary') f1score = f1_score(y_valid, y_pred_gbdt_new, average='binary') print(acc,p,r,f1score)
4.线上部署(高能预警:有大杀器)
4.1 提供flask服务
初始模型以读取pickle文件方式:首先经过python脚本读取pickle文件,再起flask提供模型预测服务,最后java应用调用flask实现预测。
- 优势:方便更新模型,且能用与除LR模型以外的复杂模型(树模型等)。
- 缺点:java调flask,可能会有通讯延时等,会影响速度。flask服务部署的docker和java应用部署的docker要分开,计算效率低。
- 参考连接:https://www.cnblogs.com/demodashi/p/8491170.html
4.2 纯java代码实现
用java脚本实现LR模型类:经过java脚本构建模型(经过python训练好的模型能够输出成规则)方法,java应用之间调这个方法实现预测。
- 优势:相对于第一种方法,该方法不须要进行通讯(这里指和python),且计算速度快(纯java方法)。
- 缺点:代码实现复杂,且只能用于LR模型,不能用于复杂模型。模型特征加工复杂(这里特指独热编码、归一化和gbdt特征生成),开发费劲。
4.3 Java调用jpmml类
使用Java调用jpmml:把python相关的模型(独热编码、LR模型等)转成PMML文件(至关于一个本地txt文件),再经过java类(jpmml)调用。
- 优势:结合1和2的优势,把全部(独热编码、模型预测)的操做都聚集到一个方法中,速度快。
- 缺点:暂无。
- 参考连接:https://github.com/jpmml/jpmml-evaluator
综上所述,综合考虑计算速度及计算资源,推荐使用方案3。
4.4 GBDT+LR模型训练及线上部署(快、粗、好、猛)
参考连接:https://openscoring.io/blog/2019/06/19/sklearn_gbdt_lr_ensemble/
from lightgbm import LGBMClassifier from sklearn_pandas import DataFrameMapper from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier from sklearn.linear_model import LogisticRegression from sklearn.preprocessing import LabelBinarizer, LabelEncoder from sklearn2pmml import sklearn2pmml from sklearn2pmml.decoration import CategoricalDomain, ContinuousDomain from sklearn2pmml.ensemble import GBDTLRClassifier from sklearn2pmml.pipeline import PMMLPipeline from xgboost import XGBClassifier import pandas df = pandas.read_csv("audit.csv") cat_columns = ["Education", "Employment", "Marital", "Occupation"] cont_columns = ["Age", "Hours", "Income"] label_column = "Adjusted" def make_fit_gbdtlr(gbdt, lr): mapper = DataFrameMapper( [([cat_column], [CategoricalDomain(), LabelBinarizer()]) for cat_column in cat_columns] + [(cont_columns, ContinuousDomain())] ) classifier = GBDTLRClassifier(gbdt, lr) pipeline = PMMLPipeline([ ("mapper", mapper), ("classifier", classifier) ]) pipeline.fit(df[cat_columns + cont_columns], df[label_column]) return pipeline pipeline = make_fit_gbdtlr(GradientBoostingClassifier(n_estimators = 499, max_depth = 2), LogisticRegression()) sklearn2pmml(pipeline, "GBDT+LR.pmml") pipeline = make_fit_gbdtlr(RandomForestClassifier(n_estimators = 31, max_depth = 6), LogisticRegression()) sklearn2pmml(pipeline, "RF+LR.pmml") pipeline = make_fit_gbdtlr(XGBClassifier(n_estimators = 299, max_depth = 3), LogisticRegression()) sklearn2pmml(pipeline, "XGB+LR.pmml") def make_fit_lgbmlr(gbdt, lr): mapper = DataFrameMapper( [([cat_column], [CategoricalDomain(), LabelEncoder()]) for cat_column in cat_columns] + [(cont_columns, ContinuousDomain())] ) classifier = GBDTLRClassifier(gbdt, lr) pipeline = PMMLPipeline([ ("mapper", mapper), ("classifier", classifier) ]) pipeline.fit(df[cat_columns + cont_columns], df[label_column], classifier__gbdt__categorical_feature = range(0, len(cat_columns))) return pipeline pipeline = make_fit_lgbmlr(LGBMClassifier(n_estimators = 71, max_depth = 5), LogisticRegression()) sklearn2pmml(pipeline, "LGBM+LR.pmml")
注意:jdk必须是1.8及以上、sklearn的版本必须0.21.0及以上、sklearn2pmml版本最好经过git获取安装
import sklearn import sklearn2pmml print('The sklearn2pmml version is {}.'.format(sklearn2pmml.__version__)) print('The scikit-learn version is {}.'.format(sklearn.__version__)) #多个jdk版本设置切换 export JAVA_HOME=/software/servers/jdk1.8.0_121/ export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar