本文主要是解析SpoutOutputCollector源码,顺便分析该类中所涉及的设计模式–代理模式。
首先介绍一下Spout输出收集器接口–ISpoutOutputCollector,该接口主要声明了如下3个抽象方法用来约束ISpoutOutputCollector的实现类。接口定义与方法说明以下:java
/** * ISpoutOutputCollector:Spout输出收集器接口 */ public interface ISpoutOutputCollector { /** * 改方法用来向外发送数据,它的返回值是该消息全部发送目标的taskID集合; * 参数: * streamId:消息Tuple将要被输出到的流 * tuple:要输出的消息,是一个Object列表 * messageId:输出消息的标记信息,若是messageId被设置为null,则Storm不会追踪该消息, * 不然它会被用来追踪所发出的消息处理状况 */ List<Integer> emit(String streamId, List<Object> tuple, Object messageId); /** * 该方法与上面emit方法相似,区别在于: * 1.数据(消息)只由所指定taskId的Task接收;(这就意味着若是没有下游节点接收该消息,则该消息就没有被真正发送) * 2.该方法要求参数streamId所对应的流必须为直接流,接收端的Task必须以直接分组的方式来接收消息, * 不然会抛出异常. */ void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); /** * 用来处理异常 */ void reportError(Throwable error); }
Storm提供了接口ISpoutOutputCollector的默认类SpoutOutputCollector,这个类其实是一个代理类,该类持有一个ISpoutOutputCollector类型的对象,全部的操做实际上都过该对象来实现的。SpoutOutputCollector定义以下:设计模式
public class SpoutOutputCollector implements ISpoutOutputCollector { /** * 持有SpoutOutputCollector要代理的对象 */ ISpoutOutputCollector _delegate; public SpoutOutputCollector(ISpoutOutputCollector delegate) { _delegate = delegate; } /** * 实现了接口中的emit方法,而且提供了它的几个重载方法 * eg.若是不指定streamId,默认使用default,若是不指定messageId,则默认使用空(null) */ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId){ return _delegate.emit(streamId, tuple, messageId); } public List<Integer> emit(List<Object> tuple, Object messageId) { return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); } public List<Integer> emit(List<Object> tuple) { return emit(tuple, null); } public List<Integer> emit(String streamId, List<Object> tuple) { return emit(streamId, tuple, null); } /** * 实现了接口中的emitDirect方法,同时也提供了几个重载方法,与上面emit方法一致. */ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { _delegate.emitDirect(taskId, streamId, tuple, messageId); } public void emitDirect(int taskId, List<Object> tuple, Object messageId) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId); } public void emitDirect(int taskId, String streamId, List<Object> tuple) { emitDirect(taskId, streamId, tuple, null); } public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, tuple, null); } /** * 处理异常方法的实现 */ @Override public void reportError(Throwable error) { _delegate.reportError(error); } }
PS:
代理模式主要分为两种:静态代理和动态代理ide
静态代理:
在程序运行前代理类与委托类的关系在运行前就肯定,即在程序运行前就已经存在代理类的字节码文件了.
代理模式角色:
Subject(抽象主题角色):能够是抽象类也能够是接口,声明了被委托角色和委托类共有的处理方法;
RealSubject(具体主题角色):又称被委托角色、被代理角色,是业务逻辑的具体执行者;
ProxySubject(代理主题角色):又称委托类、代理类,负责对真实角色的应用,
把全部抽象主题类定义的方法限制委托给具体主题角色来实现,而且在具体主题角色处理完毕先后作预处理和蔼后处理.测试
静态代理模式案例以下:this
//抽象主题 public interface Subject { public void process(String taskName); }
被代理角色:设计
public class RealSubject implements Subject { @Override public void process(String taskName) { System.out.println("正在执行任务:"+taskName); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
代理类:代理
public class ProxySubject implements Subject { //代理类持有一个委托类的对象引用 private Subject delegate; public ProxySubject(Subject delegate){ this.delegate=delegate; } @Override public void process(String taskName) { //预处理 this.before(); //将请求分派给委托类处理 delegate.process(taskName); //善后处理 this.after(); } private void before(){ System.out.println("预处理!"); } private void after(){ System.out.println("善后处理!"); } }
案例测试:orm
public class Test { public static void main(String[] args) { RealSubject subject = new RealSubject(); ProxySubject p = new ProxySubject(subject); p.process("排水"); } }
测试结果:对象
预处理! 正在执行任务:排水 善后处理!
静态代理类的优缺点:
优势:
业务类只需关注业务逻辑自己,这样就保证了业务类的重用性.
缺点:
代理对象的一个接口只服务于一种类型的对象.当要代理的方法不少,就要为每一种方法进行代理。所以静态代理在程序规模变大时就没法很好地胜任工做了.blog
动态代理:
代理类和委托类的关系在程序运行时才肯定的.动态代理类的源码是在程序运行期间由JVM根据反射等机制动态生成,因此不存在代理类的字节码文件.
动态代理模式案例以下:
public interface Service { //目标方法 public void process(); }
public class UserServiceImpl implements Service { @Override public void process() { System.out.println("用户service处理"); } }
动态代理实现实例:
public class MyInvocatioHandler implements InvocationHandler { private Object target; public MyInvocatioHandler(Object target) { this.target = target; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //System.out.println("-----before-----"); this.before(); Object result = method.invoke(target, args); // System.out.println("-----end-----"); this.after(); return result; } // 生成代理对象 public Object getProxy() { ClassLoader loader = Thread.currentThread().getContextClassLoader(); Class<?>[] interfaces = target.getClass().getInterfaces(); return Proxy.newProxyInstance(loader, interfaces, this); } private void before(){ System.out.println("预处理!"); } private void after(){ System.out.println("善后处理!"); } }
案列测试:
public class ProxyTest { public static void main(String[] args) { Service service = new UserServiceImpl(); MyInvocatioHandler handler = new MyInvocatioHandler(service); Service serviceProxy = (Service)handler.getProxy(); serviceProxy.process(); } }
测试结果:
预处理! 用户service处理 善后处理!
动态代理的优缺点:
优势:
接口中的全部方法都被转移到调用处理器一个集中的方法中在方法“运行时”动态的加入,决定你是什么类型,较灵活
缺点:
1. 与静态代理相比,效率下降了
2. JDK动态代理只能对实现了接口的类进行代理
欢迎关注下面二维码进行技术交流: