Super CSV is an open-source Java project for handling CSV files. It is designed around object-oriented principles, so you can leverage your own object-oriented code to make CSV processing much easier. It supports input/output type conversion and data-integrity validation, and it can read and write data from anywhere, in any encoding, as long as you supply a suitable Reader or Writer. The delimiter, quote character, and line terminator are all configurable.
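For example, here is a minimal sketch of a custom preference (pipe-delimited with Unix line endings; the values are purely illustrative):

import org.supercsv.prefs.CsvPreference;

// arguments: quote char, delimiter char, end-of-line symbols;
// the built-in CsvPreference.STANDARD_PREFERENCE is equivalent to '"', ',', "\r\n"
CsvPreference pipePreference = new CsvPreference.Builder('"', '|', "\n").build();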
1. Let's start with simple data handling
Add the Maven dependency:
<dependency>
    <groupId>net.sf.supercsv</groupId>
    <artifactId>super-csv</artifactId>
    <version>2.4.0</version>
</dependency>
Let's walk through the code example from the official documentation.
import java.util.Date;

public class UserBean {

    int id;
    Date date;
    String username, password, town;
    int zip;

    public Date getDate() { return date; }
    public void setDate(Date date) { this.date = date; }
    public int getId() { return id; }
    public String getPassword() { return password; }
    public String getTown() { return town; }
    public String getUsername() { return username; }
    public int getZip() { return zip; }
    public void setId(int id) { this.id = id; }
    public void setPassword(String password) { this.password = password; }
    public void setTown(String town) { this.town = town; }
    public void setUsername(String username) { this.username = username; }
    public void setZip(int zip) { this.zip = zip; }
}
And suppose we have a CSV file with a header line and the following content:
id,username,password,date,zip,town
1,Klaus,qwexyKiks,17/1/2007,1111,New York
2,Oufud,bobilop213,10/10/2007,4555,New York
3,Oufud1,bobilop213,10/10/2007,4555,New York
4,Oufud2,bobilop213,10/10/2007,4555,New York
5,Oufud3,bobilop213,10/10/2007,4555,New York
6,Oufud4,bobilop213,10/10/2007,4555,New York
7,Oufud5,bobilop213,10/10/2007,4555,New York
8,Oufud6,bobilop213,10/10/2007,4555,New York
9,Oufud7,bobilop213,10/10/2007,4555,New York
10,Oufud8,bobilop213,10/10/2007,4555,New York
11,Oufud9,bobilop213,10/10/2007,4555,New York
12,Oufud10,bobilop213,10/10/2007,4555,New York
13,Oufud11,bobilop213,10/10/2007,4555,New York
14,Oufud12,bobilop213,10/10/2007,4555,New York
15,Oufud13,bobilop213,10/10/2007,4555,New York
Then you can use the following code to build UserBean instances and print their property values:
class ReadingObjects {
    public static void main(String[] args) throws Exception {
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            final String[] header = inFile.getHeader(true);
            UserBean user;
            while ((user = inFile.read(UserBean.class, header, processors)) != null) {
                System.out.println(user.getZip());
            }
        } finally {
            inFile.close();
        }
    }
}
That leaves processors undefined. As the name suggests, these are cell processors that handle the data in each column; you can also pass null for a column, meaning that column gets no special treatment. Processors can be nested inside one another: new Unique(new StrMinMax(5, 20)) says the column's value must be unique and between 5 and 20 characters long. We'll skip the processing details for now and look at how the processors we need are defined:
final CellProcessor[] processors = new CellProcessor[] {
    new Unique(new ParseInt()),
    new Unique(new StrMinMax(5, 20)),
    new StrMinMax(8, 35),
    new ParseDate("dd/MM/yyyy"),
    new Optional(new ParseInt()),
    null
};
Column by column, the code above means:
Column 1 (id) is an integer and its value must be unique.
Column 2 (username) is a string, must be unique, and must be 5 to 20 characters long.
Column 3 (password) is a string 8 to 35 characters long.
Column 4 (date) is a date in day/month/year (dd/MM/yyyy) format.
Column 5 (zip) is an integer, but ParseInt only processes it when the column has a value (in other words, the column may be empty).
Column 6 (town) is a string (the default) and uses no processor. (A sketch of what happens when one of these constraints fails follows this list.)
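When a cell violates one of these constraints at read time, Super CSV throws a SuperCsvConstraintViolationException. Here is a minimal sketch of catching it per row and skipping bad records, assuming the same foo.csv and the processors array shown above (duplicated here as a static field so the class compiles on its own):

import java.io.FileReader;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.exception.SuperCsvConstraintViolationException;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;

class ReadingWithValidation {

    // same processors as above
    static final CellProcessor[] processors = new CellProcessor[] {
        new Unique(new ParseInt()), new Unique(new StrMinMax(5, 20)), new StrMinMax(8, 35),
        new ParseDate("dd/MM/yyyy"), new Optional(new ParseInt()), null
    };

    public static void main(String[] args) throws Exception {
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            final String[] header = inFile.getHeader(true);
            while (true) {
                try {
                    UserBean user = inFile.read(UserBean.class, header, processors);
                    if (user == null) {
                        break; // end of file
                    }
                    System.out.println(user.getUsername());
                } catch (SuperCsvConstraintViolationException e) {
                    // a constraint (Unique, StrMinMax, ...) failed for one cell; the CsvContext
                    // carried by the exception reports the line, row and column involved
                    System.err.println("Skipping invalid row: " + e.getCsvContext());
                }
            }
        } finally {
            inFile.close();
        }
    }
}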
If your CSV file has no header line, you can define an array to stand in for it:
final String[] header = new String[] { "id", "username", "password", "date", "zip", "town" };
If you want to ignore a column, it works much like defining the processors: simply use null at the corresponding position in the header array, as in the example below.
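For instance, to read the file while ignoring the password column entirely (an illustrative mapping, not from the original article):

// null at a position means that column is not mapped to any bean property
final String[] header = new String[] { "id", "username", null, "date", "zip", "town" };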
The complete code follows:
import java.io.FileReader;
import java.io.IOException;
import java.util.Date;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;

class ReadingObjects {

    static final CellProcessor[] userProcessors = new CellProcessor[] {
        new Unique(new ParseInt()),
        new Unique(new StrMinMax(5, 20)),
        new StrMinMax(8, 35),
        new ParseDate("dd/MM/yyyy"),
        new Optional(new ParseInt()),
        null
    };

    public static void main(String[] args) throws Exception {
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            final String[] header = inFile.getHeader(true);
            UserBean user;
            while ((user = inFile.read(UserBean.class, header, userProcessors)) != null) {
                System.out.println(user.getZip());
            }
        } finally {
            inFile.close();
        }
    }
}

public class UserBean {

    int id; // the file's "id" column must map to a bean property (this field was missing from the original listing)
    String username, password, town;
    Date date;
    int zip;

    public Date getDate() { return date; }
    public int getId() { return id; }
    public String getPassword() { return password; }
    public String getTown() { return town; }
    public String getUsername() { return username; }
    public int getZip() { return zip; }
    public void setDate(final Date date) { this.date = date; }
    public void setId(final int id) { this.id = id; }
    public void setPassword(final String password) { this.password = password; }
    public void setTown(final String town) { this.town = town; }
    public void setUsername(final String username) { this.username = username; }
    public void setZip(final int zip) { this.zip = zip; }
}
If you don't know the file's exact format before reading it, you can use the CsvListReader.read() method instead, which returns each row's data in a List.
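A minimal sketch of that approach (the path and the plain printing are my own illustration):

import java.io.FileReader;
import java.util.List;
import org.supercsv.io.CsvListReader;
import org.supercsv.io.ICsvListReader;
import org.supercsv.prefs.CsvPreference;

class ReadingLists {
    public static void main(String[] args) throws Exception {
        ICsvListReader listReader = new CsvListReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            List<String> row;
            // with no processors, read() returns each line's columns as raw strings,
            // so nothing about the file's structure needs to be known in advance
            while ((row = listReader.read()) != null) {
                System.out.println("row " + listReader.getRowNumber() + ": " + row);
            }
        } finally {
            listReader.close();
        }
    }
}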
That covers reading a file; now let's look at writing, which is just as simple.
import java.io.FileWriter;
import java.util.HashMap;
import org.supercsv.io.*;
import org.supercsv.prefs.CsvPreference;

class WritingMaps {
    public static void main(String[] args) throws Exception {
        ICsvMapWriter writer = new CsvMapWriter(new FileWriter(...), CsvPreference.STANDARD_PREFERENCE); // supply your output path
        try {
            final String[] header = new String[] { "name", "city", "zip" };
            // set up some data to write
            final HashMap<String, ? super Object> data1 = new HashMap<String, Object>();
            data1.put(header[0], "Karl");
            data1.put(header[1], "Tent city");
            data1.put(header[2], 5565);
            final HashMap<String, ? super Object> data2 = new HashMap<String, Object>();
            data2.put(header[0], "Banjo");
            data2.put(header[1], "River side");
            data2.put(header[2], 5551);
            // the actual writing
            writer.writeHeader(header);
            writer.write(data1, header);
            writer.write(data2, header);
        } finally {
            writer.close();
        }
    }
}
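Writing beans instead of maps works the same way. Here is a hedged sketch using CsvBeanWriter with the UserBean from earlier (the output path is an assumption):

import java.io.FileWriter;
import java.util.Date;
import org.supercsv.io.CsvBeanWriter;
import org.supercsv.io.ICsvBeanWriter;
import org.supercsv.prefs.CsvPreference;

class WritingBeans {
    public static void main(String[] args) throws Exception {
        ICsvBeanWriter writer = new CsvBeanWriter(new FileWriter("D:\\out.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            final String[] header = new String[] { "id", "username", "password", "date", "zip", "town" };
            UserBean user = new UserBean();
            user.setId(1);
            user.setUsername("Klaus");
            user.setPassword("qwexyKiks");
            user.setDate(new Date());
            user.setZip(1111);
            user.setTown("New York");
            writer.writeHeader(header);
            // each name in the mapping is written from the matching getter on the bean
            writer.write(user, header);
        } finally {
            writer.close();
        }
    }
}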
Parsing with the MapReader approach:
The CSV file:
customerNo,firstName,lastName,birthDate,mailingAddress,married,numberOfKids,favouriteQuote,email,loyaltyPoints
1,John,Dunbar,13/06/1945,"1600 Amphitheatre Parkway
Mountain View, CA 94043
United States",,,"""May the Force be with you."" - Star Wars",jdunbar@gmail.com,0
2,Bob,Down,25/02/1919,"1601 Willow Rd.
Menlo Park, CA 94025
United States",Y,0,"""Frankly, my dear, I don't give a damn."" - Gone With The Wind",bobdown@hotmail.com,123456
3,Alice,Wunderland,08/08/1985,"One Microsoft Way
Redmond, WA 98052-6399
United States",Y,0,"""Play it, Sam. Play ""As Time Goes By."""" - Casablanca",throughthelookingglass@yahoo.com,2255887799
4,Bill,Jobs,10/07/1973,"2701 San Tomas Expressway
Santa Clara, CA 95050
United States",Y,3,"""You've got to ask yourself one question: ""Do I feel lucky?"" Well, do ya, punk?"" - Dirty Harry",billy34@hotmail.com,36
Sample code:
import java.io.FileReader;
import java.util.Map;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseBool;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.LMinMax;
import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.constraint.StrRegEx;
import org.supercsv.cellprocessor.constraint.UniqueHashCode;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvMapReader;
import org.supercsv.io.ICsvMapReader;
import org.supercsv.prefs.CsvPreference;

public class ReadingObjects {

    public static void main(String[] args) throws Exception {
        ICsvMapReader mapReader = null;
        try {
            mapReader = new CsvMapReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);
            // the header columns are used as the keys to the Map
            final String[] header = mapReader.getHeader(true);
            final CellProcessor[] processors = getProcessors();
            Map<String, Object> customerMap;
            while ((customerMap = mapReader.read(header, processors)) != null) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, customerMap=%s",
                        mapReader.getLineNumber(), mapReader.getRowNumber(), customerMap));
            }
        } finally {
            if (mapReader != null) {
                mapReader.close();
            }
        }
    }

    private static CellProcessor[] getProcessors() {
        final String emailRegex = "[a-z0-9._]+@[a-z0-9.]+"; // just an example, not very robust!
        StrRegEx.registerMessage(emailRegex, "must be a valid email address");
        final CellProcessor[] processors = new CellProcessor[] {
            new UniqueHashCode(),             // customerNo (must be unique)
            new NotNull(),                    // firstName
            new NotNull(),                    // lastName
            new ParseDate("dd/MM/yyyy"),      // birthDate
            new NotNull(),                    // mailingAddress
            new Optional(new ParseBool()),    // married
            new Optional(new ParseInt()),     // numberOfKids
            new NotNull(),                    // favouriteQuote
            new StrRegEx(emailRegex),         // email
            new LMinMax(0L, LMinMax.MAX_LONG) // loyaltyPoints
        };
        return processors;
    }
}
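When the built-in processors aren't enough, you can also write your own by extending CellProcessorAdaptor. A sketch (the whitespace-trimming behavior is my example, not from the original article; note that recent Super CSV versions also ship a built-in Trim processor):

import org.supercsv.cellprocessor.CellProcessorAdaptor;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.util.CsvContext;

// trims surrounding whitespace, then hands the value to the next processor in the chain
public class TrimWhitespace extends CellProcessorAdaptor {

    public TrimWhitespace() {
        super();
    }

    public TrimWhitespace(CellProcessor next) {
        super(next);
    }

    @Override
    public Object execute(Object value, CsvContext context) {
        validateInputNotNull(value, context); // throws if the cell is null
        String trimmed = value.toString().trim();
        // 'next' is the wrapped processor (or a pass-through if none was given)
        return next.execute(trimmed, context);
    }
}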
2. Updating large volumes of data concurrently, in batches
The code is as follows:
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;

class ThreadReadingObjects {

    static final CellProcessor[] userProcessors = new CellProcessor[] {
        new Unique(new ParseInt()),       // unique int id
        new Unique(new StrMinMax(5, 20)), // unique, length 5 to 20
        new StrMinMax(8, 35),             // length 8 to 35
        new ParseDate("dd/MM/yyyy"),      // day/month/year format
        new Optional(new ParseInt()),     // integer, parsed only when the column has a value (column may be empty)
        null                              // no processor
    };

    public static void main(String[] args) throws Exception {
        // InputStreamReader freader = new InputStreamReader(inputStream, "UTF-8");
        // ICsvBeanReader inFile = new CsvBeanReader(freader, CsvPreference.STANDARD_PREFERENCE);
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        ExecutorService executorService = null;
        try {
            // if your CSV file has no header, you can define an array instead:
            // final String[] header = new String[] { "id", "username", "password", "date", "zip", "town" };
            final String[] header = inFile.getHeader(true);
            // create the thread pool
            // note: don't use too many threads; each one holds a JDBC connection while working,
            // and too many will exhaust the database connection pool
            List<Future<String>> futureList = new ArrayList<Future<String>>();
            executorService = Executors.newFixedThreadPool(5);
            // read the data page by page and hand each page to the pool
            while (getPageUserList(executorService, futureList, inFile, header)) {}
            // collect the results
            for (Future<String> future : futureList) {
                while (true) {
                    if (future.isDone() && !future.isCancelled()) {
                        System.out.println("future result: " + future.get());
                        break;
                    } else {
                        Thread.sleep(1000);
                    }
                }
            }
        } finally {
            inFile.close();
            if (executorService != null) {
                executorService.shutdown();
            }
        }
    }

    private static boolean getPageUserList(ExecutorService executorService, List<Future<String>> futureList,
            ICsvBeanReader inFile, String[] header) throws IOException {
        int index = 0;
        boolean status = false;
        List<UserBean> userBeans = new ArrayList<UserBean>();
        UserBean user;
        while ((user = inFile.read(UserBean.class, header, userProcessors)) != null) { // reads from the first data row
            userBeans.add(user);
            index++;
            // rows read per page, i.e. records handled per thread; adjust to your situation
            if (index == 10) {
                status = true;
                break;
            }
        }
        // submit the page to the pool
        if (!userBeans.isEmpty()) {
            Future<String> future = executorService.submit(getUpdateDbJob(futureList.size(), userBeans));
            futureList.add(future);
        }
        return status;
    }

    private static Callable<String> getUpdateDbJob(final int threadNo, final List<UserBean> userBeans) {
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                int count = userBeans.size();
                // option 1: batch by repeatedly taking a subList
                batchPageInsertDataOne(threadNo, userBeans);
                // option 2: batch by index modulo
                // batchPageInsertDataTwo(threadNo, userBeans);
                return String.valueOf(count);
            }
        };
    }

    private static void batchPageInsertDataOne(int threadNo, List<UserBean> userBeans) {
        int perCount = 4, index = 0;
        int times = userBeans.size() / perCount;
        long stime = System.currentTimeMillis();
        try {
            do {
                // sleep 50 ms
                Thread.sleep(50);
                List<UserBean> listTemp = null;
                if (userBeans.size() >= perCount) {
                    listTemp = userBeans.subList(0, perCount); // listTemp is the argument for the batched work
                    System.out.println("Thread " + threadNo + " updating users: " + listTemp.size());
                } else {
                    listTemp = userBeans.subList(0, userBeans.size());
                    System.out.println("Thread " + threadNo + " updating users: " + listTemp.size());
                }
                // keep each transaction unit small (do as little as possible per transaction)
                // note: don't commit too many rows per batch, or you'll run into row locks
                jdbcPerBatchInsert(listTemp);
                userBeans.removeAll(listTemp);
                index++;
            } while (index <= times);
            // timing
            long etime = System.currentTimeMillis();
            System.out.println("Thread " + threadNo + " total time for batched transactional inserts-----------------------: " + (etime - stime) + "ms!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("JDBC batch insert failed: >>" + userBeans.size());
            throw new RuntimeException();
        }
    }

    private static void batchPageInsertDataTwo(int threadNo, List<UserBean> userBeans) {
        long stime = System.currentTimeMillis();
        try {
            // write to the database in batches
            int perCount = 4;
            List<UserBean> userList = new ArrayList<UserBean>();
            for (int i = 0; i < userBeans.size(); i++) {
                userList.add(userBeans.get(i));
                // for large volumes, commit every perCount records in its own transaction;
                // adjust the modulo count to your situation
                if (i > 0 && i % perCount == 0) {
                    System.out.println("Thread " + threadNo + " updated users: " + userList.size() + " OK");
                    // batch write via jdbcTemplate (the original passed userBeans here, which was a bug)
                    jdbcPerBatchInsert(userList);
                    userList.clear();
                } else if (i == userBeans.size() - 1) {
                    // commit the last batch
                    System.out.println("Thread " + threadNo + " updated users: " + userList.size() + " OK");
                    // batch write via jdbcTemplate
                    jdbcPerBatchInsert(userList);
                    userList.clear();
                }
            }
            // timing
            long etime = System.currentTimeMillis();
            System.out.println("Thread " + threadNo + " total time for batched transactional inserts-----------------------: " + (etime - stime) + "ms!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("JDBC batch insert failed: >>" + userBeans.size());
            throw new RuntimeException();
        }
    }

    /**
     * Batch write to the database via jdbcTemplate.
     * @param listTemp the batch to insert
     */
    private static void jdbcPerBatchInsert(List<UserBean> listTemp) {
    }
}
The output of a run:
Thread 0 updating users: 4
Thread 1 updating users: 4
Thread 0 updating users: 4
Thread 1 updating users: 1
Thread 1 total time for batched transactional inserts-----------------------: 100ms!
Thread 0 updating users: 2
Thread 0 total time for batched transactional inserts-----------------------: 151ms!
future result: 10
future result: 5
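In the listing above, jdbcPerBatchInsert is deliberately left empty. A minimal sketch of one way to fill it in with Spring's JdbcTemplate (the user_info table and its columns are hypothetical):

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;

class UserBatchDao {

    private final JdbcTemplate jdbcTemplate;

    UserBatchDao(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // one JDBC batch per call; keep the list small so each transaction stays short
    void jdbcPerBatchInsert(final List<UserBean> listTemp) {
        final String sql = "INSERT INTO user_info (id, username, password) VALUES (?, ?, ?)";
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                UserBean user = listTemp.get(i);
                ps.setInt(1, user.getId());
                ps.setString(2, user.getUsername());
                ps.setString(3, user.getPassword());
            }

            @Override
            public int getBatchSize() {
                return listTemp.size();
            }
        });
    }
}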
Finally, some pitfalls from real-world use, shared here for everyone; I ran into these myself while processing several million rows.
Note:
Problem 1:
org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection; nested exception is com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 100000, active 8, maxActive 8, runningSqlCount 7 :
Too many threads exhaust the database connection pool; increase the maxActive (maximum connections) setting, as sketched below.
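For reference, a small sketch of raising that limit on a Druid data source (the numbers are illustrative, not recommendations):

import com.alibaba.druid.pool.DruidDataSource;

class DataSourceConfig {
    static DruidDataSource dataSource() {
        DruidDataSource ds = new DruidDataSource();
        // maxActive must cover every thread that can hold a connection at the same time
        ds.setMaxActive(20);   // the error above shows maxActive 8 exhausted by the worker threads
        ds.setMaxWait(100000); // ms to wait for a free connection before GetConnectionTimeoutException
        return ds;
    }
}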
Problem 2:
Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
Committing too many rows per JDBC transaction causes lock contention (here, a deadlock); keep each transaction as small as possible.
All right, notes done. If you found this helpful, give it a like! O(∩_∩)O