原文连接:醒者呆的博客园,www.cnblogs.com/Evsward/p/s…html
上文书说到区块链的存储方式,并结合了EOSIO进行分析,其中也提到了使用CLion调试EOS的方法。本文将继续深刻细致地展开对加载了mongo_db_plugin的nodeos的调试过程以及心得。node
关键字:源码分析,Debug EOS,nodeos,mongo_db_plugin,CLion,C++,boost::asio::signal_set,queuelinux
本文涉及的环境:clang-6.0, clang++-6.0, GDB Debugger, make 4.1, mongodb-linux-x86_64-3.6.3, boost 1.67.0sql
关于EOS的调试环境的搭建这里再也不赘述了,下文开始针对nodeos程序进行调试。mongodb
nodeos开始运行前,要先使用项目的总CmakeList.txt配置,这里我配置了boost库的位置,若是你配置了boost的环境变量能够跳过这里。数据库
set( BOOST_ROOT "/home/evsward/opt/boost")
复制代码
总的CMakeList文件介绍完了,下面会执行到nodeos目录下的CMakeList.txt文件:json
咱们打开每个plugin的cpp文件,会发现有一个static的register方法的调用。这里会首先执行按上面plugin定义的顺序中第一个login_plugin,它的static语句以下:数组
static appbase::abstract_plugin& _login_plugin = app().register_plugin<login_plugin>();
复制代码
执行此语句时,会先执行app(),这是application单例方法。bash
application& application::instance() {
static application _app;
return _app;
}
application& app() { return application::instance(); }
复制代码
application与plugin拥有相同的实现函数,而因为它做为执行者、统筹者的存在,它会安排调用全部plugin,例如set_program_options。
复制代码
执行app()之后获取到了application的实例,而后调用了register_plugin函数,经过模板类(泛型类)携带了login_plugin的类型。register_plugin函数是模板函数,定义在application.hpp文件中。数据结构
application.hpp 中定义了私有的内存变量
map<string, std::unique_ptr<abstract_plugin>> plugins;
复制代码
abstract_plugin是全部plugin的基类,它定义了虚函数,须要继承它的子类去实现。他们与application的关系是:
abstract_plugin=>plugin(对基类的虚函数进一步使用,由application定义管理)=>各个plugin
复制代码
template<typename Plugin>
auto& register_plugin() {
auto existing = find_plugin<Plugin>(); // 从plugins寻找该plugin是否已注册。
if(existing)
return *existing; // 若是已注册了直接返回plugin的实例
auto plug = new Plugin(); // 建立该未注册plugin的实例
plugins[plug->name()].reset(plug); // 先插入到上面定义的内存变量plugins
plug->register_dependencies();// 注册该plugin的依赖plugins,每一个plugin内部都会调用APPBASE_PLUGIN_REQUIRES((chain_plugin))来注册本身依赖的别的plugin。
return *plug; // 返回plugin的实例
}
复制代码
在编译runtime环境结束之后,进入入口函数main(),
int main(int argc, char** argv)
复制代码
main函数的参数就是调用命令nodeos的经过--加入的参数,咱们能够经过nodeos的Edit Configuration来调整。其中argc是个数,argv是参数的值,是一个数组类型。以下图:
咱们接着来看main函数,它的函数体是经过app()对application单例进行的设置,包括版本号、data路径、config路径,而后是对于application实例内部方法的调用:
main函数执行了内部函数initialize_logging()还经过ilog打印了日志,输出了版本号以及eosio root路径地址。
因为main函数是入口函数,上面也介绍了它主要是对application实例的使用,以及一些异常处理等,接下来会逐一进行debug跟踪分析。
这个初始化函数是一个模板函数,模板类参数是plugin基类,在main函数调用该函数时传入了基本的插件依赖(这些是不须要咱们在config中配置的,是链启动的基础插件):chain_plugin, http_plugin, net_plugin, producer_plugin。下面来看initialize函数在application头文件中的声明:
/**
* @brief 查看 --plugin(存在于命令行或者config配置文件中)调用这些plugin的 initialize方法。
*
* @tparam Plugin plugin的列表用来初始化,即便在config中没有配置的但被其余plugin所依赖的plugin,均可以统一使用该模板类没有影响。
* @return true:plugin初始化完成,false:出现异常。
*/
template<typename... Plugin>
bool initialize(int argc, char** argv) {
return initialize_impl(argc, argv, {find_plugin<Plugin>()...}); // ...是可变参数的语法,上面经过main函数的调用,咱们传入了多个plugin。
}
复制代码
实现类initialize_impl的内容较多,不粘贴源码,直接总结一下:
application.cpp文件中的set_program_options()函数是用来生成初始化的config.ini文件内容以及nodeos命令行--help的输出内容。
复制代码
该函数首先遍历插件列表,调用每一个插件都会实现的plugin基类的虚函数set_program_options(options_description& cli, options_description& cfg),例以下面就是mongo_db_plugin的实现:
void mongo_db_plugin::set_program_options(options_description& cli, options_description& cfg)
{
cfg.add_options()
("mongodb-queue-size,q", bpo::value<uint32_t>()->default_value(256),
"The target queue size between nodeos and MongoDB plugin thread.")
("mongodb-wipe", bpo::bool_switch()->default_value(false),
"Required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks to wipe mongo db."
"This option required to prevent accidental wipe of mongo db.")
("mongodb-block-start", bpo::value<uint32_t>()->default_value(0),
"If specified then only abi data pushed to mongodb until specified block is reached.")
("mongodb-uri,m", bpo::value<std::string>(),
"MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/."
" If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI."
" Example: mongodb://127.0.0.1:27017/EOS")
;
}
复制代码
经过调用mongo_db_plugin的这个方法,就能够拼凑到config.ini文件中关于mongo_db_plugin的部分,由于这个插件只有对于config.ini配置文件的配置,没有对于命令行的内容,咱们能够去查看chain_plugin的实现,它会同时有配置文件和命令行两个方面的内容设置,源码较长请自行查看。
配置的对象options_description_easy_init是一个灵活的结构。能够表示:一个配置项,一个配置的值;一个配置项,一个配置的值,一个注释或者描述;一个配置项,一个注释或者描述。这些多种组合,咱们也能够直接去查看本身的config.ini的每个配置项去对应。
那么是如何拼凑全部的插件配置内容呢?
复制代码
application.cpp文件中的set_program_options()函数的函数体中使用了application的两个类变量来存储这些参数:
插件遍历结束后,咱们已经有了全部插件的config.ini配置内容以及命令行提示配置内容,下面要从宏观角度去配置一些属于application的配置项,config.ini中加入了plugins的配置,经过这个配置咱们能够手动指定要启动的插件,例如mongo_db_plugin就是须要手动配置的。接着,要配置命令行的参数内容,包括help, version, print-default-config, data-dir, config-dir, config, logconf。将他们追加存储到上面那两个类变量中。
到这里,application.cpp文件中的set_program_options()函数的工做就完成了。
上面提到的_app_options和_cfg_options仍就是傻傻分不清楚,他们的用意究竟是什么呢?
复制代码
简单来理解就是,命令行可以作全部配置文件config.ini中的配置的工做,同时命令行还有专属于本身的help, version, print-default-config, data-dir, config-dir, config, logconf配置。这样就好理解了,config.ini是命令行配置的子集,命令行配置是全集。
咱们回到initialize_impl,目前咱们已经拥有了两套默认配置参数,这里直接使用全集_app_options配置,咱们先接收来自于命令行的参数,将以它为优先级高的方式与_app_options配置合并:
bpo::variables_map options;
bpo::store(bpo::parse_command_line(argc, argv, my->_app_options), options);
复制代码
拿到合并后的配置对象options,依次针对配置项的内容进行响应:
evsward@evsward-TM1701:~/.local/share/eosio/nodeos$ tree
.
├── config
│ └── config.ini
└── data
├── blocks
│ ├── blocks.index
│ ├── blocks.log
│ └── reversible
│ ├── shared_memory.bin
│ └── shared_memory.meta
└── state
├── forkdb.dat
├── shared_memory.bin
└── shared_memory.meta
5 directories, 8 files
复制代码
bpo::store(bpo::parse_config_file<char>(config_file_name.make_preferred().string().c_str(),
my->_cfg_options, true), options);
复制代码
获得整合好本地config.ini文本配置的options对象。而后对其参数配置项进行设置。
承接上文,全部相关的plugin的执行各自的initialize。这个initialize函数是abstract_plugin的虚函数,而该虚函数被plugin类所复写:
virtual void initialize(const variables_map& options) override {
if(_state == registered) {//若是注册过
_state = initialized;
static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.initialize(options); });// 先执行依赖plugin的initialize方法。
static_cast<Impl*>(this)->plugin_initialize(options);// 调用自身的plugin_initialize方法实现。
//ilog( "initializing plugin ${name}", ("name",name()) );
app().plugin_initialized(*this);// 保存到一个initialized_plugins类成员变量中,用来按顺序记录已经开始启动运行的plugin。
}
assert(_state == initialized); /// 若是插件未注册,则不能执行initialize方法。
}
复制代码
因此在plugin调用initialize函数的时候,是先执行的以上复写的plugin的虚函数。咱们这里先设定几个要跟踪的plugin为目标吧,不然plugin太多,望山止步。
目标:主要研究mongo_db_plugin,以及基础plugin(chain_plugin, http_plugin, net_plugin, producer_plugin),路线是研究主plugin,如有额外的依赖plugin,看状况控制研究的深浅程度。
复制代码
前面写set_program_options()提到了mongo_db_plugin,这里研究它的plugin_initialize方法。传入的参数是结合了命令行以及本地config文件的合并配置项,按照此配置环境。
void mongo_db_plugin::plugin_initialize(const variables_map& options)
{
try {
if( options.count( "mongodb-uri" )) {//查mongodb-uri的配置,config.ini中有对应的。
ilog( "initializing mongo_db_plugin" );
my->configured = true;//设置标志位:已配置
if( options.at( "replay-blockchain" ).as<bool>() || options.at( "hard-replay-blockchain" ).as<bool>() || options.at( "delete-all-blocks" ).as<bool>() ) {//捕捉是否有replay-blockchain、hard-replay-blockchain、delete-all-blocks三个动做,有的话要判断是否要擦出mongo历史数据。
if( options.at( "mongodb-wipe" ).as<bool>()) {//检查擦除项mongodb-wipe的配置
ilog( "Wiping mongo database on startup" );
my->wipe_database_on_startup = true;//若是设置擦除,这里设置本地标志位wipe_database_on_startup
} else if( options.count( "mongodb-block-start" ) == 0 ) {//若是设置是从0开始,检查是否要所有擦除历史数据。
EOS_ASSERT( false, chain::plugin_config_exception, "--mongodb-wipe required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks"
" --mongodb-wipe will remove all EOS collections from mongodb." );
}
}
if( options.count( "abi-serializer-max-time-ms") == 0 ) {//eosio::chain_plugin的配置
EOS_ASSERT(false, chain::plugin_config_exception, "--abi-serializer-max-time-ms required as default value not appropriate for parsing full blocks");
}
my->abi_serializer_max_time = app().get_plugin<chain_plugin>().get_abi_serializer_max_time();
if( options.count( "mongodb-queue-size" )) {// queue大小
my->queue_size = options.at( "mongodb-queue-size" ).as<uint32_t>();
}
if( options.count( "mongodb-block-start" )) {// mongo对应的开始区块号
my->start_block_num = options.at( "mongodb-block-start" ).as<uint32_t>();
}
if( my->start_block_num == 0 ) {
my->start_block_reached = true;
}
std::string uri_str = options.at( "mongodb-uri" ).as<std::string>();
ilog( "connecting to ${u}", ("u", uri_str));
mongocxx::uri uri = mongocxx::uri{uri_str};
my->db_name = uri.database();
if( my->db_name.empty())
my->db_name = "EOS";// 默认起的库名字为EOS,若是在mongodb-uri有配置的话就使用配置的名字。
my->mongo_conn = mongocxx::client{uri};// 客户端链接到mongod
// controller中拉取得信号,在init函数中注册信号机制,始终监听链上信号,做出反应。
chain_plugin* chain_plug = app().find_plugin<chain_plugin>();//检查chain_plugin是否加载,chain_plugin是必要依赖,下面咱们要使用chain的数据。
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "" );
auto& chain = chain_plug->chain();// 得到chain实例
my->chain_id.emplace( chain.get_chain_id());
// accepted_block_connection对应了chain的signal,是boost提供的一种信号槽机制,这种connection对象有四个,见当前源码的下方展现。
my->accepted_block_connection.emplace( chain.accepted_block.connect( [&]( const chain::block_state_ptr& bs ) {// 创建connect,每当chain有accepted_block信号(这些信号是定义在controller.hpp中,稍后会介绍),即调用下面的函数。
my->accepted_block( bs );// accepted_block认同block信息
} ));
my->irreversible_block_connection.emplace(//含义同上
chain.irreversible_block.connect( [&]( const chain::block_state_ptr& bs ) {
my->applied_irreversible_block( bs );// applied_irreversible_block,应用不可逆区块
} ));
my->accepted_transaction_connection.emplace(//含义同上
chain.accepted_transaction.connect( [&]( const chain::transaction_metadata_ptr& t ) {
my->accepted_transaction( t );// accepted_transaction认同交易
} ));
my->applied_transaction_connection.emplace(//含义同上
chain.applied_transaction.connect( [&]( const chain::transaction_trace_ptr& t ) {
my->applied_transaction( t );// applied_transaction,应用交易
} ));
if( my->wipe_database_on_startup ) {
my->wipe_database();// 擦除mongo历史数据
}
my->init();//初始化函数
} else {
wlog( "eosio::mongo_db_plugin configured, but no --mongodb-uri specified." );
wlog( "mongo_db_plugin disabled." );
}
} FC_LOG_AND_RETHROW()
}
复制代码
四个connection对象的声明以下:
fc::optional<boost::signals2::scoped_connection> accepted_block_connection;
fc::optional<boost::signals2::scoped_connection> irreversible_block_connection;
fc::optional<boost::signals2::scoped_connection> accepted_transaction_connection;
fc::optional<boost::signals2::scoped_connection> applied_transaction_connection;
复制代码
queue
这段代码中涉及到四个函数分别是accepted_block,applied_irreversible_block,accepted_transaction,applied_transaction,他们都对应着对queue的操做,mongo_db_plugin_impl类成员定义了一下几种queue:
std::deque<chain::transaction_metadata_ptr> transaction_metadata_queue;
std::deque<chain::transaction_metadata_ptr> transaction_metadata_process_queue;
std::deque<chain::transaction_trace_ptr> transaction_trace_queue;
std::deque<chain::transaction_trace_ptr> transaction_trace_process_queue;
std::deque<chain::block_state_ptr> block_state_queue;
std::deque<chain::block_state_ptr> block_state_process_queue;
std::deque<chain::block_state_ptr> irreversible_block_state_queue;
std::deque<chain::block_state_ptr> irreversible_block_state_process_queue;
复制代码
queue是mongo_db_plugin本身定义的:
/**
* 模板类Queue,能够匹配以上咱们定义的多个queue类型。
* 模板类Entry,能够匹配block_state_ptr以及transaction_trace_ptr做为被存储实体类型。
*/
template<typename Queue, typename Entry>
void queue(boost::mutex& mtx, boost::condition_variable& condition, Queue& queue, const Entry& e, size_t queue_size) {
int sleep_time = 100;//默认线程睡眠时间
size_t last_queue_size = 0;
boost::mutex::scoped_lock lock(mtx);//mutex锁机制
if (queue.size() > queue_size) {//若是超过了咱们设定的queue大小,则采起以下措施。
lock.unlock();//先解锁
condition.notify_one();// 见下文对condition的介绍
if (last_queue_size < queue.size()) {//说明queue的增长速度大于咱们程序消费处理的速度
sleep_time += 100;//增长睡眠时间
} else {
sleep_time -= 100;//说明queue的增长速度小于咱们消费的速度,就要减小睡眠时间,尽快更新last_queue_size的值。
if (sleep_time < 0) sleep_time = 100;
}
last_queue_size = queue.size();
boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time));//线程睡眠,睡眠的时间按照上面的机制定夺。
lock.lock();//上锁
}
queue.emplace_back(e);//生效部分:插入到队列中去。
lock.unlock();//解锁
condition.notify_one();
}
复制代码
mongo_db_plugin_impl::wipe_database()
真正执行擦除mongo历史数据的函数,这个动做是由咱们配置mongodb-wipe参数来指定。擦除的函数体以下:
void mongo_db_plugin_impl::wipe_database() {
ilog("mongo db wipe_database");
// 定义的六张mongo的表类型,经过客户端链接获取到六张表的权限。
auto block_states = mongo_conn[db_name][block_states_col];
auto blocks = mongo_conn[db_name][blocks_col];
auto trans = mongo_conn[db_name][trans_col];
auto trans_traces = mongo_conn[db_name][trans_traces_col];
auto actions = mongo_conn[db_name][actions_col];
accounts = mongo_conn[db_name][accounts_col];
// 分别删除,执行drop动做。
block_states.drop();
blocks.drop();
trans.drop();
trans_traces.drop();
actions.drop();
accounts.drop();
}
复制代码
mongo_db_plugin_impl::init()
源码较多不粘贴,上面wipe_database函数,咱们删除了六张表,在init函数中,咱们要对应的创建这六张表。表名初始化:
const std::string mongo_db_plugin_impl::block_states_col = "block_states";
const std::string mongo_db_plugin_impl::blocks_col = "blocks";
const std::string mongo_db_plugin_impl::trans_col = "transactions";
const std::string mongo_db_plugin_impl::trans_traces_col = "transaction_traces";
const std::string mongo_db_plugin_impl::actions_col = "actions";
const std::string mongo_db_plugin_impl::accounts_col = "accounts";
复制代码
这就是刘张表对应的名字。这六张表在初始化创建时是一个总体操做,也就是说互为依赖关系,accounts表先建立,经过
accounts = mongo_conn[db_name][accounts_col];
复制代码
便可建立成功accounts表,其余表亦然,后面不赘述。表数据是由make_document进行组装的。首先咱们向accounts表插入一条数据,结构是name为eosio,createAt是当前时间。
经过insert_one方法将该条数据插入accounts表中。
接下来经过create_index方法对五张表创建索引,注意transaction_traces是没有索引的,init操做时不涉及transaction_traces表。
auto blocks = mongo_conn[db_name][blocks_col]; // Blocks
blocks.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" ));
blocks.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" ));// block创建了两个索引
auto block_stats = mongo_conn[db_name][block_states_col];
block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" ));
block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" ));// block_stats创建了两个索引
// accounts indexes
accounts.create_index( bsoncxx::from_json( R"xxx({ "name" : 1 })xxx" ));
// transactions indexes
auto trans = mongo_conn[db_name][trans_col]; // Transactions
trans.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" ));
auto actions = mongo_conn[db_name][actions_col];
actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" ));
复制代码
初始化准备就完成了,接下来要创建线程监听出块消息,同步到mongo数据库中来。
ilog("starting db plugin thread");
consume_thread = boost::thread([this] { consume_blocks(); });
startup = false;// 结束,调用析构函数,关闭mongo_db_plugin:设定标志位done = true;
复制代码
上面init函数执行到最后时,开启了一个线程,执行的是consume_blocks()函数,如字面含义这是消费区块的函数。这个函数是一个无限循环,保持线程的存活,监听queue的数据随时消费同步到mongo数据库中去,而queue的数据的是由上面plugin_initialize函数中的connect信号槽机制链接chain的出块信号往queue里面插入/同步链上数据。
condition
无线循环第一部分就是对condition.wait(lock)的操做,condition在上面queue的源码中有一个notify_one()的调用,实际上就是与wait相互应的操做。
boost::mutex::scoped_lock lock(mtx);
while ( transaction_metadata_queue.empty() &&
transaction_trace_queue.empty() &&
block_state_queue.empty() &&
irreversible_block_state_queue.empty() &&
!done ) {
condition.wait(lock);
}
复制代码
消费区块占用了一个线程,这个线程在上面四个queue是空的时候而且done也没有完成是flase的时候,该线程会经过condition来阻塞线程,参数是mutex的一个锁。
复制代码
condition.notify_one()会从新唤起这个阻塞的线程,而在mongo_db_plugin中,condition.notify_one()出现了3次:
mongo_db_plugin_impl::~mongo_db_plugin_impl() {
if (!startup) {//标志位,上面init函数结尾有这个值的赋值。
try {
ilog( "mongo_db_plugin shutdown in process please be patient this can take a few minutes" );
done = true;//设定标志位done,consume_block()会使用到。
condition.notify_one();// 唤醒consume_thread线程继续消费掉queue中残余数据。
consume_thread.join();// 挂起主线程,等待consume_thread线程执行完毕再唤起主线程。
} catch( std::exception& e ) {
elog( "Exception on mongo_db_plugin shutdown of consume thread: ${e}", ("e", e.what()));
}
}
}
复制代码
process_queue准备
咱们要将链上的数据同步至mongo,是经过上面判断是否为空的那四个queue来作,为了增长消费效率,进入consume_block函数之后,要先将数据move导入到一个process_queue中来慢慢处理,至关于一个中转站。
size_t transaction_metadata_size = transaction_metadata_queue.size();
if (transaction_metadata_size > 0) {
transaction_metadata_process_queue = move(transaction_metadata_queue);
transaction_metadata_queue.clear();
}
size_t transaction_trace_size = transaction_trace_queue.size();
if (transaction_trace_size > 0) {
transaction_trace_process_queue = move(transaction_trace_queue);
transaction_trace_queue.clear();
}
size_t block_state_size = block_state_queue.size();
if (block_state_size > 0) {
block_state_process_queue = move(block_state_queue);
block_state_queue.clear();
}
size_t irreversible_block_size = irreversible_block_state_queue.size();
if (irreversible_block_size > 0) {
irreversible_block_state_process_queue = move(irreversible_block_state_queue);
irreversible_block_state_queue.clear();
}
复制代码
队列大小报警器
接下来是一个针对四个源队列的大小进行一个监控,当任意超过限额的75%时,会触发报警,打印到控制台。
分发到具体执行函数消费队列
接下来,就是将上面的四个中转的process_queue的数据分别分发到不一样的函数(对应下面四个_process函数)中去消费处理。最后每一个中转队列处理一条,就pop出去一条,都处理结束之后,会再次判断四个源队列的大小是否为空,都消费完了,同时也得有析构函数的done标志位为true,才会中断consume_thread线程的consume_block()的无线循环。
1. mongo_db_plugin_impl::_process_accepted_transaction() 执行接收交易, 须要start_block_reached标识位为true。源码较长不粘贴,语言介绍一下,该函数的主要工做是得到mongo的链接以及库表对象,同时解析传入的const chain::transaction_metadata_ptr& t 对象,该对象的路线是:
chain=>signal=>mongo_db_plugin connect signal=>queue=>process_queue=>遍历出一条数据便是t
复制代码
得到这个对象之后,也准备好了mongo数据库的链接库表对象,剩下的工做就是从t解析导入mongo库表了。
mongo做为列存储的nosql文件数据库,这里只接收document类型
复制代码
这里建立了一个它的对象act_doc,解析过程:
const auto trx_id = t->id;
const auto trx_id_str = trx_id.str();
const auto& trx = t->trx;
const chain::transaction_header& trx_header = trx;
复制代码
act_doc.append( kvp( "action_num", b_int32{act_num} ), kvp( "trx_id", trx_id_str ));
act_doc.append( kvp( "cfa", b_bool{cfa} ));
act_doc.append( kvp( "account", act.account.to_string()));
act_doc.append( kvp( "name", act.name.to_string()));
act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) {
for( const auto& auth : act.authorization ) {
subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) {
subdoc.append( kvp( "actor", auth.actor.to_string()),
kvp( "permission", auth.permission.to_string()));
} );
}
} ));
复制代码
process_action函数处理的是action数据的匹配,而若是action涉及到新帐户的建立,这部分要在process_action函数中继续经过update_account函数进行处理。update_account函数只会过滤由system合约执行的newaccount动做,system合约默认是由chain::config::system_account_name(就是eosio)来建立的。因此过滤后的action的结构以下:
field | value |
---|---|
account | eosio |
name | newaccount |
而后会同步在mongo的accounts表中添加一条记录,要有当时的添加时间createdAt。添加以前,要根据这个用户名去mongo中查找,经过函数find_account,若是查找到了则update,未查到就insert。
auto find_account(mongocxx::collection& accounts, const account_name& name) {
using bsoncxx::builder::basic::make_document;
using bsoncxx::builder::basic::kvp;
return accounts.find_one( make_document( kvp( "name", name.to_string())));
}
复制代码
接着,是transaction表的数据插入,这个工做是对trans_doc文本类型变量的设置:
整合完毕,将trans_doc插入到transaction表中去。整个_process_accepted_transaction执行完毕,其中涉及到了transaction, action, accounts三张表的内容的增长与修改。
2. mongo_db_plugin_impl::_process_applied_transaction 执行应用交易,须要start_block_reached标识位为true。这个函数是对mongo中transaction_traces表的操做。一样的,是经过一个文本类型变量trans_traces_doc操做。这个函数的参数传入是transaction_trace_ptr类型的对象t(对应的上面_process_accepted_transaction接收的是transaction_metadata_ptr类型的)
abi_serializer::to_variant, 转化成abi格式的json数据。
abi_serializer::from_variant, 经过abi格式的json数据转换出来对应的对象数据。
复制代码
3. mongo_db_plugin_impl::_process_accepted_block
这里先要从process_accepted_block函数进入,上面的下划线_开头的函数都是又没有下划线的相同名字的函数调用的,只是他们除了调用之外都是一些异常的处理,日志的输出工做。而process_accepted_block函数有了简单的逻辑,就是根据标志位start_block_reached做出了处理。前面咱们介绍plugin_initialize函数的时候,经过配置文件的配置项"mongodb-block-start",咱们设定了全局变量start_block_num做为标志位。这里面就是对于这个参数值的一个判断,若是达到了这个设定的起始区块,则设定全局变量标志位start_block_reached为true,那么就能够进入到_process_accepted_block函数进行处理了。
复制代码
这个函数是接收区块处理。传入的参数为block_state_ptr类型的对象bs。它的路线与上面介绍过的其余函数的参数t是相同的,只是类结构不一样,存的数据不一样。该函数涉及到mongo的两张表,一个是block_states,另外一个是blocks。咱们分别来研究。
mongocxx::options::update update_opts{};
update_opts.upsert( true );// upsert模式为true,表明update操做若是未找到对象则新增一条数据。
auto block_states = mongo_conn[db_name][block_states_col];
auto block_state_doc = bsoncxx::builder::basic::document{};
// 数据结构映射
block_state_doc.append(kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "block_id", block_id_str ),
kvp( "validated", b_bool{bs->validated} ),
kvp( "in_current_chain", b_bool{bs->in_current_chain} ));
auto json = fc::json::to_string( bhs );
try {
const auto& value = bsoncxx::from_json( json );
block_state_doc.append( kvp( "block_header_state", value ));// 追加block_header_state的值
} catch( bsoncxx::exception& ) {
try {
json = fc::prune_invalid_utf8(json);
const auto& value = bsoncxx::from_json( json );
block_state_doc.append( kvp( "block_header_state", value ));
block_state_doc.append( kvp( "non-utf8-purged", b_bool{true}));
} catch( bsoncxx::exception& e ) {
elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what()));
elog( " JSON: ${j}", ("j", json));
}
}
block_state_doc.append(kvp( "createdAt", b_date{now} ));// 追加createdAt的值
try {
// update_one, 没有查询到相关数据则直接新增一条
if( !block_states.update_one( make_document( kvp( "block_id", block_id_str )),
make_document( kvp( "$set", block_state_doc.view())), update_opts )) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id));
}
} catch(...) {
handle_mongo_exception("block_states insert: " + json, __LINE__);
}
复制代码
auto blocks = mongo_conn[db_name][blocks_col];
auto block_doc = bsoncxx::builder::basic::document{};
// 数据结构映射
block_doc.append(kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "block_id", block_id_str ),
kvp( "irreversible", b_bool{false} ));
auto v = to_variant_with_abi( *bs->block, accounts, abi_serializer_max_time );// 转化为abi格式的数据存储。
json = fc::json::to_string( v );
try {
const auto& value = bsoncxx::from_json( json );
block_doc.append( kvp( "block", value ));// 追加block的值,为json
} catch( bsoncxx::exception& ) {
try {
json = fc::prune_invalid_utf8(json);
const auto& value = bsoncxx::from_json( json );
block_doc.append( kvp( "block", value ));
block_doc.append( kvp( "non-utf8-purged", b_bool{true}));
} catch( bsoncxx::exception& e ) {
elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what()));
elog( " JSON: ${j}", ("j", json));
}
}
block_doc.append(kvp( "createdAt", b_date{now} ));// 追加createdAt的值
try {
// update_one, 没有查询到相关数据则直接新增一条
if( !blocks.update_one( make_document( kvp( "block_id", block_id_str )),
make_document( kvp( "$set", block_doc.view())), update_opts )) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block ${bid}", ("bid", block_id));
}
} catch(...) {
handle_mongo_exception("blocks insert: " + json, __LINE__);
}
复制代码
4. mongo_db_plugin_impl::_process_irreversible_block 执行不可逆区块,,须要start_block_reached标识位为true。涉及mongo的两张表:blocks表和transactions表。
// 创世块区块号为1,没有信号到accepted_block处理。
if (block_num < 2) return;
复制代码
传入的参数,思想与上面的几个函数设计是相同的,它的类型与上面的_process_accepted_block函数相同,是block_state_ptr类型的对象bs。从bs中获取到区块,首先会经过find_block去mongo中查询,若是有的话就再也不处理。
bulk: 是一系列操做的集合。
mongocxx::options::bulk_write bulk_opts;
bulk_opts.ordered(false);// false说明能够并行,全部操做互不影响。若为true,则顺序执行,一旦遇错直接停止,后面的操做不会被执行到。
auto bulk = trans.create_bulk_write(bulk_opts);//全部的操做针对的是trans对象,对应的mongo表为transactions。
复制代码
auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
kvp( "validated", b_bool{bs->validated} ),
kvp( "in_current_chain", b_bool{bs->in_current_chain} ),
kvp( "updatedAt", b_date{now}))));
blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid())), update_doc.view());
复制代码
auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
kvp( "block_id", block_id_str),
kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "updatedAt", b_date{now}))));
mongocxx::model::update_one update_op{ make_document(kvp("_id", ir_trans->view()["_id"].get_oid())), update_doc.view()};
复制代码
最后经过在transaction循环中设定一个标志位transactions_in_block来肯定transaction遍历结束。
mongo_db_plugin总结
咱们是经过nodeos命令的initialize函数跟踪到mongo_db_plugin的,关于mongo_db_plugin的一切,能够总结为顺序:
1. set_program_option,设置配置参数
2. plugin_initialize,初始化使plugin配置参数生效,准备mongo链接,queue机制,信号槽监听chain出块action。
3. init,mongo库表初始化,创建索引,定义了consume_thread线程用来消费queue区块数据。initialize周期结束。
4. consume_block,线程触发与等待策略,process_queue消费中转策略,根据四种数据结构(即上文反复提到的那四个结构)分发消费函数。
复制代码
table | function insert | function update |
---|---|---|
transactions | accepted_trx | irreversible_block(bulk) |
actions | accepted_trx(bulk) | |
block_states | accepted_block | |
blocks | accepted_block | irreversible_block |
transaction_traces | applied_trx | |
accounts | accepted_trx |
比较特殊的一个表是accounts,它能够过滤actions内容,找到newaccount的action并保存帐户到表里。这给咱们以启发,咱们能够本身定义新的表来过滤本身须要的action,例如咱们本身写的智能合约。
复制代码
日志系统初始化。
void initialize_logging()
{
auto config_path = app().get_logging_conf();
if(fc::exists(config_path))
fc::configure_logging(config_path); //故意不去捕捉异常
for(auto iter : fc::get_appender_map())
iter.second->initialize(app().get_io_service());
// 重复以上代码逻辑,利用boost::asio::signal\_set机制,async\_wait。
logging_conf_loop();
}
复制代码
void application::startup() {
try {
for (auto plugin : initialized_plugins)//遍历全部已初始化的插件,执行他们的startup函数。
plugin->startup();
} catch(...) {
shutdown();//若有异常,则调用shutdown函数,清空容器,释放资源。
throw;
}
}
复制代码
这里仍旧以mongo_db_plugin为例,它的startup()是空。而对于其余plugin而言,startup都有不少工做要作,例如producer_plugin和chain_plugin都很是重要,此外涉及到重要的控制器部分controller也须要仔细研究。因为本文篇幅过长,咱们重点仍旧围绕mongo_db_plugin来介绍整个nodeos启动的生命周期。
main入口函数执行到最后一个步骤:exec函数。
void application::exec() {
std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));
sigint_set->async_wait([sigint_set,this](const boost::system::error_code& err, int num) {
quit();
sigint_set->cancel();
});
std::shared_ptr<boost::asio::signal_set> sigterm_set(new boost::asio::signal_set(*io_serv, SIGTERM));
sigterm_set->async_wait([sigterm_set,this](const boost::system::error_code& err, int num) {
quit();
sigterm_set->cancel();
});
std::shared_ptr<boost::asio::signal_set> sigpipe_set(new boost::asio::signal_set(*io_serv, SIGPIPE));
sigpipe_set->async_wait([sigpipe_set,this](const boost::system::error_code& err, int num) {
quit();
sigpipe_set->cancel();
});
io_serv->run();// 与上面initialize_logging的get_io_service()获取到的io\_serv是同一个对象
shutdown(); /// 同步推出
}
复制代码
这个函数与initialize_logging的循环中涉及到相同的信号机制boost::asio::signal_set。
boost::asio::signal_set
boost库的信号量技术。它要使用到boost::asio::io_service,这也是上面提到屡次的。信号量对象的初始化可参照前文一段代码,以下:
std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));
复制代码
共享指针这里不谈了,感兴趣的同窗请转到这里。它的构造函数是传入了一个boost::asio::io_service以及一个信号number SIGINT。这个SIGINT的声明为:
#define SIGINT 2 /* Interrupt (ANSI). */
复制代码
这个构造函数实现了向信号量集合中添加了一个信号2。
接着,我要经过async_wait来使用信号量。能够贴上上面initialize_logging函数的logging_conf_loop函数。
void logging_conf_loop()
{
std::shared_ptr<boost::asio::signal_set> sighup_set(new boost::asio::signal_set(app().get_io_service(), SIGHUP));
sighup_set->async_wait([sighup_set](const boost::system::error_code& err, int /*num*/) {
if(!err)
{
ilog("Received HUP. Reloading logging configuration.");
auto config_path = app().get_logging_conf();
if(fc::exists(config_path))
::detail::configure_logging(config_path);
for(auto iter : fc::get_appender_map())
iter.second->initialize(app().get_io_service());
logging_conf_loop();
}
});
}
复制代码
能够直接经过sighup_set->async_wait的方式来使用。它的声明定义是:
void (boost::system::error_code, int))
复制代码
会在所监听的信号触发时调用函数体。当发生错误的时候,退出logging_conf_loop函数的递归调用。
写到这里,咱们的nodeos的命令就启动成功了,因为篇幅限制,咱们没有仔细去研究全部依赖的plugin,以及controller的逻辑。本文重点研究了mongo_db_plugin的源码实现,经过该插件,咱们全面分析了nodeos命令启动的全部流程。而对于mongo_db_plugin插件自己的学习,咱们也明白了链数据是如何同步到mongo里面的。接下来,我会继续深刻分析其余相关插件的初始化流程以及启动流程,还有controller的逻辑细节,以及出块逻辑等等。
EOSIO/eos
圆方圆学院聚集大批区块链名师,打造精品的区块链技术课程。 在各大平台都长期有优质免费公开课,欢迎报名收看。
公开课地址:ke.qq.com/course/3451…