The constructor of io.moquette.spi.impl.BrokerInterceptor creates a thread pool, as shown below:
private BrokerInterceptor(int poolSize, List<InterceptHandler> handlers) {
    LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
    this.handlers = new HashMap<>();
    for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
        this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>());
    }
    for (InterceptHandler handler : handlers) {
        this.addInterceptHandler(handler);
    }
    executor = Executors.newFixedThreadPool(poolSize);
}
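The statement executor = Executors.newFixedThreadPool(poolSize); creates a pool with a fixed number of threads, but the pool's task queue is unbounded. If any method of an InterceptHandler does time-consuming work, then under high concurrency pending tasks can easily pile up in the queue, which may eventually lead to an out-of-memory error.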
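To make the problem concrete, here is a minimal sketch that is independent of moquette. The class name UnboundedQueueDemo and its methods are hypothetical, and the cast to ThreadPoolExecutor relies on how OpenJDK implements Executors.newFixedThreadPool rather than on a documented guarantee. It blocks every task on a latch to simulate a slow handler, then checks how many tasks are waiting in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueDemo {

    /** Submits `tasks` blocked tasks to a fixed pool and returns how many are waiting in the queue. */
    static int queuedAfterSubmitting(int poolSize, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // simulates a slow InterceptHandler method
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Assumption: OpenJDK returns a plain ThreadPoolExecutor here, so the cast succeeds.
            return ((ThreadPoolExecutor) executor).getQueue().size();
        } finally {
            release.countDown(); // let the blocked tasks finish
            executor.shutdown();
        }
    }

    /** Returns the free capacity of the queue behind an idle fixed thread pool. */
    static int remainingCapacity(int poolSize) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        try {
            return executor.getQueue().remainingCapacity();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + queuedAfterSubmitting(2, 1000));
        System.out.println("remaining queue capacity: " + remainingCapacity(2));
    }
}
```

With two threads and 1000 submissions, everything beyond the two running tasks sits in the queue, and the queue reports Integer.MAX_VALUE free slots, so nothing ever pushes back on the producer.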
Add the following classes and interface:
// ThreadPoolHelper.java
public class ThreadPoolHelper {

    // Creates a fixed-size pool with a bounded task queue and the logging reject policy.
    public static ExecutorService createFixedExecutor(int threadNum, int capacity, String threadFactoryName) {
        return new ThreadPoolExecutor(
                threadNum,
                threadNum,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(capacity),
                new SimpleThreadFactory(threadFactoryName),
                new LogDiscardRejectPolicy()
        );
    }
}

// SimpleThreadFactory.java
public class SimpleThreadFactory implements ThreadFactory {

    private static final String NAME_FORMAT = "%s-%s";

    private final String threadNamePrefix;

    public SimpleThreadFactory(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        // Note: threads created within the same millisecond get the same name.
        thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis()));
        return thread;
    }
}

// LogDiscardRejectPolicy.java
public class LogDiscardRejectPolicy implements RejectedExecutionHandler {

    private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        LOG.error("executor:{} task queue is full, runnable:{} discarded", executor, r);
        // Only PublishTask carries the handler that should be notified.
        if (!(r instanceof PublishTask)) {
            return;
        }
        PublishTask publishTask = (PublishTask) r;
        InterceptHandler interceptHandler = publishTask.getInterceptHandler();
        if (!(interceptHandler instanceof RejectHandler)) {
            return;
        }
        ((RejectHandler) interceptHandler).rejectedExecution(r, executor);
    }
}

// RejectHandler.java
public interface RejectHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Change the thread pool creation logic in BrokerInterceptor to:
private BrokerInterceptor(int poolSize, int capacity, List<InterceptHandler> handlers) {
    LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
    this.handlers = new HashMap<>();
    for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
        this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>());
    }
    for (InterceptHandler handler : handlers) {
        this.addInterceptHandler(handler);
    }
    /** modify by liuhh */
    executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME);
    //executor = Executors.newFixedThreadPool(poolSize);
}
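Explanation:
(1) The createFixedExecutor() method in ThreadPoolHelper gives the new thread pool a bounded task queue and the rejection policy LogDiscardRejectPolicy.
(2) LogDiscardRejectPolicy first logs the rejected task. A PublishTask (see Moquette Modification Notes (2): Optimizing the BrokerInterceptor notifyTopicPublished() Logic) gets special treatment: it is handed to its InterceptHandler if that handler implements RejectHandler, so the business logic decides what to do with tasks that are dropped because the pool cannot keep up.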
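The behaviour described above can be tried out without moquette. The following is only a sketch under stated assumptions: RejectListener, ListenerTask and DelegatingDiscardPolicy are hypothetical stand-ins for the article's RejectHandler, PublishTask and LogDiscardRejectPolicy, not the real classes. It fills a bounded pool with blocked tasks and counts how many submissions are forwarded to the listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    /** Hypothetical stand-in for the article's RejectHandler. */
    interface RejectListener {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }

    /** Hypothetical stand-in for PublishTask: a task that knows which listener owns it. */
    static final class ListenerTask implements Runnable {
        private final RejectListener listener;
        private final CountDownLatch release;

        ListenerTask(RejectListener listener, CountDownLatch release) {
            this.listener = listener;
            this.release = release;
        }

        RejectListener getListener() {
            return listener;
        }

        @Override
        public void run() {
            try {
                release.await(); // simulates a slow handler
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Discards the rejected task and, for a ListenerTask, notifies its listener. */
    static final class DelegatingDiscardPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (r instanceof ListenerTask) {
                ((ListenerTask) r).getListener().rejectedExecution(r, executor);
            }
        }
    }

    /** Submits `tasks` blocked tasks and returns how many were rejected. */
    static int countRejected(int threads, int capacity, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        RejectListener listener = (r, executor) -> rejected.incrementAndGet();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(capacity),
                new DelegatingDiscardPolicy());
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(new ListenerTask(listener, release));
            }
            return rejected.get();
        } finally {
            release.countDown(); // let the blocked tasks finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(2, 10, 100));
    }
}
```

With 2 threads and a queue capacity of 10, only 12 of the 100 submissions are accepted while the workers are blocked. The rest are dropped and reported to the listener on the submitting thread, so memory use stays bounded at the cost of losing work under overload.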
Modify the SafetyInterceptHandler from Moquette Modification Notes (1): Integrating with Spring Boot so that it also implements RejectHandler, as follows:
@Slf4j
@Component
public class SafetyInterceptHandler extends AbstractInterceptHandler implements RejectHandler {

    @Override
    public String getID() {
        return SafetyInterceptHandler.class.getName();
    }

    @Override
    public void onConnect(InterceptConnectMessage msg) {
    }

    @Override
    public void onConnectionLost(InterceptConnectionLostMessage msg) {
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {
    }

    @Override
    public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Called back when the MQTT service is overloaded and cannot keep up with incoming tasks.
        // For example, send an email to notify the people responsible.
    }
}
Moquette Modification Notes (4): Fixing onConnectionLost() Being Called Twice in InterceptHandler