做者 hermanmysql
导语 ios
本文源自herman的系列文章之一《鹅厂开源框架TARS之基础组件》。相关代码已按TARS开源社区最新版本更新。git
TARS开源框架库里面用C++实现了比较多的公用组件,这些组件通常统一放在 util
文件夹,在应用层也能够自由使用,工欲善其事必先利其器,因此有必要把这些工具组件作了解,更好的使用,提升效率。接下来,本文将对以下TarsCpp组件进行分析:github
线程操做算法
智能指针sql
DB操做数据库
网络操做缓存
服务配置安全
仿函数服务器
Hash
异常处理
先看下框架对TC_ThreadQueue
类的使用以下:
typedef TC_ThreadQueue<tagRecvData*, deque<tagRecvData*> > recv_queue; // 接收队列 typedef TC_ThreadQueue<tagSendData*, deque<tagSendData*> > send_queue; // 发送队列
TC_ThreadQueue
的实现比较简单,在TARS的网络层实现中能够发现这个类比较重要,由于从框架中收到的网络包都会加入到这个缓存队列里面,而后多业务线程 ServantHandle
会调用 waitForRecvQueue
从该队列里面取网络数据包,而后调用 dispatch
调用协议消息对应的处理函数,先看下框架对 TC_ThreadQueue
的实现:
/** * @brief 线程安全队列 */ template<typename T, typename D = deque<T> > class TC_ThreadQueue { public: TC_ThreadQueue():_size(0){}; public: typedef D queue_type; /** * @brief 从头部获取数据, 没有数据抛异常 * * @param t * @return bool: true, 获取了数据, false, 无数据 */ T front(); /** * @brief 从头部获取数据, 没有数据则等待. * * @param t * @param millsecond(wait = true时才生效) 阻塞等待时间(ms) * 0 表示不阻塞 * -1 永久等待 * @param wait, 是否wait * @return bool: true, 获取了数据, false, 无数据 */ bool pop_front(T& t, size_t millsecond = 0, bool wait = true); ... ... }
TC_ThreadQueue
使用了C++11标准库中的<mutex>
和<condition_variable>
用于实现线程锁和 wait,以下,看下队列的成员函数:push_front
在队列前面加入数据,
template<typename T, typename D> void TC_ThreadQueue<T, D>::push_front(const T& t, bool notify) { if(notify) { std::unique_lock<std::mutex> lock(_mutex); _cond.notify_one(); _queue.push_front(t); ++_size; } else { std::lock_guard<std::mutex> lock (_mutex); _queue.push_front(t); ++_size; } }
如上图调用push_front
函数的时候调用 std::unique_lock<std::mutex> lock(_mutex)
加锁 ,避免网络层接收数据和业务层取同一队列的数据冲突,_cond.notify_one()
通知等待在该锁上某一个线程醒过来,调用该函数以前必须加锁,由于有数据过来了,例如网络层有线程须要取包并进行分发处理。
再看一个成员函数pop_front
,从头部获取数据,没有数据则等待。millisecond
阻塞等待时间(ms)
0
表示不阻塞-1
永久等待template<typename T, typename D> bool TC_ThreadQueue<T, D>::pop_front(T& t, size_t millsecond, bool wait) { if(wait) { std::unique_lock<std::mutex> lock(_mutex); if (_queue.empty()) { if (millsecond == 0) { return false; } if (millsecond == (size_t) -1) { _cond.wait(lock); } else { //超时了 if (_cond.wait_for(lock, std::chrono::milliseconds(millsecond)) == std::cv_status::timeout) { return false; } } } if (_queue.empty()) { return false; } t = _queue.front(); _queue.pop_front(); assert(_size > 0); --_size; return true; } else { std::lock_guard<std::mutex> lock (_mutex); if (_queue.empty()) { return false; } t = _queue.front(); _queue.pop_front(); assert(_size > 0); --_size; return true; } }
BindAdapter::waitForRecvQueue
的函数就是调用了pop_front
函数,用于等待接收队列,函数原型以下:
bool TC_EpollServer::BindAdapter::waitForRecvQueue(uint32_t handleIndex, shared_ptr<RecvContext> &data) { bool bRet = getRecvQueue(handleIndex).pop_front(data); if (!bRet) { return bRet; } --_iRecvBufferSize; return bRet; }
这里BindAdapter::waitForRecvQueue
用于业务线程在等待服务器监听的适配器收到网络包后进行业务包的处理,这里传入的handleIndex
表示接收队列索引,获取对应的_rbuffer
。
TC_ThreadLock
类的定义以下
typedef TC_Monitor<TC_ThreadMutex, TC_ThreadCond> TC_ThreadLock;
TC_Monitor
线程锁监控模板类。一般线程锁,都经过该类来使用,而不是直接用TC_ThreadMutex
、TC_ThreadRecMutex
。
类的定义template <class T, class P> class TC_Monitor
须要传入两个模板参数,TC_Monitor
包括如下成员变量:
mutable int _nnotify; // 上锁的次数 mutable P _cond; // 条件变量 T _mutex; // 互斥锁 /** * @brief 定义锁控制对象 */ typedef TC_LockT<TC_Monitor<T, P> > Lock; typedef TC_TryLockT<TC_Monitor<T, P> > TryLock;
第一个参数 TC_ThreadMutex
表明线程锁:同一个线程不能够重复加锁 ,包含成员变量
mutable std::mutex _mutex
延伸阅读,这里tc_thread_mutex.h
还包括另一个循环锁类TC_ThreadRecMutex
,即一个线程能够加屡次锁,定义以下:
// 定义于tc_monitor.h中 typedef TC_Monitor<TC_ThreadRecMutex, TC_ThreadCond> TC_ThreadRecLock;
第二个参数 TC_ThreadCond
表明线程信号条件类:全部锁能够在上面等待信号发生,包含线程条件成员变量:
mutable std::condition_variable_any _cond
结合实际的使用场景,TC_Monitor::timedWait()
会调用 TC_ThreadCond
对象的 timedWait
函数,下一步调用 chrono
库的 milliseconds
;TC_ThreadCond::signal()
实现发送信号,等待在该条件上的一个线程会醒。
TC_LockT
类定义: template <typename T> class TC_LockT
锁模板类,与其余具体锁配合使用,构造时候加锁,析够的时候解锁。
TC_LockT
构造函数,传入互斥量初始化成员变量 _mutex
,TC_LockT
构造函数实现:
TC_LockT(const T& mutex) : _mutex(mutex) { _mutex.lock(); _acquired = true; }
到这里就能够看出 TC_Monitor
定义的 typedef TC_LockT<TC_Monitor<T, P> > Lock
,这里 Lock
类型的模板参数用的是 TC_Monitor
类。
实际使用场景以下:
Lock lock(*this);
TC_LockT
的构造函数,传入参数 this
为 TC_Monitor
的子类对象,TC_LockT
的构造函数调用_mutex.lock()
;实际就是调用了 TC_Monitor
对象的 lock
函数,TC_Monitor
的 lock
函数实现:
void lock() const { _mutex.lock(); _nnotify = 0; }
这里 _mutex
为 TC_ThreadMutex
对象,进一步调用了 TC_ThreadRecMutex::lock()
成员函数,实现以下:
void TC_ThreadMutex::lock() const { _mutex.lock(); }
而后上面定义的lock栈变量退出函数的时候调用 TC_LockT
的析构函数:实现以下:
virtual ~TC_LockT() { if (_acquired) { _mutex.unlock(); //这里会调用TC_Monitor的unlock函数 } }
TC_Monitor
的 unlock
函数实现:
void unlock() const { notifyImpl(_nnotify); _mutex.unlock(); //这里会调用C++标准库<mutex>中的unlock }
这里调用 notifyImpl
函数是由于 TC_Monitor
类不仅能够实现简单的互斥锁功能,还能够实现条件变量Condition功能,其中 notifyImpl
的实现为
void notifyImpl(int nnotify) const { if(nnotify != 0) { if(nnotify == -1) { _cond.broadcast(); return; } else { while(nnotify > 0) { _cond.signal(); --nnotify; } } } }
仍是老样子,先看下项目实际对线程基类的使用。实际项目使用中,咱们对 TC_Thread
又封装了一下,实现了一个BasicThread
类,下面看下 BasicThread
的定义:
class BasicThread : public tars::TC_Thread, public tars::TC_ThreadLock { ... void terminate() { _bTerm = true; { Lock lock(*this); notifyAll(); } getThreadControl().join(); } }
BasicThread
类,继承了 TC_Thread
和 TC_ThreadLock
,其中 TC_ThreadLock
第二点已经说明过了,因此这里重点看下 TC_Thread
类的使用,TC_Thread
的定义
class TC_Thread : public TC_Runable { ... /** * 使用了C++11标准线程库std::thread, 构造函数传参数threadEntry线程函数, * 返回 TC_ThreadControl(_th),其中_th为std::thread对象 */ TC_ThreadControl start(); static void threadEntry(TC_Thread *pThread); //静态函数, 线程入口 virtual void run() = 0; ... }
下一步看下线程控制类 TC_ThreadControl
的定义:
class TC_ThreadControl { ... explicit TC_ThreadControl(std::thread *th); // 构造,传入std::thread对象 void join(); // 调用std::thread的join()阻塞当前的线程,直到另一个线程运行结束 static void sleep(); // 调用std::this_thread::sleep函数线程将暂停执行 ... }
下一步看下 TC_Runable
的定义:
class TC_Runable { public: virtual ~TC_Runable(){}; virtual void run() = 0; //定义了run纯虚函数 };
最后看下实际项目中对线程类的使用
class AntiSdkSyncThread : public BasicThread //这里等于多继承了TC_Thread和TC_ThreadLock两个类 { void run() //实现基类的纯虚函数 { Lock lock(*this); timedWait(10 * 1000); (间隔执行时间,实现了线程的定时执行功能) if(NULL != g_busi_interf) { Int32 ret = g_busi_interf->proc_(); //须要按期执行的函数 } } }
定义好了 AntiSdkSyncThread g_antiSdkSyncThread;
类,那么须要启动线程的时候执行g_antiSdkSyncThread.start();
就会天然建立线程,而且 threadEntry
线程函数会调用 pThread->run()
多态函数,进程退出的时候调用 g_antiSdkSyncThread.terminate();
。
这里的智能指针能够放在容器中,且线程安全的智能指针,CPP11标准库的auto_ptr
是不能放在容器中的,貌似已经被淘汰了,目前多数使用CPP11标准库的shared_ptr
,不过须要编译器支持CPP11。
TC_HandleBase
智能指针基类的定义以下,全部须要智能指针的类都须要从该对象继承,其中使用了C++11标准库中的<atomic>
进行原子计数。
class UTIL_DLL_API TC_HandleBase { public: /** * @brief 复制 * * @return TC_HandleBase& */ TC_HandleBase& operator=(const TC_HandleBase&) { return *this; } /** * @brief 增长计数 */ void incRef() { ++_atomic; } /** * @brief 减小计数 */ void decRef() { if((--_atomic) == 0 && !_bNoDelete) { _bNoDelete = true; delete this; } } /** * @brief 获取计数. * * @return int 计数值 */ int getRef() const { return _atomic; } /** * @brief 设置不自动释放. * * @param b 是否自动删除,true or false */ void setNoDelete(bool b) { _bNoDelete = b; } protected: /** * @brief 构造函数 */ TC_HandleBase() : _atomic(0), _bNoDelete(false) { } /** * @brief 拷贝构造 */ TC_HandleBase(const TC_HandleBase&) : _atomic(0), _bNoDelete(false) { } /** * @brief 析构 */ virtual ~TC_HandleBase() { } protected: std::atomic<int> _atomic; // 引用计数 bool _bNoDelete; // 是否自动删除 };
下一步看 TC_AutoPtr
智能指针模板类,能够放在容器中,且线程安全的智能指针,该智能指针经过引用计数实现,其构造函数和析构函数定义以下:
template<typename T> class TC_AutoPtr { TC_AutoPtr(T* p = 0) { _ptr = p; if(_ptr) { _ptr->incRef(); //构造函数 引用计算加1 } } ... ~TC_AutoPtr() { if(_ptr) { _ptr->decRef(); //析构函数 引用计算减1 } } }
struct ConnStruct : public TC_HandleBase{...} typedef TC_AutoPtr<ConnStruct> ConnStructPtr;
TC_AutoPtr
拷贝构造调用 _ptr->incRef();
这里 ptr
为 ConnStruct
,ConnStruct
继承于TC_HandleBase
,等于调用了TC_HandleBaseT<int>::incRef() {++_atomic;}
引用计数原子操做加一、析构引用计数原子操做减1,当引用计数减小到0时根据设置的开关是否要进行删除来决定是否触发delete。
// 定义回调函数智能指针,其中SessionCallback父类继承于TC_HandleBase typedef TC_AutoPtr<SessionCallback> SessionCallbackPtr; //建立回调类SessionCallbackPtr,并传入初始化参数uin gameid等; SessionCallbackPtr cb = new SessionCallback(iUin, iGameId, iSeqID, iCmd,sSessionID, theServant, current, cs, this); //异步调用sessionserver远程接口 getSessionPrx()->async_getSession(cb, iUin, iGameId);
接口返回完成,回调SessionCallback::callback_getSession(tars::Int32 ret, const MGComm::SessionValue& retValue)
函数,接收sessionserver
接口的返回的SessionValue
结构。
由于 SessionCallbackPtr
使用了智能指针,因此业务不须要去手动释放前面 new
出来的 SessionCallbackPtr
,仍是比较方便的。
TC_Mysql封装好的mysql操做类,非线程安全,对于 insert/update 能够有更好的函数封装,防止SQL注入
使用方式:
TC_Mysql mysql; //初始化mysql,init时不连接,请求时自动创建连接; //数据库能够为空; //端口默认为3306 mysql.init("192.168.1.2", "pc", "pc@sn", "db_tars_demo");
一般用:void init(const TC_DBConf& tcDBConf);
直接初始化数据库。例如:stDirectMysql.init(_stZoneDirectDBConf);
看下TC_DBConf
的定义
struct TC_DBConf { string _host; string _user; string _password; string _database; string _charset; int _port; int _flag; //客户端标识 TC_DBConf() : _port(0) , _flag(0) {} /** * @brief 读取数据库配置. * * @param mpParam 存放数据库配置的map * dbhost: 主机地址 * dbuser:用户名 * dbpass:密码 * dbname:数据库名称 * dbport:端口 */ void loadFromMap(const map<string, string> &mpParam) { map<string, string> mpTmp = mpParam; _host = mpTmp["dbhost"]; _user = mpTmp["dbuser"]; _password = mpTmp["dbpass"]; _database = mpTmp["dbname"]; _charset = mpTmp["charset"]; _port = atoi(mpTmp["dbport"].c_str()); _flag = 0; if(mpTmp["dbport"] == "") { _port = 3306; } } };
//进一步看下获取数据的使用 TC_Mysql::MysqlData data; data = mysql.queryRecord("select * from t_app_users"); for(size_t i = 0; i < data.size(); i++) { //若是不存在ID字段,则抛出异常 cout << data[i]["ID"] << endl; }
查询出来的mysql数据用MysqlData
封装
class MysqlData { ... vector<map<string, string> >& data(); ... }
//插入数据,指定数据的类型:数值 或 字符串,对于字符串会自动转义 map<string, pair<TC_Mysql::FT, string> > m; m["ID"] = make_pair(TC_Mysql::DB_INT, "2334"); m["USERID"] = make_pair(TC_Mysql::DB_STR, "abcttt"); m["APP"] = make_pair(TC_Mysql::DB_STR, "abcapbbp"); m["LASTTIME"] = make_pair(TC_Mysql::DB_INT, "now()"); mysql.replaceRecord("t_user_logs", m);
整个TARS核心就提供一个很完善的网络框架,包括RPC功能,这里只介绍几个经常使用的网络组件。
提供socket的操做类;支持tcp/udp socket;支持本地域套接字。
再下一层TARS封装了TC_TCPClient
和TC_UDPClient
两个类用于实际操做tcp和udp应用。
例如:tcp客户端
TC_TCPClient stRouterClient; stRouterClient.init(sIP, iPort, iTimeOut); // 这里传入ip和端口而后调用sendRecv进行消息的收发 Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);
注意多线程使用的时候,不能多线程同时send/recv,当心串包。
提供网络epoll的操做类,默认是ET模式,当状态发生变化的时候才得到通知,提供add、mod、del、wait等基础操做。
提供关键成员函数init(const string &sIp, int iPort, int iTimeout)
,传入 IP 端口 和 超时时间
TC_TCPClient
继承于 TC_ClientSocket
提供成员函数:
sendRecv
(发送到服务器, 从服务器返回不超过iRecvLen的字节)sendRecvBySep
( 发送倒服务器, 并等待服务器直到结尾字符, 包含结尾字符)stRouterClient.init(sIP, iPort, iTimeOut); size_t iRecvLen = sizeof(recvBuf)-1; Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);
同理还有TC_UDPClient
实现UDP客户端。
main
的输入参数,支持如下形式的参数:./main.exe --name=value --param1 param2 param3
TC_Option op; //解析命令行 op.decode(argc, argv); //获取成对的参数,即获取 - - 表示的全部参数对 map<string, string> mp = op.getMulti(); //表示非 – 的参数:即 param2, param3 vector<string> d = op.getSingle();
若是value,param有空格或者 --
,用引号括起来就能够了。
TC_Config config; config.parseFile(ServerConfig::BasePath + ServerConfig::ServerName + ".conf"); stTmpGameServerConfig.iGameId = TC_Common::strto<UInt32>(config["/Main/<GameId>"]);
配置文件样例
<Main> GameId = 3001 ZoneId = 102 AsyncThreadCheckInterval = 1000 ... </Main>
使用get
方法例子:若是读不到该配置,则返回默认值 sDefault
,即下面例子中的 20000000
stTmpGameServerConfig.iMaxRegNum = TC_Common::strto<Int32>(config.get("/Main/<MaxRegNum>", "20000000"));
TC_Functor
参考loki
库的设计
a(p1,p2);
A a(p1, p2); a();
简单又好用的封装,具体见下面使用例子天然明白:
void TestFunction3(const string &s, int i){ cout << "TestFunction3('" << s << "', '" << i << "')" << endl; } //采用函数指针构造对象 TC_Functor<void, TL::TLMaker<const string&, int>::Result > cmd3(TestFunction3); string s3("s3"); cmd3(s3, 10);
C函数调用用wrapper封装:
//调用封装,构造的时候传入参数 TC_Functor<void,TL::TLMaker<const string&, int>::Result>::wrapper_type fwrapper3(cmd3, s3, 10); fwrapper3(); //参数已经在构造的时候传入,调用的时候不用传参数了
说明:
void
: 函数的返回值TL::TLMaker<const string&, int>::Result
: 表明参数类型对于调用的封装,注意对于传引用类型,具体的调用时候要保证引用的对象存在。
struct TestMember { void mem3(const string &s, int i) { cout << "TestMember::mem3(" << s << "," << i << ") called" << endl; } } TC_Functor<void, TL::TLMaker<const string&, int>::Result > cmd3(&tm, &TestMember::mem3); cmd3("a", 33);
指向类成员函数的调用用wrapper封装:
TC_Functor<void, TL::TLMaker<const string&, int>::Result >::wrapper_type fwrapper3(cmd3, "a", 10); fwrapper3();
服务初始化initialize的时候,通常会调用
addServantProtocol(sRouterObj, AppProtocol::parseStream<0, uint16_t, false>,iHeaderLen);
这里设置BindAdapter
的协议解析函数 protocol_functor _pf
为 parseStream
函数,以下:
/** * @param T * @param offset * @param netorder * @param in * @param out * @return int */ template<size_t offset, typename T, bool netorder> static TC_NetWorkBuffer::PACKET_TYPE parseStream(TC_NetWorkBuffer& in,vector<char>& out) { size_t len = offset + sizeof(T); if (in.getBufferLength() < len) { return TC_NetWorkBuffer::PACKET_LESS; } string header; in.getHeader(len, header); assert(header.size() == len); T iHeaderLen = 0; ::memcpy(&iHeaderLen, header.c_str() + offset, sizeof(T)); if (netorder) { iHeaderLen = net2host<T>(iHeaderLen); } //长度保护一下 if (iHeaderLen < (T)(len) || (uint32_t)iHeaderLen > TARS_NET_MAX_PACKAGE_SIZE) { return TC_NetWorkBuffer::PACKET_ERR; } if (in.getBufferLength() < (uint32_t)iHeaderLen) { return TC_NetWorkBuffer::PACKET_LESS; } in.getHeader(iHeaderLen, out); assert(out.size() == iHeaderLen); in.moveHeader(iHeaderLen); return TC_NetWorkBuffer::PACKET_FULL; }
注册好解析函数以后,网络层收包调用parseProtocol
函数
int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf) { ... TC_NetWorkBuffer::PACKET_TYPE b = _pBindAdapter->getProtocol()(rbuf, ro); //这里回调前面设置好的协议解析函数,从而实现协议解析 ... }
util/tc_hash_fun.h
中包含了对hash算法的实现,使用 hash_new
,能够对输入的字节流进行hash获得至关均匀的hash值,使用方式以下
#include "util/tc_hash_fun.h" #include <iterator> #include <iostream> #include <sys/time.h> using namespace tars; using namespace std; int main(int argc, char* *argv[]) { unsigned int i = tars::hash_new<string>()("abcd"); cout << i << endl; return 0; }
class TC_Exception : public exception { /** * @brief 构造函数,提供了一个能够传入errno的构造函数, * 异常抛出时直接获取的错误信息 * * @param buffer 异常的告警信息 * @param err 错误码, 可用strerror获取错误信息 */ TC_Exception(const string &buffer, int err); }
本文介绍分析了TARS框架中用C++实现的公用基础组件,加深对这些工具类基础组件的理解,减小在使用这些组件过程当中产生的问题,提升开发效率。
TARS能够在考虑到易用性和高性能的同时快速构建系统并自动生成代码,帮助开发人员和企业以微服务的方式快速构建本身稳定可靠的分布式应用,从而令开发人员只关注业务逻辑,提升运营效率。多语言、敏捷研发、高可用和高效运营的特性使 TARS 成为企业级产品。
TARS微服务助您数字化转型,欢迎访问:
TARS官网:https://TarsCloud.org
TARS源码:https://github.com/TarsCloud
获取《TARS官方培训电子书》:https://wj.qq.com/s2/6570357/...
或扫码获取: