Real-time synchronization of SQL Server data to Kafka with Confluent

Installation prerequisites

192.168.23.132 (hostname spark01)
192.168.23.133 (hostname spark02)
192.168.23.134 (hostname spark03)

confluent-5.4.1-2.12.tar.gz
Download it from https://www.confluent.io/download/ and choose "Self managed software" on the right.

Install and start the following yourself (a quick reachability check follows this list):

JDK 1.8
ZooKeeper cluster (this demo uses 192.168.23.132:2181,192.168.23.133:2181,192.168.23.134:2181)
Kafka cluster (this demo uses 192.168.23.132:9092,192.168.23.133:9092,192.168.23.134:9092)
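
Before installing Confluent, it helps to confirm that the ZooKeeper and Kafka clusters are reachable from these hosts. A minimal sketch, assuming the Kafka CLI scripts from your Kafka installation are on the PATH (on newer ZooKeeper versions the ruok command may need to be whitelisted via 4lw.commands.whitelist):

# Each ZooKeeper node should answer "imok"
for zk in 192.168.23.132 192.168.23.133 192.168.23.134; do
  echo ruok | nc "$zk" 2181; echo "  <- $zk"
done

# Listing topics confirms the Kafka brokers are up
kafka-topics.sh --bootstrap-server 192.168.23.132:9092,192.168.23.133:9092,192.168.23.134:9092 --list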

Installing the Confluent cluster

1. Extract confluent-5.4.1-2.12.tar.gz to /export/servers/

mkdir -p /export/servers/
tar -zxvf confluent-5.4.1-2.12.tar.gz -C /export/servers/

2. Edit the configuration files
cd /export/servers/confluent-5.4.1/etc/schema-registry
Edit connect-avro-distributed.properties

# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=192.168.23.132:9092,192.168.23.133:9092,192.168.23.134:9092

# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
#key.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter=io.confluent.connect.avro.AvroConverter
#value.converter.schema.registry.url=http://localhost:8081
key.converter.schema.registry.url=http://192.168.23.132:18081,http://192.168.23.133:18081,http://192.168.23.134:18081
value.converter.schema.registry.url=http://192.168.23.132:18081,http://192.168.23.133:18081,http://192.168.23.134:18081
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter




# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of 
# losing connector offsets, configurations, and status.
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# The config storage topic must have a single partition, and this cannot be changed via properties. 
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
#offset.storage.partitions=25
#status.storage.partitions=5

# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas. 
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=0.0.0.0
rest.port=18083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=0.0.0.0
rest.advertised.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java
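
The comments above note that the Connect worker will try to auto-create its three internal topics, and that you can create them manually for tighter control. A sketch of manual pre-creation matching the settings in this file (replication factor 1, default partition counts; use a replication factor of at least 3 in production):

cd /export/servers/confluent-5.4.1
./bin/kafka-topics --create --bootstrap-server 192.168.23.132:9092 --topic connect-configs  --partitions 1  --replication-factor 1 --config cleanup.policy=compact
./bin/kafka-topics --create --bootstrap-server 192.168.23.132:9092 --topic connect-offsets  --partitions 25 --replication-factor 1 --config cleanup.policy=compact
./bin/kafka-topics --create --bootstrap-server 192.168.23.132:9092 --topic connect-statuses --partitions 5  --replication-factor 1 --config cleanup.policy=compact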

Edit schema-registry.properties

#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# The address the socket server listens on.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=http://0.0.0.0:18081

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
kafkastore.connection.url=192.168.23.132:2181,192.168.23.133:2181,192.168.23.134:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the master schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
#kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false

3. Download the debezium-connector-sqlserver component for real-time sync
debezium-connector-sqlserver.zip provides the Debezium connector that syncs SQL Server data in real time; note that Debezium reads from SQL Server CDC, so CDC must already be enabled on the source database and on each table to be captured.
Extract the component into /export/servers/confluent-5.4.1/share/java
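
For example (a sketch; the path to the downloaded zip and the directory name inside the archive are assumptions and may differ):

cd /export/servers/confluent-5.4.1/share/java
unzip /path/to/debezium-connector-sqlserver.zip   # hypothetical location of the downloaded zip
ls debezium-connector-sqlserver/                  # the connector jar and its dependencies should be listed here
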
4. Add the real-time sync configuration
Create a kafka-connect-debezium directory under /export/servers/confluent-5.4.1/etc and add the sync configuration file inside it:
cd /export/servers/confluent-5.4.1/etc
mkdir kafka-connect-debezium
cd kafka-connect-debezium
vim register_test.json

{
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.dbname": "Pos",          # SQL Server database name
        "database.history.kafka.bootstrap.servers": "192.168.23.132:9092,192.168.23.133:9092,192.168.23.134:9092",
        "database.history.kafka.topic": "dbhistory.shop_sync",
        "database.hostname": "192.x.x.1",  # SQL Server host IP
        "database.password": "12345",      # SQL Server password
        "database.port": "50364",          # SQL Server port
        "database.server.name": "shop_sync",
        "database.user": "sa",             # SQL Server user name
        "decimal.handling.mode": "double",
        "snapshot.mode": "initial_schema_only",
        "table.whitelist": "dbo.Park"      # SQL Server table(s) to capture
    },
    "name": "shop_sync-connector"
}
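
Note that JSON does not allow comments, so the inline # annotations above are only explanatory and must not appear in the actual register_test.json. A quick way to confirm the final file parses as valid JSON (assuming Python is installed):

cd /export/servers/confluent-5.4.1/etc/kafka-connect-debezium
python -m json.tool register_test.json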

All configuration is now complete. Copy the confluent-5.4.1 directory to the spark02 and spark03 servers; no further adjustments are needed:
scp -r /export/servers/confluent-5.4.1 root@spark02:/export/servers/confluent-5.4.1
scp -r /export/servers/confluent-5.4.1 root@spark03:/export/servers/confluent-5.4.1

Start a Kafka console consumer first, so that once the Confluent components are started you can check whether data is being synced:
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic shop_sync.dbo.Park
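
Confluent also bundles the console consumer (without the .sh suffix), so the same check can be run from the Confluent installation; printing the message key and reading from the beginning can make inspection easier. A sketch:

cd /export/servers/confluent-5.4.1
./bin/kafka-console-consumer --bootstrap-server spark01:9092 --topic shop_sync.dbo.Park --from-beginning --property print.key=true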

5. Start the Confluent cluster
Run steps a and b on all three servers, then run step c (a single submission is enough); step d lists the submitted tasks. A verification sketch follows this list.

a. Start the Confluent Schema Registry
	cd /export/servers/confluent-5.4.1 && ./bin/schema-registry-start -daemon ./etc/schema-registry/schema-registry.properties
b. Start the Kafka Connect distributed worker
	cd /export/servers/confluent-5.4.1 && ./bin/connect-distributed -daemon ./etc/schema-registry/connect-avro-distributed.properties
c. Submit the connector task
	cd /export/servers/confluent-5.4.1/etc/kafka-connect-debezium
	curl -X POST -H "Content-Type:application/json" --data @register_test.json http://192.168.23.132:18083/connectors
d. List the tasks
	curl -s 192.168.23.132:18083/connectors
e. Check a task's status (the name registered above is shop_sync-connector)
	curl -s 192.168.23.132:18083/connectors/shop_sync-connector/status
f. Delete a task
	curl -s -X DELETE 192.168.23.132:18083/connectors/shop_sync-connector
g. Pause or resume a task
	curl -s -X PUT 192.168.23.132:18083/connectors/shop_sync-connector/pause
	curl -s -X PUT 192.168.23.132:18083/connectors/shop_sync-connector/resume
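
Verification sketch (once steps a through c have completed; hosts and ports as configured above):

# Schema Registry should respond on port 18081 (an empty subject list is normal right after startup)
curl -s http://192.168.23.132:18081/subjects

# The Connect worker should list the Debezium SQL Server connector among its installed plugins
curl -s http://192.168.23.132:18083/connector-plugins | grep -i sqlserver

# The submitted connector and its task should both report RUNNING
curl -s http://192.168.23.132:18083/connectors/shop_sync-connector/status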

Once the task submission completes and the curl call in step c returns successfully, the connector has been registered.
You can also view the tasks in a browser:
http://192.168.23.132:18083/connectors