前段时间在忙,这几天抽时间写一下阿里比赛总结,以往比赛都是前十名入围,此次只有复赛前五才能入围,竞争很是激烈,我和普哥一组,在普哥的帮助下学到很多优化技巧。先贴下成绩 java
自适应负载均衡:实现一套负载均衡算法,使系统吞吐量达到最大
git
要求:github
实现接口LoadBalance,实现一套自适应负载均衡机制。要求可以具有如下能力:算法
一、Gateway(Consumer) 端可以自动根据服务处理能力变化动态最优化分配请求保证较低响应时间,较高吞吐量;数组
二、Provider 端能自动进行服务容量评估,当请求数量超过服务能力时,容许拒绝部分请求,以保证服务不过载;缓存
三、当请求速率高于全部的 Provider 服务能力之和时,容许 Gateway( Consumer ) 拒绝服务新到请求。安全
LoadBalance接口(其余辅助接口再也不一一列举):bash
public interface LoadBalance {
/** * select one invoker in list. * * @param invokers invokers. * @param url refer url * @param invocation invocation. * @return selected invoker. */
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
复制代码
2.二、赛题解析
在解析赛题以前咱们先了解两个概念:并发
延迟
:处理单次事件所需时间。负载均衡
吞吐
:单位时间内程序能够处理的事件数。
假如一个程序只有1个线程,这个线程每秒能够处理10次事件,那么咱们说这个程序处理单次事件的延迟为100ms,吞吐为10次/秒
假如一个程序有4个线程,每一个线程每秒能够处理5次事件,那么咱们说这个程序处理单次事件的延迟为200ms,吞吐为20次/秒
假如一个程序有1个线程,每一个线程每秒能够处理20次事件,那么咱们说这个程序处理单次事件的延迟为50ms,吞吐为20次/秒
以上咱们能够看出,在一个没有瓶颈的程序中,增长线程能够增长吞吐,下降延迟也能够增长吞吐,但低延迟不必定高吞吐,高延迟也不必定低吞吐
。
赛题要求实时根据Provider能力大小动态选择Provider来处理请求,这里说一下三个Provider的线程和延迟都是会变
,且单个Provider总的能力呈泊松分布
,也就是说屡次请求的rtt方差会比较大
,可是rtt均值在时间段内波动较小
,因此只要咱们每次请求都打到正确(延迟小且有剩余线程
)的Provider上就ok,前面在钉钉群里发现有很多选手有这样两个疑问这里解释一下,
选手A
:我一顿操做下来分数居然比随机算法低不少,为何随机算法会有那么高的分数?
解析
:其实随机算法的分数是能够计算出来的,举个例子,假如咱们有三个容器,容量分别为1L,2L,3L,咱们有6L水,每次随机往某个容器倒入1L水,容器里最终可以保持多少水呢?答案是5L,由于6L水随机倒入三个容器每一个容器都会得到2L水,第一个容器由于容量只有1L因此溢出1L,假如满分120分的话咱们能够拿到100分,若是三个容器容量分别是1.5L,2L,2.5L,那么咱们的分数将会高达110分。(评测环境状况比这个要复杂一些,须要多考虑一些变量,原理差很少这里再也不展开)
选手B:个人程序评测下来全程没有error出现但是分数仍是上不去,咋回事呢?
解析
:不知道你们有没有注意到Provider的总链接数是大于请求数的,有些请求打到有剩余线程可是延迟不必定小的Provider上了,因此看起来没错误可是延迟很高RPS很难上去(由于评测程序中Provider总吞吐是大于请求总数的,因此通常状况下也很难出现错误)。 贴一张Provider请求数与RTT的图,
基于历史rtt为参考的选择策略
,由于rtt是呈泊松分布的,
因此单看rtt(延迟低的)数量,能力大的Provider必定比能力小的Provider的多
,因此咱们能够搞棵树把每次请求rtt存储起来,排序一下,这里须要为rtt特别的低的Provider提权,即当rtt小于某值(或百分比)时,往树里多add几个该Provider,同时从数尾移除相同数量的Provider,提权的目的探测Provider能力变化及探测Provider的真实能力(好比说在全部Provider都不拒绝服务的状况下,单凭树里的rtt并不能真实反映出Provider的实际处理能力),每次选择Provider是从树里移除rtt最小的那个Provider,把请求发送到该Provider,若是某个低延迟的Provider线程不够时,该Provider的rtt逐渐变大,直到出现拒绝服务,此时只要不把reject的res加入到树就ok。
三、复赛
实现一个进程内基于队列的消息持久化存储引擎
,要求包含如下功能:
发送消息功能
A. 查询必定时间窗口内的消息
B. 对必定时间窗口内的消息属性某个字段求平均
例子:t表示时间,时间窗口[1000, 1002]表示: t>=1000 & t<=1002 (这里的t和实际时间戳没有任何关系, 只是一个模拟时间范围)消息包括两个字段,一个是业务字段a,一个是时间戳,以及一个byte数组消息体。 程序接口以下:
public abstract class MessageStore {
/** * 写入一个消息; * 这个接口须要是线程安全的,也即评测程序会并发调用该接口进行put; * @param message message,表明消息的内容,评测时内容会随机产生 */
abstract void put(Message message);
/** * 根据a和t的条件,返回符合条件的消息的集合. t是输入时间戳模拟值,和实际时间戳没有关系, 线程内升序 * 这个接口须要是线程安全的,也即评测程序会并发调用该接口 * 返回的List须要按照t升序排列 (a不要求排序). 若是没有符合的消息, 返回大小为0的List. 若是List里有null元素, 会当结果失败处理 * 单条线程最大返回消息数量不会超过8万 * @param aMin 表明a的最小值(包含此值) * @param aMax 表明a的最大值(包含此值) * @param tMin 表明t的最小值(包含此值) * @param tMax 表明t的最大值(包含此值) */
abstract List<Message> getMessage(long aMin, long aMax, long tMin, long tMax);
/** * 根据a和t的条件,返回符合条件消息的a值的求平均结果. t是输入时间戳模拟值,和实际时间戳没有关系, 线程内升序 * 这个接口须要是线程安全的,也即评测程序会并发调用该接口 * 结果忽略小数位,返回整数位便可. 若是没有符合的消息, 返回0 * 单次查询求和最大值不会超过Long.MAX_VALUE * @param aMin 表明a的最小值(包含此值) * @param aMax 表明a的最大值(包含此值) * @param tMin 表明t的最小值(包含此值) * @param tMax 表明t的最大值(包含此值) */
abstract long getAvgValue(long aMin, long aMax, long tMin, long tMax);
}
复制代码
发送消息以下(忽略消息体):
消息1,消息属性{"a":1,"t":1001}
消息2,消息属性{"a":2,"t":1002}
消息3,消息属性{"a":3,"t":1003}
复制代码
查询以下:
示例1-
输入:时间窗口[1001,9999],对a求平均
输出:2, 即:(1+2+3)/3=2
示例2-
输入:时间窗口[1002,9999],求符合的消息
输出:{"a":1,"t":1002},{"a":3,"t":1003}
示例3-
输入:时间窗口[1000,9999]&(a>=2),对a求平均
输出:2 (去除小数位)
复制代码
语言限定
: JAVA
评测指标和规模
: 评测程序分为3个阶段: 发送阶段、查询聚合消息阶段、查询聚合结果阶段:
发送阶段:假设发送消息条数为N1,全部消息发送完毕的时间为T1;发送线程多个,消息属性为: a(随机整数), t(输入时间戳模拟值,和实际时间戳没有关系, 线程内升序).消息总大小为50字节,消息条数在20亿条左右,总数据在100G左右
查询聚合消息阶段:有屡次查询,消息总数为N2,全部查询时间为T2; 返回以t和a为条件的消息, 返回消息按照t升序排列
查询聚合结果阶段: 有屡次查询,消息总数为N3,全部查询时间为T3; 返回以t和a为条件对a求平均的值
若查询结果都正确,则最终成绩为N1/T1 + N2/T2 + N3/T3
3.二、赛题分析不须要考虑程序崩溃及被kill状况
不须要考虑同时读写状况
消息为定长消息,简化索引设计及存储缓存设计
线程内无需排序,只须要作线程间消息排序
其中t16G,a16G,b68G,
t,a,b,分别存储,有利于avg查询
前面提到过,t在线程内有序,借助这一点咱们能够少不少排序量,只须要作线程间排序就好,这里咱们把每一个发送线程看成一个Queue,把每一个Queue看成一个总体,对多个Queue进行排序,获得一个有序的Queue队列,贴张图,
MQueue sort_head = this.sort_head;
if (queue == sort_head) {
for (; ; ) {
Message message = sort_head.remove();
if (message == null) {
break;
}
put_message(message);
MQueue next = sort_head.next;
if (next == null) {
break;
}
if (sort_head.cur_t() > next.cur_t()) {
sort_head = sort(next, sort_head);
}
}
this.sort_head = sort_head;
}
复制代码
这样咱们获得一个全局有序的队列,buffer落盘便可。这里落盘以前咱们须要对t和a进行压缩(delta),t压缩后接近3.8G能够所有放在内存,a压缩后大约9G出头。 由于t排序后再次对a排序,致使t在分片内乱序,因此t用zigzag压缩。 由于a排序是升序,因此对a使用vlong压缩,这里说下a压缩对vlong作下改进,采用定长(4字节)+变长的方式进行压缩,代码以下:
public long readVLong() {
long b1 = get() & 0xff;
long b2 = get() & 0xff;
long b3 = get() & 0xff;
long v = 0;
int off = 0;
for (; ; ) {
long b = get();
v |= (b & 0b0111_1111) << off;
if (b >= 0) {
break;
}
off += 7;
}
v <<= 24;
return v | ((b1 << 16) | (b2 << 8) | (b3));
}
public void writeVLong(long v) {
long w = v & (0xffffff);
v >>>= 24;
put((byte) (w >>> 16));
put((byte) (w >>> 8));
put((byte) (w));
for (; ; ) {
w = v & 0b0111_1111;
v >>>= 7;
if (v != 0) {
put((byte) (w | 0b1000_0000));
} else {
put((byte) w);
break;
}
}
}
复制代码
有对b尝试使用lz4压缩,效果不理想,放弃对b压缩
3.3.二、查询先上张图,
从图中能够每隔16384作一次segment,segment内每隔128条消息作一个block,每一个block内有128条消息,每隔128条消息作一次索引,索引分别是:
ByteBuffer index_t_sparse = allocateDirect(SPARSE_INDEX_COUNT * 8);
ByteBuffer index_t_data_pos = allocateDirect(COMPACT_INDEX_COUNT * 8);
long[] index_a_data_pos = new long[COMPACT_INDEX_COUNT];
long[] index_t_compact = new long[COMPACT_INDEX_COUNT];
long[] index_a_compact = new long[COMPACT_INDEX_COUNT];
long[] index_a_sum = new long[COMPACT_INDEX_COUNT];
long[] index_a_min_max = new long[COMPACT_INDEX_COUNT * 2];
复制代码
index_t_sparse
:segment索引,记录t值,用于快速定位查询所在segment
index_t_data_pos
:block索引,t压缩后位置索引,记录t数据block起始位置,用于计算t真实值
index_a_data_pos
:同index_t_data_pos
index_t_compact
:block索引,记录t值,用于计算t真实值
index_a_compact
:同index_t_compact
index_a_sum
:block索引,记录block内a的sum值,用于avg时快速跳过block
index_a_min_max
:block索引,记录block内a的min,max值,用于avg时快速跳过block 查询msg时首先定位所在segment:
int t_index_cnt = (index_t_sparse.position() >>> 3) - 1; // size -1
int t_index_min = half_find(index_t_sparse_addr, 0, t_index_cnt, min_t);
int t_index_max = get_t_index_max(index_t_sparse_addr, t_index_min, t_index_cnt, max_t);
复制代码
获得segment起始位置:t_index_min,t_index_max 这里须要处理下首尾segment,由于首尾segment的t值可能不在查询范围内须要单独处理, 遍历segment定位block范围:
int a_index_off = get_a_read_index_off(index_a_min_max, a_index_cnt, a_index_min, min_a);
int a_index_len = get_a_read_index_len(index_a_min_max, a_index_cnt, a_index_min, a_index_off, max_a);
复制代码
获得某个segment内block的off和len:a_index_off,a_index_len 根据off和len定位a,b的读取pos和len
long a_read_pos = index_a_data_pos[a_index_off];
long b_read_pos = 1L * (a_index_off * A_BLOCK) * B_LEN;
int a_read_len = (int) (index_a_data_pos[Math.min(a_index_off + a_index_len + 1, a_index_cnt)] - a_read_pos);
a_read_buf.clear().limit(a_read_len);
b_read_buf.clear().limit(msg_size * B_LEN);
复制代码
接着定位t的读取范围:
long t_read_index = get_long(index_t_data_pos_addr, a_index_off * 8);
long t_write_index = get_limit_index(index_t_data_pos_addr, index_t_data_pos_limit, (a_index_off + 1) * 8, t_data_buf.write_index);
VIntBuf t_data_temp = t_data_buf.slice(t_read_index, t_write_index);
复制代码
接着遍历获得的数据:
for (int i = 0, part_i = 0; i < msg_size; i++) {
t += t_data_temp.readZigZag();
a += a_data_buf.readVLong();
if (t >= min_t && t <= max_t && a >= min_a && a <= max_a) {
byte[] body = new byte[B_LEN];
copy_data(b_read_buf_addr, i * B_LEN, body, B_LEN);
msg_sort_buf[msg_sort_buf_size++] = new Message(a, t, body);
}
part_i++;
if (part_i == A_BLOCK) {
part_i = 0;
a_index_off++;
t = index_t_compact[a_index_off];
a = index_a_compact[a_index_off];
t_read_index = get_long(index_t_data_pos_addr, a_index_off * 8);
t_write_index = get_limit_index(index_t_data_pos_addr, index_t_data_pos_limit, (a_index_off + 1) * 8, t_data_buf.write_index);
t_data_temp = t_data_buf.slice(t_read_index, t_write_index);
}
}
复制代码
排序最终查询的数据:
QuickSort.sort(msg_sort_buf, QuickSort.SORT_VALUE_T, 0, msg_sort_buf_size - 1);
for (int i = 0; i < msg_sort_buf_size; i++) {
msg_list.add(msg_sort_buf[i]);
}
复制代码
查询avg
:查询和avg和msg相似,这里说下跳过逻辑
if (part_i == A_BLOCK) {
part_i = 0;
a_index_off++;
for (; i + A_BLOCK < msg_size; ) {
int a_min_max_pos = (a_index_off + 1) << 1;
long a_min = index_a_min_max[a_min_max_pos];
long a_max = index_a_min_max[a_min_max_pos + 1];
if (a_min >= min_a && a_max <= max_a) {
sum += index_a_sum[a_index_off + 1];
count += A_BLOCK;
i += A_BLOCK;
a_index_off++;
continue;
} else {
break;
}
}
a_read_pos = index_a_data_pos[a_index_off];
a_read_len = (int) (index_a_data_pos[Math.min(a_index_off + 1, a_index_cnt)] - a_read_pos);
a_read_buf.clear().limit(a_read_len);
do_read(a_read_channel, a_read_buf, a_read_pos);
a = index_a_compact[a_index_off];
a_data_buf.set_read_index(index_a_data_pos[a_index_off] - a_read_pos);
}
i++;
复制代码
当part_i等于128时说明接下来是一个完整的block,能够进行block的min_max判断, 若是min_max在查询范围内则直接累加sum, 若是min_max部分包含查询范围则遍历该block
3.3.三、可探索的改进跳读
,在avg查询时,部分a的block是能够直接跳过的,这部分跳读应该会有必定提高。
按t值划分segment
:一直是按照t的个数划分segment的,按t值划分理论上能够划分出更大的segment,理论上会有必定提高。
复赛源码地址:github.com/wangkaish/a…
四、总结此次比赛竞争太激烈了,大佬不少,没能入围确实很遗憾,可是也确实学到很多东西,由于前面在搞华为的比赛,后面参与复赛的时间很少,并且相信不少选手有这样的感受,线下程序是好的,到线上老是跪,其实咱们也是时间大多花在调bug上了(水平太菜),线下和线上评测数据样本相差太多,若是比赛能搞成线下线上评测程序比较类似就行了,这样能够多些时间尝试方案。
另附上华为比赛参赛总结,有兴趣能够阅读一下:华为云TaurusDB性能挑战赛参赛总结