本文主要介绍 on_applied、on_commit、on_applied_sync、on_all_commit、on_all_applied 在数据 IO 处理流程中的回调代码梳理。写得比较简单,不过关键点都已经整理。以 filestore 为例:
OSD端详细流程分析:https://blog.51cto.com/wendashuai/2497104
主端:
PrimaryLogPG::execute_ctx()->issue_repop(repop, ctx)->pgbackend->submit_transaction()->issue_op(); parent->queue_transactions()
1. PrimaryLogPG::issue_repop 函数
// Primary side: build the three completion contexts for a replicated write
// and hand the transaction to the PG backend.
//  - onapplied_sync : runs synchronously when the local apply finishes
//  - on_all_applied : runs once every shard has applied (acked) the op
//  - on_all_commit  : runs once every shard has committed (journaled) the op
// (Excerpt — the real function carries more setup than shown here.)
void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
{
  // NOTE(review): the real ctor takes the ondisk write locks — abridged here.
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock();
  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
  // Forwards to ReplicatedBackend::submit_transaction(...,
  // on_local_applied_sync, on_all_acked, on_all_commit, ...).
  pgbackend->submit_transaction(
    soid,
    ctx->delta_stats,
    ctx->at_version,
    std::move(ctx->op_t),
    pg_trim_to,
    min_last_complete_ondisk,
    ctx->log,
    ctx->updated_hset_history,
    onapplied_sync,
    on_all_applied,
    on_all_commit,
    repop->rep_tid,
    ctx->reqid,
    ctx->op);
}
2. ReplicatedBackend::submit_transaction 函数
void ReplicatedBackend::submit_transaction( const hobject_t &soid, const object_stat_sum_t &delta_stats, const eversion_t &at_version, PGTransactionUPtr &&_t, const eversion_t &trim_to, const eversion_t &roll_forward_to, const vector<pg_log_entry_t> &_log_entries, boost::optional<pg_hit_set_history_t> &hset_history, Context *on_local_applied_sync, Context *on_all_acked, Context *on_all_commit, ceph_tid_t tid, osd_reqid_t reqid, OpRequestRef orig_op) { InProgressOp &op = in_progress_ops.insert( make_pair( tid, InProgressOp( tid, on_all_commit, on_all_acked, orig_op, at_version) ) ).first->second; op.waiting_for_applied.insert( parent->get_actingbackfill_shards().begin(), parent->get_actingbackfill_shards().end()); op.waiting_for_commit.insert( parent->get_actingbackfill_shards().begin(), parent->get_actingbackfill_shards().end()); //issue_op将ops的信息封装成message发送给replica osd副本的。这个操做就是在封装message,这里就再也不多说了 issue_op( soid, at_version, tid, reqid, trim_to, at_version, added.size() ? *(added.begin()) : hobject_t(), removed.size() ? *(removed.begin()) : hobject_t(), log_entries, hset_history, &op, op_t); op_t.register_on_applied_sync(on_local_applied_sync); --->on_applied_sync op_t.register_on_applied( --->on_applied parent->bless_context( new C_OSD_OnOpApplied(this, &op))); op_t.register_on_commit( --->on_commit parent->bless_context( new C_OSD_OnOpCommit(this, &op))); parent->queue_transactions(tls, op.op);//int FileStore::queue_transactions() }
3. ReplicatedBackend::issue_op 函数
// Primary side: wrap the replicated op into a cluster message and send it to
// a replica OSD. (Excerpt — construction of the message `wr` and the loop
// over the `peer` shards are elided in this quote.)
void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  // Ship the op to the replica; the replica handles it via
  // ReplicatedBackend::handle_message() -> sub_op_modify().
  get_parent()->send_message_osd_cluster(peer.osd, wr, get_osdmap()->get_epoch());
}
4. 副本端流程
副本端:
ReplicatedBackend::handle_message()--->sub_op_modify(op)--->queue_transactions()
// sub op modify 当pg的从副本接收到MSG_OSD_REPOP,调用该函数,完成本地对象的数据写入 void ReplicatedBackend::sub_op_modify(OpRequestRef op) { rm->opt.register_on_commit( parent->bless_context( new C_OSD_RepModifyCommit(this, rm))); rm->localt.register_on_applied( parent->bless_context( new C_OSD_RepModifyApply(this, rm))); parent->queue_transactions(tls, op);// ->int FileStore::queue_transactions() }
5. 主端回调类
主回调: class C_OSD_OnOpCommit : public Context { ReplicatedBackend *pg; ReplicatedBackend::InProgressOp *op; public: C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) : pg(pg), op(op) {} void finish(int) override { pg->op_commit(op); } }; class C_OSD_OnOpApplied : public Context { ReplicatedBackend *pg; ReplicatedBackend::InProgressOp *op; public: C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) : pg(pg), op(op) {} void finish(int) override { pg->op_applied(op); } };
6. 副本端回调类
// Replica-side completion callbacks, registered in
// ReplicatedBackend::sub_op_modify().

// Data (apply) completion on the replica -> repop_applied().
struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_applied(rm);
  }
};

// Journal (commit) completion on the replica -> repop_commit().
struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};
7. filestore 端
// FileStore entry point: collect the per-transaction callback lists before
// queueing the journal/data writes. (Excerpt — the journal submission and op
// queueing that follow collect_contexts() are elided in this quote.)
int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
                                  TrackedOpRef osd_op,
                                  ThreadPool::TPHandle *handle)
{
  Context *onreadable;       // fired when the data write is applied (readable)
  Context *ondisk;           // fired when the journal write commits (on disk)
  Context *onreadable_sync;  // fired synchronously right after apply
  ObjectStore::Transaction::collect_contexts(
    tls, &onreadable, &ondisk, &onreadable_sync);
}

// Merge the on_applied / on_commit / on_applied_sync context lists of every
// transaction in the batch into three single aggregate Contexts.
static void collect_contexts(
  vector<Transaction>& t,
  Context **out_on_applied,
  Context **out_on_commit,
  Context **out_on_applied_sync)
{
  list<Context *> on_applied, on_commit, on_applied_sync;
  for (vector<Transaction>::iterator i = t.begin(); i != t.end(); ++i) {
    // splice() moves the contexts out of each transaction without copying.
    on_applied.splice(on_applied.end(), (*i).on_applied);
    on_commit.splice(on_commit.end(), (*i).on_commit);
    on_applied_sync.splice(on_applied_sync.end(), (*i).on_applied_sync);
  }
  // list_to_context() wraps each list in one Context that completes them all.
  *out_on_applied = C_Contexts::list_to_context(on_applied);
  *out_on_commit = C_Contexts::list_to_context(on_commit);
  *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync);
}
8.写journal和data回调关系
_op_journal_transactions(tbl, orig_len, o->op,new C_JournaledAhead(this, osr, o, ondisk),osd_op); ->onjournal ->oncommit 写完journal,回调ondisk,Finisher线程ondisk_finishers ---> *Finisher::finisher_thread_entry() --->complete() ---> finish() if (ondisk) { dout(10) << " queueing ondisk " << ondisk << dendl; ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk); } 写完data,回调onreadable,Finisher线程apply_finishers ---> *Finisher::finisher_thread_entry() --->complete() ---> finish() if (o->onreadable) {//写完filestore后,数据开始可读。(此时可能写到page cache了) apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable); } onreadable ---> out_on_applied ---on_applied ondisk ---> out_on_commit --->on_commit onreadable_sync --->out_on_applied_sync --->on_applied_sync
主完成了journal的写入: C_OSD_OnOpCommit pg->op_commit(op); ondisk ondisk_finishers; 此时可继续写?
主完成data写入: C_OSD_OnOpApplied pg->op_applied(op) onreadable apply_finishers; 此时写入的数据可读?
副本完成了journal的写入: C_OSD_RepModifyCommit pg->repop_commit(rm) send_message_osd_cluster发送到主
副本完成data写入: C_OSD_RepModifyApply pg->repop_applied(rm) send_message_osd_cluster发送到主
完成所有副本journal写入: all_committed on_all_commit C_OSD_RepopCommit repop_all_committed waiting_for_ondisk //called when all commit
完成所有副本data写入: all_applied on_all_applied C_OSD_RepopApplied repop_all_applied waiting_for_ack //called when all acked
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);//on_all_commit
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);//on_all_acked
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(xxx); //on_local_applied_sync 本地端sync完成
所有端完成了journal的写入后,此时数据已经写到journal盘,会处理waiting_for_ondisk list,进入ondisk状态。一旦环境崩溃,可以通过journal replay方式回放恢复;
所有端完成了data的写入后,即写入到了filestore层,此时表示apply成功,会处理waiting_for_ack list,向client端发送ack通知写完成,此时数据处于可读状态onreadable;