Skip to main content

准备工作

完整的数据流是这样的 业务系统-> kafka服务器 ->kafka connect 客户端 -> elasticsearch。 我们的准备工作包括:

  1. 运行中的 Kafka服务器
  2. Elasticsearch 服务
  3. kafka connect 客户端, kafka下载后直接包含了kafka connect。下载地址: https://kafka.apache.org/downloads
  4. 下载 Elasticsearch 连接器插件,

为了方便调试还可以准备 kafka客户端,比如: offset explorer ;es 客户端 。

示例数据说明

我们以用户登录数据为例, 我们先使用 offset explorer 在kafka里 创建一个user-log 的topic。然后往这个topic里写入一条数据

{
"user_id":"u1",
"name":"张三",
"span": 123
}

配置

对于 Kafka Connect,你可以选择独立模式(Standalone)或分布式模式(Distributed)。这里以分布式模式为例,因为它更适合生产环境。 配置过程

1. 下载并安装 Elasticsearch 连接器

可以到官网下载 https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch

2. 配置 Kafka Connect

创建或编辑 connect-standalone.properties 文件:

bootstrap.servers=10.20.200.168:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=D:/soft/kafka_2.13-3.9.1/plugins

plugin.path 指向上面下载的插件路径

3. 配置 Elasticsearch 连接器

创建一个配置文件 elasticsearch-sink.properties:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1

# 要同步的 Kafka 主题
topics=user-log,topic2,topic3
# 指定 Topic 到索引的映射关系
topic.index.map=user-log:user-log,topic2:es-index-2,topic3:es-index-3

connection.url=http://elasticsearch:9200
connection.username=elastic
connection.password=123456
type.name=_doc
key.ignore=true
schema.ignore=true
schemas.enable=false

启动 Kafka Connect 服务

./bin/connect-distributed.sh config/connect-distributed.properties es.properties
  1. 验证数据同步 在 Elasticsearch 中查询数据:

bash curl -X GET "http://elasticsearch:9200/user-events/_search?pretty"

注意事项

  • 版本兼容性:确保 Kafka Connect 和 Elasticsearch 版本兼容,避免因版本不匹配导致的问题。
  • 数据格式:Kafka 消息格式需要与 Elasticsearch 索引映射匹配。如果消息包含嵌套结构,Elasticsearch 可能需要适当的映射配置。
  • 错误处理:配置适当的错误处理策略,如重试机制、死信队列等,以应对网络波动或 Elasticsearch 临时不可用的情况。
  • 性能优化:
    • 根据数据量调整 tasks.max 参数
    • 合理设置 batch.size 和 max.in.flight.requests
    • 监控并调整缓冲区大小
  • 安全配置:
    • 如果 Elasticsearch 启用了安全认证,需要配置用户名、密码或 API Key
    • 考虑使用 SSL/TLS 加密通信
  • 索引管理:
    • 可以配置 topics.dir 和 topic.index.map 参数自定义索引名称
    • 考虑设置索引生命周期管理(ILM)策略
    • 幂等性:默认情况下,连接器不保证幂等写入。如果需要幂等性,需要额外配置。

通过以上步骤和注意事项,你可以成功配置 Kafka Connect 将数据从 Kafka 写入 Elasticsearch,并确保系统稳定运行。