文档中心MogDBMogDB StackUqbar
v5.0

文档:v5.0

支持的版本:

其他版本:

wal2json

wal2json简介

MogDB兼容PostgreSQL,承载了企业对于数据处理和分析的关键需求。随着数据处理需求的多样化,特别是在实时数据复制、流处理和审计方面,存在从数据库日志中高效提取数据的迫切需求。MogDB的WAL(Write-Ahead Logging)机制,虽然提供了数据的完整性和恢复能力,但其原生格式对于实时解析和跨平台应用来说过于复杂和专用。MogDB 5.0.6及更高版本配套的wal2json插件适配PostgreSQL社区v2.5版本,能够将WAL日志转换成更加通用、易于解析和集成的JSON格式,以便于进行更广泛的数据处理和分析,大幅提升MogDB在企业级数据架构中的应用价值和灵活性。

说明:在PostgreSQL社区v2.5版本的wal2json中,遇到oldkeys中某一列为NULL,其输出会跳过,适配MogDB版本的wal2json插件对此进行了优化,oldkeys中存在值为NULL的列也会显示。


wal2json安装

手动安装

  1. 访问MogDB下载页面,下载所需版本的wal2json插件。

  2. 使用数据库实例用户(如omm)解压插件包,例如:

    tar -xzvf wal2json-2.3-x.x.x-01-CentOS-x86_64.tar.gz
  3. 进入插件所在目录下,执行make install命令。

    cd wal2json/
    make install

PTK安装

参见插件安装


修改数据库参数配置

  1. 修改数据库数据目录下的postgresql.conf(主备都需要修改)。

    wal_level = logical
    max_replication_slots = 10  # 大于1即可
    max_wal_senders = 10 # 使用pg_recvlogical工具需要设置此参数,大于1即可
  2. 修改pg_hba.conf(主备都需要修改)。

    host   replication   all     127.0.0.1/32       trust
  3. 执行完成后重启数据库。


wal2json使用示例

方法一

  1. 通过工具pg_recvlogical建立一个名为test2_slot的复制槽,通过-P参数和wal2json进行绑定,根据实际情况替换端口号、用户名等。

    pg_recvlogical -d postgres --slot test2_slot --create -P wal2json -h 127.0.0.1 -p 5001 -U test
    pg_recvlogical -d postgres -h 127.0.0.1 -p 5001 -U test --slot test2_slot --start  -f -
  2. 使用另一个会话,在SQL端进行数据库增删改操作。

    -- 创建有主键的表
    CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
    
    -- 创建没有主键的表
    CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
    
    -- 开启事务,执行DML操作
    BEGIN;
    INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
    DELETE FROM table2_with_pk WHERE a < 3;
    
    INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
    
    -- 表table2_without_pk没有主键或复制标识,因此更新操作不会加入数据流中
    UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
    COMMIT;
  3. wal2json端输出操作过程。

    {"change":[]}
    {"change":[]}
    {"change":[{"kind":"insert","schema":"public","table":"table2_with_pk","columnnames":["a","b","c"],"columntypes":["integer","character varying(30)","timestamp without time zone"],"columnvalues":[1,"Backup and Restore","2024-02-25 21:02:24.518253"]},{"kind":"insert","schema":"public","table":"table2_with_pk","columnnames":["a","b","c"],"columntypes":["integer","character varying(30)","timestamp without time zone"],"columnvalues":[2,"Tuning","2024-02-25 21:02:24.518253"]},{"kind":"insert","schema":"public","table":"table2_with_pk","columnnames":["a","b","c"],"columntypes":["integer","character varying(30)","timestamp without time zone"],"columnvalues":[3,"Replication","2024-02-25 21:02:24.518253"]},{"kind":"delete","schema":"public","table":"table2_with_pk","oldkeys":{"keynames":["a","c"],"keytypes":["integer","timestamp without time zone"],"keyvalues":[1,"2024-02-25 21:02:24.518253"]}},{"kind":"delete","schema":"public","table":"table2_with_pk","oldkeys":{"keynames":["a","c"],"keytypes":["integer","timestamp without time zone"],"keyvalues":[2,"2024-02-25 21:02:24.518253"]}},{"kind":"insert","schema":"public","table":"table2_without_pk","columnnames":["a","b","c"],"columntypes":["integer","numeric(5,2)","text"],"columnvalues":[1,2.34,"Tapir"]}]}
  4. SQL端清理测试表。

    DROP TABLE table2_with_pk;
    DROP TABLE table2_without_pk;
  5. 删除逻辑复制槽。

    pg_recvlogical -d postgres -S test2_slot -p 5001 --drop -h 127.0.0.1

方法二

  1. 使用逻辑复制函数pg_create_logical_replication_slot,也可以建立逻辑复制槽,与wal2json进行绑定。

    SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
  2. 进行数据库增删改操作。

    -- 创建有主键的表
    CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
    
    -- 创建没有主键的表
    CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
    
    -- 开启事务,执行DML操作
    BEGIN;
    INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
    DELETE FROM table2_with_pk WHERE a < 3;
    
    INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
    
    -- 表table2_without_pk没有主键或复制标识,因此更新操作不会加入数据流中
    UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
    COMMIT;
  3. 使用pg_logical_slot_get_changes来查看wal2json的输出。

    SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');
    WARNING:  Due to DDL in the same top transaction, partial modification may be lost for <XID, FIRST_LSN, FINAL_LSN, SKIP_START_LSN> = { <15726, 0/EA35E10, 0/EA3C638, 0/EA35E58> }
    WARNING:  Due to DDL in the same top transaction, partial modification may be lost for <XID, FIRST_LSN, FINAL_LSN, SKIP_START_LSN> = { <15727, 0/EA3CC80, 0/EA45000, 0/EA3CCC8> }
    WARNING:  table "table2_without_pk" without primary key or replica identity is nothing
    CONTEXT:  slot "test_slot", output plugin "wal2json", in the change callback, associated LSN 0/EA45E50
                                                        data
    -------------------------------------------------------------------------------------------------------------
     {                                                                                                          +
             "change": [                                                                                        +
             ]                                                                                                  +
     }
     {                                                                                                          +
             "change": [                                                                                        +
             ]                                                                                                  +
     }
     {                                                                                                          +
             "change": [                                                                                        +
                     {                                                                                          +
                             "kind": "insert",                                                                  +
                             "schema": "public",                                                                +
                             "table": "table2_with_pk",                                                         +
                             "columnnames": ["a", "b", "c"],                                                    +
                             "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
                             "columnvalues": [1, "Backup and Restore", "2024-02-26 10:31:32.821865"]            +
                     }                                                                                          +
                     ,{                                                                                         +
                             "kind": "insert",                                                                  +
                             "schema": "public",                                                                +
                             "table": "table2_with_pk",                                                         +
                             "columnnames": ["a", "b", "c"],                                                    +
                             "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
                             "columnvalues": [2, "Tuning", "2024-02-26 10:31:32.821865"]                        +
                     }                                                                                          +
                     ,{                                                                                         +
                             "kind": "insert",                                                                  +
                             "schema": "public",                                                                +
                             "table": "table2_with_pk",                                                         +
                             "columnnames": ["a", "b", "c"],                                                    +
                             "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],+
                             "columnvalues": [3, "Replication", "2024-02-26 10:31:32.821865"]                   +
                     }                                                                                          +
                     ,{                                                                                         +
                             "kind": "delete",                                                                  +
                             "schema": "public",                                                                +
                             "table": "table2_with_pk",                                                         +
                             "oldkeys": {                                                                       +
                                     "keynames": ["a", "c"],                                                    +
                                     "keytypes": ["integer", "timestamp without time zone"],                    +
                                     "keyvalues": [1, "2024-02-26 10:31:32.821865"]                             +
                             }                                                                                  +
                     }                                                                                          +
                     ,{                                                                                         +
                             "kind": "delete",                                                                  +
                             "schema": "public",                                                                +
                             "table": "table2_with_pk",                                                         +
                             "oldkeys": {                                                                       +
                                     "keynames": ["a", "c"],                                                    +
                                     "keytypes": ["integer", "timestamp without time zone"],                    +
                                     "keyvalues": [2, "2024-02-26 10:31:32.821865"]                             +
                             }                                                                                  +
                     }                                                                                          +
                     ,{                                                                                         +
                             "kind": "insert",                                                                  +
                             "schema": "public",                                                                +
                             "table": "table2_without_pk",                                                      +
                             "columnnames": ["a", "b", "c"],                                                    +
                             "columntypes": ["integer", "numeric(5,2)", "text"],                                +
                             "columnvalues": [1, 2.34, "Tapir"]                                                 +
                     }                                                                                          +
             ]                                                                                                  +
     }
    (3 rows)
  4. 使用SQL函数pg_drop_replication_slot删除逻辑复制槽。

    SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
  5. 清理测试表。

    DROP TABLE table2_with_pk;
    DROP TABLE table2_without_pk;

wal2json参数说明

当前版本支持以下参数:

  • include-xids:是否在每个变更集中添加事务ID(xid)。默认值为false。

  • include-timestamp:是否为每个变更集添加时间戳。默认值为false。

  • include-schemas:是否在每次变更中添加模式(schema)名称。默认值为true。

  • include-types:是否在每次变更中添加数据类型信息。默认值为true。

  • include-typmod:是否为有修饰符的数据类型添加修饰符(例如,varchar(20)而不是varchar)。默认值为true。

  • include-type-oids:是否添加数据类型的OID。默认值为false。

  • include-domain-data-type:是否将域名替换为其底层数据类型。默认值为false。

  • include-column-positions:是否添加列位置信息(基于pg_attribute.attnum)。默认值为false。

  • include-origin:是否添加数据的来源信息。默认值为false。

  • include-not-null:是否添加非空信息作为列选项。默认值为false。

  • include-default:是否添加列的默认表达式。默认值为false。

  • include-pk:是否添加主键信息,包括列名和数据类型。默认值为false。

  • numeric-data-types-as-string:是否将数字数据类型作为字符串使用。默认值为false。

  • pretty-print:是否为JSON结构添加空格和缩进以提高可读性。默认值为false。

  • write-in-chunks:是否在每次变更后写入,而不是每个变更集结束时。默认值为false,仅当format-version为1时使用。

  • include-lsn:是否在每个变更集中添加下一个LSN。默认值为false。

  • include-transaction:是否发出表示每个事务开始和结束的记录。默认值为true。

  • filter-origins:排除指定来源的变更。默认为空,即不过滤任何来源。

  • filter-tables:排除指定表的行变更。默认为空,即不过滤任何表。

  • add-tables:仅包括指定表的行变更。默认包括所有表。

  • format-version:定义使用哪种格式。默认为1。

  • actions:定义将发送哪些操作(insert、update、delete、truncate)。默认为所有操作。如果使用format-version 1,则不启用truncate(为了向后兼容)。


限制项说明

  • MogDB中不存在truncate相关的wal日志,因此wal2json不支持truncate的解析。
  • PostgreSQL可以通过pg_logical_emit_message函数发送一些消息类的wal日志,其不包含数据信息,MogDB不支持该函数,因此也无法通过wal2json解析该类型的wal日志。
  • 不支持Ustore和Cstore相关的wal日志解析。
  • 如果表没有主键,wal2json只能解析INSERT操作,无法解析UPDATE和DELETE操作。
  • 在一个长事务中执行重命名列操作时,重命名操作后的wal会丢失,这是因为内核在进行解码的时候丢失了该部分的wal日志,并且给出类似于WARNING: Due to DDL in the same top transaction, partial modification may be lost for <XID, FIRST_LSN, FINAL_LSN, SKIP_START_LSN> = { <13915, 0/40EE9D0, 0/40EED70, 0/40EEA58> }的警告,wal2json无法将该wal日志转化为JSON。内核支持单独使用DDL语句,因此只建议单独使用重命名列等改变表的操作,而非在长事务中使用。
  • 使用分区表时,内核解码可能会丢失某些wal日志,并且给出类似于WARNING: Due to DDL in the same top transaction, partial modification may be lost for <XID, FIRST_LSN, FINAL_LSN, SKIP_START_LSN> = { <13915, 0/40EE9D0, 0/40EED70, 0/40EEA58> }的警告,wal2json无法将该wal日志转化为JSON。

相关页面

pg_recvlogical逻辑复制函数

Copyright © 2011-2024 www.enmotech.com All rights reserved.