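The ETL collector uses a Job manager to manage tasks, Spring to manage the collection and cleansing objects, and a JDBC manager to manage JDBC connections.
Data processing flow: Job manager scheduling -> collection (generates files) -> cleansing layer reads the files -> storage layer stores the normalized logs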
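1. Main features of the ETL collector
- ETL collector: consists of three parts, the collection layer, the cleansing layer, and the storage layer
- Collection layer: its main task is to collect data and generate files
- The collection layer supports concurrent DB collection, concurrent FTP collection, syslog reception, and local file collection
- Supports re-collection after FTP and DB failures
- The collection layer supports JOB task threshold configuration, DB connection pool settings, FTP connection settings, batched file generation for syslog, and so on
- Provides a developer mode for the collection layer, with a standard API
- Collection tasks are managed through database tables
- Cleansing layer: mainly reads the files, splits them into tasks, and cleanses the tasks concurrently
- The cleansing layer supports data appending, aggregation, completion, filtering, mapping, conversion, splitting, and parsing
- The cleansing layer supports threshold configuration for cleansing tasks
- Provides a developer mode for the cleansing layer, with a standard API
- The cleansing layer supports managing the cleansing flow through database tables
- Storage layer: receives the cleansed data and stores it in a user-defined target such as a database, a table, or Hive
- The storage layer supports custom multi-database storage and custom table storage
- Provides a developer mode for the storage layer, with a standard API
- When storage fails, the data is saved to a file; these exception files are monitored and stored again.
- Logging: logs are recorded per collection ID and include the number of records collected, the number of records stored, collection throughput, normalization throughput, exception information, and so on.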
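2. ETL collector architecture design
- Collection and cleansing architecture diagram
- Flow chart
- Table structure design
3. ETL collector run flow
- Policy configuration
Collection configuration
Configure the cleansing flow (Config)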
<columns key="1" method="splitThis" parameters="{{this}},{\$}"/>
<columns key="2" method="info" parameters="{{1}[7]}{+}-{+}{{1}[9]}"/>
<columns key="3" method="info" parameters="{{1}[18]}{+}${+}{{1}[19]}"/>
<columns key="4" method="regexpArray" parameters="{{1}[0]},{(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})}"/>
<columns key="SIP" method="iPIntID" parameters="{{4}}"/>
<columns key="DIP" method="iPIntArray" parameters="{{1}[1]}"/>
<columns key="DEVIP" method="iPIntArray" parameters="{{1}[1]}"/>
<columns key="OPEROBJECT" method="subStringArray" parameters="{{1}[2]},{16}"/>
<columns key="DEVTIME" method="dateUnixArrayTime" parameters="{{1}[3]},{yyyyMMddHHmmss}"/>
<columns key="SECONDARYACCOUNT" method="textArray" parameters="{{1}[4]}"/>
<columns key="USERORGANIZATION" method="textArray" parameters="{{1}[5]}"/>
<columns key="OPERRESULT" method="textArray" parameters="{{1}[6]}"/>
<columns key="RAWEVENTID" method="textArray" parameters="{{1}[7]}"/>
<columns key="OPERCONTENT" method="subStringArray" parameters="{{1}[11]},{15}"/>
<columns key="EVENTCOUNT" method="regexpArray" parameters="{{1}[11]},{.*金额:([0-9]+\\.[0-9]+).*}}"/>
<columns key="FORMID" method="regexpArray" parameters="{{1}[11]},{.*备注:(.*)}"/>
<columns key="FOTMID" method="textArray" parameters="{{1}[13]}"/>
<columns key="DEVNAME" method="info" parameters="CRM1_1501"/>
<columns key="FK_DEVTYPE" method="info" parameters="1501"/>
<columns key="VERSION" method="info" parameters="3"/>
<mapping method="mappingValueID" parameters="{{2}},{FK_EVENTTYPE},{EVENTNAME}"/>
<mapping method="textID" parameters="{{2}}"/>
<mapping method="mappingValueArray" parameters="{{1}[17]},{TREASURYSCENENUM},{TREASURYSCENE}"/>
<mapping method="mappingArray" parameters="{{1}[18]},{TREASURYAUTHRESULT}"/>
<mapping method="mappingID" parameters="{{3}},{TREASURYMODE}"/>
<mapping method="completionData" parameters="{{opid}}"/>
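To make the rule syntax more concrete, here is a minimal Java sketch of what the first few rules appear to do: split the raw log on a literal `$`, pick fields by index, join two fields with `-`, and turn a dotted IPv4 address into an integer. This is only a reading of the configuration, not the collector's actual code. The class `CleanConfigSketch`, the methods `ipToLong` and `clean`, and the output key `EVENT_KEY` are hypothetical, and the real `iPIntArray` method may encode addresses differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class CleanConfigSketch {

    /** Packs a dotted IPv4 string into an unsigned 32-bit value (assumed meaning of iPInt*). */
    static long ipToLong(String ip) {
        String[] parts = ip.trim().split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("octet out of range: " + ip);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Applies a hand-written equivalent of a few rules to one raw log line. */
    static Map<String, Object> clean(String rawLog) {
        // Like key="1" method="splitThis": split the whole line on a literal '$'.
        String[] fields = rawLog.split(Pattern.quote("$"), -1);
        if (fields.length < 10) {
            throw new IllegalArgumentException("expected at least 10 fields");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        // Like key="2" method="info": join field 7 and field 9 with '-'.
        row.put("EVENT_KEY", fields[7] + "-" + fields[9]);
        // Like key="DIP" method="iPIntArray": field 1 as an integer address.
        row.put("DIP", ipToLong(fields[1]));
        // Like key="SECONDARYACCOUNT" method="textArray": field 4 copied as text.
        row.put("SECONDARYACCOUNT", fields[4]);
        return row;
    }

    public static void main(String[] args) {
        System.out.println(clean("src$1.2.3.4$obj$20240101000000$acct$org$ok$E7$x$E9"));
    }
}
```

In the real collector the rules are data, so the same engine can serve any log format; the hard-coded version above only shows the effect of one rule set.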
- Add the Job task

- Execute the Job
public void execute(JobExecutionContext context) {
    JobDataMap map = context.getJobDetail().getJobDataMap();
    Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
    IData<String, Object> jobParameters = new DataMap<String, Object>();
    // Copy the job parameters into the data map
    while (it.hasNext()) {
        Map.Entry<String, String> entry = it.next();
        jobParameters.put(entry.getKey(), entry.getValue());
    }
    BeanFactory beanFactory = BeanFactory.getBean();
    beanFactory.getCollectorServer().execcCollectorJob(jobParameters, beanFactory);
}
Create the collection objects and register them in the Spring factory
- Create the Spring configuration XML so that the objects are loaded into memory
<!-- Configure file generation for each collector type -->
<bean id="collector" class="com.venustech.collector.main.Collector">
    <property name="collectorMap">
        <map>
            <!-- type 1: ftp -->
            <entry key="1" value="ftpCreateXmlConfigImpl" />
            <!-- type 2: db -->
            <entry key="2" value="dbCreateXmlConfigImpl" />
            <!-- type 3: syslog -->
            <entry key="3" value="collectDataSyslogServer" />
            <!-- type 4: local file -->
            <entry key="4" value="collectLocalImpl" />
        </map>
    </property>
</bean>
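The `collectorMap` above maps a numeric collector type to the name of the bean that handles it. A minimal Java sketch of that lookup follows, assuming the collector simply resolves the type code to a bean name and rejects unknown codes. The class `CollectorTypeSketch` and the method `beanNameFor` are hypothetical; only the keys and bean names come from the configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CollectorTypeSketch {

    // Same entries as the collectorMap property in the Spring XML.
    private final Map<String, String> collectorMap = new LinkedHashMap<>();

    CollectorTypeSketch() {
        collectorMap.put("1", "ftpCreateXmlConfigImpl");  // ftp
        collectorMap.put("2", "dbCreateXmlConfigImpl");   // db
        collectorMap.put("3", "collectDataSyslogServer"); // syslog
        collectorMap.put("4", "collectLocalImpl");        // local file
    }

    /** Resolves a collector type code to a bean name, failing loudly on unknown codes. */
    String beanNameFor(String type) {
        String beanName = collectorMap.get(type);
        if (beanName == null) {
            throw new IllegalArgumentException("unknown collector type: " + type);
        }
        return beanName;
    }

    public static void main(String[] args) {
        System.out.println(new CollectorTypeSketch().beanNameFor("1"));
    }
}
```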
<?xml version="1.0" encoding="GBK"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans classpath:spring-beans.xsd">
    <bean id="SA_AUDITALERT_VIEW2" class="com.venustech.collector.service.impl.CollectDataFtpImpl" >
        <property name="mappingId" value="CRM"/>
        <property name="username" value="joy"/>
        <property name="password" value="go2hell"/>
        <property name="resourcename" value="/data/data/4A"/>
        <property name="datasource" value="HE_AuditLogging.log.[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}"/>
        <property name="localpath" value="E:\java\venus\AuditData2.0\config/collector/rawlog/FTP/SA_AUDITALERT_VIEW2/"/>
        <property name="hostname" value="10.70.41.126"/>
        <property name="port" value="21"/>
        <property name="tablename" value="SA_AUDITALERT_VIEW2"/>
        <property name="parameter">
            <map>
                <entry key="startFile" value="${startFile}"/>
                <entry key="fileEncode" value="${fileEncode}"/>
                <entry key="startCount" value="${startCount}"/>
                <entry key="saveId" value="${saveId}"/>
                <entry key="isAuto" value="${isAuto}"/>
            </map>
        </property>
    </bean>
    <bean id="property" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>
                    file:E:\java\venus\AuditData2.0\config/collector/py/properties/SA_AUDITALERT_VIEW2.properties
                </value>
            </list>
        </property>
    </bean>
</beans>
Collect and generate files
public String[] getAutoCollectFile(String id) throws Exception {
    if (data.getInt(ConfigParameter.PARAMETER_FTP_DIR) == 0) {
        ftp = new ContinueFTPImp(hostname, port, username, password,
                resourcename, localpath, datasource, id,
                data.getInt(ConfigParameter.PARAMETER_FTP_CONN_TIMEOUT, 60),
                data.getInt(ConfigParameter.PARAMETER_FTP_DATA_TIMEOUT, 60));
    } else {
        ftp = new ContinueFTPDirImp(hostname, port, username, password,
                resourcename, localpath, datasource, id,
                data.getInt(ConfigParameter.PARAMETER_FTP_CONN_TIMEOUT, 60),
                data.getInt(ConfigParameter.PARAMETER_FTP_DATA_TIMEOUT, 60));
    }
    return ftp.ftpAutoDownload(data.getString(ConfigParameter.PARAMETER_START_FILE));
}
Split into tasks
public static List<FileAttribute> getFileAttribute(String[] fileName,
        int startCount, IData<String, Integer> fileData, int countData,
        String localpath, String mappingId, String tableName, String id,
        CollectorJobRunState collectorJobRunState, String dbId, int inserCount) {
    // Switch that controls the inner loop over files
    boolean isFlag = true;
    // Container for the split tasks
    List<FileAttribute> listFile = new DatasetList<FileAttribute>();
    // Running total of log records
    int count = 0;
    BeanFactory beanFactory = BeanFactory.getBean();
    // Index of the file currently being walked
    int j = 0;
    // Files that belong to the current task
    List<String> fileList = new ArrayList<String>();
    // File model
    FileAttribute file;
    // Number of tasks to produce
    int countFor = getSize(countData, inserCount);
    // Sequence number of the task generated in this collection run
    int index = 1;
    for (int i = 0; i < countFor; i++) {
        while (isFlag) {
            count = count + fileData.getInt(fileName[j]);
            fileList.add(fileName[j]);
            if (count - startCount > inserCount - 1) {
                file = new FileAttribute();
                file.setStartCount(startCount);
                startCount = fileData.getInt(fileName[j], 0)
                        - (count - inserCount - startCount);
                file.setEndCount(startCount);
                file.setFileName(fileList);
                file.setMappingId(mappingId);
                file.setDbId(dbId);
                file.setId(id);
                file.setTableName(tableName);
                file.setForCount(index);
                file.setJobExecCount(collectorJobRunState.getCount());
                file.setLocalpath(localpath);
                // Add the task for the collected logs
                listFile.add(file);
                fileList = new ArrayList<String>();
                isFlag = false;
                beanFactory.getLog().debug(id,
                        Thread.currentThread().getName() + ":" + file.toString(),
                        LogEvent.COLLECT_ADD_TASK_COUNT, inserCount,
                        collectorJobRunState.getCount(), index);
                count = 0;
                index++;
            } else if (countFor == i + 1 && fileName.length - 1 == j) {
                file = new FileAttribute();
                file.setStartCount(startCount);
                int indexCount = count - startCount;
                startCount = fileData.getInt(fileName[j], 0)
                        - (count - inserCount - startCount);
                file.setEndCount(fileData.getInt(fileName[j], 0));
                file.setFileName(fileList);
                file.setMappingId(mappingId);
                file.setDbId(dbId);
                file.setId(id);
                file.setTableName(tableName);
                file.setForCount(index);
                file.setJobExecCount(collectorJobRunState.getCount());
                file.setLocalpath(localpath);
                // Add the task for the collected logs
                listFile.add(file);
                isFlag = false;
                beanFactory.getLog().debug(id,
                        Thread.currentThread().getName() + ":" + file.toString(),
                        LogEvent.COLLECT_ADD_TASK_COUNT, indexCount,
                        collectorJobRunState.getCount(), index);
                count = 0;
                index++;
            } else {
                if (j < fileName.length - 1) {
                    j++;
                }
            }
        }
        isFlag = true;
    }
    return listFile;
}
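The splitting idea is easier to see in isolation: walk the collected files in order and cut the stream of lines into tasks of a fixed size, recording where each task starts in its first file and ends in its last file. The Java sketch below is a simplified re-implementation of that idea, not the collector's `getFileAttribute`. The names `TaskSplitSketch`, `Batch`, and `plan` are hypothetical, and the half-open `[startLine, endLine)` convention is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskSplitSketch {

    /** One task: read from startLine of the first file up to endLine (exclusive) of the last file. */
    static final class Batch {
        final List<String> files;
        final int startLine;
        final int endLine;

        Batch(List<String> files, int startLine, int endLine) {
            this.files = files;
            this.startLine = startLine;
            this.endLine = endLine;
        }
    }

    /** Cuts the lines of the given files into batches of at most batchSize lines. */
    static List<Batch> plan(String[] names, int[] lineCounts, int startLine, int batchSize) {
        if (names.length != lineCounts.length || batchSize <= 0 || startLine < 0) {
            throw new IllegalArgumentException("invalid arguments");
        }
        List<Batch> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int batchStart = 0;        // start offset inside the first file of the open batch
        int endLine = 0;           // end offset inside the last file touched so far
        int remaining = batchSize; // lines still needed to fill the open batch
        for (int i = 0; i < names.length; i++) {
            int pos = (i == 0) ? startLine : 0; // only the first file is resumed mid-way
            while (pos < lineCounts[i]) {
                if (current.isEmpty()) {
                    batchStart = pos;
                }
                current.add(names[i]);
                int take = Math.min(remaining, lineCounts[i] - pos);
                pos += take;
                remaining -= take;
                endLine = pos;
                if (remaining == 0) {
                    batches.add(new Batch(new ArrayList<>(current), batchStart, endLine));
                    current.clear();
                    remaining = batchSize;
                }
            }
        }
        if (!current.isEmpty()) {
            // Final, partially filled batch.
            batches.add(new Batch(new ArrayList<>(current), batchStart, endLine));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Batch> batches = plan(new String[] {"a", "b", "c"}, new int[] {5, 3, 4}, 2, 4);
        for (Batch b : batches) {
            System.out.println(b.files + " from " + b.startLine + " to " + b.endLine);
        }
    }
}
```

Each batch can then be handed to a worker thread independently, which is what allows the cleansing layer to run tasks concurrently.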
Threads are dispatched to read the logs and run them through the cleansing flow
/**
 * Returns the normalized data.
 *
 * @param id        id of the cleansing configuration
 * @param mappingId mapping id
 * @param list      raw log lines
 * @return the cleansed rows
 */
public static IDataset<IData<String, Object>> getForMatData(String id,
        String mappingId, IDataset<String> list) {
    Map<String, CleanDataMethod> method = BeanFactory.getBean()
            .getMainformat().getMethod();
    IDataset<IData<String, Object>> insertList = new DatasetList<IData<String, Object>>();
    Element beans = null;
    try {
        beans = com.venustech.collector.BeanFactory.getBean().getElement(id);
    } catch (Exception e) {
        new CollectorExceptionLog(Thread.currentThread().getName() + ": "
                + "get xml content error! ", LogEvent.LOG_TYPE_FORMAT, e)
                .error(id);
        // Without a configuration no row can be cleansed, so return the empty result
        return insertList;
    }
    for (String valueThis : list) {
        try {
            IData<String, Object> data = new DataMap<String, Object>();
            // Normalization
            for (Iterator<?> it = beans.elementIterator(); it.hasNext();) {
                Element bean = (Element) it.next();
                executeCleanData(mappingId, valueThis, bean, method, data);
            }
            data = ApplicationContextSynchronous.defaultData
                    .completionDefault(data);
            data.put(IDataHandle.RAWLOG_ID, valueThis);
            insertList.add(data);
            data = null;
        } catch (Exception e) {
            // Built-in filtering: a row whose cleansing throws is skipped
        }
    }
    return insertList;
}

/**
 * Executes one cleansing step (standardization).
 *
 * @param id        id
 * @param valueThis raw log line
 * @param bean      XML element that describes the step
 * @param methodMap registered cleansing methods, keyed by name
 * @param data      map holding the cleansed values
 * @return the cleansed data
 * @throws CollectorExceptionLog
 */
public static IData<String, Object> executeCleanData(String id,
        String valueThis, Element bean,
        Map<String, CleanDataMethod> methodMap, IData<String, Object> data)
        throws CollectorExceptionLog {
    // Method
    String method = bean.attributeValue(INI_FORMAT_ATTRIBUTE_METHOD);
    // Parameters
    String parameters = bean.attributeValue(INI_FORMAT_ATTRIBUTE_PARAMETERS);
    // Key
    String key = bean.attributeValue(INI_FORMAT_ATTRIBUTE_KYE);
    CleanDataMethod cleanDataMethod = methodMap.get(method);
    if (cleanDataMethod == null) {
        new CollectorExceptionLog("[fromat->" + id + "]" + "[element->"
                + bean.asXML() + "] error:java.lang.NullPointerException",
                new Exception("can not find method !")).formatError(id, valueThis);
        return data;
    }
    return cleanDataMethod.executeCleanData(valueThis, parameters,
            bean.asXML(), data, key, id);
}
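The dispatch pattern in `executeCleanData` (look a method up by name, report it if it is missing, otherwise let it write into the shared row) can be sketched without the project's types. The example below is a minimal illustration under that assumption; `CleanDispatchSketch`, `CleanStep`, `Rule`, and the two registered methods `constant` and `length` are hypothetical stand-ins, not part of the collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CleanDispatchSketch {

    /** Stand-in for CleanDataMethod: one named cleansing step. */
    interface CleanStep {
        void apply(String rawLog, String parameters, String key, Map<String, Object> row);
    }

    /** Stand-in for one configuration element (key, method, parameters). */
    static final class Rule {
        final String key;
        final String method;
        final String parameters;

        Rule(String key, String method, String parameters) {
            this.key = key;
            this.method = method;
            this.parameters = parameters;
        }
    }

    /** Hypothetical registry; the collector builds its own from Spring beans. */
    static Map<String, CleanStep> defaultRegistry() {
        Map<String, CleanStep> registry = new HashMap<>();
        registry.put("constant", (raw, params, key, row) -> row.put(key, params));
        registry.put("length", (raw, params, key, row) -> row.put(key, raw.length()));
        return registry;
    }

    /** Runs every rule against one raw log; unknown methods are recorded and skipped. */
    static Map<String, Object> clean(String rawLog, List<Rule> rules,
            Map<String, CleanStep> registry, List<String> errors) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Rule rule : rules) {
            CleanStep step = registry.get(rule.method);
            if (step == null) {
                errors.add("can not find method: " + rule.method);
                continue;
            }
            step.apply(rawLog, rule.parameters, rule.key, row);
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        List<Rule> rules = Arrays.asList(
                new Rule("DEVNAME", "constant", "CRM1_1501"),
                new Rule("LEN", "length", ""));
        System.out.println(clean("raw log line", rules, defaultRegistry(), errors));
    }
}
```

Keeping the registry as a plain name-to-object map is what lets new cleansing methods be added through configuration alone.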
Store the cleansed data
package com.venustech.collector.service.impl.thread.table;

import com.venustech.collector.model.FileAttribute;
import com.venustech.collector.service.SaveCollector;
import com.venustech.dao.JdbcDaoManager;
import com.venustech.data.IData;
import com.venustech.data.IDataset;
import com.venustech.model.JdbcDataSource;

public class SaveCollectorDb implements SaveCollector {

    JdbcDataSource jdbcDataSource;

    @Override
    public String save(String id, String[] tableName,
            IDataset<IData<String, Object>> list, FileAttribute fileAttribute)
            throws Exception {
        JdbcDaoManager dao = new JdbcDaoManager(jdbcDataSource);
        try {
            for (String table : tableName) {
                dao.insert(table, list);
            }
            // Drop the reference so it can be garbage collected
            list = null;
            // Drop the reference so it can be garbage collected
            fileAttribute = null;
        } catch (Exception e) {
            throw e;
        } finally {
            dao.cleanupConnections();
        }
        return jdbcDataSource.toString();
    }

    public JdbcDataSource getJdbcDataSource() {
        return jdbcDataSource;
    }

    public void setJdbcDataSource(JdbcDataSource jdbcDataSource) {
        this.jdbcDataSource = jdbcDataSource;
    }
}
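The feature list says that data which fails to store is saved to a file and that these files are monitored and stored again. The source does not show that code, so the following Java sketch is only one plausible shape for it: on failure the rows are written to a spill file, and a later pass replays every spill file and deletes the ones that succeed. `SpillAndRetrySketch`, `RowSink`, `saveOrSpill`, `retrySpilled`, and the `.failed` file suffix are all hypothetical, and rows are assumed to be single-line strings.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SpillAndRetrySketch {

    /** Stand-in for the real storage call (for example a JDBC insert). */
    interface RowSink {
        void insert(String table, List<String> rows) throws Exception;
    }

    /** Tries to store the rows; on failure writes them to a spill file. Returns true if stored. */
    static boolean saveOrSpill(RowSink sink, String table, List<String> rows, Path spillDir)
            throws IOException {
        try {
            sink.insert(table, rows);
            return true;
        } catch (Exception e) {
            Files.createDirectories(spillDir);
            Path spill = Files.createTempFile(spillDir, table + "-", ".failed");
            Files.write(spill, rows, StandardCharsets.UTF_8);
            return false;
        }
    }

    /** Replays every spill file of the table; files that store successfully are deleted. */
    static int retrySpilled(RowSink sink, String table, Path spillDir) throws IOException {
        if (!Files.isDirectory(spillDir)) {
            return 0;
        }
        List<Path> spilled = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(spillDir, table + "-*.failed")) {
            for (Path p : stream) {
                spilled.add(p);
            }
        }
        int restored = 0;
        for (Path p : spilled) {
            List<String> rows = Files.readAllLines(p, StandardCharsets.UTF_8);
            try {
                sink.insert(table, rows);
                Files.delete(p);
                restored++;
            } catch (Exception e) {
                // Still failing: keep the file for the next retry pass.
            }
        }
        return restored;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("spill");
        boolean[] down = {true};
        RowSink sink = (table, rows) -> {
            if (down[0]) {
                throw new Exception("storage unavailable");
            }
            System.out.println("stored " + rows.size() + " rows into " + table);
        };
        System.out.println("stored first time: " + saveOrSpill(sink, "AUDIT", Arrays.asList("r1", "r2"), dir));
        down[0] = false;
        System.out.println("restored files: " + retrySpilled(sink, "AUDIT", dir));
    }
}
```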
Developer mode for the collection, cleansing, and storage layers
Collection layer developer API
Implement the ICollectData interface and store the collected data in local files
public interface ICollectData {
    // Run the collection process
    public IData<String, String> execute(String id, CollectorJobRunState collectorJobRunState) throws Exception;

    // Synchronous collection
    public IData<String, String> synchronous(String id, CollectorJobRunState collectorJobRunState) throws Exception;

    // Automatic collection
    public IData<String, String> collectAuto(String id, CollectorJobRunState collectorJobRunState) throws Exception;

    // Get the collection parameters
    public Map<String, String> getParameter();

    // Initialize the collection parameters
    public void initCollectParameter(String id);

    // Set the collection parameters
    public void setCollectParameter(String id, IData<String, String> data);
}
Register the implementation class in the Spring container
</bean>
Cleansing layer developer API
Implement the CleanDataMethod cleansing interface
public interface CleanDataMethod {
    /**
     * Cleansing method.
     *
     * @param valueThis  raw log line
     * @param parameters parameters
     * @param elementXml XML of the configuration element
     * @param data       map holding the cleansed values
     * @param key        target field
     * @param id         id
     * @return the cleansed data
     * @throws CollectorExceptionLog
     */
    public IData<String, Object> executeCleanData(String valueThis, String parameters,
            String elementXml, IData<String, Object> data, String key, String id)
            throws CollectorExceptionLog;
}
Register the implementation class in the Spring container
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans classpath:spring-beans.xsd">
    <bean id="mappingArray" class="com.venustech.collector.service.fromat.method.mapping.MappingArray" />
    <bean id="mappingID" class="com.venustech.collector.service.fromat.method.mapping.MappingID" />
    <bean id="mappingValueArray" class="com.venustech.collector.service.fromat.method.mapping.MappingValueArray" />
    <bean id="mappingValueID" class="com.venustech.collector.service.fromat.method.mapping.MappingValueID" />
    <bean id="dateUnix" class="com.venustech.collector.service.fromat.method.dateunix.DateUnix" />
    <bean id="dateUnixArray" class="com.venustech.collector.service.fromat.method.dateunix.DateUnixArray" />
    <bean id="dateUnixArrayTime" class="com.venustech.collector.service.fromat.method.dateunix.DateUnixArrayTime" />
    <bean id="dateUnixID" class="com.venustech.collector.service.fromat.method.dateunix.DateUnixID" />
    <bean id="dateUnixIDTime" class="com.venustech.collector.service.fromat.method.dateunix.DateUnixIDTime" />
    <bean id="iPIntArray" class="com.venustech.collector.service.fromat.method.iptoint.IPIntArray" />
    <bean id="iPIntID" class="com.venustech.collector.service.fromat.method.iptoint.IPIntID" />
    <bean id="regexpArray" class="com.venustech.collector.service.fromat.method.regexp.RegexpArray" />
    <bean id="regexpID" class="com.venustech.collector.service.fromat.method.regexp.RegexpID" />
    <bean id="regexpThis" class="com.venustech.collector.service.fromat.method.regexp.RegexpThis" />
    <bean id="splitArray" class="com.venustech.collector.service.fromat.method.split.SplitArray" />
    <bean id="splitID" class="com.venustech.collector.service.fromat.method.split.SplitID" />
    <bean id="splitThis" class="com.venustech.collector.service.fromat.method.split.SplitThis" />
    <bean id="textArray" class="com.venustech.collector.service.fromat.method.text.TextArray" />
    <bean id="textID" class="com.venustech.collector.service.fromat.method.text.TextID" />
    <bean id="info" class="com.venustech.collector.service.fromat.method.info.Info" />
    <bean id="subStringArray" class="com.venustech.collector.service.fromat.method.substring.SubStringArray" />
    <bean id="filterRegexpArray" class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpArray" />
    <bean id="filterRegexpID" class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpID" />
    <bean id="filterRegexpThis" class="com.venustech.collector.service.fromat.method.regexp.FilterRegexpThis" />
    <bean id="splitKeyValueID" class="com.venustech.collector.service.fromat.method.split.SplitKeyValueID" />
    <bean id="subStringID" class="com.venustech.collector.service.fromat.method.substring.SubStringID" />
    <bean id="completionData" class="com.venustech.collector.service.fromat.method.completion.CompletionData" />
    <bean id="completionDataDefault" class="com.venustech.collector.service.fromat.method.completion.CompletionDataDefault" />
    <bean id="subIndexStringArray" class="com.venustech.collector.service.fromat.method.substring.SubIndexStringArray" />
    <bean id="subIndexStringId" class="com.venustech.collector.service.fromat.method.substring.SubIndexStringId" />
    <bean id="urlToID" class="com.venustech.collector.service.fromat.method.urltoip.UrlToID" />
    <bean id="urlToIpArray" class="com.venustech.collector.service.fromat.method.urltoip.UrlToIpArray" />
    <bean id="mappingKey" class="com.venustech.collector.service.fromat.method.mapping.MappingKey" />
    <bean id="mappingValueKey" class="com.venustech.collector.service.fromat.method.mapping.MappingValueKey" />
    <bean id="toDateArray" class="com.venustech.collector.service.fromat.method.todate.ToDateArray" />
    <bean id="toDateIDTime" class="com.venustech.collector.service.fromat.method.todate.ToDateIDTime" />
    <bean id="mainformat" class="com.venustech.collector.service.fromat.MainForMat">
        <property name="method">
            <map>
                <entry key="mappingValueKey" value-ref="mappingValueKey" />
                <entry key="mappingKey" value-ref="mappingKey" />
                <entry key="mappingArray" value-ref="mappingArray" />
                <entry key="mappingID" value-ref="mappingID" />
                <entry key="mappingValueArray" value-ref="mappingValueArray" />
                <entry key="mappingValueID" value-ref="mappingValueID" />
                <entry key="dateUnix" value-ref="dateUnix" />
                <entry key="dateUnixArray" value-ref="dateUnixArray" />
                <entry key="dateUnixArrayTime" value-ref="dateUnixArrayTime" />
                <entry key="dateUnixID" value-ref="dateUnixID" />
                <entry key="dateUnixIDTime" value-ref="dateUnixIDTime" />
                <entry key="iPIntArray" value-ref="iPIntArray" />
                <entry key="iPIntID" value-ref="iPIntID" />
                <entry key="regexpArray" value-ref="regexpArray" />
                <entry key="regexpID" value-ref="regexpID" />
                <entry key="regexpThis" value-ref="regexpThis" />
                <entry key="splitArray" value-ref="splitArray" />
                <entry key="splitID" value-ref="splitID" />
                <entry key="splitThis" value-ref="splitThis" />
                <entry key="textArray" value-ref="textArray" />
                <entry key="textID" value-ref="textID" />
                <entry key="subStringID" value-ref="subStringID" />
                <entry key="subStringArray" value-ref="subStringArray" />
                <entry key="completionData" value-ref="completionData" />
                <entry key="subIndexStringArray" value-ref="subIndexStringArray" />
                <entry key="subIndexStringId" value-ref="subIndexStringId" />
                <entry key="completionDataDefault" value-ref="completionDataDefault" />
                <entry key="urlToID" value-ref="urlToID" />
                <entry key="urlToIpArray" value-ref="urlToIpArray" />
                <entry key="filterRegexpArray" value-ref="filterRegexpArray" />
                <entry key="filterRegexpID" value-ref="filterRegexpID" />
                <entry key="filterRegexpThis" value-ref="filterRegexpThis" />
                <entry key="splitKeyValueID" value-ref="splitKeyValueID" />
                <!-- Mapping -->
                <entry key="mappingArray" value-ref="mappingArray" />
                <entry key="mappingID" value-ref="mappingID" />
                <entry key="mappingValueArray" value-ref="mappingValueArray" />
                <entry key="mappingValueID" value-ref="mappingValueID" />
                <entry key="mappingKey" value-ref="mappingKey" />
                <entry key="mappingValueKey" value-ref="mappingValueKey" />
                <entry key="info" value-ref="info" />
                <!-- Completion -->
                <entry key="completionData" value-ref="completionData" />
                <entry key="completionDataDefault" value-ref="completionDataDefault" />
                <!-- Data conversion -->
                <entry key="toDateIDTime" value-ref="toDateIDTime" />
                <entry key="toDateArray" value-ref="toDateArray" />
            </map>
        </property>
    </bean>
</beans>
Storage layer developer API
Implement the SaveCollector interface
public interface SaveCollector {
    public String save(String id, String[] tableName,
            IDataset<IData<String, Object>> list, FileAttribute fileAttribute) throws Exception;
}
Register the implementation class in the Spring container