文档中心MogDBMogDB StackUqbar

文档:v2.0

支持的版本:

支持从Kafka写入时序表

传感器或监控数据支持从Kafka写入时序表。Kafka和Uqbar可通过Kafka JDBC Connector进行连接。该连接器负责从Kafka读取数据,然后将消息中的SQL发送到Uqbar执行并记录失败消费结果,维护消息消费位置。向数据库插入数据需要使用批量插入模式。

jwbsehxhqw

安装流程

下载Kafka

访问 Kafka官网 下载Kafka软件包,下载完成后解压。

jwbsehxhqw

下载数据库驱动

下载对应数据库的JDBC驱动,以openGauss为例。访问openGauss官网下载对应操作系统的JDBC驱动。

jwbsehxhqw

下载完成后解压,得到相应的JAR包。

jwbsehxhqw

复制JAR包到kafka路径下,例如/kafka_2.12-3.2.1/libs。

下载Kafka JDBC Connector

可通过访问https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc?_ga=2.140576818.1269349503.1660542741-1354758524.1659497119,点击“Download”下载Kafka JDBC Connector。

jwbsehxhqw

下载完成后解压到任意目录供后续使用。

修改配置文件

server.properties

编辑kafka目录下的 config/server.properties 文件。

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your.host.name:9092

修改或添加 listeners=PLAINTEXT://0.0.0.0:9092 ,否则将无法监听外网请求。

修改或添加 advertised.listeners=PLAINTEXT://your.host.name:9092your.host.name为本机的IP地址,否则外部网络将无法访问您的kafka。

connect-distributed.properties

编辑kafka目录下的 config/connect-distributed.properties 文件。

plugin.path=/usr/local/Cellar/confluentinc-kafka-connect-jdbc-10.5.2

plugin.path存放confluentinc-kafka-connect-jdbc文件夹的位置。如plugin.path=/usr/local/Cellar/confluentinc-kafka-connect-jdbc-10.5.2

启动Kafka

  1. 打开cmd/terminal并进入kafka目录,例如/kafka_2.12-3.2.1。

    cd /usr/local/Cellar/kafka_2.12-3.2.1
  2. 启动zookeeper。

    sh bin/zookeeper-server-start.sh config/zookeeper.properties
  3. 启动Kafka。

    sh bin/kafka-server-start.sh config/server.properties
  4. 输入jps查看zookeeper和kafka启动情况。

    81243 QuorumPeerMain  #Zookeeper
    30701 Launcher
    81598 Kafka # Kafka
  5. 启动Kafka Connect Worker。

    sh bin/connect-distributed.sh config/connect-distributed.properties

    如果显示如下回显信息,表示启动成功。

    jwbsehxhqw

    通过jps可以查看到ConnectDistributed进程运行。

    83931 ConnectDistributed
  6. 启动JDBC Connector。

    • 方式一:启动JDBC Connector

      通过终端或cmd 启动JDBC Connector。

      curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
          "name":"connector-name",
          "config":{
              "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
              "connection.url":"jdbc:opengauss://localhost:15432/OpengaussDB",
              "connection.user":"user",
              "connection.password":"password",
              "topics":"test_topic",
              "insert.mode": "insert",
              "table.name.format":"table_name_${topic}",
              "auto.create":true,
              "dialect.name": "PostgreSqlDatabaseDialect"
          }
      }'
    • 方式二:创建json文件配置 JDBC Connector。

      创建名为test.json的json文件,内容如下:

      {
          "name":"connector-name",
          "config":{
              "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
              "connection.url":"jdbc:opengauss://localhost:15432/OpengaussDB",
              "connection.user":"user",
              "connection.password":"password",
              "topics":"test_topic",
              "insert.mode": "insert",
              "table.name.format":"table_name_${topic}",
              "auto.create":true,
              "dialect.name": "PostgreSqlDatabaseDialect"
          }
      }

      在终端或cmd启动Kafka。

      curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @test.json
    • 方式三:使用Postman等接口测试工具

      jwbsehxhqw

      设置好参数后发送post请求。

      显示类似如下的消息表示Connector启动成功。

      {
      "name": "connector-name",
      "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url": "jdbc:opengauss://localhost:15432/sink",
          "connection.user": "user1",
          "connection.password": "Enmo@123",
          "topics": "test",
          "insert.mode": "insert",
          "table.name.format": "tableName_${topic}",
          "auto.create": "true",
          "dialect.name": "PostgreSqlDatabaseDialect",
          "name": "connector-name"
      },
      "tasks": [],
      "type": "sink"
      }

数据写入示例

一个JDBC Connector,至少需要包含如下参数:

"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:opengauss://localhost:15432/OpengaussDB",
"connection.user":"user",
"connection.password":"password",
"topics":"test",
"table.name.format":"tableName"

假设要写入数据库的数据表结构以及配置如下:

CREATE TABLE "test" (
 "id" INT NOT NULL,
 "longitude" REAL NULL,
 "latitude" REAL NULL,
 "temperature" REAL NULL,
 "humidity" REAL NULL,
 "time" TIME NULL,
 "string_time" TEXT NULL,
 "randomString" TEXT NULL
);
curl -X POST http://localhost:8083/connectors -H "Content-Type:application/json" -d '{
    "name":"connector-name",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:opengauss://localhost:15432/sink",
        "connection.user":"user1",
        "connection.password":"password",
        "topics":"topic",
        "table.name.format":"test"
    }
}'

在source端发送数据,启动JDBC Connector。如果终端或cmd显示如下信息,表示Kafka启动成功。

jwbsehxhqw

在数据库中查询,可以发现数据已保存到表中。

jwbsehxhqw

Copyright © 2011-2024 www.enmotech.com All rights reserved.