Mule ESB File Connector轮询单个文件的实现(1)

        正在开发的项目提出了一个要求,要求Mule ESB流程轮询指定文件夹下的文本文件,逐行读取文本文件的内容,进行业务处理。java

        咱们考虑使用File Connector,对指定文件夹进行轮询。File Connector使用org.mule.transport.file.FileMessageReceiver的poll方法,读取文件夹下的每个文件,将其推送给后续的处理节点。      git

         考虑处处理的文件可能很大,读取文件和处理文件各行的操做咱们决定分开处理,使用了不一样的后台线程进行处理。各个后台线程共享同一个阻塞队列,以下图所示,其中读取文件的后台线程是图中的Thread 1,相似于生产者,它每读取文件的一行,就将该行放入阻塞队列中。而处理文件各行的线程是图中的Thread 2,相似于消费者,它们从阻塞队列里读出一行数据,进行处理。github

        

         读写文件的后台线程咱们统一使用Spring的线程池进行管理。解析文件,将各行数据放入阻塞队列咱们只使用一个后台线程,从阻塞队列里读取数据使用5个后台线程,每一个后台线程从阻塞队列里读取一行数据,输出到控制台。spring

        Mule的File Connector轮询指定文件夹,读取文件夹中符合要求的文件,将其转换为InputStream(实际类型是org.mule.transport.file.ReceiverFileInputStream),传送到File Parser节点。app

       File Parser节点使用自定义的FileTransformer类接收传送的InputStream,进行文件读写。FileTransformer的代码以下      ide

public class FileTransformer extends AbstractMessageTransformer {

	//The blocking queue used to process file content.
	@Resource
	private ArrayBlockingQueue<String> contentQueue;	
	
	//The thread pool executor to provide threads to process file content.
	@Autowired
	private ThreadPoolTaskExecutor queueExecutor;
	
	private Logger logger = LogManager.getLogger(FileTransformer.class);
	
	
	@Override
	public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException {
		
		long startTime = System.currentTimeMillis();
		
		//The file input stream sent by file connector.
		InputStream inputStream = 
				(InputStream)message.getPayload();
		
		PutContentTask putContentTask =
    			new PutContentTask(contentQueue, inputStream);
    	queueExecutor.execute(putContentTask);  
    	
    	CountDownLatch threadSignal = new CountDownLatch(5);
    	
    	for(int i=0; i<5;i++)
    	{
    		String taskName = "ReadContentTask " + i;
    		ReadContentTask readContentTask =
    				new ReadContentTask(contentQueue, taskName, threadSignal);
    		queueExecutor.execute(readContentTask);
    	}
    	
    	try 
    	{
			threadSignal.await();			
			contentQueue.clear();
		} 
    	catch (Exception e) {
			logger.error(ExceptionUtils.getFullStackTrace(e));
		} 
    	
    	long endTime = System.currentTimeMillis();
    	long elaspeTime = endTime - startTime;    	
		
    	return "The elapse time is:" + elaspeTime;
	}

}

 FileTransformer类中注入了一个Blocking Queue对象,用于存放扫描文件时得到的每一行文本,这个对象定义在Spring配置文件中。测试

<!--src/main/app/conf/applicationContext.xml-->
<bean id="contentQueue" class="java.util.concurrent.ArrayBlockingQueue">
		<constructor-arg value="1000"/>
		<constructor-arg value="true"/>
</bean>

FileTransformer类中还注入了ThreadPoolTaskExecutor对象,用于发送解析文件和处理文件内容的Task。它也定义在Spring配置文件里。线程

<!--src/main/app/conf/applicationContext.xml-->

<bean id="queueExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="20" />
        <property name="queueCapacity" value="200" />
        <property name="keepAliveSeconds" value="20000" />
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy" />
        </property>
</bean>

在transformMessage方法中,当File Connector推送文件Input Stream过来时,咱们构建一个PutContentTask,用于解析文件流将文件的各行文本数据放入Blocking Queue中。PutContentTask类的主要代码以下:code

public class PutContentTask implements Runnable {
	//This blocking queue is used to contain each line of file.
	private ArrayBlockingQueue<String> contentQueue = null;
	//The file input stream.
	private InputStream inputStream;

....................
@Override
	public void run() 
	{
			try 
			{						
				Scanner scanner = new Scanner(inputStream);
				while(scanner.hasNext())
				{
					String line = scanner.nextLine();
					contentQueue.put(line);				
				}
				//Put EOF string to line to notify read task that we reach the end of file.
				contentQueue.put("EOF");
				scanner.close();
			} catch (InterruptedException e) {
				logger.error(ExceptionUtils.getFullStackTrace(e));
			}
			finally
			{
				//Close the file input stream so that mule can auto delete the file.
				if(inputStream != null)
				{
					try {
						inputStream.close();
					} catch (IOException e) {
						logger.error(ExceptionUtils.getFullStackTrace(e));
					}
				}						
			}
		}

PutContentTask在Input Stream解析结束后,会向Blocking Queue中放一个"EOF"标志,表示文件已经解析完成。读取队列数据的ReadContentTask在读取到这个标识后,会结束读取操做。orm

在将Input Stream交给PutContentTask去解析后,transformMessage方法建立了5个ReadContentTask,从Blocking Queue中读取ReadContentTask解析出的文件的每一行并作处理(我这个程序只是将读取的每一行输出到控制台,读者能够根据本身的需求编写相应的业务逻辑)。五个ReadContentTask使用CountDownLatch进行同步,等CountDownLatch计数为0时,File Connector推送的文件Input Stream解析和处理结束,统计总共的处理时间。

ReadContentTask类的主要代码以下:

public class ReadContentTask implements Runnable {

  //This blocking queue is used to contain each line of file.
  private ArrayBlockingQueue<String> contentQueue = null;	
  //The task name of read content task(It identifies each read task).
  private String taskName;	
  //The synchronous signal variable for each read task.
  private CountDownLatch signal = null;	
  private Logger logger = LogManager.getLogger(ReadContentTask.class); 

  @Override
  public void run() 
  {
		while(true)
		{				
				if(!contentQueue.isEmpty())
				{
					try 
					{
						String line = contentQueue.take();
						if(line!=null && !line.equals("") && !line.equals("EOF"))
						{
							//process each line.
							System.out.println(taskName + " Line is:" + line);
						}
						else
						{
							//Notify other read task.
							contentQueue.put("EOF");
							break;
						}
					} catch (InterruptedException e) {
							logger.error(ExceptionUtils.getFullStackTrace(e));
					}
				}
		}
		signal.countDown();
		
	}

对于每一个ReadContentTask,从Blocking Queue中读取到"EOF"后,即视为文件已经解析完成,结束处理任务,同时向contentQueue中放入"EOF",通知其余ReadContentTask结束处理。

当全部ReadContentTask都结束处理后,transformMessage方法将清空Blocking Queue,等待下一次文件解析和处理。

try 
  {
     threadSignal.await();			
	 contentQueue.clear();
  } 
  catch (Exception e) {
		logger.error(ExceptionUtils.getFullStackTrace(e));
  }

为了显示文件的解析和处理时间,FileTransformer类将计算出的elapse time放入mule message的payload中,咱们添加一个Logger对象显示处理时间。

测试的时候,咱们建立一个简单的csv文件students.csv(以下图所示),记录500个学生的学号,姓名和所属班级,放到File Connector监控的connectorTest目录下。

启动ESB项目,能够看到students.csv文件被解析,文件的每行被不一样的ReadContentTask输出到控制台,处理的时间是31ms

项目的代码上传到了github上,地址是

https://github.com/yosaku01/fileconnectortest

相关文章
相关标签/搜索