还在为开发Flink流处理应用程序时没法像开发Spring Boot程序那么优雅的分层以及装配Bean而烦恼吗?java
GitHub最近超火的一款开源框架,懒松鼠Flink-Boot脚手架,该脚手架简直是Spring开发工程师的福音,完美融合Spring生态体系,不再须要手动在Java类中建立臃肿的Java对象,简直是开发大型流处理应用程序的必不可少的工具。地址:懒松鼠Flink-Boot 脚手架由《深刻理解Flink核心设计与实践原理》做者开发。
mysql
你的现状git
static Map<String,String> cache=new HashMap<String,String>(); public String findUUID(FlowData flowData) { String value=cache.get(flowData.getSubTestItem()); if(value==null) { String uuid=userMapper.findUUID(flowData); cache.put(uuid,value); return uuid; } return value; }
你想要的是这样github
@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); }
你的现状spring
public void insertFlow(FlowData flowData) { try{ userMapper.insertFlow(flowData); }Cache(Exception e) { Thread.sleep(10000); userMapper.insertFlow(flowData); } }
你想要的是这样sql
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); }
你的现状数据库
if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7) { return null; } if(flowData.getBillNumber()==null) { return null; }
你想要的是这样编程
Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; } public class FlowData { private String uuid; //声明该参数的校验规则字符串长度必须在7到20之间 @Size(min = 7, max = 20, message = "长度必须在{min}-{max}之间") private String subTestItem; //声明该参数的校验规则字符串不能为空 @NotBlank(message = "billNumber不能为空") private String billNumber; }
GitHub最近超火的一款开源框架,懒松鼠Flink-Boot脚手架,该脚手架简直是Spring开发工程师的福音,完美融合Spring生态体系,不再须要手动在Java类中建立臃肿的Java对象,简直是开发大型流处理应用程序的必不可少的工具。懒松鼠Flink-Boot 脚手架由《深刻理解Flink核心设计与实践原理》做者开发。缓存
/** * github地址: https://github.com/intsmaze * 博客地址:https://www.cnblogs.com/intsmaze/ * 出版书籍《深刻理解Flink核心设计与实践原理》 随书代码 * RichFlatMapFunction为Flink框架的一个通用型操做符(算子),开发者通常在该算子的flatMap方法中编写业务逻辑 * @auther: intsmaze(刘洋) * @date: 2020/10/15 18:33 */ public class MybatisFlatMap extends RichFlatMapFunction<String, String> { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); protected ApplicationContext beanFactory; //mybatis的Service对象,操做数据库的user表 private UserService userService; @Override public void open(Configuration parameters) { ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters); userService = beanFactory.getBean(UserServiceImpl.class); } @Override public void flatMap(String value, Collector<String> out){ FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() { }.getType()); Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; } //数据库查询,屏蔽掉获取数据库链接,是否数据库链接,事务的声明等 String flowUUID = userService.findUUID(flowData); if (StringUtils.isBlank(flowUUID)) { flowUUID = UUID.randomUUID().toString(); flowData.setUuid(flowUUID); //数据库插入,屏蔽掉获取数据库链接,是否数据库链接,事务的声明等 userService.insertFlow(flowData); } out.collect(gson.toJson(flowData)); } } public interface UserService { String findUUID(FlowData flowData); void insertFlow(FlowData flowData); } //经过注解实例化Bean对象。 @Service //经过注解声明进行事务管理 @Transactional //经过注解声明方法具备异常重试机制 @EnableRetry public class UserServiceImpl implements UserService { //经过注解进行依赖注入 @Resource private UserMapper userMapper; @Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") @Override public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); } //经过注解声明该方法异常后的重试机制,无需手动编程 @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); } } public interface UserMapper { String findUUID(FlowData flowData); void insertFlow(FlowData flowData); } //注解式声明参数校验规则 public class FlowData { private String uuid; //声明该参数的校验规则字符串长度必须在7到20之间 @Size(min = 7, max = 20, message = "长度必须在{min}-{max}之间") private String subTestItem; //声明该参数的校验规则字符串不能为空 @NotBlank(message = "billNumber不能为空") private String billNumber; @NotBlank(message = "barcode不能为空") private String barcode; private String flowName; private String flowStatus; ...... }
仓库地址:懒松鼠Flink-Boot脚手架由《深刻理解Flink核心设计与实践原理》做者开发。mybatis
该脚手架屏蔽掉组装Flink API细节,让跨界变得简单,使得开发者能以传统Java WEB模式的开发方式开发出具有分布式计算能力的流处理程序。
开发者彻底不须要理解分布式计算的理论知识和Flink框架的细节,即可以快速编写业务代码实现。
为了进一步提高开发者使用该脚手架开发大型项目的敏捷的程度,该脚手架工程默认集成Spring框架进行Bean管理,同时将微服务以及WEB开发领域中常常用到的框架集成进来,进一步提高开发速度。
除此以外针对目前流行的各大Java框架,该Flink脚手架工程也进行了集成,加快开发人员的编码速度,好比:
Flink-Boot ├── Flink-Base -- Flink-Boot工程基础模块 ├── Flink-Client -- Flink-Boot 客户端模块 ├── flink-annotation -- 注解生效模块 ├── flink-mybatis -- mybatis orm模块 ├── flink-retry -- 注解重试机制模式 ├── flink-validate -- 校验模块 ├── flink-sql -- Flink SQL解耦至XML配置模块 ├── flink-cache-annotation -- 接口缓冲模块 ├── flink-junit -- 单元测试模块 ├── flink-apollo -- 阿波罗配置客户端模块
技术 | 名称 | 状态 |
---|---|---|
Spring Framework | 容器 | 已集成 |
Spring 基于XML方式配置Bean | 装配Bean | 已集成 |
Spring 基于注解方式配置Bean | 装配Bean | 已集成 |
Spring 基于注解声明方法重试机制 | Retry注解 | 已集成 |
Spring 基于注解声明方法缓存 | Cache注解 | 已集成 |
Hibernate Validator | 校验框架 | 已集成 |
Druid | 数据库链接池 | 已集成 |
MyBatis | ORM框架 | 已集成 |
Kafka | 消息队列 | 已集成 |
HDFS | 分布式文件系统 | 已集成 |
Log4J | 日志组件 | 已集成 |
Junit | 单元测试 | 已集成 |
Mybatis-Plus | MyBatis扩展包 | 进行中 |
PageHelper | MyBatis物理分页插件 | 进行中 |
ZooKeeper | 分布式协调服务 | 进行中 |
Dubbo | 分布式服务框架 | 进行中 |
Redis | 分布式缓存数据库 | 进行中 |
Solr & Elasticsearch | 分布式全文搜索引擎 | 进行中 |
Ehcache | 进程内缓存框架 | 进行中 |
sequence | 分布式高效ID生产 | 进行中 |
Dubbole消费者 | 服务消费者 | 进行中 |
Spring eurake消费者 | 服务消费者 | 进行中 |
Apollo配置中心 | 携程阿波罗配置中心 | 进行中 |
Spring Config配置中心 | Spring Cloud Config配置中心 | 进行中 |
下面是集成Spring生态的基础手册.
该容器模式配置了JdbcTemplate实例,数据库链接池采用Druid,在业务方法中只须要获取容器中的JdbcTemplate实例即可以快速与关系型数据库进行交互,dataService实例封装了一些访问数据库表的方法。
<beans ...... default-lazy-init="true" default-init-method="init"> <context:property-placeholder location="classpath:config.properties"/> <bean id="druidDataSource" class="com.alibaba.druid.pool.DruidDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver"></property> <property name="url" value="${jdbc.url}"></property> <property name="username" value="${jdbc.user}"></property> <property name="password" value="${jdbc.password}"></property> </bean> <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <constructor-arg ref="druidDataSource"></constructor-arg> </bean> <bean id="dataService" class="com.intsmaze.flink.base.service.DataService"> <property name="jdbcTemplate" ref="jdbcTemplate"></property> </bean> </beans>
jdbc.user = intsmaze jdbc.password = intsmaze jdbc.url = jdbc:mysql://127.0.0.1:3306/flink-boot?useUnicode=true&characterEncoding=UTF-8
以下是SimpleClient(com.intsmaze.flink.client.SimpleClient)类的示例代码,该类继承了BaseFlink,能够看到对应实现的方法中分别设置以下:
/** * github地址: https://github.com/intsmaze * 博客地址:https://www.cnblogs.com/intsmaze/ * 出版书籍《深刻理解Flink核心设计与实践原理》 随书代码 * * @auther: intsmaze(刘洋) * @date: 2020/10/15 18:33 */ public class SimpleClient extends BaseFlink { public static void main(String[] args) throws Exception { SimpleClient topo = new SimpleClient(); topo.run(ParameterTool.fromArgs(args)); } @Override public String getTopoName() { return "SimpleClient"; } @Override public String getConfigName() { return "topology-base.xml"; } @Override public String getPropertiesName() { return "config.properties"; } @Override public void createTopology(StreamExecutionEnvironment builder) { DataStream<String> inputDataStrem = env.addSource(new SimpleDataSource()); DataStream<String> processDataStream = inputDataStrem.flatMap(new SimpleFunction()); processDataStream.print("输出结果"); } }
采用自定义数据源,用户须要编写自定义DataSource类,该类须要继承XXX抽象类,实现以下方法。
public class SimpleDataSource extends CommonDataSource { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); ...... @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ...//构造读取各种外部系统数据的链接实例 } @Override public String sendMess() throws InterruptedException { Thread.sleep(1000); ...... MainData mainData = new MainData(); ......//经过外部系统数据的链接实例读取外部系统数据,封装进MainData对象中,而后返回便可。 return gson.toJson(mainData); } }
本做业计算的业务逻辑在Flink转换操做符中进行实现,通常来讲开发者只须要实现flatMap算子便可以知足大部分算子的使用。
用户编写的自定义类须要继承com.intsmaze.flink.base.transform.CommonFunction抽象类,均需实现以下方法。
public class SimpleFunction extends CommonFunction { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); @Override public String execute(String message) throws Exception { FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() { }.getType()); String flowUUID = dataService.findUUID(flowData); if (StringUtils.isBlank(flowUUID)) { flowUUID = UUID.randomUUID().toString(); flowData.setUuid(flowUUID); dataService.insertFlow(flowData); } return gson.toJson(flowData); } }
CommonFunction抽象类中默认在open方法中经过BeanFactory对象获取到了Spring容器中对于的dataService实例,对于Spring中的其余实例同理在SimpleFunction类中的open方法中获取便可。
public abstract class CommonFunction extends RichFlatMapFunction<String, String> { private IntCounter numLines = new IntCounter(); protected DataService dataService; protected ApplicationContext beanFactory; @Override public void open(Configuration parameters) { getRuntimeContext().addAccumulator("num-FlatMap", this.numLines); ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters); dataService = beanFactory.getBean(DataService.class); } @Override public void flatMap(String value, Collector<String> out) throws Exception { this.numLines.add(1); String execute = execute(value); if (StringUtils.isNotBlank(execute)) { out.collect(execute); } } public abstract String execute(String message) throws Exception; }
能够根据状况选择重写open(Configuration parameters)方法,同时重写的open(Configuration parameters)方法的第一行要调用父类的open(Configuration parameters)方法。
public void open(Configuration parameters){ super.open(parameters); ...... //获取在Spring配置文件中配置的实例 XXX xxx=beanFactory.getBean(XXX.class); }
在自定义的Topology类编写Main方法,建立自定义的Topology对象后,调用对象的run(...)方法。
public class SimpleClient extends BaseFlink {
/** * 本地启动参数 -isLocal local * 集群启动参数 -isIncremental isIncremental */ public static void main(String[] args) throws Exception { SimpleClient topo = new SimpleClient(); topo.run(ParameterTool.fromArgs(args)); } .......