OpenStakc开发工程师面试必备技能

一、python垃圾回收机制
html

python也是带有垃圾回收机制的,就像其它语言同样,如java、ruby、go等,这里介绍下python的垃圾回收机制是怎么实现的?java

参考连接:http://jin-yang.github.io/blog/python-garbage-collection.htmlpython


二、python多进程、多线程git

这块内容转载来自:http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/0014319272686365ec7ceaeca33428c914edf8f70cca383000 github


先来介绍下并发和并行的概念。数据库

并发:在同一时间作着不少事情,好比只有一个cpu核,那么操做系统就会在各个程序之间快速切换。segmentfault

并行:确实在同一时间作着不少事情,若是有多个cpu核,确实可以同时执行多个程序。windows


整体来讲,多任务的实现由3种方式:api

一、多进程模式ruby

二、多线程模式

三、多进程+多线程模式


一、多进程模式

python的os.fork能够轻松建立子进程

import os    
if __name__ == '__main__':
    
    pid = os.fork()
    if pid == 0:
        print("child process:{0}, parent process:{1}".format(os.getpid(), os.getppid()))
    else:
        print("parent process:{0} is creating child process {1}".format(os.getpid(), pid))

因为windows没有fork调用,因此推荐使用能够跨平台的multiprocessing多进程模块。


import os
from multiprocessing import Process     # 提供Process类来表示进程对象
def run_proc(name):
    print "child process %s is running, pid: %s" % (name, os.getpid()) 
    
if __name__ == '__main__':
    print "parent process: %s" % (os.getpid())
    
    p = Process(target=run_proc, args=('test_process',))   # 建立一个Process实例,能够查看Process的__init__函数,只须要一个执行函数和函数参数
    print "child process will start"
    p.start()                            # 启动子进程
    p.join(timeout=10)                   # 等待子进程执行结束后,再继续执行
    print "child process end"
    
执行结果:    
parent process: 13119
child process will start
child process test_process is running, pid: 13120
child process end


以进程池方式批量建立子进程

import os, random, time
from multiprocessing import Pool   

def run_proc(name):
    print "child process %s is running, pid: %s" % (name, os.getpid()) 
    start = time.time()
    time.sleep(random.random()*5)
    end = time.time()
    print "child process %s runs %s seconds" % (name, end - start)
    
if __name__ == '__main__':
    print "parent process: %s" % (os.getpid())
    p = Pool()
    for i in xrange(5):
        p.apply_async(run_proc, args=(i,))      启动方式跟Process有点区别
    print "wait all child process to finish"
    p.close()
    p.join()                         # 对Pool对象调用join方法以前,必须先调用close方法,即再也不继续添加新的Process对象了
    print "all child process done"
    
运行结果:
parent process: 13149
wait all child process to finish
child process 0 is running, pid: 13151
child process 1 is running, pid: 13152
child process 2 is running, pid: 13153
child process 3 is running, pid: 13154
child process 0 runs 0.198132038116 seconds
child process 4 is running, pid: 13151
child process 3 runs 0.270474910736 seconds
child process 1 runs 4.15184187889 seconds
child process 2 runs 4.84887504578 seconds
child process 4 runs 4.76589512825 seconds
all child process done

这里看到child process 4须要等待后才能执行,由于Pool的默认大小是4;能够修改Pool(10),而后再运行

Python multiprocessing默认不支持instance method

http://nyeggen.com/post/2011-07-16-python-multiprocessing-with-instance-methods/



进程间通讯

multiprocessing模块提供queue、pipes等多种方式来通讯

from multiprocessing import Process, Queue
import time, random

def read(q):
    while True:
        v = q.get(True)
        print("get %s from queue" % v)
        
def write(q):
    for i in ['a','b','c']:
        print("put %s into queue" % i)
        q.put(i)
        time.sleep(random.random())
        
if __name__ == "__main__":
    q = Queue()
    pr = Process(target=read, args=(q,))
    pw = Process(target=write, args=(q,))
    
    pw.start()
    pr.start()
    
    pw.join()
    pr.terminate()


二、多线程模式

多线程也能够实现多任务的执行,python提供了thread和threading模块,threading是对thread的进一步封装;推荐使用threading


启动一个线程

import time, threading
def loop():
    print "thread %s is running" % threading.current_thread().name   # current_thread() 返回当前线程的实例
    i = 0
    while i < 5:
        i+=1
        print "thread %s is implementing task %s" % (threading.currentThread().name, i)    
    print "thread %s end" % threading.currentThread().name
    
print "thread %s is running" % threading.current_thread().name
t = threading.Thread(target=loop, name="thread-1")
t.start()
t.join()
print "thread %s end" % threading.current_thread().name 

运行结果:
thread MainThread is running
thread thread-1 is running
thread thread-1 is implementing task 1
thread thread-1 is implementing task 2
thread thread-1 is implementing task 3
thread thread-1 is implementing task 4
thread thread-1 is implementing task 5
thread thread-1 end
thread MainThread end


线程lock

线程与进程最大的不一样就是,全部线程共享全部变量,变量对每一个进程都是一个完整的拷贝。

import time, threading
n = 0
def change(m):
    global n
    n = n + m
    n = n - m
def loop(m):
    for i in xrange(10000):
        change(m)
t = threading.Thread(target=loop, name="thread-1", args=(5,))
t2 = threading.Thread(target=loop, name='thread-2', args=(9,))
t.start()
t2.start()
t2.join()
t.join()
print n

运行结果:0、-5 -9 十、1四、五、9都出现过,只要循环次数够多


# 经过加锁能够解决多线程资源争用的问题
lock = threading.Lock()   # 建立一个锁

def change(m):
    global n
    n = n + m
    n = n - m

def loop(m):
    for i in xrange(10000):
        lock.acquire()       # 同一时刻,只有一个线程能够得到锁
        try:
            change(m)
        finally:
            lock.release()   # 为了确保,可以释放锁,避免其它线程一直在等待锁

包含锁的代码只能以单线程模式运行,利用不到多线程的优点。


因为python GIL 全局锁的存在,线程执行以前都要先获取GIL锁, 引用:

《GIL的设计简化了CPython的实现,使得对象模型,包括关键的内建类型如字典,都是隐含能够并发访问的。锁住全局解释器使得比较容易的实现对多线程的支持,但也损失了多处理器主机的并行计算能力。

可是,不论标准的,仍是第三方的扩展模块,都被设计成在进行密集计算任务是,释放GIL。

还有,就是在作I/O操做时,GIL老是会被释放。对全部面向I/O 的(会调用内建的操做系统C 代码的)程序来讲,GIL 会在这个I/O 调用以前被释放,以容许其它的线程在这个线程等待I/O 的时候运行。若是是纯计算的程序,没有 I/O 操做,解释器会每隔 100 次操做就释放这把锁,让别的线程有机会执行(这个次数能够经过 sys.setcheckinterval 来调整)若是某线程并未使用不少I/O 操做,它会在本身的时间片内一直占用处理器(和GIL)。也就是说,I/O 密集型的Python 程序比计算密集型的程序更能充分利用多线程环境的好处。》

连接http://blog.csdn.net/jinguangliu/article/details/45422663

虽然python多线程很鸡肋,但仍是能够经过多进程来实现多任务并行工做,每一个进程都有各自独立的GIL锁。


threadlocal

多线程环境下,线程使用本身的局部变量比全局变量好,要想使用全局变量就得加锁,可是使用局部变量函数之间调用传参比较麻烦。

import time, threading
from warnings import catch_warnings
threadlocal = threading.local()     # 每一个threadlocal对象均可以对name属性进行读写,并且互不影响
def print_name():
    print "hello %s, process name: %s" % (threadlocal.name, threading.current_thread().name)
def loop(m):
    threadlocal.name = m
    print_name()
    
t = threading.Thread(target=loop, name="thread-1", args=('yai',))
t2 = threading.Thread(target=loop, name='thread-2', args=('cai',))
t.start()
t2.start()
t2.join()
t.join()

运行结果:
hello yai, process name: thread-1
hello cai, process name: thread-2

threadlocal最经常使用的地方就是为每一个线程绑定一个数据库链接,HTTP请求,用户身份信息等,这样一个线程的全部调用到的处理函数均可以很是方便地访问这些资源。


协程(coroutine)

关于协程的介绍,廖老师的网站上介绍得很好:http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090171191d05dae6e129940518d1d6cf6eeaaa969000

一句话总结协程的特定:子程序(函数)就是协程的特例。


元类(metaclass)

平时虽然用得很少,有必要学习下,这里有篇很好的教程:http://mp.weixin.qq.com/s?__biz=MzA4MjEyNTA5Mw==&mid=2652563643&idx=1&sn=f06beb600b41a6ec8f1d22b2b5912ed0&scene=23&srcid=0710rhyMwjbzJyechK8V3Yu6#rd



三、单元测试

unittest是经常使用的测试框架, 下面看个例子

# 下面程序实现的是像访问class的attribute同样访问dict的value
class Dict(dict):
    def __init__(self, **kwargs):
        super(Dict, self).__init__(**kwargs)
        self.c = 123
        
    def __setattr__(self, key, value):
        self[key] = value
        
    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Dict' object has no attribute '%s'" % key)
            
if __name__ == '__main__':
    a = Dict(a=1,b=2)
    print a.a, a.c, a['b']


针对上面程序的测试用例

import unittest
from ut import Dict

class TestDict(unittest.TestCase):          #  从unittest.TestCase继承,它提供了不少内置的测试方法
    def setUp(self):
        print "starting..."
    def test_init(self):
        d = Dict(a=1, b='123')
        self.assertEquals(d.a, 1)
        self.assertEquals(d.b, '123')
        self.assertTrue(isinstance(d, dict))
    def test_key(self):
        d = Dict()
        d['key'] = 'value'
        self.assertEquals(d.key, 'value')
    def test_value(self):
        d = Dict()
        d.key = 'value'
        self.assertTrue('key' in d)
        self.assertEquals(d['key'], 'value')
    def test_keyerror(self):
        d = Dict()
        with self.assertRaises(KeyError):      # assertRaises期待抛出指定类型的Error, 访问不存在的key时,抛出KeyError
            value = d['empty']
    def test_attrerror(self):           
        d = Dict()
        with self.assertRaises(AttributeError):
            value = d.empty
    def tearDown(self):
        print "ending..."
if __name__ == '__main__':            
    unittest.main()                            # 这样就能够像运行脚本同样,直接python ***.py; 或者不加这个也行,python -m unittest ***
    
注:以test开头的方法就是测试方法,不以test开头的方法不被认为是测试方法,测试的时候不会被执行。
能够在单元测试中编写两个特殊的setUp()和tearDown()方法。这两个方法会分别在每调用一个测试方法的先后分别被执行。


# 运行结果
starting...
ending...
.starting...
ending...
.starting...
ending...
.starting...
ending...
.starting...
ending...
.
----------------------------------------------------------------------
Ran 5 tests in 0.001s

OK


mock实战:https://www.toptal.com/python/an-introduction-to-mocking-in-python

这篇老外的教程至关不错。


Mock和MagicMock的区别:

MagicMock是Mock的子类,其实是对Mock的扩展,容许模拟python的magic methods。下面看个例子:

>>> import mock
>>> mo = mock.Mock()
>>> mo.__str__.return_value = "1234"
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'method-wrapper' object has only read-only attributes (assign to .return_value)
>>> mo = mock.MagicMock()
>>> mo.__str__.return_value = "1234"
>>> str(mo)
'1234'
>>>


看到一个介绍OpenStack api的教程:

经过demo学习OpenStack开发所需的基础知识 -- API服务(1)

经过demo学习OpenStack开发所需的基础知识 -- API服务(2)

经过demo学习OpenStack开发所需的基础知识 -- API服务(3)

经过demo学习OpenStack开发所需的基础知识 -- API服务(4)

相关文章
相关标签/搜索