Java
、PHP
、hdfs
、mqrocket
、excel
、poi
、报表
php
在业务需求方面,每一个企业或多或少都会有报表导出的做业,量少则但是使用输出流或者字符串的输出便可完成,只要指定respose的相应Content-Type便可。若是大量的数据须要导出,尤为是订单这类业务逻辑复杂的报表,导出的时候须要加入各类条件和权限,从数据处理方面就已经很费力了,更况且导出的需求不是一天两天,而是半月一月的数据量,小公司的业务,数量级也可能达到了十多万。html
function generateExcel($filename, $header, array &$data) { generateDownHeader($filename); $rs = '<table><tr>'; if (is_string($header)) { $header = explode(',', $header); } foreach ($header as $v) { $rs .= '<th>'.$v.'</th>'; } $rs .= '</tr>'; foreach ($data as $coll) { $rs .= '<tr>'; foreach ($coll as $v) { if (AppHelper::isDouble($v)) { $rs .= '<td style="vnd.ms-excel.numberformat:@">'.$v.'</td>'; } else { $rs .= '<td>'.$v.'</td>'; } } $rs .= '</tr>'; } $rs .= '</table>'; echo $rs; exit; } function generateDownHeader($filename) { header("Content-Type: application/force-download"); header("Content-Type: application/octet-stream"); header("Content-Type: application/download"); header('Content-Disposition:inline;filename="'.$filename.'"'); header("Content-Transfer-Encoding: binary"); header("Last-Modified: " . gmdate("D, d M Y H:i:s") . " GMT"); header("Cache-Control: must-revalidate, post-check=0, pre-check=0"); header("Pragma: no-cache"); }
这十多万的数据,若是使用通常的方法
(上面代码所示)或许是不可行的(其余通常方法没有尝试过),php处理中通常使用curl调用接口,nginx服务器和php中的curl请求超时通常都是30s,30s处理1w条数据的导出工做,若是服务器的性能好,而且是多核的,可使用multi_curl多线程处理,若是服务器的性能不是很好,这种处理方法或许更耗时。前端
下面是我使用的curl处理接口数据:java
function curl($url, $option = null, $method = 'POST', $getCode = false, $header = []) { $curl = curl_init (); curl_setopt($curl, CURLOPT_URL, $url); curl_setopt($curl, CURLOPT_TIMEOUT, 30); if (!array_key_exists('Content-Type', $header)) { $header['Content-Type'] = 'application/json;charset=UTF-8'; } $headers = []; if ($header) { foreach ($header as $k=>$v) { $headers[] = $k.': '.$v; } } curl_setopt($curl, CURLOPT_HTTPHEADER, $headers); if ($option) { if (is_array($option)) { $option = json_encode($option); } curl_setopt($curl, CURLOPT_POSTFIELDS, $option); } curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1); curl_setopt($curl, CURLOPT_CUSTOMREQUEST, $method); $result = curl_exec($curl); if ($getCode) { $curl_code = curl_getinfo($curl, CURLINFO_HTTP_CODE); $message = self::isJson($result) ? json_decode($result, true) : $result; $result = ['code' => $curl_code]; if (isset($message['exception']) && count($message) == 1) { $result['exception'] = $message['exception']; $result['result'] = null; } else { $result['result'] = $message; } } curl_close($curl); return $result; }
由于数据量大,后来改成多线程:nginx
function curlMulti(array $urls, $options = null, $method = 'POST', $getCode = false, $header = []) { $mh = curl_multi_init(); // 添加curl批处理会话 $handles = $contents = []; foreach ($urls as $key => $url) { $handles[$key] = curl_init($url); curl_setopt($handles[$key], CURLOPT_RETURNTRANSFER, 1); curl_setopt($handles[$key], CURLOPT_TIMEOUT, 30); curl_setopt($handles[$key], CURLOPT_CUSTOMREQUEST, $method); if (!array_key_exists('Content-Type', $header)) { $header['Content-Type'] = 'application/json;charset=utf-8'; } $headers = []; if ($header) { foreach ($header as $k => $val) { $headers[] = $k.': '.$val; } } curl_setopt($handles[$key], CURLOPT_HTTPHEADER, $headers); if ($options) { if (is_array($options)) { $options = json_encode($options); } curl_setopt($handles[$key], CURLOPT_POSTFIELDS, $options); } curl_multi_add_handle($mh, $handles[$key]); } // 执行批处理句柄 /*$active = null; do{ $mrc = curl_multi_exec($mh, $active); } while ($mrc == CURLM_CALL_MULTI_PERFORM); while ($active and $mrc == CURLM_OK) { if (curl_multi_select($mh) === -1) { usleep(100); do { $mrc = curl_multi_exec($mh, $active); }while($mrc == CURLM_CALL_MULTI_PERFORM); } }// 获取批处理内容 $errors = []; foreach ($handles as $k => $ch) { $errors[$k] = curl_error($ch); $content = curl_multi_getcontent($ch); if ($getCode) { $content = curl_errno($ch) == 0 && self::isJson($content)? json_decode($content,true) : []; } $contents = array_merge($contents,$content); } $info = curl_multi_info_read($mh);*/ $output = $errors = $infos = []; do { while (($execrun = curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM); if ($execrun != CURLM_OK) break; while ($done = curl_multi_info_read($mh)) { $info= curl_getinfo($done['handle']); $infos['http_code'][] = $info['http_code']; $result['code'] = $info['http_code']; $infos['url'][] = $info['url']; $errors[] = curl_error($done['handle']); $output = self::isJson(curl_multi_getcontent($done['handle'])) ? array_merge($output, json_decode(curl_multi_getcontent($done['handle']),true)) : $output; if ($running) curl_multi_select($mh, 30); } } while ($running); $result['result'] = $output; $result['exception'] = $errors; $result['info'] = $infos; foreach ($handles as $ch) { curl_multi_remove_handle($mh, $ch); } curl_multi_close($mh); return $result; }
上面的代码中有一段代码是注释掉的,按照道理来讲,上面的代码执行的结果应该和下面的同样,事实证实,倒是执行的结果是同样,我这里说的结果不是多线程返回的结果,既然是多线程,那么不一样的线程竞争到资源也是不同的,返回结果出现了混乱,导出的excel数据并非根据某种排序而排序的,也就是你不知道那个线程先返回告终果,这是问题一,其二,在导出的过程当中,发现不一样程度的丢失数据,加热管每一个线程500条数据,结果在验证数据时,发现仅仅返回了300多条数据,数据变更不一致,第三,过多的数据,依然形成nginx服务器超时,错误code 504。git
PS: 为何在php的中没有使用phpexcel第三方包,缘由很简单,测试发现,phpexcel太耗内存,机器吃不消,因此就没用。github
既然php的多线程方案不能解决问题,只能找其余的办法,最可靠的也是你们都能想到的,就是队列处理,把导出请求放入到队列中,直接返回给客户端,告诉客户业务正在处理,而后具体的导出交由消费端处理,最后把结果反馈到客户端。web
咱们都知道php的队列有不少,经常使用的好比Swoole,Workman以及Gearman等。我选择了Gearman,由于方便,而Swoole原来在咱们的项目中,后来被踢掉了,不知起因。spring
Gearman服务端work的代码demo:apache
<?php /** * Created by PhpStorm. * User: zhoujunwen * Date: 16/7/12 * Time: 下午4:54 */ namespace console\controllers; use Yii; use common\extensions\AppHelper; use yii\console\Controller; class ExportController extends Controller { public function actionExport() { $worker = new \GearmanWorker(); $worker->addServer(); $worker->addFunction('export', function (\GearmanJob $job) { $workload = $job->workload(); if (($data = $this->parseJson($workload)) == false) { return AppHelper::encodeJson(['code' => '-1', 'result' => null, 'exception' => '参数错误']); } $user = isset($data['user']) && !empty($data['user']) ? $data['user'] : 'guest'; $path = dirname(Yii::$app->basePath) . '/backend/downloads/' . sha1($user) . '/' . date('Y-m-d') . '/'; $filename = isset($data['filename']) && !empty($data['filename']) ? $data['filename'] : date('Y-m-d') . '-order.xls'; $rs = $this->getData($data['type']['data'], $data['type']['count'], $data['api'], $data['params']); $this->writeExcel($path, $filename, $rs, $data['header']); return 200; }); //无际循环运行,gearman内部已有处理,不会出现占用太高死掉的状况 while ($worker->work()) { if ($worker->returnCode() !== GEARMAN_SUCCESS) { echo 'error' . PHP_EOL; } } } public function parseJson($str) { $data = json_decode($str, true); return (json_last_error() == JSON_ERROR_NONE) ? $data : false; } public function writeExcel($path, $filename, $data, $header) { if ($this->mkDir($path)) { $data = $this->assembleData($data); $rs = $this->generateExcel($header, $data); file_put_contents(rtrim($path, '/') . '/' . $filename, $rs); } else { echo '目录不存在,写文件错误!'; } return; } public function getData($dataApi, $countApi, $api, $params) { $start = microtime(true); $count = AppHelper::getData($api . $countApi . '?' . http_build_query($params)); echo $api . $countApi . '?' . http_build_query($params).PHP_EOL; echo '总条数:' . $count . PHP_EOL; $params['perpage'] = 500; $times = ceil($count / $params['perpage']); $data = []; if ($count > 0) { for ($i = 0; $i < $times; $i++) { $params['page'] = $i + 1; $rs = AppHelper::getData($api . $dataApi . '?' . http_build_query($params)); $data = array_merge($data, $rs); } } $end = microtime(true); echo "花费时间:" . ($end - $start) . PHP_EOL; return $data; } public function generateExcel($header, array &$data) { $rs = '<table><tr>'; if (is_string($header)) { $header = explode(',', $header); } foreach ($header as $v) { $rs .= '<th>' . $v . '</th>'; } $rs .= '</tr>'; foreach ($data as $coll) { $rs .= '<tr>'; foreach ($coll as $v) { if (AppHelper::isDouble($v)) { $rs .= '<td style="vnd.ms-excel.numberformat:@">' . $v . '</td>'; } else { $rs .= '<td>' . $v . '</td>'; } } $rs .= '</tr>'; } $rs .= '</table>'; unset($data); return $rs; } public function assembleData($rs) { $users = []; if ($rs) { $uids = array_column($rs, 'uid'); $us = Yii::$app->get('db')->createCommand('select uid,gender,adminflag,mobile,type from {{%user}} where uid in (' . implode(',', $uids) . ')')->queryAll(); if ($us && is_array($us)) { foreach ($us as $u) { $users[$u['uid']] = $u; } } } $content = []; foreach ($rs as $k => $v) { $data = AppHelper::decodeJson($v['data'], true); $status = '已删除'; if ($v['status'] == 0) { $status = '已关闭'; } elseif ($v['status'] == 1) { $status = '下单'; } elseif ($v['status'] == 2) { $status = '付款确认中'; } elseif ($v['status'] == 3) { $status = '已付款'; } elseif ($v['status'] == 4) { $status = '已发货'; } elseif ($v['status'] == 5) { $status = '已确认收货'; } elseif ($v['status'] == 6) { $status = '已评价'; } elseif ($v['status'] == 7) { $status = '支付价格与订单价格不一致'; } $refund = '未申请退款'; if (isset($v['refund'])) { if ($v['refund'] == 5) { $refund = '退款已到帐'; } elseif ($v['refund'] == 4) { $refund = '卖家已确认但需人工处理'; } elseif ($v['refund'] == 3) { $refund = '赞成退款'; } elseif ($v['refund'] == 2) { $refund = '拒绝退款'; } elseif ($v['refund'] == 1) { $refund = '退款申请中'; } elseif ($v['refund'] == 0) { $refund = '未申请'; } elseif ($v['refund'] == 6) { $refund = '退货退款申请中'; } elseif ($v['refund'] == 7) { $refund = '赞成退货申请'; } elseif ($v['refund'] == 8) { $refund = '拒绝退货申请'; } elseif ($v['refund'] == 9) { $refund = '买家退货已发出'; } elseif ($v['refund'] == 10) { $refund = '卖家确认收货'; } elseif ($v['refund'] == 11) { $refund = '收到货拒绝退款'; } elseif ($v['refund'] == 12) { $refund = '退货退款已到帐'; } } $gender = '未知'; if (isset($users[$v['uid']]) && $users[$v['uid']]['gender'] == 1) { $gender = '男'; } else if (isset($users[$v['uid']]) && $users[$v['uid']]['gender'] == 2) { $gender = '女'; } $type = '普通用户'; if (isset($users[$v['uid']]) && $users[$v['uid']]['adminflag'] == 3) { $type = '审核中的匠人'; } else if (isset($users[$v['uid']]) && $users[$v['uid']]['adminflag'] == 2) { $type = '种子用户'; } else if (isset($users[$v['uid']]) && $users[$v['uid']]['adminflag'] == 1) { $type = '管理员'; } $itype = '未设置/现货'; if (isset($data['type'])) { if ($data['type'] == 1) { $itype = '现货'; } else if ($data['type'] == 2) { $itype = '定制'; } else { $itype = '拍卖'; } } $utype = isset($users[$v['uid']]['type']) && $users[$v['uid']]['type'] == 4 ? '微信购买注册' : 'APP内注册'; $otype = !$v['otype'] ? 'APP内购买' : '微信购买'; $paytype = !$v['prepaytype'] ? 'APP内付款' : '微信付款'; $snapshot = AppHelper::getData(Yii::$app->params['imageServer'] . $v['snapshot']); $content[] = [date('Y/m/d H:i:s', floor($v['createtm'] / 1000)), $v['ooid'], isset($snapshot['item']['pid']) ? $snapshot['item']['pid'] : '', $v['iid'], $data['title'], $itype, (isset($v['parentCategory']) ? $v['parentCategory'] . '/' : '') . $v['category'], $v['craftsman'], $v['suid'], $v['quantity'], $v['username'], $utype, $v['uid'], $data['address'], $status, $refund, $data['price'], $v['realpay'], $otype, $paytype, isset($users[$v['uid']]['mobile']) ? $users[$v['uid']]['mobile'] : '未知', $gender, $type]; } return $content; } public function mkDir($path) { if (is_dir($path)) { echo '目录' . $path . '已存在!'; return true; } else { $res = mkdir($path, 0777, true); echo $res ? '目录建立成功' : '目录建立失败'; return $res; } } } }
Gearman的Client端的代码:
<?php ... public function exportExcel($str) { $client = new \GearmanClient(); $client->addServer('127.0.0.1', 4730); $client->setCompleteCallback(completeCallBack); $result2 = $client->doBackground('export', $str);//异步进行,只返回处理句柄。 // $result1 = $client->do('export', 'do');//do是同步进行,进行处理并返回处理结果。 // $result3 = $client->addTask('export', 'addTask');//添加任务到队列,同步进行?经过添加task能够设置回调函数。 // $result4 = $client->addTaskBackground('export', 'addTaskBackground');//添加后台任务到队列,异步进行? $client->runTasks();//运行队列中的任务,只是do系列不须要runTask() return $result2; } //绑定回调函数,只对addTask有效 function completeCallBack($task) { echo 'CompleteCallback!handle result:'.$task->data().'<br/>'; }
ps:要运行上面的代码,须要在服务器或者本地安装Gearman服务,而且须要安装php_gearman扩展,安装教程自行搜索。
若是你的业务逻辑不复杂,到此能够导出几万条数据绰绰有余了,然而,个人问题并无所以而解决,上司说,不想用Gearman队列处理,最好仍是java处理。嗯,不要紧,我喜欢这种在技术中跳来跳去的解决问题,既然不知足上司的需求,那就另行方案。
说明:这里用到的java项目都是基于spring+dubbo/dubbox的项目。所用到的配置或者注解均在spring的相关配置和注解范畴,除了mapper的配置和注解。
三个项目:
mq项目:提供rest服务,发送消息(@rxl)
biz项目:提供dubbo、restfull接口,处理业务(@lee)
data项目:处理数据导出
如上,三个项目分别是不一样的工程师所写,咱们不关心怎么实现的,只需知道咱们能使用每一个功能便可。
@Path("/message") @Produces({ContentType.APPLICATION_JSON_UTF_8}) @Component("sendMessageService") public class SendMessageImpl implements SendMessageService{ @Resource public IProducer producer; @PUT @Path("send") @Consumes({MediaType.APPLICATION_JSON}) @Override public void sendMessage(Message message) { System.out.println("message" + message.getMessage()); producer.send(message.getTopic(),message.getKey(),message.getMessage()); } }
这样咱们在php后台经过put方式,调用该接口,将须要处理的数据发送给导出处理服务端。发送put请求可使用curl强大的request功能。
curl_setopt($curl, CURLOPT_CUSTOMREQUEST, 'PUT');
假如mq提供的rest接口是:http://localhost:8018/mq/message/send
,咱们须要传递一个json字符串,该字符串原型是一个关联数组,数组的key分别为“topic”、“key”和“message”,topic是消息的主题,须要指定的mq主题去消费,key是消息的key,该topic下面会有不少key,所以,咱们的消费方即数据导出方须要根据key作判断处理。message里面就是具体的一下参数,好比须要导出哪些字段,好比文件上传服务器地址等等信息。
$message = [ 'topic' => 'order_export', 'key' => 'order_tag_' . $orderNo, 'message' => [ 'params' => [ ... ], 'headers' => [ ... ], 'options' => [ ... ], ], ];
完整的接口请求:
http://localhost:8018/mq/message/send?{"topic":"order_export","key":"order_tag_","message":{"params":[],"header":[],"options":[]}}
Java的Excel API不少,惟独Apache POI这款使用最方便最灵活(或许其余的没有使用过)。
HSSF is the POI Project's pure Java implementation of the Excel '97(-2007) file format. XSSF is the POI Project's pure Java implementation of the Excel 2007 OOXML (.xlsx) file format.
HSSF and XSSF provides ways to read spreadsheets create, modify, read and write XLS spreadsheets. They provide:
low level structures for those with special needs
an eventmodel api for efficient read-only access
a full usermodel api for creating, reading and modifying XLS files
在gradle引入poi包:
// java excel api compile 'org.apache.poi:poi:3.10.1' compile 'org.apache.poi:poi-ooxml:3.9'
package cn.test.web.utils; import cn.test.util.Utils; import org.apache.commons.io.FilenameUtils; import org.apache.poi.hssf.record.crypto.Biff8EncryptionKey; import org.apache.poi.hssf.usermodel.HSSFFont; import org.apache.poi.hssf.usermodel.HSSFFooter; import org.apache.poi.hssf.usermodel.HSSFHeader; import org.apache.poi.hssf.usermodel.HSSFWorkbook; import org.apache.poi.openxml4j.exceptions.InvalidFormatException; import org.apache.poi.poifs.filesystem.POIFSFileSystem; import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.CellStyle; import org.apache.poi.ss.usermodel.Font; import org.apache.poi.ss.usermodel.Footer; import org.apache.poi.ss.usermodel.Header; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet; import org.apache.poi.ss.usermodel.Workbook; import org.apache.poi.ss.usermodel.WorkbookFactory; import org.apache.poi.xssf.usermodel.XSSFWorkbook; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Properties; /** * Created with test-data * User zhoujunwen * Date 16/8/11 * Time 下午5:02 */ public class POIUtils { private static final short HEADER_FONT_SIZE = 16; // 大纲字体 private static final short FONT_HEIGHT_IN_POINTS = 14; // 行首字体 public static Workbook createWorkbook(String file) { String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); Workbook wb = null; switch (ext) { case "xls": wb = createHSSFWorkbook(); break; case "xlsx": wb = createXSSFWorkbook(); break; default: wb = createHSSFWorkbook(); } return wb; } public static Workbook createWorkbookByIS(String file, InputStream inputStream) { String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); Workbook wb = null; try { switch (ext) { case "xls": wb = new HSSFWorkbook(inputStream); break; case "xlsx": wb = new XSSFWorkbook(inputStream); break; default: wb = new HSSFWorkbook(inputStream); } } catch (IOException e) { e.printStackTrace(); } return wb; } public static Workbook writeFile(Workbook wb, String file) { if (wb == null || Utils.isEmpty(file)) { return null; } FileOutputStream out = null; try { out = new FileOutputStream(file); wb.write(out); } catch (IOException e) { e.printStackTrace(); } finally { if (out != null) { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } return wb; } public static Workbook createHSSFWorkbook() { //生成Workbook HSSFWorkbook wb = new HSSFWorkbook(); //添加Worksheet(不添加sheet时生成的xls文件打开时会报错) @SuppressWarnings("unused") Sheet sheet = wb.createSheet(); return wb; } public static Workbook createXSSFWorkbook() { XSSFWorkbook wb = new XSSFWorkbook(); @SuppressWarnings("unused") Sheet sheet = wb.createSheet(); return wb; } public static Workbook openWorkbook(String file) { FileInputStream in = null; Workbook wb = null; try { in = new FileInputStream(file); wb = WorkbookFactory.create(in); } catch (InvalidFormatException | IOException e) { e.printStackTrace(); } finally { try { if (in != null) { in.close(); } } catch (IOException e) { e.printStackTrace(); } } return wb; } public static Workbook openEncryptedWorkbook(String file, String password) { FileInputStream input = null; BufferedInputStream binput = null; POIFSFileSystem poifs = null; Workbook wb = null; try { input = new FileInputStream(file); binput = new BufferedInputStream(input); poifs = new POIFSFileSystem(binput); Biff8EncryptionKey.setCurrentUserPassword(password); String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); switch (ext) { case "xls": wb = new HSSFWorkbook(poifs); break; case "xlsx": wb = new XSSFWorkbook(input); break; default: wb = new HSSFWorkbook(poifs); } } catch (IOException e) { e.printStackTrace(); } return wb; } /** * 追加一个sheet,若是wb为空且isNew为true,建立一个wb * * @param wb * @param isNew * @param type 建立wb类型,isNew为true时有效 1:xls,2:xlsx * @return */ public static Workbook appendSheet(Workbook wb, boolean isNew, int type) { if (wb != null) { Sheet sheet = wb.createSheet(); } else if (isNew) { if (type == 1) { wb = new HSSFWorkbook(); wb.createSheet(); } else { wb = new XSSFWorkbook(); wb.createSheet(); } } return wb; } public static Workbook setSheetName(Workbook wb, int index, String sheetName) { if (wb != null && wb.getSheetAt(index) != null) { wb.setSheetName(index, sheetName); } return wb; } public static Workbook removeSheet(Workbook wb, int index) { if (wb != null && wb.getSheetAt(index) != null) { wb.removeSheetAt(index); } return wb; } public static Workbook insert(Workbook wb, String sheetName, int row, int start, List<?> columns) { if (row == 0 || wb == null) return wb; for (int i = start; i < (row + start); i++) { Row rows = wb.getSheet(sheetName).createRow(i); if (columns != null && columns.size() > 0) { for (int j = 0; j < columns.size(); j++) { Cell ceil = rows.createCell(j); ceil.setCellValue(String.valueOf(columns.get(j))); } } } return wb; } /** * 设置excel头部 * * @param wb * @param sheetName * @param columns 好比:["国家","活动类型","年份"] * @return */ public static Workbook setHeader(Workbook wb, String sheetName, List<?> columns) { if (wb == null) return null; if (sheetName == null) { sheetName = wb.getSheetAt(0).getSheetName(); } return setHeaderStyle(insert(wb, sheetName, 1, 0, columns), sheetName); } /** * 插入数据 * * @param wb Workbook * @param sheetName sheetName,默认为第一个sheet * @param start 开始行数 * @param data 数据,List嵌套List ,好比:[["中国","奥运会",2008],["伦敦","奥运会",2012]] * @return */ public static Workbook setData(Workbook wb, String sheetName, int start, List<?> data) { if (wb == null) return null; if (sheetName == null) { sheetName = wb.getSheetAt(0).getSheetName(); } if (data != null || data.size() > 0) { if (data instanceof List) { int s = start; for (Object columns : data) { insert(wb, sheetName, data.size() - (s - 1), s, (List<?>) columns); s++; } } } return wb; } /** * 移除某一行 * * @param wb * @param sheetName sheet name * @param row 行号 * @return */ public static Workbook delRow(Workbook wb, String sheetName, int row) { if (wb == null) return null; if (sheetName == null) { sheetName = wb.getSheetAt(0).getSheetName(); } Row r = wb.getSheet(sheetName).getRow(row); wb.getSheet(sheetName).removeRow(r); return wb; } /** * 移动行 * * @param wb * @param sheetName * @param start 开始行 * @param end 结束行 * @param step 移动到那一行后(前) ,负数表示向前移动 * moveRow(wb,null,2,3,5); 把第2和3行移到第5行以后 * moveRow(wb,null,2,3,-1); 把第3行和第4行往上移动1行 * @return */ public static Workbook moveRow(Workbook wb, String sheetName, int start, int end, int step) { if (wb == null) return null; if (sheetName == null) { sheetName = wb.getSheetAt(0).getSheetName(); } wb.getSheet(sheetName).shiftRows(start, end, step); return wb; } public static Workbook setHeaderStyle(Workbook wb, String sheetName) { Font font = wb.createFont(); CellStyle style = wb.createCellStyle(); font.setBoldweight(HSSFFont.BOLDWEIGHT_BOLD); font.setFontHeightInPoints(FONT_HEIGHT_IN_POINTS); font.setFontName("黑体"); style.setFont(font); if (Utils.isEmpty(sheetName)) { sheetName = wb.getSheetAt(0).getSheetName(); } int row = wb.getSheet(sheetName).getFirstRowNum(); int cell = wb.getSheet(sheetName).getRow(row).getLastCellNum(); for (int i = 0; i < cell; i++) { wb.getSheet(sheetName).getRow(row).getCell(i).setCellStyle(style); } return wb; } public static Workbook setHeaderOutline(Workbook wb, String sheetName, String title) { if (wb == null) return null; if (Utils.isEmpty(sheetName)) { sheetName = wb.getSheetAt(0).getSheetName(); } Header header = wb.getSheet(sheetName).getHeader(); header.setLeft(HSSFHeader.startUnderline() + HSSFHeader.font("宋体", "Italic") + "打鸡血的口号!" + // 好比:爱我中华 HSSFHeader.endUnderline()); header.setCenter(HSSFHeader.fontSize(HEADER_FONT_SIZE) + HSSFHeader.startDoubleUnderline() + HSSFHeader.startBold() + title + HSSFHeader.endBold() + HSSFHeader.endDoubleUnderline()); header.setRight("时间:" + HSSFHeader.date() + " " + HSSFHeader.time()); return wb; } public static Workbook setFooter(Workbook wb, String sheetName, String copyright) { if (wb == null) return null; if (Utils.isEmpty(sheetName)) { sheetName = wb.getSheetAt(0).getSheetName(); } Footer footer = wb.getSheet(sheetName).getFooter(); if (Utils.isEmpty(copyright)) { copyright = "中华人民共和国"; // 版权信息,本身公司的名字或者app的名字 } footer.setLeft("Copyright @ " + copyright); footer.setCenter("Page:" + HSSFFooter.page() + " / " + HSSFFooter.numPages()); footer.setRight("File:" + HSSFFooter.file()); return wb; } public static String create(String sheetNm, String file, List<?> header, List<?> data, String title, String copyright) { Workbook wb = createWorkbook(file); if (Utils.isEmpty(sheetNm)) { sheetNm = wb.getSheetAt(0).getSheetName(); } setHeaderOutline(wb, sheetNm, title); setHeader(wb, sheetNm, header); setData(wb, sheetNm, 1, data); setFooter(wb, sheetNm, copyright); writeFile(wb, file); if (wb != null) { return file; } return null; } public static String getSystemFileCharset() { Properties pro = System.getProperties(); return pro.getProperty("file.encoding"); } // TODO 后面增长其余设置 }
Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。它和现有的分布式文件系统有不少共同点。但同时,它和其余的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,很是适合大规模数据集上的应用。HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。HDFS在最开始是做为Apache Nutch搜索引擎项目的基础架构而开发的。HDFS是Apache Hadoop Core项目的一部分。
HDFS有着高容错性(fault-tolerant)的特色,而且设计用来部署在低廉的(low-cost)硬件上。并且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求(requirements)这样能够实现流的形式访问(streaming access)文件系统中的数据。
在gradle中引入hdfs:
// jersey compile 'com.sun.jersey:jersey-core:1.19.1' compile 'com.sun.jersey:jersey-server:1.19.1' compile 'com.sun.jersey:jersey-client:1.19.1' compile 'com.sun.jersey:jersey-json:1.19.1' // hadoop compile ('org.apache.hadoop:hadoop-common:2.7.2') { exclude(module: 'jersey') exclude(module: 'contribs') } compile ('org.apache.hadoop:hadoop-hdfs:2.7.2') { exclude(module: 'jersey') exclude(module: 'contribs') } compile ('org.apache.hadoop:hadoop-client:2.7.2') { exclude(module: 'jersey') exclude(module: 'contribs') }`
package cn.test.web.utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.IOUtils; import org.apache.poi.ss.usermodel.Workbook; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URISyntaxException; /** * Created with test-data * User zhoujunwen * Date 16/8/11 * Time 下午5:41 */ public class HDFSUtils { private static FileSystem fs = null; public static FileSystem getFileSystem(Configuration conf) throws IOException, URISyntaxException { fs = FileSystem.get(conf); //fs = FileSystem.newInstance(conf); return fs; } /** * 判断路径是否存在 * * @param conf * @param path * @return * @throws IOException */ public static boolean exits(Configuration conf, String path) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); return fs.exists(new Path(path)); } /** * 建立文件 * * @param conf * @param filePath * @param contents * @throws IOException */ public static void createFile(Configuration conf, String filePath, byte[] contents) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path path = new Path(filePath); FSDataOutputStream outputStream = fs.create(path); outputStream.write(contents, 0, contents.length); outputStream.hflush(); outputStream.close(); fs.close(); } /** * 建立文件 * * @param conf * @param filePath * @param fileContent * @throws IOException */ public static void createFile(Configuration conf, String fileContent, String filePath) throws IOException, URISyntaxException { createFile(conf, filePath, fileContent.getBytes()); } /** * 上传文件 * * @param conf * @param localFilePath * @param remoteFilePath * @throws IOException */ public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path localPath = new Path(localFilePath); Path remotePath = new Path(remoteFilePath); fs.copyFromLocalFile(true, true, localPath, remotePath); fs.close(); } /** * 删除目录或文件 * * @param conf * @param remoteFilePath * @param recursive * @return * @throws IOException */ public static boolean deleteFile(Configuration conf, String remoteFilePath, boolean recursive) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); boolean result = fs.delete(new Path(remoteFilePath), recursive); fs.close(); return result; } /** * 删除目录或文件(若是有子目录,则级联删除) * * @param conf * @param remoteFilePath * @return * @throws IOException */ public static boolean deleteFile(Configuration conf, String remoteFilePath) throws IOException, URISyntaxException { return deleteFile(conf, remoteFilePath, true); } /** * 文件重命名 * * @param conf * @param oldFileName * @param newFileName * @return * @throws IOException */ public static boolean renameFile(Configuration conf, String oldFileName, String newFileName) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path oldPath = new Path(oldFileName); Path newPath = new Path(newFileName); boolean result = fs.rename(oldPath, newPath); fs.close(); return result; } /** * 建立目录 * * @param conf * @param dirName * @return * @throws IOException */ public static boolean createDirectory(Configuration conf, String dirName) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path dir = new Path(dirName); boolean result = fs.mkdirs(dir); fs.close(); return result; } /** * 列出指定路径下的全部文件(不包含目录) * * @param fs * @param basePath * @param recursive */ public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, String basePath, boolean recursive) throws IOException { RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator = fs.listFiles(new Path(basePath), recursive); return fileStatusRemoteIterator; } /** * 列出指定路径下的文件(非递归) * * @param conf * @param basePath * @return * @throws IOException */ public static RemoteIterator<LocatedFileStatus> listFiles(Configuration conf, String basePath) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(basePath), false); fs.close(); return remoteIterator; } /** * 列出指定目录下的文件\子目录信息(非递归) * * @param conf * @param dirPath * @return * @throws IOException */ public static FileStatus[] listStatus(Configuration conf, String dirPath) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath)); fs.close(); return fileStatuses; } /** * 读取文件内容并写入outputStream中 * * @param conf 配置 * @param filePath 文件路径 * @param os 输出流 * @return * @throws IOException */ public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path path = new Path(filePath); try (FSDataInputStream inputStream = fs.open(path)) { Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream); wb.write(os); inputStream.close(); } finally { fs.close(); } } /** * 读取文件内容并返回 * @param conf * @param filePath * @return * @throws IOException * @throws URISyntaxException */ public static String readFile(Configuration conf, String filePath) throws IOException, URISyntaxException { String fileContent = null; FileSystem fs = getFileSystem(conf); Path path = new Path(filePath); InputStream inputStream = null; ByteArrayOutputStream outputStream = null; try { inputStream = fs.open(path); outputStream = new ByteArrayOutputStream(inputStream.available()); IOUtils.copyBytes(inputStream, outputStream, conf); byte[] lens = outputStream.toByteArray(); fileContent = new String(lens, "UTF-8"); } finally { IOUtils.closeStream(inputStream); IOUtils.closeStream(outputStream); fs.close(); } return fileContent; } }
对于hdfs我单独有谢了两个类,一个是HDFSFileUploader,一个是Configuration。如类名,前者用于文件上传,后者用于hdfs的配置。
package cn.test.web.utils.hadoop; import cn.test.common.log.Log; import cn.test.common.log.LogFactory; import cn.test.common.util.Utils; import cn.test.web.utils.HDFSUtils; import org.apache.commons.lang.NullArgumentException; import java.io.IOException; import java.net.URISyntaxException; import java.util.UUID; /** * Created with test-data * User zhoujunwen * Date 16/8/11 * Time 下午5:42 */ public class HDFSFileUploader { public static final byte FROM_LOCAL_COPY = 1; // 从本地上传文件 public static final byte FROM_CONTENT_WRITE = 2; // 读取字符串或字节,生成文件 private static final Log LOGGER = LogFactory.getLog(HDFSFileUploader.class); private static final String HDFS_SCHEMA = "hdfs://"; private static final String SEPARATOR = "/"; private static final String SUFFIX_PREFIX = "."; private static final int BUFFER_SIZE = 1024; private static final Configuration CONF = new Configuration(); /** * 上传二进制文件,使用默认配置的域名,随机生成文件名 * * @param path * @param suffix * @param contents * @return */ public static String upload(String path, String suffix, byte[] contents) { return upload(null, path, suffix, contents); } /** * 上传二进制文件,随机生成文件名 * * @param domain * @param path * @param suffix * @param contents * @return */ public static String upload(String domain, String path, String suffix, byte[] contents) { return upload(domain, path, null, suffix, contents); } /** * 上传二进制文件,指定文件名,只能经过流上传 * * @param domain * @param path * @param filename * @param suffix * @param content * @return */ public static String upload(String domain, String path, String filename, String suffix, final byte[] content) { return upload(domain, path, filename, suffix, new String(content), FROM_CONTENT_WRITE); } /** * 上传文件,默认域名和随机文件名 * * @param path * @param suffix * @param src * @return */ public static String upload(String path, String suffix, String src, byte fromLocal) { return upload(null, path, suffix, src, fromLocal); } /** * 上传文件到指定域名的指定目录,文件名随机生成 * * @param domain 域名,好比 10.25.126.28:9000 * @param path 文件路径,好比 /usr/local/com.hd.test/2016-08-08/ * @param suffix 文件后缀,好比 .xsl,xsl * @param src 文件内容,字符串 || 本地文件路径 * @return String 完整的文件名 */ public static String upload(String domain, String path, String suffix, String src, byte fromLocal) { return upload(domain, path, null, suffix, src, fromLocal); } /** * 上传文件,指定了域名,路径,文件名,后缀 * * @param domain 域名 * @param path 路径 * @param filename 文件名 * @param suffix 后缀 * @param src 内容 || 本地路径 * @return */ public static String upload(String domain, String path, String filename, String suffix, String src, byte fromLocal) { String filePath = getRealAddr(domain, path, suffix, filename); System.out.println(filePath); try { switch (fromLocal) { case FROM_LOCAL_COPY: HDFSUtils.copyFromLocalFile(CONF, src, filePath); break; case FROM_CONTENT_WRITE: HDFSUtils.createFile(CONF, src, filePath); break; } return filePath; } catch (IOException | URISyntaxException e) { LOGGER.warn("上传文件失败:{}",e.getMessage()); } return null; } /** * 文件完整的路径 * * @param domain 域名 * @param path 目录路径 * @param suffix 后缀 * @param filename 文件名 * @return */ private static String getRealAddr(String domain, String path, String suffix, String filename) { if (!Utils.isEmpty(domain) && !domain.startsWith(HDFS_SCHEMA)) { domain = HDFS_SCHEMA + domain; } else { domain = ""; } path = getPath(path); filename = getFilename(filename, suffix); return String.format("%s%s%s", domain, path, filename); } /** * 文件路径 * * @param path * @return */ private static String getPath(String path) { if (Utils.isEmpty(path)) { throw new NullArgumentException("path id null"); } if (!path.startsWith(SEPARATOR)) { path = SEPARATOR + path; } if (!path.endsWith(SEPARATOR)) { path = path + SEPARATOR; } return path; } /** * 生成文件名 * * @param filename * @param suffix * @return */ private static String getFilename(String filename, String suffix) { if (Utils.isEmpty(filename)) { filename = generateFilename(); } if (!Utils.isEmpty(suffix)) { filename = suffix.equals(SEPARATOR) ? filename : (filename.endsWith(suffix) ? filename : ((filename.endsWith(SUFFIX_PREFIX) || suffix.startsWith(SUFFIX_PREFIX)) ? filename + suffix : filename + SUFFIX_PREFIX + suffix)); } return filename; } /** * 生成文件名 * * @return */ private static String generateFilename() { return getUuid(false); } /** * 生成UUID * * @param isNeedHyphen * @return */ public static String getUuid(boolean isNeedHyphen) { UUID uuid = UUID.randomUUID(); String str = uuid.toString(); if (isNeedHyphen) { str = str.replaceAll("-", ""); } return str; } public static void setConfResource(final Configuration config) { CONF.addResource(config); } }
HDFSFileUploader中的一系列方法,用于上传不一样类型的文件,好比二进制文件,字符串等,还有hdfs的copy本地文件以及文件名uuid生成等方法。
package cn.test.web.utils.hadoop; import cn.test.web.utils.CommonUtils; import org.apache.commons.io.FilenameUtils; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * Created with test-data * User zhoujunwen * Date 16/8/9 * Time 上午9:30 * 建议使用方法: * <bean id="hadoopConfig" class="cn.test.util.hadoop.Configuration"> * <property name="resources"> * <list> * <value>classpath:/spring/core-site.xml</value> * </list> * </property> * </bean> * 在使用的地方直接注入hadoopConfig: * * @Resource private Configuration hadoopConfig; */ public class Configuration extends org.apache.hadoop.conf.Configuration { private Resource[] resources; public void setResources(List<String> filenames) throws IOException { List<Resource> resources = new ArrayList<>(); if (filenames != null && filenames.size() > 0) { for (String filename : filenames) { filename = filename.trim(); String realName = getFileName(filename); String ext = FilenameUtils.getExtension(realName); if (ext.equals("xml")) { PathMatchingResourcePatternResolver pathMatchingResourcePatternResolver = new PathMatchingResourcePatternResolver(); try { Resource[] resourceList = pathMatchingResourcePatternResolver.getResources(filename); Collections.addAll(resources, resourceList); } catch (IOException e) { e.printStackTrace(); } } } } for (Resource resource : resources) { this.addResource(resource.getURL()); } } private String getFileName(String fileName) { return CommonUtils.getFileName(fileName); } }
这个类很简单,实际上是集成了hadoop的org.apache.hadoop.conf.Configuration类,目的是为了在spring配置文件中,灵活的指定hadoop的配置文件,所用到的就是org.apache.hadoop.conf.Configuration的addResource(String name)方法,下面是在spring xml中的配置。
<!-- hadoop配置 --> <bean id="hadoopConfig" class="cn.test.web.utils.hadoop.Configuration"> <property name="resources"> <list> <value>classpath:META-INF/hadoop/*.xml</value> </list> </property> </bean>
package cn.test.web.mq.consumer; ... // 不少依赖包 /** * Created with test-data * User zhoujunwen * Date 16/8/9 * Time 下午2:14 */ public class OrderExportHandler implements IMessageHandler<String, String> { private static final Log LOGGER = LogFactory.getLog(OrderExportHandler.class); private static final int MUL_SEC = 1000; private static final Gson GSON = new Gson(); @Value("${image_server}") private String imageServer; @Autowired private DataManager manager; @Override public void handle(final String key, final String message) { System.out.println("message" + message); Pattern p = Pattern.compile("-"); String[] skey = p.split(key); if (skey.length < 3) { return; } int res = insert(skey[1], skey[0], skey[2]); LOGGER.debug("主键:{}", res); if (res > 0) { //插入数据成功,执行导出数据逻辑 Map data = manager.parseData(message); List<?> header = null; List<?> content = null; List<Order> orders = null; DataExportLog log = new DataExportLog(); log.setDelid(res); log.setUid(Integer.valueOf(skey[2])); if (data.containsKey("params")) { LOGGER.debug("params:{}", data.get("params")); orders = manager.getOrders(data.get("params")); LOGGER.debug("导出数据的条数:{}", orders.size()); } if (orders == null || orders.size() == 0) { log.setStatus((byte) 4); } else if (data.containsKey("header") && (data.get("header") instanceof Map)) { Object obj = data.get("header"); Map<String, List> map = (obj instanceof Map) ? manager.parseHeader((Map<String, String>) obj) : null; if (map != null && map.size() > 0) { if (map.containsKey("header")) { header = getHeader(map.get("header")); } if (map.containsKey("key")) { content = getContent(orders, map.get("key")); } } // 调用hdfs 接口,上传文件 if (!Utils.isEmpty(header) || !Utils.isEmpty(content)) { // 生成excel文件 String fName = getFilename(data); String localFile = manager.writeExecelFile(fName, header, content, null, null); String file = manager.copyFileFromLocal(skey[0], localFile); if (Utils.isEmpty(localFile) || Utils.isEmpty(file)) { log.setStatus((byte) 3); } else { log.setStatus((byte) 1); log.setLink(file); } LOGGER.info("本地临时文件:{}", localFile); LOGGER.info("上传到hadoop服务器中的文件:{}", file); } } update(log); } } // TODO // 处理数据,这里面会调用biz项目的dubbo接口 // 具体的操做不在这里面写 }
订单导出逻辑都在上面的类,以及DataManager中进行处理,期间获取数据等接口则由biz项目的dubbo接口提供,具体业务逻辑在此不涉及。
下面会给出manager.writeExecelFile(fName, header, content, null, null);
方法和manager.copyFileFromLocal(skey[0], localFile);
方法的code:
public String writeExecelFile(String filename, List<?> header, List<?> datas, String title, String copyright) { SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd"); String date = sd.format(new Date()); if (Utils.isEmpty(filename)) { filename = HDFSFileUploader.getUuid(true) + this.ext; } String filePath = this.tmpDir + "/" + date + "/" + filename; filePath = filePath.replaceAll("//", "/"); File f = new File(CommonUtils.getFilePath(filePath)); if (!f.exists() && !f.isDirectory()) { f.mkdir(); } if (Utils.isEmpty(title)) { title = DEFAULT_TITLE; } if (Utils.isEmpty(copyright)) { copyright = this.copyright; } return POIUtils.create(null, filePath, header, datas, title, copyright); }
writeExecelFile
方法调用了poi的create方法,此时临时文件已生成。
还有一点须要说一下,好比临时路径,上传到hdfs的路径,版权信息等最好是在配置文件中可配置的,这就依赖予spring的org.springframework.beans.factory.config.PropertyPlaceholderConfigurer
类,他能够作到,咱们只须要在代码中这么写而且在properties文件中写入相应的配置便可:
@Value("${hdfs_upload_dir}") private String uploadDir; @Value("${file_tmp_dir}") private String tmpDir; @Value("${copyright}") private String copyright; @Value("${default_file_ext}") private String ext;
再看看copyFileFromLocal
这个方法:
/** * 写hdfs文件 * * @param type * @param file * @return */ public String copyFileFromLocal(String type, String file) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); String date = format.format(new Date()); String path = this.uploadDir + type + '/' + date + '/'; HDFSFileUploader.setConfResource(hadoopConfig); return HDFSFileUploader.upload(path, this.ext, file, HDFSFileUploader.FROM_LOCAL_COPY); }
这个方法中调用了HDFSFileUploader.upload的方法,即上面展现的一个封装类中的方法。须要注意的是,这地方注入了hadoop的配置文件HDFSFileUploader.setConfResource(hadoopConfig);
。而hadoop得Configuration这样引入在DataMananager类中:
@Resource private Configuration hadoopConfig;
到此,咱们把生成的excel文件上传到了hdfs的指定文件路径。可使用hadoop客户端的命令查看:
hadoop fs -ls /cn/test/order/ (这里是上传路径)
订单导出,这里由java后端直接提供rest接口,若是使用php的hdfs第三方包phdfs(github),用起来并不那么书顺畅,编译时报错。
好吧,看看这个接口是怎么写的:
package cn.test.web.impl; import cn.test.common.log.Log; import cn.test.common.log.LogFactory; import cn.test.util.Utils; import cn.test.web.manager.DataManager; import cn.test.web.service.DownloadService; import cn.test.web.utils.CommonUtils; import com.alibaba.dubbo.rpc.protocol.rest.support.ContentType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import java.io.IOException; import java.net.URISyntaxException; /** * Created with test-data * User zhoujunwen * Date 16/8/16 * Time 下午5:21 */ @Path("download") @Component("downloads") @Produces({ContentType.APPLICATION_JSON_UTF_8}) public class DownloadServiceImpl implements DownloadService { private static final Log LOGGER = LogFactory.getLog(DownloadServiceImpl.class); @Autowired private DataManager manager; @Override @GET @Path("order") public void down(@Context HttpServletResponse response, @QueryParam("url") String url, @QueryParam("uid") Integer uid) { LOGGER.debug("下载地址:{}", url); if (Utils.isEmpty(url)) { return; } String filename = CommonUtils.getFileName(url); // 设置头部 response.setContentType(MediaType.APPLICATION_OCTET_STREAM); response.setContentType("application/vnd.ms-excel;charset=gb2312"); response.setHeader("Content-Disposition", "attachment;filename=" + filename); try { // 读取并写入下载数据 manager.readFile(url, response.getOutputStream()); response.flushBuffer(); } catch (IOException | URISyntaxException e) { LOGGER.error(e.getMessage()); } } }
PHP页面只须要一个超级连接便可。优化了一下,线上接口所有走内网的,所以,在a标签中不可能直接把该接口的ip暴露出去,所以在nginx服务器作了代理配置,只须要访问一个downloads/order?url=xxx&uid=xxx便可。
location /downloads/ { proxy_pass http://127.0.0.1:8086/presentation/download/; }
public List<Order> getOrders(Object params) { OrderSearch search = null; if (params != null && (params instanceof Map)) { System.out.println("params:" + params); search = GSON.fromJson(GSON.toJson(params), OrderSearch.class); System.out.println("title:" + search.getTitle()); } else { search = new OrderSearch(); } int count = orderService.searchCount(search); int cycleTimes = (int) Math.ceil(count * 1.0 / TIMES_IN_SIGNEL_PROCESSOR); LOGGER.debug("数据总条数count:{},外部循坏执行次数:times:{}", count, cycleTimes); // 获取全部并发任务的运行结果 List<Order> orders = new ArrayList<>(); int page = 0; for (int j = 0; j < cycleTimes; j++) { int signel = (count > TIMES_IN_SIGNEL_PROCESSOR) ? TIMES_IN_SIGNEL_PROCESSOR : count; count = count - signel; int poolNum = (int) Math.ceil(signel * 1.0 / LIMIT); LOGGER.debug("线程池数量:{}", poolNum); // 建立一个线程池 ExecutorService pool = Executors.newFixedThreadPool(poolNum); // 建立多个有返回值的任务 List<Future> list = new ArrayList<Future>(); for (int i = 0; i < poolNum; i++) { Callable c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search); // 执行任务并获取Future对象 Future f = pool.submit(c); list.add(f); } // 关闭线程池 pool.shutdown(); try { Thread.sleep(THREAD_SLEEP); } catch (InterruptedException e) { LOGGER.debug("线程休眠时,引发中断异常:{}", e.getMessage()); } for (Future f : list) { // 从Future对象上获取任务的返回值 try { orders.addAll((Collection<? extends Order>) f.get()); LOGGER.debug(">>>线程:{}返回的数据条数:{}", f.toString(), ((Collection<? extends Order>) f.get()).size()); } catch (InterruptedException | ExecutionException e) { LOGGER.warn("调用OrderService接口的search方法失败:{}", e.getMessage()); return null; } } } return orders; }
该方法是一个多线程调用dubbo接口,返回订单数据。在调用Callable c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search);
这个方法以后,发现每次获取的数据都是最好设定的过滤条件,好比分页,无论传入的page是1仍是2,假如最后一次传入的是5,那么起做用的就是5,并非1或者2,缘由到底出在哪里呢?通过打印日志,发现OrderExportCallable类中,传入的参数若是是对象,则是引用传递,无论哪个线程去修改,都会修改原来的对象属性值,所以,问题找到了,解决方法也就出来了。
/** * 多线程处理数据 */ class OrderExportCallable implements Callable<List<Order>> { private static final int THREAD_SLEEP = 1000; private String taskNum; private int page; private int limit; private OrderService orderService; private OrderSearch orderSearch; OrderExportCallable(String taskNum, int page, int limit, OrderService orderService, OrderSearch orderSearch) { this.taskNum = taskNum; this.page = page; this.limit = limit; this.orderService = orderService; this.orderSearch = orderSearch; } /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ @Override public List<Order> call() throws Exception { System.out.println(">>>" + taskNum + "任务启动"); Thread.sleep(THREAD_SLEEP); OrderSearch os = new OrderSearch(); BeanUtils.copyProperties(orderSearch, os); os.setPage(this.page); os.setLimit(this.limit); return orderService.search(os); } }
在OrderExportCallable从新new一个对象,把传入的对象属性copy到新建立的对象便可。至于为何选择Future,由于Future线程执行完会有返回结果。并且为了处理数据的顺序性,将Future对象加入到list,等待结果返回,依次处理返回结果。
若是数据量过大,超过并发线程单次请求的数量,则须要等待结果返回,从新建立线程。每次请求500条数据,若是有1万条,那么开20个线程,这样就有点不划算了,因此1万条数据分红两次执行,每次10个并发线程。因此,在建立线程的时候使用了两次for循环。
PS:这里须要优化的是如何让返回结果的线程不关闭,继续执行下一次请求,直到没有后续的请求再关闭线程,减小建立线程的资源消耗。
导出数据的时候,一开始想到的是把excel读取成流,转换成字符串直接由rest响应到前端,可是这个方法失败了,不管如何,导出的excel都是乱码。
问题:HDFS读取excel内容出现乱码
上面有相关尝试过的代码,各类经常使用流都尝试过,均失败了,就在我绝望的时候,上司@hsj帮我解决了此问题。在rest接口中看到这句code了吗?
manager.readFile(url, response.getOutputStream());
是的,就是传入一个response的output流,可是,仅仅这句话还不能解决此问题,继续往下看:
public void readFile(String filename, OutputStream os) throws IOException, URISyntaxException { if (HDFSUtils.exits(hadoopConfig, filename)) { HDFSUtils.readFile(hadoopConfig, filename, os); } }
就是上面封装的hdfs工具类的一个方法:
/** * 读取文件内容并写入outputStream中 * * @param conf 配置 * @param filePath 文件路径 * @param os 输出流 * @return * @throws IOException */ public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path path = new Path(filePath); try (FSDataInputStream inputStream = fs.open(path)) { Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream); wb.write(os); inputStream.close(); } finally { fs.close(); } }
而后调用poi工具类的方法:
public static Workbook createWorkbookByIS(String file, InputStream inputStream) { String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); Workbook wb = null; try { switch (ext) { case "xls": wb = new HSSFWorkbook(inputStream); break; case "xlsx": wb = new XSSFWorkbook(inputStream); break; default: wb = new HSSFWorkbook(inputStream); } } catch (IOException e) { e.printStackTrace(); } return wb; }
咱们看到,最终调用了poi的createWorkbookByIS方法,而该方法仅仅作了一件事,就是根据文件扩展名建立了一个已有输入流的Workbook对象,而后readFile将调用Workbook对象的write方法,将输入流写入到输出列,而且response到request请求。同时,在rest接口中指定了请求响应的内容类型:response.setContentType("application/vnd.ms-excel;charset=gb2312");
。
public static Workbook createWorkbook(String file) { String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); Workbook wb = createSXSSFWorkbook(MEM_ROW); /*switch (ext) { case "xls": wb = createHSSFWorkbook(); break; case "xlsx": wb = createXSSFWorkbook(); break; default: wb = createHSSFWorkbook(); }*/ return wb; } public static Workbook createSXSSFWorkbook(int memRow) { Workbook wb = new SXSSFWorkbook(memRow); Sheet sheet = wb.createSheet(); return wb; }
使用SXSSFWorkbook建立wb对象。
java.lang.OutOfMemoryError:GC overhead limit exceeded填坑心得
java.lang.OutOfMemoryError: Java heap space解决方法
java解析获取Excel中的数据--同时兼容2003及2007
public static Workbook createWorkbookByIS(String file, InputStream inputStream) { String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); Workbook wb = null; try { wb = new XSSFWorkbook(inputStream); } catch (Exception e) { try { wb = new HSSFWorkbook(inputStream); } catch (IOException e1) { e1.printStackTrace(); } } return wb; }
PS:然而,这篇文章所描述的内容并无完全解决大数据导出问题,好比,此刻导出的数据若是达到上万条,CPU吃紧内存爆满(4核,32G),还有一个吻头疼的问题,导出4w+的数据须要3个小时,逆天了,这个还不是重点,重点是下载的慢的要死,4w的数据能导出1G之多。
优化之路才开始,下一篇《基于haddop的HDFS和Excel开源库POI导出大数据报表(二)》
若是您喜欢,或者能帮上您的忙,请收藏,若是您有更好的建议,请留言。若是您看到这句话而且不反感的话,请点个赞,给我一点码字的鼓励!