1. Background
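I previously used Spark for a computation whose results had to be sent to RocketMQ. There are two ways to do this. The first is to collect the results to the driver and send them all from there. The second is to send them directly from each partition of the result, inside foreachPartition. The problem with the first approach is that when the result is very large, for example tens of millions of records, the driver needs a lot of memory to hold it, and pulling everything to the driver adds network transfer overhead. The second approach avoids both problems: each worker node sends its own partitions, so nothing accumulates on the driver and no data makes the extra trip through it.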
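Sending data to RocketMQ means using the client class DefaultMQProducer. That class does not implement Java's Serializable interface, so a producer created on the driver cannot be held in a shared variable and used directly by the workers to send data. A different pattern is needed: a static utility class.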
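The constraint described above can be checked with the JDK alone. The following is a minimal sketch, not code from the original project: `FakeProducer` is a hypothetical stand-in for a non-serializable client such as DefaultMQProducer, and `InstanceFieldTask`, `StaticFieldTask`, and `CaptureCheck` are illustrative names. It shows that an object holding the client in an instance field cannot be serialized, while one that keeps it in a static field can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a client class that does NOT implement Serializable.
class FakeProducer {
    final String group;

    FakeProducer(String group) {
        this.group = group;
    }
}

// Keeps the client in an instance field, so default serialization tries to write it.
class InstanceFieldTask implements Serializable {
    private static final long serialVersionUID = 1L;
    final FakeProducer producer = new FakeProducer("demo-group");
}

// Keeps the client in a static field, which default serialization skips.
class StaticFieldTask implements Serializable {
    private static final long serialVersionUID = 1L;
    static FakeProducer producer = new FakeProducer("demo-group");
}

class CaptureCheck {
    /** Returns true if the object can be written with default Java serialization. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when a non-static, non-transient field has a non-serializable type.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("instance field: " + canSerialize(new InstanceFieldTask()));
        System.out.println("static field:   " + canSerialize(new StaticFieldTask()));
    }
}
```

Running `CaptureCheck.main` should report `false` for the instance-field variant and `true` for the static-field variant, which is the same failure a Spark job would hit when a closure captures a producer instance.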
2. Basic rules of Java serialization
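The static utility class that lets each partition send its own MQ messages rests on Java's serialization rules. By default, Java does not serialize or deserialize fields marked static or transient. This is easy to verify: when an object is deserialized in a new JVM, its class's static fields still hold the default values from class initialization, not the values they had when the object was written. The RocketMQ utility is built on this behavior.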
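    // Each public class below lives in its own source file.
    public class JavaBean {
        private String name;
        private int version;

        public JavaBean(String name, int version) {
            this.name = name;
            this.version = version;
        }

        @Override
        public String toString() {
            return "JavaBean{name='" + name + "', version=" + version + "}";
        }
    }

    import java.io.Serializable;

    public class WrapperBean implements Serializable {
        // JavaBean does not implement Serializable, so this field must be static (or transient);
        // as a plain instance field it would cause NotSerializableException on write.
        private static JavaBean javaBean;
        private static String staticName = "default static value";

        // Overwrites the static fields so the effect is visible after deserialization.
        public WrapperBean(JavaBean javaBean, String staticName) {
            WrapperBean.javaBean = javaBean;
            WrapperBean.staticName = staticName;
        }

        @Override
        public String toString() {
            return "WrapperBean{javaBean=" + javaBean + "},staticName=" + staticName;
        }
    }

### Serialization

    import java.io.FileOutputStream;
    import java.io.ObjectOutputStream;

    public class JdkSerializableMain {
        public static void main(String[] args) {
            String file = "D:/demo/javabean.seri";
            serializable(file);
        }

        private static void serializable(String file) {
            // try-with-resources flushes and closes the stream, even on failure
            try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file))) {
                Object object = getObject();
                System.out.println("Serialized object: " + object);
                oos.writeObject(object);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private static Object getObject() {
            JavaBean javaBean = new JavaBean("Java设计本来", 44);
            return new WrapperBean(javaBean, "modified static value");
        }
    }

### Deserialization

    import java.io.FileInputStream;
    import java.io.ObjectInputStream;

    // Run this as a separate program, after JdkSerializableMain has exited.
    public class JdkDeSerializableMain {
        public static void main(String[] args) {
            String file = "D:/demo/javabean.seri";
            deserializable(file);
        }

        private static void deserializable(String file) {
            try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(file))) {
                Object o = ois.readObject();
                if (o != null) {
                    System.out.println("Class :" + o.getClass());
                    WrapperBean jb = (WrapperBean) o;
                    System.out.println("Deserialized result: " + jb);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Result (omitting the Class line):

    Serialized object: WrapperBean{javaBean=JavaBean{name='Java设计本来', version=44}},staticName=modified static value
    Deserialized result: WrapperBean{javaBean=null},staticName=default static value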
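The file-based example above needs two separate runs, because within a single JVM a static field simply keeps whatever value was last assigned. The same rule can be checked in one process with an in-memory round trip. This is a sketch under stated assumptions: `FieldKindsBean` and `StaticRoundTrip` are hypothetical names, and resetting the static field by hand is only a stand-in for class initialization in a fresh JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical bean with one field of each kind.
class FieldKindsBean implements Serializable {
    private static final long serialVersionUID = 1L;
    static String staticName = "default"; // belongs to the class, never written to the stream
    transient String transientName;       // instance field, skipped by default serialization
    String plainName;                     // instance field, written to the stream

    FieldKindsBean(String plainName, String transientName) {
        this.plainName = plainName;
        this.transientName = transientName;
    }
}

class StaticRoundTrip {
    static byte[] toBytes(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns {plainName, transientName, staticName} as seen after the round trip. */
    static String[] roundTrip() {
        FieldKindsBean.staticName = "modified";
        byte[] bytes = toBytes(new FieldKindsBean("plain", "temp"));
        // Stand-in for a fresh JVM: put the static field back to its initial value.
        FieldKindsBean.staticName = "default";
        FieldKindsBean copy = (FieldKindsBean) fromBytes(bytes);
        return new String[] {copy.plainName, copy.transientName, FieldKindsBean.staticName};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(roundTrip())); // prints [plain, null, default]
    }
}
```

If the byte stream carried the static field, deserialization would restore "modified"; instead the static value stays at "default" and the transient field comes back as null, while the plain instance field survives.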
3. RocketMQ utility
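The utility relies on the fact that static fields are not serialized. When getInstance() is called on a worker node, the DefaultMQProducer behind it is the one created by the static initializer block of RocketMqUtils when that worker's JVM loaded the class, not an instance shipped from the driver. Messages can therefore be sent to RocketMQ from inside foreachPartition().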
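    import java.io.Serializable;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    // Package names assume Apache RocketMQ 4.x or later;
    // older clients use the com.alibaba.rocketmq prefix instead.
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.core.io.support.PropertiesLoaderUtils;

    public class RocketMqUtils implements Serializable {
        private static final Logger log = LoggerFactory.getLogger(RocketMqUtils.class);
        // Static, so never serialized: each JVM builds its own producer below.
        private static DefaultMQProducer producer = null;
        private static RocketMqUtils rocketMqUtils = null;

        // Runs once per JVM, when the class is first loaded on that node.
        static {
            ClassPathResource classPathResource = new ClassPathResource("/task-config.properties");
            try {
                Properties properties = PropertiesLoaderUtils.loadProperties(classPathResource);
                String address = properties.getProperty("mq.namesrvAddr");
                String produceGroup = properties.getProperty("mq.producerGroup");
                log.info("Initializing RocketMq, Address={}, producerGroup={}", address, produceGroup);
                producer = new DefaultMQProducer(produceGroup);
                producer.setNamesrvAddr(address);
                producer.start();
            } catch (Exception e) {
                log.error("Failed to initialize RocketMq", e);
            }
        }

        public static synchronized RocketMqUtils getInstance() {
            if (rocketMqUtils == null) {
                rocketMqUtils = new RocketMqUtils();
            }
            return rocketMqUtils;
        }

        public static void main(String[] args) throws Exception {
            RocketMqUtils rm = RocketMqUtils.getInstance();
            Message msg = new Message();
            msg.setTopic("test_jcc");
            msg.setTags("jcc");
            msg.setKeys("kkk");
            msg.setBody("test msg".getBytes(StandardCharsets.UTF_8));
            rm.sendMsg(msg);
            rm.shutDownMq();
        }

        public void sendMsg(Message msg) throws Exception {
            try {
                SendResult sendResult = producer.send(msg);
                log.info("sendMsg = {}", sendResult);
                System.out.println(sendResult.toString());
            } catch (Exception e) {
                log.error("MQ send ERROR", e);
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new Exception("MQ operation failed!", e);
            }
        }

        public void shutDownMq() {
            if (producer != null) {
                producer.shutdown();
            }
        }
    }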
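The mechanism behind RocketMqUtils can be reproduced without Spark or RocketMQ. The sketch below is a variation, not the author's code: it uses the lazy holder idiom instead of a static initializer block, `DemoClient` is a hypothetical stand-in for DefaultMQProducer, and `SendTask` plays the role of the function passed to foreachPartition. The task is serialized and restored the way a cluster framework would ship it, and it still reaches a working client because the client is looked up through a static accessor at run time instead of travelling inside the task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical non-serializable client; it only records what was "sent".
class DemoClient {
    final List<String> sent = new ArrayList<>();

    void send(String body) {
        sent.add(body);
    }
}

// The client is created when the nested Lazy class is first used in the current JVM.
class ClientHolder {
    private ClientHolder() {}

    private static class Lazy {
        static final DemoClient CLIENT = new DemoClient();
    }

    static DemoClient get() {
        return Lazy.CLIENT;
    }
}

// Serializable unit of work; it carries only data, never the client itself.
class SendTask implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ArrayList<String> records;

    SendTask(List<String> records) {
        this.records = new ArrayList<>(records);
    }

    void run() {
        for (String r : records) {
            ClientHolder.get().send(r); // resolved in whichever JVM executes the task
        }
    }
}

class HolderDemo {
    // Serializes and deserializes the task, mimicking shipment to another node.
    static SendTask shipAndRestore(SendTask task) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(task);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SendTask) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Ships the task, runs the restored copy, and returns how many records it sent. */
    static int runShipped(List<String> records) {
        SendTask restored = shipAndRestore(new SendTask(records));
        int before = ClientHolder.get().sent.size();
        restored.run();
        return ClientHolder.get().sent.size() - before;
    }

    public static void main(String[] args) {
        System.out.println(runShipped(Arrays.asList("a", "b", "c"))); // prints 3
    }
}
```

The lazy holder defers client creation until the first send, so a driver that never sends does not open a connection; the static block in RocketMqUtils instead starts the producer as soon as the class is loaded. `DemoClient` here is not thread-safe, which is fine for this single-threaded demonstration only.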