# 单例模式 class A: __isinstace = None def __init__(self): print('init') # __new__方法是为对象在内存中开辟内存空间,__init__是为该空间封装属性 def __new__(cls, *args, **kwargs): print('new') if not cls.__isinstace: cls.__isinstace = super().__new__(cls) return cls.__isinstace if __name__ == '__main__': a = A() aa = A() print(id(a)) print(id(aa))
class A: def __init__(self, v): self.v = v def __getitem__(self, key): print('getitem') def __setitem__(self, key, value): print('setitem') def __getattr__(self, key): print('getattr') def __setattr__(self, key, value): print('setattr') def __delitem__(self, key): print('delitem') def __delattr__(self, key): print('delattr') if __name__ == '__main__': a = A(1) a['v'] = 1 a.v = 2 v = a.v vv = a['v'] del a.v del a['v']
class A: def __init__(self): print('init') def __enter__(self): print('enter') return 1 def __exit__(self, exception_type, exception_value, traceback): print('exit') if __name__ == '__main__': # a 为__enter__方法返回的值 with A() as a: print(a)
首先获取目标的全部方法、属性集,而后再获取。python
attr()方法就是经过参数获取相应对象中的值。编程
其中一个很重要的功能就是hasattr()和getattr(),由于python一切皆对象,因此说全部的属性和方法是以字典的形式保存的,即key-->value,键名对应着值。若要获取一个对象的属性的值,就是前往该对象的属性、方法集合中去寻找相应的键,若存在就返回值,不然就报错,若调用getattr就会作相应的错误处理(返回一句提示);若要更新一个对象中的值,则是前往属性、方法集合中去寻找相应的键,并更新值,若键不存在就更新集合,添加一个键,并默认为其赋予默认的值。缓存
class A: def __init__(self, value): self.value = value if __name__ == '__main__': a = A(1) if hasattr(a, 'b'): getattr(a, 'b') else: print('not find') print(getattr(a, 'b', 'not find')) def func1(): print('func1') def fun2(): print('func2') # globals()获取当前文件中全部的方法和属性 # print(globals()['func1']()) import sys # 获取当前主进程模块 obj = sys.modules['__main__'] print(obj.__dict__)
python中因为GIL(全局解释锁)的存在,一次只能执行一个进程,并不能达到真正意义上的多进程安全
# 多进程 from multiprocessing import Process, Pool,Manager, Lock import time def func1(): print('fucn1') def func2(): print('func2') def func3(i): print('接收', i) time.sleep(1) print('结束', i) def func3(queue, i, lock): lock.acquire() queue.put(f'放入{i}') print(f'成功放入{i}') time.sleep(2) lock.release() def func4(queue): data = queue.get() print('拿出数据:', data) time.sleep(0.5) class MyProcess(Process): def __init__(self, args): self.args = args super().__init__(target=self.run, args=args) def run(self): print('获取', self.args[0]) time.sleep(1) print('结束', self.args[0]) if __name__ == '__main__': # p1 = Process(target=func1) # p2 = Process(target=func2) start = time.time() # 进程池 # processes = [Process(target=func3, args=(i, )) for i in range(100)] # pool = Pool() # for i in range(100): # pool.apply_async(func3, (i,)) # # pool.close() # pool.join() # for p in processes: # p.start() # # for p in processes: # p.join() # p1.start() # p2.start() # # p1.join() # p2.join() # 继承实现 # processes = [MyProcess(args=(i,)) for i in range(100)] # for p in processes: # p.start() # for p in processes: # p.join() # 进程间通讯 queue = Manager().Queue() lock = Lock() processes = [Process(target=func3, args=(queue, i, lock)) for i in range(50)] processes += [Process(target=func4, args=(queue,)) for _ in range(40)] for p in processes: p.start() for p in processes: p.join() # 锁 end = time.time() print('主进程结束, 耗时:', end-start)
# 多线程 from threading import Thread import time a = 0 def fun1(): print('func1') def fun2(): print('func2') def fun3(): global a print('计算开始:', a) for i in range(1000000): a += 1 print('计算结束:', a) def func4(): global a print('计算开始', a) for i in range(100000): a -= 1 print('计算结果', a) class MyThread(Thread): def run(self): print('MyThread') time.sleep(1) if __name__ == '__main__': start = time.time() # 单线程 # t1 = Thread(target=fun1) # t2 = Thread(target=fun2) # # t1.start() # t2.start() # # t1.join() # t2.join() # 多线程 # threads = [MyThread() for _ in range(100)] # # for t in threads: # t.start() # for t in threads: # t.join() # 多线程通讯, 以及多线程之间的并发所形成的数据有误 threads = [Thread(target=fun3) for _ in range(100)] # threads += [Thread(target=func4) for _ in range(100)] for t in threads: t.start() for t in threads: t.join() end = time.time() print('最终结果:',a) print('耗时:', end-start)
表示同一时间内最多几个线程为就绪状态,准备被系统调度运行。服务器
from threading import Thread, Semaphore import time def func1(i): s.acquire() print(f'开始执行{i}') time.sleep(1) print(f'结束行执行{i}') s.release() if __name__ == "__main__": start = time.time() s = Semaphore(5) threads = [Thread(target=func1, args=(i, )) for i in range(100)] for t in threads: t.start() for i in threads: t.join() end = time.time() print('执行完成,耗时',end - start)
按照指定顺序执行程序即为同步,不然就为异步。网络
from threading import Thread, Lock import time def func1(): while True: lock1.acquire() print('func1') time.sleep(0.5) lock2.release() def func2(): while True: lock2.acquire() print('func2') time.sleep(0.5) lock3.release() def func3(): while True: lock3.acquire() print('func3') time.sleep(0.5) lock1.release() if __name__ == "__main__": lock1 = Lock() lock2 = Lock() lock2.acquire() lock3 = Lock() lock3.acquire() t1 = Thread(target=func1) t2 = Thread(target=func2) t3 = Thread(target=func3) t1.start() t2.start() t3.start()
在OSI7层模型的4层模型中,应用层一下的都是由socket封装。多线程
# tcp 服务端 from socket import * # tcp s = socket(AF_INET, SOCK_STREAM) s.bind(('localhost', 9999)) s.listen() new_socket = s.accept() # 返回的是一个套接字和客户端的ip+端口号 # socket, addr = new_socket print(new_socket) data = new_socket[0].recv(1024) # socket.send(message.encode('utf-8')) print(data[1][0]) if len(data) == 0: # 当客户端断开链接时会发送一个长度为0的数据,当检测到长度为0时,说明客户端已经断开链接p new_socket[0].close() s.close() # 客户端 from socket import * s = socket(AF_INET, SOCK_STREAM) s.connect(('localhost', 9999)) # 客户端请求链接,链接成功后就能够收发消息了 s.send('hhhhh'.encode('utf-8')) s.recv(1024) s.close()
# 服务端 from socket import * s = socket(AF_INET, SOCK_DGRAM) s.bind(('localhost', 9999)) data = s.recvfrom(1024) print(data) s.close() # 客户端 s = socket(AF_INET, SOCK_DGRAM) s.sendto('hhhh'.encode('utf-8'), ('localhost', 9999)) s.close()
from socket import * from threading import Thread import time # 客户端/服务端(udp) class Send(Thread): def run(self): while True: data = input('>>>') s.sendto(data.encode('utf-8'), addr) time.sleep(0.5) class Receive(Thread): def run(self): while True: data = s.recvfrom(1024) print(data[0].decode('utf-8')) time.sleep(0.5) if __name__ =='__main__': s = socket(AF_INET, SOCK_DGRAM) addr = ('localhost', 9999) s.bind(('localhost', 8888)) send = Send() rece = Receive() try: send.start() rece.start() send.join() rece.join() finally: s.close()
线程实现并发
from socket import * from threading import Thread import time def deal_with_client(socket, addr): while True: data = socket.recv(1024) if len(data) == 0: break print(data.decode('utf-8')) time.sleep(0.5) socket.close() if __name__ == '__main__': s = socket(AF_INET, SOCK_STREAM) s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 通常来讲一个线程占用一个端口,经过设置这个使得多个线程能够访问同一个端口 s.bind(('localhost', 8888)) s.listen() while True: new_socket, new_addr = s.accept() p = Thread(target=deal_with_client, args=(new_socket, new_addr)) p.start() s.close()