socket.socket(AF.INET,socket.SOCK_STREAM)
2.1 sendall的发送方式与for i in f(局部)python
2.2 使用send、recv交替的方式作一个ack来解决黏包linux
3.1 5种不一样的socket类nginx
3.1.1 baseserver用于继承,不对外提供服务sql
3.1.2 tcpserver继承baseserver类用于处理tcp链接shell
3.1.3 unixstreamserver继承tcpserver使用UNIX域套接字实现面向数据流协议(sock_stream --->tcp)编程
3.1.4 udpserver继承tcpsever用于处理udp链接windows
3.1.5 unixdatagramserver继承tcpserver使用针对UNIX域套接字来处理数据报式协议(sock_dgram--->udp)安全
3.2 socketserver的实现服务器
3.2.1 step1 定义一个类,存放handler方法多线程
须要继承socketserver.BaseRequestHandler,以后每生成一个新链接,都会实例化一个类,并调用这个handler方法,和客户端全部的交互都是在这个handler里面定义的
class myclass(socketserver.BaseRequestHandler): def handle(self): print('xxx') self.request.send(b'ok')
3.2.2 step2 定义一个实例,此实例关联上一步的handler、并绑定ip和及端口,监听端口处理新发起的链接,将新发起的链接实例化对像,并交给handler处理。
此处咱们要处理tcp,因此是tcpserver。考虑到并发,此处使用threading多线程实现,或者可使用ForkingServer来处理
s1=socketserver.ThreadingTCPServer(('localhost',9999),myclass)
s2=socketserver.FuckingTCPServer(('localhost',8888),myclass)
3.2.3 step3 server.forever()
s1.serve_forever()
s2.serve_forever()
3.2.4 step4 客户端对接测试
import socketserver class myclass(socketserver.BaseRequestHandler): def handle(self): print('xxx') self.request.send(b'ok') s1=socketserver.ThreadingTCPServer(('localhost',9999),myclass) s2=socketserver.FockingTCPServer(('localhost',8888),myclass) s1.serve_forever() s2.serve_forever() ------------------------------- import socket c=socket.socket() c.connect(('localhost',8888)) print(c.recv(1024).decode())
经过byte的方式读取文件结合使用ab的方式续写文件的方式来实现断点续传,其核心的思路就是,得到当前半成品文件的字节数,使用seek将被操做文件的句柄移动到此处再日后读取,以此实现断点续传
import socket c1=socket.socket() c1.connect(('localhost',6666)) ''' f=open(r'E:\L.exe','rb') count = 0 for i in f: c1.send(i) count+=1 if count > 10: break ''' x=input('seek到哪一个位置?') f=open(r'E:\L.exe','rb') f.seek(int(x)) print('开始传输') count=0 for i in f: print('\r%s'%count) count+=1 c1.send(i) print('ok') --------------------- import socket import os s1=socket.socket() s1.bind(('localhost',6666)) s1.listen() print('----begin----') conn,addr=s1.accept() print('链接创建') print(os.path.getsize(r'E:\\xxxx.exe')) f=open(r'E:\\xxxx.exe','ab') while True: data=conn.recv(1024) f.write(data) if not data: print('end') break ''' f=open(r'E:\\xxxx.exe','wb') while True: data=conn.recv(1024) f.write(data) if not data: print('end') break '''
5.1 定义对象
import optparse parse=optparse.OptionParser()
5.2 添加参数
parse.add_option('-u','--user',dest='user',action='store',type=str,metavar='user',help='Enter User Name!!') parse.add_option('-p','--port',dest='port',type=int,metavar='xxxxx',default=3306,help='Enter Mysql Port!!')
#-u,--user 表示一个是短选项 一个是长选项
#dest='user' 将该用户输入的参数保存到变量user中,能够经过options.user方式来获取该值
#type=str,表示这个参数值的类型必须是str字符型,若是是其余类型那么将强制转换为str(可能会报错)
#metavar='user',当用户查看帮助信息,若是metavar没有设值,那么显示的帮助信息的参数后面默认带上dest所定义的变量名
#help='Enter..',显示的帮助提示信息
#default=3306,表示若是参数后面没有跟值,那么将默认为变量default的值
#parse.set_defaults(v=1.2) #也能够这样设置默认值
5.3 监听
将监听结果赋值给options和args,一个结果为属性一个结果为列表。
options,args=parse.parse_args()
5.4 例子
import optparse class test(): def __init__(self): parse=optparse.OptionParser() parse.add_option('-s',dest='x',help='server binding host',metavar='HOST') parse.add_option('-p',dest='port',help='server binding port') (options,args)=parse.parse_args() print(options.x,options.port) for i in args: print(i) F:\ftp服务器_sockserver版\server\core>python main.py -s 0.0.0.0 -p xxx aaa bbb ccc 0.0.0.0 xxx aaa bbb ccc F:\ftp服务器_sockserver版\server\core>python main.py -h Usage: main.py [options] Options: -h, --help show this help message and exit -s HOST server binding host -p PORT server binding port
import paramiko #step1 实例化ssh ssh = paramiko.SSHClient() #step2 #加上这句话不用担忧选yes的问题,会自动选上(用ssh链接远程主机时,第一次链接时会提示是否继续进行远程链接,选择yes) ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)#家目录/.ssh/known_hosts #step3 链接 ssh.connect(hostname='xxx.cm',port=22,username='xxxxx',password='xxxxx') #step4 执行 stdin,stdout,stderr=ssh.exec_command('df -h;pwd')#能够用;执行多个指令 x1=stdout.read() x2=stderr.read() result=x1 if x1 else x2#三元运算 print(result.decode()) #step5 关闭 ssh.close()
------------------------
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 7.9G 3.8G 3.8G 51% /
none 3.9G 4.0K 3.9G 1% /dev/shm
/dev/sda2 8.7G 298M 7.9G 4% /xxx/conf
/dev/sda3 893G 88G 761G 11% /xxx/data
/xxx/data/home/xxxxxx
import paramiko #step1 写链接信息 linkit=paramiko.Transport('xxx.cm',22) linkit.connect(username='xxxxx',password='xxxx') #step2 建立链接对象 sftp_object=paramiko.SFTPClient.from_transport(linkit) #step3 上传下载文件 sftp_object.put('config.conf','tmpfromwin')#上传 # config.conf 为本地文件 tmpfromwin为本地文件上传到服务器上的文件名 sftp_object.get('xxx.zip',r'f:\x.zip')#下载 #xxx.zip为服务器的文件名 r'f:\x.zip'本地保存的位置及文件名
3.1 免密登陆的思路
PC-A 生成公钥和私钥
PC-A 将公钥发给PC B
PC-A 无密码登陆PC-B
Python 调用pca的私钥便可完成对pcb的无密码登陆
3.2 免密登陆的key生成
pc-A
[root@localhost ~]# ssh-keygen Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /root/.ssh/id_rsa. Your public key has been saved in /root/.ssh/id_rsa.pub. The key fingerprint is: SHA256:cNTWlMyJXaOFTQMY3dFKCIywHfVzgmpXvjt0vVfuZao root@localhost.localdomain The key's randomart image is: +---[RSA 2048]----+ | ..o=+%o@Bo | | +..*o@o+o.| | o o.. *... | | o . o +. | | S . . . | | . . ... o| | ... .=| | .. ++| | Eo..o| +----[SHA256]-----+ [root@localhost ~]# [root@localhost ~]# cd .ssh/ [root@localhost .ssh]# ll 总用量 8 -rw-------. 1 root root 1675 4月 29 12:44 id_rsa -rw-r--r--. 1 root root 408 4月 29 12:44 id_rsa.pub [root@localhost .ssh]# ssh-copy-id -p 22 root@192.168.99.172 /usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub" The authenticity of host '192.168.99.172 (192.168.99.172)' can't be established. ECDSA key fingerprint is SHA256:JRJkZRzFncdiupBqjji0LP6XNMQ9eSdFKm0wVoDp8RY. ECDSA key fingerprint is MD5:f3:c8:77:ef:15:36:b7:3f:b9:36:bd:1a:4e:1a:5d:33. Are you sure you want to continue connecting (yes/no)? /usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed The authenticity of host '192.168.99.172 (192.168.99.172)' can't be established. ECDSA key fingerprint is SHA256:JRJkZRzFncdiupBqjji0LP6XNMQ9eSdFKm0wVoDp8RY. ECDSA key fingerprint is MD5:f3:c8:77:ef:15:36:b7:3f:b9:36:bd:1a:4e:1a:5d:33. Are you sure you want to continue connecting (yes/no)? yes /usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys root@192.168.99.172's password: Number of key(s) added: 1 Now try logging into the machine, with: "ssh -p '22' 'root@192.168.99.172'" and check to make sure that only the key(s) you wanted were added. [root@localhost .ssh]# ssh 192.168.99.172 Last login: Sun Apr 29 12:39:28 2018 from 192.168.99.239 [root@localhost ~]# exit 登出 Connection to 192.168.99.172 closed. ----------------------------------------------------------
PC-B drwx------. 2 root root 29 4月 29 12:48 .ssh [root@localhost ~]# cd .ssh/ [root@localhost .ssh]# ll 总用量 4 -rw-------. 1 root root 408 4月 29 12:48 authorized_keys [root@localhost .ssh]# cat authorized_keys ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDMeTAz8cAtlZON+sUMSYpTXjhW32IkAdE+336BwW9LotWGznIOoS7d6cdvY...
3.3 python的实现
import paramiko #step 1 获取私钥 pk=paramiko.RSAKey.from_private_key_file('F:\id_rsa') #step 2 链接服务器,设置用户名并关联私钥 link1=paramiko.Transport('xxx.cm',22) link1.connect(username='xxxx',pkey=pk) #step 3 建立transport对象 收发文件 myobject=paramiko.SFTPClient.from_transport(link1) myobject.get('xxx.sh',r'f:\xxx.sh')
3.4 一则网上的代码,此处先不做分析
import paramiko import time def verification_ssh(host,username,password,port,root_pwd,cmd): s=paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname = host,port=int(port),username=username, password=password) if username != 'root': ssh = s.invoke_shell() time.sleep(0.1) ssh.send('su - \n') buff = '' while not buff.endswith('Password: '): resp = ssh.recv(9999) buff +=resp ssh.send(root_pwd) ssh.send('\n') buff = '' while not buff.endswith('# '): resp = ssh.recv(9999) buff +=resp ssh.send(cmd) ssh.send('\n') buff = '' while not buff.endswith('# '): resp = ssh.recv(9999) buff +=resp s.close() result = buff else: stdin, stdout, stderr = s.exec_command(cmd) result = stdout.read() s.close() return result
1.1 进程是系统进行资源分配和的基本单位
1.2 线程是CPU运算调度的最小单位
1.3 协程比线程还小的单位
2.1 进程就是资源的集合,这些资源包括对内存、对硬盘、对光驱等各类资源的调用的集合。
2.2 线程包含在进程中(进程是线程的容器)是进程的实际运算单位。
2.3 一个进程能够包括一个或多个线程
2.4 进程须要操做cpu 必须经过建立线程来操做
2.5 一个线程,实际就是一个cpu的控制流,一个进程中能够并发多个线程
2.6 程序是指令、数据及其组织形式的描述,进程是程序的实体。
3.1 一个进程是有一系列的相关的资源的集合一个进程的资源包括了memory page (内存页,存储页,存储页面),文件句柄,socket链接,一些安全信息 如谁的id启动了这个进程
3.2 进程上下文,意思是可执行程序代码是进程的重要组成部分。进程上下文其实是进程执行活动全过程的静态描述。
3.3 每一个进程执行过的、执行时的以及待执行的指令和数据;在指令寄存器、堆栈、状态字寄存器等中的内容。此外, 还包括进程打开的文件描述符等.
3.4 多个线程共享进程上下文(即资源)
3.5 一个进程的全部线程 共享同一块内存空间
3.6 线程快仍是进程快 没有可比性 一个是资源的集合 一个是cpu的控制流,进程须要执行 也必须经过建立线程来执行
3.7 线程建立快 仍是进程建立快 线程建立快 它就是一段cpu的指令集,而进程须要去申请各类资源组成集合
3.8 线程共享内存空间 进程间资源互相独立
3.8.1一个父进程建立子进程,至关于克隆了一份独立的内容 多个子进程直接的资源是不能互相访问的
3.8.2 一个线程,建立多个线程都共享同一个进程的资源
3.9同一个进程的多个线程直接能直接联系交流交互,两个进程想通讯交流 要找一个中间代理
3.10
3.10.1对于一个主线程的修改 可能或影响其余线程的运行(由于共享资源)
3.10.2对于父进程的修改,不会影响到子进程(资源独立)
两步,一步关联函数和变量,一步start
import threading import time def PointIt(x): print('---->',x) time.sleep(5) print(x,'--->ok') PointIt('n1') PointIt('n2') #对比n1/n2/n3/n4的输出用时 #step 1 建立进程,target为函数的函数名,args内输入函数的参数 n3=threading.Thread(target=PointIt,args=('n3',))#逗号不能漏 n4=threading.Thread(target=PointIt,args=('n4',))#逗号不能漏
#step 2 start
n3.start() n4.start() print('done')
import threading import time #step 1 创建一个类,须要继承线程类 class Mythread_class(threading.Thread): #step 2定义构造函数,接收须要输入的变量 def __init__(self,n): super(Mythread_class, self).__init__() self.n=n #step 3 定义run ->这里必须是run这个名字 def run(self): print(self.n) time.sleep(3) print(self.n,'done') #step 4 生成实例,每一个实例运行start方法都会启动一个线程运行run方法 t1=Mythread_class('t1') t2=Mythread_class('t2') t1.run()#这样不行 这样仍是单线程 t2.run()#若是直接是run方法,仍是单线程 t1.start() t2.start()#多线程 for i in range(5): x=Mythread_class('x-%s'%i) x.start() print('主线程done')
程序运行的主线程在启动子线程以后不会等子线程执行完毕 而是继续运行主线程
一样,在主线程中也没法计算出子线程运行所花费的时间
若是run()运行结束则join()结束,若是run()没有完成,则卡住等待完成。经过这个方法来判断各个线程都运行结束所花费的时间
import threading import time class Mythread(threading.Thread): def __init__(self,n): super(Mythread, self).__init__() self.n=n def run(self): print('t-%s is running,当前线程为%s,当前活跃线程数为%s'%(self.n,threading.current_thread(),threading.active_count())) time.sleep(3) print('t-%s done'%self.n) time1=time.time() t_object=[]#使用这个列表来记录线程对象 for i in range(10): x=Mythread(i) x.start() t_object.append(x) print('main',threading.current_thread()) for i in t_object: i.join() time2=time.time() print(time2-time1) ------------------------------------------ t-0 is running,当前线程为<Mythread(Thread-1, started 2712)>,当前活跃线程数为2 t-1 is running,当前线程为<Mythread(Thread-2, started 9332)>,当前活跃线程数为3 t-2 is running,当前线程为<Mythread(Thread-3, started 2696)>,当前活跃线程数为4 t-3 is running,当前线程为<Mythread(Thread-4, started 9884)>,当前活跃线程数为5 t-4 is running,当前线程为<Mythread(Thread-5, started 3272)>,当前活跃线程数为6 t-5 is running,当前线程为<Mythread(Thread-6, started 9208)>,当前活跃线程数为7 t-6 is running,当前线程为<Mythread(Thread-7, started 1828)>,当前活跃线程数为8 t-7 is running,当前线程为<Mythread(Thread-8, started 9256)>,当前活跃线程数为9 t-8 is running,当前线程为<Mythread(Thread-9, started 1060)>,当前活跃线程数为10 t-9 is running,当前线程为<Mythread(Thread-10, started 9780)>,当前活跃线程数为11 我是主线程main <_MainThread(MainThread, started 2056)> t-0 done t-4 done t-5 done t-2 done t-3 done t-1 done t-8 done t-9 done t-7 done t-6 done
3.003652811050415
当主线程执行完毕后,主线程结束,则全部守护线程所有结束,不管运行到什么状态 setDeamon
import threading import time class Myclass(threading.Thread): def __init__(self,n): super(Myclass, self).__init__() self.n=n def run(self): print(self.n,'begin') time.sleep(3) print(self.n,'------------->done','目前还活跃的线程数:',threading.active_count()) tmp_list=[] for i in range(20): j=Myclass(i) j.setDaemon(True)#setdaesmon来设置为守护进程 j.start() tmp_list.append(j) time.sleep(3) print('我要结束主进程了') ----------------------------------------------- 0 begin 1 begin 2 begin 3 begin 4 begin 5 begin 6 begin 7 begin 8 begin 9 begin 10 begin 11 begin 12 begin 13 begin 14 begin 15 begin 16 begin 17 begin 18 begin 19 begin 1 ------------->done 目前还活跃的线程数: 21 0 ------------->done 目前还活跃的线程数: 20 4 ------------->done 目前还活跃的线程数: 19 3 ------------->done 目前还活跃的线程数: 18 6 ------------->done 目前还活跃的线程数: 17 5 ------------->done 目前还活跃的线程数: 16 2 ------------->done 目前还活跃的线程数: 15 11 ------------->done 目前还活跃的线程数: 14 12 ------------->done 目前还活跃的线程数: 14 10 ------------->done 目前还活跃的线程数: 14 7 ------------->done 目前还活跃的线程数: 12 9 ------------->done 目前还活跃的线程数: 12 14 ------------->done 目前还活跃的线程数: 11 8 ------------->done 目前还活跃的线程数: 9 我要结束主进程了
5.1 In CPython,This lock is necessary mainly because CPython’s memory management is not thread-safe.
5.2 多个线程都打到多个cpu的核上 可是同一时刻只能有一个线程在真正的工做
5.3 全局解释器锁(Global Interpreter Lock)是计算机程序设计语言解释器用于同步线程的工具,使得任什么时候刻仅有一个线程在执行
5.4 一个python解释器进程内有一条主线程,以及多条用户程序的执行线程。即便在多核CPU平台上,因为GIL的存在,因此禁止多线程的并行执行。
5.5 Python 3.2开始使用新的GIL。
5.6 能够建立独立的进程来实现并行化
一个全局变量,每一个线程都要对其进行操做,为了防止一个线程还未对其操做完毕,即这个变量的值还未被原线程修改时,这个变量的值就被传到下一个线程进行操做,这样有可能照成最终的结果不许确。为了防止这种状况,使用互斥锁解决
step1 实例一个锁对象
step2 acqiure()方法->锁变量
step3 release()方法释放变量
import threading import time mylock=threading.Lock() num=0 class Myclass(threading.Thread): def __init__(self,n): super(Myclass, self).__init__() self.n=n def run(self): print(self.n,'begin') mylock.acquire() time.sleep(3) global num num += 1 mylock.release() time.sleep(1) #mylock.release()使用完后迅速释放 tmp_list=[] for i in range(5000): x=Myclass(i) x.start() tmp_list.append(x) for i in tmp_list: i.join() print(num)
mylock=threading.RLock()#RLock 递归锁
import threading mylock=threading.RLock()#RLock 递归锁 number1=0 number2=1111 count=0 def run1(): mylock.acquire() global number1 number1+=1 mylock.release() def run2(): #mylock.acquire() 只能套两层锁,若是套三层锁,即把这个注释去掉,就卡死了 global number2 number2+=2 #mylock.acquire() def terminal_fun(): mylock.acquire() global count run1() print('between run1 with run2') run2() print('count:',count) count+=1 mylock.release() for i in range(5): x=threading.Thread(target=terminal_fun) x.start() while threading.active_count() != 1: print(threading.active_count()) else: print('ok')
---------------------------------
between run1 with run2
count: 0
between run1 with run2
count: 1
between run1 with run2
count: 2
between run1 with run2
count: 3
between run1 with run2
count: 4
ok
此代码说明:注意主程序结束的写法法,本程序使用的是:等待活跃线程数等于1结束的方法
不能单纯理解为线程并发数
threading.BoundedSemaphore(3) 同时访问资源的线程数量,但不是活跃的线程数量
线程中,信号量主要是用来维持有限的资源,使得在必定时间使用该资源的线程只有指定的数量
是一个变量,控制着对公共资源或者临界区的访问。信号量维护着一个计数器,指定可同时访问资源或者进入临界区的线程数。
每次有一个线程得到信号量时,计数器-1。若计数器为0,其余线程就中止访问信号量,直到另外一个线程释放信号量。
如下代码,io为一个信号量,查看活跃的线程,实际全部的线程都跑起来了,可是,能输出的只有三个线程
import threading import time limit_num = threading.BoundedSemaphore(3) # 控制并发,不是三个一组,释放一个,就新增一个 def run(x): limit_num.acquire() print('i am %s' % x, threading.active_count()) time.sleep(3) print('i am %s' % x, threading.active_count()) limit_num.release() for i in range(20): x = threading.Thread(target=run, args=(i,)) x.start() while threading.active_count() != 1: pass else: print('done!') ------------------------------------------- i am 0 2 i am 1 3 i am 2 4 i am 1 21 i am 2 21 i am 4 20 i am 0 19 i am 3 18 i am 5 18 i am 3 18 i am 4 18 i am 6 16 i am 7 16 i am 5 16 i am 8 15 i am 7 15 i am 6 15 i am 9 13 i am 10 13 i am 8 13 i am 11 12 i am 10 12 i am 9 12 i am 11 11 i am 13 10 i am 14 9 i am 12 9 i am 14 9 i am 13 9 i am 12 9 i am 15 7 i am 17 7 i am 16 7 i am 15 6 i am 17 5 i am 18 5 i am 16 5 i am 19 3 i am 18 3 i am 19 2 done!
可用于多个线程间同步信息 如同一个线程控制红绿灯,一个线程等待红绿灯
四个知识点
9.1 生成事件 threading.Event()
9.2 set事件 x.set()
9.3 clear事件 x.clear()
9.4 wait事件 x.wait()
import threading import time my_green_red_light=threading.Event() def light():#负责对light event 进行set或clear time_count=0 my_green_red_light.set() while True: if time_count >8 and time_count < 16: my_green_red_light.clear() print('\033[41;1mred\033[0m') elif time_count > 16: my_green_red_light.set() time_count=0 else: print('\033[42;1mgreen\033[0m') time.sleep(0.3) time_count+=1 def car(x): while True: if my_green_red_light.is_set(): print('car [%s] is running'%x) else: print('is red now,wait for green....') my_green_red_light.wait() print('green now ,go go go !!!') time.sleep(0.8) light_thread=threading.Thread(target=light) car_thread=threading.Thread(target=car,args=('tesla',)) light_thread.start() car_thread.start() ------------------------------ green car [tesla] is running green green car [tesla] is running green green green car [tesla] is running green green car [tesla] is running green red red is red now,wait for green.... red red red red red green green now ,go go go !!! green green car [tesla] is running green green green car [tesla] is running green
使用队列的意义
一、解耦 是程序之间实现双耦合 经过生产者消费者模型
二、提升处理效率
10.1 q.qsize()获取队列长度
10.2 q.put()放元素
10.3 q.get()拿元素
10.4 q.nowait()是否等待(当即响应)
10.5 q.get(block=False)是否阻塞
10.6 q.get(timeout=1)响应超时
10.7 q=queue.Queue(maxsize=3)队列长度
10.8 q=queue.LifoQueue()(last in first out 堆栈)
import queue >>> import queue >>> x=queue.Queue(maxsize=5) >>> x.put(1) >>> x.put(2) >>> x.put(3) >>> x.put(4) >>> x.put(5,timeout=1) >>> x.put(6,timeout=1) Traceback (most recent call last): File "<pyshell#8>", line 1, in <module> x.put(6,timeout=1) queue.Full >>> x.get() 1 >>> x.get() 2 >>> x.get() 3 >>> x.get() 4 >>> x.get(block=False) 5 >>> x.get(block=False) Traceback (most recent call last): File "<pyshell#14>", line 1, in <module> x.get(block=False) queue.Empty >>> >>> y=queue.LifoQueue() >>> y.put(1) >>> y.put(2) >>> y.put(3) >>> y.get() 3 >>> y.get() 2 >>> y.get() 1 >>> z=queue.PriorityQueue() >>> z.put((1,'123')) >>> z.put((-1,'456')) >>> z.put((5,'789')) >>> z.get <bound method Queue.get of <queue.PriorityQueue object at 0x000001A41A496550>> >>> z.get() (-1, '456') >>> z.get() (1, '123') >>> z.get() (5, '789') >>> >>> x.qsize() 0 >>>
import queue import time import threading q=queue.Queue(maxsize=5) def product_it(name,t): count=0 while True: data='骨头%s'%count q.put(data) print('[%s]生成[%s]'%(name,data)) count+=1 time.sleep(t) def eat_it(name,t): while True: print('[%s]吃了骨头[%s]'%(name,q.get())) time.sleep(t) t1=threading.Thread(target=product_it,args=('北京店',4,)) t2=threading.Thread(target=product_it,args=('上海店',0.3,)) t3=threading.Thread(target=eat_it,args=('小明',5,)) t4=threading.Thread(target=eat_it,args=('小王',2,)) t1.start() t2.start() t3.start() t4.start() ------------------------------ [北京店]生成[骨头0] [上海店]生成[骨头0] [小明]吃了骨头[骨头0] [小王]吃了骨头[骨头0] [上海店]生成[骨头1] [上海店]生成[骨头2] [上海店]生成[骨头3] [上海店]生成[骨头4] [上海店]生成[骨头5] [小王]吃了骨头[骨头1] [上海店]生成[骨头6] [小王]吃了骨头[骨头2] [上海店]生成[骨头7] [小明]吃了骨头[骨头3] [北京店]生成[骨头1]
cpu密集型,io密集型
线程占用cpu资源 python的多线程实际是经过gil锁进行上下文切换 某个时间点上 只有一个线程在跑
若是cpu只有一个核心 那么 不管有多少个线程 同一时间,只可能有一个线程在被cpu处理
线程之间经过快速切换执行,使使用者感受同一时刻多线程并行的效果,但单核状况实际是串行的
若是多核心 理论上来讲 不一样的核心同一时刻能够运行不一样的线程
可是在python中 因为考虑到线程间数据共享的状况 python内存在GLI锁,使得同一时间内,只有一个线程运行 不管多少核 这个是python的局限性
python语言诞生时 cpu只有单核 开发者没有考虑多核的状况,
cpu在执行指令时须要知道上下文关系 因此python在启用线程的时候 调用的是c语言的thread接口 并将这个关系传给cpu (和互斥锁相似 有一个全局变量+1)
若是线程同时操做,拿到的上下文可能同样 因此为了不上下文雷同 使用gli锁来确保同一时刻只有一个线程在执行
1.1 进程间是独立的
1.2 python的进程是操做系统的原生进程,进程间管理是由操做系统来完成的,且进程间的数据相互独立也不须要锁这个概念。因此python的多进程 能够解决python的多核问题
multiprocessing.Process(target=run,args=(i,))
import multiprocessing import time import threading def run(x): print('process %s is running'%x) t=threading.Thread(target=mythread,args=(x,)) t.start() print('process %s is done'%x) def mythread(x): print('进程%s启用的线程的线程号是:%s'%(x,threading.get_ident()))#打印线程号 if __name__=='__main__': for i in range(10): x=multiprocessing.Process(target=run,args=(i,)) x.start() ------------------------------------------- process 4 is running process 4 is done 进程4启用的线程的线程号是:2172 process 0 is running process 0 is done 进程0启用的线程的线程号是:764 process 2 is running process 6 is running process 2 is done 进程2启用的线程的线程号是:8236 process 9 is running process 3 is running 进程6启用的线程的线程号是:2544 process 6 is done process 5 is running process 7 is running 进程9启用的线程的线程号是:3536 process 9 is done process 3 is done 进程3启用的线程的线程号是:484 process 5 is done process 7 is done 进程5启用的线程的线程号是:9624 进程7启用的线程的线程号是:1464 process 1 is running 进程1启用的线程的线程号是:1764 process 1 is done process 8 is running 进程8启用的线程的线程号是:3516 process 8 is done
此处能够观察cpu的使用状况,能发现cpu的使用率快速提高
threading.get_ident()/os.getpid()/os.getppid()
import multiprocessing import os def info(): tmp_info ='''\tname:%s\n\tppid:%s\n\tpid :%s'''%(__name__,os.getppid(),os.getpid()) print(tmp_info) def run(): print('子进程信息'.center(50,'-')) x = multiprocessing.Process(target=run2) x.start() info() def run2(): print('子进程信息'.center(50,'-')) info() if __name__=="__main__": print('父进程信息'.center(50,'-')) info() x=multiprocessing.Process(target=run) x.start() --------------------------------------------------------------- ----------------------父进程信息----------------------- name:__main__ ppid:2592 pid :10720 ----------------------子进程信息----------------------- name:__mp_main__ ppid:10720 pid :7752 ----------------------子进程信息----------------------- name:__mp_main__ ppid:7752 pid :5696
5.1 进程队列方式进行进程间通讯
此处须要对比进程队列与线程队列的区别
queue.Queue()普通的队列,在一个进程中使用
multithreading.Queue 原理为pickle序列号与反序列化,并非资源共享
import multiprocessing import threading import queue #进程queue与线程queue def main(my_queue): print(my_queue.get()) print(my_queue.get()) print(my_queue.get()) def main2(x): x.put(1) x.put(2) x.put(3) if __name__=='__main__': my_queue = queue.Queue() my_queue.put('[a]') my_queue.put('[b]') my_queue.put('[c]') t1 = threading.Thread(target=main,args=(my_queue,)) t1.start()#子线程访问主线程资源 #t1 = threading.Thread(target=main)此处不将队列传入,线程依然能调用主线程的队列 #t1.start() # 子线程访问主线程资源 #p1=multiprocessing.Process(target=main,args=(my_queue,)) #p1.start()#子进程访问主进程资源 #即便是父进程将进程queue作完参数传给子进程,子进程依然没法访问这个queue Q = multiprocessing.Queue()#进程queue multiprocessing.Queue() p2=multiprocessing.Process(target=main2,args=(Q,))#将队列传入子进程 p2.start() p2.join()#等待子进程执行完毕,确保都入队成功 print(Q.get()) print(Q.get()) print(Q.get()) ---------------------------------------- [a] [b] [c] 1 2 3
5.2 进程通道pip方式进行进程间通讯
multiprocessing.Pipe()结合send和recv来使用
import multiprocessing import threading import time def f(x1): print('f') x1.send('hello,i am p1') print(x1.recv()) def f2(x2): print('f2') time.sleep(3) print(x2.recv()) x2.send('ok , p2 ack') if __name__=='__main__': x1, x2 = multiprocessing.Pipe() p1=multiprocessing.Process(target=f,args=(x1,)) p2=multiprocessing.Process(target=f2,args=(x2,)) p1.start() p2.start() ------------------------ f f2 hello,i am p1 ok , p2 ack
5.3 多进程共同修改字典列表的方法manager().dict()、manager().list()
import multiprocessing import os def f(x,y): x[os.getpid()]=os.getpid()#x是一个字典,给x这字典添加一个key和value y.append(os.getpid())#y是一个列表,给y这个列表添加一个value print(x,y) if __name__=='__main__': with multiprocessing.Manager() as xxx: x=xxx.dict()#生成一个字典 一个多进程manage的字典 y=xxx.list(range(5))#列表生成 生成一个列表 一个多进程manage的列表 p_list=[]#主进程列表 for i in range(5): p=multiprocessing.Process(target=f,args=(x,y,))#给进程manage的列表字典添加元素 p.start() p_list.append(p)#为了确保是否添加完成 for i in p_list: i.join() #配合上面的start 判断是否子进程运行结束 ------------------------------------------------ {19080: 19080} [0, 1, 2, 3, 4, 19080] {19080: 19080, 20880: 20880} [0, 1, 2, 3, 4, 19080, 20880] {19080: 19080, 20880: 20880, 15908: 15908} [0, 1, 2, 3, 4, 19080, 20880, 15908] {19080: 19080, 20880: 20880, 15908: 15908, 8140: 8140} [0, 1, 2, 3, 4, 19080, 20880, 15908, 8140] {19080: 19080, 20880: 20880, 15908: 15908, 8140: 8140, 13536: 13536} [0, 1, 2, 3, 4, 19080, 20880, 15908, 8140, 13536]
以上能够看出,多进程实时修改数据(字典,列表,队列) 无需互斥锁。
为何要有进程锁 ,不一样进程之间不是不能访问各自的内存空间
可是例如屏幕,打印机,投影仪这种资源在进程很是多的状况下可能会出现例如输出错行的状况,此时就要用到锁
进程锁比较少用到
import multiprocessing import os def f(x,i): x.acquire() print(i,'---',os.getpid()) x.release() if __name__=='__main__': lock=multiprocessing.Lock() for i in range(1000): p=multiprocessing.Process(target=f,args=(lock,i,)) p.start()
------------------------
2 --- 3784
4 --- 4544
3 --- 13556
5 --- 4864
0 --- 13100
6 --- 15272
1 --- 20104
8 --- 11800
7 --- 672
9 --- 19056
7.1
进程池 用于限制同时运行的进程数量
#运行进程过多致使进程间频繁切换下降程序效率->根据cpu核心等实际状况,优化同时运行的进程数量
#对同时运行的进程数进行限制 多进程同时存在,可是只有固定数量的进程在执行
7.2
step1 实例化一个pool
step2 用实例化的pool去生成进程。须要注意两种生成进程的方式apply与apply_async一种是单进程执行,一种是多进程执行
import multiprocessing import os import time def f(i): print('process',i,'is running...',os.getpid()) time.sleep(3) return i #传递给回调函数 def backtome(xxxx): print(xxxx,'is done',os.getpid()) if __name__=='__main__': print('主进程的pid为:',os.getpid()) mypool=multiprocessing.Pool(processes=5)#设置同一时刻能实际运行的进程数量 for i in range(20): #p=mypool.apply(func=f,args=(i,))#单核串行 p=mypool.apply_async(func=f,args=(i,),callback=backtome) print('end') mypool.close()#必须添加close() mypool.join()#必须添加join ------------------------------------------- 主进程的pid为: 12720 end process 0 is running... 10644 process 1 is running... 5056 process 2 is running... 20972 process 3 is running... 6148 process 4 is running... 19120 process 5 is running... 10644 0 is done 12720 process 6 is running... 5056 1 is done 12720 2 is done 12720 process 7 is running... 20972 3 is done 12720 process 8 is running... 6148 4 is done 12720 process 9 is running... 19120 process 10 is running... 10644 5 is done 12720 6 is done 12720 process 11 is running... 5056 7 is done 12720 process 12 is running... 20972 9 is done 12720 process 13 is running... 19120 8 is done 12720 process 14 is running... 6148 process 15 is running... 10644 10 is done 12720 process 16 is running... 5056 11 is done 12720 process 17 is running... 20972 12 is done 12720 14 is done 12720 process 18 is running... 6148 13 is done 12720 process 19 is running... 19120 15 is done 12720 16 is done 12720 17 is done 12720 18 is done 12720 19 is done 12720
apply无回调方法
import multiprocessing import os import time def f(i): print('process',i,'is running...',os.getpid()) time.sleep(3) print('end') return i #传递给回调函数 def backtome(xxxx): print(xxxx,'is done',os.getpid()) if __name__=='__main__': print('主进程的pid为:',os.getpid()) mypool=multiprocessing.Pool(processes=5)#设置同一时刻能实际运行的进程数量 for i in range(20): p=mypool.apply(func=f,args=(i,),)#单核串行 #p=mypool.apply(func=f, args=(i,),callback=backtome)apply无callback函数 #p=mypool.apply_async(func=f,args=(i,),callback=backtome) print('end') mypool.close()#必须添加close() mypool.join()#必须添加join -------------------------------- 主进程的pid为: 17912 process 0 is running... 4080 end process 1 is running... 18092 end process 2 is running... 5924 end process 3 is running... 14276 end process 4 is running... 1800 end process 5 is running... 4080 end process 6 is running... 18092 end process 7 is running... 5924 end process 8 is running... 14276 end process 9 is running... 1800 end process 10 is running... 4080 end process 11 is running... 18092 end process 12 is running... 5924 end process 13 is running... 14276 end process 14 is running... 1800 end process 15 is running... 4080 end process 16 is running... 18092 end process 17 is running... 5924 end process 18 is running... 14276 end process 19 is running... 1800 end end
协程 异步io
协程在单线程状况下操做
协程是一种用户态的轻量级线程 -> cpu根本不知道他的存在
线程在切换的时候有cpu的寄存器来保存每一个线程的状态 每一个线程有本身的上下文
协程在切换时的状态须要用户本身将上下文保存到相应的地方。
yelid是协程的一种
在单线程下实现并发的效果,用户须要本身保存运行状态
2.1 无需线程切换 也就无需cpu在线程切换时上下文切换的开销
2.2 无需原子操做锁定及同步开销 ->相对于多线程的互斥锁 ->协程是单线程的串行操做
2.3 方便切换控制流 简化模型
2.4 高并发 高扩展 低成本
协程实质上是一个单线程,没法利用多核资源
协程须要和进程配合才能在多cpu上运行
nginx 就是单线程 就能支持上万个并发
协程能处理上万并发的思路,遇到io就进行协程切换 -> io完成就自动切换到原协程
#进程是资源分配的最小单位,线程是CPU调度的最小单位.这是计算机里常常考的
#协程。比线程还小的单位
import time def produce_scq(n,x,y):#生产者实际就是个函数 ,实体化生成器并next他 x.__next__()#这里很关键 y.__next__() print(n,'is producing') count=0 while count < 20: x.send('包子%s'%count) y.send('包子%s'%count) count +=1 return 'done' def custom_scq(n): #消费者是生成器 ,等待外部信息输入 print('start eat') while True: baozi=yield print(n,'is eating',baozi) time.sleep(0.01) c1=custom_scq('小明') c2=custom_scq('小东') print(produce_scq('大明',c1,c2)) --------------------------------------------- start eat start eat 大明 is producing 小明 is eating 包子0 小东 is eating 包子0 小明 is eating 包子1 小东 is eating 包子1 小明 is eating 包子2 小东 is eating 包子2 小明 is eating 包子3 小东 is eating 包子3 小明 is eating 包子4 小东 is eating 包子4 小明 is eating 包子5 小东 is eating 包子5 小明 is eating 包子6 小东 is eating 包子6 小明 is eating 包子7 小东 is eating 包子7 小明 is eating 包子8 小东 is eating 包子8 小明 is eating 包子9 小东 is eating 包子9 小明 is eating 包子10 小东 is eating 包子10 小明 is eating 包子11 小东 is eating 包子11 小明 is eating 包子12 小东 is eating 包子12 小明 is eating 包子13 小东 is eating 包子13 小明 is eating 包子14 小东 is eating 包子14 小明 is eating 包子15 小东 is eating 包子15 小明 is eating 包子16 小东 is eating 包子16 小明 is eating 包子17 小东 is eating 包子17 小明 is eating 包子18 小东 is eating 包子18 小明 is eating 包子19 小东 is eating 包子19 done
step1 greenlet.greenlet(xxx)
step2 xxx.switch
import multiprocessing import greenlet def test1(): print(12) g2.switch() print(56) g2.switch() def test2(): print(34) g1.switch() print(78) g1=greenlet.greenlet(test1) g2=greenlet.greenlet(test2) #实例化两个协程来运行函数,实际此时函数还未运行 g1.switch()#开始执行g1 --------------------------------------------- 12 34 56 78
切换思路,遇到io或非cpu操做时,自动切换,让cpu继续处理下一个指令
gevent.sleep与gevent.joinall()、gevent.spawn的搭配使用
import gevent def f1(): print('f1 run') gevent.sleep(2) print('f1 done') def f2(): print('f2 run') gevent.sleep(1) print('f2 done') def f3(): print('f3 run') gevent.sleep(0) print('f3 done') gevent.joinall([gevent.spawn(f1),gevent.spawn(f2),gevent.spawn(f3)]) #生成协程 ---------------------------------------- f1 run f2 run f3 run f3 done f2 done f1 done
#gevent.sleep 模拟io操做 -》 joinall 所有执行 gevent.spawn 生成协程
f1 -> sleep -> f2 -> sleep -> f3 -> f1还未执行完成 -> f2 还未完成 -> f3 ok -> f3 done -> f1 还未完成-> f2 还未完成->f1 还未完成->f2 ok -> f1 还未完成 -> f1 ok
import urllib import gevent,time #step 1 导入模块 from urllib import request from gevent import monkey #step 2 监控IO monkey.patch_all() #step 3 def f(url): print('GET:',url) res=request.urlopen(url)#获取数据 # step 4 用来展现get到的这个url有多少data data=res.read()#读数据 print('bytes:',len(data),'url:',url) t1=time.time()#获取程序开始时间 f('http://www.dangdang.com') f('http://d.cn') f('http://www.163.com') f('http://www.126.com')#单线程执行 t2=time.time() print('--------------------->cost:',t2-t1) #step 5 协程执行 gevent.joinall([gevent.spawn(f,'http://d.cn'),gevent.spawn(f,'http://www.dangdang.com'),gevent.spawn(f,'http://www.163.com'),gevent.spawn(f,'http://www.126.com')]) t3=time.time() print('--------------------->cost:',t3-t2) ------------------------------------------------------------------- GET: http://www.dangdang.com bytes: 720 url: http://www.dangdang.com GET: http://d.cn bytes: 80968 url: http://d.cn GET: http://www.163.com bytes: 688755 url: http://www.163.com GET: http://www.126.com bytes: 12921 url: http://www.126.com --------------------->cost: 1.0575993061065674 GET: http://d.cn GET: http://www.dangdang.com GET: http://www.163.com GET: http://www.126.com bytes: 720 url: http://www.dangdang.com bytes: 688755 url: http://www.163.com bytes: 80968 url: http://d.cn bytes: 12921 url: http://www.126.com --------------------->cost: 0.14879226684570312
此处的joinall是等协程都运行结束的意思
若是没有joinall()
import urllib import gevent,time #step 1 导入模块 from urllib import request from gevent import monkey #step 2 监控IO monkey.patch_all() #step 3 def f(url): print('GET:',url) res=request.urlopen(url)#获取数据 # step 4 用来展现get到的这个url有多少data data=res.read()#读数据 print('bytes:',len(data),'url:',url) t1=time.time()#获取程序开始时间 f('http://www.dangdang.com') f('http://d.cn') f('http://www.163.com') f('http://www.126.com')#单线程执行 t2=time.time() print('--------------------->cost:',t2-t1) #step 5 协程执行 gevent.spawn(f,'http://d.cn') gevent.spawn(f,'http://www.dangdang.com') gevent.spawn(f,'http://www.163.com') gevent.spawn(f,'http://www.126.com') t3=time.time() print('--------------------->cost:',t3-t2) --------------------------------- GET: http://www.dangdang.com bytes: 168224 url: http://www.dangdang.com GET: http://d.cn bytes: 80968 url: http://d.cn GET: http://www.163.com bytes: 688790 url: http://www.163.com GET: http://www.126.com bytes: 12921 url: http://www.126.com --------------------->cost: 1.2731683254241943 --------------------->cost: 0.0
单线程遇到io轮训的过程,须要注意注册轮询的过程。gevent.spawn
import socket import gevent from gevent import monkey monkey.patch_all() def server_port_and_listen(): sever1=socket.socket() sever1.bind(('localhost',9999)) sever1.listen() count=1 print('开始监听9999端口') while True: conn,addr=sever1.accept() #精髓在这里 每一个链接起一个协程 gevent.spawn(server_deal_conn,conn,count) #协程轮训监听 遇到io自动切换 #server_deal_conn(conn) count+=1 def server_deal_conn(x,i): print('链接%s创建'%i) try: while True: data=x.recv(1024) if not data: print('链接已经断开') break tmp_data='i am %s\n'%i+data.decode() x.send(tmp_data.encode('utf-8')) #此处不考虑长度了,只作短消息传递 except Exception as ex: print(ex) finally: print('断开链接',i,'详细信息',x) x.close() server_port_and_listen()
------------------------------------------
开始监听9999端口
链接1创建
链接2创建
链接3创建
链接4创建
[WinError 10054] 远程主机强迫关闭了一个现有的链接。
断开链接 1 详细信息 <gevent._socket3.socket object, fd=616, family=2, type=1, proto=0>
[WinError 10054] 远程主机强迫关闭了一个现有的链接。
断开链接 2 详细信息 <gevent._socket3.socket object, fd=624, family=2, type=1, proto=0>
[WinError 10054] 远程主机强迫关闭了一个现有的链接。
断开链接 3 详细信息 <gevent._socket3.socket object, fd=628, family=2, type=1, proto=0>
[WinError 10054] 远程主机强迫关闭了一个现有的链接。
断开链接 4 详细信息 <gevent._socket3.socket object, fd=620, family=2, type=1, proto=0>
链接5创建
链接6创建
即 :事件 -> 触发 ->回应
事件如我点击鼠标、键盘、个人网卡收到数据包
回应如点鼠标关机,ctrl+s保存文件。回复对方的ping包。和对方三次握手。
2.1 单线程
2.2 多线程
2.3 异步
单线程和多线程随着须要扫描设备的增长,更容易出现响应时间和遗漏响应的状况。多线程须要屡次切换上下文。
针对不一样的操做系统有不一样的库能够选择
4.一、select库
select库是各个版本的linux和windows平台都支持的基本事件驱动模型库,而且在接口的定义上也基本相同,只是部分参数的含义略有差别。
Select库有三种事件:读事件、写事件、异常事件
4.二、poll库
poll库,做为linux平台上的基本事件驱动模型,Windows平台不支持poll库。
4.三、epoll库
epoll库是Nginx服务器支持的高性能事件之一,它是公认的很是优秀的事件驱动模型,和poll和select有很大的不一样,属于poll库的一个变种,他们的处理方式都是建立一个待处理事件列表,而后把这个事件列表发送给内核,返回的时候,再去轮询检查这个列表,以判断事件是否发生。
readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是没有任何fd就绪,那程序就会一直阻塞在这里 # select中第1个参数表示inputs中发生变化的句柄放入readable。 # select中第2个参数表示outputs中的值原封不动的传递给writeable。
# select中第3个参数表示inputs中发生错误的句柄放入exeptional.
import socket import queue import select input_list=[] output_list=[] conn_list=[] my_dict={} server1=socket.socket() server1.setblocking(False)#默认为阻塞IO server1.bind(('localhost',6666)) server1.listen() input_list.append(server1) while True: readable, writeable, exeptional = select.select(input_list, output_list, input_list) # io轮询监控 多路复用 # print(readable) for i in readable: if i is server1: print('新建链接') conn, addr = i.accept() # 注意accept的位置 conn.setblocking(False) input_list.append(conn) my_dict[conn] = queue.Queue() # 生成一个字典key key对应value为字典 else: print('in else') try: data = i.recv(1024) print('-----------------------------', len(data)) if data: print('接收到消息') print(data.decode()) my_dict[i].put(data) print(my_dict) output_list.append(i) print('outputlist', output_list) else: print('断开') input_list.remove(i) except Exception as areyouok: print(areyouok, i) input_list.remove(i) if i in output_list: output_list.remove(i) del my_dict[i] for i in writeable: print('writeable----------') data = my_dict[i].get() print(data.decode()) i.send(data) output_list.remove(i) for i in exeptional: input_list.remove(i) if i in output_list: output_list.remove(i) del my_dict[i]
seletor在select、poll、epoll基础上封装,若是系统支持eqoll优先epoll
#step 1 生成一个selector对象,至关于监听列表
still_listen=selectors.DefaultSelector()
#step 2 定义方法
#等待链接的方法 def wait_for_connect(server,mask): conn,addr=server1.accept() ... #等待消息的方法 def conn_wait_for_message(conn,mask): data=conn.recv(1024) ...
#Step 3 建立socket
socket.socket()、bind(('127.0.0.1',9999))、listen(10000)、.setblocking(False)
#step 4 注册及卸载 selectors.EVENT_READ/ x.register /x.unregister(conn)(相似于gevent.spawn)
still_listen.register(server1,selectors.EVENT_READ,wait_for_connect)
still_listen.register(conn,selectors.EVENT_READ,conn_wait_for_message)
#step 5 开始监听
infor_come=still_listen.select()#无返回时阻塞 for key,mask in infor_come: choice_func=key.data #根据注册的方式获取须要调用的函数 choice_func(key.fileobj,mask)#传入对象(已创建的链接conn或新建链接server)
import selectors import socket #ulimit - n #ulimit -SHn #step 1 selectors.DefaultSelector()生成一个额select对象 still_listen=selectors.DefaultSelector() #step 2 定义方法 #等待链接的方法 def wait_for_connect(server,mask): conn,addr=server1.accept() print('accepted', conn, 'from', addr) conn.setblocking(False) still_listen.register(conn,selectors.EVENT_READ,conn_wait_for_message) #等待消息的方法 def conn_wait_for_message(conn,mask): try: data=conn.recv(1024) print(conn,'message coming') if data: conn.send(data) else: print('close...') still_listen.unregister(conn) conn.close() except Exception as ex: print(ex) still_listen.unregister(conn) conn.close() server1=socket.socket() server1.bind(('127.0.0.1',9999)) server1.listen(10000) server1.setblocking(False) #step 3 注册方法 x.register 与卸载方法 x.unregister(conn) still_listen.register(server1,selectors.EVENT_READ,wait_for_connect) print('运行成功,开始监听端口。。。') #step 4 开始监听 while True: infor_come=still_listen.select()#无返回时阻塞 for key,mask in infor_come: choice_func=key.data #根据注册的方式获取须要调用的函数 choice_func(key.fileobj,mask)#传入对象(已创建的链接conn或新建链接server)
实际就是循环创建链接
import socket socket_List=[]#用于存放链接对象 t1=time.time()#开始时间 for i in range(1000):#生成一千个链接对象 x=socket.socket() socket_List.append(x) print('准备开始链接服务器') t2=time.time() for x in socket_List: x.connect(('192.168.99.106', 8999))#对象开始正式链接服务器 print('链接创建完毕') messgae_list=['it is the first message','it is the second message','it is the third message'] t3=time.time() for mes in messgae_list: print(mes) for s in socket_List:#每一个链接开始发送内容 print('%s: sending "%s"' % (s.getsockname(), mes) ) s.send(mes.encode('utf-8')) for s in socket_List:#每一个链接开始接收内容 data=s.recv(1024) print('%s: received "%s"' % (s.getsockname(), data)) if not data: print(sys.stderr, 'closing socket', s.getsockname()) t4=time.time() print(t2-t1) print(t3-t2) print(t4-t3) #输出时间
os.walk()的使用!!!