spring integration fixed-delay的实现

以下配置: spring

<int:service-activator input-channel="channel"  ref="serviceActivator" method="execute"> 线程

    <int:poller receive-timeout="1000" fixed-delay="600" task-executor="taskExecutor" 继承

        max-messages-per-poll="2" /> 队列

</int:service-activator> 内存

taskExecutor是系统定义的一个线程池; input

还有一个线程池,线程的名字已“task-scheduler-”开头,spring integration在初始化的时候建立的,线程数为10个,每隔600毫秒,往 taskExecutor 提交一个任务;该任务代码从 input-channel 中读取数据,执行后续操做; io

taskScheduler(线程池的名字)的初始化,在这个类 DefaultConfiguringBeanFactoryPostProcessor 中,此类实现了 BeanFactoryPostProcessor,扩展了spring的基本的扩展点。 扩展

taskScheduler的实现类为 ThreadPoolTaskScheduler。 配置

每个poller都是一个PollingConsumer,PollingConsumer继承自AbstractPollingEndpoint,实现了声明周期方法;最终会调用 ThreadPoolTaskScheduler 的 schedule 方法,传递的参数第一个就是poller任务,第二个是 Trigger,表示调度的策略(对应fixed-delay);poller任务代码就是往 taskExecutor 中提交任务(参考 AbstractPollingEndpoint$Poller)。 任务的启动代码入口在 AbstractPollingEndpoint doStart 方法,而后 调度的处理参考 new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();的相关处理逻辑。 线程池

receive-timeout ,表示 taskExecutor 从 input-channel 获取数据时,若是队列为空要等待 1 秒种。

综合一下,也就是说,taskExecutor 每秒钟从队里中移除一个元素,而taskScheduler会每600毫秒往队列中放置一个元素,同时配置中不仅存在一个 poller;因此即便没有请求到来,taskExecutor的队列长度也会慢慢增加,直至内存溢出。解决方案:合适配置taskExecutor的对列长度,线程数;配置receive-timeout为一个较小的值,好比50毫秒;

相关文章
相关标签/搜索