192.168.23.132 (hostname spark01)
192.168.23.133 (hostname spark02)
192.168.23.134 (hostname spark03)
confluent-5.4.1-2.12.tar.gz
Download from https://www.confluent.io/download/ (choose "Self managed software" on the right)
jdk 1.8
ZooKeeper cluster (in this demo: 192.168.23.132:2181,192.168.23.133:2181,192.168.23.134:2181)
Kafka cluster (in this demo: 192.168.23.132:9092,192.168.23.133:9092,192.168.23.134:9092)
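Before installing Confluent it is worth confirming that the existing ZooKeeper and Kafka clusters are reachable from spark01. A minimal sanity check, assuming Kafka's own CLI scripts are available on the broker hosts (script locations depend on your Kafka installation):

# list the brokers registered in ZooKeeper
./zookeeper-shell.sh 192.168.23.132:2181 ls /brokers/ids

# list existing topics through the brokers themselves
./kafka-topics.sh --bootstrap-server 192.168.23.132:9092,192.168.23.133:9092,192.168.23.134:9092 --list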
1. Extract confluent-5.4.1-2.12.tar.gz to /export/servers/ (this creates /export/servers/confluent-5.4.1)

mkdir -p /export/servers/
tar -zxvf confluent-5.4.1-2.12.tar.gz -C /export/servers/
2. Edit the configuration files

cd /export/servers/confluent-5.4.1/etc/schema-registry

Edit connect-avro-distributed.properties:
# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=192.168.23.132:9092,192.168.23.133:9092,192.168.23.134:9092

# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
#key.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter=io.confluent.connect.avro.AvroConverter
#value.converter.schema.registry.url=http://localhost:8081
key.converter.schema.registry.url=http://192.168.23.132:18081,http://192.168.23.133:18081,http://192.168.23.134:18081
value.converter.schema.registry.url=http://192.168.23.132:18081,http://192.168.23.133:18081,http://192.168.23.134:18081
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of
# losing connector offsets, configurations, and status.
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
#offset.storage.partitions=25
#status.storage.partitions=5

# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=0.0.0.0
rest.port=18083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=0.0.0.0
rest.advertised.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java
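Once the distributed workers have been started (step 5b below), the three internal topics named above should exist on the brokers. A quick way to confirm this, assuming Kafka's CLI tools are on hand:

# the internal Connect topics should appear after the workers start
./kafka-topics.sh --bootstrap-server spark01:9092 --list | grep connect-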
Edit schema-registry.properties:
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# The address the socket server listens on.
# FORMAT:
#     listeners = listener_name://host_name:port
# EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=http://0.0.0.0:18081

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
kafkastore.connection.url=192.168.23.132:2181,192.168.23.133:2181,192.168.23.134:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the master schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
#kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
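Once Schema Registry has been started (step 5a below), each instance can be checked against the listener configured above. A minimal check using its REST API:

# should return a JSON array of registered subjects (initially just [])
curl -s http://192.168.23.132:18081/subjects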
3. Download the debezium-connector-sqlserver component for real-time sync

debezium-connector-sqlserver.zip (syncs SQL Server data in real time)

Extract this component into the /export/servers/confluent-5.4.1/share/java directory (a sketch follows).
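A minimal sketch of the extraction, assuming the zip was downloaded to the current directory; whatever the archive layout, the connector jars must end up under share/java, which is the plugin.path configured above:

# unpack the Debezium SQL Server connector into Confluent's plugin directory
unzip debezium-connector-sqlserver.zip -d /export/servers/confluent-5.4.1/share/java/
ls /export/servers/confluent-5.4.1/share/java/ | grep -i debezium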
4. Add the real-time sync configuration

Create a kafka-connect-debezium folder under /export/servers/confluent-5.4.1/etc and add the sync configuration file inside it:

cd /export/servers/confluent-5.4.1/etc
mkdir kafka-connect-debezium
cd kafka-connect-debezium
vim register_test.json
{ "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.dbname": "Pos", #sqlserver 库名 "database.history.kafka.bootstrap.servers": "192.168.23.132:9092,192.168.23.133:9092,192.168.23.134:9092", "database.history.kafka.topic": "dbhistory.shop_sync", "database.hostname": "192.x.x.1", #sqlserver ip "database.password": "12345", #sqlserver 密码 "database.port": "50364", #sqlserver 端口 "database.server.name": "shop_sync", "database.user": "sa", #sqlserver 用户名 "decimal.handling.mode": "double", "snapshot.mode": "initial_schema_only", "table.whitelist":"dbo.Park" #sqlserver 表名 }, "name": "shop_sync-connector" }
All configuration is now complete. Distribute the confluent-5.4.1 folder to the spark02 and spark03 servers; the configuration needs no changes there.

scp -r /export/servers/confluent-5.4.1 root@spark02:/export/servers/confluent-5.4.1
scp -r /export/servers/confluent-5.4.1 root@spark03:/export/servers/confluent-5.4.1
Start a Kafka console consumer first, so that once the Confluent components are up you can check whether data is being synced. Debezium writes change events to a topic named <database.server.name>.<schema>.<table>, which is why the topic here is shop_sync.dbo.Park:

./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic shop_sync.dbo.Park
5. Start the Confluent cluster

Run startup steps a and b on each of the three servers, then run step c once; step d lists the submitted tasks.
a. Start the Confluent Schema Registry
cd /export/servers/confluent-5.4.1 && ./bin/schema-registry-start -daemon ./etc/schema-registry/schema-registry.properties

b. Start the Kafka Connect distributed worker
cd /export/servers/confluent-5.4.1 && ./bin/connect-distributed -daemon ./etc/schema-registry/connect-avro-distributed.properties

c. Submit the task
cd /export/servers/confluent-5.4.1/etc/kafka-connect-debezium
curl -X POST -H "Content-Type:application/json" --data @register_test.json http://192.168.23.132:18083/connectors

d. List the tasks
curl -s 192.168.23.132:18083/connectors

e. Check a task's status (the connector name must match the "name" in register_test.json)
curl -s 192.168.23.132:18083/connectors/shop_sync-connector/status

f. Delete a task
curl -s -X DELETE 192.168.23.132:18083/connectors/shop_sync-connector

g. Restart a task (pause, then resume)
curl -s -X PUT 192.168.23.132:18083/connectors/shop_sync-connector/pause
curl -s -X PUT 192.168.23.132:18083/connectors/shop_sync-connector/resume
Once the submit step has completed, a result like the following indicates success.
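As a rough illustration, checking the status endpoint from step e should report RUNNING for the connector and its task; the response has roughly this shape (field values are placeholders):

curl -s http://192.168.23.132:18083/connectors/shop_sync-connector/status
# roughly: {"name":"shop_sync-connector","connector":{"state":"RUNNING","worker_id":"..."},
#           "tasks":[{"id":0,"state":"RUNNING","worker_id":"..."}],"type":"source"}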
The tasks can also be viewed in a browser:
http://192.168.23.132:18083/connectors