Kafka提供两种启动方式,一种是单机版standalone,一种是分布式版distributed,与之想对应的配置文件也是分开的
单机版的配置文件:connect-standalone.properties
分布式的配置文件:connect-distributed.properties
单机启动方式:bin/connect-standalone.sh -daemon config/connect-standalone.properties config/connect-console-sink.properties
分布式启动方式:bin/connect-distributed.sh -daemon config/connect-distributed.properties
启动完之后就能通过curl的方式请求确认服务是否正常
curl localhost:8083/connectors
单机模式配置信息在提交的配置文件里面,例如:config/connect-console-sink.properties
集群模式需要通过REST API去提交
基于Debezium 的CDC为例,介绍下如何提交一个Kafka Connect source 任务:
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
"name": "test-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"tombstones.on.delete": "false",
"database.server.id": "1",
"database.server.name": "test-server",
"database.history.kafka.topic": "test-server-history",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"include.schema.changes": "true",
"database.serverTimezone": "Asia/Shanghai",
"database.driver": "com.mysql.jdbc.Driver",
"database.history.kafka.recovery.poll.interval.ms": "3000",
"defaultFetchSize": "1000",
"database.tinyInt1isBit": "false",
"snapshot.locking.mode": "none",
"decimal.handling.mode": "string",
}
}'通过请求 curl http://localhost:8083就能看到刚提交的source任务了
这里是一个简单的配置文件例子,对于数据格式,我们可以配置Kafka数据的key和value为Avro格式的,在配置中我们要指定
"transforms.schema.schema.registry.url": "schema.registry.url"
同时我们也要在config/connect-distributed.properties配置文件中配置avro的convertor
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
接下来以hdfs sink 为例子,介绍下如何提交一个Kafka Connect sink 任务
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
"name": "test-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"hive.database": "ods",
"hive.jdbc.url": "jdbc:hive2://hive-server:7001",
"hive.user": "root",
"logs.dir": "/user/hive/warehouse/ods.db/log_wals",
"topics.dir": "/user/hive/warehouse/ods.db",
"topics.regex": "mysql_account__.*_binlog",
"hdfs.url": "hdfs://namenode",
"hadoop.conf.dir": "/etc/hadoop",
"hadoop.home": "/opt/hadoop",
"rotate.interval.ms": "6000000",
"timezone": "Asia/Shanghai",
"avro.codec": "snappy",
"tasks.max": "1"
}
}'通过请求 curl http://localhost:8083就能看到刚提交的sink任务了
一下是REST API的介绍:
GET /Connectors:返回活跃的 Connector 列表
POST /Connectors:创建一个新的 Connector;请求的主体是一个包含字符串name字段和对象 config 字段的 JSON 对象。
GET /Connectors/{name}:获取指定 Connector 的信息
GET /Connectors/{name}/config:获取指定 Connector 的配置参数
PUT /Connectors/{name}/config:更新指定 Connector 的配置参数
GET /Connectors/{name}/status:获取 Connector 的当前状态,包括它是否正在运行,失败,暂停等。
GET /Connectors/{name}/tasks:获取当前正在运行的 Connector 的任务列表。
GET /Connectors/{name}/tasks/{taskid}/status:获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,
PUT /Connectors/{name}/pause:暂停连接器和它的任务,停止消息处理,直到 Connector 恢复。
PUT /Connectors/{name}/resume:恢复暂停的 Connector
POST /Connectors/{name}/restart:重启 Connector
POST /Connectors/{name}/tasks/{taskId}/restart:重启单个任务
DELETE /Connectors/{name}:删除 Connector, 停止所有的任务并删除其配置
我们在任务中能配置一些transform,可以对数据进行一些处理,我们将在下一期介绍Kafka Connect的transform
包含Kafka 自带的transform以及如何开发一个自定义的transform,敬请期待,有问题也可以随时给我留言