多任务—进程

1、进程以及状态服务器

1. 进程并发

程序:例如xxx.py这是程序,是一个静态的app

进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操做系统分配资源的基本单元。dom

 

2. 进程的状态异步

工做中,任务数每每大于cpu的核数,即必定有一些任务正在执行,而另一些任务在等待cpu进行执行,所以致使了有了不一样的状态socket

  • 就绪态:当进程已分配到除CPU之外的全部必要的资源,只要得到处理机即可当即执行,这时的进程状态称为就绪状态。async

  • 执行态:当进程已得到处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。tcp

  • 等待态:正在执行的进程,因为等待某个事件发生而没法执行时,便放弃处理机而处于阻塞状态。引发进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能知足、等待信件(信号)、一个程序sleep等ide

 

3.同步和异步函数

所谓同步就是一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态能够保持一致。

所谓异步是不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做,依赖的任务也当即执行,只要本身完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务没法肯定,因此它是不可靠的任务序列。

 

4.阻塞与非阻塞

阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来讲的

  1. 同步阻塞形式

  效率最低。银行排队。

  1. 异步阻塞形式

  若是在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行作其它的事情,那么很显然,这我的被阻塞在了这个等待的操做上面;

  异步操做是能够被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

  1. 同步非阻塞形式

  其实是效率低下的。

  想象一下你一边打着电话一边还须要抬头看到底队伍排到你了没有,若是把打电话和观察排队的位置当作是程序的两个操做的话,这个程序须要在这两种不一样的行为之间来回的切换,效率可想而知是低下的。

  1. 异步非阻塞形式

  效率更高,

  由于打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不一样的操做中来回切换。

  好比说,这我的忽然发觉本身烟瘾犯了,须要出去抽根烟,因而他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操做上面,天然这个就是异步+非阻塞的方式了。

 

不少人会把同步和阻塞混淆,是由于不少时候同步操做会以阻塞的形式表现出来,一样的,不少人也会把异步和非阻塞混淆,由于异步操做通常都不会在真正的IO操做处被阻塞。

 

2、进程的建立

1.进程的建立-multiprocessing

multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来表明一个进程对象,这个对象能够理解为是一个独立的进程,能够执行另外的事情

 from multiprocessing import Process
 import time
 
 
 def func1():
     while True:
         print("test1")
         time.sleep(1)
 
 def func2():
     while True:
         print("test2")
         time.sleep(1)
 
 
 if __name__ == '__main__':
     p1 = Process(target=func1)
     p2 = Process(target=func2)
     p1.start()
     p2.start()
建立进程

说明

  • 建立子进程时,只须要传入一个执行函数和函数的参数,建立一个Process实例,用start()方法启动

 

2. 进程pid

 from multiprocessing import Process
 import time
 import os
 
 
 def func():
     print("子进程的pid:%d" % os.getpid())   # os.getpid 获取当前进程的进程号
     print("子进程的父进程pid:%d" % os.getppid())    # os.getppid 获取当前进程的父进程的进程号
 
 
 if __name__ == "__main__":
     print("父进程pid:%d" % os.getpid())
     p = Process(target=func)
     p.start()
                                                                                                 
进程pid

 

3. Process语法结构以下:

Process([group [, target [, name [, args [, kwargs]]]]])

  • target:若是传递了函数的引用,能够任务这个子进程就执行这里的代码
  • args:给target指定的函数传递的参数,以元组的方式传递
  • kwargs:给target指定的函数传递命名参数
  • name:给进程设定一个名字,能够不设定
  • group:指定进程组,大多数状况下用不到

Process建立的实例对象的经常使用方法:

  • start():启动子进程实例(建立子进程)
  • is_alive():判断进程子进程是否还在活着
  • join([timeout]):是否等待子进程执行结束,或等待多少秒
  • terminate():无论任务是否完成,当即终止子进程

Process建立的实例对象的经常使用属性:

  • name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
  • pid:当前进程的pid(进程号)

 

4. 给子进程指定的函数传递参数

 from multiprocessing import Process
 import time
 import os
 
 
 def func(name, age, **kwargs):
     for i in range(10):
         print("子进程运行中:name=%s,age=%d,pid=%d" % (name, age, os.getpid()))
         print(kwargs)
         time.sleep(0.2)
 
 
 if __name__ == "__main__":
     p = Process(target=func, args=('test', 11), kwargs={"ab":'ab'})
     p.start()
     time.sleep(1)   # 1秒后结束子进程
     p.terminate()
     p.join()

# 运行结果
子进程运行中:name=test,age=11,pid=4260
{'ab': 'ab'}
子进程运行中:name=test,age=11,pid=4260
{'ab': 'ab'}
子进程运行中:name=test,age=11,pid=4260
{'ab': 'ab'}
子进程运行中:name=test,age=11,pid=4260
{'ab': 'ab'}
子进程运行中:name=test,age=11,pid=4260
{'ab': 'ab'}
View Code

 

5. 进程间不共享全局变量

 from multiprocessing import Process
 import time
 import os
 
 
 num_list = [11,22,33]
 
 def func1():
     print("in func1:pid=%d, num_list=%s" % (os.getpid(), num_list))
     for i in range(3):
         num_list.append(i)
         time.sleep(1)
         print("in func1:pid=%d, num_list=%s" % (os.getpid(), num_list))
 
 
 def func2():
     print("in func2:pid=%d, num_list=%s" % (os.getpid(), num_list))
 
 
 if __name__ == "__main__":
     p1 = Process(target=func1)
     p1.start()
     p1.join()
     p2 = Process(target=func2)
     p2.start()

# 执行结果:
in func1:pid=4344, num_list=[11, 22, 33]
in func1:pid=4344, num_list=[11, 22, 33, 0]
in func1:pid=4344, num_list=[11, 22, 33, 0, 1]
in func1:pid=4344, num_list=[11, 22, 33, 0, 1, 2]
in func2:pid=4345, num_list=[11, 22, 33]
View Code

 

6.守护进程

守护进程会随着主进程的结束而结束。

主进程建立守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

 from multiprocessing import Process
 import time
 import os
 
 
 class MyProcess(Process):
 
     def run(self):
         print(os.getpid(), self.name)
 
 
 p = MyProcess()
 p.daemon = True #必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p
 即终止运行
 p.start()
 time.sleep(10) # 在sleep时查看进程id对应的进程ps -ef|grep id
 print('')
守护进程开启
 from multiprocessing import Process
 import time
 
 
 def func1():
     print("123")
     time.sleep(1)
     print("123end")
 
 
 def func2():
     print("456")
     time.sleep(3)
     print("456end")
 
 
 p1 = Process(target=func1)
 p2 = Process(target=func2)
 p1.daemon = True
 p1.start()
 p2.start()
 time.sleep(0.1)
 print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务执行的打印信息123,由于主进程打印main----时,p1也执行了,可是随即被终止.


# 执行结果:
123
456
main-------
456end
主进程代码执行结束守护进程当即结束

 

 7.使用多进程实现socket服务端的并发效果

 from multiprocessing import Process
 import socket
 
 
 def server(conn):
     recv_data = conn.recv(1024).decode('utf-8')
     print(recv_data)
     conn.send("抱着妹妹上花轿".encode('utf-8'))
     conn.close()
 
 if __name__ == "__main__":
     tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     tcp_socket.bind(('192.168.1.1',8010))
     tcp_socket.listen(128)
     while True:
         conn, addr = tcp_socket.accept()
         p = Process(target=server, args=(conn,))
         p.start()
server
 from socket import *
 
 
 # 建立套接字 
 tcp_client_socket = socket(AF_INET, SOCK_STREAM)
 
 # 目的地址
 server_ip = input("服务端ip:")
 server_port = input("服务端端口:")
 
 # 连接服务器
 tcp_client_socket.connect((server_ip, int(server_port)))
 
 # 客户端发送信息
 send_data = input("输入发送的信息:")
 tcp_client_socket.send(send_data.encode('utf-8'))
 
 # 接受服务端发来的信息
 recv_data = tcp_client_socket.recv(1024)
 print("收到的信息:%s" % recv_data.decode('utf-8'))
 
 # 关闭套接字
 tcp_client_socket.close()
client

 

 3、进程间通讯-Queue

1. Queue的使用

可使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue自己是一个消息列队程序,首先用一个小实例来演示一下Queue的工做原理:

 from multiprocessing import Queue
 
 
 q = Queue(3)    # 初始哈Queue对象,最多可接手3条put
 q.put(1)
 q.put(2)
 print(q.full())   # False
 q.put(3)
 print(q.full())   # True
 
 
 #由于消息列队已满下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个Try会马上抛出异常
 try:
     q.put(4, True, 2)
 except:
     print("消息队列已满:现有消息数量%s" % q.qsize())
 
 try:
     q.put_nowait(4)
 except:
     print("消息队列已满:现有消息数量%s" % q.qsize())
 
 #推荐的方式,先判断消息列队是否已满,再写入
 if not q.full():
     q.put_nowait(4)
 
 #读取消息时,先判断消息列队是否为空,再读取
 if not q.empty():
     for i in range(q.qsize()):
         print(q.get_nowait())
                                              
View Code

说明

初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就表明可接受的消息数量没有上限(直到内存的尽头);

  • Queue.qsize():返回当前队列包含的消息数量;

  • Queue.empty():若是队列为空,返回True,反之False ;

  • Queue.full():若是队列满了,返回True,反之False;

  • Queue.get([block[, timeout]]):获取队列中的一条消息,而后将其从列队中移除,block默认值为True;

1)若是block使用默认值,且没有设置timeout(单位秒),消息列队若是为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,若是设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;

2)若是block值为False,消息列队若是为空,则会马上抛出"Queue.Empty"异常;

  • Queue.get_nowait():至关Queue.get(False);

  • Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;

1)若是block使用默认值,且没有设置timeout(单位秒),消息列队若是已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,若是设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;

2)若是block值为False,消息列队若是没有空间可写入,则会马上抛出"Queue.Full"异常;

  • Queue.put_nowait(item):至关Queue.put(item, False);

 

2. Queue实例

咱们以Queue为例,在父进程中建立两个子进程,一个往Queue里写数据,一个从Queue里读数据:

 from multiprocessing import Process, Queue
 import time,random
 
 
 # 写数据进程执行的代码:
 def write(q):
     for i in range(3):
         print("puting %d in queue" % i)
         q.put(i)
         time.sleep(random.random())
 
 
 def read(q):
     while True:
         if not q.empty():
             print("%i reading in queue" % q.get(True))
             time.sleep(random.random())
         else:
             break
 
 if __name__ == "__main__":
     q = Queue()
     wp = Process(target=write, args=(q,))
     rp = Process(target=read, args=(q,))
 
     wp.start()   # 启动子进程wq,写入:
     wp.join()    # 等待pw结束:
 
     rp.start()
     rp.join()
     print('全部数据都写入而且读完')
View Code

 

4、进程池Pool

当须要建立的子进程数量很少时,能够直接利用multiprocessing中的Process动态成生多个进程,但若是是上百甚至上千个目标,手动的去建立进程的工做量巨大,此时就能够用到multiprocessing模块提供的Pool方法。

初始化Pool时,能够指定一个最大进程数,当有新的请求提交到Pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用以前的进程来执行新的任务,请看下面的实例:

 from multiprocessing import Process, Pool
 import time, os, random
 
 
 def func(num):
     start_time = time.time()
     print("%d开始执行,进程号%s" %(num,os.getpid()))
     time.sleep(random.random())
     end_time = time.time()
     print("%d执行完毕,耗时%0.2f" % (num,end_time-start_time))
 
 
 po = Pool(3)
 for i in range(10):
     # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
     # 每次循环将会用空闲出来的子进程去调用目标
     po.apply_async(func, (i,))
 
 print("--start--")
 po.close()   # 关闭进程池,关闭后po再也不接收新的请求
 po.join()   # 等待po中全部子进程执行完成,必须放在close语句以后
 print("--end--")


# 执行结果:
--start--
0开始执行,进程号4333
1开始执行,进程号4334
2开始执行,进程号4332
0执行完毕,耗时0.65
3开始执行,进程号4333
1执行完毕,耗时0.85
4开始执行,进程号4334
2执行完毕,耗时0.95
5开始执行,进程号4332
4执行完毕,耗时0.43
6开始执行,进程号4334
5执行完毕,耗时0.41
7开始执行,进程号4332
3执行完毕,耗时0.85
8开始执行,进程号4333
6执行完毕,耗时0.51
9开始执行,进程号4334
9执行完毕,耗时0.24
8执行完毕,耗时0.54
7执行完毕,耗时0.75
--end--
View Code

multiprocessing.Pool经常使用函数解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
  • close():关闭Pool,使其再也不接受新的任务;
  • terminate():无论任务是否完成,当即终止;
  • join():主进程阻塞,等待子进程的退出, 必须在close或terminate以后使用;

 

进程池中的Queue

若是要使用Pool建立进程,就须要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),不然会获得一条以下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的实例演示了进程池中的进程如何通讯:

 from multiprocessing import Manager, Pool
 import os, time, random
 
 
 def write(q):
     print("write启动(%s),父进程(%s)" % (os.getpid(), os.getppid()))
     for i in range(3):
         q.put(i)
 
 def read(q):
     print("read启动(%s), 父进程(%s)" % (os.getpid(), os.getppid()))
     for i in range(q.qsize()):
         print("正在从队列读取消息%d" % q.get())
 
 if __name__ == "__main__":
     print("start:%s" % os.getpid())
     q = Manager().Queue()
     po = Pool()
     po.apply_async(write, (q,))
     time.sleep(1)   # 先让上面的任务向Queue存入数据,而后再让下面的任务开始从中取数据
 
     po.apply_async(read, (q,))
 
     po.close()
     po.join()
     print("end:%s" % os.getpid())

# 执行结果:
start:4586
write启动(4592),父进程(4586)
read启动(4592), 父进程(4586)
正在从队列读取消息0
正在从队列读取消息1
正在从队列读取消息2
end:4586
View Code

 

应用:文件夹copy器(多进程版)

 import multiprocessing, time, os, random
 
 
 def copy_file(queue, file_name, need_copy_file, new_file):
     f_read = open(need_copy_file + '/' + file_name, 'rb')
     f_write = open(new_file + '/' + file_name, 'wb')
     while True:
         time.sleep(random.random())
         content = f_read.read()
         if content:
             f_write.write(content)
         else:
             break
     f_read.close()
     f_write.close()
     # 发送已经拷贝完毕的文件名字
     queue.put(file_name)
 
 
 if __name__ == "__main__":
 
     # 获取想要copy的文件夹
     need_copy_file = input("输入文件名:")
     # 新的文件夹名称
     new_file = need_copy_file + "[副本]"
     # 建立目标文件夹
     try:
         os.mkdir(new_file)
     except:
         print("建立文件失败")
     # 获取这个文件夹中全部的普通文件名
     file_names = os.listdir(need_copy_file)
 
     # 建立队列
     queue = multiprocessing.Manager().Queue()
     # 建立进程池
     pool = multiprocessing.Pool(3)
     # 向进程池中添加任务
     for file_name in file_names:
         pool.apply_async(copy_file, (queue, file_name, need_copy_file, new_file))
     pool.close()
     # 主进程显示进度
     all_file_nums = len(file_names)
     while True:
         file_name = queue.get()
         if file_name in file_names:
             file_names.remove(file_name)
         copy_rate = (all_file_nums - len(file_names))*100/all_file_nums
         print("\r%.2f...(%s)" % (copy_rate, file_name) + " "*50, end="")
         if copy_rate >= 100:
             print("\ncopy完成")
             break
View Code
相关文章
相关标签/搜索