异常处理与网络编程

第1章 异常处理

1.1 异常

程序运行时检测到的错误被称为异常,程序的运行也会被终止python

错误分为两种,即语法错误和逻辑错误mysql

实例:linux

>>> 10 * (1/0)
Traceback (most recent call last):
  File "<stdin>", line 1, in ?
ZeroDivisionError: division by zero
>>> '2' + 2
Traceback (most recent call last):
  File "<stdin>", line 1, in ?
TypeError: Can't convert 'int' object to str implicitly

异常以不一样的类型出现,这些类型都做为信息的一部分打印出来。错误信息的前面部分显示了异常发生的上下文,并以调用栈的形式显示具体信息nginx

1.2 常见异常

AttributeError          试图访问一个对象没有的树形,好比foo.x,可是foo没有属性x
IOError                 输入/输出异常;基本上是没法打开文件
ImportError             没法引入模块或包;基本上是路径问题或名称错误
IndentationError        语法错误(的子类);代码没有正确对齐
IndexError              下标索引超出序列边界,好比当x只有三个元素,却试图访问x[5]
KeyError                试图访问字典里不存在的键
KeyboardInterrupt       Ctrl+C被按下
NameError               使用一个还未被赋予对象的变量
SyntaxError             Python代码非法,代码不能编译(我的认为这是语法错误,写错了)
TypeError               传入对象类型与要求的不符合
UnboundLocalError       试图访问一个还未被设置的局部变量,基本上是因为另有一个同名的全局变量,致使你觉得正在访问它
ValueError              传入一个调用者不指望的值,即便值的类型是正确的

1.3 异常处理

第一种状况:若是错误发生的条件是可预知的,咱们须要用if进行处理:在错误发生以前进行预防git

AGE = 25
while True:
    age = input('>>>:')
    if age.isdigit():
        age = int(age)
        if age == AGE:
            print('You got it')
            break

第二种状况:若是错误发生的条件是不可预知的,则须要用到try...except:在错误发生以后进行处理程序员

#基本语法为
'''
try:
    被检测的代码块
except 异常类型:
    try中一旦检测到异常,就执行这个位置的逻辑
'''

#实例:
try:
    x = int(input("Please enter a number: "))
except ValueError:
    print("That was no valid number.  Try again")

try语句按照以下方式工做;web

首先,执行try子句(在关键字try和关键字except之间的语句)sql

若是没有异常发生,忽略except子句,try子句执行后结束。shell

若是在执行try子句的过程当中发生了异常,那么try子句余下的部分将被忽略。若是异常的类型和 except 以后的名称相符,那么对应的except子句将被执行。最后执行 try 语句以后的代码。编程

若是一个异常没有与任何的except匹配,那么这个异常将会传递给上层的try中

一个 try 语句可能包含多个except子句,分别来处理不一样的特定的异常。最多只有一个分支会被执行

处理程序将只针对对应的try子句中的异常进行处理,而不是其余的 try 的处理程序中的异常

一个except子句能够同时处理多个异常,这些异常将被放在一个括号里成为一个元组,例如:

except (RuntimeError, TypeError, NameError):
        pass

万能异常,可以匹配全部的异常:

s1 = 'hello'
try:
    int(s1)
except Exception as e:              #把异常赋值给e
    print(e)

try except 语句还有一个可选的else子句,若是使用这个子句,那么必须放在全部的except子句以后。这个子句将在try子句没有发生任何异常的时候执行。例如:

for arg in sys.argv[1:]:
    try:
        f = open(arg, 'r')
    except IOError:
        print('cannot open', arg)
    else:
        print(arg, 'has', len(f.readlines()), 'lines')
        f.close()

try except 语句还有一个可选的finally子句,若是使用这个子句,那么必须放在全部的except子句以后。不管try子句有没有发生异常它都会执行

1.4 抛出异常

Python 使用 raise 语句抛出一个指定的异常。例如:

try:
    raise TypeError('类型错误')
except Exception as e:
    print(e)

1.5 自定义异常

经过建立一个新的exception类来拥有本身的异常。异常应该继承自 Exception 类,或者直接继承,或者间接继承,例如:

class EgonException(BaseException):
    def __init__(self,msg):
        self.msg=msg
    def __str__(self):
        return self.msg

try:
    raise EgonException('类型错误')
except EgonException as e:
    print(e)

1.6 定义清理行为

try 语句还有另一个可选的子句,它定义了不管在任何状况下都会执行的清理行为。 例如:

s1 = 'hello'

try:
    int(s1)
except IndexError as e:
    print(e)
except KeyError as e:
    print(e)
except ValueError as e:
    print(e)
#except Exception as e:
#    print(e)
else:
    print('try内代码块没有异常则执行我')
finally:

    print('不管异常与否,都会执行该模块,一般是进行清理工做')

 

第2章 并发编程

2.1 什么是进程

狭义定义:进程是正在运行的程序的实例

广义定义:进程是一个具备必定独立功能的程序关于某个数据集合的一次运行活动。它是操做系统动态执行的基本单元,在传统的操做系统中,进程既是基本的分配单元,也是基本的执行单元

2.2 进程与程序的区别

程序是永存的;进程是暂时的,是程序在数据集上的一次执行,有建立有撤销,存在是暂时的;

程序是静态的观念,进程是动态的观念;

进程具备并发性,而程序没有;

进程是竞争计算机资源的基本单位,程序不是。

进程和程序不是一一对应的: 一个程序可对应多个进程即多个进程可执行同一程序; 一个进程能够执行一个或几个程序

2.3 并发与并行

并发与并行是两个既类似而又不相同的概念:并发性,又称共行性,是指能处理多个同时性活动的能力;并行是指同时发生的两个并发事件,具备并发的含义,而并发则不必定并行,也亦是说并发事件之间不必定要同一时刻发生。

打个比方:

你吃饭吃到一半,电话来了,你一直到吃完了之后才去接,这就说明你不支持并发也不支持并行。

你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支持并发。

你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。

并发的实质是一个物理CPU(也能够多个物理CPU) 在若干道程序之间多路复用,并发性是对有限物理资源强制行使多用户共享以提升效率。

并行性指两个或两个以上事件或活动在同一时刻发生。在多道程序环境下,并行性使多个程序同一时刻可在不一样CPU上同时执行。

 

并发,是在同一个cpu上同时(不是真正的同时,而是看来是同时,由于cpu要在多个程序间切换)运行多个程序

 

并行,是每一个cpu运行一个程序

2.4 同步异步、阻塞非阻塞

同步:在发出一个功能调用时,在没有获得结果以前,该调用就不会返回

异步:与同步相反。当一个异步功能调用发出后,调用者不能马上获得结果。当该异步功能完成后,经过状态、通知或回调来通知调用者

阻塞:阻塞调用是指调用结果返回以前,当前线程会被挂起。调用线程只有在获得结果以后才会返回

非阻塞:与阻塞相反。指在不能马上获得结果以前也会马上返回,同时该函数不会阻塞当前线

小结:

1. 同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步状况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行,当函数返回的时候经过状态、通知、事件等方式通知进程任务完成。
2. 阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能知足的时候就将进程挂起,而非阻塞则不会阻塞当前进程

2.5 进程的建立

1)        对于通用系统(跑不少应用程序),须要有系统运行过程当中建立或撤销进程的能力,主要分为4中形式建立新的进程:

系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台而且只在须要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)

一个进程在运行过程当中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

用户的交互式请求,而建立一个新进程(如用户双击暴风影音)

一个批处理做业的初始化(只在大型机的批处理系统中应用)

2)        不管哪种,新进程的建立都是由一个已经存在的进程执行了一个用于建立进程的系统调用而建立的:

在UNIX中该系统调用是:fork,fork会建立一个与父进程如出一辙的副本,两者有相同的存储映像、一样的环境字符串和一样的打开文件(在shell解释器进程中,执行一个命令就会建立一个子进程)

在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的建立,也负责把正确的程序装入新进程。

3)        关于建立的子进程,UNIX和windows的异同:

相同的是:进程建立后,父进程和子进程有各自不一样的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另一个进程。

不一样的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是能够有只读的共享内存区的。可是对于windows系统来讲,从一开始父进程与子进程的地址空间就是不一样的。

2.6 进程的终止

正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

出错退出(自愿,python a.py中a.py不存在)

严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,能够捕捉异常,try...except...)

被其余进程杀死(非自愿,如kill -9)

2.7 进程的层次结构

不管UNIX仍是windows,进程只有一个父进程,不一样的是:

在UNIX中全部的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的全部成员。

在windows中,没有进程层次的概念,全部的进程都是地位相同的,惟一相似于进程层次的暗示,是在建立进程时,父进程获得一个特别的令牌(称为句柄),该句柄能够用来控制子进程,可是父进程有权把该句柄传给其余子进程,这样就没有层次了。

2.8 进程的状态

在两种状况下会致使一个进程在逻辑上不能运行:

进程挂起是自身缘由,遇到I/O阻塞,便要让出CPU让其余进程去执行,这样保证CPU一直在工做

与进程无关,是操做系统层面,可能会由于一个进程占用时间过多,或者优先级等缘由,而调用其余的进程去使用CPU。

于是一个程序有三种状态:

2.9 进程并发的实现

进程并发的实如今于,硬件中断一个正在运行的进程,把此时进程运行的全部状态保存下来,为此,操做系统维护一张表格,即进程表(process table),每一个进程占用一个进程表项(这些表项也称为进程控制块)

 

该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配情况、全部打开文件的状态、账号和调度信息,以及其余在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过同样。

 

第3章 并发编程之多进程

3.1 multiprocessing模块介绍

multiprocessing是一个和threading模块相似,提供API,生成进程的模块。multiprocessing包提供本地和远程并发,经过使用子进程而不是线程有效地转移全局解释器锁。所以,multiprocessing模块容许程序员充分利用给定机器上的多个处理器。它在Unix和Windows上均可以运行。

3.2 Process类

在multiprocessing中,经过建立Process对象,而后调用其start()方法来生成进程。Process遵循threading.Thread的API

语法:

Process([group [, target [, name [, args [, kwargs]]]]])            #由该类实例化获得的对象,表示一个子进程中的任务(还没有启动)

参数:

group:未使用,值始终未None
target:调用对象,即子进程要执行的任务
args:调用对象的位置参数元组
kwargs:调用对象的字典
name:子进程的名称

方法:

p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁
p.is_alive():若是p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性:

p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、若是为–N,表示被信号N结束
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络链接的底层进程间通讯提供安全性,这类链接只有在具备相同的身份验证键时才能成功

3.3 Process类的使用

建立并开启子进程的第一种方式:

from multiprocessing import Process

def foo(name):
    print('My name is %s' %name)

p1 = Process(target=foo,args=('Yim',))      #必须加,号
p2 = Process(target=foo,kwargs={'name':'Jim'})

if __name__ == '__main__':      #在windows中Process()必须放到# if __name__ == '__main__'下
    '''
    因为Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。
    若是在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
    这是隐藏对Process()内部调用的源,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用
    '''
    p1.start()
    p2.start()

 

#执行结果:
My name is Yim
My name is Jim

建立并开启子进程的第二种方式:

from multiprocessing import Process
class foo(Process):     #继承Process类
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print('My name is %s' %self.name)

if __name__ == '__main__':
    p1 = foo('Yim')
    p2 = foo('Jim')
    p1.start()
    p2.start()

#执行结果:
My name is Yim
My name is Jim

3.4 并发的套接字通讯

服务器端代码:

#!/usr/bin/python
# -*- coding:utf-8 -*-
from socket import *
from multiprocessing import Process

def talk(conn,client_addr):
    while True:
        try:
            print('客户端IP:%s,端口:%s'%(client_addr[0],client_addr[1]))
            data = conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()

def server():
    s = socket(AF_INET,SOCK_STREAM)
    s.bind(('127.0.0.1',8000))
    s.listen(5)
    while True:
        conn, client_addr = s.accept()
        p = Process(target=talk,args=(conn,client_addr,))
        p.start()
    s.close()

if __name__ == '__main__':
    server()

客户端代码:

#!/usr/bin/python
# -*- coding:utf-8 -*-
from socket import *

c = socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8000))

while True:
    msg = input('>>>:').strip()
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))
c.close()

3.5 进程之间的内存空间是隔离的

修改子进程内的变量,不会影响父进程变量

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time,random

n = 100
def task():
    global n
    n = 1

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print(n)

#执行结果
100

3.6 Join方法

join([timeout])

若是可选参数timeout为None(默认值),则该方法将阻塞,直到调用join()方法的进程终止。若是超时是正数,则它最多阻止超时秒。

一个过程能够链接屡次。

进程不能本身加入,由于这将致使死锁。尝试在进程启动以前加入进程是一个错误。

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time,random

def foo(name):
    print('My name is %s' %name)
    time.sleep(random.randint(5,10))

if __name__ == '__main__':
    p1 = Process(target=foo,args=('Yim',))
    p2 = Process(target=foo,args=('Jim',))
    p1.start()
    p1.join()       #等待p1执行完毕,这样就是串行执行
    p2.start()

# 简单写法
# if __name__ == '__main__':
#     p_l = [p1, p2, p3]
#     for p in p_l:
#         p.start()
#     for p in p_l:
#         p.join()

#执行结果:
My name is Yim
My name is Jim      #隔一段时间才输出

3.7 其余方法和属性

terminate和is_alive

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time,random

def foo(name):
    print('My name is %s' %name)

if __name__ == '__main__':
    p1 = Process(target=foo,args=('Yim',))
    p1.start()
    p1.terminate()      #关闭进程。不会当即关闭,因此is_alive马上查看的结果可能仍是存活的
    print(p1.is_alive())
    time.sleep(random.randint(2,3))
    print(p1.is_alive())

#执行结果:
True
False

pid属性

from multiprocessing import Process,Pool
import time,random
import os

def task():
    print('%s is running parent[%s]' %(os.getpid(),os.getppid()))

if __name__ == '__main__':
    p=Process(target=task)
    p.start()
    print(p.pid) #p这个进程的id
    print('',os.getpid()) #查看aaa.py的id号码
    print(os.getppid()) #pycharm的进程id

#执行结果:
1681217560
16608
16812 is running parent[17560]

3.8 守护进程

主进程建立守护进程

守护进程会在主进程代码执行结束后就终止

守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process
import time,random

def foo(name):
    print('My name is %s' %name)

if __name__ == '__main__':
    p1 = Process(target=foo,args=('Yim',))
    p1.daemon = True    #必定要在p1.start()前设置,设置p1为守护进程,禁止p1建立子进程,而且父进程代码执行结束,p1即终止运行
    p1.start()

3.9 进程池

多进程是实现并发的手段之一,须要注意的问题是:

须要并发执行的任务一般会远大于CPU核数

一个操做系统不可能无限开启进程,一般有几个核就开几个进程

进程开启过多,效率反而会降低

咱们能够经过维护一个进程池来控制进程数量,语法以下:

Pool([numprocess  [,initializer [, initargs]]])             #建立进程池

参数:

numprocess:要建立的进程数。若是省略,将默认使用os.cpu_count()的值
initializer:每一个工做进程启动时要执行的可调用对象,默认为None
initargs:要传给initializer的参数组

主要方法:

p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。  
p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用

其余方法:

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法:
obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。
obj.ready():若是调用完成,返回True
obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数

同步调用:

import os,time,random
from multiprocessing import Pool
# Pool进程的用法
# p.apply_async()       #等同于concurrent.futures里的p.submit(),异步
# p.apply()             #等同于concurrent.futures里的p.submit().result(),同步

def task(i):
    print('%s is running %s' %(os.getpid(),i))
    time.sleep(random.randint(1,3))
    return i**2

if __name__ == '__main__':
    pool = Pool(3)      #进程池中从无到有建立三个进程,之后一直是这三个进程在执行任务
    futrues = []
    for i in range(10):
        futrue = pool.apply(task,args=(i,))     #同步调用,直到本次任务执行完毕拿到futrue,等待任务task执行的过程当中可能有阻塞也可能没有阻塞,但无论该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程当中如果任务发生了阻塞就会被夺走cpu的执行权限
        futrues.append(futrue)
    print(futrues)

异步调用:

import os,time,random
from multiprocessing import Pool

def task(i):
    print('%s is running %s' %(os.getpid(),i))
    time.sleep(random.randint(1,3))
    return i**2

if __name__ == '__main__':
    pool = Pool()
    futrues = []
    for i in range(10):
        futrue = pool.apply_async(task,args=(i,))       #同步运行,阻塞、直到本次任务执行完毕拿到futrue
        futrues.append(futrue)
    # 异步apply_async用法:若是使用异步提交的任务,主进程须要使用jion,等待进程池内任务都处理完,而后能够用get收集结果,不然,主进程结束,进程池可能还没来得及执行,也就跟着一块儿结束了
    pool.close()        #关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
    pool.join()     #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束

    for future in futrues:
        print(futrue.get())     #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get

3.10 concurrent.futures实现进程池

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,Executor
import time,os,random

def task(i):
    print('%s is running %s' %(os.getpid(),i))
    time.sleep(random.randint(1,3))
    return i**2

if __name__ == '__main__':
    # print(os.cpu_count())
    pool=ProcessPoolExecutor()

    objs=[]
    for i in range(10):
        obj=pool.submit(task,i) #异步的方式提交任务
        objs.append(obj)

        # res=pool.submit(task,i).result() #同步方式提交任务
        # print(res)
    pool.shutdown(wait=True) #shutdown表明不容许再往进程池里提交任务,wait=True就是join的意思:等待任务都执行完毕
    print('')

    for obj in objs:
        print(obj.result())

3.11 进程同步(互斥锁)

若是多个进程对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,须要加锁处理

#并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process
import os,time

def work():
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is running' %os.getpid())
if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work)
        p.start()

#执行结果:
15608 is running
20112 is running
24332 is running
15608 is running
20112 is running
24332 is running

加锁处理:

#由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process,Lock
import os,time

def work(lock):
    lock.acquire()
    print('%s is running' % os.getpid())
    time.sleep(2)
    print('%s is running' % os.getpid())
    lock.release()
if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        p = Process(target=work,args=(lock,))
        p.start()

#执行结果:
18996 is running
18996 is running
24524 is running
24524 is running
10992 is running
10992 is running

总结:

加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改。速度是慢了,但牺牲了速度却保证了数据安全。

虽然能够用文件共享数据实现进程间通讯,但问题是:

  • 效率低(共享数据基于文件,而文件是硬盘上的数据)
  • 须要本身加锁处理

所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。

  • 队列和管道都是将数据存放于内存中
  • 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来

咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。

3.12 队列(推荐使用)

进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

建立队列的类(底层就是以管道和锁定的方式实现):

Queue([maxsize])        #建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。
maxsize:队列中容许最大项数,省略则无大小限制

Queue模块中的常见方法:

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。

q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.

q.get_nowait():同q.get(False)

q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。

q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。

q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样

q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞

q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误

q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为 

应用:

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue

q = Queue(3)

q.put('1')
q.put('2')
q.put('3')

print(q.get())
print(q.get())
print(q.get())

#执行结果:
1
2
3

3.13 生产者消费者模型

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题,该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。

为何要使用生产者和消费者模式?在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,若是生产者处理数据很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

什么是生产者消费者模式?生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res = q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('%s吃了%s' %(os.getpid(),res))
def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res = '包子%s' %i
        q.put(res)
        print('%s生产了%s' %(os.getpid(),res))
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,))     #生产
    c1 = Process(target=consumer,args=(q,))     #消费
    p1.start()
    c1.start()
    p1.join()
    q.put(None)     #发送结束信号,消费者在接收到结束信号后就能够break出死循环
print('')

#执行结果:
16896生产了包子0
6540吃了包子0
16896生产了包子1
主
6540吃了包子1

3.14 共享数据

进程间通讯应该尽可能避免使用共享数据的方式

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Manager,Lock

def work(d,lock):
    with lock:        #不加锁而操做共享的数据,确定会出现数据错乱
        d['count'] -= 1
if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({'count':100})
        p_l = []
        for i in range(10):
            p = Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

#执行结果:
{'count': 0}

 

第4章 并发编程之多线程

4.1 什么是线程

线程是程序中一个单一的顺序控制流程。进程内一个相对独立的、可调度的执行单元,是系统独立调度和分派CPU的基本单位指运行中的程序的调度单位。在单个程序中同时运行多个线程完成不一样的工做,称为多线程。

4.2 线程与进程的区别

1)        一个程序至少有一个进程,一个进程至少有一个线程.

2)        线程的划分尺度小于进程,使得多线程程序的并发性高。

3)        另外,进程在执行过程当中拥有独立的内存单元,而多个线程共享内存,从而极大地提升了程序的运行效率。

4)        线程在执行过程当中与进程仍是有区别的。每一个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。可是线程不可以独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。

5)        从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分能够同时执行。但操做系统并无将多个线程看作多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别。

6)        优缺点:线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。同时,线程适合于在SMP机器上运行,而进程则能够跨机器迁移。

4.3 开线程的两种方式

方式一:

from threading import Thread
import time
def foo(name):
    time.sleep(2)
    print('My name is %s' %name)
if __name__ == '__main__':
    t =  Thread(target=foo,args=('Yim',))
    t.start()

#执行结果:
My name is Yim

方式二:

from threading import Thread
import time
class foo(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print('My name is %s'%self.name)
if __name__ == '__main__':
    t1 = foo('Yim')
    t2 = foo('Jim')
    t1.start()
    t2.start()

#执行结果
My name is Jim
My name is Yim

4.4 同一进程内的多个线程数据是共享的

#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import Thread
n = 100

def task():
    global n
    n = 1
if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()
    print(n)

#执行结果:
1

4.5 线程同步(互斥锁)

锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

注意:

  • 线程抢的是GIL锁,GIL锁至关于执行权限,拿到执行权限后才能拿到互斥锁Lock,其余线程也能够抢到GIL,但若是发现Lock仍然没有被释放则阻塞,即使是拿到执行权限GIL也要马上交出来
  • join是等待全部,即总体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁均可以实现,毫无疑问,互斥锁的部分串行效率要更高
#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import Thread,Lock
import time
n = 100

def task():
    global n
    lock.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    t_l = []
    for i in range(2):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join
    print(n)

#执行结果:
100

4.6 守护线程

主进程在其代码结束后就算运行完毕了,而后主进程会一直等待非守护的子进程运行完毕,而后回收子进程的资源,进程结束

主线程在其余非守护线程运行完毕后才算运行完毕,进程必须保证非守护线程都运行完毕后才能结束

#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import Thread
import os
import time
import random

def task():
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    print('%s is done' %os.getpid())
if __name__ == '__main__':
    t=Thread(target=task,)
    t.daemon=True       #必须在t.start()以前设置
    t.start()
    print('')

#执行结果:
12544 is runing
主

4.7 死锁与递归锁

死锁: 是指两个或两个以上的进程或线程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力做用,它们都将没法推动下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,以下就是死锁:

#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A锁\033[0m' %self.name)
        mutexB.acquire()
        print('\033[42m%s 拿到B锁\033[0m' %self.name)
        mutexB.release()
        mutexA.release()
    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B锁\033[0m' %self.name)
        time.sleep(2)
        mutexA.acquire()
        print('\033[44m%s 拿到A锁\033[0m' %self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

#执行结果:
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁       #卡住,死锁了

解决方法:递归锁,在Python中为了支持在同一线程中屡次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。直到一个线程全部的acquire都被release,其余的线程才能得到资源。上面的例子若是使用RLock代替Lock,则不会发生死锁:

from threading import Thread,Lock,current_thread,RLock
import time
# mutexA=Lock()
# mutexB=Lock()

mutexA=mutexB=RLock()   #一个线程拿到锁,counter加1,该线程内又碰到加锁的状况,则counter继续加1,这期间全部其余线程都只能等待,等待该线程释放全部锁,即counter递减到0为止

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 拿到A锁' %self.name) #current_thread().getName()
        mutexB.acquire()
        print('%s 拿到B锁' %self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('%s 拿到B锁' % self.name)  # current_thread().getName()
        time.sleep(0.1)
        mutexA.acquire()
        print('%s 拿到A锁' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=Mythread()
        t.start()

#执行结果:
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-1 拿到A锁
Thread-2 拿到A锁
Thread-2 拿到B锁
Thread-2 拿到B锁
Thread-2 拿到A锁
Thread-4 拿到A锁
Thread-4 拿到B锁
Thread-4 拿到B锁
Thread-4 拿到A锁
Thread-6 拿到A锁
Thread-6 拿到B锁
Thread-6 拿到B锁
Thread-6 拿到A锁
Thread-8 拿到A锁
Thread-8 拿到B锁
Thread-8 拿到B锁
Thread-8 拿到A锁
Thread-10 拿到A锁
Thread-10 拿到B锁
Thread-10 拿到B锁
Thread-10 拿到A锁
Thread-5 拿到A锁
Thread-5 拿到B锁
Thread-5 拿到B锁
Thread-5 拿到A锁
Thread-9 拿到A锁
Thread-9 拿到B锁
Thread-9 拿到B锁
Thread-9 拿到A锁
Thread-7 拿到A锁
Thread-7 拿到B锁
Thread-7 拿到B锁
Thread-7 拿到A锁
Thread-3 拿到A锁
Thread-3 拿到B锁
Thread-3 拿到B锁
Thread-3 拿到A锁

4.8 信号量

同进程的同样

Semaphore管理一个内置的计数器,

每当调用acquire()时内置计数器-1;

调用release() 时内置计数器+1;

计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其余线程调用release()。

实例:(同时只有5个线程能够得到semaphore,便可以限制最大链接数为5):

#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import Thread,Semaphore,current_thread
import time,random

sm = Semaphore(5)
def task():
    with sm:
        print('%s is running'%current_thread().getName())
        time.sleep(random.randint(1,3))
if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task)
        t.start()

#执行结果:
Thread-1 is running     #同时只有5个线程
Thread-2 is running
Thread-3 is running
Thread-4 is running
Thread-5 is running
Thread-6 is running
Thread-7 is running
Thread-8 is running
Thread-9 is running
Thread-10 is running

与进程池是彻底不一样的概念,进程池Pool(4),最大只能产生4个进程,并且从头至尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

4.9 定时器

制定n秒后执行某操做

#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import Timer

def foo():
    print('from foo')
t = Timer(2,foo)        #2秒后执行foo
t.start()

4.10 线程queue

queue队列 :使用import queue,用法与进程Queue同样

#!/usr/bin/python
# -*- coding:utf-8 -*-
import queue

q = queue.Queue(3)

q.put('1')
q.put('2')
q.put('3')

print(q.get())
print(q.get())
print(q.get())

#执行结果:
1
2
3

堆栈:

import queue

q = queue.LifoQueue(3)

q.put('1')
q.put('2')
q.put('3')

print(q.get())
print(q.get())
print(q.get())

#执行结果:
3
2
1

存储数据时可设置优先级的队列:

import queue

q = queue.PriorityQueue(3)  #元组,第一个元素是优先级,数字越小,优先级越高

q.put((10,'a'))
q.put((5,'b'))
q.put((-2,'c'))

print(q.get())
print(q.get())
print(q.get())

#执行结果:
(-2, 'c')
(5, 'b')
(10, 'a')

4.11 事件event

线程的一个关键特性是每一个线程都是独立运行且状态不可预测。若是程序中的其 他线程须要经过判断某个线程的状态来肯定本身下一步的操做,这时线程同步问题就会变得很是棘手。为了解决这些问题,咱们须要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它容许线程等待某些事件的发生。在 初始状况下,Event对象中的信号标志被设置为假。若是有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程若是将一个Event对象的信号标志设置为真,它将唤醒全部等待这个Event对象的线程。若是一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

方法:

event.isSet():返回event的状态值;

event.wait():若是 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度;

event.clear():恢复event的状态值为False

例如,有多个工做线程尝试连接MySQL,咱们想要在连接前确保MySQL服务正常才让那些工做线程去链接MySQL服务器,若是链接不成功,都会去尝试从新链接。那么咱们就能够采用threading.Event机制来协调各个工做线程的链接操做

from threading import Thread,Event
import threading
import time,random

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('连接超时')
        print('<%s>第%s次尝试连接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>连接成功' %threading.current_thread().getName())

def check_mysql():
    print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()

if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql
 
    conn1.start()
    conn2.start()
    check.start()

4.12 socketserver模块

python中的socketserver模块,主要是用来提供服务器类,而且提供异步处理的能力

服务器端:

#TCP
# import socketserver
#
# class.txt MyTCPhandler(socketserver.BaseRequestHandler): #通讯
#     def handle(self):
#         while True:
#             # conn.recv(1024)
#             data=self.request.recv(1024)
#             self.request.send(data.upper())
#
#
# if __name__ == '__main__':
#     # print(socketserver.ForkingTCPServer)
#
#     s=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyTCPhandler)
#     s.serve_forever()


#UDP
import socketserver

class MyTCPhandler(socketserver.BaseRequestHandler): #通讯
    def handle(self):
        print(self.request)
        client_data=self.request[0]
        self.request[1].sendto(client_data.upper(),self.client_address)

if __name__ == '__main__':
    s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyTCPhandler)
    s.serve_forever()

客户端:

#TCP
# from socket import *
#
#
# c=socket(AF_INET,SOCK_STREAM)
# c.connect(('127.0.0.1',8080))
#
# count=1
# while True:
#     msg=input('>>: ').strip()
#     if not msg:break
#     c.send(msg.encode('utf-8'))
#     data=c.recv(1024)
#     print(data)
#
# c.close()



#UDP
from socket import *


c=socket(AF_INET,SOCK_DGRAM)

while True:
    msg=input('>>: ').strip()
    if not msg:break
    c.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
    data=c.recvfrom(1024)
    print(data)

c.close()
相关文章
相关标签/搜索