直接采样的思想是,经过对均匀分布采样,实现对任意分布的采样。由于均匀分布采样好猜,咱们想要的分布采样很差采,那就采起必定的策略经过简单采起求复杂采样。
假设y服从某项分布p(y),其累积分布函数CDF为h(y),有样本z~Uniform(0,1),咱们令 z = h(y),即 y = h(z)^(-1),结果y即为对分布p(y)的采样。python
直接采样的核心思想在与CDF以及逆变换的应用。在原分布p(y)中,若是某个区域[a, b]的分布较多,而后对应在CDF曲线中,[h(a), h(b)]的曲线斜率便较大。那么,通过逆变换以后,对y轴(z)进行均匀分布采样时,分布多的部分(占据y轴部分多)对应抽样获得的几率便更大,web
实际中,全部采样的分布都较为复杂,CDF的求解和反函数的求解都不太可行。算法
拒绝采样是由一个易于采样的分布出发,为一个难以直接采样的分布产生采样样本的通用算法。既然 p(x) 太复杂在程序中无法直接采样,那么便一个可抽样的分布 q(x) 好比高斯分布,而后按照必定的方法拒绝某些样本,达到接近 p(x) 分布的目的。网络
设定一个方便抽样的函数 q(x),以及一个常量 k,使得 p(x) 总在 k*q(x) 的下方。(参考上图)app
重要性采样主要是用于求一个复杂分布p(x)的均值,最后并无获得样本。函数
重要性采样的思想是借助一个易于采样的简单分布q(x),对这个简单分布q(x)所获得的样本所有接受。可是以此获得的样本确定不知足分布p(x),所以须要对每个样本附加相应的重要性权重。在重要性采样中,以p(x0)/q(x0)的值做为每一个样本的权重。这样,当样本和分布p(x)相近时,对应的权重大;与分布p(x)相差过多时,对应的权重小。这个方法采样获得的是带有重要性权重的服从q(z)分布的样本,这个权重乘以样本以后的结果其实就是服从p(z)分布的。学习
经过上述公式,咱们能够知道重要性采样能够用于近似复杂分布的均值。spa
假设有一个例子:E:吃饭、学习、打球;时间T:上午、下午、晚上;天气W:晴朗、刮风、下雨。样本(E,T,W)知足必定的几率分布。现要对其进行采样,如:打球+下午+晴朗。.net
问题是咱们不知道p(E,T,W),或者说,不知道三件事的联合分布。固然,若是知道的话,就没有必要用吉布斯采样了。可是,咱们知道三件事的条件分布。也就是说,p(E|T,W), p(T|E,W), p(W|E,T)。如今要作的就是经过这三个已知的条件分布,再用Gibbs sampling的方法,获得联合分布。
具体方法:首先随便初始化一个组合,i.e. 学习+晚上+刮风,而后依条件几率改变其中的一个变量。具体说,假设咱们知道晚上+刮风,咱们给E生成一个变量,好比,学习→吃饭。咱们再依条件几率改下一个变量,根据学习+刮风,把晚上变成上午。相似地,把刮风变成刮风(固然能够变成相同的变量)。这样学习+晚上+刮风→吃饭+上午+刮风。一样的方法,获得一个序列,每一个单元包含三个变量,也就是一个马尔可夫链。而后跳过初始的必定数量的单元(好比100个),而后隔必定的数量取一个单元(好比隔20个取1个)。这样sample到的单元,是逼近联合分布的。
蓄水池抽样(Reservoir Sampling ),即可以在o(n)时间内对n个数据进行等几率随机抽取,例如:从1000个数据中等几率随机抽取出100个。另外,若是数据集合的量特别大或者还在增加(至关于未知数据集合总量),该算法依然能够等几率抽样。
马氏链定理:若是一个非周期马氏链具备转移几率矩阵P,且它的任何两个状态是连通的,那么\(\lim_{p\to\infty}P_{ij}^n\)存在且与i无关,记\(\lim_{p\to\infty}P_{ij}^n = \pi(j)\),咱们有:
其中\(\pi = [\pi(1), \pi(2), ... , \pi(j), ...], \sum_{i=0}^{\infty}\pi_i = 1, \pi\)称为马氏链的平稳分布。
全部的 MCMC(Markov Chain Monte Carlo) 方法都是以这个定理做为理论基础的。
说明:
针对一个新的分布,如何构造对应的转移矩阵?
对于一个分布\(\pi(x)\),根据细致平稳条件,若是构造的转移矩阵P知足\(\pi(i)P_{ij} = \pi(j)P_{ji}\),那么\(\pi(x)\)即为该马氏链的平稳分布,所以能够根据这个条件去构造转移矩阵。
一般状况下,初始的转移矩阵\(P\)通常不知足细致平稳条件,所以咱们经过引入接受率构造出新的转移矩阵\(P'\),使其和\(\pi(x)\)知足细致平稳条件。由此,咱们即可以用任何转移几率矩阵(均匀分布、高斯分布)做为状态间的转移几率。
若是咱们假设状态之间的转移几率是相同的,那么在算法实现时,接收率能够简单得用\(\pi(j)/\pi(i)\)表示。
对于给定的几率分布p(x),咱们但愿能有便捷的方式生成它对应的样本。因为马氏链能收敛到平稳分布, 因而一个很的漂亮想法是:若是咱们能构造一个转移矩阵为P的马氏链,使得该马氏链的平稳分布刚好是p(x), 那么咱们从任何一个初始状态x0出发沿着马氏链转移, 获得一个转移序列 x0,x1,x2,⋯xn,xn+1⋯,, 若是马氏链在第n步已经收敛了,因而咱们就获得了 π(x) 的样本xn,xn+1⋯。
在马尔科夫状态链中,每个状态表明一个样本\(x_n\),即全部变量的赋值状况。
经过分析MCMC源码,能够知道:假设状态间的转移几率相同,那么下一个样本的采样会依赖于上一个样本。假设上一个样本所对应的原始分布几率\(\pi(x)\)很小,那么下一个样本的接受率很大几率为1;反之若是上一个样本的原始分布几率\(\pi(x)\)很大,那么下一个样本会有挺大几率被拒绝。这样的机制保证了生成的样本服从分布\(\pi(x)\)。
从上述分析能够看出,假如初始状态的样本对应的分布几率很小,那么在算法刚开始运行时所产生的样本(即便是分布几率很小的样本)很大可能都会被接收,从而使得算法刚开始运行时采样的样本不知足原始分布\(\pi(x)\)。只要算法采样到分布几率大的样本(此时即为收敛!),那么以后所采样获得的样本就会基本服从原始分布。固然,从初始状态遍历到分布几率大的状态时须要运行一段时间,这段过程即为收敛的过程。MCMC算法在收敛以后,保证了在分布几率\(\pi(x)\)大的地方产生更多的样本,在分布几率\(\pi(x)\)小的地方产生较少的样本。
一个马尔可夫链须要通过屡次的状态转移过程采用达到一个稳定状态,这时候采样才比较接近真实的分布。此过程称为burn in。通常可经过丢弃前面的N个采样结果来达到burn in。
MCMC的收敛是什么意思?这个过程当中是什么参数会更新致使收敛?如何肯定什么时候收敛?
收敛过程没有参数会更新,收敛的思想相似于大数定理。应用MCMC算法采样时,初始的样本量少,服从的分布可能和复杂分布\(\pi(x)\)相差挺远,但随着状态转移数的增长(转移矩阵P的应用),根据上述定理的证实,最终的样本分布会逐渐服从复杂分布\(\pi(x)\)。
\(\pi\)是每一个状态所对应的几率分布吗?若是是的话,初始选定一个状态后,这个\(\pi\)如何设定?或则在MCMC证实过程当中,初始\(\pi\)的几率分布如何设置?
在MCMC的证实过程当中,\(\pi\)是每一个状态所对应的几率分布。证实中所给定的初始\(\pi\)应该只是为了证实不管初始样本符合什么分布,在通过必定数量的转移以后,获得的样本会服从复杂分布\(\pi (x)\),在实际代码实现中,不用对这个\(\pi\)进行设定。
import numpy as np import random import matplotlib.pyplot as plt import pandas as pdf
def f(x): if 0 <= x and x <= 0.25: y = 8 * x elif 0.25 < x and x <= 1: y = (1 - x) * 8/3 else: y = 0 return y
def g(x): if 0 <= x and x <= 1: y = 1 else: y = 0 return y
def plot(fun): X = np.arange(0, 1.0, 0.01) Y = [] for x in X: Y.append(fun(x)) plt.plot(X, Y) plt.xlabel("x") plt.ylabel("y") plt.show()
plot(f) plot(g)
def rejection_sampling(N=10000): M = 3 cnt = 0 samples = {} while cnt < N: x = random.random() acc_rate = f(x) / (M * g(x)) u = random.random() if acc_rate >= u: if samples.get(x) == None: samples[x] = 1 else: samples[x] = samples[x] + 1 cnt = cnt + 1 return samples
s = rejection_sampling(100000)
X = [] Y = [] for k, v in s.items(): X.append(k) Y.append(v)
plt.hist(X, bins=100, edgecolor='None')
Metropolis-Hastings Algorithm
PI = 3.1415926 def get_p(x): # 模拟pi函数 return 1/(2*PI)*np.exp(- x[0]**2 - x[1]**2) def get_tilde_p(x): # 模拟不知道怎么计算Z的PI,20这个值对于外部采样算法来讲是未知的,对外只暴露这个函数结果 return get_p(x)
def domain_random(): #计算定义域一个随机值 return np.random.random()*3.8-1.9
def metropolis(x): new_x = (domain_random(),domain_random()) #新状态 #计算接收几率 acc = min(1,get_tilde_p((new_x[0],new_x[1]))/get_tilde_p((x[0],x[1]))) #使用一个随机数判断是否接受 u = np.random.random() if u<acc: return new_x return x
def testMetropolis(counts = 100,drawPath = False): plt.figure() #主要逻辑 x = (domain_random(),domain_random()) #x0 xs = [x] #采样状态序列 for i in range(counts): xs.append(x) x = metropolis(x) #采样并判断是否接受 #在各个状态之间绘制跳转的线条帮助可视化 X1 = [x[0] for x in xs] X2 = [x[1] for x in xs] if drawPath: plt.plot(X1, X2, 'k-',linewidth=0.5) ##绘制采样的点 plt.scatter(X1, X2, c = 'g',marker='.') plt.show()
testMetropolis(5000)
def metropolis(x): new_x = domain_random() #计算接收几率 acc = min(1,f(new_x)/f(x)) #使用一个随机数判断是否接受 u = np.random.random() if u<acc: return new_x return x
def testMetropolis(counts = 100,drawPath = False): plt.figure() #主要逻辑 x = domain_random() xs = [x] #采样状态序列 for i in range(counts): xs.append(x) x = metropolis(x) #采样并判断是否接受 #在各个状态之间绘制跳转的线条帮助可视化 plt.hist(xs, bins=100, edgecolor='None') # plt.plot(xs) plt.show()
testMetropolis(100000)
def partialSampler(x,dim): xes = [] for t in range(10): #随机选择10个点 xes.append(domain_random()) tilde_ps = [] for t in range(10): #计算这10个点的未归一化的几率密度值 tmpx = x[:] tmpx[dim] = xes[t] tilde_ps.append(get_tilde_p(tmpx)) #在这10个点上进行归一化操做,而后按照几率进行选择。 norm_tilde_ps = np.asarray(tilde_ps)/sum(tilde_ps) u = np.random.random() sums = 0.0 for t in range(10): sums += norm_tilde_ps[t] if sums>=u: return xes[t]
def gibbs(x): rst = np.asarray(x)[:] path = [(x[0],x[1])] for dim in range(2): #维度轮询,这里加入随机也是能够的。 new_value = partialSampler(rst,dim) rst[dim] = new_value path.append([rst[0],rst[1]]) #这里最终只画出一轮轮询以后的点,但会把路径都画出来 return rst,path
def testGibbs(counts = 100,drawPath = False): plt.figure() x = (domain_random(),domain_random()) xs = [x] paths = [x] for i in range(counts): xs.append([x[0],x[1]]) x,path = gibbs(x) paths.extend(path) #存储路径 p1 = [x[0] for x in paths] p2 = [x[1] for x in paths] xs1 = [x[0] for x in xs] xs2 = [x[1] for x in xs] if drawPath: plt.plot(p1, p2, 'k-',linewidth=0.5) ##绘制采样的点 plt.scatter(xs1, xs2, c = 'g',marker='.') plt.show()
testGibbs(5000)