VPP uses a multi-threaded model with a shared address space, so the fastest way for threads to communicate is simply to read and write each other's data directly. To protect the critical sections this creates, VPP implements its own lightweight thread-safety mechanism.
Synchronization between VPP threads is essentially a spinlock with signaling and timeout semantics, built around three operations: check, sync, and release.
Overall it provides roughly what pthread_cond_timedwait would provide if its mutex were replaced by a spinlock; if the wait exceeds BARRIER_SYNC_TIMEOUT, a deadlock is assumed and the process aborts.
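Before walking through the real code, a minimal stand-alone sketch of the protocol may make it easier to follow. This is not VPP code: it uses C11 atomics, the names n_workers and critical are hypothetical stand-ins, and wait_at_barrier / workers_at_barrier simply mirror the fields described below.

#include <stdatomic.h>

/* Simplified model of the barrier: one flag set by the main thread and one
   counter maintained by the workers. The real VPP code adds timeouts,
   recursion support and a hold-down timer on top of this. */
static atomic_uint wait_at_barrier;    /* main -> workers: "park now" */
static atomic_uint workers_at_barrier; /* workers -> main: how many are parked */

/* Main thread: close the barrier, run the critical section, reopen it. */
void
main_thread_critical_section (unsigned n_workers, void (*critical) (void))
{
  atomic_store (&wait_at_barrier, 1);                   /* sync: ask workers to park */
  while (atomic_load (&workers_at_barrier) != n_workers)
    ;                                                   /* spin until every worker is parked */
  critical ();                                          /* safe: no worker is running node code */
  atomic_store (&wait_at_barrier, 0);                   /* release */
  while (atomic_load (&workers_at_barrier) != 0)
    ;                                                   /* wait for the workers to leave the barrier */
}

/* Worker thread: the "check" step, called once per main-loop iteration. */
void
worker_barrier_check (void)
{
  if (atomic_load (&wait_at_barrier))
    {
      atomic_fetch_add (&workers_at_barrier, 1);        /* announce that this worker is parked */
      while (atomic_load (&wait_at_barrier))
        ;                                               /* spin until the main thread releases */
      atomic_fetch_sub (&workers_at_barrier, 1);        /* leave the barrier */
    }
}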
The relevant data structures are:
typedef struct
{
  ......
  volatile u32 *wait_at_barrier;     /* Flag telling worker threads to wait at the barrier:
                                        the main thread sets it to 1 when a sync starts and clears it to 0 when it ends */
  volatile u32 *workers_at_barrier;  /* Number of worker threads that have entered the sync; incremented by each worker */
  i64 recursion_level;               /* Current recursion depth */
  u64 barrier_sync_count;            /* Number of barrier syncs initiated (statistics) */
  u8 barrier_elog_enabled;
  const char *barrier_caller;        /* Name of the function that initiated this sync */
  const char *barrier_context;
} vlib_worker_thread_t;
typedef struct vlib_main_t
{
  ......
  /* debugging */
  volatile int parked_at_barrier;

  /*
   * Barrier epoch - Set to current time, each time barrier_sync or
   * barrier_release is called with zero recursion.
   * Used to compute how long the sync lasts.
   */
  f64 barrier_epoch;

  /* Earliest barrier can be closed again */
  /* While the current time is earlier than barrier_no_close_before, a new sync may not be started */
  f64 barrier_no_close_before;
  ......
} vlib_main_t;
The main thread calls the following function to tell the worker threads that a sync is starting; once all workers have entered the sync state, it performs the critical-section work.
#define vlib_worker_thread_barrier_sync(X) {vlib_worker_thread_barrier_sync_int(X, __FUNCTION__);}

void
vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
{
  f64 deadline;
  f64 now;
  f64 t_entry;
  f64 t_open;
  f64 t_closed;
  u32 count;

  if (vec_len (vlib_mains) < 2)
    return;

  /* Only the main thread may call this function */
  ASSERT (vlib_get_thread_index () == 0);

  /* vlib_worker_threads[0] is the main thread; record the caller's name */
  vlib_worker_threads[0].barrier_caller = func_name;
  count = vec_len (vlib_mains) - 1;   /* number of worker threads */

  /* Record entry relative to last close */
  now = vlib_time_now (vm);
  t_entry = now - vm->barrier_epoch;

  /* Tolerate recursive calls: if this is not the outermost call, just return */
  if (++vlib_worker_threads[0].recursion_level > 1)
    {
      barrier_trace_sync_rec (t_entry);
      return;
    }

  /* Count how many syncs have been initiated */
  vlib_worker_threads[0].barrier_sync_count++;

  /* Enforce minimum barrier open time to minimize packet loss */
  /* A new sync may only be started after the mandatory open window
     following the previous sync has elapsed */
  ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));

  while (1)
    {
      now = vlib_time_now (vm);
      /* Barrier hold-down timer expired? */
      if (now >= vm->barrier_no_close_before)
        break;
      if ((vm->barrier_no_close_before - now) > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
        {
          clib_warning ("clock change: would have waited for %.4f seconds",
                        (vm->barrier_no_close_before - now));
          break;
        }
    }

  /* Record time of closure */
  /* Time between two syncs, i.e. how long the barrier was open */
  t_open = now - vm->barrier_epoch;
  vm->barrier_epoch = now;

  /* Maximum wait: 600 seconds in debug builds, 1 second otherwise */
  deadline = now + BARRIER_SYNC_TIMEOUT;

  /* Set wait_at_barrier to 1 to notify the workers */
  *vlib_worker_threads->wait_at_barrier = 1;

  /* Wait for all worker threads to reach the barrier */
  while (*vlib_worker_threads->workers_at_barrier != count)
    {
      /* On timeout, report a deadlock and panic */
      if ((now = vlib_time_now (vm)) > deadline)
        {
          fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
          os_panic ();
        }
    }

  /* Time from starting the sync until all worker threads accepted it */
  t_closed = now - vm->barrier_epoch;

  barrier_trace_sync (t_entry, t_open, t_closed);
}
After finishing the critical-section work, the main thread calls the following function to tell the workers that the sync is over.
/* Called to end the sync (release the barrier) */
void
vlib_worker_thread_barrier_release (vlib_main_t * vm)
{
  f64 deadline;
  f64 now;
  f64 minimum_open;
  f64 t_entry;
  f64 t_closed_total;
  f64 t_update_main = 0.0;
  int refork_needed = 0;

  if (vec_len (vlib_mains) < 2)
    return;

  ASSERT (vlib_get_thread_index () == 0);

  now = vlib_time_now (vm);
  /* Time between the matching sync call and this release */
  t_entry = now - vm->barrier_epoch;

  /* Decrease the recursion depth; if it is still greater than 0 the outermost sync has not ended yet */
  if (--vlib_worker_threads[0].recursion_level > 0)
    {
      barrier_trace_release_rec (t_entry);
      return;
    }
  ......
  deadline = now + BARRIER_SYNC_TIMEOUT;

  /*
   * Note when we let go of the barrier.
   * Workers can use this to derive a reasonably accurate
   * time offset. See vlib_time_now(...)
   */
  vm->time_last_barrier_release = vlib_time_now (vm);
  CLIB_MEMORY_STORE_BARRIER ();

  /* Clear the wait flag */
  *vlib_worker_threads->wait_at_barrier = 0;

  /* Wait for all worker threads to leave the barrier */
  while (*vlib_worker_threads->workers_at_barrier > 0)
    {
      /* If it takes too long, report a deadlock and panic */
      if ((now = vlib_time_now (vm)) > deadline)
        {
          fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
          os_panic ();
        }
    }
  ......
  /* Total duration of this sync */
  t_closed_total = now - vm->barrier_epoch;

  /* Compute the minimum time the barrier must stay open before the next sync;
     proportional to how long this sync took */
  minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;

  if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
    {
      minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
    }

  /* Earliest time the next sync may start */
  vm->barrier_no_close_before = now + minimum_open;

  /* Record barrier epoch (used to enforce minimum open time) */
  vm->barrier_epoch = now;

  barrier_trace_release (t_entry, t_closed_total, t_update_main);
}
vlib_worker_thread_barrier_sync and vlib_worker_thread_barrier_release may only be called, in matching pairs, by the main thread, and nested pairs are supported. They are how the main thread accesses worker-thread data safely, at a relatively high cost.
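For example, a main-thread code path that already holds the barrier can call another helper that also takes it: only the outermost sync/release pair actually closes and reopens the barrier, while the inner pair just moves recursion_level up and down. A hypothetical sketch (the helper names are invented; the barrier calls are the real API):

/* Hypothetical helpers, each written as if it were the only barrier user. */
static void
update_shared_table (vlib_main_t * vm)
{
  vlib_worker_thread_barrier_sync (vm);     /* nested call: recursion_level 1 -> 2, returns at once */
  /* ... modify state shared with the workers ... */
  vlib_worker_thread_barrier_release (vm);  /* recursion_level 2 -> 1, barrier stays closed */
}

static void
reconfigure (vlib_main_t * vm)
{
  vlib_worker_thread_barrier_sync (vm);     /* outermost call: actually parks the workers */
  update_shared_table (vm);                 /* nested sync/release pair is tolerated */
  /* ... further updates to shared state ... */
  vlib_worker_thread_barrier_release (vm);  /* outermost release: workers resume */
}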
Once the vpp_main thread has started a sync, each worker thread must call the following function and wait.
static inline void
vlib_worker_thread_barrier_check (void)
{
  /* If the main thread has started a sync, this thread must enter the sync state */
  if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier))
    {
      vlib_main_t *vm = vlib_get_main ();
      u32 thread_index = vm->thread_index;
      f64 t = vlib_time_now (vm);
      ......
      /* Increment the count of waiting workers */
      clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
      if (CLIB_DEBUG > 0)
        {
          vm = vlib_get_main ();
          vm->parked_at_barrier = 1;
        }
      /* Spin until the sync is released */
      while (*vlib_worker_threads->wait_at_barrier)
        ;

      /*
       * Recompute the offset from thread-0 time.
       * Note that vlib_time_now adds vm->time_offset, so
       * clear it first. Save the resulting idea of "now", to
       * see how well we're doing. See show_clock_command_fn(...)
       */
      {
        f64 now;
        vm->time_offset = 0.0;
        now = vlib_time_now (vm);
        vm->time_offset = vlib_global_main.time_last_barrier_release - now;
        vm->time_last_barrier_release = vlib_time_now (vm);
      }

      if (CLIB_DEBUG > 0)
        vm->parked_at_barrier = 0;
      /* The sync has ended: decrement the count of waiting workers */
      clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
      ......
    }
}
As an example, consider vnet_hw_interface_assign_rx_thread, the main function behind the command "set interface rx-placement":
/* When the main thread receives the command, it eventually calls this function */
void
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
                                    u16 queue_id, uword thread_index)
{
  vnet_device_main_t *vdm = &vnet_device_main;
  vlib_main_t *vm, *vm0;
  vnet_device_input_runtime_t *rt;
  vnet_device_and_queue_t *dq;
  vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);

  ASSERT (hw->input_node_index > 0);

  if (vdm->first_worker_thread_index == 0)
    thread_index = 0;

  if (thread_index != 0 &&
      (thread_index < vdm->first_worker_thread_index ||
       thread_index > vdm->last_worker_thread_index))
    {
      thread_index = vdm->next_worker_thread_index++;
      if (vdm->next_worker_thread_index > vdm->last_worker_thread_index)
        vdm->next_worker_thread_index = vdm->first_worker_thread_index;
    }

  vm = vlib_mains[thread_index];
  vm0 = vlib_get_main ();       /* the calling thread, normally the main thread */

  /* Tell the worker threads that a sync is starting */
  vlib_worker_thread_barrier_sync (vm0);

  rt = vlib_node_get_runtime_data (vm, hw->input_node_index);

  vec_add2 (rt->devices_and_queues, dq, 1);
  dq->hw_if_index = hw_if_index;
  dq->dev_instance = hw->dev_instance;
  dq->queue_id = queue_id;
  dq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;
  rt->enabled_node_state = VLIB_NODE_STATE_POLLING;

  vnet_device_queue_update (vnm, rt);
  vec_validate (hw->input_node_thread_index_by_queue, queue_id);
  vec_validate (hw->rx_mode_by_queue, queue_id);
  hw->input_node_thread_index_by_queue[queue_id] = thread_index;
  hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;

  /* Tell the worker threads that the sync has ended */
  vlib_worker_thread_barrier_release (vm0);

  vlib_node_set_state (vm, hw->input_node_index, rt->enabled_node_state);
}
On the worker side, the check step is performed inside the dispatch loop, vlib_main_or_worker_loop:

/* The is_main argument selects between main-thread and worker-thread behavior */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
  ......
  while (1)
    {
      vlib_node_runtime_t *n;

      /* If there are pending RPC requests, handle them */
      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
        {
          if (!is_main)   /* only worker threads send RPC requests */
            vl_api_send_pending_rpc_requests (vm);
        }

      if (!is_main)       /* worker thread */
        {
          /* Mutual exclusion with the main thread: if the main thread has
             entered the critical section, spin-wait here */
          vlib_worker_thread_barrier_check ();
          ......
        }
      ......
      vlib_increment_main_loop_counter (vm);

      /* Record time stamp in case there are no enabled nodes and above
         calls do not update time stamp. */
      cpu_time_now = clib_cpu_time_now ();
    }
}
VPP's RPC mechanism is built on top of its API machinery; two API messages are registered for it:
#define foreach_rpc_api_msg                     \
_(RPC_CALL,rpc_call)                            \
_(RPC_CALL_REPLY,rpc_call_reply)
typedef struct vlib_main_t
{
  ......
  /* RPC requests, main thread only */
  uword *pending_rpc_requests;      /* RPCs queued by threads for the vpp_main thread to process */
  uword *processing_rpc_requests;   /* RPCs currently being processed by the vpp_main thread */
  clib_spinlock_t pending_rpc_lock; /* spinlock protecting the two vectors above */
} vlib_main_t;
The request message carried by the RPC API:
#ifndef _vl_api_defined_rpc_call
#define _vl_api_defined_rpc_call
typedef VL_API_PACKED(struct _vl_api_rpc_call {
  u16 _vl_msg_id;       /* message id */
  u32 client_index;     /* not needed: this API is internal */
  u32 context;
  u64 function;         /* the RPC function to call */
  u8 multicast;
  u8 need_barrier_sync; /* whether barrier protection is required */
  u8 send_reply;        /* whether to send a reply; usually no reply is sent */
  u32 data_len;
  u8 data[0];
}) vl_api_rpc_call_t;
#endif
static void
vl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp)
{
  vl_api_rpc_call_reply_t *rmp;
  int (*fp) (void *);
  i32 rv = 0;
  vlib_main_t *vm = vlib_get_main ();

  if (mp->function == 0)        /* the caller's RPC function is NULL: warn */
    {
      rv = -1;
      clib_warning ("rpc NULL function pointer");
    }
  else
    {
      if (mp->need_barrier_sync)        /* protect with the barrier if requested */
        vlib_worker_thread_barrier_sync (vm);

      fp = uword_to_pointer (mp->function, int (*)(void *));    /* convert back to a function pointer */
      rv = fp (mp->data);       /* call the function */

      if (mp->need_barrier_sync)
        vlib_worker_thread_barrier_release (vm);
    }

  if (mp->send_reply)   /* send a reply to the client if requested; usually no reply is needed */
    {
      svm_queue_t *q = vl_api_client_index_to_input_queue (mp->client_index);
      if (q)
        {
          rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
          rmp->_vl_msg_id = ntohs (VL_API_RPC_CALL_REPLY);
          rmp->context = mp->context;
          rmp->retval = rv;
          vl_msg_api_send_shmem (q, (u8 *) & rmp);
        }
    }
  if (mp->multicast)
    {
      clib_warning ("multicast not yet implemented...");
    }
}

/* Reply handler: not implemented */
static void
vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
{
  clib_warning ("unimplemented");
}
/* Ask the main thread to execute the given function; the caller may be a worker
** thread or the main thread itself.
** force_rpc: force RPC mode, i.e. do not call the function directly but let the
** API process execute it. Worker threads must pass 1; the main thread may pass
** either value. */
always_inline void
vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
                                    u8 force_rpc)
{
  vl_api_rpc_call_t *mp;
  vlib_main_t *vm_global = &vlib_global_main;
  vlib_main_t *vm = vlib_get_main ();

  /* Main thread and not a forced RPC: call the function directly */
  /* If the main thread did not set force_rpc, run the function now instead of
     queuing it for the API process */
  if ((force_rpc == 0) && (vlib_get_thread_index () == 0))
    {
      void (*call_fp) (void *);

      vlib_worker_thread_barrier_sync (vm);
      call_fp = fp;
      call_fp (data);
      vlib_worker_thread_barrier_release (vm);
      return;
    }

  /* Otherwise, actually do an RPC */
  /* Allocate the RPC message structure from shared memory */
  mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length);
  clib_memset (mp, 0, sizeof (*mp));
  clib_memcpy_fast (mp->data, data, data_length);

  /* The first member must be the message id; the API machinery relies on it.
     This is a built-in (non-plugin) message, so no per-module message-id base is needed. */
  mp->_vl_msg_id = ntohs (VL_API_RPC_CALL);
  mp->function = pointer_to_uword (fp);
  mp->need_barrier_sync = 1;

  /* Add to the pending vector. Thread 0 requires locking. */
  /* The main thread's pending_rpc_requests vector is a shared critical resource
     and must be protected; each other thread's pending_rpc_requests is owned
     exclusively by that thread and needs no locking. */
  if (vm == vm_global)
    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);

  vec_add1 (vm->pending_rpc_requests, (uword) mp);

  if (vm == vm_global)
    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}

/*
 * Check if called from worker threads.
 * If so, make rpc call of fp through shmem.
 * Otherwise, call fp directly
 */
void
vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
  vl_api_rpc_call_main_thread_inline (fp, data, data_length, /*force_rpc */ 0);
}

/*
 * Always make rpc call of fp through shmem, useful for calling from threads
 * not setup as worker threads, such as DPDK callback thread
 * Force the call to go through shared-memory RPC instead of calling directly.
 */
void
vl_api_force_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
  vl_api_rpc_call_main_thread_inline (fp, data, data_length, /*force_rpc */ 1);
}

A process running on the main thread can also initiate an RPC through vlib_rpc_call_main_thread:

void *rpc_call_main_thread_cb_fn;

void
vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
{
  /* Global function pointer; set to vl_api_rpc_call_main_thread during initialization */
  if (rpc_call_main_thread_cb_fn)
    {
      void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
      (*fp) (callback, args, arg_size);
    }
  else
    clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
}
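As a usage illustration, a worker thread that needs some non-thread-safe work done on the main thread could package its arguments and hand them off as sketched below. The callback and argument struct are hypothetical; vl_api_rpc_call_main_thread is the real entry point, and the argument block is copied into the shared-memory message, so a stack-allocated struct is fine.

/* Hypothetical argument block and callback to be executed on the main thread. */
typedef struct
{
  u32 sw_if_index;
  u32 new_mtu;
} my_mtu_rpc_args_t;

static void
my_mtu_rpc_callback (void *arg)
{
  my_mtu_rpc_args_t *a = arg;
  /* Runs on the main thread under the barrier (need_barrier_sync is set when
     the message is queued), so it may touch state that is not thread-safe. */
  /* ... e.g. apply a->new_mtu to interface a->sw_if_index ... */
}

static void
request_mtu_change_from_worker (u32 sw_if_index, u32 new_mtu)
{
  my_mtu_rpc_args_t args = {
    .sw_if_index = sw_if_index,
    .new_mtu = new_mtu,
  };
  /* Queue the RPC; the main thread's API process will run the callback later. */
  vl_api_rpc_call_main_thread (my_mtu_rpc_callback, (u8 *) &args, sizeof (args));
}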
/* A worker thread moves the RPC requests it has collected from its own
   pending_rpc_requests vector to the main thread's pending_rpc_requests vector */
void
vl_api_send_pending_rpc_requests (vlib_main_t * vm)
{
  vlib_main_t *vm_global = &vlib_global_main;

  ASSERT (vm != vm_global);

  clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
  vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
  vec_reset_length (vm->pending_rpc_requests);
  clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
Only worker threads need to hand their RPC requests over to the main thread.
/* The is_main argument selects between main-thread and worker-thread behavior */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
  while (1)
    {
      vlib_node_runtime_t *n;

      /* A worker thread hands the RPC requests it has collected over to the main thread */
      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
        {
          if (!is_main)   /* only worker threads move their own RPC requests to the main thread */
            vl_api_send_pending_rpc_requests (vm);
        }
      ......
      vlib_increment_main_loop_counter (vm);

      /* Record time stamp in case there are no enabled nodes and above
         calls do not update time stamp. */
      cpu_time_now = clib_cpu_time_now ();
    }
}
RPC requests are handled by the "api-rx-from-ring" process node, the same process that handles API messages.
/* *INDENT-OFF* */
VLIB_REGISTER_NODE (vl_api_clnt_node) =
{
  .function = vl_api_clnt_process,
  .type = VLIB_NODE_TYPE_PROCESS,
  .name = "api-rx-from-ring",
  .state = VLIB_NODE_STATE_DISABLED,
};
static uword
vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
                     vlib_frame_t * f)
{
  ......
  /* $$$ pay attention to frame size, control CPU usage */
  while (1)
    {
      /*
       * There's a reason for checking the queue before
       * sleeping. If the vlib application crashes, it's entirely
       * possible for a client to enqueue a connect request
       * during the process restart interval.
       *
       * Unless some force of physics causes the new incarnation
       * of the application to process the request, the client will
       * sit and wait for Godot...
       */
      vector_rate = vlib_last_vector_length_per_node (vm);
      start_time = vlib_time_now (vm);
      while (1)
        {
          if (vl_mem_api_handle_rpc (vm, node)          /* handle pending RPC requests */
              || vl_mem_api_handle_msg_main (vm, node)) /* handle API requests */
            {
              vm->api_queue_nonempty = 0;
              VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
              sleep_time = 20.0;
              break;
            }
          ......
        }
      ......
    }
  return 0;
}

int
vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
{
  api_main_t *am = &api_main;
  int i;
  uword *tmp, mp;

  /*
   * Swap pending and processing vectors, then process the RPCs
   * Avoid deadlock conditions by construction.
   * Move the pending RPC requests into the local variable tmp so the lock
   * is held only briefly.
   */
  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
  tmp = vm->processing_rpc_requests;
  vec_reset_length (tmp);
  vm->processing_rpc_requests = vm->pending_rpc_requests;
  vm->pending_rpc_requests = tmp;
  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);

  /*
   * RPCs are used to reflect function calls to thread 0
   * when the underlying code is not thread-safe.
   *
   * Grabbing the thread barrier across a set of RPCs
   * greatly increases efficiency, and avoids
   * running afoul of the barrier sync holddown timer.
   * The barrier sync code supports recursive locking.
   *
   * We really need to rewrite RPC-based code...
   */
  if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests)))
    {
      vl_msg_api_barrier_sync ();
      for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)      /* process each RPC in turn */
        {
          mp = vm->processing_rpc_requests[i];
          vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
        }
      vl_msg_api_barrier_release ();
    }

  return 0;
}

/* This is only to be called from a vlib/vnet app */
void
vl_msg_api_handler_with_vm_node (api_main_t * am,
                                 void *the_msg, vlib_main_t * vm,
                                 vlib_node_runtime_t * node)
{
  u16 id = ntohs (*((u16 *) the_msg));  /* the message id is the first member of every message */
  u8 *(*handler) (void *, void *, void *);
  u8 *(*print_fp) (void *, void *);
  ......
  /* Look up the handler for this message id; for VL_API_RPC_CALL it is vl_api_rpc_call_t_handler */
  if (id < vec_len (am->msg_handlers) && am->msg_handlers[id])
    {
      handler = (void *) am->msg_handlers[id];

      if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled))
        vl_msg_api_trace (am, am->rx_trace, the_msg);

      if (PREDICT_FALSE (am->msg_print_flag))
        {
          fformat (stdout, "[%d]: %s\n", id, am->msg_names[id]);
          print_fp = (void *) am->msg_print_handlers[id];
          if (print_fp == 0)
            {
              fformat (stdout, "  [no registered print fn for msg %d]\n", id);
            }
          else
            {
              (*print_fp) (the_msg, vm);
            }
        }

      if (!am->is_mp_safe[id])
        {
          vl_msg_api_barrier_trace_context (am->msg_names[id]);
          vl_msg_api_barrier_sync ();
        }

      /* Invoke the handler, e.g. vl_api_rpc_call_t_handler */
      (*handler) (the_msg, vm, node);

      if (!am->is_mp_safe[id])
        vl_msg_api_barrier_release ();
    }
  else
    {
      clib_warning ("no handler for msg id %d", id);
    }

  /*
   * Special-case, so we can e.g. bounce messages off the vnet
   * main thread without copying them...
   */
  if (!(am->message_bounce[id]))
    vl_msg_api_free (the_msg);
  ......
}