在上一篇文章BaseProxy:异步http/https代理中,我介绍了本身的开源项目BaseProxy,这个项目的初衷实际上是为了渗透测试,抓包改包。在知识星球中,有不少朋友问我这个项目的原理及实现代码,本篇文章就讲解一下和这个项目相关的HTTPS的中间人攻击。html
HTTPS隧道代理简单来讲是基于TCP协议数据透明转发,在RFC中,为这类代理给出了规范,Tunneling TCP based protocols through Web proxy servers。浏览器客户端发送的原始 TCP 流量,代理发送给远端服务器后,将接收到的 TCP 流量原封不动返回给浏览器。交互流程以下图所示:python
以链接百度为例,浏览器首先发起 CONNECT 请求:web
CONNECT baidu.com:443 HTTP/1.1
代理收到这样的请求后,根据 host 地址与服务器创建 TCP 链接,并返回给浏览器链接成功的HTTP 报文(没有报文体):浏览器
HTTP/1.1 200 Connection Established
浏览器一旦收到这个响应报文,就可认为与服务器的 TCP 链接已打通,后续可直接透传。安全
在BaseProxy项目中,https=False
是对于https实行透传。服务器
HTTPS 代理本质上是隧道透传,仅仅是转发 TCP 流量,没法获取其中的GET/POST请求的具体内容。这就很麻烦,如今 HTTPS 愈来愈广泛,作安全测试也就拿不到 HTTP 请求。那怎么作呢? 代理须要对 TCP 流量进行解密,而后对明文的HTTP请求进行分析,这样的代理就称为HTTPS中间人。app
在上图中,隧道代理负责浏览器和服务器之间的TCP流量的转发。机器学习
若是须要对TCP流量进行分析和修改,就要将上图中的代理功能一分为二,即代理既要当作TLS服务端,又要当作TLS客户端,以下图所示。异步
在上图中,用一个 TLS 服务器假装成远端的真正的服务器,接收浏览器的 TLS 流量,解析成明文。这个时候能够对明文进行分析修改,而后用明文做为原始数据,模拟 TLS 客户端将原始数据向远端服务器转发。源码分析
CA证书是我当时遇到的坑,以前没接触过。HTTPS传输是须要证书的,用来对HTTP明文请求进行加解密。通常正常网站的证书都是由合法的 CA 签发,则称为合法证书。在上图中,浏览器会验证隧道代理中 TLS 服务器 的证书:
www.baidu.com
,则返回的证书 CN 属性必须是 www.baidu.com
。对于第一点,合法的 CA 机构不会给咱们签发证书的,不然HTTPS安全性形同虚设,所以咱们须要自制CA证书,并导入到浏览器的信任区中。
对于第二点,咱们因为须要对各个网站进行HTTPS拦截,所以咱们须要实时生成相应域名的服务器证书,并使用自制的CA证书进行签名。
经过以上的讲解,HTTPS中间人的原理已经基本清楚,下面简要地说明一下BaseProxy源码。
代理其实就是一个HTTPS服务器,使用了Python中的HTTPServer类,为了增长异步特性,将其放到线程池中。
class MitmProxy(HTTPServer): def __init__(self,server_addr=('', 8788),RequestHandlerClass=ProxyHandle, bind_and_activate=True,https=True): HTTPServer.__init__(self,server_addr,RequestHandlerClass,bind_and_activate) logging.info('HTTPServer is running at address( %s , %d )......'%(server_addr[0],server_addr[1])) self.req_plugs = []##请求拦截插件列表 self.rsp_plugs = []##响应拦截插件列表 self.ca = CAAuth(ca_file = "ca.pem", cert_file = 'ca.crt') self.https = https def register(self,intercept_plug): if not issubclass(intercept_plug, InterceptPlug): raise Exception('Expected type InterceptPlug got %s instead' % type(intercept_plug)) if issubclass(intercept_plug,ReqIntercept): self.req_plugs.append(intercept_plug) if issubclass(intercept_plug,RspIntercept): self.rsp_plugs.append(intercept_plug) class AsyncMitmProxy(ThreadingMixIn,MitmProxy): pass
对HTTP请求的解析与响应,关键在于ProxyHandle类,实现其中的do_CONNECT和do_GET方法,并在do_CONNECT方法中判断是使用透传模式仍是中间人模式。
class ProxyHandle(BaseHTTPRequestHandler): def __init__(self,request,client_addr,server): self.is_connected = False BaseHTTPRequestHandler.__init__(self,request,client_addr,server) def do_CONNECT(self): ''' 处理https链接请求 :return: ''' self.is_connected = True#用来标识是否以前经历过CONNECT if self.server.https: self.connect_intercept() else: self.connect_relay() def do_GET(self): ''' 处理GET请求 :return: ''' ...... do_HEAD = do_GET do_POST = do_GET do_PUT = do_GET do_DELETE = do_GET do_OPTIONS = do_GET
与CA证书相关的内容都放在了CAAuth类中。生成CA证书代码以下:
def _gen_ca(self,again=False): # Generate key #若是证书存在并且不是强制生成,直接返回证书信息 if os.path.exists(self.ca_file_path) and os.path.exists(self.cert_file_path) and not again: self._read_ca(self.ca_file_path) #读取证书信息 return self.key = PKey() self.key.generate_key(TYPE_RSA, 2048) # Generate certificate self.cert = X509() self.cert.set_version(2) self.cert.set_serial_number(1) self.cert.get_subject().CN = 'baseproxy' self.cert.gmtime_adj_notBefore(0) self.cert.gmtime_adj_notAfter(315360000) self.cert.set_issuer(self.cert.get_subject()) self.cert.set_pubkey(self.key) self.cert.add_extensions([ X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"), X509Extension(b"keyUsage", True, b"keyCertSign, cRLSign"), X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=self.cert), ]) self.cert.sign(self.key, "sha256") with open(self.ca_file_path, 'wb+') as f: f.write(dump_privatekey(FILETYPE_PEM, self.key)) f.write(dump_certificate(FILETYPE_PEM, self.cert)) with open(self.cert_file_path, 'wb+') as f: f.write(dump_certificate(FILETYPE_PEM, self.cert))
根据域名实时生成服务器证书,并对服务器证书进行自签名。代码以下:
def _sign_ca(self,cn,cnp): #使用合法的CA证书为代理程序生成服务器证书 # create certificate try: key = PKey() key.generate_key(TYPE_RSA, 2048) # Generate CSR req = X509Req() req.get_subject().CN = cn req.set_pubkey(key) req.sign(key, 'sha256') # Sign CSR cert = X509() cert.set_version(2) cert.set_subject(req.get_subject()) cert.set_serial_number(self.serial) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(31536000) cert.set_issuer(self.cert.get_subject()) ss = ("DNS:%s" % cn).encode(encoding="utf-8") cert.add_extensions( [X509Extension(b"subjectAltName", False, ss)]) cert.set_pubkey(req.get_pubkey()) cert.sign(self.key, 'sha256') with open(cnp, 'wb+') as f: f.write(dump_privatekey(FILETYPE_PEM, key)) f.write(dump_certificate(FILETYPE_PEM, cert)) except Exception as e: raise Exception("generate CA fail:{}".format(str(e)))
关注公众号:七夜安全博客
知识星球已经50多人了,随着人数的增多,价格以后会上涨,越早关注越多优惠。星球的福利有不少: