多进程与多线程(九)

1.1 基于UDP协议实现简单的套接字通讯

udp是无连接的,先启动哪一端都不会报错html

udp套接字简单示例python

1.1.1.1  客户端:

from socket import *

 

client=socket(AF_INET,SOCK_DGRAM) #数据报协议,建立一个客户的套接字

 

while True: # 通信循环

    msg=input('>>: ').strip()

    client.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) #对话(发送)

    data,server_addr=client.recvfrom(1024)

    print(data.decode('utf-8'))

 

 

1.1.1.2  服务端:

from socket import *

 

server=socket(AF_INET,SOCK_DGRAM) #数据报协议,建立一个服务器的套接字

server.bind(('127.0.0.1',8080)) #绑定服务器套接字

 

 

data,client_addr=server.recvfrom(3) #对话(接收)

print('第一次:',data)

 

 

1.1.2 数据报协议

1.1.2.1  客户端:

from socket import *

 

client=socket(AF_INET,SOCK_DGRAM) #数据报协议

 

 

client.sendto('hello'.encode('utf-8'),('127.0.0.1',8082))

client.sendto('world'.encode('utf-8'),('127.0.0.1',8082))

 

 

1.1.2.2  服务端:

from socket import *

 

server=socket(AF_INET,SOCK_DGRAM) #数据报协议

server.bind(('127.0.0.1',8080))

 

 

data,client_addr=server.recvfrom(3)

print('第一次:',data)

data,client_addr=server.recvfrom(3)

print('第二次: ',data)

 

 

1.2 多进程

顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。算法

进程的概念起源于操做系统,是操做系统最核心的概念,也是操做系统提供的最古老也是最重要的抽象概念之一。操做系统的其余全部内容都是围绕进程的概念展开的。数据库

即便能够利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。编程

操做系统的做用:json

    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口windows

    2:管理、调度进程,而且将多个进程对硬件的竞争变得有序安全

 

多道技术:服务器

    1.产生背景:针对单核,实现并发多线程

    ps:

    如今的主机通常是多核,那么每一个核都会利用多道技术

    有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再从新调度,会被调度到4个

    cpu中的任意一个,具体由操做系统调度算法决定。

   

    2.空间上的复用:如内存中同时有多道程序

    3.时间上的复用:复用一个cpu的时间片

强调:遇到io切,占用cpu时间过长也切,核心在于切以前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行

1.2.1 开启进程的两种方式:

1.2.1.1  multiprocessing模块介绍

python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。Python提供了multiprocessing。

    multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。

  multiprocessing模块的功能众多:支持子进程、通讯和共享数据、执行不一样形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    须要再次强调的一点是:与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

 

1.2.1.2  方式一:

注意:在windows中Process()必须放到# if __name__ == '__main__':下

from multiprocessing import Process

import time

 

def task(name):

    print('%s is running' %name)

    time.sleep(5)

    print('%s is done' %name)

 

 

if __name__ == '__main__':

    p=Process(target=task,args=('alex',))

    p.start()

    print('')

 

 

1.2.1.3  方式二:

from multiprocessing import Process

import time

 

class MyProcess(Process):

    def __init__(self,name):

        super(MyProcess,self).__init__()

        self.name=name

 

    def run(self):

        print('%s is running' %self.name)

        time.sleep(3)

        print('%s is done' %self.name)

 

if __name__ == '__main__':

    p=MyProcess('进程1')

    p.start() #p.run()

    print('')

 

 

1.2.2 进程之间内存空间是隔离的

from multiprocessing import Process

import time

 

n=100

 

def task():

    global n

    time.sleep(5)

    n=0

 

if __name__ == '__main__':

    p=Process(target=task)

    p.start()

    # time.sleep(5)

    print(p.is_alive())

    p.join()

    print(p.is_alive())

    print('',n)

 

1.2.3 join方法

主进程等,等待子进程结束

from multiprocessing import Process

import time

import os

 

def task(n):

    print('%s is runing' %os.getpid())

    time.sleep(n)

    print('%s is done' %os.getpid())

 

 

if __name__ == '__main__':

 

    start_time=time.time()

    p1=Process(target=task,args=(1,))

    p2=Process(target=task,args=(2,))

    p3=Process(target=task,args=(3,))

 

    p_l=[p1,p2,p3]

    # p1.start()

    # p2.start()

    # p3.start()

    for p in p_l:

        p.start()

 

    # p3.join()  #3

    # p1.join() #

    # p2.join() #

    for p in p_l:

        p.join()

    stop_time=time.time()

    print('',(stop_time-start_time))

 

既然join是等待进程结束,进程不就又变成串行的了吗?固然不是了,必须明确:p.join()是让谁等?很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,
进程只要start就会在开始运行了,因此p1-p4.start()时,系统中已经有四个并发的进程了, 而咱们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键,join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其他p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接经过检测,无需等待, 因此4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
 
from multiprocessing import Process

import time

import os

 

def task(n):

    print('%s is runing' %os.getpid())

    time.sleep(n)

    print('%s is done' %os.getpid())

 

 

if __name__ == '__main__':

 

    start_time=time.time()

    p1=Process(target=task,args=(1,))

    p2=Process(target=task,args=(2,))

    p3=Process(target=task,args=(3,))

 

    p_l=[p1,p2,p3]

    # p1.start()

    # p2.start()

    # p3.start()

    for p in p_l:

        p.start()

 

    # p3.join()  #3

    # p1.join() #

    # p2.join() #

    for p in p_l:

        p.join()

    stop_time=time.time()

    print('',(stop_time-start_time))

 

 
from multiprocessing import Process

import time

import os

 

def task(n):

    print('%s is runing' %os.getpid())

    time.sleep(n)

    print('%s is done' %os.getpid())

 

 

if __name__ == '__main__':

 

    start_time=time.time()

    p1=Process(target=task,args=(1,))

    p2=Process(target=task,args=(2,))

    p3=Process(target=task,args=(3,))

 

 

    p1.start()

    p1.join()

    p2.start()

    p2.join()

    p3.start()

    p3.join()

    stop_time=time.time()

    print('',(stop_time-start_time))

 

1.2.4 进程对象的其余属性或方法

进程对象的其余方法一:terminate,is_alive

from multiprocessing import Process

import time

import os

 

def task(n):

    print('pid:%s ppid:%s' %(os.getpid(),os.getppid()))

    time.sleep(n)

 

 

if __name__ == '__main__':

    p=Process(target=task,args=(15,),name='进程1')

    p.start()

    p.terminate()#关闭进程,不会当即关闭,因此is_alive马上查看的结果可能仍是存活

    # time.sleep(1)

    print(p.is_alive())#结果为True

    print('主pid:%s ppid:%s' %(os.getpid(),os.getppid()))

    # print(p.pid)

    p.name='xxxx'

print(p.name)

 

 

 

1.2.5 守护进程

主进程建立守护进程

守护进程:当子进程执行的任务在父进程代码运行完毕后就没有存在的必要了,那

该子进程就应该被设置为守护进程,

守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

from multiprocessing import Process

import time

 

def task(name):

    p=Process(target=time.sleep,args=(6,))

    p.start()

    print('%s is running' %name)

    time.sleep(5)

    print('%s is done' %name)

 

 

if __name__ == '__main__':

    p=Process(target=task,args=('alex',))

    p.daemon=True  #必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行

    p.start()

    time.sleep(1)

    print('')

 

主进程代码运行完毕,守护进程就会结束

from multiprocessing import Process

from threading import Thread

import time

def foo():

    print(123)

    time.sleep(1)

    print("end123")

 

def bar():

    print(456)

    time.sleep(3)

    print("end456")

 

if __name__ == '__main__':

 

    p1=Process(target=foo)

    p2=Process(target=bar)

 

    p1.daemon=True

    p1.start()

    p2.start()

    print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,由于主进程打印main----时,p1也执行了,可是随即被终止

 

 

 

1.2.6 互斥锁

进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

多个进程共享同一打印终端, 并发运行,效率高,但竞争同一打印终端,带来了打印错乱, 加锁:由并发变成了串行,牺牲了运行效率,但避免了竞争

多个进程共享同一文件, 并发运行,效率高,但竞争写同一文件,数据写入错乱, 加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全

文件当数据库,模拟抢票

from multiprocessing import Process,Lock

import json

import time

import random

import os

 

def search():

    time.sleep(random.randint(1,3))

    dic=json.load(open('db.txt','r',encoding='utf-8'))

    print('%s 查看到剩余票数%s' %(os.getpid(),dic['count']))

 

def get():

    dic=json.load(open('db.txt','r',encoding='utf-8'))

    if dic['count'] > 0:

        dic['count']-=1

        time.sleep(random.randint(1,3))

        json.dump(dic,open('db.txt','w',encoding='utf-8'))

        print('%s 购票成功' %os.getpid())

 

def task(mutex):

    search()

    mutex.acquire()

    get()

    mutex.release()

 

if __name__ == '__main__':

    mutex=Lock()

    for i in range(10):

        p=Process(target=task,args=(mutex,))

        p.start()

        # p.join()

 

db.txt

{"count": 1}

 

加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然能够用文件共享数据实现进程间通讯,但问题是:

1.效率低(共享数据基于文件,而文件是硬盘上的数据)

2.须要本身加锁处理

所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。

1 队列和管道都是将数据存放于内存中

2 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,

咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。

 

1.2.7 队列

进程彼此之间互相隔离,要实现进程间通讯(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

建立队列的类(底层就是以管道和锁定的方式实现)

Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。

maxsize是队列中容许最大项数,省略则无大小限制。

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。

q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.

 

q.get_nowait():同q.get(False)

q.put_nowait():同q.put(False)

 

q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。

q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。

q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样

from multiprocessing import Queue

 

q=Queue(3)

 

q.put('first')

q.put(2)

q.put({'count':3})

# q.put('fourth',block=False) #q.put_nowait('fourth')

# q.put('fourth',block=True,timeout=3)

 

print(q.get())

print(q.get())

print(q.get())

# print(q.get(block=False)) #q.get_nowait()

print(q.get(block=True,timeout=3))

 

其余方法(了解):

1 q.cancel_join_thread():不会在进程退出时自动链接后台线程。能够防止join_thread()方法阻塞

2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但还没有写入的数据,但将在此方法完成时立刻关闭。若是q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,若是某个使用者正在被阻塞在get()操做上,关闭生产者中的队列不会致使get()方法返回错误。

3 q.join_thread():链接队列的后台线程。此方法用于在调用q.close()方法以后,等待全部队列项被消耗。默认状况下,此方法由不是q的原始建立者的全部进程调用。调用q.cancel_join_thread方法能够禁止这种行为

1.2.8 生产者消费者模型

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。

为何要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time
import random

def producer(name,food,q):
    for i in range(3):
        res='%s%s' %(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('厨师[%s]生产了<%s>' %(name,res))


def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('吃货[%s]吃了<%s>' % (name, res))

if __name__ == '__main__':
    #队列
    q=Queue()
    #生产者们
    p1=Process(target=producer,args=('egon1','泔水',q))
    p2=Process(target=producer,args=('egon2','骨头',q))
    #消费者们
    c1=Process(target=consumer,args=('管廷威',q))
    c2=Process(target=consumer,args=('oldboy',q))
    c3=Process(target=consumer,args=('oldgirl',q))


    p1.start()
    p2.start()
    c1.start()
    c2.start()
    c3.start()

    p1.join()
    p2.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print('')

 

此时的问题是主进程永远不会结束,缘由是:生产者p在生产完后就结束了,可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

 

   #参数介绍:

    maxsize是队列中容许最大项数,省略则无大小限制。   

  #方法介绍:

    JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:

    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常

    q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止

from multiprocessing import Process,JoinableQueue

import time

import random

 

def producer(name,food,q):

    for i in range(3):

        res='%s%s' %(food,i)

        time.sleep(random.randint(1,3))

        q.put(res)

        print('厨师[%s]生产了<%s>' %(name,res))

 

 

def consumer(name,q):

    while True:

        res=q.get()

        if res is None:break  #生产者在生产完毕后发送结束信号None

        time.sleep(random.randint(1,3))

        print('吃货[%s]吃了<%s>' % (name, res))

        q.task_done()

 

if __name__ == '__main__':

    #队列

    q=JoinableQueue()

    #生产者们

    p1=Process(target=producer,args=('egon1','泔水',q))

    p2=Process(target=producer,args=('egon2','骨头',q))

    #消费者们

    c1=Process(target=consumer,args=('管廷威',q))

    c2=Process(target=consumer,args=('oldboy',q))

    c3=Process(target=consumer,args=('oldgirl',q))

    c1.daemon=True

    c2.daemon=True

    c3.daemon=True

 

    p1.start()

    p2.start()

    c1.start()

    c2.start()

    c3.start()

 

    p1.join()

    p2.join()

    q.join()

    print('')

 

 

1.3 多线程

threading模块介绍

multiprocess模块的彻底模仿了threading模块的接口,两者在使用层面,有很大的类似性,于是再也不详细介绍

官网连接:https://docs.python.org/3/library/threading.html?highlight=threading#

1.3.1 开启线程的两种方式

方式一:

from threading import Thread

import time

import random

 

def piao(name):

    print('%s is piaoing' %name)

    time.sleep(random.randint(1,3))

    print('%s is piao end' %name)

 

 

if __name__ == '__main__':

    t1=Thread(target=piao,args=('alex',))

    t1.start()

    print('')

 

 

方式二:

from threading import Thread
import time
import random

class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randint(1,3))
        print('%s is piao end' %self.name)


if __name__ == '__main__':
    t1=MyThread('alex')
    t1.start()
    print('')

 

 

1.3.2 练习

多线程并发

客户端:

from socket import *

 

client=socket(AF_INET,SOCK_STREAM)

client.connect(('127.0.0.1',8081))

 

while True:

    msg=input('>>: ').strip()

    if not msg:continue

    client.send(msg.encode('utf-8'))

    data=client.recv(1024)

    print(data.decode('utf-8'))

 

client.close()

 

服务端:

from threading import Thread,current_thread

from socket import *

 

def comunicate(conn):

    print('子线程:%s' %current_thread().getName())

    while True:

        try:

            data=conn.recv(1024)

            if not data:break

            conn.send(data.upper())

        except ConnectionResetError:

            break

    conn.close()

 

def server(ip,port):

    print('主线程:%s' %current_thread().getName())

    server = socket(AF_INET, SOCK_STREAM)

    server.bind((ip,port))

    server.listen(5)

 

    while True:

        conn, addr = server.accept()

        print(addr)

        # comunicate(conn)

        t=Thread(target=comunicate,args=(conn,))

        t.start()

 

    server.close()

 

if __name__ == '__main__':

    server('127.0.0.1', 8081)

 

 

 

1.3.3 进程与线程的区别

from threading import Thread
import time
import random
import os

def piao():
    print('%s is piaoing' %os.getpid())
    time.sleep(random.randint(1,3))


if __name__ == '__main__':
    t1=Thread(target=piao,)
    t2=Thread(target=piao,)
    t3=Thread(target=piao,)
    t1.start()a
    t2.start()
    t3.start()
    print('',os.getpid())

 

 

from threading import Thread

import time

import random

import os

 

n=100

def piao():

    global n

    n=0

 

if __name__ == '__main__':

    t1=Thread(target=piao,)

    t1.start()

    t1.join()

    print('',n)

 

 

1.3.4 守护线程

不管是进程仍是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

须要强调的是:运行完毕并不是终止运行

1.对主进程来讲,运行完毕指的是主进程代码运行完毕

2.对主线程来讲,运行完毕指的是主线程所在的进程内全部非守护线程通通运行完毕,主线程才算运行完毕

主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),而后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(不然会产生僵尸进程),才会结束

主线程在其余非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。由于主线程的结束意味着进程的结束,进程总体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

from threading import Thread
import time

def sayhi(name):
    print('====>')
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    # t.setDaemon(True)
    t.daemon=True
    t.start()

    print('主线程')

 

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")

 

 

 

1.3.5 线程的互斥锁

from threading import Thread,Lock

import time

 

n=100

 

def task():

    global n

    with mutex:

        temp=n

        time.sleep(0.1)

        n=temp-1

 

if __name__ == '__main__':

    start_time=time.time()

    mutex=Lock()

    t_l=[]

    for i in range(100):

        t=Thread(target=task)

        t_l.append(t)

        t.start()

 

    for t in t_l:

        t.join()

    stop_time=time.time()

    print('',n)

    print('run time is %s' %(stop_time-start_time))

 

1.3.6 测试文件

# import time

# import os

#

# print(os.getpid())

# time.sleep(1000)

 

import os

print(os.cpu_count())  # 本机为4核

 

 

1.3.7 GIL测试

首先须要明确的一点是GIL并非Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就比如C++是一套语言(语法)标准,可是能够用不一样的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也同样,一样一段代码能够经过CPython,PyPy,Psyco等不一样的Python执行环境来执行。像其中的JPython就没有GIL。然而由于CPython是大部分环境下默认的Python执行环境。因此在不少人的概念里CPython就是Python,也就想固然的把GIL归结为Python语言的缺陷。因此这里要先明确一点:GIL并非Python的特性,Python彻底能够不依赖于GIL

GIL本质就是一把互斥锁,既然是互斥锁,全部互斥锁的本质都同样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。

能够确定的一点是:保护不一样的数据的安全,就应该加不一样的锁

GIL保护的是解释器级的数据,保护用户本身的数据则须要本身加锁处理,以下图

 

 

图1-1  

有了GIL的存在,同一时刻同一进程中只有一个线程被执行

多cpu,意味着能够有多个核并行完成计算,因此多核提高的是计算性能, 每一个cpu一旦遇到I/O阻塞,仍然须要等待,因此多核对I/O操做没什么用处

一个工人至关于cpu,此时计算至关于工人在干活,I/O阻塞至关于为工人干活提供所需原材料的过程,工人干活的过程当中若是没有原材料了,则工人干活的过程须要中止,直到等待原材料的到来。

若是你的工厂干的大多数任务都要有准备原材料的过程(I/O密集型),那么你有再多的工人,意义也不大,还不如一我的,在等材料的过程当中让工人去干别的活,

反过来说,若是你的工厂原材料都齐全,那固然是工人越多,效率越高

 对计算来讲,cpu越多越好,可是对于I/O来讲,再多的cpu也没用

固然对运行一个程序来讲,随着cpu的增多执行效率确定会有所提升(无论提升幅度多大,总会有所提升),这是由于一个程序基本上不会是纯计算或者纯I/O,因此咱们只能相对的去看一个程序究竟是计算密集型仍是I/O密集型,从而进一步分析python的多线程到底有无用武之地

如今的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提高,甚至不如串行(没有大量切换),可是,对于IO密集型的任务效率仍是有显著提高的。

多线程性能测试

1.3.7.1  计算密集型:开多进程

from multiprocessing import Process

from threading import Thread

import os,time

def work():

    res=0

    for i in range(100000000):

        res*=i

 

 

if __name__ == '__main__':

    l=[]

    start=time.time()

    for i in range(4):

        # p=Process(target=work) #5.826333284378052

        p=Thread(target=work) #run time is 19.91913938522339

        l.append(p)

        p.start()

    for p in l:

        p.join()

    stop=time.time()

    print('run time is %s' %(stop-start))

 

 

1.3.7.2  I/O密集型:多线程效率高

from multiprocessing import Process

from threading import Thread

import threading

import os,time

def work():

    time.sleep(2)

 

if __name__ == '__main__':

    l=[]

    start=time.time()

    for i in range(400):

        # p=Process(target=work) # 12.465712785720825

        p=Thread(target=work) #2.037116765975952

        l.append(p)

        p.start()

    for p in l:

        p.join()

    stop=time.time()

    print('run time is %s' %(stop-start))

 

from threading import Thread,Lock

import time

 

n=100

 

def task():

    global n

    mutex.acquire()

    temp=n

    time.sleep(0.1)

    n=temp-1

    mutex.release()

 

if __name__ == '__main__':

    mutex=Lock()

    for i in range(3):

        t=Thread(target=task)

        t.start()

 

 

 

1.3.8 paramiko模块

paramiko是一个用于作远程控制的模块,使用该模块能够对远程服务器进行命令或文件操做,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。

下载安装

pip3 install paramiko #在python3中

 

在python2中

pycrypto,因为 paramiko 模块内部依赖pycrypto,因此先下载安装pycrypto #在python2中

pip3 install pycrypto

pip3 install paramiko

 

注:若是在安装pycrypto2.0.1时发生以下错误

        command 'gcc' failed with exit status 1...

多是缺乏python-dev安装包致使

若是gcc没有安装,请事先安装gcc

1.3.8.1  用于链接远程服务器并执行基本命令

import paramiko

 

# 建立SSH对象

ssh = paramiko.SSHClient()

# 容许链接不在know_hosts文件中的主机

ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# 链接服务器

ssh.connect(hostname='106.74.230.135', port=22101, username='root', password='123456')

 

# 执行命令

stdin, stdout, stderr = ssh.exec_command('df')

# 获取命令结果

result = stdout.read()

print(result.decode('utf-8'))

# 关闭链接

ssh.close()

 

 

import paramiko

 

transport = paramiko.Transport(('106.74.230.135', 22101))

transport.connect(username='root', password='123456')

 

sftp = paramiko.SFTPClient.from_transport(transport)

# 将location.py 上传至服务器 /tmp/test.py

sftp.put(r'D:\video\python20期\day9\02 多线程\7 GIL测试.py', '/root/test.txt')

# 将remove_path 下载到本地 local_path

# sftp.get('remove_path', 'local_path')

 

transport.close()
相关文章
相关标签/搜索