过滤列表中的负数
方法一:node
from random import randint data = [randint(-10, 10) for _ in xrange(10)] filter(lambda x: x>=0, data)
方法二:python
[x for x in data if x >= 0]
时间的比较:
timeit [x for x in data if x >= 0]
1000000 loops, best of 3: 469 ns per loop
timeit filter(lambda x: x >= 0, data)
1000000 loops, best of 3: 1.55 µs per loo
能够看出列表解析更快一些git
筛出字典中的高于90的值web
d = {x: randint(60, 100) for x in xrange(1, 21)} data = {k: v for k, v in d.iteritems() if v > 90}
集合的筛选json
data = [randint(-10, 10) for _ in xrange(10)] s = set(data) r = {x for x in s if x % 3 == 0}
为元组中的元素命名api
from collections import namedtuple Student = namedtuple('Student', ['name', 'age', 'sex', 'email']) s = Student('Jim', 16, 'male', 'jim8721@gmail.com') isinstance(s, tuple)
统计序列中元素出现的频度的前三个
方法一:数据结构
import random data = [random.randint(0, 20) for x in range(30)] c = dict.fromkeys(data, 0) for x in data: c[x] += 1 sorted(c.tems(), key = lambda x:x[1], reverse = True)[:3]
方法二:多线程
from collections import Counter c2 = Counter(data) c2.most_common(3)
字典排序的两种方式app
d = {x: random.randint(60, 100) for x in 'qwerasdf'} sorted(zip(d.itervalues(), d.iterkeys())) sorted(d.items(), key=lambda x: x[1])
字典的小应用
相同键的排序:less
a_dic = {'a':{'val':3}, 'b':{'val':4}, 'c':{'val':1}} dict= sorted(a_dic.iteritems(), key=lambda d:d[1]['val'], reverse = True)
不一样键的排序:
a = {'a':{'val':3}, 'b':{'val':4}, 'c':{'val':1}, 'd':{'val2':0}} dict= sorted(a.iteritems(), key=lambda d:d[1].get('val',0), reverse = True)
功能: iteritems()以迭代器对象返回字典键值对
区别: 和item相比:items以列表形式返回字典键值对
快速找到字典中的公共键
from random import randint, sample s1 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))} s2 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))} s3 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
方法一:
res = [] for k in s1: if k in s2 and k in s3: res.append(k)
方法二:
s1.keys() & s2.keys() & s3.keys()
方法三:
reduce(lambda a, b: a & b, map(dict.viewkeys, [s1, s2, s3]))
让字典保持有序
from time import time from random import randint from collections import OrderedDict d = OrderedDict() players = list('ABCDEFGH') start = time() for i in xrange(8): raw_input() p = players.pop(randint(0, 7 - i)) end = time() print i + 1, p, end - start d[p] = (i + 1, end - start) print('_' * 30) for k in d: print(k, d[k])
实现用户的历史记录
from random import randint from collections import deque N = randint(0, 100) history = deque([], 5) def guess(k): if k == N: print 'right' return True if k < N: print '%s is less-than N' % k else: print '%s is greater than N' % k return False while True: line = raw_input('please input a number: ') if line.isdigit(): k = int(line) history.append(k) if guess(k): break elif line == 'history' or line == 'h?': print list(history)
def getWeather(city): r = requests.get(u'http://wthrcdn.etouch.cn/weather_mini?city=' + city) data = r.json()['data']['forecast'][0] return '%s: %s, %s' %(city, data['low'], data['high']) print(getWeather('北京')) print(getWeather('上海'))
class WeatherIterator(Iterator): def __init__(self, cities): self.cities = cities self.index = 0 def getWeather(self, city): r = requests.get(u'http://wthrcdn.etouch.cn/weather_mini?city=' + city) data = r.json()['data']['forecast'][0] return '%s: %s, %s' %(city, data['low'], data['high']) def next(self): if self.index == len(self.cities): raise StopIteration city = self.cities[self.index] self.index += 1 return self.getWeather(city)
class WeatherIterable(Iterable): def __init__(self, cities): self.cities = cities def __iter__(self): return WeatherIterator(self.cities) for x in WeatherIterable([u'北京', u'上海', u'广州', u'长春']): print x
def f(): print 'in f(), 1' yield 1 print 'in f(), 2' yield 2 print 'in f(), 3' yield 3 g = f() print g.next() for x in g: print(x) g.__iter__() is g
class PrimeNumber: def __init__(self, start, end): self.start = start self.end = end def isPrimeNum(self, k): if k < 2: return False for i in xrange(2, k): if k % i == 0: return False return True def __iter__(self): for k in xrange(self.start, self.end + 1): if self.isPrimeNum(k): yield k for x in PrimeNumber(1, 100): print x
class FloatRange: def __init__(self, start, end, step=0.1): self.start = start self.end = end self.step = step def __iter__(self): t = self.start while t <= self.end: yield t t += self.step def __reversed__(self): t = self.end while t >= self.start: yield t t-= self.step for x in reversed(FloatRange(1.0, 4.0, 0.5)): print x
l = range(20) t = iter(l) for x in lslice(t, 5, 10) print x for x in t: print t
chinese = [randint(60, 100) for x in xrange(40)] math = [randint(60, 100) for x in xrange(40)] english = [randint(60, 100) for x in xrange(40)]
方法一:
for i in xrange(len(math)): chinese[i] + math[i] + english[i]
方法二:并行迭代
total = [] for c, m, e in zip(chinese, math, english): total.append(c + m + e)
e1 = [randint(60, 100) for x in xrange(40)] e2 = [randint(60, 100) for x in xrange(43)] e3 = [randint(60, 100) for x in xrange(45)] e4 = [randint(60, 100) for x in xrange(44)] from itertools import chain count = 0 for s in chain(e1, e2, e3, e4): if s> 90: count += 1
def mySplit(s, ds): res = [s] for d in ds: t = [] map(lambda x: t.extend(x.split(d)), res) res = t return res s = 'ab;cd|efg|hi,jkl|mn\topq;rst,uvw\txyz' print(mySplit(s, ';|\t'))
若s = ‘ab;cd|efg|hi,jkl|mn\topq;rst,uvw\txyz’
则
def mySplit(s, ds): res = [s] for d in ds: t = [] map(lambda x: t.extend(x.split(d)), res) res = t return [x for x in res if x] s = 'ab;cd|efg|hi,jkl|mn\topq;rst,uvw\txyz' print mySplit(s, ';|\t')
方法二:
import re re.split(r'[,;\t|]+', s)
import os, stat f = [name for name in os.listdir('.') if name.endswith(('.sh', '.py'))] for x in f: os.chmod('e.py', os.stat('e.py').st_mode | stat.S_IXUSR)
更改日期的格式
import re re.sub('((\d{4})-(\d{2})-(\d{2})', r'\2/\3/\1', log) re.sub('(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})', r'\g<month>/\g<day>/\g<year>', log)
str.__add__(s1, s2) l = [x for x in 'abcd']
方法一:
s = '' for x in l: s += x
占用空间比较大
方法二:
''.join(l)
若l = [‘abcd’, 123, 45, ‘xyz’]
''.join([str(x) for x in l]) 列表解析开销大 ''.join(str(x) for x in l)生成式开销比较小
dict = {'Alice': '2341', 'Beth': '9102', 'Cecil': '3258'} w = max(map(len, d.keys())) for k in d: print k.ljust(w), ':', d[k] s.ljust(w) = format(s, '<w') s.rjust(w) = format(s, '>w') s.center(w) = format(s, '^w')
方法一: s.strip() 方法二: s[:] + s[:] 方法三: s.replace('\t', '') 只能去掉一种 re.sub('[]', '', s) 方法四: 如s = 'abc123321xyz' import string string.maketrans('abcxyz', 'xyzabc') 可转换加密 s.translate(string.maketrans('abcxyz', 'xyzabc')) s = 'abc\refg\n234\t' s.translate(None, '\t\r\n') seq = ('name', 'age', 'sex') dict = dict.fromkeys(seq) dict.fromkeys(seq, 10)
python2: f = open('py2.txt', 'w') s = u'你好' f.write(s.encode('gbk')) f.close() f = open('py2.txt', 'r') t = f.read() print t.decode('gbk') python3: f = open('py3.txt', 'wt', encoding='utf8') f.write('你好') f.close() f = open('py3.txt', 'rt', encoding='utf8') s = f.read() print(s)
f = open('demo.wav', 'rb') info = f.read(44) import struct struct.unpack('h', '\x01\x02') struct.unpack('>h', '\x01\x02') import array array.array('h', ) f.seek(0, 2) f.tell() n = (f.tell() - 44) / 2 buf = array.array('h', (0 for _ in xrange(n)) f.seek(44) f.readinto(buf) 缩小文件 for i in xrange(n): buf[i] /= 8 f2 = open('demo2.wav', 'wb') f2.write(info) buf.tofile(f2) f2.close()
f = open('demo1.txt', 'w') 全缓冲 f = open('demo2.txt', 'w', buffering=1) 行缓冲 f = open('demo3.txt', 'w', buffering=0) 无缓冲
import mmap f = open('demo.bin', 'r+b') m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE) m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE, offset=mmap.PAGESIZE * 4)
import os s = os.stat('a.txt') os.lstat('x.txt') f = open('a.txt', 'r') f.fileno() = os.fstat s.st_mode bin(s.st_mode) import stat stat.S_ISDIR(s.st_mode) stat.S_ISREG(s.st_mode) s.st_mode & stat.S_IRUSR s.st_mode & stat.S_IXUSR s.st_atime import time time.localtime(s.st_atime) s.st_size import os os.path.isdir('demo.bin') os.path.islink('demo.bin') os.path.isfile('demo.bin') os.path.getatime('demo.bin') os.path.getsize('demo.bin')
from tempfile import TemporaryFile, NamedTemporaryFile f = TemporaryFile() f.write('abcdefg' * 100000) f.seek(0) ntf = NamedTemporaryFile() ntf.name ntf = NamedTemporaryFile(delete=False)
import csv with open('pingan.csv', 'rb')as rf: reader = csv.reader(rf) with open('pingan2.csv', 'wb')as wf: writer = csv.writer(wf) headers = reader.next() writer.writerow(headers) for row in reader: if row[0] < '2016-01-01': break if int(row[5]) >= 50000000: writer.writerow(row)
from record import Record record = Record(channels=1) audioData = record.record(2)
from secret import API_KEY, SECRET__KEYS authUrl = 'https://openapi.baudu.com/oauth/2.0/token?grant_type=client_credentials$client_id=' + API_KEY + '&client_secret=' + SECRET_KEY; response = requests.get(authUrl) res = json.loads(response.countent) token = res['access_token']
cuid = 'xxxxxxx' srvUrl = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token httpHeader = { 'Content-Type':'audio/wav; rate = 8000', } response = requests.post(srvUrl, headers=httpHeader, data=audioData) res = json.loads(response.content) text = res['result'][0] print('\n识别结果:') print(text)
import json l = [1, 2, 'abc', {'name': 'Bob', 'age':13}] json.dumps(l) d = {'b':None, 'a':5, 'c': 'abc'} json.dumps(d) json.dumps(l, separators=[',', ':']) 去空格 json.dumps(d, sort_keys=True) 排序 l2 = json.loads('[1, 2, "abc", {"age": 13, "name": "Bob"}]') d2 = json.loads('{"a": 5, "c": "abc", "b": null}') with open('demo.json', 'wb')as f: json.dump(l, f)
from xml.etree.ElementTree import parse f = open('demo.xml') et = parse(f) root = et.getroot() root.tag root.attrib root.text.strip() root.getchildren() for child in root: print child.get('name') root.find('country') root.findall('country') root.iterfind('country') for e in root.iterfind('country'): print e.get('name') root.findall('rank') root.iter() list(root.iter()) list(root.iter('rank') root.findall('country/*') root.findall('.//rank') root.findall('.//rank/..') root.findall('country[@name]') root.findall('country[@name="Singapore"]) root.findall('country[rank="5"]') root.findall('country[last()'] root.findall('country[last()-1']
from xml.etree.ElementTree import Element, ElementTree e = Element('Data') e.tag e.set('name', 'abc') from xml.etree.ElementTree import tostring tostring(e) e.text = '123' tostring(e) e2 = Element('Row') e3 = Element('open') e3.text = '8.80' e2.append(e3) tostring(e2) e.text = None e.append(e2) tostring(e)
import csv from xml.etree.ElementTree import Element, ElementTree def csvToXml(fname): with open(fname, 'rb')ad f: reader = csv.reader(f) headers = reader.next() root = Element('Data') for row in reader: eRow = Element('Row') root.append(eRow) for tag, text in zip(headers, row): e = Element(tag) e.text = text eRow.append(e) pretty(root) return ElemetnTree(root) def pretty(e, level=0): if len(e) > 0: e.text = '\n' + '\t' * (level + 1) for child in e: pretty(child, level + 1) child.tail = child.tail[:-1] e.tail = '\n' + '\t' * level et = csvToXml('pingan.csv') et.write('pingan.xml')
import xlrd book = xlrd.open_workbook('demo.xlsx') book.sheets() sheet = book.sheet_by_index(0) sheet.nrows sheet.ncols cell = sheet.cell(0, 0) cell.ctype xlrd.XL_CELL_TEXT xlrd.XL_CELL_NUNBER print cell.value sheet.row(1) sheet.row_values(1) sheet.row_values(1, 1) sheet.put_cell import xlwt wbook = xlwt.Workbook() wsheet = wbook.add_sheet('sheet') wsheet.write wbook.save('output.xlsx')
#coding:utf8 import xlrd, xlwt rbook = xlrd.open_workbook('deml.xlsx') rsheet = rbook.sheet_by_index(0) nc = rsheet.ncols rsheet.put_cell(0, nc, xlrd.XL_CELL_TEXT, u'总分', None) for row in xrange(1, rsheet,nrows): rsheet.row_values(row, 1) rsheet.put_cell(row, nc, xlrd.XL_CELL_NUMBER, t, None) wbook = xlwt.Workbook() wsheet = wbook.add_sheet(rsheet.name) style = xlwt.easyxf('align: vertical center, horizontal center') for r in xrange(rsheet.nrows): for c in xrange(rsheet.ncols): wsheet.write(r, c, rsheet.cell_value) wbook.save('output.xlsx')
class IntTuple(tuple): def __new__(cls, iterable): g = (x for x in iterable if isinstance(x, int) and x >0) return super(IntTuple, cls).__new__(cls, g) def __init__(self, iterable): # before print self super(IntTuple, self).__init(iterable) # after t = IntTuple([1, -1, 'abc', 6, ['x', 'y'], 3]) print t
class Player(object): def __init(self, uid, name, status=0, level=1): self.uid = uid self.name = name self.stat = status self.level = level class Player2(object): __slots__ = ['uid', 'name', 'stat', 'level'] def __init(self, uid, name, status=0, level=1): self.uid = uid self.name = name self.stat = status self.level = level from e import Player, Player2 p1 = Player('0001', 'Jim') p2 = Player('0001', 'Jim') set(dir(p1)) - set(dir(p2)) p1.__dict__ import sys sys.getsizeof(p1.__dict__)
with open('demo.txt', 'w')as f: f.write('abcdef') f.writelines(['xyz\n', '123\n']) from telnetlib import Telnet from sys import stdin, stdout from collections import deque class TelnetClient(object): def __init__(self, addr, port=23): self.addr = addr self.port = port self.tn = None def start(self): raise Exception('Test') # user t = self.tn.read_until('login:') stdout.write(t) user = stdin.readline() self.tn.write(user) # password t = self.tn.read_until('Password:') if t.startswith(user[:-1]): t = t[len(user) + 1:] stdout.write(t) self.tn.write(stdin.readline()) t = self.tn.read_until('$ ') stdout.write(t) while True: uinput = stdin.readline() if not uinput: break self.history.append(uinput) self.tn.write(uinput) t = self.tn.read_until('$ ') stdout.write(t[len(uinput) + 1:]) def cleanup(self): pass def __enter__(self): self.tn = Telnet(self.addr, self.port) self.history = deque() return self def __exit__(self, exc_type, exc_val, exc_tb): print('In __exit__') self.tn.close() self.tn = None with open(self.addr + '_history.txt', 'w')as f: f.writelines(self.history) retunr True with TelnetClient('127.0.0.1')as client: client.start() print('END') ''' client = TelnetClient('127.0.0.1') print '\nstat...' client.start() print '\ncleanup' client.cleanup() '''
class Circle(object): def __init__(self, radius): self.radius = radius def getRadius(self): return self.radius def setRadius(self, value): if not isinstance(value, (int, long, float)): raise ValueError('wrong type') self.radius = float(value) def getArea(self): return self.radius ** 2 * pi R = property(getRadius, setRadius) c = Circle(3.2) print(c.R) c.R = 5.9 print(c.R)
from functools import total_ordering from abc import ABCMeta, abstractmethod @total_ordering class Shape(object): @abstractmethod def area(self): pass def __lt__(self, obj): print 'in __lt__' if not isinstance(obj, Shape): raise TypeError('obj is not Shape') return self.area() < obj.area() def __eq__(self, obj): print 'in __eq__' if not isinstance(obj, Shape): raise TypeError('obj is not Shape') return self.area() == obj.area() class Rectangle(Shape): def __init__(self, w, h): self.w = w self.h = h def area(self): return self.w * self.h ''' def __le__(self, obj): return self < obj or self == obj def __gt__(self, obj): return not (self < obj or self == obj) ''' class Circle(Shape): def __init__(self, f): self.r = f def area(self): return self.r ** 2 * 3.14 r1 = Rectangle(5, 3) r2 = Rectangle(4, 4) c1 = Circle(3) print(r1 <= c1 # r1.__le__(r2)) print(r1 > 1)
class Attr(object): def __init__(self, name, type_): self.name = name self.type_ = type_ def __get__(self, instance, cls): return instance.__dict__[self.name] def __set__(self, instance, value): if not isinstance(value, self.type_): raise TypeError('expected an %s' % self.type_) instance.__dict__[self.name] = value def __delete__(self, instance): del instance.__dict__[self.name] class Person(object): name = Attr('name', str) age = Attr('age', int) height = Attr('height', float) p = Person p.name = 'Bob' print p.name p.age = '17' print p.age class Descriptor(object): def __get__(self, instance, cls): print 'in __get__', instance, cls return isntance.__dict__['x'] def __set__(self, instance, value): print 'in __set__' instance.__dict__['x'] = value def __delete__(self, instance): print 'in __del__' # del instance.__dict__[xxx] class A(object): x = Descriptor() a = A() a.x = 5 print a.__dict__
class A(object): def __del__(self): print 'in A.__del__' a = A() import sys sys.getrefcount(a) sys.getrefcount(a) - 1 a2 = a sys.getrefcount(a) - 1 del a2 sys.getrefcount(a) - 1 a = 5 返回in A.__del__ 使用标准库中weakref, 它能够建立一种能访问对象但不能增长引用计数的对象 import weakref a_wref = weakref.ref(a) a2 = a_wref() a is a2 sys.getrefcount(a) - 1 del a del a2 a_wref() is None import weakref class Data(object): def __init__(self, value, owner): self.owner = weakref.ref(owner) self.value = value def __str__(self): return "%s's data, value is %s" % (self.owner(), self.value) def __del__(self): print('in Data.__del__') class Node(object): def __init__(self, value): self.data = Data(value, self) def __del__(self): print('in Node.__del__') node = Node(100) del node raw_input('wait....')
方法一:使用内置函数getattr,经过名字在实例上获取方法对象进行调用
from lib1 import Circle from lib2 import Triangle from lib3 import Rectangle def getArea(shape): for name in ('area', 'getArea', 'get_area'): f = getattr(shape, name, None) if f: return f() shape1 = Circle(2) shape2 = Triangle(3, 4, 5) shape3 = Rectangle(6, 4) shapes = [shape1, shape2, shape3] print(map(getArea, shapes))
方法二:使用标准库operator下的methodcaller函数调用
from operator import methodcaller s = 'abc123abc456' s.find('abc', 4) methodcaller('find', 'abc', 4) methodcaller('find', 'abc', 4)(s)
import csv from xml.etree.ElementTree import Element, ElementTree import requests from StringIO import StringIO from xml_pretty import pretty def download(url): response = requests.get(url, timeout=3) if response.ok: return StringIO(response.content) def csvToXml(scsv, fxml): reader = csv.reader(scsv) headers = reader.next() headers = map(lambda h: h.replace(' ', ''), headers) root = Element('Data') for row in reader: eRow = Element('Row') root.append(eRow) for tag, text in zip(headers, row): e = Element(tag) e.text = text pretty(root) et = ElemetnTree(root) et.write(fxml) def handle(sid): print 'Download...(%d)' % sid url = 'http://table.finance.yahoo.com/table.csv?s=%s,sz' url %= str(sid).rjust(6, '0') rf = download(url) if rf is None: continue print 'Convert to XML...(%d)' % sid fname = str(sid).rjust(6, '0') + '.xml' with open(fname, 'wb')as f: csvToXml(rf, wf) from threading import Thread ''' t = Thread(target=handle, args=(1,) t.start() ''' class MyThread(Thread): def __init__(self, sid): Thread.__init__(self) self.sid = sid def run(self): handle(self.sid) thread = [] for i in xrange(1, 11): t = MyThead(i) threads.append(t) t.start() for t in threads: t.join() print 'main thead'
import csv from xml.etree.ElementTree import Element, ElementTree import requests from StringIO import StringIO from xml_pretty import pretty from Queue import Queue ''' from collections import deque q =deque() ''' class DownloadThread(Thread): def __init__(self, sid, queue): Thread.__init__(self) self.sid = sid self.url = 'http://table.finance.yahoo.com/table.csv?s=%s,sz' self.url %= str(sid).rjust(6, '0') self.queue = queue def download(self, url): response = requests.get(url, timeout=3) if response.ok: return StringIO(response.content) def run(self): #1. data = self.download(self.url) #2. (sid, data) self.queue.put((self.sid, data)) class ConvertThread(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def csvToXml(self, scsv, fxml): reader = csv.reader(scsv) headers = reader.next() headers = map(lambda h: h.replace(' ', ''), headers) root = Element('Data') for row in reader: eRow = Element('Row') root.append(eRow) for tag, text in zip(headers, row): e = Element(tag) e.text = text pretty(root) et = ElemetnTree(root) et.write(fxml) def run(self): while True: sid, data = self.queue.get() if sid == -1: break if data: fname = str(sid).rjust(6, '0') + '.xml' with open(fname, 'wb')as f: csvToXml(rf, wf) q = Queue() dThreads = [DownloadThread(i, q) for i in xrange(1, 11) cThread = ConvertThread(q) for t in dThreads: t.start() cThread.start() for t in dThreads: t.join() q.put((-1, None)) def handle(sid): print 'Download...(%d)' % sid url = 'http://table.finance.yahoo.com/table.csv?s=%s,sz' url %= str(sid).rjust(6, '0') rf = download(url) if rf is None: continue print 'Convert to XML...(%d)' % sid fname = str(sid).rjust(6, '0') + '.xml' with open(fname, 'wb')as f: self.csvToXml(data, wf) from threading import Thread ''' t = Thread(target=handle, args=(1,) t.start() ''' class MyThread(Thread): def __init__(self, sid): Thread.__init__(self) self.sid = sid def run(self): handle(self.sid) thread = [] for i in xrange(1, 11): t = MyThead(i) threads.append(t) t.start() for t in threads: t.join() print('main thead')
import tarfile import os def tarXML(tfname): tf = tarfile.open(tfname, 'w:gz') for fname in os.listdir('.'): if fname.endswith('.xml'): tf.add(fname) os.remove(fname) tf.close() if not tf.memvers: os.remove(tfname) tarXML('test.tgz')
from threading import Event, Thread def f(e): print 'f 0' e.wait() print 'f 1' e = Event() t = Thread(target=f, args=(e,)) t.start() e.set() e.clear() import csv from xml.etree.ElementTree import Element, ElementTree import requests from StringIO import StringIO from xml_pretty import pretty from threading import Thread, Event from Queue import Queue ''' from collections import deque q =deque() ''' class DownloadThread(Thread): def __init__(self, sid, queue): Thread.__init__(self) self.sid = sid self.url = 'http://table.finance.yahoo.com/table.csv?s=%s,sz' self.url %= str(sid).rjust(6, '0') self.queue = queue def download(self, url): response = requests.get(url, timeout=3) if response.ok: return StringIO(response.content) def run(self): #1. data = self.download(self.url) #2. (sid, data) self.queue.put((self.sid, data)) class ConvertThread(Thread): def __init__(self, queue, cEvent, tEvent): Thread.__init__(self) self.queue = queue self.cEvent = cEvent self.tEvent = tEvent def csvToXml(self, scsv, fxml): reader = csv.reader(scsv) headers = reader.next() headers = map(lambda h: h.replace(' ', ''), headers) root = Element('Data') for row in reader: eRow = Element('Row') root.append(eRow) for tag, text in zip(headers, row): e = Element(tag) e.text = text pretty(root) et = ElemetnTree(root) et.write(fxml) def run(self): count = 0 while True: sid, data = self.queue.get() if sid == -1: slef.cEvent.set() self.tEvent.wait() break if data: fname = str(sid).rjust(6, '0') + '.xml' with open(fname, 'wb')as f: self.csvToXml(data, wf) count += 1 if count == 5: self.cEvent.set() self.tEvent.wait() self.tEvent.clear() count = 0 import tarfile import os class TarThread(Thread): def __init__(self, cEvent, tEvent): Thread.__init__(self) self.count = 0 self.cEvent = cEvent self.tEvent = tEvent self.setDaemon(True) def tarXML(self): self.count += 1 tfname = '%d.tgz' % self.count tf = tarfile.open(tfname, 'w:gz') for fname in os.listdir('.'): if fname.endswith('.xml'): tf.add(fname) os.remove(fname) tf.close() if not tf.memvers: os.remove(tfname) def run(self): while True: self.cEvent.wait() self.tarXML() self.cEvent.clear() self.cEvent.set() if __name__ == '__main__': q = Queue() dTheads = [DownloadThread(i, q) for i in xrange(1, 11)] cEvent = Event() tEvent = Event() cThread = ConvertThread(q, cEvent, tEvent) tThread = TarThread(cEvent, tEvent) tThread.start() for t in dThreads: t.start() cThread.start() q.put((-1, None)) print('main thread')
import threading l = threading.local() l.x = 1 def f(): print l.x f() >>>> 1 threading.Thread(target=f).start() >>> 异常 def f(): l.x = 5 threading.Thread(target=f).start() l.x 实例: import os, cs2, time, struct, threading from BaseHTTPSever import HTTPSever, BaseHTTPRequestHandler from SocketSever import Thread, RLock from threading import Thread, RLock from select import select class JpegStreamer(Thread): def __init__(self, camera): Thread.__init__(self, camera): self.cap = cv2.VideoCapture(camera) self.lock = RLock self.pipes = {} def register(self): pr, pw = os.pipe() self.lock.acquire() self.pipe[pr] = pw self.lock.release() return pr def unrsgister(self, pr): self.lock.acquire() self.pipes.pop(pr) self.lock.release() pr.close() pw.close() def capture(self): cap = self.cap while cap.isOpened(): ret, frame = cap.read() if ret: # ret, data = cv2.imencode('.jpg', frame) ret, data = cv2.imencode('jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40)) yield data.tostring() def send(self, frame): n = struct.pack('l', len(fname)) self.lock.acquire() if len(self.pipes): _, pipes, _ = select([], self.pipes.itervalues(), [], 1) for pipe in pipes: os.write(pipe, n) os.write(pipe, frame) self.lock.release() def run(self): for frame in self.capture(): self.send(fname) class JpegRetriever(object): def __init__(self, streamer): self.streamer = streamer self.local = threading.local() def retrieve(self): while True: ns = os.read(self.local.pipe, 8) n = struct.unpack('l', ns)[0] data = os.read(self.local.pipe, n) yield data def __enter__(self): if hasattr(self.local, 'pipe'): return RuntimeError() self.pipe = streamer.register() return self.retrieve() def __exit__(self, *args): slef.streamer.unregister(self.local.pipe) del self.local.pipe return True class Handle(BaseHTTPRequestHandler): retriever = None @staticmethod def setJpegRetriever(retriever): Handle.retriever = retriever def do_GET(self): if self.retriver is None: raise RuntimeError('no retriver') if self.path != '/': return self.send_response(200) self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=abcde') self.end_headers() with self.retriever as frames: for fname in frames: self.send_frame(frame) def send_frame(self, frame): self.wfile.write('--abcde\r\n') self.wfile.write('Content Type: image/jped\r\n') if __name__ == '__main__': streamer = JpegStreamer(0) streamer.start() retriever = JpegRetriever(streamer) Handler.setJpegRetriever(retriever) print 'Start server...' httpd = ThreadingTCPServer(('', 9000), Handle) httpd.serve_forever()
from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(3) def f(a, b): print('f', a, b) time.sleep(10) return a ** b future = executor.submit(f, 2, 3) future.result() executor.map(f, [2, 3, 4, 5, 6, 7], [4, 5, 6, 7, 8]) import os, cs2, time, struct, threading from BaseHTTPSever import HTTPSever, BaseHTTPRequestHandler from SocketSever import Thread, RLock from threading import Thread, RLock from select import select class JpegStreamer(Thread): def __init__(self, camera): Thread.__init__(self, camera): self.cap = cv2.VideoCapture(camera) self.lock = RLock self.pipes = {} def register(self): pr, pw = os.pipe() self.lock.acquire() self.pipe[pr] = pw self.lock.release() return pr def unrsgister(self, pr): self.lock.acquire() self.pipes.pop(pr) self.lock.release() pr.close() pw.close() def capture(self): cap = self.cap while cap.isOpened(): ret, frame = cap.read() if ret: # ret, data = cv2.imencode('.jpg', frame) ret, data = cv2.imencode('jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40)) yield data.tostring() def send(self, frame): n = struct.pack('l', len(fname)) self.lock.acquire() if len(self.pipes): _, pipes, _ = select([], self.pipes.itervalues(), [], 1) for pipe in pipes: os.write(pipe, n) os.write(pipe, frame) self.lock.release() def run(self): for frame in self.capture(): self.send(fname) class JpegRetriever(object): def __init__(self, streamer): self.streamer = streamer self.local = threading.local() def retrieve(self): while True: ns = os.read(self.local.pipe, 8) n = struct.unpack('l', ns)[0] data = os.read(self.local.pipe, n) yield data def __enter__(self): if hasattr(self.local, 'pipe'): return RuntimeError() self.pipe = streamer.register() return self.retrieve() def __exit__(self, *args): slef.streamer.unregister(self.local.pipe) del self.local.pipe return True class Handle(BaseHTTPRequestHandler): retriever = None @staticmethod def setJpegRetriever(retriever): Handle.retriever = retriever def do_GET(self): if self.retriver is None: raise RuntimeError('no retriver') if self.path != '/': return self.send_response(200) self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=abcde') self.end_headers() with self.retriever as frames: for fname in frames: self.send_frame(frame) def send_frame(self, frame): self.wfile.write('--abcde\r\n') self.wfile.write('Content Type: image/jped\r\n') class ThreadingPoolServer(ThreadingTCPServer): def __init__(slef, server_address, RequestHandlerClass, bind_and_activate=Ture, max_thread_num=100): super().__init__(server_adderss, RequestHandlerClass, bind_and_activate) self.executor = ThreadPoolExecutor(max_thread_num) def process_request(self, request, client_address): self.executor.submit(self.process_request_thread, request, client_address) if __name__ == '__main__': streamer = JpegStreamer(0) streamer.start() retriever = JpegRetriever(streamer) Handler.setJpegRetriever(retriever) print('Start server...)' httpd = ThreadingPoolServer(('', 9000), Handle, max_thread_num=3) httpd.serve_forever()
from multiprocessing import Process x = 1 def f(): global x x =5 f() x >> 5 x = 1 p = Process(target=f) p. start() x >> 1 说明进程的变量是独立的 from multiprocessing import Queue, Pipe from multiprocessing import Process q = Queue() def f(c): c.send(c.recv() * 2) c1, c2 = Pipe() Process(target=f, args=(c2,)).start() c1.send(55) c1.recv()
from threading import Thread from multiprocessing import Process def isArmstrong(n): a, t = [], n while t > 0: a.append(t % 10) t /= 10 k = len(a) return sum(x ** k for x in a) == n def findArmstrong(a, b): print a, b res = [k for k in xrange(a, b) if isArmstrong(k)] print '%s ~ %s: %s' % (a, b, res) def findByThead(*argslist): workers = [] for args in argslist: worker = Thread(target=findArmstrong, args=args) workers.append(worker) worker.start() for worker in workers: worker.join() def findByProcess(*argslist): workers = [] for args in argslist: worker = Process(target=findArmstrong, args=args) workers.append(worker) worker.start() for worker in workers: worker.join() if __name__ == "__mian__": import time start = time.time() findByProcess((20000000, 25000000), (25000000, 30000000)) # findByThread((20000000, 25000000), (25000000, 30000000)) print time.time() - start
#coding:utf8 def memo(func): cache = {} def wrap(*args): if args not in cache: cache[args] = func(*args) return cache[args] return wrap
@memo def fibonacci(n): if n <= 1: return 1 return fibonacci(n -1) + fibonacci(n -2) fibonacci = memo(fibonacci) print(fibonacci(50))
@memo def climb(n, steps): count = 0 if n == 0: count = 1 elif n > 0: for step in steps: count += climb(n - step, steps) return count print(climb(10, (1, 2, 3)))
def f(a, b=1, c=[]): print a,b,c f.__defaults__ f.__defaults__[1].append('abc') f(100) def f(): a = 2 return lambda k: a ** k g = f() c = g.__closure__[0] c.cell_contents from functools import update_wrapper, wraps, WRAPPER_ASSIGNMENTS, WRAPPER_UPDATES def mydecorator(func): @wraps(func) def wrapper(*args, **kargs): '''wrapper function''' print('In wraper)' func(*args, **kargs) # update_wrapper(wrapper, func) return wrapper @mydecorator def example(): '''example function''' print('In example') print(example.__name__) print(example.__doc__ )
from inspect import signature def typeassert(*ty_args, **ty_kwargs): def decorator(func): # func -> a,b # d = {'a': int, 'b': str} sig = signature(func) btypes = sig.bind_partial(*ty_args, **ty_kwargs).arguments def wrapper(*args, **kargs): # arg in d, instance(arg, **kargs) for name, obj in sig.bind(*args, **kargs).arguments.items(): if name in btypes: if not isinstance(obj, btypes[name]): raise TypeError('"%s" must be "%s"' % (name, btypes[name])) return func(*args, **kargs) return wrapper return decorator @typeassert(int, str, list) def f(a, b, c): print(a, b, c) f(1, 'abc', [1, 2, 3]) f(1, 2, [1, 2, 3])
python3: from functools import wraps import time import logging def warn(timeout): def decorator(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) used = time.time() - start if used > timeout: msg = '"%s": %s > %s' % (func.__name__, used, timeout) logging.warn(msg) return res def setTimeout(k): nonlocal timeout timeout = k wrapper.setTimeout = setTimeout return wrapper return decorator from random import randint @warn(1.5) def test(): print('In test') while randint(0, 1): time.sleep(0.5) for x in range(30): test() test.setTimeout(1) for x in range(30): test() python2: from functools import wraps import time import logging def warn(timeout): timeout = [timeout] def decorator(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) used = time.time() - start if used > timeout[0]: msg = '"%s": %s > %s' % (func.__name__, used, timeout[0]) logging.warn(msg) return res def setTimeout(k): # nonlocal timeout timeout[0] = k wrapper.setTimeout = setTimeout return wrapper return decorator from random import randint @warn(1.5) def test(): print('In test') while randint(0, 1): time.sleep(0.5) for x in range(30): test() test.setTimeout(1) for x in range(30): test()