python笔记9 线程进程 threading多线程模块 GIL锁 multiprocessing多进程模块 同步锁Lock 队列queue IO模型

线程与进程

进程

进程就是一个程序在一个数据集上的一次动态执行过程。进程通常由程序、数据集、进程控制块三部分组成。咱们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程当中所须要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统能够利用它来控制和管理进程,它是系统感知进程存在的惟一标志。python

线程

线程的出现是为了下降上下文切换的消耗,提升系统的并发性,并突破一个进程只能干同样事的缺陷,使到进程内并发成为可能。linux

进程和线程的关系:

(1)一个线程只能属于一个进程,而一个进程能够有多个线程,但至少有一个线程。
(2)资源分配给进程,同一进程的全部线程共享该进程的全部资源。
(3)CPU分给线程,即真正在CPU上运行的是线程。安全

串行 并行 并发

多年前单核cpu同时执行两个程序就是并发执行.网络

同步异步

在计算机领域,同步就是指一个进程在执行某个请求的时候,若该请求须要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去;异步是指进程不须要一直等下去,而是继续执行下面的操做,无论其余进程的状态。当有消息返回时系统会通知进程进行处理,这样能够提升执行的效率。举个例子,打电话时就是同步通讯,发短息时就是异步通讯。多线程

 进程实例

from multiprocessing import Process
import threading
import os
import time
def info(name):
    print('父进程:',os.getppid())           #打印当前调用函数进程的父进程号
    print('进程',os.getpid(),name)          #打印当前调用函数进程的进程号
    print('----------------------------')
if __name__ == '__main__':                   #判断是被调用仍是在当前文件执行
    info('主线程')                            #先打印当前进程的进程号与打印当前调用函数进程的父进程
    p1 = Process(target=info,args=('多进程1',))    #开多进程1打印子进程的进程号与父进程的进程号
    p2 = Process(target=info,args=('多进程2',))  #开多进程2打印子进程的进程号与父进程的进程号
    p1.start()
    p2.start()


父进程: 17724 #父进程是pycharm的进程号
进程 12128 主线程                 #当前进程号是pycharm调用python.exe的进程号 ----------------------------
父进程: 12128 #子进程的父进程号是朱金成浩     
进程 15080 多进程2          
----------------------------
父进程: 12128 #主进程下开多个子进程,共有的是一个主进程号
进程 18476 多进程1
---------------------------

threading模块

普通执行脚本时 是串行 

import time

def coun(n):
    print('running on number: %s' % n)
    time.sleep(n)

start = time.time()
coun(3)
coun(2)
print('用时:%s秒' % (time.time() - start))     #串行,从上到下依次执行,执行完共耗时5秒

running on number: 3
running on number: 2
用时:5.0006632804870605秒

多线程

import time
import threading           #开多线程的模块

def coun(n):
    print('running on number: %s' % n)
    time.sleep(n)


start = time.time()
t1 = threading.Thread(target=coun,args=(3,))    #此处 开第一个子线程实传入参数3     此处仍是主线程执行的
t2 = threading.Thread(target=coun,args=(2,))    #此处 开第二个子线程实传入参数2     此处也是主线程执行的
t1.start()                                      #到此处执行子线程1
t2.start()                               #到此处执行子线程2
print('用时:%s秒' % (time.time() - start))      #整个代码除了 t1.start() t2.start()是子线程执行的其余都是主线程执行的


running on number: 3
running on number: 2
用时:0.000995635986328125秒                   #主线程并无 执行函数,所以主线程耗时0.0秒 函数由子线程执行

当启动子线程时,子线程和主线程 并发着一块儿执行并发

join()

在子线程运行完以前,这个子线程的主线程一直被阻塞,等子线程运行完在继续运行主线程app

import time
import threading     #开多线程的模块

def coun(n):
    print('running on number: %s' % n)
    time.sleep(n)
    print('执行结束')


start = time.time()
t1 = threading.Thread(target=coun,args=(3,)) #此处 开第一个子线程实传入参数3
t2 = threading.Thread(target=coun,args=(2,)) #此处 开第二个子线程实传入参数3
t1.start()                     #到此处执行子线程1
t2.start()                     #到此处执行子线程2
t1.join()             #在t1运行完以前,主线程一直被阻塞       
t2.join()             #在t2运行完以前,主线程一直被阻塞 print('用时:%s秒' % (time.time() - start))    #只有在t1和t2共同 运行完以后才继续运行主线程  

running on number: 3
running on number: 2
执行结束
执行结束
用时:3.002174139022827秒   
循环多个子线程并在最慢的运行完成后结束主线程的方法:
import time
import threading     # 开多线程的模块


def c(a):
    print('%s开始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s运行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s运行完成' % (a))
    else:
        time.sleep(a)
        print('%s运行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循环开新的子进程  
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)            #将每一个子线程的t加入到列表
    t.start()                           #启动子进程
for i in threadd_list:                  
    i.join()                        #循环等待全部子进程结束在继续运行

print('共用时%s' % (time.time() - d))   #共耗时 5秒,最慢的线程5秒
  

1开始
2开始
3开始
4开始
5开始
5运行完成
2运行完成
3运行完成
4运行完成
1运行完成
共用时5.00162410736084dom

错误方法1:异步

import time
import threading  # 开多线程的模块


def c(a):
    print('%s开始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s运行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s运行完成' % (a))
    else:
        time.sleep(a)
        print('%s运行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循环开新的子进程
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)
    t.start()                           #启动子进程
    t.join()                            #这样在循环里join()会在每次循环时都要等待,结果和串行同样 print('共用时%s' % (time.time() - d))   #共耗时

1开始
1运行完成
2开始
2运行完成
3开始
3运行完成
4开始
4运行完成
5开始
5运行完成
共用时15.006914854049683

错误方法2:socket

import time
import threading  # 开多线程的模块


def c(a):
    print('%s开始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s运行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s运行完成' % (a))
    else:
        time.sleep(a)
        print('%s运行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循环开新的子进程
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)
    t.start()                           #启动子进程
t.join()                #放在循环外,当循环结束时,会执行最后循环的t ,最后的是5对应是1秒 print('共用时%s' % (time.time() - d))   #共耗时

1开始
2开始
3开始
4开始
5开始
5运行完成
共用时1.0022823810577393
2运行完成
3运行完成
4运行完成
1运行完成

setDaemon(True)

将线程声明为守护线程,必须在start()方法以前设置,做用是当主线程运行完成后,无论子线程是否运行完,都随主线程一块儿退出,与join的做用相反,join是主线程等待子线程结束在继续往下执行

import time
import threading  # 开多线程的模块


def c(a):
    print('%s开始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s运行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s运行完成' % (a))
    else:
        time.sleep(a)
        print('%s运行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循环开新的子进程
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)
    t.setDaemon(True)            #主程序结束子程序结束
    t.start()                           #启动子进程

print('共用时%s' % (time.time() - d))   

1开始 2开始 3开始 4开始 5开始 共用时0.0019898414611816406      #主程序共耗时0秒,主程序结束子程序也结束

当有多个子线程而只设置一个守护线程则没有意义,

由于其余子线程并无运行完,因此主线程并无真正意义上的运行完,只有全部的子线程都是守护线程才能在主线程运行完直接所有结束,

不然只要有一个子线程在运行都要继续等待.

import time
import threading  # 开多线程的模块


def c(a):
    print('%s开始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s运行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s运行完成' % (a))
    else:
        time.sleep(a)
        print('%s运行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循环开新的子进程
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)
    if i == 1:                        #只开启1的守护线程,
        t.setDaemon(True)            
    t.start()                           #启动子进程

print('共用时%s' % (time.time() - d))   #共耗时

1开始
2开始
3开始
4开始
5开始
共用时0.0009970664978027344
5运行完成
2运行完成
3运行完成
4运行完成                         #此处缺乏1运行完成,由于当1运行完时以前其余全部子线程都运行结束了,也意味着主线程结束了,所以1与主线程一块儿结束了

GIL锁

因为GIL锁的存在,python不能并行,只能并发, 所以python不能算是真正意义上的多线程并行运算.,只能是占用一个cpu的一个线程的并发运算

所以,对于密集型运算,python不适用 threading多线程模块

密集型计算串行:

import time
import threading  # 开多线程的模块

def aa():
    a = 1
    for i in range(1,100000):
        a *= i
def bb():
    a = 1
    for i in range(1,100000):
        a *= i
c = time.time()
aa()
bb()
print('耗时%s' %(time.time() - c)) 

耗时5.802525758743286                #密集型并行计算总耗时5.8秒

密集型threading多线程计算 并发:

import time
import threading  # 开多线程的模块

def aa():
    a = 1
    for i in range(1,100000):
        a *= i
def bb():
    a = 1
    for i in range(1,100000):
        a *= i
c = time.time()
t1 = threading.Thread(target=aa)
t2 = threading.Thread(target=bb)
t1.start()
t2.start()
t1.join()
t2.join()
print('耗时%s' %(time.time() - c))
 
耗时6.302755832672119     #密集型threading多线程计算并发  耗时6.3秒 , 计算量越大差距越大 

对于密集型计算的开启multiprocessing多进程会快 ,可是咱们不可能无限量开进程

multiprocessing多进程模块

def aa():
    a = 1
    for i in range(1, 100000):
        a *= i


def bb():
    a = 1
    for i in range(1, 100000):
        a *= i
if __name__ == '__main__':    #多进程模块必须这样写if判断,不然报错 import time
    import multiprocessing  # 引用多进程模块
    c = time.time()
    t1 = multiprocessing.Process(target=aa)   #和多线程用法基本一致
    t2 = multiprocessing.Process(target=bb)
    t1.start()
    t2.start()
    t1.join()                       #也有join和setDaemon()方法
t2.join()
print('耗时%s' %(time.time() - c))
    耗时4.506261587142944 

   同步锁Lock

同步锁保证数据的安全,当一个线程获取这把锁后,只有他释放了其余线程才能获取继续执行,以此类推

不加同步锁:

import threading
import time
def jnum():
    global num               
    xx = num               #每次代码走到这里都把num的值赋值给xx,由于是多线程,每一个线程在循环时,在0.1秒内就循环完成了 所以每一个线程的值都是10
    time.sleep(0.1)        
    num = xx - 1           #每一个线程都是执行10-1

num = 10
thread_list = []
for i in range(10):
    f1 = threading.Thread(target=jnum,)
    thread_list.append(f1)
    f1.start()                       
for i in thread_list:
    i.join()
print(num)

9

加同步锁:

import threading
import time
R=threading.Lock()      #同步锁
def jnum():
    R.acquire()          #加锁,每一个锁只能给一个线程, 当以一个线程运行到这里获取到锁时,日后的线程会处在等待状态,等待第一个锁释放时在获取锁,以此类推. global num
    xx = num
    time.sleep(0.1)
    num = xx - 1
    R.release()          #解锁

num = 10
thread_list = []
for i in range(10):
    f1 = threading.Thread(target=jnum,)
    thread_list.append(f1)
    f1.start()
for i in thread_list:
    i.join()
print(num)
    
0

队列queue

import queue       #队列
q = queue.Queue()        #建立队列,默认先进先出模式:FIFO 
q.put(111)        #put存
q.put(222)
q.put(333)
print(q.get())    #get取
print(q.get())   
print(q.get())    
print(q.get())    #无值阻塞,等待新的值进入

maxsize

import queue       #队列
q = queue.Queue(maxsize=3)        #建立队列,默认先进先出模式:FIFO maxsize=3 只能存3个值,若是要在往里存,须要取出来才能存

q.put(111)        #put存
q.put(222)
q.put(333)
print(q.get())    #get取
print(q.get())
print(q.get())
print(q.get())    #无值阻塞,等待新的值进入
import queue       #队列
q = queue.Queue(maxsize=3)        #建立队列,默认先进先出模式:FIFO

q.put(111)        #put存
q.put(222)
q.put(333)
q.put(444,False)          #加False当存满后又有值进来会报错
print(q.get())    #get取
print(q.get())
print(q.get())
print(q.get())    #无值阻塞,等待新的值进入
Traceback (most recent call last): File
"F:/python24期/L010-老男孩教育-Python24期VIP视频-mp4/练习/lianxi.py", line 8, in <module> q.put(444,False) #加False当存满后又有值进来会报错 File "C:\Python36\lib\queue.py", line 130, in put raise Full queue.Full
import queue       #队列
q = queue.Queue(maxsize=3)        #建立队列,默认先进先出模式:FIFO

q.put(111)        #put存
q.put(222)
q.put(333)

print(q.get())    #get取
print(q.get())
print(q.get())
print(q.get(block=False))    #加block=False 当取完时 再取会报错

111
Traceback (most recent call last):
222
  File "F:/python24期/L010-老男孩教育-Python24期VIP视频-mp4/练习/lianxi.py", line 12, in <module>
333
    print(q.get(block=False))    #无值阻塞,等待新的值进入
  File "C:\Python36\lib\queue.py", line 161, in get raise Empty
queue.Empty
此包中的经常使用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小 q.empty() 若是队列为空,返回True,反之False q.full() 若是队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 至关q.get(False)非阻塞
q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 至关q.put(item, False) q.task_done() 在完成一项工做以后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操做

LifoQueue建立队列后进先出

import queue       #队列
q = queue.LifoQueue()            #后进先出队列
q.put(111)        #put存
q.put(222)
q.put(333)

print(q.get())    #get取
print(q.get())
print(q.get())

333
222
111

queue.PriorityQueue建立优先级队里

import queue          #队列
q = queue.PriorityQueue()       #建立优先级队里 根据优先级取
q.put([5,111])        #put存
q.put([7,222])
q.put([1,333])

print(q.get())    #get取
print(q.get())
print(q.get())

[1, 333]
[5, 111]
[7, 222]

生产者消费者模型

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师作好菜,不须要直接和客户交流,而是交给前台,而客户去饭菜也不须要不找厨师,直接去前台领取便可,这也是一个结耦的过程。

import random, time
import queue, threading

q = queue.Queue()

 
def chushi(name):                           #厨师作包子
    count = 0
    while count < 10:                         #作十次
        print('making.........') 
        s = random.randrange(3)             #随机生成3之内的随机数
        time.sleep(s)
        q.put(count)                         #把生成的包子放到消息队列,等待客人来取
        print('%s制做了%s个包子' % (name, count))
        count += 1
        print('OK........')


def keren(name):
    count = 0
    while count < 10:
        print('%s来了' %(name))
        s = random.randrange(4)
        time.sleep(s)
        print('%s等待了%s秒' %(name,s))
        if not q.empty():
            data = q.get()
            print('%s吃了%s个包子'%(name,data))
        else:
            print('没有包子')
        count += 1
p = threading.Thread(target=chushi,args=('庖丁',))
c = threading.Thread(target=keren,args=('小张',))
c1 = threading.Thread(target=keren,args=('小王',))
c2 = threading.Thread(target=keren,args=('小李',))
p.start()
c.start()
c1.start()
c2.start()

IO模型

linux下的 network IO.

 1.阻塞模型    blocking IO

全程阻塞

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来讲,不少时候数据在一开始尚未到达(好比,尚未收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。
因此,blocking IO的特色就是在IO执行的两个阶段都被block了。

咱们常写的都拿基本都是阻塞模型

 2.非阻塞模型   nonblocking IO

当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回。因此,用户进程实际上是须要不断的主动询问kernel数据好了没有。

在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不同,”非阻塞将大的整片时间的阻塞分红N多的小的阻塞, 因此进程不断地有机会 ‘被’ CPU光顾”。即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是能够作其余事情的,

      也就是说非阻塞的recvform系统调用调用以后,进程并无被阻塞,内核立刻返回给进程,若是数据还没准备好,此时会返回一个error。进程在返回以后,能够干点别的事情,而后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程一般被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。须要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

服务端

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
sk.bind(('127.0.0.1',6667))
sk.listen(5)
sk.setblocking(False)    #非阻塞            
while True:
    try:
        print ('waiting client connection .......')
        connection,address = sk.accept()   # 进程主动轮询
        print("+++",address)
        client_messge = connection.recv(1024)
        print(str(client_messge,'utf8'))
        connection.close()
    except Exception as e:
        print (e)
        time.sleep(4)

客户端

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

while True:
    sk.connect(('127.0.0.1',6667))
    print("hello")
    sk.sendall(bytes("hello","utf8"))
    time.sleep(2)
    break

优势:可以在等待任务完成的时间里干其余活了(包括提交其余任务,也就是 “后台” 能够有多个任务在同时执行)。

缺点:任务完成的响应延迟增大了,由于每过一段时间才去轮询一次read操做,而任务可能在两次轮询之间的任意时间完成。这会致使总体数据吞吐量的下降。

 3 IO多复路   IO multiplexing(select,poll,epoll)  ****

服务端

import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8800))
sk.listen(5)
sk.setblocking(False)
inputs=[sk,]         #建立一个列表存放套接字,第一个列表放socket对象sk. while True:
    r,w,e=select.select(inputs,[],[],5)        #监听事件驱动 一但inputs列表里有变更,就将变更的值赋给r,若是有连接进来就是sk有变更,r就等于sk. for obj in r:
        if obj==sk:                      #若是r等于sk .
            conn,add=obj.accept()        #取sk的conn
            inputs.append(conn)  #将conn添加至列表 此时因sk变更发生的执行结束,而且将conn加入到inputs列表,所以列表有变更,变更为conn,conn赋值给r继续执行. else:   #此时r为conn不是sk 执行else

            data_byte=obj.recv(1024)             #如下为正常socket的tcp链接
            print(str(data_byte,'utf8'))
            if not data_byte:          #在linux系统时,若是强行关掉了客户端 客户端的返回值是空
                inputs.remove(obj)     #删除此时的客户端的conn链接 continue
            inp=input('回答%s: >>>'%inputs.index(obj))
            obj.sendall(bytes(inp,'utf8'))

客户端

import socket
sk=socket.socket()
sk.connect(('127.0.0.1',8800))

while True:
    inp=input(">>>>")   # how much one night?
    sk.sendall(bytes(inp,"utf8"))
    data=sk.recv(1024)
    print(str(data,'utf8'))

 4. 异步    asynchronous IO

相关文章
相关标签/搜索