点击跳转Python笔记总目录mysql
保证一个类只有一个实例,并提供一个访问它的全局访问点sql
单例,即只有一个类的实例数据库
当类只有一个实例并且客户能够从一个众所周知的访问点访问它时 好比:数据库连接、Socket建立连接安全
单利至关于全局变量,但防止了命名空间被污染 与单利模式功能类似的概念:全局变量、静态变量(方法)bash
试问?为何用单例模式,不用全局变量呢?session
答、全局变量可能会有名称空间的干扰,若是有重名的可能会被覆盖多线程
s1.py
class Foo(object):
def test(self):
print("123")
v = Foo()
#v是Foo的实例
s2.py
from s1 import v as v1
print(v1,id(v1)) #<s1.Foo object at 0x0000000002221710> 35788560
from s1 import v as v2
print(v1,id(v2)) #<s1.Foo object at 0x0000000002221710> 35788560
## 两个的内存地址是同样的
## 文件加载的时候,第一次导入后,再次导入时不会再从新加载。
复制代码
二、基于类实现的单例模式post
## **********************单例模式:没法支持多线程状况**************=
class Singleton(object):
def __init__(self):
import time
time.sleep(1)
@classmethod
def instance(cls, *args, **kwargs):
if not hasattr(Singleton, "_instance"):
Singleton._instance = Singleton(*args, **kwargs)
return Singleton._instance
import threading
def task(arg):
obj = Singleton.instance()
print(obj)
for i in range(10):
t = threading.Thread(target=task,args=[i,])
t.start()
## ********************单例模式:支持多线程状况****************、
import time
import threading
class Singleton(object):
_instance_lock = threading.Lock()
def __init__(self):
time.sleep(1)
@classmethod
def instance(cls, *args, **kwargs):
if not hasattr(Singleton, "_instance"):
with Singleton._instance_lock: #为了保证线程安全在内部加锁
if not hasattr(Singleton, "_instance"):
Singleton._instance = Singleton(*args, **kwargs)
return Singleton._instance
def task(arg):
obj = Singleton.instance()
print(obj)
for i in range(10):
t = threading.Thread(target=task,args=[i,])
t.start()
time.sleep(20)
obj = Singleton.instance()
print(obj)
## 使用先说明,之后用单例模式,obj = Singleton.instance()
## 示例:
## obj1 = Singleton.instance()
## obj2 = Singleton.instance()
## print(obj1,obj2)
## 错误示例
## obj1 = Singleton()
## obj2 = Singleton()
## print(obj1,obj2)
复制代码
三、基于__new__实现的单例模式(最经常使用)fetch
class Singleton:
def __new__(cls, *args, **kw):
if not hasattr(cls, '_instance'):
cls._instance = object.__new__(cls, *args, **kw)
return cls._instance
one = Singleton()
two = Singleton()
two.a = 3
print(one.a)
## 3
## one和two彻底相同,能够用id(), **, is检测
print(id(one))
## 29097904
print(id(two))
## 29097904
print(one ** two)
## True
print(one is two)
复制代码
""" 1.对象是类建立,建立对象时候类的__init__方法自动执行,对象()执行类的 __call__ 方法 2.类是type建立,建立类时候type的__init__方法自动执行,类() 执行type的 __call__方法(类的__new__方法,类的__init__方法) ## 第0步: 执行type的 __init__ 方法【类是type的对象】 class Foo: def __init__(self): pass def __call__(self, *args, **kwargs): pass ## 第1步: 执行type的 __call__ 方法 ## 1.1 调用 Foo类(是type的对象)的 __new__方法,用于建立对象。 ## 1.2 调用 Foo类(是type的对象)的 __init__方法,用于对对象初始化。 obj = Foo() ## 第2步:执行Foo的 __call__ 方法 obj() """
## **********=类的执行流程****************
class SingletonType(type):
def __init__(self,*args,**kwargs):
print(self) #会不会打印? #<class '__main__.Foo'>
super(SingletonType,self).__init__(*args,**kwargs)
def __call__(cls, *args, **kwargs): #cls = Foo
obj = cls.__new__(cls, *args, **kwargs)
obj.__init__(*args, **kwargs)
return obj
class Foo(metaclass=SingletonType):
def __init__(self,name):
self.name = name
def __new__(cls, *args, **kwargs):
return object.__new__(cls, *args, **kwargs)
''' 一、对象是类建立的,建立对象时类的__init__方法会自动执行,对象()执行类的__call__方法 二、类是type建立的,建立类时候type类的__init__方法会自动执行,类()会先执行type的__call__方法(调用类的__new__,__init__方法) Foo 这个类是由SingletonType这个类建立的 '''
obj = Foo("hiayan")
## ************第三种方式实现单例模式****************=
import threading
class SingletonType(type):
_instance_lock = threading.Lock()
def __call__(cls, *args, **kwargs):
if not hasattr(cls, "_instance"):
with SingletonType._instance_lock:
if not hasattr(cls, "_instance"):
cls._instance = super(SingletonType,cls).__call__(*args, **kwargs)
return cls._instance
class Foo(metaclass=SingletonType):
def __init__(self,name):
self.name = name
obj1 = Foo('name')
obj2 = Foo('name')
print(obj1,obj2)
复制代码
import pymysql
import threading
from DBUtils.PooledDB import PooledDB
class SingletonDBPool(object):
_instance_lock = threading.Lock()
def __init__(self):
self.pool = PooledDB(
creator=pymysql, ## 使用连接数据库的模块
maxconnections=6, ## 链接池容许的最大链接数,0和None表示不限制链接数
mincached=2, ## 初始化时,连接池中至少建立的空闲的连接,0表示不建立
maxcached=5, ## 连接池中最多闲置的连接,0和None不限制
maxshared=3,
## 连接池中最多共享的连接数量,0和None表示所有共享。PS: 无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。
blocking=True, ## 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错
maxusage=None, ## 一个连接最多被重复使用的次数,None表示无限制
setsession=[], ## 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
## ping MySQL服务端,检查是否服务可用。## 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def __new__(cls, *args, **kwargs):
if not hasattr(SingletonDBPool, "_instance"):
with SingletonDBPool._instance_lock:
if not hasattr(SingletonDBPool, "_instance"):
SingletonDBPool._instance = object.__new__(cls, *args, **kwargs)
return SingletonDBPool._instance
def connect(self):
return self.pool.connection()
复制代码
from pool import SingletonDBPool
def run():
pool = SingletonDBPool()
conn = pool.connect()
## xxxxxx
cursor = conn.cursor()
cursor.execute("select * from td where id=%s", [5, ])
result = cursor.fetchall() ## 获取数据
cursor.close()
conn.close()
if __name__ ** '__main__':
run()
复制代码
#
def wrapper(cls):
instance = {}
def inner(*args,**kwargs):
if cls not in instance:
instance[cls] = cls(*args,**kwargs)
return instance[cls]
return inner
@wrapper
class Singleton(object):
def __init__(self,name,age):
self.name = name
self.age = age
obj1 = Singleton('haiyan',22)
obj2 = Singleton('xx',22)
print(obj1)
print(obj2)
复制代码