socket一般也称做"套接字",用于描述IP地址和端口,是一个通讯链的句柄,应用程序一般经过"套接字"向网络发出请求或者应答网络请求。java
socket起源于Unix,而Unix/Linux基本哲学之一就是“一切皆文件”,对于文件用【打开】【读写】【关闭】模式来操做。socket就是该模式的一个实现,socket便是一种特殊的文件,一些socket函数就是对其进行的操做(读/写IO、打开、关闭)python
socket和file的区别:算法
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',9999) sk = socket.socket() sk.bind(ip_port) sk.listen(5) while True: print 'server waiting...' conn,addr = sk.accept() client_data = conn.recv(1024) print client_data conn.sendall('不要回答,不要回答,不要回答') conn.close() socket server
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',9999) sk = socket.socket() sk.connect(ip_port) sk.sendall('请求占领地球') server_reply = sk.recv(1024) print server_reply sk.close() socket client
WEB服务应用:数组
#!/usr/bin/env python #coding:utf-8 import socket def handle_request(client): buf = client.recv(1024) client.send("HTTP/1.1 200 OK\r\n\r\n") client.send("Hello, World") def main(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('localhost',8080)) sock.listen(5) while True: connection, address = sock.accept() handle_request(connection) connection.close() if __name__ == '__main__': main()
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)服务器
参数一:地址簇 socket.AF_INET IPv4(默认) socket.AF_INET6 IPv6 socket.AF_UNIX 只可以用于单一的Unix系统进程间通讯 参数二:类型 socket.SOCK_STREAM 流式socket , for TCP (默认) socket.SOCK_DGRAM 数据报式socket , for UDP socket.SOCK_RAW 原始套接字,普通的套接字没法处理ICMP、IGMP等网络报文,而SOCK_RAW能够;其次,SOCK_RAW也能够处理特殊的IPv4报文;此外,利用原始套接字,能够经过IP_HDRINCL套接字选项由用户构造IP头。 socket.SOCK_RDM 是一种可靠的UDP形式,即保证交付数据报但不保证顺序。SOCK_RAM用来提供对原始协议的低级访问,在须要执行某些特殊操做时使用,如发送ICMP报文。SOCK_RAM一般仅限于高级用户或管理员运行的程序使用。 socket.SOCK_SEQPACKET 可靠的连续数据包服务 参数三:协议 0 (默认)与特定的地址家族相关的协议,若是是 0 ,则系统就会根据地址格式和套接类别,自动选择一个合适的协议
import socket ip_port = ('127.0.0.1',9999) sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0) sk.bind(ip_port) while True: data = sk.recv(1024) print data import socket ip_port = ('127.0.0.1',9999) sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0) while True: inp = raw_input('数据:').strip() if inp == 'exit': break sk.sendto(inp,ip_port) sk.close()
sk.bind(address)网络
s.bind(address) 将套接字绑定到地址。address地址的格式取决于地址族。在AF_INET下,以元组(host,port)的形式表示地址。
sk.listen(backlog)数据结构
开始监听传入链接。backlog指定在拒绝链接以前,能够挂起的最大链接数量。 backlog等于5,表示内核已经接到了链接请求,但服务器尚未调用accept进行处理的链接个数最大为5 这个值不能无限大,由于要在内核中维护链接队列
sk.setblocking(bool)多线程
是否阻塞(默认True),若是设置False,那么accept和recv时一旦无数据,则报错。
sk.accept()app
接受链接并返回(conn,address),其中conn是新的套接字对象,能够用来接收和发送数据。address是链接客户端的地址。 接收TCP 客户的链接(阻塞式)等待链接的到来
sk.connect(address)socket
链接到address处的套接字。通常,address的格式为元组(hostname,port),若是链接出错,返回socket.error错误。
sk.connect_ex(address)
同上,只不过会有返回值,链接成功时返回 0 ,链接失败时候返回编码,例如:10061
sk.close()
关闭套接字
sk.recv(bufsize[,flag])
接受套接字的数据。数据以字符串形式返回,bufsize指定最多能够接收的数量。flag提供有关消息的其余信息,一般能够忽略。
sk.recvfrom(bufsize[.flag])
与recv()相似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址。
sk.send(string[,flag])
将string中的数据发送到链接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小。
sk.sendall(string[,flag])
将string中的数据发送到链接的套接字,但在返回以前会尝试发送全部数据。成功返回None,失败则抛出异常。
sk.sendto(string[,flag],address)
将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议。
sk.settimeout(timeout)
设置套接字操做的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。通常,超时期应该在刚建立套接字时设置,由于它们可能用于链接的操做(如 client 链接最多等待5s )
sk.getpeername()
返回链接套接字的远程地址。返回值一般是元组(ipaddr,port)。
sk.getsockname()
返回套接字本身的地址。一般是一个元组(ipaddr,port)
sk.fileno()
套接字的文件描述符
import socket ip_port = ('127.0.0.1',9999) sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0) sk.bind(ip_port) while True: data = sk.recv(1024) print data import socket ip_port = ('127.0.0.1',9999) sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0) while True: inp = raw_input('数据:').strip() if inp == 'exit': break sk.sendto(inp,ip_port) sk.close() UDP Demo
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8888) sk = socket.socket() sk.bind(ip_port) sk.listen(5) while True: conn,address = sk.accept() conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.') Flag = True while Flag: data = conn.recv(1024) if data == 'exit': Flag = False elif data == '0': conn.sendall('经过可能会被录音.balabala一大推') else: conn.sendall('请从新输入.') conn.close() 服务端
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8005) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close() 客户端
对于默认Socket服务端处理客户端请求时,按照阻塞方式依次处理请求,SocketServer实现同事处理多个请求。
#!/usr/bin/env python # -*- coding:utf-8 -*- import SocketServer class MyServer(SocketServer.BaseRequestHandler): def handle(self): # print self.request,self.client_address,self.server conn = self.request conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.') Flag = True while Flag: data = conn.recv(1024) if data == 'exit': Flag = False elif data == '0': conn.sendall('经过可能会被录音.balabala一大推') else: conn.sendall('请从新输入.') if __name__ == '__main__': server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer) server.serve_forever()
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8009) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close() 客户端
从剖析上述源码执行流程,对源码精简以下:
import socket import threading import select def process(request, client_address): print request,client_address conn = request conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.') flag = True while flag: data = conn.recv(1024) if data == 'exit': flag = False elif data == '0': conn.sendall('经过可能会被录音.balabala一大推') else: conn.sendall('请从新输入.') sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sk.bind(('127.0.0.1',8002)) sk.listen(5) while True: r, w, e = select.select([sk,],[],[],1) print 'looping' if sk in r: print 'get request' request, client_address = sk.accept() t = threading.Thread(target=process, args=(request, client_address)) t.daemon = False t.start() sk.close()
如精简代码能够看出,SocketServer之因此能够同时处理请求得益于 select 和 Threading 两个东西,其实本质上就是在服务器端为每个客户端建立一个线程,当前线程用来处理对应客户端的请求,因此,能够支持同时n个客户端连接(长链接)。
#!/usr/bin/env python #coding:utf-8 import SocketServer import os class MyServer(SocketServer.BaseRequestHandler): def handle(self): base_path = 'G:/temp' conn = self.request print 'connected...' while True: pre_data = conn.recv(1024) #获取请求方法、文件名、文件大小 cmd,file_name,file_size = pre_data.split('|') # 防止粘包,给客户端发送一个信号。 conn.sendall('nothing') #已经接收文件的大小 recv_size = 0 #上传文件路径拼接 file_dir = os.path.join(base_path,file_name) f = file(file_dir,'wb') Flag = True while Flag: #未上传完毕, if int(file_size)>recv_size: #最多接收1024,可能接收的小于1024 data = conn.recv(1024) recv_size+=len(data) #写入文件 f.write(data) #上传完毕,则退出循环 else: recv_size = 0 Flag = False print 'upload successed.' f.close() instance = SocketServer.ThreadingTCPServer(('127.0.0.1',9999),MyServer) instance.serve_forever() FTP上传文件(服务端)
#!/usr/bin/env python #coding:utf-8 import socket import sys import os ip_port = ('127.0.0.1',9999) sk = socket.socket() sk.connect(ip_port) container = {'key':'','data':''} while True: # 客户端输入要上传文件的路径 input = raw_input('path:') # 根据路径获取文件名 file_name = os.path.basename(path) # 获取文件大小 file_size=os.stat(path).st_size # 发送文件名 和 文件大小 sk.send(file_name+'|'+str(file_size)) # 为了防止粘包,将文件名和大小发送过去以后,等待服务端收到,直到从服务端接受一个信号(说明服务端已经收到) sk.recv(1024) send_size = 0 f= file(path,'rb') Flag = True while Flag: if send_size + 1024 >file_size: data = f.read(file_size-send_size) Flag = False else: data = f.read(1024) send_size+=1024 sk.send(data) f.close() sk.close() FTP上传文件(客户端)
1023M = send(1g数据) 那么实际是发送了 1023M,其余 1M 就是漏发了
sendall(1g数据) 第一次: send(1023M) 第二次: send(1M) ========== 发送大文件时候,不可能所有读1G内存,须要open文件时,一点一点读,而后再发。
file_size=os.stat(文件路径).st_size
f = file(文件路径,'rb')
send_size = 0
while Flag: # 大文件只剩下 不到 1024 字节,其余已经被发送。 if send_size + 1024 > file_size: # 从大文件中读取小于 1024字节,多是 10字节... data = f.read(file_size-send_size) Flag = False else: # 从大文件中读取 1024 字节 data = f.read(1024) # 记录已经发送了多少字节 send_size += 1024 # 将大文件中的数据,分批发送到缓冲区,每次最多发 1024 字节 sk.sendall(data)
Linux中的 select,poll,epoll 都是IO多路复用的机制。
I/O多路复用指:经过一种机制,能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做。
select select最先于1983年出如今4.2BSD中,它经过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做。 select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势,事实上从如今看来,这也是它所剩很少的优势之一。 select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,不过能够经过修改宏定义甚至从新编译内核的方式提高这一限制。 另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增加。同时,因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定的开销。 poll poll在1986年诞生于System V Release 3,它和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制。 poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。 另外,select()和poll()将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll()的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。 epoll 直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具有了以前所说的一切优势,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。 epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。 epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。 另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。
Python select 用于监听多个文件描述符:
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket import threading import select def process(request, client_address): print request,client_address conn = request conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.') flag = True while flag: data = conn.recv(1024) if data == 'exit': flag = False elif data == '0': conn.sendall('经过可能会被录音.balabala一大推') else: conn.sendall('请从新输入.') s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s1.bind(('127.0.0.1',8020)) s1.listen(5) s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s2.bind(('127.0.0.1',8021)) s2.listen(5) while True: r, w, e = select.select([s1,s2,],[],[],1) print 'looping' for s in r: print 'get request' request, client_address = s.accept() t = threading.Thread(target=process, args=(request, client_address)) t.daemon = False t.start() s1.close() s2.close() 服务端
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8020) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close() 客户端:8020
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8021) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close() 客户端:8021
问答:
一、Python线程
Threading用于提供线程相关的操做,线程是应用程序中工做的最小单元。
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time def show(arg): time.sleep(1) print 'thread'+str(arg) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop'
上述代码建立了10个“前台”线程,而后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
更多方法:
因为线程之间是进行随机调度,而且每一个线程可能只执行n条执行以后,CPU接着执行其余线程。因此,可能出现以下问题:
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time gl_num = 0 def show(arg): global gl_num time.sleep(1) gl_num +=1 print gl_num for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop' 未使用线程锁
#!/usr/bin/env python #coding:utf-8 import threading import time gl_num = 0 lock = threading.RLock() def Func(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()
一、建立多进程程序
from multiprocessing import Process import threading import time def foo(i): print 'say hi',i for i in range(10): p = Process(target=foo,args=(i,)) p.start()
注意:因为进程之间的数据须要各自持有一份,因此建立进程须要的很是大的开销。
进程各自持有一份数据,默认没法共享数据
#!/usr/bin/env python #coding:utf-8 from multiprocessing import Process from multiprocessing import Manager import time li = [] def foo(i): li.append(i) print 'say hi',li for i in range(10): p = Process(target=foo,args=(i,)) p.start() print 'ending',li
#方法一,Array from multiprocessing import Process,Array temp = Array('i', [11,22,33,44]) def Foo(i): temp[i] = 100+i for item in temp: print i,'----->',item for i in range(2): p = Process(target=Foo,args=(i,)) p.start() p.join() #方法二:manage.dict()共享数据 from multiprocessing import Process,Manager manage = Manager() dic = manage.dict() def Foo(i): dic[i] = 100+i print dic.values() for i in range(2): p = Process(target=Foo,args=(i,)) p.start() p.join() 进程间共享数据
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print arg pool = Pool(5) #print pool.apply(Foo,(1,)) #print pool.apply_async(func =Foo, args=(1,)).get() for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) print 'end' pool.close() pool.join()