ceph的数据存储之路(5) -----osd数据处理

osd的数据处理

当osd接管了来自rbd client发送的请求,在osd上进行解析,通过一系列的处理,最后保存到了osd的磁盘上。安全

 

上一节中讲述了rbdclient端是如何处理请求数据的,下面开始从osd接受数据开始进行讲述。服务器

 

1、client如何与osd进行创建通讯的呢?session

1. osd是运行在服务器上的进程,进程的主体在ceph_osd.cc中的main函数开始,而且申请一些用于服务的线程。好比这里用于接客户端受链接的线程Accepter。若是配对链接成功后,会把链接成功的socket交给SimpleMessager来处理。app

  

2. 在ceph_osd.cc 中main函数中 会申请一个Messager 消息管理的类,该类是一个基类,最终会选择SimpleMessager类实例,SimpleMessager类中会有一个叫作Accepter的成员,会申请该对象,而且初始化。socket

而后在main中把它绑定一个固定的ip,这个ip根据配置文件来获取。如0577行函数

 

3.这个ip最终会绑定在Accepter中。而后在Accepter->bind函数中,会对这个ip初始化一个socket,而且保存为listen_sd。接着会启动Accepter->start(),这里会启动Accepter的监听线程,这个线程作的事情放在Accepter->entry()函数中。接下来看看这个函数中实现了什么。spa

0216:开始一个循环,线程不能结束。知道中止这个线程时。线程

0218:poll监听一个端口,等待客户端来进行链接。日志

0219:若是有客户端来链接时,若是来接成功则进行下面的处理对象

0222:msgr 是具体实现类为SimpleMessager,跟踪到实现的函数中去。

0343:当接受到一个链接时,这时须要创建一个数据通道pipe,。

0344:告知pipe 处理socket为sd。也就是从这个sd读取数据。

0346:启动pipe的读线程,从sd中读取message信息。

0349:将pipe加入到SimpleMessager中的accepting_pipes,便于管理。

 

2、消息的传递

4. 接下来已经创建了pipe的通道,立刻就要开始进行message的接受, 目前接收message的工做交给了pipe的reader线程。接下来看看pipe->reader()作了什么事情。

pipe->reader()中会先 read_message(),将传递的消息解析出来,而后把这个消息放入in_q->fast_dispatch(m)中处理,继续发送到OSD::ms_fast_dispatch中处理,在ms_fast_dispatch中会将message 转化为OpRequestRef op,后续直接对这个op进行处理。继续通过dispatch_session_waiting()、dispatch_op_fast()、handle_op()。handle_op()中开始将这个处理由OSD转化为PG的处理。

8502:根据message的信息能够得到pg_id。

8503:再根据pg_id能够恢复出pool的id。也就是pool的序号。

8508:根据固然获取到的osdmap从新修正这个pg_id。

8512:根据pg_id信息建立 pg的信息结构spg_t pgid。

 

3、消息的处理

5.上面已经根据message中的信息恢复出pg和pool的相关信息。这些信息对于osd组织数据有很大的帮助。接下来开始要交给PG进行处理了。

8568:根据pg_id在osd->pg_map中查找pg。这里要说明下PG是一个基类,由ReplicatedPG类实现。这里的PG实际上使用的就是继承类的方法。

8573:找到PG之后,这时将这个op交给PG进行继续处理。已经从OSD处理的message转化为了PG处理的op。由这里开始,最终添加到OSDService->op_wq队列,该队列是由OSD->ShardedOpWQ队列初始化,其实是保存到了OSD-> 的ShardedOpWQ  ,而后保存在结构ShardData中等待被处理,而后唤醒处理这个队列的线程,线程处理函数OSD::ShardedOpWQ::_process调用到OSD::dequeue_op()、pg->do_request()、ReplicatedPG::do_request()、ReplicatedPG::do_op()。接下来看看do_op中是如何处理的。

 

1809:这里要建立一个OpContext结构,该结构会接管message中的全部ops的操做,ops的操做就是客户端将rbd请求拆分红object的请求。

 

4、造成事务Transaction

1810:获取一个transaction,由于pgbackend类被ReplicatedBackend类实现,因此get_transaction的实现由ReplicatedBackend::get_transaction() 完成,申请一个叫作RPGTransaction 而后交给了ctx->op_t。这里想要说明的是,申请了一个事物transaction,下面就要处理这个Transaction须要与咱们的op组合了吧,在一个transaction中完成这些ops的操做,看看他们是如何关联的execute_ctx(),。

 

7. 首先使用int result = prepare_transaction(ctx); 他会将 ops上要写入的数据所有都按着结构保存在transaction的data_bl,op_bl中。拿着这个transaction 就已经得到了所有的操做和数据,这也就是将ops与Transaction绑定的结果。由于primary osd不只仅是要本身保存数据,其余的replica osd也要保存数据的副本,这里有primary osd将准备好的数据发送给replica osd。

 

5、osd完成统计处理repop

2523:申请repop,这个repop是用来管理发送给其余副本以及本身进行数据处理的统计,根据这个结构可知那些osd都完成了数据的读写操做,回调比不可少的。

2528:issue_repop()将操做率先发给其余副本,这里其实就是与其余副本通讯,将ops发送给其余副本,其余副本操做完成时可以及时的回调找到repop,而后开始处理,在issue_repop中规定了全部osd完成时的on_all_commit与on_all_applied的操做,而且将提交Transaction,使用函数submit_transaction()。这里pgbackend->submit_transaction()函数中,pgbackend对象已经被ReplicatedBackend进行了继承,因此最终使用的是ReplicatedBackend::submit_transaction()。

2530:若是数据处理完成了,使用eval_repop()进行收尾的工做,将结果回调给客户端。

 

8. 接下来继续查看函数void ReplicatedBackend::submit_transaction()中的实现方式

0590:开始准备一个处理操做的结构InProgressOp,而后将申请的on_all_commit、on_all_applied等回调操做进行统计。

0594:开始统计全部须要applied的副本操做数量,等待副本操做完成回调时进行清除,使用该结构方便统计是否是全部的副本都完成了操做。

0598:做用同0594相同,区别在于统计commit的副本操做数量。

0620:issue_op将ops的信息封装成message发送给replica osd副本的。这个操做就是在封装message,这里就再也不多说了。

0619:开始记录本端操做object的log。这个log对于数据的恢复存在相当重要的决定。

0624:开始统计本端的sync回调,主要用于快照和克隆等等的数据同步。

0625:开始注册本端的applied回调函数,这里回调后会直接向上回调all_applied()

0627:开始注册本端删除object的回调操做函数(这个暂时不用考虑)。

0629:开始注册本端的commit回调函数,这里回调后直接向上回调all_commited()。

0631:本端开始真正的处理请求。这个parent指的就是ReplicaPG -> queue_transaction,而后在进行最后osd->store->queue_transaction ,这里的store是ObjectStore 几经展转最后调用到继承类FileStore::queue_transactions()开始处理。

 

6、文件与日志的处理

9.从这里开始的 接下来就所有的交给了FileStore来处理了。

2071:在进行存储数据的时候 确定是须要记录journal的,也就是当数据进行写入的时候须要写到journal中一份,当data数据失败的时候能够从journal中进行恢复。因此这里为了安全起见,通常都会采用journal的方式进行保存数据。

2082:若是选择的文件系统(如brfs)时,这种文件系统存在checkpoint机制时,能够采用的方法,能够并行的处理日志与data的写入操做。

2084:准备写入日志的操做_op_journal_transactions。

2086:准备写入数据的操做queue_op。

2088:当选择这条路时,这种文件系统不存在checkpoint机制,因此必须先写入日志,等待日志完成后才能够写入data。

2091:这时申请一个C_JournaledAhead的回调操做,这个操做会在日志完成以后进行回调处理处理时会将data写入磁盘。_op_journal_transactions()这里开始激发写入r日志的操做。

在_op_journal_transactions()中的处理操做以下图:

0266:判断天然要进行写入journal的。

0271:循环处理Transaction,固然此时假设只有一个Transaction。

0274:解析每个Transaction的操做。

0278:获取每一个操做的数据长度。

0279:获取每一个操做的数据段偏移。

0281:将数据操做打包到内存tbl中造成日志数据。

0283:提交日志数据,完成下发任务journal->submit_entry,这里的journal由继承类FileJournal 实现的,因此这里继续调用FileJournal::submit_entry()。

1614:将申请的写日志完成的回调oncommit添加到completions队列中,等待写日志完成后开始调用,这里指的是C_JournaledAhead。

1618:若是当前的写操做队列没有能够处理的日志操做时,写操做线程应该处于睡眠的状态,因此须要唤醒这个写线程。

1620:开始唤醒写日志的线程。

1622:将本次须要写入的buffer统计到写线程的队列writeq中。

接下来就交给了写日志的线程FileJournal::write_thread_entry()中开始处理。这个写操做的线程就是要把数据写入到文件当中,完成后调取completions中对应的成员进行回调,而后调到C_JournaledAhead->finish() ,再进入FileStore::_journaled_ahead()中,这个函数分为了两个部分,一部分是将data操做提交到op_wq队列,另一个就是对于journal操做完成的回调处理。

2178:将操做的op添加到FileStore:: op_wq中,而后等待写数据的线程将数据写入文件。

2181:先解析出在日志完成以后须要回调的那些操做,造成to_queue队列。

2187:若是这个to_queue队列不是空的队列,则交给ondisk_finisher队列进行处理。

2185:将日志保存完成的回调ondisk交给ondisk_finisher,后续有finisher线程处理,这里的ondisk注册回调为C_OSD_OnOpApplied。

 

 

10. 上面说道op_wq队列,将写data的操做加入到了op_wq中以后,这里会触发写data的线程。

data的写操做线程OpWQ::_process(),再调到FileStore::_do_op()函数中,再到FileStore::_do_transactions()函数、FileStore::_do_transaction()函数调用。

 

2529:这里解析这个操做,确定是一个OP_write的操做。

2531:解析文件目录。

2532:解析文件的名字。

2533:解析要写入文件的位置。

2534:解析要写入文件的数据长度。

2537:解析要写入文件的数据。

2541:根据上面解析的信息,开始调用FileStore::_write 将数据真正的写入文件中的正确位置。当完成写任务后开始进行逐级回调。

OpWQ::_process()操做 对应的就是 FileStore::_finish_op()。在FileStore::_finish_op中主要作的就剩下一件事儿了,就是将请求进行回调处理。

1998:开始回调sync的操做,暂时没有涉及到快照克隆等 不须要考虑。

2001:准备开始回调onreadable的参数,这里回调注册的为C_OSD_OnOpCommit。

2002:进行其余操做的回调处理。

 

7、请求的回调处理

11. 到目前为止已经完成了journal的写入操做和data的写入操做。他们分别对应的回调为C_OSD_OnOpApplied,C_OSD_OnOpCommit。他们作的事情差很少,接下来先看看C_OSD_OnOpApplied的操做是怎样的

C_OSD_OnOpApplied->finish()  调取pg->op_applied(),最后到ReplicatedBackend::op_applied()

0667:开始删除在等待队列中的本端osd序号。这样表示本端已经处理完成了。

0670:检查是否是全部的等待在waiting_for_applied队列上的osd都完成了操做。这时能够进行彻底回调。这里的on_applied注册的是C_OSD_RepopApplied回调。

0676:这里开始检查是否是全部的waiting_for_applied、waiting_for_commit队列中都已经处理完成则done成功,那时就能够删除in_progress_ops队列的ops了。

 

12. 接续C_OSD_RepopApplied 逐渐的想客户端进行调用。这里开始调用ReplicatedPG::repop_all_applied(),在这里会设置repop->all_applied 为ture。而后调用eval_repop进行处理。

7571:查看是否存在请求须要回调。

7574:开始循环处理这里的请求,而后将请求发还给客户端。

7578:恢复开始接收到请求的MOSDOp的操做。

7579:建议对应的应答消息,这个应答消息与接受到的请求相对应。

7582:将请求发还给客户端。

 

13. 以上就是整理osd来处理数据写操做请求的所有内容和流程了。

 

 

总结:

1.这里的流程太多了就再也不细说了,请参考下面的所有结构图。

2.osd 预先运行进程 ceph_osd.cc中的main开始。

3.osd结构中申请了管理消息的SimpleMessager模块,该模块中存在一个叫作Accepter模块,该模块专门用来监听端口等待客户端进行链接的。而后交给reader,这时reader将已经创建的链接中开始读取message。

4.将message转化为OpRequest,而且交到dispath队列中。

5.由message等信息 转化得知pool信息和PG的信息,而后把message交给PG进行处理。

6.将OpRequest添加到osdservice->op_wq队列中,该队列中会有处理的线程,线程会将继续处理这个OpRequest的。

7.交给ReplicaPG处理 建立repop 全部的applied与commit的管理。

8.解析出OpRequest中的全部op与data。把这些数据用一个叫作Transaction的结构进行管理。

9.有transaction的处理开始,将replica osd的操做发送给副本。

10.本端开始处理journal的写入操做,加入到writeq队列,该队列有独立线程完成写入的操做。

11.写journal完成后,再将要写入的data添加到op_wq中,该队列一样存在一个线程进行处理,将数据最终的保存到文件中。

 

相关文章
相关标签/搜索