ETL集群采集器基于ETL采集器(单机版)的一个升级,主要分为五大组件:分别为JOB任务组件、采集器组件、KAFKA消息组件、ETL清洗组件、存储组件,每台节点每秒清洗3W-5W条日志。 java
-
JOB任务组件
-
JOB任务组件简要介绍
-
技术简要说明:基于JOB管理器、zookeeper、hadoop RPC 开发 node
-
JOB任务组件分三部分组成:MasterJob、SlaveJob、zookeeper 数据库
-
MasterJob主要责任是生产任务,创建RPC服务 服务器
-
SlaveJob主要责任是消费执行任务,经过RPC获取任务 架构
-
zookeeper主要责任监控MasterJob、SlaveJob 快速切换 Master,并广播任务 并发
-
负载均衡消费任务 负载均衡
-
支持任务的启用、停用 ide
-
支持MySql、Oracle、DB2等多种数据库管理任务 函数
-
灵活便利的管理quartz任务 oop
-
支持任务参数的传递
-
具体实现参考单机版ETL http://my.oschina.net/u/2470985/blog/509714
-
支持任务参数的传递
-
JOB任务组件架构设计
-
zookeeper 监控Master 、Slave ,选举Master,选举RPC服务端
papublic void init() throws Exception { client = CuratorFrameworkFactory.newClient(zookQuorum, // 服务器列表 createTimeout, // 会话超时时间,单位毫秒 connTimeout, // 链接建立超时时间,单位毫秒 new ExponentialBackoffRetry(time, timeoutCount) // 重试策略 ); nodeFactory = NodeFactory.getNodeFactory(); // 启动zk client.start(); // 监控分发job任务监控 jobMonitor(); // 添加master、slave监控 addMasterMonitor(); // 初始化节点 initCreateNode(client, nodePath, nodePathName, jobTaskPath, jobTaskPathName, nodeFactory); } /** * * @Title: initCreateNode * @Description: 初始化节点 * @param client * @param nodePath * @param nodePathName * @param jobTaskPath * @param jobTaskPathName * @param nodeFactory * @throws Exception * @return: void */ public void initCreateNode(CuratorFramework client, String nodePath, String nodePathName, String jobTaskPath, String jobTaskPathName, NodeFactory nodeFactory) throws Exception { // 建立node节点 client.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(nodePath + nodePathName); if (client.checkExists().forPath(jobTaskPath + jobTaskPathName) == null) { client.create() .creatingParentsIfNeeded() .forPath(jobTaskPath + jobTaskPathName, nodeFactory.getIp().getBytes()); } } /** * * @Title: jobMonitor * @Description: 监控JOB * @throws Exception * @return: void */ public void jobMonitor() throws Exception { zookeeperjobTask = new PathChildrenCache(client, jobTaskPath, true); zookeeperjobTask.getListenable().addListener( new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent enEvent) throws Exception { nodeFactory.getZookManage().masterMonitor(client, jobTaskPath, rpcPort, nodeFactory); } }); zookeeperjobTask.start(StartMode.BUILD_INITIAL_CACHE); } /** * * @Title: addmasterMonitor * @Description: 监控Master * @throws Exception * @return: void */ public void addMasterMonitor() throws Exception { zookeeperNodeEvent = new PathChildrenCache(client, nodePath, true); zookeeperNodeEvent.getListenable().addListener( new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent enEvent) throws Exception { nodeFactory.getZookManage().masterMonitor(client, nodePath, rpcPort, nodeFactory); } }); zookeeperNodeEvent.start(StartMode.BUILD_INITIAL_CACHE); }
@Override public void masterMonitor(CuratorFramework client, String nodePath, int port, NodeFactory nodeFactory) throws Exception { // 状态 Map<String, String> nodeState = nodeFactory.getNodeState(); // 获取当前IP String ip = nodeFactory.getIp(); // 获取masterNode String masterNode = nodeFactory.getMasterNode(); // 设置job节点 List<String> nodeList = client.getChildren().forPath(nodePath); Collections.sort(nodeList); String master = (new String(client.getData().forPath( nodePath + "/" + nodeList.get(0)))); // 选本机为master if (ip.equals(master)) { // 判断本机是否新的master null则为新的master if (nodeFactory.getRpcServiceNode() == null) { nodeFactory.setMasterNode(master); RpcServiceNode rpcServiceNode = new RpcServiceNode(); rpcServiceNode.setPort(port); rpcServiceNode.setHost(ip); rpcServiceNode.start(); // 设置rpc对象 nodeFactory.setRpcCommand(getRpcCommandProxy(master, port)); // 设置RPC客户端 nodeFactory.setRpcServiceNode(rpcServiceNode); // 开启策略 BeanFactory.getBean().getLoadingService().init(); // 设置服务 nodeState.put(ip + PROCESS_CUT + StartNode.HOST_MASTER, PROCESS_START); nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE, PROCESS_START); } } else { // 假如本机不是当前master则关闭RPC服务 if (nodeFactory.getRpcServiceNode() != null) { nodeFactory.getRpcServiceNode().stop(); nodeFactory.setRpcServiceNode(null); //关闭JOB BeanFactory.getBean().getLoadingService().stop(); nodeState.remove(ip + PROCESS_CUT + StartNode.HOST_MASTER); } // 假如当前master发生改变则切换 if ((!masterNode.equals(master))) { nodeFactory.setRpcCommand(getRpcCommandProxy(master, port)); nodeFactory.setMasterNode(master); nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE, PROCESS_START); } } } @Override public RpcCommand getRpcCommandProxy(String ip, int port) throws Exception { return RPC.getProxy(RpcCommand.class, RpcCommand.versionID, new InetSocketAddress(ip, port), new Configuration()); }
-
采集器组件
-
采集组件简要介绍
-
采集层支持DB并发采集、FTP并发采集、syslog接收、本地文件采集
-
支持FTP、DB 异常补采
-
采集层支持JOB任务阀值配置,DB链接池设置、Ftp链接设置、syslog 批量生产文件等
-
提供采集层开发者模式,标准API接口
-
数据库表管理采集任务
-
将采集的数据负载均衡到KAFKA中间件中
-
kafka组件
-
kafka组件简要介绍
-
接收采集器消息存放分区中
-
kafka负载均衡消息,ETL负载均衡消费分区中消息
-
kafka支持订阅、消费消息、ETL实时分析消息
-
etl组件
-
etl组件简要介绍
-
清洗层支持数据追加、数据汇总、数据补全、过滤、映射、转换、拆分、解析
-
清洗层支持清洗任务阀值配置
-
清洗层清洗开发者模式 ,标准API接口
-
清洗层支持库表管理清洗流程
-
接收清洗完成的数据,自定义存储,库、表、hive 等
-
存储层支持自定义多库存储、自定义表存储
-
提供存储层开发者模式,标准API接口
-
存储异常保存文件,监控异常文件从新存储。
-
支持实时分析,支持开发etl函数库
ETL集群采集器设计
ETL采集清洗应用(审计系统架构)
