传统上,咱们接触的机器学习算法,都是被设计为解决某一个某一类问题的肯定性算法。对于这些机器学习算法来讲,惟一的灵活性体如今参数搜索空间上,向算法输入样本,算法借助不一样的优化手段,对参数进行调整,以此来获得一个对训练样本和测试样本的最佳适配参数组。php
遗传编程算法彻底走了另外一外一条路,遗传编程算法的目标是编写一个程度,这个程序会尝试自动构造出解决某一问题的最佳程度。从本质上看,遗传编程算法构造的是一个可以构造算法的算法。html
另外一方面,咱们曾经讨论过遗传算法,遗传算法是一种优化技术,就优化技术而言,不管是何种形式的优化,算法或度量都是预先设定好的,而优化算法所作的工做就是尝试为其找到最佳参数。和优化算法同样,遗传编程也须要一种方法来度量题解的优劣程度。但与优化算法不一样的是,遗传编程中的题解并不只仅是一组用于给定算法的参数,相反,在遗传编程中,连同算法自己及其全部参数,都是须要搜索肯定的。node
从某种程度上来讲,遗传编程和遗传算法的区别在于,进化的基本单位不一样,python
遗传算法是受达尔文的进化论的启发,借鉴生物进化过程而提出的一种启发式搜索算法,所以遗传算法 ( GA , Genetic Algorithm ) 也称进化算法 。 所以,在讨论遗传编程的时候,会大量借用进化论中的术语和概念,为了更好地讨论遗传算法,咱们先介绍一些基本生物进化概念,git
生物所处的环境起到一个提供生存压的做用(反馈),虽然纵观整个地球历史,环境的因素是在不断变化的(有时甚至变化的还很快),可是在某个时间段内(例如5000年内)是基本保持不变的,而物种进化的目的就是经过一代代的繁衍,逐渐适应(拟合)当前的环境,并和其余物种达到最优平衡(纳什均衡)。github
遗传编程算法就是模拟了生物进化的过程,简单说来讲,web
下图是遗传算法的流程图,正则表达式
从大的方面看,遗传编程的两个重要概念是基因型和表现型,算法
其实遗传算法领域的研究中,这两个方面的研究都有,可是,由于遗传编程很难直接处理程序片断(表现型)(例如一段C++可执行代码、或者是一段python代码),由于基于随机变异获得的新代码极可能没法经过编译器语法检查。shell
可是相比之下,遗传算法反而容易处理程序片断的内在结构(基因型)(例如C++代码的AST抽象语法树)。
因此,笔者认为基因型的遗传算法研究才是更有研究价值的一个方向,本文的讨论也会围绕基因型的遗传算法展开。
根据基因型形态的不一样,遗传编程方法能够分为三种:
线性遗传编程有广义和狭义之分,
http://www.doc88.com/p-630428999834.html https://pdfs.semanticscholar.org/958b/f0936eda72c3fc03a09a0e6af16c072449a1.pdf
基于树的遗传编程的基因型是树结构。基于树的遗传编程是遗传编程最先的形态,也是遗传编程的主流方法。
大多数编程语言,在编译或解释时,首先会被转换成一棵解析树(Lisp编程语言及其变体,本质上就是一种直接访问解析树的方法),例以下图,
树上的节点有多是枝节点也多是叶节点,
例如上图中,圆形节点表明了应用于两个分支(Y变量和常量值5)之上的求和操做。一旦咱们求出了此处的计算值,就会将计算结果赋予上方的节点处。相应的,这一计算过程会一直向下传播,直到遍历全部的叶子节点(深度优先递归遍历)。
若是对整棵树进行遍历,咱们会发现它至关于下面这个python函数:
在遗传变异方面,基于树的遗传编程的演化操做有两种,变异和交叉,
树是一种特殊的图,所以人们很天然地想到将基于树的遗传编程扩展到基于图的遗传编程。下图就是基于图的遗传编程的基因型的一个示例。
Relevant Link:
《Adaptation in Natural and Artificial Systems》 John Henry Holland 1992 http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%b8%80%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%ae%80%e4%bb%8b 《Evolving Evolutionary Algorithms using Linear Genetic Programming (2005)》 《A comparison of several linear genetic programming techniques》Oltean, Mihai, and Crina Grosan. Complex Systems 14.4 (2003): 285-314. https://www.jianshu.com/p/a953066cb2eb
这个章节,咱们用数学的形式化视角,来从新审视一下遗传算法。
I | 种群中的个体 |
m | 全部可能个体的数量 |
n | 种群大小 |
pm | 变异几率 |
pc | 交叉几率 |
f(I) | 个体I的适应度。 |
p(I)t | 第t代种群中,个体I出现的几率 |
![]() |
第t代种群平均适应度。第t代种群中个体适应度的平均值。 |
由于遗传算法中有各类各样的编码方式、变异操做、交叉操做和选择操做,遗传算法的形态呈现多样性。
为了简化分析,咱们这里假设一个典型遗传算法,即,
模式定理是遗传算法创始人 J.Holland 在其突破性著做《Adaptation in Natural and Artificial Systems》引入的,用于分析遗传算法的工做原理。
模式是指基因编码空间中,由一类类似的基因组抽象获得的pattern,好比 [0,*,*,1] 就是一个模式。染色体[0,1,0,1]和[0,0,0,1]都包含该模式。
在具体讨论模式定理以前,咱们先介绍一些符号,
L(H) | 模式的长度。第一固定基因位和最后一个固定基因位的距离,其中L([0,*,*,1])=3。 |
O(H) | 模式的阶。固定基因位的个数,其中O([0,*,*,1])=2。 |
![]() |
模式平均适应度。种群中包含该模式的个体适应度的平均值。 |
p(H)t | 在第t代种群中,模式H出现的几率。 |
【模式定理】
在本章定义的典型遗传算法中,下面公式成立:
这个公式看起来有点复杂,其实很是简单,咱们逐个部分来分解,
整体来讲,遗传算法须要在,选择操做引发的收敛性和变异交叉操做引发的多样性之间取得平衡。
模式定理的通俗说法是这样的,低阶、短序以及平均适应度高于种群平均适应度的模式在子代中呈指数增加。
低阶、短长以及平均适应度高于种群平均适应度的模式H,
此时,
即模式H呈现指数增加。
这个小节咱们来讨论一个有些理论化的问题,即:遗传编程算法通过必定次数的迭代后,是否会收敛到某个稳态?若是会达到稳态,遗传编程的收敛速度是多少?
要解决这个问题,咱们须要引入数学工具,马尔柯夫链,有以下定义。
咱们把整个种群的状态当作马尔科夫链的一个状态 s,交叉、变异和选择操做则构建了一个几率转移矩阵。通常状况下,0<pm<1,0<=pc<=1,即物种变异必定会发生,但不是必然100%发生。咱们来分析一下这时的几率转移矩阵的性质。
标准的优化算法分析第一个要关心的问题是,优化算法能不能收敛到全局最优势。假设全局最优势的适应度值为maxf,收敛到全局最优势的定义以下,
一言以蔽之,典型遗传算法并不收敛。
根据几率转移矩阵收敛定理,咱们能够知道典型遗传算法会收敛到一个全部种群状态几率都大于0的几率分布上(稳态)。所以以后,不包含全局最优解的种群必定会不停出现,从而致使上面的公式不成立。
可是笔者这里要强调的是,这章讨论的典型遗传算法在实际工程中是几乎不存在的,实际上,几乎全部遗传算法代码都会将保持已发现最优解。加了这个变化以后的遗传算法是收敛的。
仍是根据上述几率转移矩阵收敛定理,咱们能够知道遗传算法会收敛到一个全部种群状态几率都大于0的几率分布上,那么包含全局最优解的种群必定会不停出现,保持已发现最优解的作法会使得上面的公式成立。
Relevant Link:
Adaptation in Natural and Artificial Systems: An Introductory Analysis with Applications to Biology, Control, and Artificial Intelligence Rudolph, Günter. “Convergence analysis of canonical genetic algorithms.” Neural Networks, IEEE Transactions on 5.1 (1994): 96-101. http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%b8%89%e6%95%b0%e5%ad%a6%e6%91%86%e6%91%86%e6%89%8b%ef%bc%8c%e5%be%88%e6%83%ad%e6%84%a7%ef%bc%8c%e5%8f%aa%e5%81%9a%e4%ba%86
自 John Henry Holland 在1992年提出《Adaptation in Natural and Artificial Systems》论文后,遗传编程又获得了大量研究者的关注和发展,提出了不少改进型的衍生算法。虽然这些算法在工业场景中不必定都适用,可是笔者以为咱们有必要了解和学习一下它们各自的算法思想,有利于咱们在遇到各自的实际问题的时候,触类旁通。
典型交叉变异随机选择两条染色体,按照pc的几率决定是否交叉,若是选择交叉则随机选择一点并将这点以后的基因交换。这种交叉方式被称为单点杂交。
多点杂交指定了多个交换点用于父本的基因交换重组,具体的执行过程以下图所示,
多点杂交改进的是突变率。
单点和多点杂交算法存在一个问题,杂交的染色体中某些部分的基因会被过早地舍弃,这是因为在交换前它们必须肯定交换父本染色体交换位前面仍是后面的基因,从而对于那些无关的基因段在交换前就已经收敛了。
均匀杂交算法(Uniform Crossover)就能够解决上述算法的这种局限性,该算法的主要过程以下:
洗牌杂交的最大特色是一般将染色体的中点做为基因的交换点,即从每一个父本中取它们一半的基因重组成新的个体。
另外针对于实值编码方式,还有离散杂交、中间杂交、线性杂交和扩展线性杂交等算法。
精英保留策略是一种典型的选择策略。精英保留策略是指每次迭代都保留已发现的最优解。这个策略是显而易见的,咱们不可能舍弃已发现的最优解,而只使用最后一代种群的最优解。同时,采用精英保留策略的典型遗传算法是保证收敛到全局最优解的。
轮盘赌选择策略是基于几率进行选择策略。轮盘赌算法有放回地采样出原种群大小的新一代种群,个体 Ii 的采样几率以下所示,
从几率上看,在某一轮中,即便是适应度最差的个体,也存在必定的概率能进入到下一轮,这种策略提升了多样性,但减缓了收敛性。
锦标赛法从大小为 n 的种群随机选择 k(k小于n) 个个体,而后在 k 个个体中选择适应度最大的个体做为下一代种群的一个个体。反复屡次,直到下一代种群有 n 个个体。
在大天然,物种的进化是以多种群的形式并发进行的。通常来讲,一个物种只有一个种群了,意味着这个物种有灭亡的危险(例如恐龙)。
受此启发,人们提出了多种群遗传算法。多种群遗传算法保持多个种群同时进化,具体流程以下图所示,
多种群遗传算法和遗传算法执行屡次的区别在于移民,种群之间会经过移民的方式交换基因。这种移民操做会带来更多的多样性。
遗传算法中,决定个体变异长度的主要因素有两个:交叉几率pc,和变异几率pm。
在实际工程问题中,须要针对不一样的优化问题和目标,反复实验来肯定pc和pm,调参成本很高。
并且在遗传算法训练的不一样阶段,咱们须要不一样的pc和pm,
Srinivas.M and Patnaik.L.M (1994) 为了让遗传算法具有更好的自适应性,提出来自适应遗传算法。在论文中,pc和pm的计算公式以下:
遗传算法的全局搜索能力强,但局部搜索能力较弱。这句话怎么理解呢?
好比对于一条染色体,遗传算法并不会去看看这条染色体周围局部的染色体适应度怎么样,是否比这条染色体好。遗传算法会经过变异和交叉产生新的染色体,但新产生的染色体可能和旧染色差的很远。所以遗传算法的局部搜索能力差。
相对的,梯度法、登山法和贪心法等算法的局部搜索能力强,运算效率也高。
受此启发,人们提出了混合遗传算法,将遗传算法和这些算法结合起来。混合遗传算法的框架是遗传算法的,只是生成新一代种群以后,对每一个个体使用局部搜索算法寻找个体周围的局部最优势。
整体来讲,遗传算法和梯度法分别表明了随机多样性优化和渐进定向收敛性优化的两种思潮,取各自的优势是一种很是好的思路。
Relevant Link:
Srinivas M, Patnaik L M. Adaptive probabilities of crossover and mutation in genetic algorithms[J]. Systems, Man and Cybernetics, IEEE Transactions on, 1994, 24(4): 656-667. http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e5%9b%9b%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%9a%84%e5%8f%98%e7%a7%8d
这个章节,咱们来完成一个小实验,咱们如今有一个数据集,数据集的生成算法以下:
# -*- coding: utf-8 -*- from random import random,randint,choice def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): rows = [] for i in range(200): x=randint(0, 40) y=randint(0, 40) rows.append([x, y, hiddenfunction(x, y)]) return rows if __name__ == '__main__': print buildhiddenset()
部分数据例以下图:
可视化以下,
# -*- coding: utf-8 -*- import matplotlib.pyplot as plt import numpy as np from sklearn.preprocessing import PolynomialFeatures from sklearn.linear_model import LinearRegression from random import random,randint,choice from mpl_toolkits.mplot3d import axes3d def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): X = [] y = [] for i in range(200): x_ = randint(0, 40) y_ = randint(0, 40) X.append([x_, y_]) y.append(hiddenfunction(x_, y_)) return np.array(X), np.array(y) if __name__ == '__main__': # generate a dataset X, y = buildhiddenset() print "X:", X print "y:", y fig = plt.figure() ax = fig.gca(projection='3d') ax.set_title("3D_Curve") ax.set_xlabel("x") ax.set_ylabel("y") ax.set_zlabel("z") # draw the figure, the color is r = read figure = ax.plot(X[:, 0], X[:, 1], y, c='r') plt.show()
很显然,一定存在一些函数,能够将(X,Y)映射为结果栏对应的数字,如今问题是这个(些)函数究竟是什么?
从数据分析和数理统计分析的角度来看,这个问题彷佛也不是很复杂。咱们能够用多元线性回归来尝试拟合数据集。
# -*- coding: utf-8 -*- import matplotlib.pyplot as plt import numpy as np from sklearn.preprocessing import PolynomialFeatures from sklearn.linear_model import LinearRegression from random import random,randint,choice from mpl_toolkits.mplot3d import axes3d from sklearn.metrics import accuracy_score from sklearn.metrics import classification_report, confusion_matrix import numpy as np np.set_printoptions(threshold=np.inf) def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): X = [] y = [] for i in range(100): x_ = randint(0, 40) y_ = randint(0, 40) X.append([x_, y_]) y.append(hiddenfunction(x_, y_)) return np.array(X), np.array(y) if __name__ == '__main__': # generate a dataset X, y = buildhiddenset() print "X:", X print "y:", y fig = plt.figure() ax = fig.gca(projection='3d') ax.set_title("3D_Curve") ax.set_xlabel("x") ax.set_ylabel("y") ax.set_zlabel("z") # draw the figure, the color is r = read #figure = ax.plot(X[:, 0], X[:, 1], y, c='r') #plt.show() # use Scikit-Learn PolynomialFeature class to constructing parameter terms. # a,b,degree=2: [a, b, a^2, ab, b^2] # a,b,degree=3: [a, b, a^2, ab, b^2, a^3, a^2b, ab^2, b^3] # a,b,c,degree=3: [a, b, c, a^2, ab, ac, b^2, bc, c^2, a^3, a^2b, a^2c, ab^2, ac^2, abc, b^3, b^2c, bc^2, c^3] poly_features = PolynomialFeatures(degree=2, include_bias=False) # fit the dataset with Polynomial Regression Function, and X_poly is the fitting X result X_poly = poly_features.fit_transform(X) lin_reg = LinearRegression() lin_reg.fit(X_poly, y) print(lin_reg.intercept_, lin_reg.coef_) y_predict = lin_reg.predict(X_poly) y_predict = [int(i) for i in y_predict] print "y_predict: ", y_predict print "y: ", y print 'accuracy for LinearRegression is: {0}'.format(accuracy_score(y, y_predict)) print 'error for LinearRegression is: {0}'.format(confusion_matrix(y, y_predict)) # draw the prediction curve X_new_1 = np.linspace(0, 40, 100).reshape(100, 1) X_new_2 = np.linspace(0, 40, 100).reshape(100, 1) X_new = np.hstack((X_new_1, X_new_2)) # fit the X_new dataset with Polynomial Regression Function, and X_new_poly is the fitting X result X_new_poly = poly_features.transform(X_new) y_new = lin_reg.predict(X_new_poly) y_new = [int(i) for i in y_new] #print "X_new: ", X_new #print "y_new: ", y_new #print "y: ", y # draw the prediction line figure = ax.plot(X[:, 0], X[:, 1], y, c='r') figure = ax.plot(X_new[:, 0], X_new[:, 1], y_new, c='b', linewidth=2, label="Predictions") plt.show()
红色是数据集,蓝色线是多项式拟合结果
多项式回归的拟合结果以下:
由于这是一个回归分析问题,所以精度的要求很是高,虽然实际的偏差并非很高(0.55),使用多项式拟合得到了0.45的拟合精确度,不算特别高,出错的点还挺多的。
这么一看,好像多项式回归的效果不好了?并非这样的。
咱们来对比下模型学习到的多项式参数翻译成多项式函数,和原始数据集背后的目标函数形式的差距:
print(lin_reg.intercept_, lin_reg.coef_) (5.0000000000009095, array([ 3.00000000e+00, 2.00000000e+00, 1.00000000e+00, , ])) a,b,degree=2: [a, b, a^2, ab, b^2] 模型学习到的:H = 3*X + 2*Y + X^2 + 4.70974246e-17*X*Y -7.81622966e-16*Y^2 + 5 原始数据集目标函数:H = 3*X + 2*Y + X^2 + 5
能够看到,多项式回归出现了一些过拟合的现象,多出了两项:X*Y、和Y^2,若是没有这两项,那么多项式回归获得的方程就是完美的原始方程。
从上面的实验中,咱们能够获得几点启发:
从这个问题,继续延伸思考,相信读者朋友在研究和工程项目中经常会遇到一个有趣的现象:针对某个固定的数据集,咱们设计出一个很是精巧的神经网络,拥有上千万的参数。和另外一我的用一个只有十几万参数的简单经典神经网络进行benchmark对比,发现性能并无太大的提高,甚至没有提高,固然,性能也没有降低。咋一看,复杂模型和简单的效果同样好。
对这个问题的研究和解释,已经开始有很是多的学者和科研机构在进行,目前提出了不少的解释框架(例如teacher-student模型),笔者本身在项目中也一样遇到了这个问题。一个初步的见解是,对于一个特定场景的数据集来讲,是存在一个肯定的复杂度上限的。如今若是有两个模型,一个模型刚恰好达到这个复杂度上限,另外一个模型远远超过了这个复杂度上限,那么后面那个模型就会出现所谓的过拟合现象,可是,模型会经过权重分配,将高出复杂度上限以外的多余项(神经元)都分配0或者很是低的权重,让这部分神经元称为冗余项。最终,无论咱们用多么复杂的模型,起做用的永远只有那一小部分神经元在发挥做用,预测的效果也同样好,过拟合问题并无影响最终的分类预测结果。
笔者小节:
使用多项式回归对一个数据集进行拟合这种作法,本质上是一种先天经验主义思惟,它的科学假设是”复杂可约性“。所谓复杂可约性是复杂理论中的一个概念,指的是一个复杂的系统能够经过一个简化的模型进行约简归纳,经过历史的样本学习这个约简系统的结构参数,同时假设这个约简系统可以比真实的复杂系统推算的更快,从而借助约简系统对复杂系统进行将来预测。
上面的话说的有一些绕,简单来讲就是,无论数据集背后的真实目标函数是什么,咱们均可以用像低阶多项式函数这种简单模型来进行拟合学习,并利用学习到的模型对将来可能出现的新数据集进行预测。
这种基于先验的简化建模思想在今天的机器学习学术界和工业界应用很是普遍,也发挥了很好的效果。但其实咱们还有另外一种看世界的角度,那就是随机过程。随机过程事先不作任何假设,而是基于随机或者遵循某种策略下的随机,不断进行自我迭代与优化,从一种混沌状态逐渐收敛到目标状态。
在这个章节,咱们要讨论遗传编程,仍是上面那个问题,咱们如今换一种思惟来想:咱们所见的数据集背后无非是一个函数公式来决定的,而函数公式又是由一些基本的”数学符号“以及”数学运算符“号组合而成的,数学符号和数学运算符的组合方式咱们将其视为一个符号搜索空间,咱们直接去搜索这个符号搜索空间便可,经过数据集做为反馈,直到收敛为止,即找到了完美的目标函数。
接下来咱们逐个小节来分解任务,一步步达到咱们的目标。
咱们能够用python构造出树状程序的基础数据结构。这棵树由若干节点组成,根据与之关联的函数的不一样,这些节点又能够拥有必定数量的子节点。
有些节点将会返回传递给程序的参数;另外一些则会返回常量;还有一些则会返回应用于其子节点之上的操做。
一个封装类,对应于”函数型“节点上的函数。其成员变量包括了函数名称、函数自己、以及该函数接受的参数个数(子节点)。
class fwrapper: def __init__(self,function,params,name): self.function=function self.childcount=param self.name=name
对应于函数型节点(带子节点的节点)。咱们以一个fwrapper类对其进行初始化。当evaluate被调用时,咱们会对各个子节点进行求值运算,而后再将函数自己应用于求得的结果。
class node: def __init__(self,fw,children): self.function=fw.function self.name=fw.name self.children=children def evaluate(self,inp): results=[n.evaluate(inp) for n in self.children] return self.function(results) def display(self,indent=0): print (' '*indent)+self.name for c in self.children: c.display(indent+1)
这个类对应的节点只返回传递给程序的某个参数。其evaluate方法返回的是由idx指定的参数。
class paramnode: def __init__(self,idx): self.idx=idx def evaluate(self,inp): return inp[self.idx] def display(self,indent=0): print '%sp%d' % (' '*indent,self.idx)
返回常量值的节点。其evaluate方法仅返回该类被初始化时所传入的值。
class constnode: def __init__(self,v): self.v=v def evaluate(self,inp): return self.v def display(self,indent=0): print '%s%d' % (' '*indent,self.v)
除了基础数学符号数据结构以外,咱们还须要定义一些针对节点的操做函数。
一些简单的符号运算符(例如add、subtract),能够用lambda内联方式定义,另一些稍微复杂的运算符则须要在单独的语句块中定义,不论哪一种状况,都被会封装在一个fwrapper类中。
addw=fwrapper(lambda l:l[0]+l[1],2,'add') subw=fwrapper(lambda l:l[0]-l[1],2,'subtract') mulw=fwrapper(lambda l:l[0]*l[1],2,'multiply') def iffunc(l): if l[0]>0: return l[1] else: return l[2] ifw=fwrapper(iffunc,3,'if') def isgreater(l): if l[0]>l[1]: return 1 else: return 0 gtw=fwrapper(isgreater,2,'isgreater') flist=[addw,mulw,ifw,gtw,subw]
如今,咱们能够利用前面建立的节点类来构造一个程序树了,咱们来尝试写一个符号推理程序,
# -*- coding: utf-8 -*-
import gp
def exampletree():
# if arg[0] > 3:
# return arg[1] + 5
# else:
# return arg[1] - 2
return gp.node(
gp.ifw, [
gp.node(gp.gtw, [gp.paramnode(0), gp.constnode(3)]),
gp.node(gp.addw, [gp.paramnode(1), gp.constnode(5)]),
gp.node(gp.subw, [gp.paramnode(1), gp.constnode(2)])
]
)
if __name__ == '__main__':
exampletree = exampletree()
# expected result = 1
print exampletree.evaluate([2, 3])
# expected result = 8
print exampletree.evaluate([5, 3])
至此,咱们已经成功在python中构造出了一个以树为基础的语言和解释器。
如今咱们已经有能力进行形式化符号编程了,回到咱们的目标,生成一个可以拟合数据集的函数。首先第一步是须要随机初始化一个符号函数,即初始化种群。
建立一个随机程序的步骤包括:
def makerandomtree(pc,maxdepth=4,fpr=0.5,ppr=0.6): if random()<fpr and maxdepth>0: f=choice(flist) children=[makerandomtree(pc,maxdepth-1,fpr,ppr) for i in range(f.childcount)] return node(f,children) elif random()<ppr: return paramnode(randint(0,pc-1)) else: return constnode(randint(0,10))
该函数首先建立了一个节点并为其随机选了一个函数,而后它遍历了随机选中的函数所需的子节点,针对每个子节点,函数经过递归调用makerandomtree来建立新的节点。经过这样的方式,一颗完整的树就被构造出来了。
仅当被随机选中的函数再也不要求新的子节点时(即若是函数返回的是一个常量或输入参数时),向下建立分支的过程才会结束。
按照遗传编程算法的要求,每一轮迭代中都要对种群个体进行定量评估,获得一个个体适应性的排序。
与优化技术同样,我摩恩必须找到一种衡量题解优劣程度的方法,不少场景下,优劣程度并不容易定量评估(例如网络安全中经常是非黑即白的二分类)。可是在本例中,咱们是在一个数值型结果的基础上对程序进行测试,所以能够很容易经过绝对值偏差进行评估。
def scorefunction(tree,s): dif=0 for data in s: v=tree.evaluate([data[0],data[1]]) dif+=abs(v-data[2]) return dif
咱们来试一试初始化的随机种群的适应性评估结果,
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': hiddenset = gp.buildhiddenset() random1 = gp.makerandomtree(2) random2 = gp.makerandomtree(2) print gp.scorefunction(random1, hiddenset) print gp.scorefunction(random2, hiddenset)
随机初始化的函数种群的适应性并非很好,这符合咱们的预期。
当表现最好的程序被选定以后,它们就会被复制并修改以进入到下一代。前面说到,遗传变异有两种方式,mutation和crossover,
变异的作法是对某个程序进行少许的修改,一个树状程序能够有多种修改方式,包括:
须要注意的是,变异的次数不宜过多(基因突变不能太频繁)。例如,咱们不宜对整棵树上的大多数节点都实施变异,相反,咱们能够位任何须要进行修改的节点定义一个相对较小的几率。从树的根节点开始,若是每次生成的随机数小于该几率值,就以如上所述的某种方式对节点进行变异。
def mutate(t,pc,probchange=0.1): if random()<probchange: return makerandomtree(pc) else: result=deepcopy(t) if hasattr(t,"children"): result.children=[mutate(c,pc,probchange) for c in t.children] return result
除了变异,另外一种修改程序的方法被称为交叉或配对,即:从本轮种群中的优秀适应着中,选出两个将其进行部分子树交换。执行交叉操做的函数以两棵树做为输入,并同时开始向下遍历,当到达某个随机选定的阈值时,该函数便会返回前一棵树的一份拷贝,树上的某个分支会被后一棵树上的一个分支所取代。经过同时对两棵树的即时遍历,函数会在每棵树上大体位于相同层次的节点处实施交叉操做。
def crossover(t1,t2,probswap=0.7,top=1): if random()<probswap and not top: return deepcopy(t2) else: result=deepcopy(t1) if hasattr(t1,'children') and hasattr(t2,'children'): result.children=[crossover(c,choice(t2.children),probswap,0) for c in t1.children] return result
读者朋友可能会注意到,对于某次具体的变异或者交叉来讲,新的种群个体并不必定会带来更好的性能,实际上,新种群个体的性能几乎彻底是随机的。从生物进化论的角度来讲,遗传变异是无方向的,随机的,遗传变异的目标仅仅是引入多样性,形成演化的是环境选择压(数据集的偏差反馈)。
如今,咱们将上面的步骤串起来,让遗传演化不断的循环进行。本质上,咱们的思路是要生成一组随机程序并择优复制和修改,而后一直重复这一过程直到终止条件知足为止。
def getrankfunction(dataset):
def rankfunction(population):
scores=[(scorefunction(t,dataset),t) for t in population]
scores.sort()
return scores
return rankfunction
def evolve(pc,popsize,rankfunction,maxgen=500,
mutationrate=0.1,breedingrate=0.4,pexp=0.7,pnew=0.05):
# Returns a random number, tending towards lower numbers. The lower pexp
# is, more lower numbers you will get
def selectindex():
return int(log(random())/log(pexp))
# Create a random initial population
population=[makerandomtree(pc) for i in range(popsize)]
for i in range(maxgen):
scores=rankfunction(population)
print "function score: ", scores[0][0]
if scores[0][0]==0: break
# The two best always make it
newpop=[scores[0][1],scores[1][1]]
# Build the next generation
while len(newpop)<popsize:
if random()>pnew:
newpop.append(mutate(
crossover(scores[selectindex()][1],
scores[selectindex()][1],
probswap=breedingrate),
pc,probchange=mutationrate))
else:
# Add a random node to mix things up
newpop.append(makerandomtree(pc))
population=newpop
scores[0][1].display()
return scores[0][1]
上述函数首先建立一个随机种群,而后循环至多maxgen次,每次循环都会调用rankfunction对程序按表现从优到劣的顺序进行排列。表现优者会不加修改地自动进入到下一代,咱们称这样的方法为精英选拔发(elitism)。
至于下一代中的其余程序,则是经过随机选择排名靠前者,再通过交叉和变异以后获得的。
这个过程是一直重复下去,知道某个程序达到了完美的拟合适配(损失为0),或者重复次数达到了maxgen次为止。
evolve函数有多个参数,用以从不一样方面对竞争环境加以控制,说明以下:
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': rf = gp.getrankfunction(gp.buildhiddenset()) gp.evolve(2, 500, rf, mutationrate=0.2, breedingrate=0.1)
程序运行的很是慢,在笔者的mac上运行了15min才最终收敛到0。有意思的是,尽管这里给出的解是彻底正确的,可是它明显比咱们数据集背后的真实目标函数要复杂得多,也就是说发生了过拟合。
可是,咱们若是运用一些代数知识,将遗传编程获得函数进行约简,会发现它和目标函数实际上是等价的(p0为X,p1为Y)。
((X+6)+Y)+X + (if( (X*Y)>0 ){X}else{X} + X*X) + (X + (Y - (X + if(6>0){1}else{0})) ) # X*Y恒大于0 2*X + Y + 6 + X + X**2 + (X + (Y - (X + if(6>0){1}else{0})) ) # 6恒大于0 2*X + Y + 6 + X + X**2 + X + Y - X + 1 X**2 + 3*X + 2*Y + 5
能够看到,遗传编程这种基于内显性的构造方式,能够在形式上获得一个全局最优解,这点上比基于优化算法的逼近方法要好。
同时,上述例子告诉咱们遗传编程的一个重要特征:遗传算法找到的题解也许是彻底正确的,亦或是很是不错的。可是一般这些题解远比真实的目标函数要复杂得多。在遗传编程获得的题解中,咱们发现有不少部分是不作任何工做的,或者对应的是形式复杂,但始终都只返回同一结果的公式,例如"if(6>0){1}else{0})",只是1的一种多余的表达方式而已。
从上一章的遗传优化结果咱们能够看到,在数据集是充分典型集的状况下,过拟合是不影响模型的收敛的。
遗传编程的这种冗余性和优化算法和深度神经网络中的冗余结构本质上是一致的,这是一个冗余的过拟合现象,即程序的题解很是复杂,可是却对最终的决策没有影响,惟一的缺点就是浪费了不少cpu时钟。
可是另外一方面,这种冗余过拟合带来的一个额外的风险是,”若是数据集是非典型的,那么过拟合就会致使严重的后果“。
咱们须要明白的是,过拟合的罪魁祸首不是模型和优化算法,而偏偏是数据集自己。在本例中咱们清楚地看到,当咱们可以获得一个完整的典型集训练数据时,过拟合问题就会退化成一个冗余鲁棒可约结构。
可是反之,若是咱们的数据集由于系统噪声或采样不彻底等缘由,没有拿到目标函数的典型集,那么因为复杂模型带来的过拟合问题就会引起很严重的预测误差。咱们来稍微修改一下代码,将原始数据集中随机剔除1/10的数据,使数据的充分性和典型性降低,而后再来看遗传编程最后的函数优化结果,
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': hiddenset = gp.buildhiddenset() # 按照 5% 模来采样,即剔除1/10的数据,模拟采样不彻底的状况 cnt = 0 for i in hiddenset: if cnt % 10 == 0: hiddenset.remove(i) cnt += 1 print hiddenset rf = gp.getrankfunction(hiddenset) gp.evolve(2, 500, rf, mutationrate=0.2, breedingrate=0.1)
获得的函数约简结果为:
if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} + (Y+4) + X^2 + (Y + (X + (X+X))) # if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} 不可约 if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} + X**2 + 3*X + 2*Y + 4 # 真实的目标函数为: X**2 + 3*X + 2*Y + 5
能够看到,在数据集不完整的状况下,遗传算法就算完美拟合了训练集,可是也没法真正逼近目标函数,但这不是遗传算法的问题,而是受到数据集的制约。
更重要的是,由于数据集的非典型性(数据几率分布缺失),致使模型引入了真正的”过拟合复杂结构“,即”if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0}“,这是一个区间函数。要知道,这仅仅是一个很是小的例子,尚且引入了如此的不肯定性,在更复杂和更复杂的问题中,数据集的几率分布缺失会引起更大的”多余过拟合复杂结构问题“,影响程度的多少,根据数据缺失的程度而定。
这反过来提醒了咱们这些数据科学工做者,在解决一个实际问题的时候,不要过度纠结你的模型是否是足够好,层数是否是足够深,而要更多地关注你的数据集,数据集的质量直接决定了最终的模型效果。更进一步地说,若是你能找到一种方法,能100%拿到目标函数的数据全集,那么恭喜你,随便用一个机器学习模型均可以取得很好的效果。
对于遗传编程,咱们还须要再谈一个关键问题,即多样性问题。
咱们看到evolve函数中,会将最优的个体直接进入下一代,除此以外,对排名以后的个体也会按照比例和几率选择性地进行复制和修改以造成新的种群,这种作法有什么意义呢?
最大的问题在于,仅仅选择表现最优异的少数个体,很快就会使种群变得极端同质化(homogeneous),或称为近亲交配。尽管种群中所包含的题解,表现都很是不错,可是它们彼此间不会有太大的差别,由于在这些题解间进行的交叉操做最终会致使群内的题解变得愈来愈类似。咱们称这一现象为达到局部最大化(local maxima)。
对于种群而言,局部最大化是一种不错的状态(即收敛了),但还称不上最佳的状态。由于处于这种状态的种群里,任何细小的变化都不会对最终的结果产生太大的变化。这就是一个哲学上的矛盾与对立,收敛稳定与发散变化是彼此对立又统一的,彻底偏向任何一方都是不对的。
事实代表,将表现极为优异的题解和大量成绩尚可的题解组合在一块儿,每每可以获得更好的结果。基于这个缘由,evolve提供了两个额外的参数,容许咱们对筛选进程中的多样性进行调整。
这两个参数都会有效地增长进化过程当中的多样性,同时又不会对进程有过多的扰乱,由于,表现最差的程序最终老是会被剔除掉的(遗传编程的马尔科夫收敛性)。
Relevant Link:
《集体智慧编程》
咱们须要生成一段正则表达式,这个正则表达式须要可以匹配到全部的M数据集,同时不匹配全部的U数据集,且同时还要尽可能短,即不能是简单的M数据集的并集拼接。
定义一个目标(损失)函数来评估每次题解的好坏,
,其中nM表明匹配M的个数,nU表明匹配U的个数,wI表明奖励权重,r表明该正则表达式的长度
算法优化的目标是使上式尽可能大。
在讨论遗传编程以前,咱们先从常规思路,用一种贪婪迭代算法来解决这个问题。咱们的数据集以下,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools def words(text): return set(text.split()) if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') print M & U
首先,确认了U和M之间不存在交集,这道题理论上是有解的,不然无解。
咱们先准肯定义出何时意味着获得了一个可行解,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') some_answer = "a*" print mistakes(some_answer, M, U)
能够看到,当咱们输入正则”a*“的时候出现了不少错误,显然”a*“不是咱们要的答案。读者朋友能够试着输入”foo“试试。
上面构造正则候选集的过程说的可能有些抽象,这里经过代码示例来讲明一下,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools OR = '|'.join # Join a sequence of strings with '|' between them cat = ''.join # Join a sequence of strings with nothing between them Set = frozenset # Data will be frozensets, so they can't be mutated. def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=4): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def dotify(part): "Return all ways to replace a subset of chars in part with '.'." choices = map(replacements, part) return {cat(chars) for chars in itertools.product(*choices)} def replacements(c): return c if c in '^$' else c + '.' def regex_covers(M, U): """Generate regex components and return a dict of {regex: {winner...}}. Each regex matches at least one winner and no loser.""" losers_str = '\n'.join(U) wholes = {'^'+winner+'$' for winner in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} reps = {r for p in parts for r in repetitions(p)} pool = wholes | parts | pairs(M) | reps searchers = {p:re.compile(p, re.MULTILINE).search for p in pool} return {p: Set(filter(searchers[p], M)) for p in pool if not searchers[p](losers_str)} def pairs(winners, special_chars=Set('*+?^$.[](){}|\\')): chars = Set(cat(winners)) - special_chars return {A+'.'+q+B for A in chars for B in chars for q in '*+?'} def repetitions(part): """Return a set of strings derived by inserting a single repetition character ('+' or '*' or '?'), after each non-special character. Avoid redundant repetition of dots.""" splits = [(part[:i], part[i:]) for i in range(1, len(part)+1)] return {A + q + B for (A, B) in splits # Don't allow '^*' nor '$*' nor '..*' nor '.*.' if not (A[-1] in '^$') if not A.endswith('..') if not (A.endswith('.') and B.startswith('.')) for q in '*+?'} def tests(): assert subparts('^it$') == {'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'} assert subparts('this') == {'t', 'h', 'i', 's', 'th', 'hi', 'is', 'thi', 'his', 'this'} assert dotify('it') == {'it', 'i.', '.t', '..'} assert dotify('^it$') == {'^it$', '^i.$', '^.t$', '^..$'} assert dotify('this') == {'this', 'thi.', 'th.s', 'th..', 't.is', 't.i.', 't..s', 't...', '.his', '.hi.', '.h.s', '.h..', '..is', '..i.', '...s', '....'} assert repetitions('a') == {'a+', 'a*', 'a?'} assert repetitions('ab') == {'a+b', 'a*b', 'a?b', 'ab+', 'ab*', 'ab?'} assert repetitions('a.c') == {'a+.c', 'a*.c', 'a?.c', 'a.c+', 'a.*c', 'a.?c', 'a.+c', 'a.c*', 'a.c?'} assert repetitions('^a..d$') == {'^a+..d$', '^a*..d$', '^a?..d$', '^a..d+$', '^a..d*$', '^a..d?$'} assert pairs({'ab', 'c'}) == { 'a.*a', 'a.*b', 'a.*c', 'a.+a', 'a.+b', 'a.+c', 'a.?a', 'a.?b', 'a.?c', 'b.*a', 'b.*b', 'b.*c', 'b.+a', 'b.+b', 'b.+c', 'b.?a', 'b.?b', 'b.?c', 'c.*a', 'c.*b', 'c.*c', 'c.+a', 'c.+b', 'c.+c', 'c.?a', 'c.?b','c.?c'} assert len(pairs({'1...2...3', '($2.34)', '42', '56', '7-11'})) == 8 * 8 * 3 return 'tests pass' if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') some_answer = "a*" # print mistakes(some_answer, M, U) print tests()
笔者思考:
集合覆盖问题(set cover problem)是一个NP问题,几乎没有办法直接获得全局最优解。对这类复杂问题,一个有效的优化逼近方式就是贪婪迭代逼近,每次都求解一个局部最优值(例如每次生成一个可以覆盖最大M集合,可是不匹配U集合的正则子串),最后经过将全部局部最优解Ensemble起来获得一个最终题解(集成学习思想)。
咱们已经有了生成题解候选集的函数,也有了评估题解是否正确的损失函数,咱们如今能够来将他们组合起来,用于生成咱们的目标题解。
前面说过,咱们的算法是一个迭代式的贪婪算法,所以,咱们每次寻找一个可以最大程度匹配尽可能多M的正则子串,而后将本轮已经匹配到的M子串删除,并对余下的M子串继续搜索答案,直到全部的M子串都被成功匹配为止。
# -*- coding: utf-8 -*- from __future__ import division import re import itertools OR = '|'.join # Join a sequence of strings with '|' between them cat = ''.join # Join a sequence of strings with nothing between them Set = frozenset # Data will be frozensets, so they can't be mutated. def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=4): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def dotify(part): "Return all ways to replace a subset of chars in part with '.'." choices = map(replacements, part) return {cat(chars) for chars in itertools.product(*choices)} def replacements(c): return c if c in '^$' else c + '.' def regex_covers(M, U): """Generate regex components and return a dict of {regex: {winner...}}. Each regex matches at least one winner and no loser.""" losers_str = '\n'.join(U) wholes = {'^'+winner+'$' for winner in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} reps = {r for p in parts for r in repetitions(p)} pool = wholes | parts | pairs(M) | reps searchers = {p:re.compile(p, re.MULTILINE).search for p in pool} return {p: Set(filter(searchers[p], M)) for p in pool if not searchers[p](losers_str)} def pairs(winners, special_chars=Set('*+?^$.[](){}|\\')): chars = Set(cat(winners)) - special_chars return {A+'.'+q+B for A in chars for B in chars for q in '*+?'} def repetitions(part): """Return a set of strings derived by inserting a single repetition character ('+' or '*' or '?'), after each non-special character. Avoid redundant repetition of dots.""" splits = [(part[:i], part[i:]) for i in range(1, len(part)+1)] return {A + q + B for (A, B) in splits # Don't allow '^*' nor '$*' nor '..*' nor '.*.' if not (A[-1] in '^$') if not A.endswith('..') if not (A.endswith('.') and B.startswith('.')) for q in '*+?'} def tests(): assert subparts('^it$') == {'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'} assert subparts('this') == {'t', 'h', 'i', 's', 'th', 'hi', 'is', 'thi', 'his', 'this'} assert dotify('it') == {'it', 'i.', '.t', '..'} assert dotify('^it$') == {'^it$', '^i.$', '^.t$', '^..$'} assert dotify('this') == {'this', 'thi.', 'th.s', 'th..', 't.is', 't.i.', 't..s', 't...', '.his', '.hi.', '.h.s', '.h..', '..is', '..i.', '...s', '....'} assert repetitions('a') == {'a+', 'a*', 'a?'} assert repetitions('ab') == {'a+b', 'a*b', 'a?b', 'ab+', 'ab*', 'ab?'} assert repetitions('a.c') == {'a+.c', 'a*.c', 'a?.c', 'a.c+', 'a.*c', 'a.?c', 'a.+c', 'a.c*', 'a.c?'} assert repetitions('^a..d$') == {'^a+..d$', '^a*..d$', '^a?..d$', '^a..d+$', '^a..d*$', '^a..d?$'} assert pairs({'ab', 'c'}) == { 'a.*a', 'a.*b', 'a.*c', 'a.+a', 'a.+b', 'a.+c', 'a.?a', 'a.?b', 'a.?c', 'b.*a', 'b.*b', 'b.*c', 'b.+a', 'b.+b', 'b.+c', 'b.?a', 'b.?b', 'b.?c', 'c.*a', 'c.*b', 'c.*c', 'c.+a', 'c.+b', 'c.+c', 'c.?a', 'c.?b','c.?c'} assert len(pairs({'1...2...3', '($2.34)', '42', '56', '7-11'})) == 8 * 8 * 3 return 'tests pass' def findregex(winners, losers, k=4, addRepetition=False): "Find a regex that matches all winners but no losers (sets of strings)." # Make a pool of regex parts, then pick from them to cover winners. # On each iteration, add the 'best' part to 'solution', # remove winners covered by best, and keep in 'pool' only parts # that still match some winner. if addRepetition: pool = regex_covers(winners, losers) else: pool = regex_parts(winners, losers) solution = [] def score(part): return k * len(matches(part, winners)) - len(part) while winners: best = max(pool, key=score) solution.append(best) winners = winners - matches(best, winners) pool = {r for r in pool if matches(r, winners)} return OR(solution) if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') solution = findregex(M, U, addRepetition=True) if verify(solution, M, U): print len(solution), solution solution = findregex(M, U, addRepetition=False) if verify(solution, M, U): print len(solution), solution
咱们来作一个和网络安全相关的实验,咱们如今有黑白两份样本,分别表明M和U,咱们如今尝试用本节讨论的算法来生成一段正则。
可是笔者在实际操做中发现,用regex golf这种问题的搜索空间是很是巨大的,当M和U的规模扩大时(例如大量webshell文件),所产生的正则子串候选集会是一个巨量的天文数字,该算法本质上仍是至关于在进行穷举搜索,搜索效率十分低下。
更进一步地,笔者尝试继续扩大黑白样本量(超过10的时候),算法已经没法搜索出有效的正则题解,这说明,当黑白样本超过必定数量的时候,alpha字符空间中黑白样本已经存在交叉,全局解不存在。
仍是上一小节的Regex golf问题,如今咱们来尝试用遗传编程来优化搜索效率。
原论文的策略是基于遗传算法生成一个”xx | xx“的双正则子串,即每次获得的个体最多有两个子串,而后按照上一小节中相似的贪婪策略进行逐步OR拼接。
笔者这里决定修改一下思路,直接基于遗传编程对整个题解空间进行搜索,即构造一个完整题解的regex tree,这是一种全局最优解搜索的优化思路。
咱们的整体策略依然是贪婪分段策略,也就说,咱们要寻找的最终正则题解是由不少个”|“组成的分段正则表达式。如今咱们来定义咱们的题解中可能出现的基本元素,这里,咱们依然采用树结构做为基础数据结构的承载:
基于上述基本元素定义,咱们能够将题解正则表达式抽象为一个树结构(regex tree),以下图,
(foo) | (ba++r)
该树结构能够经过深度优先遍历,打印出最终的题解正则串,如上图的标题所示。
# -*- coding: utf-8 -*-
from random import random, randint, choice
import re
import itertools
# "ROOT"
class rootnode:
def __init__(self, left_child_node, right_child_node):
if left_child_node and right_child_node:
self.left_child_node = left_child_node
self.right_child_node = right_child_node
else:
self.left_child_node = node
self.right_child_node = node
def display(self):
return "|"
# universal child node
class node:
def __init__(self, node):
self.node = node
# "|"
class spliternode:
def __init__(self, left_child_dotplaceholder, right_child_dotplaceholder):
if left_child_dotplaceholder and right_child_dotplaceholder:
self.left_child_dotplaceholder = left_child_dotplaceholder
self.right_child_dotplaceholder = right_child_dotplaceholder
else:
self.left_child_dotplaceholder = node
self.right_child_dotplaceholder = node
def display(self):
return "|"
# "(.)"
class dotplaceholdernode:
def __init__(self, childnode=None):
if childnode:
self.childnode = childnode
else:
self.childnode = node
# "foo"
class charnode:
def __init__(self, charstring):
if charstring:
self.charstring = charstring
else:
self.charstring = node
def display(self):
return self.charstring
# ".."
class concat_node:
def __init__(self, left_child_node, right_child_node):
if left_child_node and right_child_node:
self.left_child_node = left_child_node
self.right_child_node = right_child_node
else:
self.left_child_node = node
self.right_child_node = node
# "++"
class qualifiernode:
def __init__(self, qualifierstrig):
if qualifierstrig:
self.qualifierstrig = qualifierstrig
else:
self.qualifierstrig = node
def display(self):
return self.qualifierstrig
def exampletree():
return rootnode(
dotplaceholdernode(
charnode("foo")
),
dotplaceholdernode(
concat_node(
concat_node(
charnode("ba"),
qualifiernode("++")
),
charnode("r")
)
)
)
# left child deep first travel
def printregextree(rootnode_i):
if rootnode_i is None:
return ""
if isinstance(rootnode_i, rootnode):
# concat the finnal regex str
finnal_regexstr = ""
finnal_regexstr += printregextree(rootnode_i.left_child_node)
finnal_regexstr += rootnode_i.display()
finnal_regexstr += printregextree(rootnode_i.right_child_node)
return finnal_regexstr
if isinstance(rootnode_i, spliternode):
# concat the finnal regex str
split_regexstr = ""
split_regexstr += printregextree(rootnode_i.left_child_dotplaceholder)
split_regexstr += rootnode_i.display()
split_regexstr += printregextree(rootnode_i.right_child_dotplaceholder)
return split_regexstr
if isinstance(rootnode_i, dotplaceholdernode):
return printregextree(rootnode_i.childnode)
if isinstance(rootnode_i, charnode):
return rootnode_i.display()
if isinstance(rootnode_i, concat_node):
concat_str = ""
concat_str += printregextree(rootnode_i.left_child_node)
concat_str += printregextree(rootnode_i.right_child_node)
return concat_str
if isinstance(rootnode_i, qualifiernode):
return rootnode_i.display()
def matches(regex, strings):
"Return a set of all the strings that are matched by regex."
return {s for s in strings if re.search(regex, s)}
def regex_parts(M, U):
"Return parts that match at least one winner, but no loser."
wholes = {'^' + w + '$' for w in M}
parts = {d for w in wholes for p in subparts(w) for d in p}
return wholes | {p for p in parts if not matches(p, U)}
def subparts(word, N=5):
"Return a set of subparts of word: consecutive characters up to length N (default 4)."
return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N))
def words(text):
return set(text.split())
def makerandomtree(M, U, parentnode=None, splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0):
if curren_level > maxdepth:
print "curren_level > maxdepth: ", curren_level
return
# ROOT node
if isinstance(parentnode, rootnode):
curren_level = 0
print "curren_level: ", curren_level
# init root node
print "init rootnode: ", curren_level
rootnode_i = rootnode(
dotplaceholdernode(None),
dotplaceholdernode(None)
)
# create left child node
print "new dotplaceholdernode"
rootnode_i.left_child_node = makerandomtree(M, U, rootnode_i.left_child_node, splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
print "new dotplaceholdernode"
# create right child node
rootnode_i.right_child_node = makerandomtree(M, U, rootnode_i.right_child_node, splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
return rootnode_i
# ".." dot placeholder node
if isinstance(parentnode, dotplaceholdernode):
curren_level += 1
print "curren_level: ", curren_level
# "|"
if random() < splitrate:
print "new spliternode"
return makerandomtree(M, U, spliternode(None, None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# ".."
elif random() < concatrate:
print "new concat_node"
return makerandomtree(M, U, concat_node(None, None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "foo"
elif random() < charrate:
print "new charnode"
return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "|" split node
if isinstance(parentnode, spliternode):
curren_level += 1
print "curren_level: ", curren_level
print "init spliternode"
splitnode_i = spliternode(
dotplaceholdernode(None),
dotplaceholdernode(None)
)
print "new dotplaceholdernode"
splitnode_i.left_child_dotplaceholder = makerandomtree(M, U, splitnode_i.left_child_dotplaceholder,
splitrate, concatrate, charrate, qualifierate,
maxdepth, curren_level)
print "new dotplaceholdernode"
splitnode_i.right_child_dotplaceholder = makerandomtree(M, U, splitnode_i.right_child_dotplaceholder,
splitrate, concatrate, charrate, qualifierate,
maxdepth, curren_level)
return splitnode_i
# ".." concat node
if isinstance(parentnode, concat_node):
curren_level += 1
print "curren_level: ", curren_level
# "foo"
if random() < charrate:
print "new charnode"
return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "++"
if random() < qualifierate:
print "new qualifiernode"
return makerandomtree(M, U, qualifiernode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "foo" char node
if isinstance(parentnode, charnode):
curren_level += 1
print "curren_level: ", curren_level
charnode_str = choice(list(regex_parts(M, U)))
print "charnode_str: ", charnode_str
print "new charnode"
charnode_i = charnode(charnode_str)
return charnode_i
# "++" qualifierate node
if isinstance(parentnode, qualifiernode):
curren_level += 1
print "curren_level: ", curren_level
qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?'])
print "qualifiernode_str: ", qualifiernode_str
print "new qualifiernode"
qualifiernode_i = qualifiernode(qualifiernode_str)
return qualifiernode_i
if __name__ == '__main__':
exampletree = exampletree()
print type(exampletree), exampletree
print printregextree(exampletree)
有了基本的数据结构,如今定义一下regex tree的生长准则,
这里须要用到代价敏感学习的训练思路,若是直接按照下面公式进行损失训练,
那么很快就会收敛到最优解:”|“上,缘由很显然,全字符匹配中,nm和nu都是相等的,相减为0,而后减去字符串长度1,就是-1,这是算法能找到的最好答案了。
为了解决这个问题,咱们须要对TP和FP采起不一样的惩罚力度,
def scorefunction(tree, M, U, w=1):
dif = 0
regex_str = printregextree(tree)
M_cn, U_cn = 0, 0
for s in list(M):
try:
if re.search(regex_str, s):
M_cn += 1
except Exception, e:
print e.message, "regex_str: ", regex_str
# this regex tree is illegal, low socre!!
return -8
for u in list(U):
if re.search(regex_str, u):
U_cn += 1
# print "M_cn: ", M_cn
# print "U_cn: ", U_cn
dif = w * (M_cn - U_cn) - len(regex_str)
return dif
上面代码中有一点值得注意,因为regex tree的生成具备必定的随机性,所以极可能产生不合法的正则串,所以对不合法的正则串给予较低的分值,驱使它淘汰。
有了损失函数的定义,就能够很容易算出一个种群中全部个体的适应度排名。
def rankfunction(M, U, population):
scores = [(scorefunction(t, M, U), t) for t in population]
scores.sort()
return scores
按照遗传编程的定义,咱们先随机初始化一棵符合题解规约的regex tree,
# -*- coding: utf-8 -*- from random import random, randint, choice import re import itertools # "ROOT" class rootnode: def __init__(self, left_child_node, right_child_node): if left_child_node and right_child_node: self.left_child_node = left_child_node self.right_child_node = right_child_node else: self.left_child_node = node self.right_child_node = node def display(self): return "|" # universal child node class node: def __init__(self, node): self.node = node # "|" class spliternode: def __init__(self, left_child_dotplaceholder, right_child_dotplaceholder): if left_child_dotplaceholder and right_child_dotplaceholder: self.left_child_dotplaceholder = left_child_dotplaceholder self.right_child_dotplaceholder = right_child_dotplaceholder else: self.left_child_dotplaceholder = node self.right_child_dotplaceholder = node def display(self): return "|" # "(.)" class dotplaceholdernode: def __init__(self, childnode=None): if childnode: self.childnode = childnode else: self.childnode = node # "foo" class charnode: def __init__(self, charstring): if charstring: self.charstring = charstring else: self.charstring = node def display(self): return self.charstring # ".." class concat_node: def __init__(self, left_child_node, right_child_node): if left_child_node and right_child_node: self.left_child_node = left_child_node self.right_child_node = right_child_node else: self.left_child_node = node self.right_child_node = node # "++" class qualifiernode: def __init__(self, qualifierstrig): if qualifierstrig: self.qualifierstrig = qualifierstrig else: self.qualifierstrig = node def display(self): return self.qualifierstrig def exampletree(): return rootnode( dotplaceholdernode( charnode("foo") ), dotplaceholdernode( concat_node( concat_node( charnode("ba"), qualifiernode("++") ), charnode("r") ) ) ) # left child deep first travel def printregextree(rootnode_i): if rootnode_i is None: return "" if isinstance(rootnode_i, rootnode): # concat the finnal regex str finnal_regexstr = "" finnal_regexstr += printregextree(rootnode_i.left_child_node) finnal_regexstr += rootnode_i.display() finnal_regexstr += printregextree(rootnode_i.right_child_node) return finnal_regexstr if isinstance(rootnode_i, spliternode): # concat the finnal regex str split_regexstr = "" split_regexstr += printregextree(rootnode_i.left_child_dotplaceholder) split_regexstr += rootnode_i.display() split_regexstr += printregextree(rootnode_i.right_child_dotplaceholder) return split_regexstr if isinstance(rootnode_i, dotplaceholdernode): return printregextree(rootnode_i.childnode) if isinstance(rootnode_i, charnode): return rootnode_i.display() if isinstance(rootnode_i, concat_node): concat_str = "" concat_str += printregextree(rootnode_i.left_child_node) concat_str += printregextree(rootnode_i.right_child_node) return concat_str if isinstance(rootnode_i, qualifiernode): return rootnode_i.display() def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in p} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=5): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def words(text): return set(text.split()) def makerandomtree(M, U, parentnode=None, splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0): if curren_level > maxdepth: print "curren_level > maxdepth: ", curren_level return # ROOT node if isinstance(parentnode, rootnode): curren_level = 0 print "curren_level: ", curren_level # init root node print "init rootnode: ", curren_level rootnode_i = rootnode( dotplaceholdernode(None), dotplaceholdernode(None) ) # create left child node print "new dotplaceholdernode" rootnode_i.left_child_node = makerandomtree(M, U, rootnode_i.left_child_node, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) print "new dotplaceholdernode" # create right child node rootnode_i.right_child_node = makerandomtree(M, U, rootnode_i.right_child_node, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return rootnode_i # ".." dot placeholder node if isinstance(parentnode, dotplaceholdernode): curren_level += 1 print "curren_level: ", curren_level # "|" if random() < splitrate: print "new spliternode" return makerandomtree(M, U, spliternode(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # ".." elif random() < concatrate: print "new concat_node" return makerandomtree(M, U, concat_node(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" elif random() < charrate: print "new charnode" return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "|" split node if isinstance(parentnode, spliternode): curren_level += 1 print "curren_level: ", curren_level print "init spliternode" splitnode_i = spliternode( dotplaceholdernode(None), dotplaceholdernode(None) ) print "new dotplaceholdernode" splitnode_i.left_child_dotplaceholder = makerandomtree(M, U, splitnode_i.left_child_dotplaceholder, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) print "new dotplaceholdernode" splitnode_i.right_child_dotplaceholder = makerandomtree(M, U, splitnode_i.right_child_dotplaceholder, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return splitnode_i # ".." concat node if isinstance(parentnode, concat_node): curren_level += 1 print "curren_level: ", curren_level # "foo" if random() < charrate: print "new charnode" return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "++" if random() < qualifierate: print "new qualifiernode" return makerandomtree(M, U, qualifiernode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" char node if isinstance(parentnode, charnode): curren_level += 1 print "curren_level: ", curren_level charnode_str = choice(list(regex_parts(M, U))) print "charnode_str: ", charnode_str print "new charnode" charnode_i = charnode(charnode_str) return charnode_i # "++" qualifierate node if isinstance(parentnode, qualifiernode): curren_level += 1 print "curren_level: ", curren_level qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?']) print "qualifiernode_str: ", qualifiernode_str print "new qualifiernode" qualifiernode_i = qualifiernode(qualifiernode_str) return qualifiernode_i if __name__ == '__main__': exampletree = exampletree() print type(exampletree), exampletree print printregextree(exampletree) M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') print regex_parts(M, U) rd1 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) print rd1 print printregextree(rd1)
能够看到,随机产生的题解并非一个好的题解,可是它倒是一个有效的题解。如今咱们有了一个好的开始,接下来咱们能够着手进行遗传变异了。
遗传编程的遗传变异分为两种:1)mutate;2)crossover,咱们分别来定义。
mutate变异遍历整个regex tree,针对不一样节点采起不一样的变异策略:
def mutate(M, U, t, probchange=0.2):
if random() < probchange:
return makerandomtree(M, U)
else:
result = deepcopy(t)
if hasattr(t, "left_concatchildnode"):
result.left_concatchildnode = mutate(M, U, t.left_concatchildnode, probchange)
if hasattr(t, "right_concatchildnode"):
result.right_concatchildnode = mutate(M, U, t.right_concatchildnode, probchange)
if hasattr(t, "childnode"):
result.childnode = mutate(M, U, t.childnode, probchange)
if hasattr(t, "qualifierstrig"):
result.qualifierstrig = qualifiernode(choice(['.', '+', '?', '*', '.*', '.+', '.*?']))
if hasattr(t, "charstring"):
result.charstring = charnode(choice(list(regex_parts(M, U))))
return result
整体来讲,mutate的做用在于向种群中引入更多的多样性,随机性和多样性是物种进化的原动力。
crossover交叉是同序遍历两棵regex tree,按照必定的几率决定是否要将各自的节点进行互换。
def crossover(t1, t2, probswap=0.7): if random() < probswap: return deepcopy(t2) else: result = deepcopy(t1) if hasattr(t1, 'left_childnode') and hasattr(t2, 'left_childnode'): result.left_childnode = crossover(t1.left_childnode, t2.left_childnode, probswap) if hasattr(t1, 'right_childnode') and hasattr(t2, 'right_childnode'): result.right_childnode = crossover(t1.right_childnode, t2.right_childnode, probswap) if hasattr(t1, 'childnode') and hasattr(t2, 'childnode'): result.childnode = crossover(t1.childnode, t2.childnode, probswap) if hasattr(t1, 'qualifierstrig') and hasattr(t2, 'qualifierstrig'): result.qualifierstrig = t2.qualifierstrig if hasattr(t1, 'charstring') and hasattr(t2, 'charstring'): result.charstring = t2.charstring return result
整体来讲,crossover的做用在于加速优秀基因和保留,和劣质基因的淘汰。由于能够这么理解,由于crossover的存在,同一个基因模式在种群中会有扩大的趋势,而若是是优秀的基因则会不断被保留。
至此,regex tree遗传进化的全部元素都已经准备稳当了,咱们如今能够开始编写遗传进化算法主程序了,让程序自动生成一段符合题解的正则。
# -*- coding: utf-8 -*- from random import random, randint, choice import re from copy import deepcopy import itertools from math import log import numpy as np import os # "ROOT" class rootnode: def __init__(self, left_childnode, right_childnode): if left_childnode and right_childnode: self.left_childnode = left_childnode self.right_childnode = right_childnode else: self.left_childnode = node self.right_childnode = node def display(self): return "|" # universal child node class node: def __init__(self, node): self.node = node # "|" class spliternode: def __init__(self, left_childnode, right_childnode): if left_childnode and right_childnode: self.left_childnode = left_childnode self.right_childnode = right_childnode else: self.left_childnode = node self.right_childnode = node def display(self): return "|" # "(.)" class dotplaceholdernode: def __init__(self, childnode=None): if childnode: self.childnode = childnode else: self.childnode = node # "foo" class charnode: def __init__(self, charstring): if charstring: self.charstring = charstring else: self.charstring = node def display(self): return self.charstring # ".." class concat_node: def __init__(self, left_concatchildnode, right_concatchildnode): if left_concatchildnode and right_concatchildnode: self.left_concatchildnode = left_concatchildnode self.right_concatchildnode = right_concatchildnode else: self.left_concatchildnode = node self.right_concatchildnode = node # "++" class qualifiernode: def __init__(self, qualifierstrig): if qualifierstrig: self.qualifierstrig = qualifierstrig else: self.qualifierstrig = node def display(self): return self.qualifierstrig def exampletree(): return rootnode( dotplaceholdernode( charnode("foo") ), dotplaceholdernode( concat_node( concat_node( charnode("ba"), qualifiernode("++") ), charnode("r") ) ) ) # left child deep first travel def printregextree(rootnode_i): if rootnode_i is None: return "" if isinstance(rootnode_i, rootnode): # concat the finnal regex str finnal_regexstr = "" finnal_regexstr += printregextree(rootnode_i.left_childnode) finnal_regexstr += rootnode_i.display() finnal_regexstr += printregextree(rootnode_i.right_childnode) return finnal_regexstr if isinstance(rootnode_i, spliternode): # concat the finnal regex str split_regexstr = "" split_regexstr += printregextree(rootnode_i.left_childnode) split_regexstr += rootnode_i.display() split_regexstr += printregextree(rootnode_i.right_childnode) return split_regexstr if isinstance(rootnode_i, dotplaceholdernode): return printregextree(rootnode_i.childnode) if isinstance(rootnode_i, charnode): return rootnode_i.display() if isinstance(rootnode_i, concat_node): concat_str = "" concat_str += printregextree(rootnode_i.left_concatchildnode) concat_str += printregextree(rootnode_i.right_concatchildnode) return concat_str if isinstance(rootnode_i, qualifiernode): return rootnode_i.display() def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in p} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=5): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def words(text): return set(text.split()) def makerandomtree(M, U, charnode_pool, parentnode=rootnode(None, None), splitrate=0.7, concatrate=0.7, charrate=0.7, qualifierate=0.7, maxdepth=6, curren_level=0, stopearly=0.1): if curren_level > maxdepth: #print "curren_level > maxdepth: ", curren_level return if random() < stopearly: return # ROOT node if isinstance(parentnode, rootnode): curren_level = 0 #print "curren_level: ", curren_level # init root node #print "init rootnode: ", curren_level rootnode_i = rootnode( dotplaceholdernode(None), dotplaceholdernode(None) ) # create left child node #print "new dotplaceholdernode" rootnode_i.left_childnode = makerandomtree(M, U, charnode_pool, rootnode_i.left_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) #print "new dotplaceholdernode" # create right child node rootnode_i.right_childnode = makerandomtree(M, U, charnode_pool, rootnode_i.right_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return rootnode_i # ".." dot placeholder node if isinstance(parentnode, dotplaceholdernode): curren_level += 1 #print "curren_level: ", curren_level # "|" if random() < splitrate: #print "new spliternode" return makerandomtree(M, U, charnode_pool, spliternode(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # ".." elif random() < concatrate: #print "new concat_node" return makerandomtree(M, U, charnode_pool, concat_node(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" elif random() < charrate: #print "new charnode" return makerandomtree(M, U, charnode_pool, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "|" split node if isinstance(parentnode, spliternode): curren_level += 1 #print "curren_level: ", curren_level #print "init spliternode" splitnode_i = spliternode( dotplaceholdernode(None), dotplaceholdernode(None) ) #print "new dotplaceholdernode" splitnode_i.left_childnode = makerandomtree(M, U, charnode_pool, splitnode_i.left_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) #print "new dotplaceholdernode" splitnode_i.right_childnode = makerandomtree(M, U, charnode_pool, splitnode_i.right_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return splitnode_i # ".." concat node if isinstance(parentnode, concat_node): curren_level += 1 #print "curren_level: ", curren_level # "foo" if random() < charrate: #print "new charnode" return makerandomtree(M, U, charnode_pool, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "++" if random() < qualifierate: #print "new qualifiernode" return makerandomtree(M, U, charnode_pool, qualifiernode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" char node if isinstance(parentnode, charnode): curren_level += 1 #print "curren_level: ", curren_level charnode_str = choice(charnode_pool) #print "charnode_str: ", charnode_str #print "new charnode" charnode_i = charnode(charnode_str) return charnode_i # "++" qualifierate node if isinstance(parentnode, qualifiernode): curren_level += 1 #print "curren_level: ", curren_level qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?']) #print "qualifiernode_str: ", qualifiernode_str #print "new qualifiernode" qualifiernode_i = qualifiernode(qualifiernode_str) return qualifiernode_i def scorefunction(tree, M, U, w=1): dif = 0 regex_str = printregextree(tree) M_cn, U_cn = 0, 0 for s in list(M): try: if re.search(regex_str, s): M_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! return -32 for u in list(U): if re.search(regex_str, u): U_cn += 1 # print "M_cn: ", M_cn # print "U_cn: ", U_cn dif = w * (M_cn - 2*U_cn) - len(regex_str) return dif def rankfunction_(M, U, population): scores = [(scorefunction(t, M, U), t) for t in population] # remove illegal regex scores_ = [] for i in scores: if i[1]: scores_.append(i) scores_.sort(reverse=True) return scores_ def mutate(M, U, charnode_pool, t, probchange=0.4): if random() < probchange: return makerandomtree(M, U, charnode_pool) else: result = deepcopy(t) if hasattr(t, "left_concatchildnode"): result.left_concatchildnode = mutate(M, U, charnode_pool, t.left_concatchildnode, probchange) if hasattr(t, "right_concatchildnode"): result.right_concatchildnode = mutate(M, U, charnode_pool, t.right_concatchildnode, probchange) if hasattr(t, "childnode"): result.childnode = mutate(M, U, charnode_pool, t.childnode, probchange) if hasattr(t, "qualifierstrig"): result.qualifierstrig = qualifiernode(choice(['.', '+', '?', '*', '.*', '.+', '.*?'])) if hasattr(t, "charstring"): result.charstring = charnode(choice(charnode_pool)) return result def crossover(t1, t2, probswap=0.5): if random() < probswap: return deepcopy(t2) else: result = deepcopy(t1) if hasattr(t1, 'left_childnode') and hasattr(t2, 'left_childnode'): result.left_childnode = crossover(t1.left_childnode, t2.left_childnode, probswap) if hasattr(t1, 'right_childnode') and hasattr(t2, 'right_childnode'): result.right_childnode = crossover(t1.right_childnode, t2.right_childnode, probswap) if hasattr(t1, 'childnode') and hasattr(t2, 'childnode'): result.childnode = crossover(t1.childnode, t2.childnode, probswap) if hasattr(t1, 'qualifierstrig') and hasattr(t2, 'qualifierstrig'): result.qualifierstrig = t2.qualifierstrig if hasattr(t1, 'charstring') and hasattr(t2, 'charstring'): result.charstring = t2.charstring return result def evolve(M, U, charnode_pool, popsize=128, rankfunction=rankfunction_, maxgen=500, mutationrate=0.6, probswap=0.5, pexp=0.3, pnew=0.8): # Returns a random number, tending towards lower numbers. # The lower pexp is, more lower numbers you will get # probexp:表示在构造新种群时,”选择评价较低的程序“这一律率的递减比例。该值越大,相应的筛选过程就越严格,即只选择评价最高的多少比例的个体做为复制对象 def selectindex(): return int(log(random()) / log(pexp)) # Create a random initial population population = [makerandomtree(M, U, charnode_pool) for i in range(popsize)] scores = [] for i in range(maxgen): scores = rankfunction(M, U, population) print scores[0] print "evole round: {0}, top score: {1}, regex_str: {2}".format(i, scores[0][0], printregextree(scores[0][1])) if scores[0][0] > 0: print "found good solution: {0}".format(printregextree(scores[0][1])) break # The top 20% always make it # newpop = np.array(scores)[:int(len(scores) * 0.2), 1].tolist() newpop = [scores[0][1], scores[1][1]] # Build the next generation # probnew:表示在构造新种群时,”引入一个全新的随机程序“的几率,该参数和probexp是”种群多样性“的重要决定参数 while len(newpop) < popsize: if random() < pnew: newpop.append( mutate( M, U, charnode_pool, crossover( scores[selectindex()][1], scores[selectindex()][1], probswap ), mutationrate ) ) else: # Add a random node to mix things up new_tree = makerandomtree(M, U, charnode_pool) # print "evole round: {0}, add new tree: {1}".format(i, printregextree(new_tree)) newpop.append(new_tree) population = newpop # return the evolutionary results return scores[0][1] def test_regex(M, U, regex_str): dif = 0 M_cn, U_cn = 0, 0 for s in list(M): try: if re.search(regex_str, s): M_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! dif = -32 for u in list(U): try: if re.search(regex_str, u): U_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! dif = -32 print "M_cn: ", M_cn print "U_cn: ", U_cn dif = 1 * (M_cn - 4 * U_cn) - 4 * len(regex_str) print "dif: ", dif def test_regex_golf(): # exampletree = exampletree() # print type(exampletree), exampletree # print printregextree(exampletree) M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') charnode_pool = list(regex_parts(M, U)) print charnode_pool # rd1 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) # rd2 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) # print "rd1: " # print printregextree(rd1) # print "rd2: " # print printregextree(rd2) # dif = scorefunction(tree=rd1, M=M, U=U, w=1) # print "dif: ", dif # population = [makerandomtree(M, U) for i in range(10)] # for i in population: # print printregextree(i) # scores = rankfunction_(M, U, population) # print "function score: ", scores # print np.array(scores)[:int(len(scores) * 0.2), 1].tolist() # rd1_mutate = mutate(M, U, rd1, probchange=0.2) # print "rd1_mutate: " # print printregextree(rd1_mutate) # rd1_rd2_crossover = crossover(rd1, rd2, probswap=0.7) # print "rd1_rd2_crossover: " # print printregextree(rd1_rd2_crossover) evolutionary_regex_str = evolve(M, U, charnode_pool) print printregextree(evolutionary_regex_str) def load_data(): M, U = [], [] rootDir = "./blacksamples" for lists in os.listdir(rootDir): if lists == '.DS_Store': continue filepath = os.path.join(rootDir, lists) filecontent = open(filepath, 'r').read() # only remain English word cop = re.compile("[^^a-z^A-Z^0-9^\s]") # remove space filecontent = re.sub(r'\s+', '', filecontent).strip() filecontent = cop.sub('', filecontent) M.append(filecontent) rootDir = "./whitesamples" for lists in os.listdir(rootDir): if lists == '.DS_Store': continue filepath = os.path.join(rootDir, lists) filecontent = open(filepath, 'r').read() # only remain English word cop = re.compile("[^^a-z^A-Z^0-9^\s]") filecontent = cop.sub('', filecontent) # remove space filecontent = re.sub(r'\s+', '', filecontent).strip() U.append(filecontent) M = set(M) U = set(U) return M, U def test_webshell(): M, U = load_data() # print M charnode_pool = list(regex_parts(M, U)) print charnode_pool print len(charnode_pool) evolutionary_regex_str = evolve(M, U, charnode_pool) print printregextree(evolutionary_regex_str) print "test_regex: " test_regex(M, U, charnode_pool) if __name__ == '__main__': test_webshell() # test_regex_golf()
代码中的blacksamples、whitesamples请读者朋友自行准备。
Relevant Link:
http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%ba%8c%e6%84%9a%e5%bc%84%e6%b7%b1%e5%ba%a6%e5%ad%a6%e4%b9%a0%e7%9a%84%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95 Bartoli, Alberto, et al. “Playing regex golf with genetic programming.” Proceedings of the 2014 conference on Genetic and evolutionary computation. ACM, 2014. https://alf.nu/RegexGolf http://www.doc88.com/p-0387699026353.html http://regex.inginf.units.it/golf/# https://github.com/norvig/pytudes https://github.com/norvig/pytudes/blob/master/ipynb/xkcd1313.ipynb https://github.com/norvig/pytudes/blob/master/ipynb/xkcd1313-part2.ipynb
咱们来回顾一下要应用遗传编程在某个具体场景中,须要的两个必要条件:
基于php token生成一个php ast tree,而且在损失函数中找到必定定量评估方法,判断当前文件的恶意程度。用遗传编程自动生成一些能够绕过当前检测机制的php webshell
笔者提醒:
本质上来讲,遗传编程的优化方向是随机的,和梯度驱动的SGD优化算法相比,遗传编程每次的迭代优化并不明确朝某个方向前进,而是被动由环境来进行淘汰和筛选,因此是一种环境选择压驱动的优化算法。
遗传编程的这种的优化特性特别适合像“恶意样本检测”这种“阶跃损失”的分类问题,由于对于恶意样原本说,只有两种状态,“黑或白”,损失函数的值也只有两种,“0或者1”,所以,咱们没法用SGD相似的算法来优化这种阶跃损失函数问题,由于阶跃点处的梯度要么不存在(左极限),要么是无穷大的(右极限)。
可是遗传编程依然能在每轮筛选出“优胜者”,并按照必定的策略保留优胜者,进行交叉和变异以进入下一轮,同时也会按照必定的几率挑选部分的“失败者”也进入下一轮进化,这么作的目的是引入多样性。