MaxCompute(原ODPS) 事件(Event)机制

摘要: 免费开通大数据服务:https://www.aliyun.com/product/odps 转自habai 什么是 MaxCompute事件机制 MaxCompute event 用于监控表和实例等MaxCompute资源(目前只用于监控表)。编程

 

免费开通大数据服务:https://www.aliyun.com/product/odpsapi

 

什么是MaxCompute安全

 

大数据计算服务(MaxCompute,原名ODPS)是一种快速、彻底托管的TB/PB级数据仓库解决方案。MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,可以更快速的解决用户海量数据计算问题,有效下降企业成本,并保障数据安全。服务器

 

什么是 MaxCompute事件机制restful

 

MaxCompute event 用于监控表和实例等MaxCompute资源(目前只用于监控表)。当表状态发生变化时,MaxCompute 会向预先注册(订阅)的 uri 发送信息。Event通知只有在订阅Event以后才能收到。每一个project中能够订阅一个或多个Event。Event是用户的数据,同表数据同样,建立或修改时都须要有这个Project的操做权限。关于Event的Restful Api,在文章里有介绍。网络

 

为何须要 MaxCompute 事件机制多线程

 

考虑如下场景:当一个用户 A 关心某一个表 T 的操做(建立/删除/插入/修改/...)时,若是表 T 不是用户 A 建立的,那么用户 A 能够采用什么方法感知这个操做?一个方法是主动轮询这个表是否作了某个操做,可是缺点是不言而喻的。另外一个方法是,注册一个回调,当表被操做时,被动接受通知。用这种方法可使用户逻辑没必要轮询和等待对表的操做。异步

MaxCompute Event机制就是第二种方法的实现。socket

 

在实际的生产中,对以上应用场景有大量的需求,并已经造成了对MaxCompute Event丰富的应用,例如:分布式

  • 数据地图: 订阅了一些 project 的 Event,并根据 Event 通知展现这些 project 中表的元数据。
  • 跨集群复制: 监听 Event 通知以复制相应的表。
  • 蚂蚁金服: 依赖事件通知机制进行工做流管理,统计,受权等工做。 事实上,每一个 project 都有大量用户订阅了所属project的表以及其它project表的事件通知。

 

MaxCompute 事件机制是怎样实现的

 

本节首先将 MaxCompute 事件机制 做为一个黑盒,从用户的角度介绍其功能和使用方法。然后以此为切入点,深刻剖析 MaxCompute 事件机制的内部机理。最后,提出一些对当前事件机制的思考。

 

订阅(注册)一个事件 & 事件通知

 

在网络编程中,为了减轻多线程的压力,每每使用事件通知驱动的异步编程。如,libevent[2]。使用这个库编写一个服务器程序,能够这样作:

 

void on_accept(int sock, short event, void* arg);

 

int main(int argc, char* argv[])

{

// create socket s

struct sockadddr_in addrin;

int s = socket(AF_INET, SOCK_STREAM, 0);

BOOL bReuseaddr=TRUE;

setsockopt(s, SOL_SOCKET ,SO_REUSEADDR, (const char*)&bReuseaddr, sizeof(BOOL));

memset(&addrin, 0, sizeof(addrin));

addrin.sin_family = AF_INET;

addrin.sin_port = htons(PORT);

addrin.sin_addr.s_addr = INADDR_ANY;

bind(s, (struct sockaddr*)&addrin, sizeof(struct sockaddr));

listen(s, BACKLOG);

 

// 建立事件池 event base

struct event_base* eb = event_base_new();

 

// 建立事件 & 绑定回调

struct event e;

event_set(&e, s, EV_READ|EV_PERSIST, on_accept, NULL);

 

// 注册事件

event_base_set(eb, &e);

event_add(&e, NULL);

 

// 启动事件派发

event_base_dispatch(eb);

 

return 0;

}

抽取出上面事件通知逻辑的主线:建立事件池,建立一个 event 并绑定回调函数, 把 event 注册到事件池并启动事件派发器。

在这个过程当中,事件生产者是socket(严格说是绑定在这个socket上的事件多路复用接口,如epoll),事件中转者是libevent中的事件池(event base)和事件派发器,事件消费者是事件处理回调函数。

一样的过程适用于MaxCompute event。事件池和派发器不须要用户建立。用户首先建立一个事件,而后绑定回调处理逻辑,最后把事件注册到事件池。代码以下:

 

public class TestOdpsEvent {

/**

* 建立事件方法

*/

static Event buildEvent(String eventName, String tableName, String callbackUri, String comment) throws URISyntaxException {

Event event = new Event();

event.setName(eventName); // 指定事件名称

event.setComment(comment); // 事件注释

event.setType(Event.SourceType.TABLE); // 指定事件类型,目前支持 TABLE

Event.Config config = new Event.Config();

config.setName("source");

config.setValue(tableName); // 指定事件源(即 表名). "*" 表示全部表.

event.setConfig(config);

event.setUri(new URI(callbackUri)); // 指定了一个回调地址

return event;

}

 

public static void main(String[] args) throws OdpsException, InterruptedException, URISyntaxException {

 

Account account = new AliyunAccount("xxxxxx", "xxxxxx");

Odps odps = new Odps(account);

String odps_endpoint = "http://xxxx/xxx";

odps.setEndpoint(odps_endpoint);

odps.setDefaultProject("project1");

InternalOdps iodps = new InternalOdps(odps);

 

// 建立事件 & 绑定回调

String callbackUri = "http://xxx.xxx.xxx.xxx:xxxx/xxxxx"; // this is different from odps_endpoint

Event e = buildEvent("table_create_event_1", "table1", callbackUri, "this is a test event");

 

// 注册事件

iodps.events().create(e);

 

// 查看已建立事件

Iterator<Event> events = iodps.events().iterator();

while(events.hasNext()) {

Event event1 = events.next();

System.out.println("Event found: " + event1.getName());

System.out.println("Event uri: " + event1.getUri());

// iodps.events().delete(event1.getName()); // 删除事件

}

}

}

在上面的代码中,指定了一个回调地址。当表发生变化时,就会通知这个回调地址。用户根据在这个回调地址接收到事件通知,使用相应的处理逻辑处理。事件回调地址做为事件处理逻辑入口,支持多种协议,包括但不限于kuafu, http, https等。与libevent不一样的是,MaxCompute event的生产者,中转者和消费者能够位于不一样网络区域。在用户注册事件以后,MaxCompute event机制会在该事件发生后当即通知用户注册的回调地址。

 

剖析 MaxCompute 事件机制

 

图3-1的三个部分分别表示了注册事件,转发通知,删除事件的过程。MessageService是 MaxCompute 内部消息服务,做用是转发事件通知到用户注册的回调地址。为方便理解,把 Create topic, Create subscription, Add endpoint 看做注册事件在消息服务层的三个操做。事件机制在消息服务层具体的实现将在后边介绍。

 

图3-1: 事件建立,转发,删除

 

在图3-1注册事件的过程当中,用户的请求由 OdpsWorker 的 createEventHandler 处理。createEventHandler 依次检查相应的 MessageService topic,subscription,endpoint 是否存在,若是不存在,建立。

 

在图3-1删除事件的过程相对简单,用户的请求由 OdpsWorker 的 deleteEventHandler 处理。deleteEventHandler 直接删除相应的 MessageService subscription。

 

在图3-1转发事件通知的过程当中,事件的生产者主要是ddl task(事实上,因为历史缘由,还有HiveServer,CREATETABLE 事件从这里发出),当执行对表的meta相关的操做时,就会触发ddl task。如drop table, add partition, insert into partition等。ddl task 会发送相应操做的事件通知。事件通知发送给事件中转者——消息服务。消息服务将这个事件通知发送给相应事件绑定的回调地址。

 

消息服务做为事件中转者,主要完成以下功能:

1)做为事件池维护不一样事件和回调地址的对应关系(一个事件对应一个或多个回调地址)

2)做为事件派发器根据事件通知匹配相应事件并将该事件通知转发到对应事件的各个回调地址。

目前不一样的事件依据两个属性区分:project名和事件源。事件源目前是表名。在ddl task发出的事件通知中,包含了这两个关键信息。消息服务根据这两个信息匹配相应事件。在介绍消息服务匹配的实现方式时,首先须要了解 MaxCompute消息服务 的基本概念(为便于理解,本文简化了消息服务的一些概念,如隐藏了partition概念。在文章[3]中,具体介绍了消息服务的设计和实现)。如图3-2:

 

图3-2: 消息服务基本概念

MaxCompute消息服务包含四个基本概念:topic, subscription, filter, endpoint。消息服务使用了典型的发布订阅模型。用户能够建立topic。建立一个或多个subscription(包含一个或多个endpoint)订阅这个topic。消息发布者向topic发送消息。该消息被转发到该topic的全部filter匹配的subscription的全部的endpoint。其中,topic的建立者,subscription的建立者,消息的发送者,以及消息的接收者能够是不一样的用户。在建立subscription时,须要指定filter matcher。在消息发送时须要指定filter。当某条消息发送到某个topic时,消息中的filter须要和这个topic的各个subscription的filter matcher匹配,若是匹配成功,将这个消息的一个副本发送给这个subscription的全部endpoint,不然不发送给它们,而后继续匹配其余的subscription。filter和filter_matcher的示例和匹配规则以下:

filter_matcher filter is matched
"" "k=v" yes. If filter_matcher is "", it will match forever.
"k=v" "k=v" yes
"k=v" "k=v1" no
"k1=v" "k=v" no
"k=v1|v2" "k=v1" yes
"k=v1|v2" "k=v2" yes
"k=v1|v2" "k=v1|v2" no. filter's value is 'v1|v2', not 'v1' or 'v2'.
"k1=v1,k2=v2" "k1=v1,k2=v2" yes
"k1=v1" "k1=v1,k2=v2" yes
"k1=v1,k2=v2" "k1=v1" no
"k=v" "" no
"" "" yes. If filter is "", filter_matcher will never hit except its value is ""

消息服务的这个机制,能够实现上述事件中转者的功能。将一个事件表达为一个subscription,将一个事件通知表达为一条消息,每一个endpoint记录一个回调地址。每一个project对应一个topic,用filter区分事件源。当一个事件通知产生以后,会被发送到产生通知的project所在的topic上。而后,通过匹配,转发全部的endpoint对应回调地址上。事件通知消息体示例以下:

 

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>ALIYUN$odpstest1@aliyun.com</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>CREATETABLE</Reason>

<TimeStamp>Sun, 18 Sep 2016 14:21:32 GMT</TimeStamp>

<Properties/>

<OdpsMessagerId>1</OdpsMessagerId>

<OdpsMessagerTime>1474208492</OdpsMessagerTime>

</Notification>

 

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>ALIYUN$odpstest1@aliyun.com</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>ADDPARTITION</Reason>

<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>

<Properties>

<Property>

<Name>Name</Name>

<Value>ds=ds1/pt=pt1</Value>

</Property>

</Properties>

<OdpsMessagerId>4</OdpsMessagerId>

<OdpsMessagerTime>1474289142</OdpsMessagerTime>

</Notification>

当用户订阅了 Project a_2_test_event 的 Table "backup_partition" 的事件后,当发生对这个表的 CREATETABLE 和 ADDPARTITION 操做后,会接收到上面的两个事件通知。每一个事件通知是一个 xml 格式的消息。SourceType 表示订阅的是表的事件通知仍是其它类型资源的事件通知(目前只支持表)。SourceName 表示订阅的表的名字。Reason 表示在该表上发生的操做,上例中分别是建立表的操做和增长分区的操做(在 附录 中列举了更多的操做类型)。Properties 中会有一些附加的通知属性,经常使用来指出操做发生在表的哪一个 parition 上。OdpsMessagerId 在一个 Project 的全部表中是惟一的。OdpsMessagerTime 是这条通知产生的时刻。

 

在MaxCompute线上服务环境中,每个project p1,就会对应一个名为 SQL_p1的topic(由于历史缘由hardcode了前缀SQL_,不过前缀是什么无所谓,只要能够区分事件机制的topic和其它应用中的topic就好),这个 topic 在第一次注册事件的时候自动建立(也能够在建立project时手动建立)。p1的全部事件通知都会发到这个topic上。这个topic在其对应的project删除时被删除。

 

MaxCompute消息服务为事件机制提供了对事件通知的持久化,保序,failover的功能,尽最大努力保证消息不丢,可是依然不能保证绝对不丢。下面分析事件机制可能出现的丢消息状况:事件生产者失败,消息服务失败,事件接收者失败,消息服务热升级。

 

1) 事件生产者失败:在事件通知到达消息服务以前,存在事件通知生产者失败的可能。具体的消息丢失几率取决于事件生产者的持久化,failover能力以及重试机制。

2) 消息服务失败:消息服务失败包括两种状况:消息到达消息服务前失败和消息到达以后失败。若是消息到达以前失败,那么消息服务提供的message client会重试3次,每次间隔5毫秒。若是事件成功地发送到消息服务,消息会首先被持久化。在消息服务中的一条消息只有知足下列两种状况才会被删除:a. 消息发送成功;b. 消息发送失败且超太重试次数(目前重试3600次,每次间隔60秒)。能够看到,事件(消息)丢失最大的风险在于发送到达消息服务以前的一段时间。

3) 事件接收者失败:若是接收者失败,且在得到事件通知以后,处理事件通知以前,消息服务不提供接口使接收者重获这条消息。固然对于这个问题,还有另外一种解决方法,就是使用相似kafka的消息服务模型做为中转者,能够保证事件通知更强大的可靠性。kafka模型[4]不会主动推送消息,仅仅对消息作持久化以实现高吞吐和高可靠。消息订阅者须要给定消息id的范围从某个topic的partition拉消息。当订阅者失败,但愿从新得到历史的消息时,只要给定消息id的范围,若是这个范围内的消息没有过时,就能够被从新得到。可是kafka模型使用拉消息的模式不具有完整的事件派发器功能,也就不能支持如今odps须要的异步事件通知编程方式。而事实上,MaxCompute消息服务设计的出发点,就是MaxCompute事件通知机制(在没有消息服务以前,MaxCompute worker履行着消息服务的职责)。

4)消息服务热升级:虽说是热升级,可是新老服务之间切换也是须要时间的。在线上这个时间最夸张的一次达到了4个多小时(全部topic所有切换完成的时间间隔)。而在切换的过程当中,新老消息服务中处于切换中间状态的 topic 是拒绝服务的(切换完成的 topic 能够服务)。

 

总之,MaxCompute事件通知机制提供了必定程度的高可用保证,可是尚未把丢消息的几率下降为0,其最大风险在于消息服务不服务。而此时,消息服务上某个 topic 丢失消息的数量和该 topic 不服务的时间成正比。

 

总结

 

MaxCompute事件机制给用户监听资源的变化带来了很大的便利。它借鉴了通用的事件异步编程模型,提供了友好的用户接口,支持了线上数据地图,跨集群复制等众多服务。可是,依然有不足之处,例如:

1) 事件监听(订阅/注册)的粒度粗且不可定制:咱们曾经接到用户的需求,想监听一个表的 CREATETABLE 事件,可是现有的机制只支持监听到表的级别,这样用户就不得不本身过滤这个表的各类事件。

2) 事件机制的可靠性须要进一步提升:曾经出现过热升级切换消息服务4个小时的状况,缘由是其中一个 topic 向某个 endpoint 发送消息卡在了发送那里一直没法退出,形成该 topic 上丢失大量消息。

3) 消息服务的 生产者qps(生产环境接收消息1000-2000),消费者 qps(消费极限qps未测过,因其取决于) 与 开源消息服务如 kafka 生产者 qps (50,000),消费者 qps (22,000) 依然有必定差距[4]。

 

对于当前但愿使用MaxCompute消息服务的用户,最好确保知足如下条件:

1) 容许丢失少许的消息通知,由于的确存在小几率丢消息的可能;

2) 事件处理系统具备必定的事件处理能力,接受事件qps最好能够达到500以上。

3) 不用的事件(回调uri发不通且再也不使用)请删除,不然会在消息服务中留下永久性垃圾,形成消息在pangu中大量堆积,由于消息服务没法判断用户的事件是否须要删除!!!

解决上述的问题目前依然有一些挑战,可是咱们会不断改进和完善事件机制的各项功能,减少事件丢失率,细化事件订阅粒度,优化用户体验。

 

参考资料

[1] Event restful api

[2] Libevent: http://libevent.org

[3] Odps Message Service

[4] Kreps J, Narkhede N, Rao J. Kafka: A distributed messaging system for log processing[C]//Proceedings of the NetDB. 2011: 1-7.

 

附录

 

MaxCompute事件机制事件类型列表

 

在触发 DDL 时,Event 会向预先注册的 url 发送 POST 请求,消息体格式以下

 

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>ALIYUN$odpstest1@aliyun.com</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>ADDPARTITION</Reason>

<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>

<Properties>

<Property>

<Name>Name</Name>

<Value>ds=ds1/pt=pt1</Value>

</Property>

</Properties>

<OdpsMessagerId>4</OdpsMessagerId>

<OdpsMessagerTime>1474289142</OdpsMessagerTime>

</Notification>

其中:

Reason 可能取值 事件生产者
CREATETABLE hiveserver
DROPTABLE ddltask
ALTERTABLE ddltask
ADDPARTITION ddltask
DROPPARTITION ddltask
ALTERPARTITION ddltask
INSERTOVERWRITETABLE ddltask
INSERTINTOTABLE ddltask
INSERTOVERWRITEPARTITION ddltask
INSERTINTOPARTITION ddltask
MERGETABLE ddltask
MERGEPARTITION ddltask
ALTERVOLUMEPARTITION ddltask
ADDVOLUMEPARTITION ddltask
  • SourceType 可能取值:Table

 

使用限制

 

1) 目前只有 Project Owner 能够建立 event,没法受权给其余人建立 event

2) 接收 post 信息的 url 应返回 http code 200,server 端 post 时并不支持如 302 这样的跳转。

 

原文连接

阅读更多干货好文,请关注扫描如下二维码:

相关文章
相关标签/搜索