原文连接:https://www.baeldung.com/spring-cloud-data-flow-etljava
做者:Norberto Ritzmannspring
译者:Emmasql
Spring Cloud Data Flow是一个用于构建实时数据管道和批处理过程的云原生工具包。 Spring Cloud Data Flow已准备好用于一系列数据处理用例,如简单的导入/导出,ETL处理,事件流和预测分析。mongodb
在本教程中,咱们将学习使用流管道实时提取转换和加载(ETL)的示例,该管道从JDBC数据库中提取数据,将其转换为简单的POJO并将其加载到MongoDB中。docker
ETL - 提取,转换和加载 -一般被认为将数据从多个数据库和系统批量加载到公共数据仓库中的过程。在此数据仓库中,能够在不影响系统总体性能的状况下进行大量数据分析处理。shell
然而,新趋势正在改变如何作到这一点的方式。 ETL仍然能够将数据传输到数据仓库和数据池。数据库
如今,能够借助Spring Cloud Data Flow的事件流体系架构使用流来完成此操做。服务器
借助Spring Cloud Data Flow(SCDF),开发人员能够建立两种风格的数据管道:架构
使用Spring Cloud Stream的长效实时流应用程序app
使用Spring Cloud Task的批处理短时间任务应用程序
在本文中,咱们将介绍第一个,基于Spring Cloud Stream的长效流媒体应用程序。
SCDF管道流由不一样的步骤组成,其中每一步都是使用Spring Cloud Stream微框架以Spring Boot样式构建的应用程序。这些应用程序集成了像Apache Kafka或RabbitMQ等的消息中间件。
这些应用程序分为源,处理器和接收器。与ETL过程相比,咱们能够说源是“提取”,处理器是“转换器”,接收器是“加载”部分。
在某些状况下,咱们能够在管道的一个或多个步骤中使用应用程序启动器。这意味着咱们不须要为每一步实现新的应用程序,而是配置已实现的现有应用程序启动器。
能够在此处找到应用程序启动器列表。
在开始以前,咱们须要选择这个复杂部署的部分。要定义的第一部分是SCDF服务器。
为了进行测试,咱们将使用SCDF Server Local进行本地开发。对于生产部署,咱们稍后能够选择云本机运行时,如SCDF Server Kubernetes。咱们能够在这里找到服务器运行列表。
如今,检查系统要求是否知足运行此服务器。
要运行SCDF服务器,咱们必须定义并设置两个依赖项:
消息中间件
关系数据库管理系统(the RDBMS)
咱们将使用RabbitMQ做为消息中间件,咱们选择PostgreSQL做为RDBMS来存储咱们的管道流定义。
为了运行RabbitMQ,能够在此处下载最新版本并使用默认配置启动RabbitMQ实例或运行如下Docker命令:
docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
最后的设置步骤:在默认端口5432上安装并运行PostgreSQL RDBMS。以后,建立一个数据库,SCDF可使用如下脚本存储其流定义:
CREATE DATABASE dataflow;
咱们能够选择使用docker-compose启动服务器,或者将其做为Java应用程序启动来运行SCDF Server Local。
在这里,咱们将SCDF Server Local做为Java应用程序运行。为了配置应用程序,咱们必须将配置定义为Java应用程序参数。咱们在系统路径中须要配置Java 8。
为了托管jar和依赖项,咱们须要为SCDF Server建立一个主文件夹,并将SCDF Server Local发行版下载到此文件夹中。您能够在此处下载SCDF Server Local最新分行版。
此外,咱们须要建立一个lib文件夹并在其中放置JDBC驱动程序。这里提供了最新版本的PostgreSQL驱动程序。
最后,运行SCDF本地服务器:
$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
--spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
--spring.datasource.username=postgres_username \
--spring.datasource.password=postgres_password \
--spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.rabbitmq.host=127.0.0.1 \
--spring.rabbitmq.port=5672 \
--spring.rabbitmq.username=guest \
--spring.rabbitmq.password=guest
咱们能够经过查看此URL来检查它是否正在运行:http://localhost:9393/dashboard
SCDF Shell是一个命令行工具,能够轻松组合和部署咱们的应用程序和管道。这些Shell命令在Spring Cloud Data Flow Server REST API上运行。
在此处得到最新版本的jar,而且下载到SCDF主文件夹中。完成后,运行如下命令(根据须要更新版本):
$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
____ ____ _ __
/ ___| _ __ _ __(_)_ __ __ _ / ___| | ___ _ _ __| |
\___ \| '_ \| '__| | '_ \ / _` | | | | |/ _ \| | | |/ _` |
___) | |_) | | | | | | | (_| | | |___| | (_) | |_| | (_| |
|____/| .__/|_| |_|_| |_|\__, | \____|_|\___/ \__,_|\__,_|
____ |_| _ __|___/ __________
| _ \ __ _| |_ __ _ | ___| | _____ __ \ \ \ \ \ \
| | | |/ _` | __/ _` | | |_ | |/ _ \ \ /\ / / \ \ \ \ \ \
| |_| | (_| | || (_| | | _| | | (_) \ V V / / / / / / /
|____/ \__,_|\__\__,_| |_| |_|\___/ \_/\_/ /_/_/_/_/_/
Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
若是最后一行中得到“server-unknown:>”而不是“dataflow:>”,则表示您没有在localhost上运行SCDF服务器。在这种状况下,请运行如下命令以链接到另外一台主机:
server-unknown:>dataflow config server http://{host}
如今,Shell链接到SCDF服务器,咱们能够运行咱们的命令。
咱们在Shell中须要作的第一件事就是导入应用程序启动器。在Spring Boot 2.0.x中找到RabbitMQ + Maven的最新版本,并运行如下命令(再次声明,根据须要更新版本,此处为“Darwin-SR1”):
$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven
检查应用程序是否安装完成,请运行如下Shell命令:
$ dataflow:> app list
所以,咱们应该看到一个包含全部已安装应用程序的表。
此外,SCDF提供了一个名为Flo的图形界面,咱们能够经过如下地址访问:http://localhost:9393/dashboard。可是,它的使用不在本文的范围内。
咱们如今建立咱们的流管道。为此,咱们将使用JDBC Source应用启动程序从关系数据库中提取信息。
此外,咱们将建立一个自定义处理器,用于转换信息结构和一个自定义接收器,将数据加载到MongoDB中。
建立一个名为crm的数据库和一个名为customer的表:
CREATE DATABASE crm;
CREATE TABLE customer (
id bigint NOT NULL,
imported boolean DEFAULT false,
customer_name character varying(50),
PRIMARY KEY(id)
)
请注意,咱们正在使用导入的标志,该标志将存储已导入的记录。若有必要,咱们还能够将此信息存储在另外一个表中。
如今,插入一些数据:
INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);
对于转换步骤,咱们将把源表customer_name字段简单转换为新字段名称。其余转换能够在这里完成,但尽可能保持简短例子。
为此,咱们将建立一个名为customer-transform的新项目。最简单的方法是使用Spring Initializr站点建立项目。看到网站后,选择一个Group和一个Artifact名称。咱们将分别使用com.customer和customer-transform。
完成后,单击“生成项目”按钮下载项目。而后,解压缩项目并将其导入喜欢的IDE,并将如下依赖项添加到pom.xml:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
如今咱们开始为字段名称转换进行编码。为此,咱们将建立Customer类做为适配器。此类将经过setName()方法接收customer_name,并将经过getName方法输出其值。
@JsonProperty注释在JSON反序列化到Java时执行转换:
public class Customer {
private Long id;
private String name;
@JsonProperty("customer_name")
public void setName(String name) {
this.name = name;
}
@JsonProperty("name")
public String getName() {
return name;
}
// Getters and Setters
}
处理器须要从输入接收数据,进行转换并将结果绑定到输出通道。建立一个类来执行此操做:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Customer convertToPojo(Customer payload) {
return payload;
}
}
在上面的代码中,咱们能够观察到转换是自动发生的。输入接收JSON数据,Jackson使用set方法将其反序列化为Customer对象。
与输入相反,输出使用get方法将数据序列化为JSON。
与转换步骤相似,咱们将建立另外一个maven项目,名为customer-mongodb-sink。再次,访问Spring Initializr,Group名为com.customer,Artifact名为customer-mongodb-sink。而后,在依赖项搜索框中键入“MongoDB”并下载项目。
接下来,项目解压缩并导入IDE.
而后,添加与customer-transform项目中相同的额外依赖项。
如今咱们将建立另外一个Customer类,用于在此步骤中接收输入:
import org.springframework.data.mongodb.core.mapping.Document;
@Document(collection="customer")
public class Customer {
private Long id;
private String name;
// Getters and Setters
}
为了接收Customer,咱们将建立一个Listener类,它将使用CustomerRepository保存客户实体:
@EnableBinding(Sink.class)
public class CustomerListener {
@Autowired
private CustomerRepository repository;
@StreamListener(Sink.INPUT)
public void save(Customer customer) {
repository.save(customer);
}
}
在这种状况下,CustomerRepository是Spring Data的MongoRepository:
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {
}
如今,两个自定义应用程序均可以在SCDF服务器上注册。为了实现这个目标,先使用Maven命令mvn install编译这两个项目。
以后,使用Spring Cloud Data Flow Shell注册它们:
app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT
最后,检查应用程序是否存储在SCDF中,在shell中运行application list命令:
app list
所以,咱们应该在结果表中看到这两个应用程序。
DSL定义应用程序之间的配置和数据流。 SCDF DSL很简单。在第一个单词中,咱们定义应用程序的名称,而后是配置。
此外,语法是受Unix启发的Pipeline语法,它使用垂直条(也称为“管道”)来链接多个应用程序:
http --port=8181 | log
建立端口是8181HTTP应用程序,该应用程序将收到的任何正文有效负载发送到日志。
如今,让咱们看看如何建立JDBC Source的DSL流定义。
JDBC Source的关键配置是查询和更新。查询将选择未读记录,而更新将更改标志以防止从新读取当前记录。
此外,咱们将定义JDBC Source以30秒的固定延迟轮询并轮询最多1000行。最后,咱们将定义链接的配置,如驱动程序,用户名,密码和链接URL:
jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported = false'
--update='UPDATE public.customer SET imported = true WHERE id in (:id)'
--max-rows-per-poll=1000
--fixed-delay=30 --time-unit=SECONDS
--driver-class-name=org.postgresql.Driver
--url=jdbc:postgresql://localhost:5432/crm
--username=postgres
--password=postgres
能够在此处找到更多JDBC Source配置属性。
因为咱们没有在customer-mongodb-sink的application.properties中定义链接配置,咱们将经过DSL参数进行配置。
咱们的应用程序彻底基于MongoDataAutoConfiguration。您能够在此处查看其余可能的配置。基本上,咱们将定义spring.data.mongodb.uri:
customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main
首先,要建立最终的流定义,请返回到Shell并执行如下命令(没有换行符,刚刚插入它们以便于阅读):
stream create --name jdbc-to-mongodb
--definition "jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported=false'
--fixed-delay=30
--max-rows-per-poll=1000
--update='UPDATE customer SET imported=true WHERE id in (:id)'
--time-unit=SECONDS
--password=postgres
--driver-class-name=org.postgresql.Driver
--username=postgres
--url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink
--spring.data.mongodb.uri=mongodb://localhost/main"
此DSL流被定义名为jdbc-to-mongodb。接下来,咱们将按名称部署流:
stream deploy --name jdbc-to-mongodb
最后,咱们应该在日志输出中看到全部可用日志的位置:
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc
在本文中,咱们已经看到了使用Spring Cloud Data Flow的ETL数据管道的完整示例。
最值得注意的是,咱们看到了应用启动程序的配置,使用Spring Cloud Data Flow Shell建立了一个ETL流管道,并为咱们的读取,转换和写数据实现了自定义应用程序。
与往常同样,示例代码能够在GitHub中找到。