了解相关概念以前,咱们先来看一张图python
进程:git
线程:程序员
经过对比,咱们能够得出:github
Threading用于提供线程相关的操做,线程是应用程序中工做的最小单元算法
import threading import time def show(arg): time.sleep(1) print('thread'+str(arg)) for i in range(10): t = threading.Thread(target=show,args=(i,)) t.start() print('main thread stop')
上述代码建立了10个前台线程,而后控制器就交给了cpu,cpu根据指定算法进行调度,分片执行指令编程
更多方法:数组
若是是后台线程,主线程执行过程当中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均中止安全
若是是前台线程,主线程执行过程当中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序中止数据结构
自定义线程类:多线程
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self): #定义每一个线程要运行的函数 print('running on number:%s'%self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
不过咱们有个疑问啦,在定义的这个类中,根本没涉及调用run函数,这是怎么实现的呢??
那咱们去看下源码就明白了,实际上是start方法再起做用
因此start方法在底层是调用了run方法
因为线程之间是进行随机调度,因此不可避免的存在多个线程同时修改同一条数据,从而可能会出现脏数据,因此出现了线程锁,同一时刻只容许一个线程执行操做。
未上锁的:
import threading import time num = 0 def show(arg): global num time.sleep(1) num += 1 print(num) for i in range(10): t = threading.Thread(target=show,args=(i,)) t.start() print('main thread stop')
上锁的:
import threading import time num = 0 lock = threading.RLock() def func(): lock.acquire() global num num += 1 time.sleep(1) print(num) lock.release() for i in range(10): t = threading.Thread(target=func) t.start()
咱们会发现,这两段代码输出的结果都同样,并无产生脏数据,可是细心你会发现:打印结果的过程是不一样的,未加锁的--能够说是几乎同时打印结果,而加了锁的,则是一个一个打印,这就是锁在起做用,对同一资源,在一个点上只能执行一个线程。
Python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法:set、wait、clear
事件处理的机制:全局定义了一个‘flag’,若是‘flag’值为False,那么当程序执行event.wait方法时就会阻塞,若是‘Flag’值为True,那么event.wait方法时便再也不阻塞
import threading import time def do(event): print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do,args=(event_obj,)) t.start() event_obj.clear() # time.sleep(2) inp = input('input:') if inp == 'true': event_obj.set()
import threading import time event = threading.Event() def func(): print('%s wait for event...'%threading.currentThread().getName()) #等待--阻塞 event.wait() #收到事件后进入运行状态 print('%s recv event.'%threading.currentThread().getName()) t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) #发出事件通知 print('MainThread set event.') event.set()
Semaphore是同时容许必定数量的线程更改数据,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去。
import threading,time def run(n): semaphore.acquire() time.sleep(1) print('run the thread:%s'%n) semaphore.release() if __name__ == '__main__': num = 0 #最多容许5个线程同时运行 semaphore = threading.BoundedSemaphore(5) for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start()
使用线程等待,只有知足某条件时,才释放n个线程
import threading import time def consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notifyAll() print("producer after notifyAll") condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start() time.sleep(2) c2.start() time.sleep(2) p.start() # consumer()线程要等待producer()设置了Condition以后才能继续。
import threading def run(n): con.acquire() con.wait() print('run the thread:%s'%n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run,args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release()
from multiprocessing import Process import time def foo(i): print('say hi',i) if __name__ == '__main__': for i in range(10): p = Process(target=foo,args=(i,)) p.start()
咱们能够看到,进程和线程代码实现几乎是相同的,对于进程而言,模块是multiprocessing,另外,在建立进程前,加了一个__name__的验证,这是因为操做系统的缘由,反正你只要加上了就能够了。
另外,咱们已经提到过,建立进程就等同搭建了一个进程环境,消耗内存是不小的(相对线程)。
因为进程建立时,数据是各持有一份的,默认状况下进程间是没法共享数据的。
from multiprocessing import Process import time li = [] def foo(i): li.append(i) print('say hi',li) if __name__ == '__main__': for i in range(10): p = Process(target=foo,args=(i,)) p.start() print('ending',li) 结果为: say hi [1] say hi [0] say hi [2] say hi [3] say hi [4] say hi [5] say hi [6] say hi [7] ending [] say hi [8] say hi [9]
从结果里,咱们也知道,进程间数据是不共享的,列表元素没有实现累加。
不过,若是你硬要实现共享的话,办法仍是有的,请往下看:
方法一:引用数组Array
from multiprocessing import Process,Array def Foo(temp,i): temp[i] = 100+i for item in temp: print(i,'----->',item) if __name__ == '__main__': temp = Array('i', [11, 22, 33, 44]) for i in range(2): p = Process(target=Foo,args=(temp,i,)) p.start()
方法二:manage.dict()
from multiprocessing import Process,Manager def Foo(dic,i): dic[i] = 100 + i print(dic.values()) if __name__ == '__main__': manage = Manager() dic = manage.dict() for i in range(2): p = Process(target=Foo,args=(dic,i,)) p.start() p.join()
方法三:multiprocessing.Queue
from multiprocessing import Process, Queue def f(i,q): print(i,q.get()) if __name__ == '__main__': q = Queue() q.put("h1") q.put("h2") q.put("h3") for i in range(10): p = Process(target=f, args=(i,q,)) p.start()
当建立进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值,另外涉及数据共享就一定存在同一份数据被多个进程同时修改,因此在multiprocessing模块里也也提供了RLock类。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可以使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,因为这个缘由,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。若是callback被指定,那么callback能够接收一个参数而后被调用,当结果准备好回调时 会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被当即完成,不然处理结果的线程会被阻塞。
close() : 阻止更多的任务提交到pool,待任务完成后,工做进程会退出。
terminate() : 无论任务是否完成,当即中止工做进程。在对pool对象进程垃圾回收的时候,会当即调用terminate()。
join() : wait工做线程的退出,在调用join()前,必须调用close() or terminate()。这样是由于被终止的进程须要被父进程调用wait(join等价与wait),不然进程会成为僵尸进程
from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print(arg) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo,args=(i,),callback=Bar) print('end') pool.close() pool.join() #进程池中进程执行完毕后再关闭 print('really end')
适用于多线程编程的先进先出数据结构,能够用来安全的传递多线程信息。
import threading import queue que = queue.Queue(10) def s(i): que.put(i) def x(i): g = que.get(i) print('get',g) for i in range(1,13): t = threading.Thread(target=s,args=(i,)) t.start() for i in range(1,11): t = threading.Thread(target=x,args=(i,)) t.start() print('size',que.qsize()) 结果为: get 1 get 2 get 3 get 4 get 5 get 6 get 7 get 8 get 9 get 10 size
线程和进程的操做是由程序触发系统接口,最后的执行者是系统;而协程的操做则是程序员
协程存在的意义:对于多线程应用,cpu经过切片来切换线程间的执行,线程切换时须要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码执行顺序。
协程的适用场景:当程序中存在大量不须要cpu的操做时(IO),适用于协程。例如:爬虫
from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 gr2.switch() def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
1 from gevent import monkey; monkey.patch_all() 2 import gevent 3 import urllib2 4 5 def f(url): 6 print('GET: %s' % url) 7 resp = urllib2.urlopen(url) 8 data = resp.read() 9 print('%d bytes received from %s.' % (len(data), url)) 10 11 gevent.joinall([ 12 gevent.spawn(f, 'https://www.python.org/'), 13 gevent.spawn(f, 'https://www.yahoo.com/'), 14 gevent.spawn(f, 'https://github.com/'), 15 ])
欢迎你们对个人博客内容提出质疑和提问!谢谢
笔者:拍省先生