python线程、协程

线程python

Threading用于提供线程相关的操做,线程是应用程序中工做的最小单元。
git

更多方法:程序员

  • start            线程准备就绪,等待CPU调度github

  • setName      为线程设置名称编程

  • getName      获取线程名称安全

  • setDaemon   设置为后台线程或前台线程(默认)
                        若是是后台线程,主线程执行过程当中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均中止
                        若是是前台线程,主线程执行过程当中,前台线程也在进行,主线程执行完后,等待前台线程也执行完成后,中止
    服务器

  • join              逐个执行每一个线程,执行完毕后继续往下执行,该方法使得多线程变得无心义网络

  • run              线程被cpu调度后执行Thread类对象的run方法数据结构

直接调用多线程


#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
 
def show(arg):
   time.sleep(1)
   print('thread'+str(arg))
t_list = []  
for i in range(10):
   t = threading.Thread(target=show, args=(i,))
   t.start()
   t_list.append(t)
for i in t_list:
   t.join()        #等待全部线程结束结束程序
print('main thread stop')

继承式调用

import threading
import time


class MyThread(threading.Thread):
   def __init__(self,num):
       threading.Thread.__init__(self)
       self.num = num

   def run(self):#定义每一个线程要运行的函数

       
print("running on number:%s" %self.num)

       time.sleep(3)

if __name__ == '__main__':

   t1 = MyThread(1)
   t2 = MyThread(2)
   t1.start()
   t2.start()

守护线程

import time
import threading

def run(n):

   print('[%s]------running----\n' % n)
   time.sleep(2)
   print('--done--')

def main():
   for i in range(5):
       t = threading.Thread(target=run,args=[i,])
       #time.sleep(1)
       
t.start()
       t.join(1)
       print('starting thread', t.getName())
m = threading.Thread(target=main,args=[])
m.setDaemon(True) #将主线程设置为Daemon线程,它退出时,其它子线程会同时退出,不论是否执行完任务
m.start()
#m.join(timeout=2)
print("---main thread done----")

线程锁

若是同一时间有多个线程操做同一数据可能会出现混乱现象,因此须要加线程锁保证同一时间只有一个线程操做这一数据,保证数据一致性

加锁:lock = threading.RLock()    解锁:lock.release()

      lock.acquire()

#!/usr/bin/env python
#coding:utf-8

import threading
import time

gl_num = 0

lock = threading.Lock()

def Func():
   lock.acquire()   #修改数据前加锁
   
global gl_num
   gl_num +=1      #对此公共变量进行-1操做
   
time.sleep(1)
   print gl_num
   lock.release()  #修改后释放

for i in range(10):
   t = threading.Thread(target=Func)
   t.start()


正常来说,这个num结果应该是0, 但在python 2.7上多运行几回,会发现,最后打印出来的num结果不老是0,为何每次运行的结果不同呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操做, 因为2个线程是并发同时运行的,因此2个线程颇有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每一个线程在要修改公共数据时,为了不本身在还没改完的时候别人也来修改此数据,能够给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。 

RLock(递归锁)

说白了就是在一个大锁中还要再包含子锁

import threading,time

def run1():
   print("grab the first part data")
   lock.acquire()
   global num
   num +=1
   
lock.release()
   return num
def run2():
   print("grab the second part data")
   lock.acquire()
   global  num2
   num2+=1
   
lock.release()
   return num2
def run3():
   lock.acquire()
   res = run1()
   print('--------between run1 and run2-----')
   res2 = run2()
   lock.release()
   print(res,res2)


if __name__ == '__main__':

   num,num2 = 0,0
   
lock = threading.RLock()
   for i in range(10):
       t = threading.Thread(target=run3)
       t.start()

while threading.active_count() != 1:
   print('active num:',threading.active_count())
else:
   print('----all threads done---')
   print(num,num2)

Semaphore(信号量)

互斥锁 同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据 ,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去。

import threading,time

def run(n):
   semaphore.acquire()
   time.sleep(1)
   print("run the thread: %s\n" %n)
   semaphore.release()

if __name__ == '__main__':

   num= 0
   
semaphore  = threading.BoundedSemaphore(
5) #最多容许5个线程同时运行
   
for i in range(20):
       t = threading.Thread(
target=run,args=(i,))
       t.start()

while threading.active_count() != 1:
   
pass #print threading.active_count()
else:
   
print('----all threads done---')
   
print(num)

Events

python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。

  • clear:将“Flag”设置为False

  • set:将“Flag”设置为True

import threading,time
def light():
   if not event.isSet():
       event.set() #绿灯状态
   
count = 0
   
while True:
       if count < 10:
           print('\033[42;1m--green light on---\033[0m')
       elif count <13:
           print('\033[43;1m--yellow light on---\033[0m')
       elif count <20:
           if event.isSet():
               event.clear()
           print('\033[41;1m--red light on---\033[0m')
       else:
           count = 0
           
event.set() #打开绿灯
       
time.sleep(1)
       count +=1
def car(n):
   while 1:
       time.sleep(1)
       if  event.isSet(): #绿灯
           
print("car [%s] is running.." % n)
       else:
           print("car [%s] is waiting for the red light.." %n)
           event.wait()
if __name__ == '__main__':
   event = threading.Event()
   Light = threading.Thread(target=light)
   Light.start()
   for i in range(3):
       t = threading.Thread(target=car,args=(i,))
       t.start()

queue队列 

queue.qsize() 返回队列的大小 
queue.empty() 若是队列为空,返回True,反之False 
queue.full() 若是队列满了,返回True,反之False
queue.full 与 maxsize 大小对应 

queue.get([block[, timeout]])获取队列,timeout等待时间 
queue.get_nowait() 至关queue.get(False)
queue.put(itemblock=Truetimeout=None) 非阻塞写入队列,timeout等待时间 

queue.put_nowait(item) 至关queue.put(item, False)
queue.task_done() 在完成一项工做以后,queue.task_done()函数向任务已经完成的队列发送一个信号
queue.join() 实际上意味着等到队列为空,再执行别的操做

  • class queue.Queue(maxsize=0) #先入先出

  • class queue.LifoQueue(maxsize=0) #last in fisrt out 

  • class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列(priority_number, data)

1.多线程采用的是分时复用技术,即不存在真正的多线程,cpu作的事是快速地切换线程,以达到相似同步运行的目的,由于高密集运算方面多线程是没有用的,可是对于存在延迟的状况(延迟IO,网络等)多线程能够大大减小等待时间,避免没必要要的浪费。

2.原子操做:这件事情是不可再分的,如变量的赋值,不可能一个线程在赋值,到一半切到另一个线程工做去了……可是一些数据结构的操做,如栈的push什么的,并不是是原子操做,好比要通过栈顶指针上移、赋值、计数器加1等等,在其中的任何一步中断,切换到另外一线程再操做这个栈时,就会产生严重的问题,所以要使用锁来避免这样的状况。好比加锁后的push操做就能够认为是原子的了……

3.阻塞:所谓的阻塞,就是这个线程等待,一直到能够运行为止。最简单的例子就是一线程原子操做下,其它线程都是阻塞状态,这是微观的状况。对于宏观的状况,好比服务器等待用户链接,若是始终没有链接,那么这个线程就在阻塞状态。同理,最简单的input语句,在等待输入时也是阻塞状态。

4.在建立线程后,执行p.start(),这个函数是非阻塞的,即主线程会继续执行之后的指令,至关于主线程和子线程都并行地执行。因此非阻塞的函数马上返回值的~

对于资源,加锁是个重要的环节。由于python原生的list,dict等,都是not thread safe的。而Queue,是线程安全的,所以在知足使用条件下,建议使用队列。

队列适用于 “生产者-消费者”模型。双方不管数量多少,产生速度有何差别,均可以使用queue。

import time,queue,threading
def consumer(n):
   while True:
       print('consumer [%s] get task: [%s]' % (n,q.get()))
       time.sleep(1)
       q.task_done()
def producer(n):
   count = 1
   
while True:
       print('producer [%s] produced a new task :%s ' % (n,count))
       q.put(count)
       count +=1
       
q.join()
       print('task finished')
q = queue.Queue()
c1 = threading.Thread(target=consumer,args=[1,])
c2 = threading.Thread(target=consumer,args=[2,])
c3 = threading.Thread(target=consumer,args=[3,])

p1 = threading.Thread(target=producer,args=['user1',])
p2 = threading.Thread(target=producer,args=['user2',])
p3 = threading.Thread(target=producer,args=['user3',])

c1.start()
p1.start()

在上面的例子中,Producer在随机的时间内生产一个“产品”,放入队列中。Consumer发现队列中有了“产品”,就去消费它。本例中,因为Producer生产的速度快于Consumer消费的速度,因此每每Producer生产好几个“产品”后,Consumer才消费一个产品。

Queue模块实现了一个支持多producer和多consumer的FIFO队列。当共享信息须要安全的在多线程之间交换时,Queue很是有用。Queue的默认长度是无限的,可是能够设置其构造函数的maxsize参数来设定其长度。Queue的put方法在队尾插入,该方法的原型是:

put( item[, block[, timeout]])

若是可选参数block为true而且timeout为None(缺省值),线程被block,直到队列空出一个数据单元。若是timeout大于0,在timeout的时间内,仍然没有可用的数据单元,Full exception被抛出。反之,若是block参数为false(忽略timeout参数),item被当即加入到空闲数据单元中,若是没有空闲数据单元,Full exception被抛出。

Queue的get方法是从队首取数据,其参数和put方法同样。若是block参数为true且timeout为None(缺省值),线程被block,直到队列中有数据。若是timeout大于0,在timeout时间内,仍然没有可取数据,Empty exception被抛出。反之,若是block参数为false(忽略timeout参数),队列中的数据被当即取出。若是此时没有可取数据,Empty exception也会被抛出。




协程


线程和进程的操做是由程序触发系统接口,最后的执行者是系统;协程的操做则是程序员。

协程存在的意义:对于多线程应用,CPU经过切片的方式来切换线程间的执行,线程切换时须要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:

协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的适用场景:当程序中存在大量不须要CPU的操做时(IO),适用于协程。

协程的好处:

  • 无需线程上下文切换的开销

  • 无需原子操做锁定及同步的开销

  • 方便切换控制流,简化编程模型

  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。

缺点:

  • 没法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程须要和进程配合才能运行在多CPU上.固然咱们平常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。

  • 进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序



greenlet


from greenlet import greenlet

def test1():
   
print(12)
   
gr2.switch()
   
print(34)
   
gr2.switch()

def test2():
   
print(56)
   
gr1.switch()
   
print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()


gevent


gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。

import gevent

def foo():
   
print('Running in foo')
   
gevent.sleep(0)
   
print('Explicit context switch to foo again')

def bar():
   
print('Explicit context to bar')
   
gevent.sleep(0)
   
print('Implicit context switch back to bar')

gevent.joinall([
   
gevent.spawn(foo),
   gevent.spawn(bar),
])

遇到IO操做自动切换:


from gevent import monkey; monkey.patch_all()
import gevent
import urllib2

def f(url):
   
print('GET: %s' % url)
   
resp = urllib2.urlopen(url)
   
data = resp.read()
   
print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
       
gevent.spawn(f, 'https://www.python.org/'),
       gevent.spawn(f, 'https://www.yahoo.com/'),
       gevent.spawn(f, 'https://github.com/'),
])
相关文章
相关标签/搜索