一个应用程序,能够多进程、也能够多线程.
一个python脚本,默认是单进程,单线程的。
I/O操做(音频、视频、显卡操做),不占用CPU,因此:java
python中有个全局解释器锁,叫GIL(全称Global Interpreter Lock
),致使一个进程只能由一个线程让CPU去调度,但在java c#可使用多个线程。
多线程,多进程的目的,是为了提升并发,I/O密集型用多线程,计算密集型,用多进程。python
咱们来看看怎么建立多线程:git
def f1(args): print(args) import threading t=threading.Thread(target=f1,args=(123,)) #建立一个线程,target表示线程执行的目标,args表示参数 t.start() #并不表明当前当即被执行,系统来决定 f1(111)
以上代码结果print顺序会随机!程序员
更多的方法:github
import time def f1(args): time.sleep(5) print(args) import threading t1=threading.Thread(target=f1,args=(123,)) t1.setDaemon(True) #表示主线程不等待子线程 t.start() #并不表明当前被当即被执行,系统来决定 f1(111) t.join(2) #表示主程序执行到此,等待...直到子线程执行完毕 print(222222) print(333333)
下面看下run方法:c#
class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定义每一个线程要运行的函数 print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
线程有两种调用方式:数组
简单调用方式:自定义一个继承threading.thread的子类,经过自定义类的对象调用网络
import threading def f1(arg): print(arg) t = threading.Thread(target=f1,args=(123,)) t.run()
自定义类的调用方式多线程
import threading def f2(arg): print(arg) class MyThread(threading.Thread): def __init__(self,func,args): self.func=func self.args=args super(MyThread,self).__init__() def run(self): self.func(self.args) obj=MyThread(f2,123) obj.run()
因为线程之间进行随机调度,而且每一个线程可能只执行n条操做后,当多个线程同时修改同一条数据时,可能会出现脏数据:同一时刻只能容许指定的线程数执行操做.
python中的线程锁有Lock
, RLock
两种,其中RLock用的较多,由于支持多层嵌套的方式,Lock用的较少,不支持多层嵌套锁.并发
def func(l): global NUM #上锁 l.acquire() NUM-=1 time.sleep(2) print(NUM) #开锁 l.release() lock=threading.RLock() #放行几个线程出去执行 for i in range(30): t=threading.Thread(target=func,args=(lock,)) t.start()
若是不使用线程锁,上面程序会有30个线程同时执行,结果为30个-20
semaphore,同时容许指定数量的线程更改数据,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去。
import threading,time NUM=10 def func(l): global NUM #上锁 l.acquire() NUM-=1 time.sleep(2) print(NUM) #开锁 l.release() lock=threading.BoundedSemaphore(3) #放行几个线程出去执行 for i in range(30): t=threading.Thread(target=func,args=(lock,)) t.start()
从上面两个代码对比,咱们会发现,semaphore若是设置为1时,也可实现信号锁的功能.
python线程中的event主要用于让主线程控制其子线程的执行方式(有点相似交警控制红绿灯),event主要提供三个方法:set wait clear
事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。
import threading def func(i,e): print(i) e.wait() #检测是什么灯 print(i+100) event=threading.Event() for i in range(10): t = threading.Thread(target=func,args=(i,event)) t.start() event.clear() #默认是红灯 inp = input('>>>') if inp == '1': event.set() #设置成绿灯
使线程等待,当条件成立时,释放线程执行.
import threading def func(i,con): print(i) con.acquire() #配合,固定格式,线程hold住 con.wait() print(i+100) con.release() c=threading.Condition() for i in range(10): t = threading.Thread(target=func,args=(i,c,)) t.start() while True: inp = input('>>>|') if inp == 'q': break c.acquire() #如下都是固定格式 c.notify(int(inp)) c.release()
例子1中,写到函数func中的:c.acquire(),c.notify(args),c.release()
是固定格式.
例子2:wait_for
import threading def condition(): ret=False r = input('>>|') if r == 'true': ret= True else: ret=False return ret def func(i,con): print(i) con.acquire() #配合,固定格式,线程hold住 con.wait_for(condition) print(i+100) con.release() c=threading.Condition() for i in range(10): t = threading.Thread(target=func,args=(i,c,)) t.start()
其中例子2中,con.acquire(),con.wait_for(condition)
是固定格式配合使用,拦截线程,con.release()释放线程.
定时器,延迟多长时间(单位:秒)执行
import threading def hello(): print('hello,world!!') t=threading.Timer(1,hello) t.start()
python的线程池有两种实现方式,咱们先来看一个比较简单的实现方式.
实现思路:
线程类
线程类
import threading,time,queue class ThreadPool: def __init__(self,maxsize): self.maxsize=maxsize self._q=queue.Queue(maxsize) for i in range(maxsize): self._q.put(threading.Thread) def get_thread(self): return self._q.get() def add_thread(self): self._q.put(threading.Thread) pool=ThreadPool(5) def task(arg,p): print(arg) time.sleep(1) p.add_thread() for i in range(100): t = pool.get_thread() #线程池中没有线程为阻塞状态 obj=t(target=task,args=(i,pool)) obj.start()
此方式的缺点:没有将线程重复利用,要直到建立一个线程的耗时多是一个线程执行的好几倍,因此有了第二种方式.
第二种方式是也是使用队列,但队列中的元素为为一个个(函数名,函数参数,)
的元组,建立一个线程组成的列表,线程轮流去队列中取到元组,分解后执行函数,而后取下一个函数.
import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num, max_task_num = None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数一、任务函数执行状态;二、任务函数返回值(默认为None,即:不执行回调函数) :return: 若是线程池已经终止,则返回True不然None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 建立一个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 执行完全部的任务后,全部线程中止 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 不管是否还有任务,终止线程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于记录线程中正在等待的线程数 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(30): ret = pool.run(action, (i,), callback) time.sleep(5) print(len(pool.generate_list), len(pool.free_list)) print(len(pool.generate_list), len(pool.free_list)) pool.close() pool.terminate()
进程与线程的使用方式基本雷同.好比start,daemon(用法略不一样,意义相同),join,各类锁等等.
默认进程之间是没法进行共享的,看例子:
from multiprocessing import Process li = [] def foo(i): li.append(i) print('say hi',li) for i in range(10): p = Process(target=foo,args=(i,)) p.start() print('ending',li)
out:
say hi [0] say hi [1] say hi [2] say hi [3] say hi [4] say hi [5] say hi [6] say hi [7] ending [] say hi [8] say hi [9]
那么如何让进程之间可以共享呢?
基本可分为三种方式:
queues方式:
from multiprocessing import queues from multiprocessing import Process import multiprocessing def foo(i,arg): arg.put(i) print('say hi',i,arg.qsize()) if __name__ == '__main__': li=queues.Queue(20,ctx=multiprocessing) for i in range(10): p=Process(target=foo,args=(i,li,)) p.start()
Array方式,数组有个特性,必须初始化的时候指定数组的长度和元素类型:
from multiprocessing import Process from multiprocessing import Array def foo(i,arg): arg[i]=i+100 for item in arg: print(item) print('======') if __name__ == '__main__': li=Array('i',10) for i in range(10): p=Process(target=foo,args=(i,li,)) p.start()
Array的类型对应表:
'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
manager.dict 可实现数据共享
进程和进程之间若是想通信,须要链接p.join()
from multiprocessing import Process from multiprocessing import Manager def foo(i,arg): arg[i]=i+100 print(arg.values()) if __name__ == '__main__': obj=Manager() li=obj.dict() for i in range(10): p=Process(target=foo,args=(i,li,)) p.start() p.join()
当建立进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。
进程锁例子:
from multiprocessing import Process, Array, RLock def Foo(lock,temp,i): """ 将第0个数加100 """ lock.acquire() temp[0] = 100+i for item in temp: print(i,'----->',item) lock.release() lock = RLock() temp = Array('i', [11, 22, 33, 44]) for i in range(20): p = Process(target=Foo,args=(lock,temp,i,)) p.start()
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池有两种方式:
from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == '__main__': pool=Pool(5) for i in range(30): pool.apply(func=f1,args=(i,))
异步操做:
from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == '__main__': pool=Pool(5) for i in range(30): pool.apply_async(func=f1,args=(i,)) pool.close() #全部任务执行完毕 #1 time.sleep(1) #pool.terminate() #当即终止,不论是否有任务正在执行或者待执行 #2 pool.join()
其中 #1 #2 二选一操做
pool.terminate 当即终止,不论是否有任务正在执行或者等待执行
pool.close 全部任务执行完毕后关闭
原理:利用一个线程,分解一个线程成为多个微线程==>程序级别作的,与操做系统没有关系.
与线程进程的区别:线程和进程的操做是由程序触发系统接口,最后的执行者是系统;协程的操做则是程序员。
协程的适用场景:涉及到http的I/O请求,协程是高性能的代名词.因此,网络爬虫不少是使用协程方式.
协程存在的意义:对于多线程应用,CPU经过切片的方式来切换线程间的执行,线程切换时须要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的执行方式:打个比方,1个很牛逼的足球队员,前面一排并列的足球,从第一个足球踢出去,而后提出第二个第三个,等足球弹回起始位置时,足球员对此足球接住后再次剃出或者停住球,这个足球员就是协程
使用前须要安装gevent第三方模块
pip3 install gevent
看下代码吧,自动切换,关键词gevent.spawn()
:
from gevent import monkey;monkey.patch_all() import gevent import requests def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])