正在开发的项目提出了一个要求,要求Mule ESB流程轮询指定文件夹下的文本文件,逐行读取文本文件的内容,进行业务处理。java
咱们考虑使用File Connector,对指定文件夹进行轮询。File Connector使用org.mule.transport.file.FileMessageReceiver的poll方法,读取文件夹下的每个文件,将其推送给后续的处理节点。 git
考虑处处理的文件可能很大,读取文件和处理文件各行的操做咱们决定分开处理,使用了不一样的后台线程进行处理。各个后台线程共享同一个阻塞队列,以下图所示,其中读取文件的后台线程是图中的Thread 1,相似于生产者,它每读取文件的一行,就将该行放入阻塞队列中。而处理文件各行的线程是图中的Thread 2,相似于消费者,它们从阻塞队列里读出一行数据,进行处理。github
读写文件的后台线程咱们统一使用Spring的线程池进行管理。解析文件,将各行数据放入阻塞队列咱们只使用一个后台线程,从阻塞队列里读取数据使用5个后台线程,每一个后台线程从阻塞队列里读取一行数据,输出到控制台。spring
Mule的File Connector轮询指定文件夹,读取文件夹中符合要求的文件,将其转换为InputStream(实际类型是org.mule.transport.file.ReceiverFileInputStream),传送到File Parser节点。app
File Parser节点使用自定义的FileTransformer类接收传送的InputStream,进行文件读写。FileTransformer的代码以下 ide
public class FileTransformer extends AbstractMessageTransformer { //The blocking queue used to process file content. @Resource private ArrayBlockingQueue<String> contentQueue; //The thread pool executor to provide threads to process file content. @Autowired private ThreadPoolTaskExecutor queueExecutor; private Logger logger = LogManager.getLogger(FileTransformer.class); @Override public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException { long startTime = System.currentTimeMillis(); //The file input stream sent by file connector. InputStream inputStream = (InputStream)message.getPayload(); PutContentTask putContentTask = new PutContentTask(contentQueue, inputStream); queueExecutor.execute(putContentTask); CountDownLatch threadSignal = new CountDownLatch(5); for(int i=0; i<5;i++) { String taskName = "ReadContentTask " + i; ReadContentTask readContentTask = new ReadContentTask(contentQueue, taskName, threadSignal); queueExecutor.execute(readContentTask); } try { threadSignal.await(); contentQueue.clear(); } catch (Exception e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } long endTime = System.currentTimeMillis(); long elaspeTime = endTime - startTime; return "The elapse time is:" + elaspeTime; } }
FileTransformer类中注入了一个Blocking Queue对象,用于存放扫描文件时得到的每一行文本,这个对象定义在Spring配置文件中。测试
<!--src/main/app/conf/applicationContext.xml--> <bean id="contentQueue" class="java.util.concurrent.ArrayBlockingQueue"> <constructor-arg value="1000"/> <constructor-arg value="true"/> </bean>
FileTransformer类中还注入了ThreadPoolTaskExecutor对象,用于发送解析文件和处理文件内容的Task。它也定义在Spring配置文件里。线程
<!--src/main/app/conf/applicationContext.xml--> <bean id="queueExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="10" /> <property name="maxPoolSize" value="20" /> <property name="queueCapacity" value="200" /> <property name="keepAliveSeconds" value="20000" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy" /> </property> </bean>
在transformMessage方法中,当File Connector推送文件Input Stream过来时,咱们构建一个PutContentTask,用于解析文件流将文件的各行文本数据放入Blocking Queue中。PutContentTask类的主要代码以下:code
public class PutContentTask implements Runnable { //This blocking queue is used to contain each line of file. private ArrayBlockingQueue<String> contentQueue = null; //The file input stream. private InputStream inputStream; .................... @Override public void run() { try { Scanner scanner = new Scanner(inputStream); while(scanner.hasNext()) { String line = scanner.nextLine(); contentQueue.put(line); } //Put EOF string to line to notify read task that we reach the end of file. contentQueue.put("EOF"); scanner.close(); } catch (InterruptedException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } finally { //Close the file input stream so that mule can auto delete the file. if(inputStream != null) { try { inputStream.close(); } catch (IOException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } } } }
PutContentTask在Input Stream解析结束后,会向Blocking Queue中放一个"EOF"标志,表示文件已经解析完成。读取队列数据的ReadContentTask在读取到这个标识后,会结束读取操做。orm
在将Input Stream交给PutContentTask去解析后,transformMessage方法建立了5个ReadContentTask,从Blocking Queue中读取ReadContentTask解析出的文件的每一行并作处理(我这个程序只是将读取的每一行输出到控制台,读者能够根据本身的需求编写相应的业务逻辑)。五个ReadContentTask使用CountDownLatch进行同步,等CountDownLatch计数为0时,File Connector推送的文件Input Stream解析和处理结束,统计总共的处理时间。
ReadContentTask类的主要代码以下:
public class ReadContentTask implements Runnable { //This blocking queue is used to contain each line of file. private ArrayBlockingQueue<String> contentQueue = null; //The task name of read content task(It identifies each read task). private String taskName; //The synchronous signal variable for each read task. private CountDownLatch signal = null; private Logger logger = LogManager.getLogger(ReadContentTask.class); @Override public void run() { while(true) { if(!contentQueue.isEmpty()) { try { String line = contentQueue.take(); if(line!=null && !line.equals("") && !line.equals("EOF")) { //process each line. System.out.println(taskName + " Line is:" + line); } else { //Notify other read task. contentQueue.put("EOF"); break; } } catch (InterruptedException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } } } signal.countDown(); }
对于每一个ReadContentTask,从Blocking Queue中读取到"EOF"后,即视为文件已经解析完成,结束处理任务,同时向contentQueue中放入"EOF",通知其余ReadContentTask结束处理。
当全部ReadContentTask都结束处理后,transformMessage方法将清空Blocking Queue,等待下一次文件解析和处理。
try { threadSignal.await(); contentQueue.clear(); } catch (Exception e) { logger.error(ExceptionUtils.getFullStackTrace(e)); }
为了显示文件的解析和处理时间,FileTransformer类将计算出的elapse time放入mule message的payload中,咱们添加一个Logger对象显示处理时间。
测试的时候,咱们建立一个简单的csv文件students.csv(以下图所示),记录500个学生的学号,姓名和所属班级,放到File Connector监控的connectorTest目录下。
启动ESB项目,能够看到students.csv文件被解析,文件的每行被不一样的ReadContentTask输出到控制台,处理的时间是31ms
项目的代码上传到了github上,地址是
https://github.com/yosaku01/fileconnectortest