python的小技巧

过滤列表中的负数
方法一:使用 filter 函数

from random import randint
data = [randint(-10, 10) for _ in xrange(10)]
filter(lambda x: x>=0, data)

方法二:使用列表解析

[x for x in data if x >= 0]

时间的比较:
timeit [x for x in data if x >= 0]
1000000 loops, best of 3: 469 ns per loop
timeit filter(lambda x: x >= 0, data)
1000000 loops, best of 3: 1.55 µs per loop
可以看出列表解析更快一些

筛出字典中高于90的值

d = {x: randint(60, 100) for x in xrange(1, 21)}
data = {k: v for k, v in d.iteritems() if v > 90}

集合的筛选

data = [randint(-10, 10) for _ in xrange(10)]
s = set(data)
r = {x for x in s if x % 3 == 0}

为元组中的元素命名

from collections import namedtuple
Student = namedtuple('Student', ['name', 'age', 'sex', 'email'])
s = Student('Jim',  16, 'male',  'jim8721@gmail.com')
isinstance(s, tuple)

统计序列中元素出现的频度的前三个
方法一:使用字典手动计数

import random

# 30 random samples in [0, 20], counted by hand with a plain dict.
data = [random.randint(0, 20) for _ in range(30)]
c = dict.fromkeys(data, 0)
for x in data:
    c[x] += 1
# The three most frequent (value, count) pairs, highest count first.
top3 = sorted(c.items(), key=lambda item: item[1], reverse=True)[:3]

方法二:使用 collections.Counter

from collections import Counter
c2 = Counter(data)
c2.most_common(3)

字典排序的两种方式

d = {x: random.randint(60, 100) for x in 'qwerasdf'}
# Way 1: build (value, key) tuples so that plain tuple ordering sorts by value.
by_value_pairs = sorted(zip(d.values(), d.keys()))
# Way 2: sort the (key, value) items with an explicit key function.
by_value_items = sorted(d.items(), key=lambda item: item[1])

字典的小应用
相同键的排序:

a_dic = {'a': {'val': 3}, 'b': {'val': 4}, 'c': {'val': 1}}
# Stored under a descriptive name: binding the result to `dict` would shadow
# the builtin and break later calls such as dict.fromkeys().
sorted_items = sorted(a_dic.items(), key=lambda kv: kv[1]['val'], reverse=True)

不同键的排序:

a = {'a': {'val': 3}, 'b': {'val': 4}, 'c': {'val': 1}, 'd': {'val2': 0}}
# Entries without a 'val' key sort as 0. The result is not bound to `dict`,
# which would shadow the builtin.
sorted_items = sorted(a.items(), key=lambda kv: kv[1].get('val', 0), reverse=True)

功能: iteritems() 以迭代器对象返回字典键值对
区别: 与 items() 相比,items() 以列表形式返回字典键值对(Python 2)

快速找到字典中的公共键

from random import randint, sample
s1 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
s2 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
s3 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}

方法一:

res = []
for k in s1:
	if k in s2 and k in s3:
		res.append(k)

方法二:

s1.keys() & s2.keys() & s3.keys()

方法三:

reduce(lambda a, b: a & b, map(dict.viewkeys, [s1, s2, s3]))

让字典保持有序

from time import time
from random import randint
from collections import OrderedDict

d = OrderedDict()
players = list('ABCDEFGH')
start = time()
for i in xrange(8):
    raw_input()
    p = players.pop(randint(0, 7 - i))
    end = time()
    print i + 1, p, end - start
    d[p] = (i + 1, end - start)
print('_' * 30)
for k in d:
    print(k, d[k])

实现用户的历史记录

from random import randint
from collections import deque

N = randint(0, 100)
history = deque([], 5)


def guess(k):
    """Compare the guess ``k`` with the module-level target ``N``.

    Prints a hint ('right', less-than or greater-than) and returns True
    only when ``k`` equals ``N``.
    """
    if k == N:
        print('right')
        return True
    # print(...) with a single argument behaves the same on Python 2 and 3.
    if k < N:
        print('%s is less-than N' % k)
    else:
        print('%s is greater than N' % k)
    return False


while True:
    line = raw_input('please input a number: ')
    if line.isdigit():
        k = int(line)
        history.append(k)
        if guess(k):
            break
    elif line == 'history' or line == 'h?':
        print list(history)

常规函数

def getWeather(city):
    """Fetch today's forecast for ``city`` and format it as 'city: low, high'.

    Performs an HTTP GET against the etouch weather endpoint and reads the
    first entry of ``data.forecast`` from the JSON response.
    """
    # Imported here because this snippet uses requests without importing it.
    import requests

    r = requests.get(u'http://wthrcdn.etouch.cn/weather_mini?city=' + city)
    data = r.json()['data']['forecast'][0]
    return '%s: %s, %s' % (city, data['low'], data['high'])

print(getWeather('北京'))
print(getWeather('上海'))

实现迭代器对象

try:
    from collections.abc import Iterator
except ImportError:  # Python 2
    from collections import Iterator


class WeatherIterator(Iterator):
    """Iterator producing one formatted weather line per city, fetched lazily.

    ``cities`` is an indexable sequence of city names. Each call to
    ``next()`` performs one HTTP request via ``getWeather``.
    """

    def __init__(self, cities):
        self.cities = cities
        self.index = 0  # position of the next city to fetch

    def getWeather(self, city):
        """Return 'city: low, high' for today's forecast of ``city``."""
        import requests  # local import: the snippet never imports requests

        r = requests.get(u'http://wthrcdn.etouch.cn/weather_mini?city=' + city)
        data = r.json()['data']['forecast'][0]
        return '%s: %s, %s' % (city, data['low'], data['high'])

    def __next__(self):
        """Return the weather of the next city or raise StopIteration."""
        if self.index >= len(self.cities):
            raise StopIteration
        city = self.cities[self.index]
        self.index += 1
        return self.getWeather(city)

    # Python 2 spells the iterator protocol method `next`.
    next = __next__

实现可迭代对象

try:
    from collections.abc import Iterable
except ImportError:  # Python 2
    from collections import Iterable


class WeatherIterable(Iterable):
    """Iterable over city weather; every ``iter()`` starts a fresh pass."""

    def __init__(self, cities):
        self.cities = cities

    def __iter__(self):
        # A new iterator per call, so the object can be looped over repeatedly.
        return WeatherIterator(self.cities)

for x in WeatherIterable([u'北京', u'上海', u'广州', u'长春']):
    print x

生成器对象

def f():
    """Generator that announces each step before yielding 1, 2 and 3."""
    print('in f(), 1')
    yield 1
    print('in f(), 2')
    yield 2
    print('in f(), 3')
    yield 3


g = f()
# next(g) works on Python 2 and 3, unlike the Python 2-only g.next().
print(next(g))
for x in g:
    print(x)
# A generator is its own iterator.
g.__iter__() is g

生成器返回素数

class PrimeNumber:
    """Iterable over the prime numbers in the closed range [start, end]."""

    def __init__(self, start, end):
        self.start = start
        self.end = end

    def isPrimeNum(self, k):
        """Return True when ``k`` is prime, using trial division."""
        if k < 2:
            return False
        # Any composite k has a divisor no larger than sqrt(k).
        i = 2
        while i * i <= k:
            if k % i == 0:
                return False
            i += 1
        return True

    def __iter__(self):
        """Yield each prime between start and end, both inclusive."""
        for k in range(self.start, self.end + 1):
            if self.isPrimeNum(k):
                yield k


for x in PrimeNumber(1, 100):
    print(x)

反向迭代

class FloatRange:
    """Inclusive float range supporting forward and reversed iteration.

    Forward iteration yields start, start + step, ... up to ``end``.
    ``reversed()`` yields end, end - step, ... down to ``start``.

    Raises:
        ValueError: if ``step`` is not positive (iteration would never end).
    """

    def __init__(self, start, end, step=0.1):
        if step <= 0:
            raise ValueError('step must be positive')
        self.start = start
        self.end = end
        self.step = step

    def __iter__(self):
        # Each value is computed from its index, so rounding errors do not
        # accumulate as they would with repeated `t += step`.
        i = 0
        while True:
            t = self.start + i * self.step
            if t > self.end:
                break
            yield t
            i += 1

    def __reversed__(self):
        i = 0
        while True:
            t = self.end - i * self.step
            if t < self.start:
                break
            yield t
            i += 1


for x in reversed(FloatRange(1.0, 4.0, 0.5)):
    print(x)

对迭代器进行切片操作

from itertools import islice

l = range(20)
t = iter(l)
# islice consumes the underlying iterator: items 0-4 are skipped and
# items 5-9 are yielded.
for x in islice(t, 5, 10):
    print(x)
# The same iterator resumes where islice stopped, so this prints 10..19.
for x in t:
    print(x)

for循环迭代多个对象

chinese = [randint(60, 100) for x in xrange(40)]
math = [randint(60, 100) for x in xrange(40)]
english = [randint(60, 100) for x in xrange(40)]

方法一:

for i in xrange(len(math)):
	chinese[i] + math[i] + english[i]

方法二:并行迭代

total = []
for c, m, e in zip(chinese, math, english):
	total.append(c + m + e)

串行迭代

e1 = [randint(60, 100) for x in xrange(40)]
e2 = [randint(60, 100) for x in xrange(43)]
e3 = [randint(60, 100) for x in xrange(45)]
e4 = [randint(60, 100) for x in xrange(44)]
from itertools import chain
count = 0
for s in chain(e1, e2, e3, e4):
	if s> 90:
		count += 1

分割字符串

def mySplit(s, ds):
    """Split ``s`` on every single-character delimiter in ``ds``.

    The delimiters are applied one after another, each pass splitting all
    pieces produced so far. Empty pieces are kept.
    """
    res = [s]

    for d in ds:
        t = []
        # An explicit loop: map() used for side effects is lazy on Python 3
        # and would never run.
        for piece in res:
            t.extend(piece.split(d))
        res = t

    return res

s = 'ab;cd|efg|hi,jkl|mn\topq;rst,uvw\txyz'
print(mySplit(s, ';|\t'))

若s = ‘ab;cd|efg|hi,jkl|mn\topq;rst,uvw\txyz’

def mySplit(s, ds):
    """Split ``s`` on every single-character delimiter in ``ds``.

    Works like repeated ``str.split`` but drops the empty pieces produced
    by adjacent, leading or trailing delimiters.
    """
    res = [s]

    for d in ds:
        t = []
        # An explicit loop: map() used for side effects is lazy on Python 3
        # and would never run.
        for piece in res:
            t.extend(piece.split(d))
        res = t

    return [x for x in res if x]

s = 'ab;cd|efg|hi,jkl|mn\topq;rst,uvw\txyz'
print mySplit(s, ';|\t')

方法二:

import re
re.split(r'[,;\t|]+', s)

更改文件权限

import os, stat
# Every .sh / .py file in the current directory.
f = [name for name in os.listdir('.') if name.endswith(('.sh', '.py'))]
for x in f:
    # Add the owner-execute bit to each matched file, not a fixed 'e.py'.
    os.chmod(x, os.stat(x).st_mode | stat.S_IXUSR)

更改日期的格式

import re
# Rewrite YYYY-MM-DD as MM/DD/YYYY using numbered groups ...
re.sub(r'(\d{4})-(\d{2})-(\d{2})', r'\2/\3/\1', log)
# ... or, more readably, using named groups.
re.sub(r'(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})', r'\g<month>/\g<day>/\g<year>', log)

字符串的拼接

str.__add__(s1, s2)
l = [x for x in 'abcd']

方法一:

s = ''
for x in l:
	s += x

占用空间比较大
方法二:

''.join(l)

若l = [‘abcd’, 123, 45, ‘xyz’]

''.join([str(x) for x in l]) 列表解析开销大
 ''.join(str(x) for x in l)生成式开销比较小

字符串的左、右、居中对齐

# Named `d` because the loop below reads `d`; binding it to `dict` would also
# shadow the builtin.
d = {'Alice': '2341', 'Beth': '9102', 'Cecil': '3258'}
w = max(map(len, d.keys()))  # width of the longest key
for k in d:
    print('%s : %s' % (k.ljust(w), d[k]))
# Equivalent spellings with format() for a width of w:
#   s.ljust(w)  == format(s, '<%d' % w)
#   s.rjust(w)  == format(s, '>%d' % w)
#   s.center(w) == format(s, '^%d' % w)

去掉不需要的字符

方法一:
s.strip()
方法二:
s[:] + s[:]
方法三:
s.replace('\t', '')  只能去掉一种
re.sub('[]', '', s)
方法四:
如s = 'abc123321xyz'
import string

string.maketrans('abcxyz', 'xyzabc') 可转换加密
s.translate(string.maketrans('abcxyz', 'xyzabc'))
s  = 'abc\refg\n234\t'
s.translate(None, '\t\r\n')
seq = ('name', 'age', 'sex')
dict = dict.fromkeys(seq)
dict.fromkeys(seq, 10)

python文件的读写

python2:
f = open('py2.txt', 'w')
s = u'你好'
f.write(s.encode('gbk'))
f.close()
f = open('py2.txt', 'r')
t = f.read()
print t.decode('gbk')
python3:
f = open('py3.txt', 'wt', encoding='utf8')
f.write('你好')
f.close()
f = open('py3.txt', 'rt', encoding='utf8')
s = f.read()
print(s)

处理二进制文件

f = open('demo.wav', 'rb')
info = f.read(44)
import struct
struct.unpack('h', '\x01\x02')
struct.unpack('>h', '\x01\x02')

import array
array.array('h')
f.seek(0, 2)  # move to the end of the file to learn its size
f.tell()
# 44 bytes were already read as `info`; the item count assumes 2 bytes per
# 'h' item, so use integer division to get a whole number.
n = (f.tell() - 44) // 2
buf = array.array('h', (0 for _ in range(n)))
f.seek(44)
f.readinto(buf)
缩小文件
for i in xrange(n): buf[i] /= 8
f2 = open('demo2.wav', 'wb')
f2.write(info)
buf.tofile(f2)
f2.close()

设置文件的缓冲

f = open('demo1.txt', 'w')  全缓冲
f = open('demo2.txt', 'w', buffering=1)  行缓冲
f = open('demo3.txt', 'w', buffering=0)  无缓冲

将文件映射到内存中

import mmap
f = open('demo.bin', 'r+b')
m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE)
 m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE, offset=mmap.PAGESIZE * 4)

访问文件的状态

import os
s = os.stat('a.txt')
os.lstat('x.txt')
f = open('a.txt', 'r')
f.fileno() = os.fstat
s.st_mode
bin(s.st_mode)
import stat
stat.S_ISDIR(s.st_mode)
stat.S_ISREG(s.st_mode)
s.st_mode & stat.S_IRUSR
s.st_mode & stat.S_IXUSR
s.st_atime
import time
time.localtime(s.st_atime)
s.st_size
import os
os.path.isdir('demo.bin')
os.path.islink('demo.bin')
os.path.isfile('demo.bin')
os.path.getatime('demo.bin')
os.path.getsize('demo.bin')

使用临时文件

from tempfile import TemporaryFile, NamedTemporaryFile
f = TemporaryFile()
f.write('abcdefg' * 100000)
f.seek(0)
ntf = NamedTemporaryFile()
ntf.name
ntf = NamedTemporaryFile(delete=False)

读写CSV文件

import csv 

# Both files stay open for the whole copy; reading from the reader after its
# source file has been closed raises an error.
with open('pingan.csv', 'rb') as rf, open('pingan2.csv', 'wb') as wf:
    reader = csv.reader(rf)
    writer = csv.writer(wf)
    headers = next(reader)
    writer.writerow(headers)
    for row in reader:
        # Stop at the first row dated before 2016 (string comparison on
        # column 0; presumably the rows are sorted newest first - confirm).
        if row[0] < '2016-01-01':
            break
        if int(row[5]) >= 50000000:
            writer.writerow(row)

录音

from record import Record
record = Record(channels=1)
audioData = record.record(2)

获取token

# NOTE(review): the import must match the name used below; SECRET_KEY is
# assumed to be what the local `secret` module exports - confirm.
from secret import API_KEY, SECRET_KEY
# Query parameters are joined with '&'.
authUrl = ('https://openapi.baidu.com/oauth/2.0/token'
           '?grant_type=client_credentials&client_id=' + API_KEY +
           '&client_secret=' + SECRET_KEY)
response = requests.get(authUrl)
res = json.loads(response.content)
token = res['access_token']

语音识别

cuid = 'xxxxxxx'
srvUrl = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
httpHeader = {
        'Content-Type':'audio/wav; rate = 8000',
        }
response = requests.post(srvUrl, headers=httpHeader, data=audioData)
res = json.loads(response.content)
text = res['result'][0]

print('\n识别结果:')
print(text)

json数据的读写

import json
l = [1, 2, 'abc', {'name': 'Bob', 'age':13}]
json.dumps(l)
d = {'b':None, 'a':5, 'c': 'abc'}
json.dumps(d)
json.dumps(l, separators=[',', ':'])  去空格
json.dumps(d, sort_keys=True)  排序
l2 = json.loads('[1, 2, "abc", {"age": 13, "name": "Bob"}]')
d2 = json.loads('{"a": 5, "c": "abc", "b": null}')
with open('demo.json', 'wb')as f:
	json.dump(l, f)

解析XML文档

from xml.etree.ElementTree import parse
f = open('demo.xml')
et = parse(f)
root = et.getroot()
root.tag
root.attrib
root.text.strip()
root.getchildren()
for child in root:
	print child.get('name')
root.find('country')
root.findall('country')
root.iterfind('country')
for e in root.iterfind('country'):
	print e.get('name')
root.findall('rank')
root.iter()
list(root.iter())
list(root.iter('rank'))                      # every descendant named 'rank'
root.findall('country/*')                    # all children of each country
root.findall('.//rank')                      # 'rank' elements at any depth
root.findall('.//rank/..')                   # parents of the 'rank' elements
root.findall('country[@name]')               # countries with a name attribute
root.findall('country[@name="Singapore"]')   # attribute equals a value
root.findall('country[rank="5"]')            # child element text equals a value
root.findall('country[last()]')              # the last country
root.findall('country[last()-1]')            # the second-to-last country

构建xml文档

from xml.etree.ElementTree import Element, ElementTree
e = Element('Data')
e.tag
e.set('name', 'abc')
from xml.etree.ElementTree import tostring
tostring(e)
e.text = '123'
tostring(e)
e2 = Element('Row')
e3 = Element('open')
e3.text = '8.80'
e2.append(e3)
tostring(e2)
e.text = None
e.append(e2)
tostring(e)

CSV文件转XML文档

import csv
from xml.etree.ElementTree import Element, ElementTree 


def csvToXml(fname):
    """Convert the CSV file ``fname`` into an ElementTree.

    The first CSV row supplies the tag names. Every following row becomes a
    <Row> element under the <Data> root, with one child element per column.
    The tree is indented with ``pretty`` before being returned.
    """
    with open(fname, 'rb') as f:
        reader = csv.reader(f)
        headers = next(reader)

        root = Element('Data')
        for row in reader:
            eRow = Element('Row')
            root.append(eRow)
            for tag, text in zip(headers, row):
                e = Element(tag)
                e.text = text
                eRow.append(e)

    pretty(root)
    return ElementTree(root)


def pretty(e, level=0):
    """Indent the element tree rooted at ``e`` in place, one tab per level.

    ``level`` is the nesting depth of ``e``. Elements with children get a
    newline plus indentation as their text. Every element gets a newline
    plus indentation as its tail, and the last child's tail is shortened by
    one character so the parent's closing tag lines up.
    """
    children = list(e)
    if children:
        e.text = '\n' + '\t' * (level + 1)
        for node in children:
            pretty(node, level + 1)
        last = children[-1]
        last.tail = last.tail[:-1]
    e.tail = '\n' + '\t' * level


et = csvToXml('pingan.csv')
et.write('pingan.xml')

读写excel文件

import xlrd
book = xlrd.open_workbook('demo.xlsx')
book.sheets()
sheet = book.sheet_by_index(0)
sheet.nrows
sheet.ncols
cell = sheet.cell(0, 0)
cell.ctype
xlrd.XL_CELL_TEXT
xlrd.XL_CELL_NUNBER
print cell.value
sheet.row(1)
sheet.row_values(1)
sheet.row_values(1, 1)
sheet.put_cell
import xlwt
wbook = xlwt.Workbook()
wsheet = wbook.add_sheet('sheet')
wsheet.write
wbook.save('output.xlsx')

添加一列

#coding:utf8
import xlrd, xlwt

rbook = xlrd.open_workbook('demo.xlsx')
rsheet = rbook.sheet_by_index(0)

# Append a header cell for the new "total" column.
nc = rsheet.ncols
rsheet.put_cell(0, nc, xlrd.XL_CELL_TEXT, u'总分', None)

# For every data row, sum the values from column 1 onwards into the new column.
for row in range(1, rsheet.nrows):
    t = sum(rsheet.row_values(row, 1))
    rsheet.put_cell(row, nc, xlrd.XL_CELL_NUMBER, t, None)


# Copy the augmented sheet into a new workbook, centring every cell.
wbook = xlwt.Workbook()
wsheet = wbook.add_sheet(rsheet.name)
style = xlwt.easyxf('align: vertical center, horizontal center')

for r in range(rsheet.nrows):
    for c in range(rsheet.ncols):
        wsheet.write(r, c, rsheet.cell_value(r, c), style)

wbook.save('output.xlsx')

派生不可变类型并修改实例化行为 过滤元组中的负数和非数字

class IntTuple(tuple):
    """Tuple that keeps only the positive ``int`` items of its input."""

    def __new__(cls, iterable):
        # tuple is immutable, so filtering has to happen in __new__, before
        # the instance exists.
        g = (x for x in iterable if isinstance(x, int) and x > 0)
        return super(IntTuple, cls).__new__(cls, g)

    def __init__(self, iterable):
        # The contents are already fixed here, as the print shows.
        print(self)
        # object.__init__ accepts no extra arguments.
        super(IntTuple, self).__init__()


t = IntTuple([1, -1, 'abc', 6, ['x', 'y'], 3])
print(t)

如何建立大量的实例节省内存

class Player(object):
    """Plain player record; attributes are stored in the instance __dict__."""

    def __init__(self, uid, name, status=0, level=1):
        self.uid = uid
        self.name = name
        self.stat = status
        self.level = level


class Player2(object):
    """Player record using __slots__, so instances carry no __dict__."""

    # Only these attribute names may be set on an instance.
    __slots__ = ['uid', 'name', 'stat', 'level']

    def __init__(self, uid, name, status=0, level=1):
        self.uid = uid
        self.name = name
        self.stat = status
        self.level = level

from e import Player, Player2
p1 = Player('0001', 'Jim')
p2 = Player2('0001', 'Jim')

# Attributes that the __dict__-based instance has and the slotted one lacks.
set(dir(p1)) - set(dir(p2))
p1.__dict__

import sys
sys.getsizeof(p1.__dict__)

定义类的__slots__属性,它是用来声明实例属性名字的列表

如何让一个对象支持上下文管理 with 语句自带上下文管理,不用手动关闭

with open('demo.txt', 'w')as f:
	f.write('abcdef')
	f.writelines(['xyz\n', '123\n'])

from telnetlib import Telnet
from sys import stdin, stdout
from collections import deque


class TelnetClient(object):
    """Minimal interactive telnet session, usable as a context manager.

    ``__enter__`` opens the connection and starts an empty command history.
    ``__exit__`` closes the connection and writes the history to
    '<addr>_history.txt'.
    """

    def __init__(self, addr, port=23):
        self.addr = addr
        self.port = port
        self.tn = None  # set by __enter__

    def start(self):
        """Log in, then relay commands from stdin to the server until EOF."""
        # user
        t = self.tn.read_until('login:')
        stdout.write(t)
        user = stdin.readline()
        self.tn.write(user)

        # password
        t = self.tn.read_until('Password:')
        # Drop the echoed user name (readline keeps the trailing newline).
        if t.startswith(user[:-1]):
            t = t[len(user) + 1:]
        stdout.write(t)
        self.tn.write(stdin.readline())

        t = self.tn.read_until('$ ')
        stdout.write(t)
        while True:
            uinput = stdin.readline()
            if not uinput:
                break
            self.history.append(uinput)
            self.tn.write(uinput)
            t = self.tn.read_until('$ ')
            # Skip the echoed command at the start of the reply.
            stdout.write(t[len(uinput) + 1:])

    def cleanup(self):
        pass

    def __enter__(self):
        self.tn = Telnet(self.addr, self.port)
        self.history = deque()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('In __exit__')
        self.tn.close()
        self.tn = None
        with open(self.addr + '_history.txt', 'w') as f:
            f.writelines(self.history)
        # Returning True suppresses any exception raised inside the with body.
        return True


with TelnetClient('127.0.0.1')as client:
    client.start()
print('END')

'''
client = TelnetClient('127.0.0.1')
print '\nstat...'
client.start()
print '\ncleanup'
client.cleanup()
'''

创建可管理的对象属性

import math
import numbers


class Circle(object):
    """Circle whose radius is exposed through the validated property ``R``."""

    def __init__(self, radius):
        self.radius = radius

    def getRadius(self):
        """Return the current radius."""
        return self.radius

    def setRadius(self, value):
        """Store ``value`` as a float radius.

        Raises:
            ValueError: if ``value`` is not a real number.
        """
        # numbers.Real covers int and float (and Python 2's long) without
        # referring to the Python 2-only name `long`.
        if not isinstance(value, numbers.Real):
            raise ValueError('wrong type')
        self.radius = float(value)

    def getArea(self):
        """Return the area of the circle."""
        return self.radius ** 2 * math.pi

    # Attribute-style access that goes through the getter/setter above.
    R = property(getRadius, setRadius)


c = Circle(3.2)
print(c.R)
c.R = 5.9
print(c.R)

让类支持比较操作 标准库 functools 下的类装饰器 total_ordering 可以简化此过程

from functools import total_ordering
from abc import ABCMeta, abstractmethod


@total_ordering
class Shape(object):
    """Base class that orders shapes by area.

    Only ``__lt__`` and ``__eq__`` are written by hand; ``total_ordering``
    derives ``<=``, ``>`` and ``>=`` from them. Comparing with a non-Shape
    raises TypeError.
    """

    @abstractmethod
    def area(self):
        """Return the area; concrete shapes override this."""
        pass

    def __lt__(self, obj):
        print('in __lt__')
        if not isinstance(obj, Shape):
            raise TypeError('obj is not Shape')
        return self.area() < obj.area()

    def __eq__(self, obj):
        print('in __eq__')
        if not isinstance(obj, Shape):
            raise TypeError('obj is not Shape')
        return self.area() == obj.area()


class Rectangle(Shape):
    """Rectangle of width ``w`` and height ``h``."""

    def __init__(self, w, h):
        self.w = w
        self.h = h

    def area(self):
        return self.w * self.h

    # No hand-written __le__ / __gt__ needed: total_ordering on Shape
    # derives them from __lt__ and __eq__.


class Circle(Shape):
    """Circle of radius ``r``; the area uses 3.14 as pi."""

    def __init__(self, f):
        self.r = f

    def area(self):
        return self.r ** 2 * 3.14


r1 = Rectangle(5, 3)
r2 = Rectangle(4, 4)
c1 = Circle(3)

print(r1 <= c1)  # r1.__le__(c1), derived by total_ordering
# Comparing with a non-Shape raises TypeError; show it without aborting.
try:
    print(r1 > 1)
except TypeError as exc:
    print('TypeError: %s' % exc)

使用描述符对实例属性做类型检查

class Attr(object):
    """Data descriptor that type-checks values assigned to one attribute.

    ``name`` is the key used in the owning instance's ``__dict__`` and
    ``type_`` is the type (or tuple of types) accepted by ``isinstance``.
    """

    def __init__(self, name, type_):
        self.type_ = type_
        self.name = name

    def __get__(self, instance, cls):
        """Return the value stored on ``instance`` under ``self.name``."""
        storage = instance.__dict__
        return storage[self.name]

    def __set__(self, instance, value):
        """Store ``value`` on ``instance``; raise TypeError on a wrong type."""
        if isinstance(value, self.type_):
            instance.__dict__[self.name] = value
            return
        raise TypeError('expected an %s' % self.type_)

    def __delete__(self, instance):
        """Remove the stored value from ``instance``."""
        storage = instance.__dict__
        del storage[self.name]


class Person(object):
    """Record whose fields are type-checked by ``Attr`` descriptors."""

    name = Attr('name', str)
    age = Attr('age', int)
    height = Attr('height', float)


# Descriptors only take effect on instances. Assigning on the class itself
# would simply replace them.
p = Person()
p.name = 'Bob'
print(p.name)
# A str is rejected by the int-typed `age` field; show the error and go on.
try:
    p.age = '17'
except TypeError as exc:
    print(exc)
else:
    print(p.age)

                           

class Descriptor(object):
    """Tracing descriptor that stores its value under 'x' on the instance."""

    def __get__(self, instance, cls):
        print('in __get__ %s %s' % (instance, cls))
        return instance.__dict__['x']

    def __set__(self, instance, value):
        print('in __set__')
        instance.__dict__['x'] = value

    def __delete__(self, instance):
        # Only traces the call; nothing is removed from the instance.
        print('in __del__')
        # del instance.__dict__[xxx]


class A(object):
    x = Descriptor()


a = A()
a.x = 5
print(a.__dict__)

如何在环状数据结构中管理内存

class A(object):
    def __del__(self):
             print 'in A.__del__'
a = A()
import sys
sys.getrefcount(a)
sys.getrefcount(a) - 1
a2 = a
sys.getrefcount(a) - 1
del a2
sys.getrefcount(a) - 1
a = 5
输出 in A.__del__
使用标准库中的 weakref,它可以创建一种能访问对象但不增加引用计数的对象
import weakref
# `a` was rebound to the int 5 above, and ints cannot be weakly referenced,
# so start again from a fresh A instance.
a = A()
a_wref = weakref.ref(a)
a2 = a_wref()  # calling the weak reference returns the object (or None)
a is a2
sys.getrefcount(a) - 1
del a
del a2
# Once the last strong reference is gone the weak reference returns None.
a_wref() is None


import weakref

class Data(object):
    """Payload that refers back to its owner through a weak reference.

    Holding the owner weakly keeps the owner <-> data pair from forming a
    strong reference cycle.
    """

    def __init__(self, value, owner):
        self.value = value
        self.owner = weakref.ref(owner)

    def __str__(self):
        # Calling the weak reference gives the owner, or None once it is gone.
        holder = self.owner()
        return "%s's data, value is %s" % (holder, self.value)

    def __del__(self):
        print('in Data.__del__')


class Node(object):
    def __init__(self, value):
        self.data = Data(value, self)

    def __del__(self):
        print('in Node.__del__')


node = Node(100)
del node 
raw_input('wait....')

通过实例方法名字的字符串调用方法

方法一:使用内置函数getattr,通过名字在实例上获取方法对象进行调用

from lib1 import Circle
from lib2 import Triangle
from lib3 import Rectangle

def getArea(shape):
    """Call whichever area method ``shape`` provides and return its result.

    Tries the names 'area', 'getArea' and 'get_area' in that order and calls
    the first truthy attribute found. Returns None when none is present.
    """
    candidates = (
        getattr(shape, method_name, None)
        for method_name in ('area', 'getArea', 'get_area')
    )
    method = next((m for m in candidates if m), None)
    if method is not None:
        return method()


shape1 = Circle(2)
shape2 = Triangle(3, 4, 5)
shape3 = Rectangle(6, 4)

shapes = [shape1, shape2, shape3]
print(map(getArea, shapes))

方法二:使用标准库operator下的methodcaller函数调用

from operator import methodcaller
s = 'abc123abc456'
s.find('abc', 4)
# methodcaller builds a callable that invokes .find('abc', 4) on its argument.
methodcaller('find', 'abc', 4)
methodcaller('find', 'abc', 4)(s)

使用多线程

import csv
from xml.etree.ElementTree import Element, ElementTree 
import requests
from StringIO import StringIO 
from xml_pretty import pretty 

def download(url):
    """GET ``url`` with a 3 second timeout.

    Returns the response body wrapped in a StringIO when the response is OK,
    otherwise None.
    """
    reply = requests.get(url, timeout=3)
    if not reply.ok:
        return None
    return StringIO(reply.content)


def csvToXml(scsv, fxml):
    """Convert CSV read from ``scsv`` into XML written to ``fxml``.

    The first CSV row supplies the tag names (spaces removed). Every
    following row becomes a <Row> element under the <Data> root, with one
    child element per column.
    """
    reader = csv.reader(scsv)
    headers = next(reader)
    # A real list: the headers are reused for every row.
    headers = [h.replace(' ', '') for h in headers]

    root = Element('Data')
    for row in reader:
        eRow = Element('Row')
        root.append(eRow)
        for tag, text in zip(headers, row):
            e = Element(tag)
            e.text = text
            eRow.append(e)

    pretty(root)
    et = ElementTree(root)
    et.write(fxml)


def handle(sid):
    """Download the CSV for stock id ``sid`` and convert it to XML.

    The id is zero-padded to six digits for both the URL and the output
    file name '<id>.xml'. Nothing is written when the download fails.
    """
    print('Download...(%d)' % sid)
    url = 'http://table.finance.yahoo.com/table.csv?s=%s,sz'
    url %= str(sid).rjust(6, '0')
    rf = download(url)
    if rf is None:
        return

    print('Convert to XML...(%d)' % sid)
    fname = str(sid).rjust(6, '0') + '.xml'
    with open(fname, 'wb') as wf:
        csvToXml(rf, wf)


from threading import Thread
'''
t = Thread(target=handle, args=(1,)
t.start()
'''

class MyThread(Thread):
    """Thread that runs ``handle`` for a single stock id."""

    def __init__(self, sid):
        super(MyThread, self).__init__()
        self.sid = sid

    def run(self):
        # Executed in the new thread once start() is called.
        handle(self.sid)

# Start one worker per stock id, then wait for all of them to finish.
threads = []
for i in range(1, 11):
    t = MyThread(i)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print('main thead')

线程间的通讯

import csv
from xml.etree.ElementTree import Element, ElementTree 
import requests
from StringIO import StringIO 
from xml_pretty import pretty 
from Queue import Queue 
'''
from collections import deque
q =deque()
'''


class DownloadThread(Thread):
    """Producer thread: downloads one stock's CSV and queues the result.

    Puts a ``(sid, data)`` tuple on ``queue``. ``data`` is None when the
    HTTP response is not OK.
    """

    def __init__(self, sid, queue):
        super(DownloadThread, self).__init__()
        self.sid = sid
        self.queue = queue
        # The stock id is zero-padded to six digits in the URL.
        template = 'http://table.finance.yahoo.com/table.csv?s=%s,sz'
        self.url = template % str(sid).rjust(6, '0')

    def download(self, url):
        """Return the body of ``url`` as a StringIO, or None on failure."""
        reply = requests.get(url, timeout=3)
        return StringIO(reply.content) if reply.ok else None

    def run(self):
        payload = self.download(self.url)
        self.queue.put((self.sid, payload))

class ConvertThread(Thread):
    """Consumer thread: converts queued CSV downloads to XML files.

    Reads ``(sid, data)`` tuples from ``queue`` until a tuple whose sid is
    -1 arrives. Tuples with empty data are skipped.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def csvToXml(self, scsv, fxml):
        """Convert CSV read from ``scsv`` into XML written to ``fxml``."""
        reader = csv.reader(scsv)
        headers = next(reader)
        # A real list: the headers are reused for every row.
        headers = [h.replace(' ', '') for h in headers]

        root = Element('Data')
        for row in reader:
            eRow = Element('Row')
            root.append(eRow)
            for tag, text in zip(headers, row):
                e = Element(tag)
                e.text = text
                eRow.append(e)

        pretty(root)
        et = ElementTree(root)
        et.write(fxml)

    def run(self):
        while True:
            sid, data = self.queue.get()
            if sid == -1:  # sentinel: no more work
                break
            if data:
                fname = str(sid).rjust(6, '0') + '.xml'
                with open(fname, 'wb') as wf:
                    self.csvToXml(data, wf)

q = Queue()
dThreads = [DownloadThread(i, q) for i in range(1, 11)]
cThread = ConvertThread(q)
for t in dThreads:
    t.start()
cThread.start()
# Wait for every download so the sentinel is queued after the last real item.
for t in dThreads:
    t.join()

q.put((-1, None))

def handle(sid):
    """Download the CSV for stock id ``sid`` and convert it to XML.

    The id is zero-padded to six digits for both the URL and the output
    file name '<id>.xml'. Nothing is written when the download fails.
    """
    print('Download...(%d)' % sid)
    url = 'http://table.finance.yahoo.com/table.csv?s=%s,sz'
    url %= str(sid).rjust(6, '0')
    rf = download(url)
    if rf is None:
        return

    print('Convert to XML...(%d)' % sid)
    fname = str(sid).rjust(6, '0') + '.xml'
    with open(fname, 'wb') as wf:
        # A plain function has no `self`; call the module-level converter.
        csvToXml(rf, wf)


from threading import Thread
'''
t = Thread(target=handle, args=(1,)
t.start()
'''

class MyThread(Thread):
    """Thread that runs ``handle`` for a single stock id."""

    def __init__(self, sid):
        super(MyThread, self).__init__()
        self.sid = sid

    def run(self):
        # Executed in the new thread once start() is called.
        handle(self.sid)

# Start one worker per stock id, then wait for all of them to finish.
threads = []
for i in range(1, 11):
    t = MyThread(i)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print('main thead')

打包文件

import tarfile
import os 

def tarXML(tfname):
    """Move every .xml file of the current directory into the archive ``tfname``.

    The archive is gzip-compressed. Each file is deleted after it has been
    added. When no .xml file exists, the empty archive is removed again.
    """
    added = 0
    tf = tarfile.open(tfname, 'w:gz')
    try:
        for fname in os.listdir('.'):
            if fname.endswith('.xml'):
                tf.add(fname)
                os.remove(fname)
                added += 1
    finally:
        tf.close()

    if not added:
        os.remove(tfname)

tarXML('test.tgz')

线程间的通知

from threading import Event, Thread
def f(e):
	print 'f 0'
	e.wait()
	print 'f 1'
e = Event()
t = Thread(target=f, args=(e,))
t.start()
e.set()
e.clear()

import csv
from xml.etree.ElementTree import Element, ElementTree 
import requests
from StringIO import StringIO 
from xml_pretty import pretty 
from threading import Thread, Event 
from Queue import Queue 
'''
from collections import deque
q =deque()
'''


class DownloadThread(Thread):
    """Producer thread: downloads one stock's CSV and queues the result.

    Puts a ``(sid, data)`` tuple on ``queue``. ``data`` is None when the
    HTTP response is not OK.
    """

    def __init__(self, sid, queue):
        super(DownloadThread, self).__init__()
        self.sid = sid
        self.queue = queue
        # The stock id is zero-padded to six digits in the URL.
        template = 'http://table.finance.yahoo.com/table.csv?s=%s,sz'
        self.url = template % str(sid).rjust(6, '0')

    def download(self, url):
        """Return the body of ``url`` as a StringIO, or None on failure."""
        reply = requests.get(url, timeout=3)
        return StringIO(reply.content) if reply.ok else None

    def run(self):
        payload = self.download(self.url)
        self.queue.put((self.sid, payload))

class ConvertThread(Thread):
    """Consumer thread: converts queued CSV to XML and triggers archiving.

    After every 5 converted files, and once more when the sentinel
    ``(-1, None)`` arrives, it sets ``cEvent`` to request an archive run and
    waits on ``tEvent`` before continuing.
    """

    def __init__(self, queue, cEvent, tEvent):
        Thread.__init__(self)
        self.queue = queue
        self.cEvent = cEvent  # set by this thread: "files are ready"
        self.tEvent = tEvent  # waited on by this thread: "archiving done"

    def csvToXml(self, scsv, fxml):
        """Convert CSV read from ``scsv`` into XML written to ``fxml``."""
        reader = csv.reader(scsv)
        headers = next(reader)
        # A real list: the headers are reused for every row.
        headers = [h.replace(' ', '') for h in headers]

        root = Element('Data')
        for row in reader:
            eRow = Element('Row')
            root.append(eRow)
            for tag, text in zip(headers, row):
                e = Element(tag)
                e.text = text
                eRow.append(e)

        pretty(root)
        et = ElementTree(root)
        et.write(fxml)

    def run(self):
        count = 0
        while True:
            sid, data = self.queue.get()
            if sid == -1:
                # Final batch: archive whatever is left, then stop.
                self.cEvent.set()
                self.tEvent.wait()
                break
            if data:
                fname = str(sid).rjust(6, '0') + '.xml'
                with open(fname, 'wb') as wf:
                    self.csvToXml(data, wf)
                count += 1
                if count == 5:
                    self.cEvent.set()
                    self.tEvent.wait()
                    self.tEvent.clear()
                    count = 0

import tarfile 
import os 

class TarThread(Thread):
    """Daemon thread that archives the .xml files whenever ``cEvent`` is set.

    After each archive run it clears ``cEvent`` and sets ``tEvent`` so the
    waiting converter can continue.
    """

    def __init__(self, cEvent, tEvent):
        Thread.__init__(self)
        self.count = 0  # number of archive runs, used for the file name
        self.cEvent = cEvent
        self.tEvent = tEvent
        # Daemon: the endless run() loop must not keep the process alive.
        self.daemon = True

    def tarXML(self):
        """Move every .xml file of the current directory into '<count>.tgz'.

        The archive is removed again when no .xml file was found.
        """
        self.count += 1
        tfname = '%d.tgz' % self.count
        added = 0
        tf = tarfile.open(tfname, 'w:gz')
        try:
            for fname in os.listdir('.'):
                if fname.endswith('.xml'):
                    tf.add(fname)
                    os.remove(fname)
                    added += 1
        finally:
            tf.close()

        if not added:
            os.remove(tfname)

    def run(self):
        while True:
            self.cEvent.wait()
            self.tarXML()
            self.cEvent.clear()
            # Wake the converter, which is blocked on tEvent.
            self.tEvent.set()


if __name__ == '__main__':
    q = Queue()
    dThreads = [DownloadThread(i, q) for i in range(1, 11)]
    cEvent = Event()
    tEvent = Event()
    cThread = ConvertThread(q, cEvent, tEvent)
    tThread = TarThread(cEvent, tEvent)
    tThread.start()
    for t in dThreads:
        t.start()
    cThread.start()
    # Wait for every download so the sentinel is queued after the last real
    # item; otherwise the converter could stop early.
    for t in dThreads:
        t.join()
    q.put((-1, None))
    print('main thread')

使用线程本地数据

import threading 
l = threading.local()
l.x = 1
def f():
     print l.x
 f()   >>>> 1
 threading.Thread(target=f).start()  >>> 异常
 def f():
     l.x = 5
 threading.Thread(target=f).start()
l.x

实例:
import os, cv2, time, struct, threading
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
# ThreadingTCPServer is the class the __main__ block below instantiates.
from SocketServer import ThreadingTCPServer
from threading import Thread, RLock
from select import select

class JpegStreamer(Thread):
    """Thread that captures camera frames and fans them out over pipes.

    Each consumer calls ``register()`` to obtain the read end of a fresh
    pipe. Every captured frame is written to all registered pipes as a
    native-long length prefix followed by the JPEG bytes.
    """

    def __init__(self, camera):
        Thread.__init__(self)
        self.cap = cv2.VideoCapture(camera)
        self.lock = RLock()
        self.pipes = {}  # read fd -> write fd

    def register(self):
        """Create a pipe, remember it and return its read descriptor."""
        pr, pw = os.pipe()
        with self.lock:
            self.pipes[pr] = pw
        return pr

    def unregister(self, pr):
        """Forget the pipe identified by read descriptor ``pr``; close both ends."""
        with self.lock:
            pw = self.pipes.pop(pr)
        # os.pipe() returns plain integer descriptors.
        os.close(pr)
        os.close(pw)

    # The original, misspelled method name, kept as an alias for callers.
    unrsgister = unregister

    def capture(self):
        """Yield JPEG-encoded frames for as long as the capture device is open."""
        cap = self.cap
        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                # The extension passed to imencode needs its leading dot.
                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
                yield data.tostring()

    def send(self, frame):
        """Write ``frame`` to every pipe that select() reports writable within 1s."""
        n = struct.pack('l', len(frame))
        with self.lock:
            if self.pipes:
                _, pipes, _ = select([], list(self.pipes.values()), [], 1)
                for pipe in pipes:
                    os.write(pipe, n)
                    os.write(pipe, frame)

    def run(self):
        for frame in self.capture():
            self.send(frame)


class JpegRetriever(object):
    """Context manager giving each thread its own stream of frames.

    The pipe descriptor is kept in thread-local storage, so one retriever
    can be shared by several handler threads.
    """

    def __init__(self, streamer):
        self.streamer = streamer
        self.local = threading.local()

    def retrieve(self):
        """Yield frames read from this thread's pipe (length prefix + data)."""
        while True:
            ns = os.read(self.local.pipe, struct.calcsize('l'))
            n = struct.unpack('l', ns)[0]
            data = os.read(self.local.pipe, n)
            yield data

    def __enter__(self):
        # A thread may hold only one registration at a time.
        if hasattr(self.local, 'pipe'):
            raise RuntimeError()
        self.local.pipe = self.streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        self.streamer.unregister(self.local.pipe)
        del self.local.pipe
        # Returning True suppresses any exception raised inside the with body.
        return True

class Handle(BaseHTTPRequestHandler):
    """HTTP handler that streams frames as a multipart/x-mixed-replace reply."""

    retriever = None  # shared by all handlers; set via setJpegRetriever()

    @staticmethod
    def setJpegRetriever(retriever):
        Handle.retriever = retriever

    def do_GET(self):
        if self.retriever is None:
            raise RuntimeError('no retriver')

        if self.path != '/':
            return

        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=abcde')
        self.end_headers()

        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        """Write one multipart part: boundary, part headers, then the frame."""
        self.wfile.write('--abcde\r\n')
        self.wfile.write('Content-Type: image/jpeg\r\n')
        self.wfile.write('Content-Length: %d\r\n\r\n' % len(frame))
        self.wfile.write(frame)


if __name__ == '__main__':
    # Start capturing, then serve the frames over HTTP on port 9000.
    streamer = JpegStreamer(0)
    streamer.start()

    retriever = JpegRetriever(streamer)
    Handle.setJpegRetriever(retriever)

    print('Start server...')
    httpd = ThreadingTCPServer(('', 9000), Handle)
    httpd.serve_forever()

使用线程池

from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(3)
def f(a, b):
	print('f', a, b)
	time.sleep(10)
    return a ** b
future = executor.submit(f, 2, 3)
future.result()
executor.map(f, [2, 3, 4, 5, 6, 7], [4, 5, 6, 7, 8])

import os, cv2, time, struct, threading
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
# ThreadingTCPServer is the base class of ThreadingPoolServer below.
from SocketServer import ThreadingTCPServer
from threading import Thread, RLock
from select import select

class JpegStreamer(Thread):
    """Thread that captures camera frames and fans them out over pipes.

    Each consumer calls ``register()`` to obtain the read end of a fresh
    pipe. Every captured frame is written to all registered pipes as a
    native-long length prefix followed by the JPEG bytes.
    """

    def __init__(self, camera):
        Thread.__init__(self)
        self.cap = cv2.VideoCapture(camera)
        self.lock = RLock()
        self.pipes = {}  # read fd -> write fd

    def register(self):
        """Create a pipe, remember it and return its read descriptor."""
        pr, pw = os.pipe()
        with self.lock:
            self.pipes[pr] = pw
        return pr

    def unregister(self, pr):
        """Forget the pipe identified by read descriptor ``pr``; close both ends."""
        with self.lock:
            pw = self.pipes.pop(pr)
        # os.pipe() returns plain integer descriptors.
        os.close(pr)
        os.close(pw)

    # The original, misspelled method name, kept as an alias for callers.
    unrsgister = unregister

    def capture(self):
        """Yield JPEG-encoded frames for as long as the capture device is open."""
        cap = self.cap
        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                # The extension passed to imencode needs its leading dot.
                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
                yield data.tostring()

    def send(self, frame):
        """Write ``frame`` to every pipe that select() reports writable within 1s."""
        n = struct.pack('l', len(frame))
        with self.lock:
            if self.pipes:
                _, pipes, _ = select([], list(self.pipes.values()), [], 1)
                for pipe in pipes:
                    os.write(pipe, n)
                    os.write(pipe, frame)

    def run(self):
        for frame in self.capture():
            self.send(frame)


class JpegRetriever(object):
    """Context manager that yields the frames a streamer writes to a pipe.

    Entering registers a new pipe with the streamer and stores its read
    descriptor in thread-local storage, so each thread that enters gets its
    own pipe. Leaving unregisters that pipe again.
    """

    def __init__(self, streamer):
        """
        :param streamer: object providing ``register()`` (returns a readable
            file descriptor) and ``unregister(fd)``.
        """
        self.streamer = streamer
        self.local = threading.local()

    def _read_exact(self, size):
        """Read ``size`` bytes from this thread's pipe; fewer only at EOF."""
        chunks = []
        remaining = size
        while remaining > 0:
            # A single os.read may return less than requested, so keep
            # reading until the whole item has arrived.
            chunk = os.read(self.local.pipe, remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def retrieve(self):
        """Yield frames from the pipe until its write end reaches EOF."""
        # The writer packs the length with struct format 'l'; use the same
        # format's size here instead of a fixed byte count.
        header_size = struct.calcsize('l')
        while True:
            ns = self._read_exact(header_size)
            if len(ns) < header_size:
                return
            n = struct.unpack('l', ns)[0]
            yield self._read_exact(n)

    def __enter__(self):
        """Register a pipe for the current thread and return the frame iterator.

        :raises RuntimeError: if the current thread is already inside the
            context.
        """
        if hasattr(self.local, 'pipe'):
            raise RuntimeError('retriever already entered in this thread')
        self.local.pipe = self.streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        """Unregister the current thread's pipe.

        Returns True, so an exception raised inside the ``with`` body is
        suppressed rather than propagated.
        """
        self.streamer.unregister(self.local.pipe)
        del self.local.pipe
        return True

class Handle(BaseHTTPRequestHandler):
    """Request handler that streams the retriever's frames as multipart JPEG."""

    # Shared by all handler instances; installed through setJpegRetriever().
    retriever = None

    @staticmethod
    def setJpegRetriever(retriever):
        """Install the retriever object that every request will read from."""
        Handle.retriever = retriever

    def do_GET(self):
        """Serve ``/`` as an endless ``multipart/x-mixed-replace`` stream.

        :raises RuntimeError: if no retriever has been installed.
        """
        if self.retriever is None:
            raise RuntimeError('no retriver')

        if self.path != '/':
            # Answer explicitly instead of closing the connection silently.
            self.send_error(404)
            return

        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=abcde')
        self.end_headers()

        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        """Write one frame as a part of the multipart response.

        :param frame: the JPEG data as a bytes object.
        """
        # The boundary string matches the one announced in do_GET.
        self.wfile.write(b'--abcde\r\n')
        self.wfile.write(b'Content-Type: image/jpeg\r\n')
        self.wfile.write(('Content-Length: %d\r\n\r\n' % len(frame)).encode('ascii'))
        self.wfile.write(frame)
        # CRLF that separates this part's body from the next boundary line.
        self.wfile.write(b'\r\n')


class ThreadingPoolServer(ThreadingTCPServer):
    """ThreadingTCPServer variant that handles requests on a thread pool.

    Instead of starting a new thread per request, each request is submitted
    to a ``ThreadPoolExecutor`` with at most ``max_thread_num`` workers.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, max_thread_num=100):
        """
        :param server_address: passed to ``ThreadingTCPServer``.
        :param RequestHandlerClass: passed to ``ThreadingTCPServer``.
        :param bind_and_activate: passed to ``ThreadingTCPServer``.
        :param max_thread_num: worker count for the thread pool.
        """
        # Explicit base-class call works on both Python 2 and Python 3.
        ThreadingTCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate)
        self.executor = ThreadPoolExecutor(max_thread_num)

    def process_request(self, request, client_address):
        """Queue the request on the pool rather than spawning a thread."""
        self.executor.submit(self.process_request_thread, request, client_address)


if __name__ == '__main__':
    # Start the frame-producing thread.
    streamer = JpegStreamer(0)
    streamer.start()

    # Install the shared retriever on the request handler class (Handle).
    retriever = JpegRetriever(streamer)
    Handle.setJpegRetriever(retriever)

    print('Start server...')
    # Serve on port 9000 with a pool limited to 3 worker threads.
    httpd = ThreadingPoolServer(('', 9000), Handle, max_thread_num=3)
    httpd.serve_forever()

如何使用多进程

from multiprocessing import Process
x = 1


def f():
    """Rebind the module-level ``x`` to 5 in whichever process runs it."""
    global x
    x = 5


# Called directly, f() rebinds x in the current process.
f()
print(x)  # 5

# Run in a child process, f() only rebinds the child's own copy of x.
# NOTE: with the "spawn" start method this script-level code needs an
# ``if __name__ == '__main__':`` guard -- see the multiprocessing docs.
x = 1
p = Process(target=f)
p.start()
p.join()  # wait for the child so the check below happens after f() ran
print(x)  # still 1 in the parent
说明进程的变量是独立的
from multiprocessing import Queue, Pipe
from multiprocessing import Process
# A Queue is created here, but the example that follows only uses Pipe.
q = Queue()
def f(c):
    """Receive one value from connection ``c`` and send back that value * 2."""
    received = c.recv()
    doubled = received * 2
    c.send(doubled)
# Pipe() returns the two connected ends of a channel.
c1, c2 = Pipe()
# The child process gets c2 and runs f on it.
Process(target=f, args=(c2,)).start()
# The parent sends 55 through c1 ...
c1.send(55)
# ... and reads the child's reply (f sends back the received value times 2).
c1.recv()

查找水仙花数时间的比较

from threading import Thread 
from multiprocessing import Process 

def isArmstrong(n):
    """Return True if ``n`` is an Armstrong (narcissistic) number.

    That is, ``n`` equals the sum of its decimal digits each raised to the
    number of digits, e.g. ``153 == 1**3 + 5**3 + 3**3``.
    """
    a, t = [], n
    while t > 0:
        a.append(t % 10)
        # Floor division keeps t an integer on Python 3 as well; plain "/"
        # would turn it into a float and corrupt the extracted digits.
        t //= 10
    k = len(a)
    return sum(x ** k for x in a) == n

def findArmstrong(a, b):
    """Print the Armstrong numbers found in the half-open range ``[a, b)``.

    The bounds are printed first, then a line with the bounds and the list
    of matches.
    """
    # Single pre-formatted strings print the same on Python 2 and Python 3.
    print('%s %s' % (a, b))
    res = [k for k in range(a, b) if isArmstrong(k)]
    print('%s ~ %s: %s' % (a, b, res))

def findByThread(*argslist):
    """Run ``findArmstrong`` once per argument tuple, each in its own thread.

    :param argslist: tuples of positional arguments for ``findArmstrong``.
    Returns after every thread has finished.
    """
    workers = []
    for args in argslist:
        worker = Thread(target=findArmstrong, args=args)
        workers.append(worker)
        worker.start()

    for worker in workers:
        worker.join()


# Backward-compatible alias for the original, misspelled function name.
findByThead = findByThread

def findByProcess(*argslist):
    """Run ``findArmstrong`` once per argument tuple, each in its own process.

    :param argslist: tuples of positional arguments for ``findArmstrong``.
    Returns after every process has finished.
    """
    procs = [Process(target=findArmstrong, args=span) for span in argslist]

    # Start them all first so they run concurrently, then wait for each one.
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

if __name__ == "__main__":
    import time

    # Time one strategy at a time; swap the commented line to compare.
    start = time.time()
    findByProcess((20000000, 25000000), (25000000, 30000000))
    # findByThread((20000000, 25000000), (25000000, 30000000))
    print(time.time() - start)

函数装饰器 传统函数调用

装饰器

#coding:utf8

def memo(func):
    """Decorator that caches ``func``'s results by its positional arguments.

    The arguments tuple is the cache key, so every argument must be hashable.
    """
    results = {}

    def wrap(*args):
        try:
            return results[args]
        except KeyError:
            # First call with these arguments: compute and remember.
            value = results[args] = func(*args)
            return value

    return wrap

斐波那契数列

@memo
def fibonacci(n):
    """Return the n-th Fibonacci number, with fibonacci(0) == fibonacci(1) == 1."""
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


# "@memo" above is the decorator spelling of the traditional call
# "fibonacci = memo(fibonacci)"; doing both would wrap the function twice,
# so only the decorator is applied.
print(fibonacci(50))

10个台阶,一次只能迈1-3个台阶,不能后退,走完楼梯有几种方法

@memo
def climb(n, steps):
    """Count the ordered ways to climb exactly ``n`` stairs.

    :param n: stairs remaining.
    :param steps: the allowed step sizes (a tuple, so it can be a cache key).
    """
    if n == 0:
        # Landed exactly on the top: one complete way.
        return 1
    if n < 0:
        # Overshot the top: not a valid way.
        return 0
    return sum(climb(n - step, steps) for step in steps)


print(climb(10, (1, 2, 3)))

为被装饰的函数保存元数据

def f(a, b=1, c=[]):
    """Print the three arguments.

    The mutable default for ``c`` is deliberate: the statements below show
    that default values are stored on the function object and can be changed.
    """
    print(a, b, c)


# Default values live in a tuple on the function object: (1, []).
f.__defaults__
# Mutating the stored list changes the default that later calls see.
f.__defaults__[1].append('abc')
f(100)  # prints: 100 1 ['abc']

def f():
    """Return a function that raises 2 to the power of its argument."""
    base = 2  # referenced by the lambda, so it is kept in a closure cell
    return lambda k: base ** k


g = f()
# The returned function closes over exactly one variable; unpack its cell.
(c,) = g.__closure__
c.cell_contents  # the captured value, 2


from functools import update_wrapper, wraps, WRAPPER_ASSIGNMENTS, WRAPPER_UPDATES

def mydecorator(func):
    """Decorator that prints a message before delegating to ``func``.

    ``wraps(func)`` copies the wrapped function's metadata (name, docstring,
    ...) onto the wrapper, replacing the wrapper's own docstring below.
    """
    @wraps(func)
    def wrapper(*args, **kargs):
        '''wrapper function'''
        print('In wraper')
        # Pass the wrapped function's result through to the caller.
        return func(*args, **kargs)
    # Same effect without the decorator form: update_wrapper(wrapper, func)
    return wrapper

@mydecorator 
def example():
    '''example function'''
    print('In example')

# mydecorator applies @wraps(func) to its wrapper, which copies the decorated
# function's metadata; these two lines display the resulting name and docstring.
print(example.__name__) 
print(example.__doc__ )

如何定义带参数的装饰器：实现一个装饰器，用来检查被装饰函数的参数类型。装饰器可以通过参数指明函数参数的类型，调用时如果检测出类型不匹配则抛出异常。带参数的装饰器，也就是根据参数定制化一个装饰器，可以看作生产装饰器的工厂：每次调用typeassert，返回一个特定的装饰器，然后用它去修饰其他的函数。

提取函数的签名:inspect.signature()

from inspect import signature 


def typeassert(*ty_args, **ty_kwargs):
    """Build a decorator that type-checks a function's arguments on each call.

    The types are given positionally and/or by keyword and are matched to the
    decorated function's parameters through its signature; parameters without
    a declared type are not checked.

    :raises TypeError: from the decorated call, when an argument is not an
        instance of the type declared for its parameter.
    """
    def decorator(func):
        sig = signature(func)
        # Map parameter name -> expected type, e.g. {'a': int, 'b': str}.
        btypes = sig.bind_partial(*ty_args, **ty_kwargs).arguments

        @wraps(func)
        def wrapper(*args, **kargs):
            # Bind the actual call the same way to get name -> value.
            for name, obj in sig.bind(*args, **kargs).arguments.items():
                if name in btypes and not isinstance(obj, btypes[name]):
                    raise TypeError('"%s" must be "%s"' % (name, btypes[name]))
            return func(*args, **kargs)
        return wrapper
    return decorator

@typeassert(int, str, list)
def f(a, b, c):
    print(a, b, c) 

# Every argument matches its declared type (int, str, list).
f(1, 'abc', [1, 2, 3])
# b is an int where str was declared, so typeassert's wrapper raises TypeError.
f(1, 2, [1, 2, 3])

如何实现属性可修改的装饰器：为分析程序内哪些函数执行时间开销较大，我们定义一个带timeout参数的装饰器，装饰功能如下：统计被装饰函数单次运行时间；时间大于参数timeout的，将这次函数调用记录到log日志中；运行时可修改timeout的值。

python3:
from functools import wraps
import time 
import logging 

def warn(timeout):
    """Build a decorator that logs calls taking longer than ``timeout`` seconds.

    The decorated function gets a ``setTimeout(k)`` attribute that replaces
    the threshold while the program is running.

    :param timeout: initial threshold, compared with ``time.time()`` deltas.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            used = time.time() - start
            if used > timeout:
                msg = '"%s": %s > %s' % (func.__name__, used, timeout)
                # logging.warn is a deprecated alias of logging.warning.
                logging.warning(msg)
            return res

        def setTimeout(k):
            # Rebind the enclosing threshold that wrapper reads on each call.
            nonlocal timeout
            timeout = k

        wrapper.setTimeout = setTimeout
        return wrapper
    return decorator


from random import randint
@warn(1.5)
def test():
    # Sleeps in 0.5 s steps for as long as randint(0, 1) keeps returning 1,
    # so the running time differs from call to call.
    print('In test')
    while randint(0, 1):
        time.sleep(0.5)

# First round: the threshold is the 1.5 given to @warn.
for x in range(30):
    test()

# Change the threshold to 1 on the already-decorated function and run again.
test.setTimeout(1)
for x in range(30):
    test()

python2:
from functools import wraps
import time 
import logging 

def warn(timeout):
    """Build a decorator that logs calls taking longer than ``timeout`` seconds.

    The decorated function gets a ``setTimeout(k)`` attribute that replaces
    the threshold while the program is running.

    :param timeout: initial threshold, compared with ``time.time()`` deltas.
    """
    # Python 2 has no ``nonlocal``: keep the threshold in a one-item list so
    # the inner functions can replace it by item assignment.
    timeout = [timeout]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            used = time.time() - start
            if used > timeout[0]:
                msg = '"%s": %s > %s' % (func.__name__, used, timeout[0])
                # logging.warn is a deprecated alias of logging.warning.
                logging.warning(msg)
            return res

        def setTimeout(k):
            timeout[0] = k

        wrapper.setTimeout = setTimeout
        return wrapper
    return decorator


from random import randint
@warn(1.5)
def test():
    # Sleeps in 0.5 s steps for as long as randint(0, 1) keeps returning 1,
    # so the running time differs from call to call.
    print('In test')
    while randint(0, 1):
        time.sleep(0.5)

# First round: the threshold is the 1.5 given to @warn.
for x in range(30):
    test()

# Change the threshold to 1 on the already-decorated function and run again.
test.setTimeout(1)
for x in range(30):
    test()