使用线程执行框架的一次经历

场景html

一个线程从某个地方接收消息(数据),能够是其余主机或者消息队列,而后转由另外的一个线程池来执行具体处理消息的逻辑,而且消息的处理速度小于接收消息的速度。这种情景很常见,试想一下,你会怎么设计和实现?java

直观想法多线程

很显然采用JUC的线程框架,能够迅速写出代码。架构

消息接收者:框架

  1. public class Receiver { 
  2.     private static volatile boolean inited = false; 
  3.     private static volatile boolean shutdown = false; 
  4.     private static volatile int cnt = 0; 
  5.  
  6.     private MessageHandler messageHandler; 
  7.  
  8.     public void start(){ 
  9.         Executors.newSingleThreadExecutor().execute(new Runnable() { 
  10.             @Override 
  11.             public void run() { 
  12.                 while(!shutdown){ 
  13.                     init(); 
  14.                     recv(); 
  15.                 } 
  16.             } 
  17.         }); 
  18.     } 
  19.  
  20.     /** 
  21.      * 模拟消息接收 
  22.      */ 
  23.     public void recv(){ 
  24.             Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start(); 
  25.     } 

消息处理:jvm

  1. public class MessageHandler { 
  2.  
  3.     private static final int THREAD_POOL_SIZE = 4; 
  4.  
  5.     private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE); 
  6.  
  7.     public void handle(Message msg) { 
  8.         try { 
  9.             service.execute(new Runnable() { 
  10.                 @Override 
  11.                 public void run() { 
  12.                     parseMsg(msg); 
  13.                 } 
  14.             }); 
  15.         } catch (Throwable e) { 
  16.             System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("解析消息:" + message); Thread.sleep(5000); System.out.println("============================"); } catch (InterruptedException e) { 
  17.                 e.printStackTrace(); 
  18.             } 
  19.  
  20.         } 
  21.     } 

效果:这种方案致使的现象是接收到的消息会迅速堆积,咱们从消息队列(或者其余地方)取出了大量消息,可是处理线程的速度又跟不上,因此致使的问题是大量的Task会堆积在线程池底层维护的一个阻塞队列中,这会极大的耗费存储空间,影响系统的性能。ide

分析:当execute()一个任务的时候,若是有空闲的worker线程,那么投入运行,不然看设置的最大线程个数,没有达到线程个数限制就建立新线程,接新任务,不然就把任务缓冲到一个阻塞队列中,问题就是这个队列,默认的大小是没有限制的,因此就会大量的堆积任务,必然耗费heap空间。性能

  1. public static ExecutorService newFixedThreadPool(int nThreads) { 
  2.         return new ThreadPoolExecutor(nThreads, nThreads, 
  3.                                       0L, TimeUnit.MILLISECONDS, 
  4.                                       new LinkedBlockingQueue<Runnable>()); 
  5.     } 
  6.  
  7. public LinkedBlockingQueue() { 
  8.         this(Integer.MAX_VALUE); // capacity 
  9.     } 

计数限制学习

面对上述问题,想到了要限制消息接收的速度,天然就想到了各类线程同步的原语,不过在这里最简单的就是使用一个Volatile的计数器。this

消息接收者:

  1. public class Receiver { 
  2.     private static volatile boolean inited = false; 
  3.     private static volatile boolean shutdown = false; 
  4.     private static volatile int cnt = 0; 
  5.     private MessageHandler messageHandler; 
  6.     public void start(){ 
  7.         Executors.newSingleThreadExecutor().execute(new Runnable() { 
  8.             @Override 
  9.             public void run() { 
  10.                 while(!shutdown){ 
  11.                     init(); 
  12.                     recv(); 
  13.                 } 
  14.             } 
  15.         }); 
  16.     } 
  17.  
  18.     /** 
  19.      * 模拟消息接收 
  20.      */ 
  21.     public void recv(){ 
  22.             Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start(); 
  23.     } 

消息处理:

  1. public class MessageHandler { 
  2.     private static final int THREAD_POOL_SIZE = 1; 
  3.     private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE); 
  4.  
  5.     public void handle(Message msg){ 
  6.         try { 
  7.             service.execute(new Runnable() { 
  8.  
  9.                 @Override 
  10.                 public void run() { 
  11.                     parseMsg(msg); 
  12.                 } 
  13.             }); 
  14.         } catch (Throwable e) { 
  15.             System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message){ try { Thread.sleep(10000); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); }finally { 
  16.             Receiver.limit --; 
  17.         } 
  18.  
  19.     } 

效果:经过控制消息的个数来阻塞消息的接收过程,就不会致使任务的堆积,系统的内存消耗会比较平缓,限制消息的个数本质就和下面限制任务队列大小同样。

使用同步队列 SynchronousQueue

SynchronousQueue 虽名为队列,可是其实不会缓冲任务的对象,只是做为对象传递的控制点,若是有空闲线程或者没有达到最大线程限制,就会交付给worker线程去执行,不然就会拒绝,咱们须要本身实现对应的拒绝策略RejectedExecutionHandler,默认的是抛出异常RejectedExecutionException。

消息接收者同上。

消息处理:

  1. public class MessageHandler { 
  2.     private static final int THREAD_POOL_SIZE = 4; 
  3.  
  4.     ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, 
  5.             new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() { 
  6.         @Override 
  7.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
  8.             System.out.println("自定义拒绝策略"); try { executor.getQueue().put(r); System.out.println("从新听任务回队列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { System.out.println(service.getTaskCount()); System.out.println(service.getQueue().size()); System.out.println(service.getCompletedTaskCount()); service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("线程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); Thread.sleep(1000); } catch (InterruptedException e) { 
  9.                 e.printStackTrace(); 
  10.             } 
  11.         } 
  12.     } 

效果:可以控制消息的接收速度,可是咱们须要在rejectedExecution中实现某种阻塞的操做,可是选择在发生拒绝的时候把任务从新放回队列,带来的问题就是这个Task会发生饥饿现象。

使用大小限制的阻塞队列

使用LinkedBlockingQueue做为线程框架底层的任务缓冲区,而且设置大小限制,思想上和上述方案同样,都是有一个阻塞的点,可是经过最后的jvm monitor看到这里的CPU消耗更少,内存使用有所下降,而且波动小(具体缘由有待探索)。

消息接收者同上。

消息处理:

  1. public class MessageHandler { 
  2.     private static final int THREAD_POOL_SIZE = 4; 
  3.     private static final int BLOCK_QUEUE_CAP = 500; 
  4.     ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, 
  5.             new LinkedBlockingQueue<Runnable>(BLOCK_QUEUE_CAP), new SimpleThreadFactory(), new RejectedExecutionHandler() { 
  6.         @Override 
  7.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
  8.             System.out.println("自定义拒绝策略"); try { executor.getQueue().put(r); System.out.println("从新听任务回队列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) { try { Thread.sleep(5000); System.out.println("线程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); } } static class SimpleThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("Thread-" + System.currentTimeMillis()); return thread; 
  9.         } 
  10.     } 

总结

多线程是比较容易出问题的地方,特别当对方法不熟悉的时候

感兴趣的能够本身来个人Java架构群,能够获取免费的学习资料,群号:855801563对Java技术,架构技术感兴趣的同窗,欢迎加群,一块儿学习,相互讨论。

相关文章
相关标签/搜索