先说说需求的背景,因为业务数据都在Oracle数据库中,想要对它进行数据的分析会很是很是慢,用传统的数据仓库-->数据集市这种方式,集市层表会很是大,查询的时候若是再作一些group的操做,一个访问须要一分钟甚至更久才能响应。html
为了解决这个问题,就想把业务库的数据迁移到Elasticsearch中,而后针对es再去作聚合查询。java
问题来了,数据库中的数据量很大,如何导入到ES中呢?sql
Logstash提供了一款JDBC的插件,能够在里面写sql语句,自动查询而后导入到ES中。这种方式比较简单,须要注意的就是须要用户本身下载jdbc的驱动jar包。数据库
input { jdbc { jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar" jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test" jdbc_user => "test" jdbc_password => "test123" schedule => "* * * * *" statement => "select * from TARGET_TABLE" add_field => ["type","a"] } } output{ elasticsearch { hosts =>["10.10.1.205:9200"] index => "product" document_type => "%{type}" } }
所以,就考虑本身来导。json
最后使用发现,本身写的导入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!bash
下面的代码须要注意的就是oracle
public class JDBCUtil { private static Connection conn = null; private static PreparedStatement sta=null; static{ try { Class.forName("oracle.jdbc.driver.OracleDriver"); conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123"); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } System.out.println("Database connection established"); } /** * 把查到的数据格式化写入到文件 * * @param list 须要存储的数据 * @param index 索引的名称 * @param type 类型的名称 * @param path 文件存储的路径 **/ public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException { System.out.println("开始写文件"); File file = new File(path); int count = 0; int size = list.size(); for(Map map : list){ FileUtils.write(file, "{ \"index\" : { \"_index\" : \""+index+"\", \"_type\" : \""+type+"\" } }\n","UTF-8",true); FileUtils.write(file, JSON.toJSONString(map)+"\n","UTF-8",true); // System.out.println("写入了" + ((count++)+1) + "[" + size + "]"); } System.out.println("写入完成"); } /** * 读取数据 * @param sql * @return * @throws SQLException */ public static List<Map> readTable(String tablename,int start,int end) throws SQLException { System.out.println("开始读数据库"); //执行查询 sta = conn.prepareStatement("select * from(select rownum as rn,t.* from "+tablename+" t )where rn >="+start+" and rn <"+end); ResultSet rs = sta.executeQuery(); //获取数据列表 List<Map> data = new ArrayList(); List<String> columnLabels = getColumnLabels(rs); Map<String, Object> map = null; while(rs.next()){ map = new HashMap<String, Object>(); for (String columnLabel : columnLabels) { Object value = rs.getObject(columnLabel); map.put(columnLabel.toLowerCase(), value); } data.add(map); } sta.close(); System.out.println("数据读取完毕"); return data; } /** * 得到列名 * @param resultSet * @return * @throws SQLException */ private static List<String> getColumnLabels(ResultSet resultSet) throws SQLException { List<String> labels = new ArrayList<String>(); ResultSetMetaData rsmd = (ResultSetMetaData) resultSet.getMetaData(); for (int i = 0; i < rsmd.getColumnCount(); i++) { labels.add(rsmd.getColumnLabel(i + 1)); } return labels; } /** * 得到数据库表的总数,方便进行分页 * * @param tablename 表名 */ public static int count(String tablename) throws SQLException { int count = 0; Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); ResultSet rs = stmt.executeQuery("select count(1) from "+tablename); while (rs.next()) { count = rs.getInt(1); } System.out.println("Total Size = " + count); rs.close(); stmt.close(); return count; } /** * 执行查询,并持久化文件 * * @param tablename 导出的代表 * @param page 分页的大小 * @param path 文件的路径 * @param index 索引的名称 * @param type 类型的名称 * @return * @throws SQLException */ public static void readDataByPage(String tablename,int page,String path,String index,String type) throws SQLException, IOException { int count = count(tablename); int i =0; for(i =0;i<count;){ List<Map> map = JDBCUtil.readTable(tablename,i,i+page); JDBCUtil.writeTable(map,index,type,path); i+=page; } } }
在main方法中传入必要的参数便可:curl
public class Main { public static void main(String[] args) { try { JDBCUtil.readDataByPage("TABLE_NAME",1000,"D://data.json","index","type"); } catch (SQLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
这样获得bulk的数据后,就能够运行脚本分批导入了。elasticsearch
下面脚本的思路,就是每100000行左右的数据导入到一个目标文件,使用bulk命令导入到es中。注意一个细节就是不能随意的切分文件,由于bulk的文件是两行为一条数据的。工具
#!/bin/bash count=0 rm target.json touch target.json while read line;do ((count++)) { echo $line >> target.json if [ $count -gt 100000 ] && [ $((count%2)) -eq 0 ];then count=0 curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null rm target.json touch target.json fi } done < $1 echo 'last submit' curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null
最后执行脚本:
sh auto_bulk.sh data.json
本身测试最后要比logstasj jdbc快5-6倍。