Python全栈之路-Day40

1 互斥锁(同步锁)

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import time
import threading

def addNum():
    global num     # 在每一个线程中都获取这个全局变量
    Lock.acquire() # 每次只能有一个线程在运行Lock块的代码
    temp = num
    time.sleep(0.01)
    num = temp - 1    # 对此公共变量进行-1操做
    Lock.release()

num = 100  # 设定一个共享变量

Lock = threading.Lock()  # 定义同步锁
thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部线程执行完毕
    t.join()

print('Result: ', num)

2 死锁与递归锁

2.1 死锁

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9


import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 若是锁被占用,则阻塞在这里,等待锁的释放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()

        mutexA.release()


    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

2.2 递归锁

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9


import threading
import time

RLock = threading.RLock()  # 得到递归锁,能够acquire屡次

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        RLock.acquire()  # acquire计数加1
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        RLock.acquire()  # acquire计数加1
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        RLock.release()  # acquire计数减1
        RLock.release()  # acquire计数减1  计数为0后其余线程就能够竞争这把锁

    def fun2(self):
        RLock.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)
        RLock.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        RLock.release()
        RLock.release()

if __name__ == "__main__":
    print("start---------------------------%s"%time.time())
    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

3 event对象

线程的一个关键特性是每一个线程都是独立运行且状态不可预测。若是程序中的其 他线程须要经过判断某个线程的状态来肯定本身下一步的操做,这时线程同步问题就 会变得很是棘手。为了解决这些问题,咱们须要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它容许线程等待某些事件的发生。在 初始状况下,Event对象中的信号标志被设置为假。若是有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程若是将一个Event对象的信号标志设置为真,它将唤醒全部等待这个Event对象的线程。若是一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行python

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()  # 完成线程间的通讯 至关于标志位
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()

# event.isSet():返回event的状态值;
# event.wait():若是 event.isSet()==False将阻塞线程;
# event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度;
# event.clear():恢复event的状态值为False。

4 进程对象Process

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from multiprocessing import Process
import os
import time
def info(name):


    print("name:",name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)

def foo(name):

    info(name)

if __name__ == '__main__':

    info('main process line')


    p1 = Process(target=info, args=('alvin',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")

5 协程简介

5.1 yield与协程

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import time

"""
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,经过锁机制控制队列和等待,但一不当心就可能死锁。
若是改用协程,生产者生产消息后,直接经过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
"""
# 注意到consumer函数是一个generator(生成器):
# 任何包含yield关键字的函数都会自动成为生成器(generator)对象

def consumer():
    r = ''
    while True:
        # 三、consumer经过yield拿到消息,处理,又经过yield把结果传回;
        #    yield指令具备return关键字的做用。而后函数的堆栈会自动冻结(freeze)在这一行。
        #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
        #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。经过这种方式,迭代器能够实现无限序列和惰性求值。
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'
def produce(c):
    # 一、首先调用c.next()启动生成器
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 二、而后,一旦生产了东西,经过c.send(n)切换到consumer执行;
        cr = c.send(n)
        # 四、produce拿到consumer处理的结果,继续生产下一条消息;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 五、produce决定不生产了,经过c.close()关闭consumer,整个过程结束。
    c.close()
if __name__=='__main__':
    # 六、整个流程无锁,由一个线程执行,produce和consumer协做完成任务,因此称为“协程”,而非线程的抢占式多任务。
    c = consumer()
    produce(c)


'''
result:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''

5.2 greentlet

greelet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操做进行恢复为止。可使用一个调度器循环在一组生成器函数之间协做多个任务。greentlet是python中实现咱们所谓的"Coroutine(协程)"的一个基础库.git

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from greenlet import greenlet

def test1():
    print (12)
    gr2.switch()
    print (34)
    gr2.switch()

def test2():
    print (56)
    gr1.switch()
    print (78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

5.3 gevent

Python经过yield提供了对协程的基本支持,可是不彻底。而第三方的gevent为Python提供了比较完善的协程支持。github

gevent是第三方库,经过greenlet实现协程,其基本思想是:redis

当一个greenlet遇到IO操做时,好比访问网络,就自动切换到其余的greenlet,等到IO操做完成,再在适当的时候切换回来继续执行。因为IO操做很是耗时,常常使程序处于等待状态,有了gevent为咱们自动切换协程,就保证总有greenlet在运行,而不是等待IO。网络

因为切换是在IO操做时自动完成,因此gevent须要修改Python自带的一些标准库,这一过程在启动时经过monkey patch完成:app

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9


import gevent
import time

def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")

def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")

start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)

print(time.time()-start)

固然,实际代码里,咱们不会用gevent.sleep()去切换协程,而是在执行到IO操做时,gevent自动切换,代码以下:函数

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time

import ssl
ssl._create_default_https_context = ssl._create_unverified_context # 解决Mac上的报错
def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

start=time.time()

gevent.joinall([
        gevent.spawn(f, 'https://itk.org/'),
        gevent.spawn(f, 'https://www.github.com/'),
        gevent.spawn(f, 'https://zhihu.com/'),
])

# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')

print(time.time()-start)
相关文章
相关标签/搜索