Mule ESB File Connector轮询单个文件的实现(3)

本系列的(1)和(2)介绍了如何修改File Connector的轮询机制,使得File Connector每次轮询只轮询一个文件。本文主要阐述如何对前面的实现作进一步的修改,这个修改有如下两点:java

1)如何自定义定义File Connector轮询文件时使用的Comparator。git

2)如何替换Blocking Queue为Mule ESB定义的VM Transport中的Queue.github

File Connector 自定义Comparator

File Connector 默认的文件排序Comparator是查看file:inbound-endpoint中是否认义了comparator,若是定义了指定的comparator类,则按照comparator类中定义的排序规则对文件列表进行排序,不然就按照轮询文件时文件放入列表的顺序。app

FileMessageReceiver.java异步

protected Comparator<File> getComparator() throws Exception
{
	Object comparatorClassName = getEndpoint().getProperty(COMPARATOR_CLASS_NAME_PROPERTY);
	if (comparatorClassName != null)
	{
		Object reverseProperty = this.getEndpoint().getProperty(COMPARATOR_REVERSE_ORDER_PROPERTY);
		boolean reverse = false;
		if (reverseProperty != null)
		{
			reverse = Boolean.valueOf((String) reverseProperty);
		}

		Class<?> clazz = endpoint.getMuleContext().getExecutionClassLoader().loadClass(comparatorClassName.toString());
		Comparator<?> comparator = (Comparator<?>)clazz.newInstance();
		return reverse ? new ReverseComparator(comparator) : comparator;
	}
	return null;
}

Mule ESB提供了一个按照文件修改时间进行排序的Comparator类OlderFirstComparator,若是咱们想根据文件修改时间进行轮询能够引用这个Comparator类。async

public class OlderFirstComparator implements Comparator
{
    public int compare(Object o1, Object o2)
    {
        if (o1 instanceof File && o2 instanceof File)
        {
            File f = (File) o1;
            File f1 = (File) o2;
            boolean fileNewer = FileUtils.isFileNewer(f, f1);
            boolean fileOlder = FileUtils.isFileOlder(f, f1);
            if (!fileNewer && !fileOlder)
            {
                return 0;
            }
            else if (fileNewer)
            {
                return 1;
            }
            else
            {
                return -1;
            }

        }
        throw new IllegalArgumentException(MessageFormat.format(
                "Expected java.io.File instance, but was {0} and {1}",
                ClassUtils.getShortClassName(o1, "<null>"),
                ClassUtils.getShortClassName(o2, "<null")));
    }
}

    若是咱们想按照文件名对文件进行轮询,须要自定义Comparator类,而且把这个Compartor类指定给file:inbound-endpoint的compartor属性。ide

    咱们参照OlderFirstComparator类,定义FileNameSeparator类函数

public class FileNameSeparator implements Comparator<File> 
{	
	public int compare(File file1, File file2) 
	{		
        String fileName1 = file1.getName();        
        String fileName2 = file2.getName();       
        return fileName1.compareToIgnoreCase(fileName2);
	}
}

这里只比较了文件名,若是还须要比较文件夹名,能够自行添加相应代码。优化

    最后咱们设置FileNameSeparator类到file:inbound-endpoint上    this

这里的Reverse Order复选框勾上,表示按照FileNameSeparator的逆序进行文件排序。设置后生成的file:inbound-endpoint的xml文档以下:

<file:inbound-endpoint path="D:\connectorTest" responseTimeout="10000" doc:name="File"   
        		connector-ref="input" comparator="fileconnectortest.FileNameSeparator" 
reverseOrder="true"/>

咱们还使用students1.csv和students2.csv两个文件,运行ESB项目,查看运行日志。

从日志能够看出,按文件名逆序排列,students2.csv文件先被处理,再是students1.csv文件被处理。

使用VM Queue替换Blocking Queue

    这个系列中使用Blocking Queue对象(ArrayBlockingQueue)存储文件解析后的各行文本内容,分别使用PutContentTask和ReadContentTask对Blocking Queue进行写和读。这样作有如下缺点:
    1)Blocking Queue与读写Task造成了紧耦合关系,每新建一个Task,都必须传给它一个Blocking Queue引用,这样不利于代码解耦。
    2)定义和管理Blocking Queue过于繁琐,若是须要使用多个Blocking Queue,就须要在applicationContext.xml文件中定义多个Blocking Queue的bean对象。
    3)不容易对Blocking Queue的读写线程进行优化配置。
      如今对Blocking Queue进行读写的线程都是自定义的Task,放到指定线程池里执行,若是要对读写线程进行优化配置,就须要在applicationContext.xml文件中定义的
      线程池进行配置。若是想访问多个Blocking Queue,且读写操做分别使用不一样的线程池设置,须要定义和配置多个线程池对象,这样的操做会很是繁琐。

    4)Blocking Queue的数据没有实现持久化。

     VM Queue是Mule ESB定义的VM Transport定义的内部Queue队列,用于不一样Flow之间进行信息传递,能够理解成Mule ESB项目内部的Message Queue,因为是内部MQ,效率要比外部的MQ通讯效率高,Mule的组件能够像与MQ通讯同样,向VM Queue发送消息,或者从VM Queue获取消息,组件不必定与VM Endpoint相连。这样就实现了VM Queue与Mule 逻辑组件之间的松耦合。并且Mule ESB还能够经过receiver-thread-profile和dispatcher-thread-profile设置VM Queue的消息接收者和发送者的线程数。

    基于VM Queue,咱们对系列的项目作如下修改

1)从applicationContext.xml文件中去掉Blocking Queue的定义。修改FileTransformer类,去掉对ReadContentTask的使用。  

public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException {

        logger.info("Process File");        
        try 
        {
            muleContext.getRegistry().unregisterObject("processCompleteFlag");
            muleContext.getRegistry().registerObject("processCompleteFlag", "false");
        } 
        catch (RegistrationException e) {
            logger.error(ExceptionUtils.getFullStackTrace(e));
        }        


        //The file input stream sent by file connector.
        InputStream inputStream = 
                (InputStream)message.getPayload();

        PutContentTask putContentTask =
                new PutContentTask(muleContext, inputStream);
        queueExecutor.execute(putContentTask);          

        return null;
    }

2)在ESB项目中新建一个Flow,在Source部分拖入一个VM inbound endpoint,设置它的Path为data,这个VM Queue的访问地址即为vm://data

3) 修改PutContentTask类,去掉原有的Blocking Queue属性,添加MuleContext属性muleContext。在构造函数中传入muleContext。解析文件时将每行文本信息封装到Mule Message中,发送到VM Queue。

public class PutContentTask implements Runnable {
	//Current mule context.
	private MuleContext muleContext;
    .........

    public PutContentTask(MuleContext muleContext, InputStream inputStream) {
        this.muleContext = muleContext;
        this.inputStream = inputStream;
    }

    @Override
    public void run() {
    try {
        MuleClient muleClient = muleContext.getClient();
        Scanner scanner = new Scanner(inputStream);
        DefaultMuleMessage newMessage = null;
        while (scanner.hasNext()) {
            String line = scanner.nextLine();
            //Wrapper each line content into mule message and send to vm queue.
            newMessage = new DefaultMuleMessage(line, muleContext);
            muleClient.send("vm://data", newMessage);
        }
        //Wrapper EOF content into mule message to notify the reader component instance
        //the file has been parsed completely.
        DefaultMuleMessage endMessage = new DefaultMuleMessage("EOF", muleContext);
        muleClient.send("vm://data", endMessage);
        scanner.close();

关于发送Mule Message到VM Queue有两点须要注意:

  • 咱们没有在File Parser这个Transformer控件后添加VM outbound endpoint,虽然这样作,咱们能够没必要使用MuleClient对象发送Mule Message,可是这样发送的Mule Message的Payload将是File Parser这个Transformer的transformMessage方法的返回值。这样是没法将文件的各行文本逐一发送给VM Queue的。所以咱们仍是采用后台线程的方式解析文件,发送Mule Message(也能够在FileTransformer主线程中进行)。
  • 发送Mule Message的方法咱们使用了同步方法send,而不是异步方法dispatch,这是由于咱们须要保证发送到VM Queue中的Mule Message的消息顺序。若是使用dispatch方法发送,EOF消息可能先于前面各行的文本消息先发送,这样接收端在接收到EOF消息时,将没法准确判断文件解析是否已经结束。所以咱们使用了同步发送方法send。若是对发送消息的顺序没有特殊要求,可使用dispatch方法。(若是使用send方法发送Message,VM inbound endpoint必须至少一个receiver,这也是咱们下面定义Component从VM Queue中获取Mule Message的缘由)。

4) 删除ReadContentTask类,在VM inbound endpoint后建立一个自定义Component类ReadContentComponent

  

ReadContentComponent类的代码以下:

public class ReadContentComponent implements Callable {

    private Logger logger = LogManager.getLogger(ReadContentComponent.class);

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        MuleMessage muleMessage = eventContext.getMessage();
        MuleContext muleContext = eventContext.getMuleContext();
        String threadName = Thread.currentThread().getName();
        if(muleMessage !=null && muleMessage.getPayload() != null)
        {
                String message = muleMessage.getPayloadAsString();    
                if(message.equals("EOF"))
                {                    
                    muleContext.getRegistry().unregisterObject("processCompleteFlag");
                    muleContext.getRegistry().registerObject("processCompleteFlag", "true");
                    logger.info("Set Complete Flag True");        
                }
                else
                {
                    System.out.println(threadName + " Line is:" + message);
                }
        }
        return eventContext;
    }
}

5) 配置VM inbound endpoint引用的VM Connector,咱们自定义了一个VM Connector memoryStore,并把它赋给VM inbound endpoint

<vm:connector name="memoryStore" doc:name="VM" validateConnections="true"> 
   <vm:queue-profile>
       <simple-in-memory-queue-store/>
   </vm:queue-profile>
</vm:connector>

<flow name="Read_Data_Flow">
   <vm:inbound-endpoint exchange-pattern="one-way" 
       path="data" doc:name="VM" connector-ref="memoryStore"/>

    memoryStore的queue使用memory store,即queue发送和接收的Mule Message存在于内存中,当ESB项目终止后Mule Message消息会丢失。咱们也能够设置queue的profile为持久化的store,例以下图中的   file-queue-store或者default-persistent-queue-store.

   配置好后,咱们拷贝文件到监控目录,启动ESB项目,能够看到文件被正常解析和处理。

从输出的结果日志中的Thread Name(Read_Data_Flow.stage1.xxx)能够看出,Read_Data_Flow使用了多个线程接收和处理Mule Message。默认Read_Data_Flow使用最大16个线程对VM Queue的Mule Message进行接收和处理。咱们能够设置这个线程数。咱们在流程文件中定义一个Flow Processing Strategy,定义最大线程数为4。

<queued-asynchronous-processing-strategy 
    	name="singleThreadStrategy" maxThreads="4"  minThreads="4" 
    	poolExhaustedAction="WAIT" doc:name="Queued Asynchronous Processing Strategy" />

运行ESB项目,使用jconsole查看Read_Data_Flow相关的线程,能够看到确实有4个线程。

能够看出线程名为stage1.01的线程的堆栈和其余三个线程的堆栈不同,这是由于它们执行的是不一样的操做。

若是修改log4j2.xml,将org.mule,org.mulesoft的日志级别从INFO改成DEBUG,能够从日志文件看到每一个线程启动的Work信息

com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - 
Starting Work: org.mule.processor.SedaStageInterceptingMessageProcessor 
in Thread[[fileconnectortest].Read_Data_Flow.stage1.01,5,main]. 
Active tasks: 1 (1 threads in a pool)

com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - 
Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@747140de 
in Thread[[fileconnectortest].
Read_Data_Flow.stage1.02,5,main]. 
Active tasks: 2 (2 threads in a pool)

com.mulesoft.mule.config.pool.MonitoredThreadPoolExecutor - 
Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@6cad0545 
in Thread[[fileconnectortest].
Read_Data_Flow.stage1.03,5,main]. Active tasks: 3 (3 threads in a pool)

Starting Work: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@21c436f5 
in Thread[[fileconnectortest].
Read_Data_Flow.stage1.04,5,main]. Active tasks: 4 (4 threads in a pool)

咱们能够看到stage1.01激活的Task相关的Message Processor是SedaStageInterceptingMessageProcessor,而其余三个线程相关的是AsyncMessageProcessorWorker,也就是说第一个线程负责从VM Queue中读取Mule Message,并将Message推送给后续的Component,然后面三个线程负责接收Mule Message并处理Mule Message。这体现了Mule ESB使用的SEDA机制(以下图所示,此图参考Mule in Action 第二版11.1)

       stage1.01线程是从Event Queue中获取Mule Event对象(Mule Message),stage1.02,stage1.03,stage1.04线程则位于Processing Stage,接收stage1.01取出的Mule Event对象并处理。

      了解了Read_Data_Flow的线程分工原理,若是咱们要设置Read_Data_Flow对VM Queue中取出的Mule Message作单线程处理,queued-asynchronous-processing-strategy中配置的线程数最小值必须是2,不然只有获取Mule Event的线程运行,而没有处理Mule Event的线程存在。咱们也能够直接配置Synchronous Processing Strategy给Read_Data_Flow,保证整个Flow单线程处理。

<flow name="Read_Data_Flow" processingStrategy="synchronous">

这时接收Mule Event,处理消息的线程是queue executor

2016-08-11 17:07:01,737 [queueExecutor-4] 
DEBUG fileconnectortest.ReadContentComponent - 
queueExecutor-4 Line is:20160205,Student205,Class5

修改的代码上传到fileconnectortest_vm分支

https://github.com/yosaku01/fileconnectortest/tree/fileconnectortest_vm

相关文章
相关标签/搜索