本系列的(1)和(2)介绍了如何修改File Connector的轮询机制,使得File Connector每次轮询只轮询一个文件。本文主要阐述如何对前面的实现作进一步的修改,这个修改有如下两点:java
1)如何自定义定义File Connector轮询文件时使用的Comparator。git
2)如何替换Blocking Queue为Mule ESB定义的VM Transport中的Queue.github
File Connector 默认的文件排序Comparator是查看file:inbound-endpoint中是否认义了comparator,若是定义了指定的comparator类,则按照comparator类中定义的排序规则对文件列表进行排序,不然就按照轮询文件时文件放入列表的顺序。app
FileMessageReceiver.java异步
protected Comparator<File> getComparator() throws Exception { Object comparatorClassName = getEndpoint().getProperty(COMPARATOR_CLASS_NAME_PROPERTY); if (comparatorClassName != null) { Object reverseProperty = this.getEndpoint().getProperty(COMPARATOR_REVERSE_ORDER_PROPERTY); boolean reverse = false; if (reverseProperty != null) { reverse = Boolean.valueOf((String) reverseProperty); } Class<?> clazz = endpoint.getMuleContext().getExecutionClassLoader().loadClass(comparatorClassName.toString()); Comparator<?> comparator = (Comparator<?>)clazz.newInstance(); return reverse ? new ReverseComparator(comparator) : comparator; } return null; }
Mule ESB提供了一个按照文件修改时间进行排序的Comparator类OlderFirstComparator,若是咱们想根据文件修改时间进行轮询能够引用这个Comparator类。async
public class OlderFirstComparator implements Comparator { public int compare(Object o1, Object o2) { if (o1 instanceof File && o2 instanceof File) { File f = (File) o1; File f1 = (File) o2; boolean fileNewer = FileUtils.isFileNewer(f, f1); boolean fileOlder = FileUtils.isFileOlder(f, f1); if (!fileNewer && !fileOlder) { return 0; } else if (fileNewer) { return 1; } else { return -1; } } throw new IllegalArgumentException(MessageFormat.format( "Expected java.io.File instance, but was {0} and {1}", ClassUtils.getShortClassName(o1, "<null>"), ClassUtils.getShortClassName(o2, "<null"))); } }
若是咱们想按照文件名对文件进行轮询,须要自定义Comparator类,而且把这个Compartor类指定给file:inbound-endpoint的compartor属性。ide
咱们参照OlderFirstComparator类,定义FileNameSeparator类函数
public class FileNameSeparator implements Comparator<File> { public int compare(File file1, File file2) { String fileName1 = file1.getName(); String fileName2 = file2.getName(); return fileName1.compareToIgnoreCase(fileName2); } }
这里只比较了文件名,若是还须要比较文件夹名,能够自行添加相应代码。优化
最后咱们设置FileNameSeparator类到file:inbound-endpoint上 this
这里的Reverse Order复选框勾上,表示按照FileNameSeparator的逆序进行文件排序。设置后生成的file:inbound-endpoint的xml文档以下:
<file:inbound-endpoint path="D:\connectorTest" responseTimeout="10000" doc:name="File" connector-ref="input" comparator="fileconnectortest.FileNameSeparator" reverseOrder="true"/>
咱们还使用students1.csv和students2.csv两个文件,运行ESB项目,查看运行日志。
从日志能够看出,按文件名逆序排列,students2.csv文件先被处理,再是students1.csv文件被处理。
这个系列中使用Blocking Queue对象(ArrayBlockingQueue)存储文件解析后的各行文本内容,分别使用PutContentTask和ReadContentTask对Blocking Queue进行写和读。这样作有如下缺点:
1)Blocking Queue与读写Task造成了紧耦合关系,每新建一个Task,都必须传给它一个Blocking Queue引用,这样不利于代码解耦。
2)定义和管理Blocking Queue过于繁琐,若是须要使用多个Blocking Queue,就须要在applicationContext.xml文件中定义多个Blocking Queue的bean对象。
3)不容易对Blocking Queue的读写线程进行优化配置。
如今对Blocking Queue进行读写的线程都是自定义的Task,放到指定线程池里执行,若是要对读写线程进行优化配置,就须要在applicationContext.xml文件中定义的
线程池进行配置。若是想访问多个Blocking Queue,且读写操做分别使用不一样的线程池设置,须要定义和配置多个线程池对象,这样的操做会很是繁琐。
4)Blocking Queue的数据没有实现持久化。
VM Queue是Mule ESB定义的VM Transport定义的内部Queue队列,用于不一样Flow之间进行信息传递,能够理解成Mule ESB项目内部的Message Queue,因为是内部MQ,效率要比外部的MQ通讯效率高,Mule的组件能够像与MQ通讯同样,向VM Queue发送消息,或者从VM Queue获取消息,组件不必定与VM Endpoint相连。这样就实现了VM Queue与Mule 逻辑组件之间的松耦合。并且Mule ESB还能够经过receiver-thread-profile和dispatcher-thread-profile设置VM Queue的消息接收者和发送者的线程数。
基于VM Queue,咱们对系列的项目作如下修改
1)从applicationContext.xml文件中去掉Blocking Queue的定义。修改FileTransformer类,去掉对ReadContentTask的使用。
public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException { logger.info("Process File"); try { muleContext.getRegistry().unregisterObject("processCompleteFlag"); muleContext.getRegistry().registerObject("processCompleteFlag", "false"); } catch (RegistrationException e) { logger.error(ExceptionUtils.getFullStackTrace(e)); } //The file input stream sent by file connector. InputStream inputStream = (InputStream)message.getPayload(); PutContentTask putContentTask = new PutContentTask(muleContext, inputStream); queueExecutor.execute(putContentTask); return null; }
2)在ESB项目中新建一个Flow,在Source部分拖入一个VM inbound endpoint,设置它的Path为data,这个VM Queue的访问地址即为vm://data
3) 修改PutContentTask类,去掉原有的Blocking Queue属性,添加MuleContext属性muleContext。在构造函数中传入muleContext。解析文件时将每行文本信息封装到Mule Message中,发送到VM Queue。
public class PutContentTask implements Runnable { //Current mule context. private MuleContext muleContext; ......... public PutContentTask(MuleContext muleContext, InputStream inputStream) { this.muleContext = muleContext; this.inputStream = inputStream; } @Override public void run() { try { MuleClient muleClient = muleContext.getClient(); Scanner scanner = new Scanner(inputStream); DefaultMuleMessage newMessage = null; while (scanner.hasNext()) { String line = scanner.nextLine(); //Wrapper each line content into mule message and send to vm queue. newMessage = new DefaultMuleMessage(line, muleContext); muleClient.send("vm://data", newMessage); } //Wrapper EOF content into mule message to notify the reader component instance //the file has been parsed completely. DefaultMuleMessage endMessage = new DefaultMuleMessage("EOF", muleContext); muleClient.send("vm://data", endMessage); scanner.close();
关于发送Mule Message到VM Queue有两点须要注意:
4) 删除ReadContentTask类,在VM inbound endpoint后建立一个自定义Component类ReadContentComponent
ReadContentComponent类的代码以下:
public class ReadContentComponent implements Callable { private Logger logger = LogManager.getLogger(ReadContentComponent.class); @Override public Object onCall(MuleEventContext eventContext) throws Exception { MuleMessage muleMessage = eventContext.getMessage(); MuleContext muleContext = eventContext.getMuleContext(); String threadName = Thread.currentThread().getName(); if(muleMessage !=null && muleMessage.getPayload() != null) { String message = muleMessage.getPayloadAsString(); if(message.equals("EOF")) { muleContext.getRegistry().unregisterObject("processCompleteFlag"); muleContext.getRegistry().registerObject("processCompleteFlag", "true"); logger.info("Set Complete Flag True"); } else { System.out.println(threadName + " Line is:" + message); } } return eventContext; } }
5) 配置VM inbound endpoint引用的VM Connector,咱们自定义了一个VM Connector memoryStore,并把它赋给VM inbound endpoint
<vm:connector name="memoryStore" doc:name="VM" validateConnections="true"> <vm:queue-profile> <simple-in-memory-queue-store/> </vm:queue-profile> </vm:connector> <flow name="Read_Data_Flow"> <vm:inbound-endpoint exchange-pattern="one-way" path="data" doc:name="VM" connector-ref="memoryStore"/>
memoryStore的queue使用memory store,即queue发送和接收的Mule Message存在于内存中,当ESB项目终止后Mule Message消息会丢失。咱们也能够设置queue的profile为持久化的store,例以下图中的 file-queue-store或者default-persistent-queue-store.
配置好后,咱们拷贝文件到监控目录,启动ESB项目,能够看到文件被正常解析和处理。
从输出的结果日志中的Thread Name(Read_Data_Flow.stage1.xxx)能够看出,Read_Data_Flow使用了多个线程接收和处理Mule Message。默认Read_Data_Flow使用最大16个线程对VM Queue的Mule Message进行接收和处理。咱们能够设置这个线程数。咱们在流程文件中定义一个Flow Processing Strategy,定义最大线程数为4。
<queued-asynchronous-processing-strategy name="singleThreadStrategy" maxThreads="4" minThreads="4" poolExhaustedAction="WAIT" doc:name="Queued Asynchronous Processing Strategy" />
运行ESB项目,使用jconsole查看Read_Data_Flow相关的线程,能够看到确实有4个线程。
能够看出线程名为stage1.01的线程的堆栈和其余三个线程的堆栈不同,这是由于它们执行的是不一样的操做。
若是修改log4j2.xml,将org.mule,org.mulesoft的日志级别从INFO改成DEBUG,能够从日志文件看到每一个线程启动的Work信息
com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - Starting Work: org.mule.processor.SedaStageInterceptingMessageProcessor in Thread[[fileconnectortest].Read_Data_Flow.stage1.01,5,main]. Active tasks: 1 (1 threads in a pool) com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@747140de in Thread[[fileconnectortest]. Read_Data_Flow.stage1.02,5,main]. Active tasks: 2 (2 threads in a pool) com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@6cad0545 in Thread[[fileconnectortest]. Read_Data_Flow.stage1.03,5,main]. Active tasks: 3 (3 threads in a pool) Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@21c436f5 in Thread[[fileconnectortest]. Read_Data_Flow.stage1.04,5,main]. Active tasks: 4 (4 threads in a pool)
咱们能够看到stage1.01激活的Task相关的Message Processor是SedaStageInterceptingMessageProcessor,而其余三个线程相关的是AsyncMessageProcessorWorker,也就是说第一个线程负责从VM Queue中读取Mule Message,并将Message推送给后续的Component,然后面三个线程负责接收Mule Message并处理Mule Message。这体现了Mule ESB使用的SEDA机制(以下图所示,此图参考Mule in Action 第二版11.1)
stage1.01线程是从Event Queue中获取Mule Event对象(Mule Message),stage1.02,stage1.03,stage1.04线程则位于Processing Stage,接收stage1.01取出的Mule Event对象并处理。
了解了Read_Data_Flow的线程分工原理,若是咱们要设置Read_Data_Flow对VM Queue中取出的Mule Message作单线程处理,queued-asynchronous-processing-strategy中配置的线程数最小值必须是2,不然只有获取Mule Event的线程运行,而没有处理Mule Event的线程存在。咱们也能够直接配置Synchronous Processing Strategy给Read_Data_Flow,保证整个Flow单线程处理。
<flow name="Read_Data_Flow" processingStrategy="synchronous">
这时接收Mule Event,处理消息的线程是queue executor
2016-08-11 17:07:01,737 [queueExecutor-4] DEBUG fileconnectortest.ReadContentComponent - queueExecutor-4 Line is:20160205,Student205,Class5
修改的代码上传到fileconnectortest_vm分支
https://github.com/yosaku01/fileconnectortest/tree/fileconnectortest_vm