最近架构一个项目,实现行情的接入和分发,须要达到极致的低时延特性,这对于证券系统是很是重要的。接入的行情源是能够配置,既能够是Level-1,也能够是Level-2或其余第三方的源。虽然Level-1行情没有Level-2快,可是做为系统支持的行情源,咱们仍是须要优化它,使得从文件读取,到用户经过socket收到行情,端到端的时延尽量的低。本文主要介绍对level-1行情dbf文件读取的极致优化方案。相信对其余的dbf文件读取应该也有借鉴意义。 html
Level-1行情是由行情小站,定时每隔几秒把dbf文件(上海是show2003.dbf,深圳是sjshq.dbf)更新一遍,用新的行情替换掉旧的。咱们的目标就是,在新文件完成更新后,在最短期内将文件读取到内存,把每一行转化为对象,把每一个列转化为对应的数据类型。 java
咱们一共采用了6种优化方式。 算法
咱们在上文《Java读取Level-1行情dbf文件极致优化(1)》 《Java读取Level-1行情dbf文件极致优化(2)》中,已经介绍了4种优化策略:多线程
优化一:采用内存硬盘(RamDisk)
优化二:采用JNotify,用通知替代轮询架构
优化三:采用NIO读取文件
优化四:减小读取文件时内存反复分配和GC
socket
行情dbf文件不少字段是价格类型的字段,带2位或者3位小数,从dbf读取他们的后,咱们会把它们保存在Long类型或者Int类型,而不是Float或Double类型,好比1.23,转换为1230保存。由于Float型或Double型会丢失精度。ide
若是不优化,读取步骤为:性能
1,从byte[]对应的偏移中读取并保存到String中。测试
2,对String作trim操做优化
3,把String转换为Float类型
4,把Float类型乘以1000并强转为Long类型。
不用多说,以上的过程必定是低效的,光前两步就涉及到2次字符串拷贝,2次对象建立。第三步效率也不高。我这里经过优化,在DBFReader.java中添加一个get_long_efficiently_and_multiply_1000方法,将4个步骤合并为一步,经过一次扫描获得结果。
public long get_long_efficiently_and_multiply_1000(byte[] src, final int index) { long multiplicand = 3; long result =0; Field field = getFields()[index]; boolean in_decimal_part = false; boolean negative = false; int offset = field.getOffset(); int length = field.getLength(); int end = offset+length; for(int i =field.getOffset(); i< end; i++) { byte ch = src[i]; if(ch>=48 && ch<=57) //若是是数字 { result *= 10; result += ch-48; if(in_decimal_part) multiplicand--; if(multiplicand==0) break; continue; } if(ch==32) //若是是空格 continue; if(ch == 46) //若是是小数点 { in_decimal_part = true; continue; } if(ch == '-') //若是是负号 { negative = true; } throw new NumberFormatException(); } if(multiplicand == 3) result *= 1000; else if (multiplicand == 2) result *=100; else if (multiplicand == 1) result *=10; if(negative) { result= 0 - result; } return result; }
上面的算法负责读取字段转换为数字的同时,对它乘以1000。而且代码中尽可能优化了执行步骤。
对于整形的读取,咱们也进行了优化,添加一个get_long_efficiently:
public long get_long_efficiently(byte[] src, final int index) { long result =0; boolean negative = false; Field field = getFields()[index]; for(int i =field.getOffset(); i< field.getOffset()+ field.getLength(); i++) { byte ch = src[i]; if(ch>=48 && ch<=57) //若是是数字 { result = result*10 + (src[i]-48); continue; } if(src[i]==32) //若是是空格 continue; if(ch == '-') //若是是负号 { negative = true; } throw new NumberFormatException(); } if(negative) { result= 0 - result; } return result; }
以上的2个算法并不复杂,但却很是关键,一个dbf文件包含大约5000行,每行包括20~30个Float类型或者Int类型的字段,该优化涉及10万+个字段的读取。测试下来,这步改进将读取速度从50ms-70ms提高至15ms至20ms,细节在魔鬼当中,这是速度提高最快的一项优化。
(优化五的代码在改进的DBFReader中,上午中已经提供下载,这里再提供下载连接:DBFReader库 )
对5000多个行进行字段读取并转换成对象,采用多线程处理是最天然不过的优化方式。
通常咱们采用的方法是把任务分红等份的块,每一个线程处理一大块。好比,若是采用5个线程处理,那么把5000行分红1000个行一块,每一个线程处理一块。这样看貌似公平,其实否则,由于咱们的操做系统是分时操做系统,每一个线程开始工做的时间,占用的CPU时间片,和任务的强度都不彻底一致。等分的办法貌似平均,可是颇有可能致使有些线程完成工做了,另一些还有不少没作完。
这里介绍一种我喜欢的任务分配方式:每一个线程每次从5000个行的任务中申请一小块,好比16个行,完成后,再申请16个行。这样快的线程就会多工做些,慢的就少工做些,直到全部的行处理完毕。那么,这些线程怎么协调呢,任务分配岂不是要用到锁?不用锁,咱们采用CAS机制就能作到(实际用的是AtomicInteger,AtomicInteger就是基于CAS实现的),这里不解释太多了。看代码:
class ReaderTask implements Runnable { Collector collector; List<byte[]> recordList; CountDownLatch countDownLatch; AtomicInteger cursor; DBFReader reader; public ReaderTask(Collector collector, DBFReader dbfreader, List<byte[]> recordList, AtomicInteger cursor, CountDownLatch countDownLatch) { this.collector = collector; this.reader = dbfreader; this.recordList = recordList; this.cursor = cursor; this.countDownLatch = countDownLatch; } @Override public void run() { try { int length = recordList.size(); do { final int step = 16; //每次分配16行给该线程处理。 int endIndex = cursor.addAndGet(step); int startIndex = endIndex - step ; for (int i = startIndex; i < endIndex && i < length; i++) { byte[] row = recordList.get(i); MarketRealtimeData SHData = new MarketRealtimeData(); SHData.setMarketType(Constants.MARKET_SH_STOCK); SHData.setIdNum(reader.get_string_efficiently(row, 0)); SHData.setPrefix(reader.get_string_efficiently(row, 1)); SHData.setPreClosePrice(reader.get_long_efficiently_and_multiply_1000(row, 2)); SHData.setOpenPrice(reader.get_long_efficiently_and_multiply_1000(row, 3)); SHData.setTurnover(reader.get_long_efficiently_and_multiply_1000(row, 4)); SHData.setHighPrice(reader.get_long_efficiently_and_multiply_1000(row, 5)); SHData.setLowPrice(reader.get_long_efficiently_and_multiply_1000(row, 6)); SHData.setMatchPrice(reader.get_long_efficiently_and_multiply_1000(row, 7)); //读取全部的Field,如下省略若干行 //... ... //... ... if (collector != null) { collector.collect(SHData); } } } while (cursor.get() < length); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } }
private void readHangqingFile(String path, String name) throws Exception { // Long t1 = System.nanoTime(); DBFReader dbfreader_SH = null; try { dbfreader_SH = new DBFReader(new File(path+File.separator + name)); List<byte[]> list_sh = dbfreader_SH.recordsWithOutDel_efficiently(cacheManager); AtomicInteger cursor = new AtomicInteger(0); //原子变量,用于线程间分配任务 CountDownLatch countDownLatch = new CountDownLatch(WORK_THREAD_COUNT); for (int i = 0; i < WORK_THREAD_COUNT - 1; i++) { //把任务分配给线程池多个线程 ReaderTask task = new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch); globalExecutor.execute(task); } new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch).run(); //当前线程本身也做为工做线程 countDownLatch.await(); //Long t2 = System.nanoTime(); //System.out.println("speed time on read and object:" + (t2 - t1)); } finally { if (dbfreader_SH != null) dbfreader_SH.close(); } }
测试代表,在使用4个线程并行处理的状况下,处理时间从15ms-20ms缩短至4ms-7ms。
在使用本文章介绍的全部优化方法,整个读取效率从耗时300ms以上,优化至5ms-10ms之间。咱们讨论的是从文件更新始,到完成文件读取,完成5000多个对象,100,000个字段的转换的总耗时。
若是继续深刻,咱们可能还有很多细节能够改进。测试代表,时延的稳定性还不够好,极可能是因为GC形成的,咱们还能够从减小对象的建立,以减小性能损耗,减小GC;而且控制GC执行的时间,让GC在空闲时执行等方面优化。
Binhua Liu原创文章,转载请注明原地址http://www.cnblogs.com/Binhua-Liu/p/5616761.html