python学习第十天

线程锁使用场景python

多个线程同时修改同一份数据时必须加锁即线程锁git

 

队列的做用github

解耦,使程序之间实现松耦合编程

提升处理效率多线程

 

队列和列表处理数据的区别并发

队列数据只有一份,取走就没有了,列表数据取出后仍然存在于列表中app

 

python多线程使用场景异步

python多线程不适合CPU操做(计算)密集型任务,适合IO操做(数据读写收发)密集型任务socket

 

每一个进程都有父进程async

 

进程间通讯

进程queue

两个进程的queue经过序列化共享数据

from multiprocessing import Process,Queue

def run(qq):

    qq.put(1)

'''定义主进程队列'''

q=Queue()

'''将主进程队列拷贝给子进程,子进程向队列中放入数据'''

p =Process(target=run,args=(q,))

p.start()

'''经过pickle序列化将子进程队列的数据传递给主进程'''

print(q.get())

 

管道Pipes

使用方法相似socket

from multiprocessing import Process, Pipe

def f(conn):

    conn.send([42, None, 'hello'])

    print("from parent:",conn.recv())

    conn.close()

if __name__ == '__main__':

    parent_conn, child_conn = Pipe()

    p = Process(target=f, args=(child_conn,))

    p.start()

    print(parent_conn.recv())  # prints "[42, None, 'hello']"

    parent_conn.send("ok")

 

Managers

真正的进程数据共享

from multiprocessing import Process,Manager

import os

def f(d,l):

    d[os.getpid()] =os.getpid()

    l.append(os.getpid())

    print(l)

'''能够写成manager =Manager()'''

with Manager() as manager:

    d =manager.dict()

    l =manager.list(range(5))

    p_list =[]

    for i in range(10):

        p =Process(target=f,args=(d,l))

        p.start()

        p_list.append(p)

    for j in p_list:

        j.join()

    print(d)

    print(l)

 

进程锁

保证屏幕输出结果不乱

from multiprocessing import Process,Lock

def f(l,i):

    l.acquire()

    print("hello world",i)

    l.release()

lock =Lock()

for i in range(10):

    p =Process(target=f,args=(lock,i))

    p.start()

 

进程池

apply  串行

apply_async  并行

from  multiprocessing import Process, Pool

import time,os

def Foo(i):

    time.sleep(2)

    print(os.getpid())

    return i + 100

def Bar(arg):

    print('-->exec done:', arg)

'''定义线程池,同时只能执行5个进程'''

pool = Pool(5)

for i in range(10):

    '''并发执行,当子线程执行完前面的函数后由主线程调用callback指定的函数'''

    pool.apply_async(func=Foo, args=(i,), callback=Bar)

    '''串行执行'''

    # pool.apply(func=Foo, args=(i,))

print('end')

pool.close()

pool.join()  # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。

 

协程的优缺点

优势:

无需线程上下文切换的开销

无需原子操做锁定及同步的开销

方便切换控制流,简化编程模型

高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理

 

缺点:

没法利用多核资源

进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序

 

协程处理逻辑

遇到IO就切换

 

greenlet和gevent模块

greenlet须要用户本身定义协程切换位置

gevent自动切换,在配合urllib、socket等个别模块使用时须要额外导入monkey模块标记IO操做

greenlet模块

from greenlet import greenlet

def test1():

    print(12)

    r2.switch()

    print(45)

    r2.switch()

    print(67)

def test2():

    print(34)

    r1.switch()

    print(56)

    r1.switch()

r1 =greenlet(test1)

r2 =greenlet(test2)

r1.switch()

 

gevent模块

import gevent

def foo():

    print("1")

    gevent.sleep(2)

    print("1-1")

def bar():

    print("2")

    gevent.sleep(1)

    print("2-2")

def func():

    print("3")

    gevent.sleep(0)

    print("3-3")

gevent.joinall([gevent.spawn(foo),gevent.spawn(bar),gevent.spawn(func)])

 

from urllib import request

import gevent,time

from gevent import monkey

monkey.patch_all()

def f(url):

    print("GET: %s"%url)

    resp =request.urlopen(url)

    data =resp.read()

    print("%d bytes received from %s"%(len(data),url))

urls =['https://www.python.org/','https://www.yahoo.cpm/','https://github.com/']

start_time =time.time()

for url in urls:

    f(url)

print("cost:",time.time()-start_time)

start_time2 =time.time()

gevent.joinall(gevent.spawn[f,'https://www.python.org/'],

               gevent.spawn([f,'https://www.yahoo.cpm/'],

               gevent.spawn([f,'https://github.com/'])))

print("cost:",time.time()-start_time2)

 

事件驱动模型

一个线程接收事件并放入消息队列中,另外一个线程从队列中取出事件处理

 

IO方案

阻塞   单线程请求数据阻塞,数据从内核拷贝到应用内存阻塞

非阻塞  单线程请求数据当即返回错误,客户端不停的请求,数据从内核拷贝到应用内存阻塞

多路复用  单线程包含多个请求让内核帮助监听,只要有一个请求的数据准备好了就通知线程,select不通知哪一个线程准备好了,epoll会通知。数据从内核拷贝到应用内存阻塞

异步  线程请求数据当即返回,当数据准备好时直接拷贝到应用内存并通知线程

 

selectors模块

import selectors

import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):

    conn, addr = sock.accept()  # Should be ready

    print('accepted', conn, 'from', addr)

    conn.setblocking(False)

    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):

    data = conn.recv(1000)  # Should be ready

    if data:

        print('echoing', repr(data), 'to', conn)

        conn.send(data)  # Hope it won't block

    else:

        print('closing', conn)

        sel.unregister(conn)

        conn.close()

sock = socket.socket()

sock.bind(('localhost', 10000))

sock.listen(100)

sock.setblocking(False)

sel.register(sock, selectors.EVENT_READ, accept)

while True:

    events = sel.select()

    for key, mask in events:

        callback = key.data

        callback(key.fileobj, mask)

相关文章
相关标签/搜索