并发编程

操做系统

系统是一个有程序员写出来的软件, 该软件用于控制计算机的硬件,让他们之间进行相互配合python

并发与并行

并发:伪,好比吃饭时,电话响了,停下吃饭动做去接电话就是并发,至关于能够处理多个任务,但不能同时处理react

并行:真,如果一遍吃饭一遍接电话就是并行,至关于能够处理多个任务,且能够同时进行git

线程与进程

一个程序至少有一个进程和一个线程,一个CPU一次只能执行一个进程,而一个进程中能够有一个线程,也能够有多个线程,线程是程序执行的最小单位程序员

1.由于线程是CPU工做的最小单元,建立线程能够利用CPU多核的优点实现并行操做(不适用于Python)github

2.进程与进程之间是数据隔离的(jave/c#)进程是为线程创造工做环境web

3.Python中存在一个GIL锁,所以只能一个进程对应一个线程,就不能利用多核的优点,只能经过多线程的方式(但会形成资源的浪费)c#

4.当python使用多线程处理I/O密集型时,能够提升效率,当计算密集型时只能经过多进程安全

线程

线程的建立

第一种函数式多线程

def func(arg):
    print(arg)


t1 = threading.Thread(target=func, args=(11, ))
t1.start()
View Code

第二种类方式并发

class MyThread(threading.Thread):
    def run(self):
        print(self._args)


t1 = MyThread(args=(11, ))
t1.start()
View Code2

主线程默认等子线程执行完后再执行

def func(arg):
    print(arg)


t1 = threading.Thread(target=func, args=(11, ))
t1.start()

t2 = threading.Thread(target=func, args=(22, ))
t2.start()
print("主线程")

setDaemon

默认值为False,当为True时,主线程再也不等子线程,当主线程结束时终止全部的子线程

def func(arg):
    print(arg)


t1 = threading.Thread(target=func, args=(11, ))
t1.setDaemon(True)
t1.start()

t2 = threading.Thread(target=func, args=(22, ))
t2.setDaemon(True)
t2.start()
print("主线程")

join

开发者可使用join()来控制主线程等子线程的时间

def func(arg):
    print(arg)


t1 = threading.Thread(target=func, args=(11, ))
t1.start()
t1.join(2)

t2 = threading.Thread(target=func, args=(22, ))
t2.start()
t2.join(2)
print("主线程")

线程名称

def func(arg):
    t = threading.current_thread()
    name = t.getName()
    print(arg, name)


t1 = threading.Thread(target=func, args=(11, ))
t1.setName("线程一")
t1.start()

t2 = threading.Thread(target=func, args=(22, ))
t2.setName("线程二")
t2.start()
print("主线程")

start

start不是表明开始运行线程,而是向cpu发送信息代表本身准备就绪,但是被CPUdcdu调度了,可是CPU是否会当即调度要取决于CPU的运算

 锁

1.当线程安全时(列表和字典),锁会在内部让多个线程排队操做

2.当线程不安全时,此时的排队处理还能够起到线程安全的做用

3.一次只放一个线程

lock

import threading

lst = []
lock = threading.Lock()


def func_lock(args):
    lock.acquire()  # 加锁 
    lst.append(args)
    m = lst[-1]
    print(args, m)
    lock.release()  # 解锁


for i in range(10):  # 建立十个线程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()
lock

rlock

import threading
import time

lst = []
lock = threading.RLock()


def func_lock(args):
    lock.acquire()  # 加锁
    lock.acquire()  # 再次加锁
    lst.append(args)
    time.sleep(1)
    m = lst[-1]
    print(args, m)
    lock.release()  # 解锁
    lock.release()  # 不会锁死


for i in range(10):  # 建立十个线程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()
rlock

信号量

Semaphore一次放n个

import threading
import time


lock = threading.BoundedSemaphore(3)


def func_lock(args):
    lock.acquire()  # 加锁
    time.sleep(1)
    print(args)
    lock.release()  # 解锁


for i in range(10):  # 建立十个线程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()
Semphore

条件

condition经过一次方法放入指定个数

import threading
import time


lock = threading.Condition()


def func_lock(args):
    lock.acquire()
    lock.wait()  # 加锁
    time.sleep(0.1)
    print("\n"+str(args))
    lock.release()  # 解锁


for i in range(10):  # 建立十个线程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()

while True:
    inp = int(input("放入的数量>>>>"))
    lock.acquire()
    lock.notify(inp)
    lock.release()
condition
import threading
import time


lock = threading.Condition()


def func_lock():
    input("放入数量>>>>")
    return True


def func(args):
    lock.wait_for(func_lock)
    print(args)
    time.sleep(1)


for i in range(10):  # 建立十个线程
    t = threading.Thread(target=func, args=(i,))
    t.start()
2

事件

控制放开与禁止,一旦放开就是放开所有线程

import threading
import time


lock = threading.Event()


def func_lock(args):
    lock.wait()  # 加锁
    time.sleep(1)
    print(args)


for i in range(10):  # 建立十个线程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()

input("放行>>>>")
lock.set()

input("中止放行>>>>")
lock.clear()

for i in range(10):  # 建立十个线程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()

input("放行>>>>")
lock.set()
Event

threading.local

内部自动为每一个线程维护一个空间(字典),用于当前存储属于本身的值,以保证线程之间的数据隔离

import threading
import time


lock = threading.local()


def func(name, age):
    # 内部会为当前线程建立一个空间用于储存
    lock.name = name
    lock.age = name
    time.sleep(1)
    print(lock.name, name)
    print(lock.age, age)


for i in range(10):  # 建立十个线程
    t = threading.Thread(target=func, args=(i, i+10))
    t.start()
threading.local
import time
import threading

DATA_DICT = {}


def func(arg):
    ident = threading.get_ident()  # 获取表明此线程线程的值
    DATA_DICT[ident] = arg  # 线程的值做为字典的key, arg做为values
    time.sleep(1)
    print(DATA_DICT[ident], arg)


for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
loacl实现原理
import time
import threading
INFO = {}


class Local(object):

    def __getattr__(self, item):
        ident = threading.get_ident()
        return INFO[ident][item]

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in INFO:
            INFO[ident][key] = value  # INFO[ident]是字典indent键对应的值,此值是一个字典
        else:
            INFO[ident] = {key: value}


obj = Local()


def func(arg):
    obj.phone = arg  # 调用对象的 __setattr__方法(“phone”,1)
    time.sleep(2)
    print(obj.phone, arg)


for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
类方式实现原理

线程池

from concurrent.futures import ThreadPoolExecutor
import time


def task(x, y):
    time.sleep(2)
    print(x, y)


# 建立5个线程池
pool = ThreadPoolExecutor(5)

for i in range(10):
    # 去线程池申请一个线程执行task函数
    pool.submit(task, i, i + 10)
pool

生产者消费者模型

三部件:消费者

      队列:先入先出

      栈:先入后出

    生产者

import time
import queue
import threading

q = queue.Queue()  # 线程安全


def producer(id):
    """
    生产者
    :return:
    """
    while True:
        time.sleep(2)
        q.put('包子')
        print('厨师%s 生产了一个包子' % id)


for i in range(1, 4):
    t = threading.Thread(target=producer, args=(i,))
    t.start()


def consumer(id):
    """
    消费者
    :return:
    """
    while True:
        time.sleep(1)
        v1 = q.get()
        print('顾客 %s 吃了一个包子' % id)


for i in range(1, 3):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
demo

 进程

进程建立

import multiprocessing


def func(args):
    print(args)


def run():
    for i in range(10):
        t = multiprocessing.Process(target=func, args=(i,))
        t.start()


if __name__ == '__main__':
    run()
function
class MyProcess(multiprocessing):
    def run(self):
        print("当前进程", multiprocessing.current_process())


def run():
    t = MyProcess()
    t.start()


if __name__ == '__main__':
    run()
class

进程的经常使用功能

进程的经常使用功能与线程相似,只是使用时把线程的方式换为进程就能够

join 等子进程的最多时间,不填则等完
deamon 是否等子进程, 默认为False,不等
name 查看当前进程名字
multiprocessing.current_process() 查看当前进程名字
multiprocessing.current_process().ident/pid 查看当前进程id

进程间的数据共享

进程之间的数据是不能够共享的,可是为了某些需求,有进程间数据共享的须要时,能够经过下面的方式进行

Queue

import multiprocessing


def func(args, q):
    q.put(args)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(10):
        t = multiprocessing.Process(target=func, args=(i, q, ))
        t.start()
    while True:
        v = q.get()
queue

Manager

import multiprocessing


def func(args, dic):
    dic[args] = 123


if __name__ == '__main__':
    m = multiprocessing.Manager()
    dic = m.dict()
    lst = []
    for i in range(10):
        t = multiprocessing.Process(target=func, args=(i, dic, ))
        t.start()
        lst.append(t)
    while True:
        n = 0
        for el in lst:
            if not el.is_alive():  # 进程是否存活
                n += 1
        if n == len(lst):
            break
    print(dic)
manager

进程锁

与线程类似,参考线程

进程池

import time
from concurrent.futures import ProcessPoolExecutor


def task(arg):
    time.sleep(2)
    print(arg)


if __name__ == '__main__':

    pool = ProcessPoolExecutor(5)
    for i in range(10):
        pool.submit(task, i)
pool

单线程实现并发

- 协程+IO切换:gevent
- 基于事件循环的异步非阻塞框架:Twisted
                

 IO多路复用

 IO多路复用是为了检测多个socket是否已经发生变化,(是否链接成功,是否已经获取数据,可读可写)

操做系统检测socket是否发生变化

select:最多1024个socket;循环去检测。
poll:不限制监听socket个数;循环去检测(水平触发)。
epoll:不限制监听socket个数;回调方式(边缘触发)。

python模块检测socket是否发生变化

import selcet
select.select 
select.epoll 

异步非阻塞

 socket非阻塞

非阻塞就是不等待,socket是默认阻塞的,固然但是改变它是否阻塞,它的阻塞体如今connect与recv

client.setblocking(False) # 默认为True
# 会报BlockingIOError的错误,只要捕获便可。

 异步

指通知,当执行完以后,自动执行回调函数或者自动执行某些操做(通知)

from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


deferred_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()
twisted

同步阻塞

对比非阻塞,阻塞固然指的就是等了,等一步完成后才进行下一步,就是严格按顺序来执行

协程

协程是程序员创造出来了的,而不是一个真正存在的东西,单纯的协程没有实际的用处,通常都是配合IO操做使用的

协程就是微线程,是对一个线程进程的分片,使得线程能够在代码块之间能够来回的切换执行,而不是逐行的执行

协程与IO操做

from gevent import monkey
monkey.patch_all() # 之后代码中遇到IO都会自动执行greenlet的switch进行切换
import requests
import gevent


def get_page1(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page2(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page3(url):
    ret = requests.get(url)
    print(url,ret.content)

gevent.joinall([
    gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1
    gevent.spawn(get_page2, 'https://www.yahoo.com/'),  # 协程2
    gevent.spawn(get_page3, 'https://github.com/'),     # 协程3
])

 进程,线程,协程的区别

       进程是计算机资源分配的最小单元,主要用来作数据隔离,线程是工做的最小单元,真正进行工做的其实就是线程.一个进程里能够有多个线程,一个应用程序里能够有多个进程,对于其余语言来讲几乎用不到进程,他们使用的都是线程,而对于Python来讲,对于IO密集型操做使用线程,对于计算密集型操做使用进程.由于python有GIL锁,它的做用是使一个进程中同一时刻只能有一个线程被CPU调度,因此想使用CPU的多核优点只能使用多个进程,而IO操做占用不多的CPU,因此使用多线程

       协程是程序员创造出来的,它自己是不存在的,它是用来可让程序员能够控制代码的执行顺序,它自己存在没有什么意义,可是一旦它与IO切换放在一块儿,它的价值就大了,它能够人为的控制使程序遇到IO就切换到别的任务,IO操做回来时继续执行,这样就可以使使线程的工做不会停,让线程一直工做,python在使用协程时主要是经过greenlet的模块,使用协程+IO操做时使用的是gevent模块

相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息