day9-进程、线程

  1. 线程
  2. 进程
  3. 协程
  4. IO多路复用

1. 线程

线程是最小的工做单位,一个应用程序至少有一个进程,一个进程至少有一个线程。
应用场景:
IO密集型:使用线程
计算密集型:使用进程
GIL: 全局解释器锁,保证同一进程中只有一个线程同时被调度。python

线程的基本使用:git

def task(arg):
    time.sleep(arg)
    print(arg)

for i in range(5):
    t = threading.Thread(target=task,args=[i,])
    # t.setDaemon(True) # 主线程终止,不等待子线程
    # t.setDaemon(False)
    t.start()
    # t.join()  # 一直等
    # t.join(1) # 等待最大时间

锁:
1.只有一我的使用锁:github

import threading
import time
lock = threading.Lock()
v =10
def task(arg):
    time.sleep(1)
    lock.acquire()
    global v
    v -= 1
    print(v)
    lock.release()

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

2.多人使用锁:并发

lock = threading.BoundedSemaphore(3)
v =10
def task(arg):

    lock.acquire()
    time.sleep(1)
    global v
    v -= 1
    print(v)
    lock.release()

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

3.事件锁(全部人解脱锁的限制)app

lock = threading.Event()

def task(arg):

    time.sleep(1)
    lock.wait()
    print(arg)

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

while True:
    value = input(">>>")
    if value == '1':
        lock.set()

4.自定义解锁个数(为所欲为锁)socket

lock = threading.Condition()

def task(arg):
    time.sleep(1)
    lock.acquire()
    lock.wait()
    print("线程:%s" % arg)
    lock.release()

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

while True:
    value = input(">>")
    lock.acquire()
    lock.notify(int(value))
    lock.release()

线程池:
模式一: 直接处理ui

def task(url):
    """
    任务执行两个操做:下载;保存本地
    """
    # response中封装了Http请求响应的全部数据
    # - response.url            请求的URL
    # - response.status_code    响应状态码
    # - response.text           响应内容(字符串格式)
    # - response.content        响应内容(字节格式)
    # 下载
    response = requests.get(url)

    # 下载内容保存至本地
    f = open('a.log','wb')
    f.write(response.content)
    f.close()

pool = ThreadPoolExecutor(2)
url_list = [
    'http://www.oldboyedu.com',
    'http://www.autohome.com.cn',
    'http://www.baidu.com',
]
for url in url_list:
    print('开始请求',url)
    # 去链接池中获取连接
    pool.submit(task,url)

模式二:分步处理url

def save(future):
    """
    只作保存  # future中包含response
    """
    response = future.result()

    # 下载内容保存至本地
    f = open('a.log','wb')
    f.write(response.content)
    f.close()

def task(url):
    """
    只作下载 requests
    """
    # response中封装了Http请求响应的全部数据
    # - response.url            请求的URL
    # - response.status_code    响应状态码
    # - response.text           响应内容(字符串格式)
    # - response.content        响应内容(字节格式)
    # 下载
    response = requests.get(url)
    return response

pool = ThreadPoolExecutor(2)
url_list = [
    'http://www.oldboyedu.com',
    'http://www.autohome.com.cn',
    'http://www.baidu.com',
]
for url in url_list:
    print('开始请求',url)
    # 去链接池中获取连接
    # future中包含response
    future = pool.submit(task,url)
    # 下载成功后,自动调用save方法
    future.add_done_callback(save)

2. 进程

进程的基本使用:spa

from multiprocessing import Process
import time
def task(arg):
    time.sleep(arg)
    print(arg)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task,args=(i,))
        p.daemon = True
        # p.daemon = False
        p.start()
        p.join(1)

    print('主进程最后...')

进程之间的数据共享:经过Array(‘类型’,长度) 或者Manager().list() / Manager().dict()线程

from multiprocessing import Process,Array,Manager
from threading import Thread
'''
# 验证进程之间数据不共享
def task(num,li):
    li.append(num)
    print(li)

if __name__ == "__main__":
    v = []
    for i in range(10):
        p = Process(target=task,args=(i,v,))
        p.start()

'''

'''
# 进程间数据共享方式一:
def task(num,li):
    li[num] = 1
    print(list(li))

if __name__ == "__main__":
    v = Array('i',10)
    for i in range(10):
        p = Process(target=task,args=(i,v,))
        p.start()
'''

# 进程间数据共享方式二:经过socket
def task(num,li):
    li.append(num)
    print(li)

if __name__ == '__main__':
    v = Manager().list()
    # v = Manager().dict()
    for i in range(10):
        p = Process(target=task,args=(i,v,))
        p.start()
        p.join()

进程池的使用:

from concurrent.futures import ProcessPoolExecutor

def call(arg):
    data = arg.result()
    print(data)

def task(arg):
    return arg + 100

if __name__ == "__main__":
    pool = ProcessPoolExecutor(10)
    for i in range(100):
        obj = pool.submit(task,i)
        obj.add_done_callback(call)

3.协程

协程永远是一个进程在执行,是对线程的分片处理。
greenlet: python自带的协程模块

# 协程
from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

# 根据协程二次开发:协程+IO
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    response = requests.get(url)
    print(response.url,response.status_code)

gevent.joinall([
        gevent.spawn(f, 'http://www.oldboyedu.com/'),
        gevent.spawn(f, 'http://www.baidu.com/'),
        gevent.spawn(f, 'http://github.com/'),
])

4. IO多路复用

IO多路复用,用于监听多个socket对象是否变化(可读,可写,发送错误)

import socket
import select

# IO多路复用:8002,8001
#
############### 基于select实现服务端的“伪”并发 ###############
sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001,))
sk1.listen(5)

sk2 = socket.socket()
sk2.bind(('127.0.0.1',8002,))
sk2.listen(5)
inputs = [sk1,sk2,]
w_inputs = []
while True:
    # IO多路复用,同时监听多个socket对象
    #    - select,内部进行循环操做(1024)  主动查看
    #    - poll, 内部进行循环操做         主动查看
    #    - epoll,                        被动告知
    r,w,e = select.select(inputs,w_inputs,inputs,0.05)
    # r = [sk2,]
    # r = [sk1,]
    # r = [sk1,sk2]
    # r = []
    # r = [conn,]
    # r = [sk1,Wconn]
    #######?
    for obj in r:
        if obj in [sk1,sk2]:
            # 新链接捡来了...
            print('新链接来了:',obj)
            conn,addr = obj.accept()
            inputs.append(conn)
        else:
            # 有链接用户发送消息来了..
            print('有用户发送数据了:',obj)
            try:
                data = obj.recv(1024)
            except Exception as ex:
                data = ""
            if data:
                w_inputs.append(obj)
                # obj.sendall(data)
            else:
                obj.close()
                inputs.remove(obj)
                w_inputs.remove(obj)

    for obj in w:
        obj.sendall(b'ok')
        w_inputs.remove(obj)

模拟socketserver:

import socket
import select
import threading

def process_request(conn):
    while True:
        v = conn.recv(1024)
        conn.sendall(b'1111')

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001,))
sk1.listen(5)
inputs=[sk1,]
while True:
    # IO多路复用,同时监听多个socket对象
    #    - select,内部进行循环操做(1024)  主动查看
    #    - poll, 内部进行循环操做         主动查看
    #    - epoll,                        被动告知
    r,w,e = select.select(inputs,[],inputs,0.05)

    for obj in r:
        if obj in sk1:
            # conn客户端的socket
            conn,addr = obj.accept()
            t = threading.Thread(target=process_request,args=(conn,))
            t.start()
相关文章
相关标签/搜索