#Created on 2016年12月30日python
第一课 课前废话 28minutesshell
能够经过Queue来限制线程的数量,就像生产者和消费者缓存
第二课 上节内容回顾 20minutes
上节回顾:
socket文件传送
多线程:
计算密集型:多进程,须要CPU
I/O密集型:多线程 ,不需CPU
线程安全:线程锁 GLL
event:线程间通讯
线程事件 event.set(),event.wait()
两种写法:直接调用函数,另外一种直接继承类
生产者消费者模型:加快效率,松耦合
paramiko
ssh通讯原理:认证加密,通讯加密
本节内容:
多进程
Paramiko
审计开发
Select异步模型安全
第三课 多进程使用 14minutes
Multiprocessing
进程:由系统控制bash
简单的多进程
a = range(10)
def main(n):
time.sleep(1)
print n*n
return n*n
if __name__ == '__main__':
p = Pool(5)
print p.map(main,a)多线程
第四课 子进程与父进程的关系 25minutes并发
每个程序都有主进程和子进程
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys,time,re,os
from multiprocessing import Pool,Process
def aa(x):
print x
print 'ppid:',os.getppid()
time.sleep(1)
print 'pid:',os.getpid()
aa('fffff')
print '-----------'
p = Process(target=aa,args=('sssss',))
p.start()
p.join()
结果:
[root@Rsyslog day7]# python multiprocess.py
fffff
ppid: 7730
pid: 7760
-----------
sssss
ppid: 7760
pid: 7761
[root@Rsyslog day7]# app
第五课 进程间的内存同步方法Queue 31minutesssh
如下例子多进程没法通讯的,但在Win下老死机,须要Linux测试
def aa(info_list,n):
info_list.append(n)
print info_list
info = []
for i in range(10):
#进程不共享内存,因此每次只出来一个
p = Process(target=aa,args=(info,i))
p.start()
[root@Rsyslog day7]# python multiprocess-1.py
[1]
[0]
[2]
[3]
[4]
[5]
[7]
[6]
[8]
[9]
[root@Rsyslog day7]#
#若是用多线程,它由于是共享内存的,因此会出来不少
p = threading.Thread(target=aa,args=(info,i))
p.start()
[root@Rsyslog day7]# python multiprocess-1.py
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 7]
[0, 1, 2, 3, 4, 5, 6, 7, 8]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
--------------------------------------------------
问题:若是经过Queue解决上面的问题
下面例子中,主进程和子进程的Q是不同的结果,相互独立
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys,time,re,os
from multiprocessing import Pool,Process,Queue异步
def f(q,n):
q.put([n,'hello'])
print q.get()
q = Queue()
for i in range(5):
p = Process(target=f,args=(q,i))
p.start()
p.join()
[root@Rsyslog day7]# python multi-2.py
[0, 'hello']
[1, 'hello']
[2, 'hello']
[3, 'hello']
[4, 'hello']
[root@Rsyslog day7]#
-------------------------------------------------------
第六课 进程间的内存同步方法manager 29minutes
进程里的会直接Copy外部变量的数据,因此子进程内部和外面的就是结果是不同
共享内存方法:Value,Array-------须要练习
只能进行数字和列表共享
from multiprocessing import Array,Value
def run(v,a):
v.value = 3.1415926
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
v = Value('d',0.0)
a = Array('i',range(10))
p = Process(target=run,args=(v,a))
p.start()
p.join()
print v.value
print a[:]
结果:
3.1415926
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
manager 支持更多的共享
可以使用Manager方式更换数据
def run(d,l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
man= Manager()
d = man.Dict()
l = man.list(range(10))
p = Process(target=run,args=(d,l))
p.start()
p.join()
print d
print l
结果:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
#Created on 2017年01月01日
第七课 经过Pool产生多进程1 20minutes
第8课 经过Pool产生多进程2 9minutes
多进程数量和CPU核数相对便可,不可太多.
def run(x):
print x*x
time.sleep(1)
pool = Pool(processes=4)
re_list = []
# for i in range(10):
# res = pool.apply_async(run, [i,]) #异步
# #res = pool.apply(run, [i,]) #同步,基本不用
# res.get()
#上面For循环等同于如下命令
print pool.map(run,range(10))
-----------------------------------------------
一个简单的多进程的池的练习
[root@Rsyslog test_script]# cat process_test_ping.py
#!/usr/bin/env python
import multiprocessing
import subprocess
host_list = ['192.168.100.254','1.1.1.1','192.168.100.253','114.28.127.2','114.28.127.72','114.28.127.70','114.28.127.12','114.28.127.56','114.28.127.102']
if len(host_list) > 30:
process_number = 30
else:
process_number = len(host_list)
def ping_host(ipaddr):
if subprocess.call('ping -c1 -W 1 %s > /dev/null' % ipaddr, shell=True) == 0:
print '%s is OK' % ipaddr
else:
print '%s is DOWN' % ipaddr
pool = multiprocessing.Pool(processes=process_number)
for ip in host_list:
pool.apply_async(ping_host,(ip,))
#pool.map(ping_host,host_list)
pool.close()
pool.join()
[root@Rsyslog test_script]#
#Created on 2017年01月03日
第九课 开发审计堡垒机 56minutes
主要用Paramiko源码下的demo.py文件
第十课 开发审计堡垒机修改Paramiko源码记录操做 29minutes
修改Demo下的一个文件
第十一课 审计堡垒的安全控制 15minutes
登陆后自动启动脚本
将脚本放到用户的环境变量中
在/home/allan/.bashrc 用户环境变量
将用户的脚本放到用户的环境变量中
shellinabox -t 不用证书
第十二课 select-Poll-epoll介绍 47minutes
Select VS poll &epoll
异步I/O模型
Nginx可经过异步支持上万个并发
ulimit -n 显示当前Shell终端同时最大打开多少个文件
Select单进程多并发,文件描述符限制为1024
Select目前基本不用,无太大并发可用
Poll改进版:本质同Select,无文件描述符限制
水平触发:以上两种将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用
Select和poll的时候再次报告这些描述符,因此通常不会丢失,这就是水平触发。
这种操做速度慢,且耗时,若是有一万个描述符,要轮循好久。
Epoll:水平触发和边缘触发
边缘触发:只告诉进程哪些文件描述符就绪,只说一次,若是没有采起行动,不会再次告知。
#Created on 2017年01月03日
第十三课 select代码实例分析 54minutes
select异步例子,须要实例化操做。
第十四课 审计做业 14minutes
审计开发
用户可登陆和操做指定机器,机器组
用户可经过不一样的远程账号登陆远程主机:
在选择主机后可选择收号登陆,如Root或其余
做业二:
Select代码做业,加注释
Server:
#!/usr/bin/env python # encoding: utf-8 #Created on 2017年1月11日 #@author: Administrator #--------------select异步说明-------------------------- import select import socket import Queue import sys def select_server(): server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) #设定SOcket为非阻塞模式,当前容许链接,但会话需等待,不设定有链接时不容许链接 server.setblocking(0) address_port = ('127.0.0.1',9999) print >>sys.stderr,'starting up on %s port %s' % address_port server.bind(address_port) server.listen(5) ''' select()方法接收并监控3个通讯列表, 第一个是全部的输入的data,就是指外部发过来的数据, 第2个是监控和接收全部要发出去的data(outgoing data),第3个监控错误信息,接下来咱们需 要建立2个列表来包含输入和输出信息来传给select(). ''' inputs = [server] outputs = [] ''' 全部客户端的进来的链接和数据将会被server的主循环程序放在上面的list中处理,咱们如今的 server端须要等待链接可写(writable)以后才能过来,而后接收数据并返回(所以不是在接收到 数据以后就马上返回),由于每一个链接要把输入或输出的数据先缓存到queue里,而后再由select取 出来再发出去。 ''' message_que = {} while inputs: #select将轮循三张表,将结果返回给前面的三个参数 print '\nnow wait for connect.......' readable,writable,exception = select.select(inputs,outputs,inputs) for s in readable: #轮循inputs表 #第一种状况是S是Server自己,则开始接收客户端 if s is server: #若是目标是当前Server,就是socket对象 #接收链接 connection,address = s.accept() #将链接对象放到表中 inputs.append(connection) #将链接对象放到Queue中 message_que[connection] = Queue.Queue() #第二种状况:若是已链接,则接收数据 else: #若是已链接,则接收数据 data = s.recv(1024) if data: #若是有数据 message_que[s].put(data) #将数据放到Queue中 if s not in outputs: #若是s不在outputs中,则加入 outputs.append(s) #第三种状况,客户端已断开,接收数据为空,则关闭客户端 else: inputs.remove(s) #删除inputs中的链接 if s in outputs: #删除outputs中的链接 outputs.remove(s) s.close() #关闭链接 del message_que[s] #删除Queue中的数据 ''' 对于writable list中的socket,也有几种状态,若是这个客户端链接在跟它对应的queue里有数据, 就把这个数据取出来再发回给这个客户端,不然就把这个链接从output list中移除,这样下一次循环 select()调用时检测到outputs list中没有这个链接,那就会认为这个链接还处于非活动状态 ''' for s in writable: try: #取出Queue中的数据 next_msg = message_que[s].get_nowait() except Queue.Empty: #若是数据为空,则出现错误 #print >>sys.stderr,'output queue for',s.getpeername(),'is empty' outputs.remove(s) #删除putputs中的链接 else: #若是正常取出,则发送给客户端 #print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername()) s.send(next_msg.upper()+' =====> from server') for s in exception: #若是出现错误 inputs.remove(s) #删除inputs表的项 if s in outputs: outputs.remove(s) #删除outputs表的项 s.close() #关闭链接 del message_que[s] #删除Queue中的项 if __name__ == '__main__': select_server()
Client:
#!/usr/bin/env python # encoding: utf-8 #Created on 2017年1月11日 #@author: Administrator import socket import sys #开两个Socket链接 socks = [socket.socket(socket.AF_INET,socket.SOCK_STREAM), socket.socket(socket.AF_INET,socket.SOCK_STREAM)] address_port = ('127.0.0.1',9999) #定义发送的信息 message = ['helle,this is first data', 'helle,this is second data', 'helle,this is thrid data'] #经过迭代方式打开两个Socket进程 for s in socks: try: s.connect(address_port) except: print 'error' #迭代发送信息 for msg in message: #发送 for s in socks: s.send(msg) print >>sys.stderr,'%s sending "%s" '%(s.getsockname(),msg) #接收 for s in socks: data = s.recv(1024) print >>sys.stderr,'%s recevid "%s"' % (s.getsockname(),data) if not data: print >>sys.stderr,'closeing socket',s.getsockname()