难易程度:★★★前言
阅读点:python; web安全;
文章做者:xiaoye
文章来源:i春秋
关键字:网络渗透技术
python是门简单易学的语言,强大的第三方库让咱们在编程中事半功倍,今天,咱们来谈谈python多线程在渗透测试中的应用,本文,咱们将编写一个简易c段存活主机扫描脚本,以及一个python版本的多线程御剑--目录扫描工具
1、python 多线程
python多线程有几种写法
1. thread模块
python的一个多线程模块,小脚本能够用,可是有瑕疵,好比不稳定,线程数很差控制
下方贴出一个c段存活主机扫描脚本,这个脚本i春秋ado老师也有教过哦
思想:输入一个ip,通过字符拆分,获取c段,多线程地ping -c 2 ip ,根据返回的信息来判断主机是否存活
demo ping_thread.py:php
''' Created on 2017-2-27 @author: xiaoye ''' #coding: utf-8 import thread import time from subprocess import Popen,PIPE def scan_ip(ip): process = Popen('ping -c 2 ' + ip, stdin=PIPE, stdout=PIPE, shell=True) data = process.stdout.read() if 'ttl' in data: print '%s is live ,now time is %s' % (ip, time.strftime('%H:%M:%S')) if __name__ == '__main__': #scan_ip('111.13.147.229') ips = raw_input() ip_header = '.'.join(ips.split('.')[:3]) for i in range(1,255): ip = ip_header + '.' + str(i) #print ip thread.start_new_thread(scan_ip, (ip,)) time.sleep(0.1)
运行状况:
速度还行,稳定性通常
thread模块,核心在:
html
1
2
|
thread.start_new_thread(scan_ip, (ip,))
time.sleep(
0.1
)
|
scan_ip是要执行的函数,(ip,)是传入的参数,记得sleep一下
2.threading模块用法:
demo:python
''' Created on 2017-2-28 @author: xiaoye ''' #coding: utf-8 import threading import time def test(th): print 'i am doing %s %s' % (th, time.strftime('%H:%M:%S')) def main(): thread = [] keys = ['movie_th','swim_th','listen_th','learn_th','movie_th','swim_th','listen_th','learn_th','movie_th','swim_th','listen_th','learn_th','movie_th','swim_th','listen_th','learn_th'] thread_count = len(keys) #print thread_count for i in range(thread_count): t = threading.Thread(target=test, args=(keys[i],)) thread.append(t) for i in range(thread_count): thread[i].start() for i in range(thread_count): thread[i].join() if __name__ == '__main__': main()
运行状况:
能够看到,基本是同时运行的,threading.Thread模块的一种用法就是这样:git
for i in range(thread_count): t = threading.Thread(target=test, args=(keys[i],)) thread.append(t) for i in range(thread_count): thread[i].start() for i in range(thread_count): thread[i].join()
模式1.:一个列表存放全部线程,start()执行列表中线程,join()等待运行完毕
模式1?,还有模式2吗?
固然,模式2就是从threading.Thread继承一个子类class,重写父类run方法,实现多线程运行run函数,而这种也是很是良好的写法
demo:github
# -*- coding: utf-8 -*- import threading class T(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): #继承,threading.Thread子类, 重写run方法, run方法在start()后自动执行 print 'i love you' def main(): thread = [] for i in range(10): thread.append(T()) for i in thread: i.start() for i in thread: i.join() if __name__ == '__main__': main()
运行状况:
2、线程间的数据守护
Queue绝对是保护线程间数据安全的好选择,有关于Queue,你们能够自行百度其用法,我发出一点常常用的:
Queue.qsize() 返回队列的大小
Queue.empty() 若是队列为空,返回True,反之False
Queue.full() 若是队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 至关Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 至关Queue.put(item, False)
Queue.task_done() 在完成一项工做以后,Queue.task_done() 函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操做
3、多线程threading.Thread+Queue实现渗透测试工具编写
脚本放出来:
1.多线程c段存活主机扫描:web
''' Created on 2017-2-28 @author: xiaoye ''' #coding: utf-8 import time import sys import threading import Queue from subprocess import Popen,PIPE class Quethread(threading.Thread): def __init__(self, que): threading.Thread.__init__(self) self._que = que def run(self): while not self._que.empty(): ip = self._que.get() process = Popen('ping -c 2 ' + ip, stdin=PIPE, stdout=PIPE, shell=True) data = process.stdout.read() if 'ttl' in data: sys.stdout.write('%s is live %s\n' % (ip, time.strftime('%H:%M:%S'))) def main(): que = Queue.Queue() ips = raw_input() thread = [] thread_count = 200 ip_head = '.'.join(ips.split('.')[:3]) #print ip_head for i in range(1, 255): que.put(ip_head + '.' + str(i)) '''for i in range(1,255): print que.get()''' for i in range(thread_count): thread.append(Quethread(que)) for i in thread: i.start() for i in thread: i.join() if __name__ == '__main__': main()
ubuntu下运行成功,win下须要修改Popen里的命令等,截图:
速度很快,稳定性较强
c段主机存活脚本:https://github.com/xiaoyecent/ping_threading_Queue
2.py版多线程御剑--目录扫描--支持自定义字典、输出文件位置以及自定义线程数:
shell
''' @author: xiaoye ''' #coding: utf-8 import requests import sys import threading #import time import Queue from optparse import OptionParser reload(sys) sys.setdefaultencoding('utf8') class Doscan(threading.Thread): def __init__(self, que): threading.Thread.__init__(self) self._que = que def run(self): while not self._que.empty(): d = self._que.get() try: r = requests.get(url + d, headers=headers, timeout=3) sys.stdout.write(d + ' is scan status:' + str(r.status_code) + '\n') if r.status_code == 200: with open(option.outfile, 'a') as f: f.write(url + d + '\n') except: pass def main(): thread = [] thread_count = option.threadcount que = Queue.Queue() with open(option.dictname, 'r') as f: for d in f.readlines(): d = d.strip('\n') que.put(d) for i in range(thread_count): thread.append(Doscan(que)) for i in thread: i.start() for i in thread: i.join() if __name__ == '__main__': parse = OptionParser() parse.add_option('-u', '--url', dest='input_url', type='string', help='the url you wan to scan dir') parse.add_option('-o', '--out', dest='outfile', type='string', help='output filename', default='result.txt') parse.add_option('-s', '--speed', dest='threadcount', type='int', default=60, help='the thread_count') parse.add_option('-d', '--dict', dest='dictname', type='string', help='dict filename') (option, args) = parse.parse_args() headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:50.0) Gecko/20100101 Firefox/50.0'} url = option.input_url main()
Usage: scan_dir.py [options] Options: -h, --help show this help message and exit -u INPUT_URL, --url=INPUT_URL the url you wan to scan dir -o OUTFILE, --out=OUTFILE output filename -s THREADCOUNT, --speed=THREADCOUNT the thread_count -d DICTNAME, --dict=DICTNAME dict filename
参数用法贴出来
运行状况
举个例子:
-u http://localhost -s 30 -d d://PHP.txt -o d://ichunqiu.txt:
结果:
运行速度取决于线程数(默认60)和实际环境
源码:https://github.com/xiaoyecent/scan_dir
4、总结
多线程加队列实现线程间的数据保护是很好的搭配,threading.Thread+Queue的用法但愿你们可以掌握,另外,继承threading.Thread写出子类,重写父类run方法来实现多线程的写法也值得借鉴编程