本系列(1)中已经实现了对指定文件夹下的文件的轮询和处理。可是根据File Connector的FileMessageReceiver的poll方法代码java
@Override public void poll() { try { List<File> files = this.listFiles(); if (logger.isDebugEnabled()) { logger.debug("Files: " + files.toString()); } Comparator<File> comparator = getComparator(); if (comparator != null) { Collections.sort(files, comparator); } for (File file : files) { ..................
从上述代码能够看出File Connector是轮询指定文件夹下的全部文件,逐一送到FileTransformer进行处理,可是这样作存在一个问题,因为每一个ReadContentTask是从Blocking Queue从读取"EOF"来判断是否文件解析结束,然而在多个文件的状况下,可能出现PutContentTask解析完一个文件后,继续解析后续送来的Input Stream,将新的文件内容放入Blocking Queue中,从而致使ReadContentTask从Blocking Queue中继续读取非"EOF"的内容,再进行处理。
这样就出现了多个文件的内容混杂在一块儿被处理的状况,若是这些被处理的文件存在先后关联,这样的混杂处理就会出现逻辑错误。所以咱们须要将File Connector每次轮询的文件个数缩减到1个。ide
网上的文章有推荐重载FileMessageReceiver的poll方法,但实际一试,发现这方法并不现实,由于poll方法引用了不少FileMessageReceiver的private的方法和属性,若是真要重载poll方法,须要在子类从新定义这些方法和属性,增长了开发工做量。因为poll方法是先引用listFiles方法,先扫描指定文件夹下的全部文件,放入列表,传给poll方法,poll方法再遍历文件列表,进行处理,而listFiles方法实际是调用basicListFiles方法进行文件夹文件扫描的,所以咱们选择了重载basicListFiles方法,使得每次轮询时,送给poll方法的文件列表只包含了单个文件,这样就实现了单文件轮询的要求。ui
咱们自定义了FileMessageReceiver重载basicListFiles方法this
@Override protected void basicListFiles(File currentDirectory, List<File> discoveredFiles) { File[] files; Filter filter = endpoint.getFilter(); //Filter the file or directory according to setting filter conditions. if ( filter instanceof FileFilter) { files = currentDirectory.listFiles((FileFilter)filter); } else if(filter instanceof FilenameFilter) { files = currentDirectory.listFiles((FilenameFilter)filter); } else { files = currentDirectory.listFiles(); } // the listFiles calls above may actually return null (check the JDK code). if (files == null || files.length == 0) { logger.info("No Available Files"); logger.info("Process End"); return; } List<File> fileList =new ArrayList<File>(); scanFiles(currentDirectory, fileList); //Sort the scanned file list according to specified comparator. if(fileList.size() > 0) { //Compare all the scanned files. if(fileList.size() >1) { Comparator<File> comparator = null; try { comparator = getComparator(); if (comparator != null) { Collections.sort(fileList, comparator); } } catch (Exception e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } } discoveredFiles.clear(); //Add the required number's files. for(int i=0; i<1; i++) { discoveredFiles.add(fileList.get(i)); } } } /** * This method accesses the directory recursively, gets all the files under this directory. * @param currentDirectory * The special file directory. * @param fileList * The scanned file list. */ private void scanFiles(File currentDirectory, List<File> fileList) { File[] files; Filter filter = endpoint.getFilter(); //Filter the file or directory according to setting filter conditions. if (filter instanceof FileFilter) { files = currentDirectory.listFiles((FileFilter)filter); } else if(filter instanceof FilenameFilter) { files = currentDirectory.listFiles((FilenameFilter)filter); } else { files = currentDirectory.listFiles(); } if (files == null || files.length == 0) { return; } for(File file:files) { if (!file.isDirectory()) { fileList.add(file); } else { if (((FileConnector)connector).isRecursive()) { this.scanFiles(file, fileList); } } } }
咱们定义了循环迭代访问文件夹,获取文件夹下全部文件的方法scanFiles,basicListFiles方法即调用这个方法获取轮询文件夹下的全部文件列表。spa
咱们是否能够直接从这个文件列表获取单个文件列表discoveredFiles,返回给poll方法?回答是否认的,由于这个文件列表尚未通过排序。在默认的FileMessageReceiver中,因为basicListFiles方法轮询文件夹下全部文件返回给poll方法,由poll方法排序后再进行文件读取。线程
@Override public void poll() { ......... Comparator<File> comparator = getComparator(); if (comparator != null) { Collections.sort(files, comparator); }
然而因为咱们返回的是的是单个文件列表,poll方法里的排序将不会有效果,因此咱们必须在重载的basicListFiles方法中添加相应的排序代码(上面代码已经列出,再也不赘述)debug
在自定义FileMessageReceiver类后,咱们还须要让流程中的File inbound endpoint使用咱们定义的FileMessageReceiver类。咱们对流程文件作下列修改:日志
<!--自定义全局File Connector,使用自定义的FileMessageReceiver类覆盖了默认的FileMessageReceiver类 --> <file:connector name="input" doc:name="File" recursive="true" > <service-overrides messageReceiver="fileconnectortest.CustomFileMessageReceiver"/> </file:connector> <flow name="fileconnectortestFlow" > <!--file inbound endpoint引用自定义的File Connector,使用了自定义的FileMessageReceiver类 --> <file:inbound-endpoint path="D:\connectorTest" responseTimeout="10000" doc:name="File" connector-ref="input"/> </flow>
最后还有一个问题须要解决,因为FileMessageReceiver和FileTransformer分别是不一样的线程执行,当FileTransformer解析处理完前一个文件流时,如何通知FileMessageReceiver轮询下一个文件?咱们使用了设置Mule上下文环境变量的办法。 code
1. 在流程文件中定义一个Mule环境变量processCompleteFlag,初始值为trueorm
<global-property name="processCompleteFlag" value="true" doc:name="Global Property"/>
2.修改重载的basicListFiles方法,加入环境变量检查的代码。
@Override protected void basicListFiles(File currentDirectory, List<File> discoveredFiles) { MuleContext muleContext = this.getFlowConstruct().getMuleContext(); String processFlag = muleContext.getRegistry().get("processCompleteFlag"); if(processFlag.equals("false")) { logger.info("Not Processed"); return; } ................. if (files == null || files.length == 0) { try { muleContext.getRegistry().unregisterObject("processCompleteFlag"); muleContext.getRegistry().registerObject("processCompleteFlag", "true"); } catch (RegistrationException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } logger.info("No Available Files"); logger.info("Process End"); return; } ......... if(fileList.size() > 0) { .......... } else { try { muleContext.getRegistry().unregisterObject("processCompleteFlag"); muleContext.getRegistry().registerObject("processCompleteFlag", "true"); } catch (RegistrationException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } logger.info("No Available Files"); logger.info("Process End"); }
basicListFiles方法在轮询指定文件夹前,会先读取processCompleteFlag环境变量,若是变量为false,说明FileTransformer在处理文件中,没必要对文件夹进行轮询。若是值为true,则说明没有文件在被处理中,能够对文件夹进行轮询。 若是轮询过程当中出现轮询文件夹为空或者轮询文件夹下只有文件夹,没有文件,则返回空文件列表给poll方法,而且设置processCompleteFlag为true,以便进行下一次轮询。
3.修改FileTransformer的transformMessage方法,添加设置processCompleteFlag的代码。
@Override public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException { logger.info("Process File"); try { muleContext.getRegistry().unregisterObject("processCompleteFlag"); muleContext.getRegistry().registerObject("processCompleteFlag", "false"); } catch (RegistrationException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } ..... long endTime = System.currentTimeMillis(); long elaspeTime = endTime - startTime; try { muleContext.getRegistry().unregisterObject("processCompleteFlag"); muleContext.getRegistry().registerObject("processCompleteFlag", "true"); logger.info("Set Complete Flag To True"); } catch (RegistrationException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); }
FileTransformer在处理文件流以前,先设置processCompleteFlag为false,避免FileMessageReceiver没必要要的轮询。在当前文件流处理完毕后(全部ReadContentTask都结束任务后),再设置processCompleteFlag为true,触发FileMessageReceiver从新开始轮询。
咱们仍然使用系列(1)中使用过的students.csv文件,更名为students1.csv,再建立一个名叫students2.csv的文件,学号从20160501开始,包含500个学生的信息。咱们把这两个文件拷贝到轮询目录D:\connectorTest下。
启动修改后的ESB项目,运行结果以下:
Console窗口输出以下:
打开对应的日志文件([Mule Workspace Directory]/.mule/logs/fileconnectortest.log文件)
从日志文件能够看出,FileMessageReceiver先轮询了students1.csv文件,直到FileTransformer类处理完students1.csv文件,设置Flag变量为true后才开始轮询students2.csv文件,证实咱们实现了File Connector一次轮询一个文件的操做。