
Data Imported into a Time-Series Table from Kafka

Sensor or monitoring data can be imported from Kafka into a time-series table. Kafka connects to Uqbar through a Kafka JDBC connector, which reads messages from Kafka, sends the SQL carried in each message to Uqbar for execution, records failures, and tracks the message position. Data should be inserted into the database in batches.

Installation Process

Download Kafka

Access the Kafka official website to download the Kafka software package, and then decompress it.
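For example, on Linux you can download and unpack the package from the command line. A minimal sketch, assuming version 3.2.1 built for Scala 2.12 (the mirror URL and version are assumptions; use the actual link from the official site):

# Download and unpack Kafka (URL and version are assumptions)
wget https://archive.apache.org/dist/kafka/3.2.1/kafka_2.12-3.2.1.tgz
tar -xzf kafka_2.12-3.2.1.tgz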

Download a Database Connector

Download the JDBC driver corresponding to the target database, such as openGauss. Access the openGauss official website and download the JDBC driver for the target operating system.

After the download is finished, decompress it to obtain a JAR package.

Copy the JAR package to the libs folder in the Kafka directory, such as /kafka_2.12-3.2.1/libs.
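A minimal sketch of this step (the archive name is an assumption; use the file you actually downloaded):

# Unpack the driver archive and copy the resulting JAR into Kafka's libs directory
tar -xzf openGauss-JDBC.tar.gz
cp ./*.jar /kafka_2.12-3.2.1/libs/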

Download a Kafka JDBC Connector

Access https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc?_ga=2.140576818.1269349503.1660542741-1354758524.1659497119 and then click Download to obtain the Kafka JDBC Connector.

After the download is finished, decompress it to any directory for later use.
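For example (the archive name matches the plugin.path used later; the target directory is an assumption):

unzip confluentinc-kafka-connect-jdbc-10.5.2.zip -d /usr/local/Cellar/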

Modify Configuration Files

server.properties

Edit the config/server.properties file in the Kafka directory, such as kafka_2.12-3.2.1.

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your.host.name:9092

Add listeners=PLAINTEXT://0.0.0.0:9092 or change the existing entry to this value. Otherwise, Kafka cannot listen for requests from external networks.

Add advertised.listeners=PLAINTEXT://your.host.name:9092 or change the existing entry to this value, where your.host.name is the IP address of the local machine. Otherwise, Kafka cannot be accessed from external networks.
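For example, if the local machine's IP address is 192.168.1.10 (an illustrative address), the two entries would read:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.1.10:9092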

connect-distributed.properties

Edit the config/connect-distributed.properties file in the Kafka directory, such as kafka_2.12-3.2.1.

plugin.path=/usr/local/Cellar/confluentinc-kafka-connect-jdbc-10.5.2

plugin.path indicates the location of the confluentinc-kafka-connect-jdbc folder, for example plugin.path=/usr/local/Cellar/confluentinc-kafka-connect-jdbc-10.5.2.

Kafka Startup

  1. Open a Command Prompt or terminal and go to the Kafka directory, such as kafka_2.12-3.2.1.

    cd /usr/local/Cellar/kafka_2.12-3.2.1
  2. Start ZooKeeper.

    sh bin/zookeeper-server-start.sh config/zookeeper.properties
  3. Start Kafka.

    sh bin/kafka-server-start.sh config/server.properties
  4. Run jps to check the ZooKeeper and Kafka status.

    81243 QuorumPeerMain  # ZooKeeper
    30701 Launcher
    81598 Kafka           # Kafka
  5. Start Kafka Connect Worker.

    sh bin/connect-distributed.sh config/connect-distributed.properties

    If the startup log reports no errors, the startup is successful.

    Run jps to confirm that the ConnectDistributed process is running.

    83931 ConnectDistributed
  6. Start JDBC Connector.

    • Method one: Start JDBC Connector from the command line.

      Open a Command Prompt or terminal and run the following command.

      curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
          "name":"connector-name",
          "config":{
              "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
              "connection.url":"jdbc:opengauss://localhost:15432/OpengaussDB",
              "connection.user":"user",
              "connection.password":"password",
              "topics":"test_topic",
              "insert.mode": "insert",
              "table.name.format":"table_name_${topic}",
              "auto.create":true,
              "dialect.name": "PostgreSqlDatabaseDialect"
          }
      }'
    • Method two: Create a JSON file to configure JDBC Connector.

      Create a JSON file named test.json with the following content.

      {
          "name":"connector-name",
          "config":{
              "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
              "connection.url":"jdbc:opengauss://localhost:15432/OpengaussDB",
              "connection.user":"user",
              "connection.password":"password",
              "topics":"test_topic",
              "insert.mode": "insert",
              "table.name.format":"table_name_${topic}",
              "auto.create":true,
              "dialect.name": "PostgreSqlDatabaseDialect"
          }
      }

      Submit the file in a Command Prompt or terminal to start JDBC Connector.

      curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @test.json
    • Method three: Use the Postman test tool.

      In Postman, send a POST request to http://localhost:8083/connectors with the same JSON configuration as the request body.

      If a response similar to the following is returned, JDBC Connector is started successfully. (The status query shown after this list is another way to verify.)

      {
          "name": "connector-name",
          "config": {
              "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
              "connection.url": "jdbc:opengauss://localhost:15432/sink",
              "connection.user": "user1",
              "connection.password": "Enmo@123",
              "topics": "test",
              "insert.mode": "insert",
              "table.name.format": "tableName_${topic}",
              "auto.create": "true",
              "dialect.name": "PostgreSqlDatabaseDialect",
              "name": "connector-name"
          },
          "tasks": [],
          "type": "sink"
      }
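Whichever method you use, you can confirm the connector state afterwards through the Kafka Connect REST API (connector-name matches the name in the example configuration):

# List registered connectors
curl http://localhost:8083/connectors
# Query the state of one connector; "state": "RUNNING" indicates success
curl http://localhost:8083/connectors/connector-name/status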

Data Write Example

A JDBC Connector configuration must contain at least the following parameters.

"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:opengauss://localhost:15432/OpengaussDB",
"connection.user":"user",
"connection.password":"password",
"topics":"test",
"table.name.format":"tableName"

Assume that the table receiving the data has the following structure.

CREATE TABLE "test" (
 "id" INT NOT NULL,
 "longitude" REAL NULL,
 "latitude" REAL NULL,
 "temperature" REAL NULL,
 "humidity" REAL NULL,
 "time" TIME NULL,
 "string_time" TEXT NULL,
 "randomString" TEXT NULL
);
Start JDBC Connector for this table.

curl -X POST http://localhost:8083/connectors -H "Content-Type:application/json" -d '{
    "name":"connector-name",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:opengauss://localhost:15432/sink",
        "connection.user":"user1",
        "connection.password":"password",
        "topics":"topic",
        "table.name.format":"test"
    }
}'
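Kafka Connect's default JSON converter expects each message to embed a schema alongside the payload (schemas.enable is true by default in connect-distributed.properties). A minimal sketch of sending a record with the console producer, covering only some of the table's columns for brevity (columns absent from the message are left NULL; the values are illustrative):

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic

Each record is entered as a single line of JSON, for example:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"float","optional":true,"field":"temperature"},{"type":"float","optional":true,"field":"humidity"}]},"payload":{"id":1,"temperature":23.5,"humidity":0.46}}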

After JDBC Connector is started and data is sent from the source end, the connector writes the data into the database.

Query the table to confirm that the data has been saved.
