day40学习整理-并发编程

2019/09/19 学习整理

并发编程

线程queue

queue队列:使用import queue,用法与进程Queue同样编程

1、先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

2、后进先出

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

3、存储数据时可设置优先级的队列

class queue.PriorityQueue(maxsize=0)数组

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(一般是数字,也能够是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

4、其余用法

exception queue.Empty:在Queue对象为空时调用非阻塞get()(或get_nowait())时引起异常。并发

exception queue.Full:在已满的Queue对象上调用非阻塞put()(或put_nowait())时引起异常。app

Queue.qsize()异步

Queue.empty():若是为空则返回Trueasync

Queue.full():若是已满,则返回True异步编程

Queue.put(item, block=True, timeout=None):将项目放入队列。若是可选的args块为true且timeout为None(默认值),则在必要时阻塞,直到有空闲插槽可用。若是timeout是一个正数,它会阻止最多超时秒,若是在该时间内没有可用的空闲槽,则会引起Full异常。不然(块为假),若是空闲插槽当即可用,则将项目放入队列,不然引起彻底异常(在这种状况下忽略超时)。函数

Queue.put_nowait(item):至关于put(item,False)。学习

Queue.get(block=True, timeout=None):从队列中删除并返回一个项目。若是可选的args块为true且timeout为None(默认值),则在必要时阻止,直到某个项可用为止。若是timeout是一个正数,它会阻止最多超时秒,若是在该时间内没有可用的项,则会引起Empty异常。不然(块为假),若是一个项当即可用则返回一个项,不然引起Empty异常(在这种状况下忽略超时)。

Queue.get_nowait():至关于get(False)。

提供了两种方法来支持跟踪守护进程消费者线程是否已彻底处理入队任务。

Queue.task_done():表示之前排队的任务已完成。由队列使用者线程使用。对于用于获取任务的每一个get(),对task_done()的后续调用会告知队列该任务的处理已完成。

若是join()当前正在阻塞,则它将在全部项目都已处理后恢复(这意味着已为每一个已放入队列的项目收到task_done()调用)。

若是调用的次数超过队列中放置的项目,则引起ValueError。

Queue.join(): block直到queue被消费完毕。

线程定时器

from threading import Timer,current_thread

def task(x):
    print('%s run....' %x)
    print(current_thread().name)

if __name__ == '__main__':
    t=Timer(3,task,args=(10,)) # 3s后执行该线程
    t.start()
    print('主')

进程池和线程池

Python标准模块——concurrent.futures

concurrent.futures模块提供了高度封装的异步调用接口

ThreadPoolExecutor:线程池,提供异步调用

ProcessPoolExecutor:进程池,提供异步调用

1、ProcessPoolExecutor 与 ThreadPoolExecutor

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    print(f'{currentThread().name} 在执行任务 {i}')
    # print(f'进程 {current_process().name} 在执行任务 {i}')
    time.sleep(1)
    return i**2

if __name__ == '__main__':
    pool = ThreadPoolExecutor(4) # 池子里只有4个线程
    # pool = ProcessPoolExecutor(4) # 池子里只有4个线程
    fu_list = []
    for i in range(20):
        # pool.submit(task,i) # task任务要作20次,4个线程负责作这个事
        future = pool.submit(task,i) # task任务要作20次,4个进程负责作这个事
        # print(future.result()) # 若是没有结果一直等待拿到结果,致使了全部的任务都在串行
        fu_list.append(future)
    pool.shutdown() # 关闭了池的入口,会等待全部的任务执行完,结束阻塞.
    for fu in fu_list:
        print(fu.result())

2、回调函数

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    print(f'{currentThread().name} 在执行任务 {i}')
    # print(f'进程 {current_process().name} 在执行任务 {i}')
    time.sleep(1)
    return i**2


def parse(future):
    # 处理拿到的结果
    print(future.result())



if __name__ == '__main__':
    pool = ThreadPoolExecutor(4) # 池子里只有4个线程
    # pool = ProcessPoolExecutor(4) # 池子里只有4个线程
    fu_list = []
    for i in range(20):
        # pool.submit(task,i) # task任务要作20次,4个线程负责作这个事
        future = pool.submit(task,i) # task任务要作20次,4个进程负责作这个事
        future.add_done_callback(parse)
        # 为当前任务绑定了一个函数,在当前任务执行结束的时候会触发这个函数,
        # 会把future对象做为参数传给函数
        # 这个称之为回调函数,处理完了回来就调用这个函数.


        # print(future.result()) # 若是没有结果一直等待拿到结果,致使了全部的任务都在串行

    # pool.shutdown() # 关闭了池的入口,会等待全部的任务执行完,结束阻塞.
    # for fu in fu_list:
    #     print(fu.result())

协程

1、什么是协程

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。

须要强调的是:

  1. python的线程属于内核级别的,即由操做系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其余线程运行)
  2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操做系统)控制切换,以此来提高效率(!!!非io操做的切换与效率无关)

对比操做系统控制线程的切换,用户在单线程内控制协程的切换。

优势以下:

  1. 协程的切换开销更小,属于程序级别的切换,操做系统彻底感知不到,于是更加轻量级
  2. 单线程内就能够实现并发的效果,最大限度地利用cpu

缺点以下:

  1. 协程的本质是单线程下,没法利用多核,能够是一个程序开启多个进程,每一个进程内开启多个线程,每一个线程内开启协程
  2. 协程指的是单个线程,于是一旦协程出现阻塞,将会阻塞整个线程

总结协程特色:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里本身保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操做自动切换到其它协程(如何实现检测IO,yield、greenlet都没法实现,就用到了gevent模块(select机制))

用法介绍

g1=gevent.spawn(func,1,,2,3,x=4,y=5):建立一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g.join():等待g1结束

g2.join():等待g2结束

上述两步合做一步:gevent.joinall([g1,g2])

g1.value:拿到func1的返回值

2、协程操做-gevent模块

Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。

gevent的同步与异步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():  # 同步
    for i in range(10):
        task(i)

def asynchronous(): # 异步
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)
    print('DONE')
    
if __name__ == '__main__':
    print('Synchronous:')
    synchronous()
    print('Asynchronous:')
    asynchronous()
#  上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。
#  初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,
#  后者阻塞当前流程,并执行全部给定的greenlet任务。执行流程只会在 全部greenlet执行完后才会继续向下走。
from gevent import monkey;monkey.patch_all()
# gevent识别不了 time.sleep等io操做 须要打monkey补丁才能识别

import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])

print('主')
相关文章
相关标签/搜索