Требования к источникам и приемникам данных | Cdc

Версия:

latest
Руководство по установке Требования к источникам и приемникам данных

Требования к источникам и приемникам данных

Требования для всех источников/приемников данных

  • Обеспечена сетевая связанность между Kubernetes-кластером Tarantool CDC и источником/приемником данных.

Требования к источникам данных

PostgreSQL

  • Включена логическая репликация (wal_level=logical) для базы данных;

  • Обеспечен достаточный объем дискового пространства для хранения WAL;

  • Создан пользователь с правами REPLICATION, LOGIN;

  • Доступно подключение к базе данных по HOST:PORT из сети Tarantool CDC (по умолчанию порт 5432).

Tarantool EE

  • Tarantool EE обновлен до версии 2.11.x.

  • Создан пользователь с ролью replication (для тестирования может быть использована учетная запись суперпользователя admin).

  • На всех экземплярах с ролью vshard-storage или crud-storage в кластере Tarantool EE включен Extended WAL (wal_ext = { old = true, new = true }).

  • Доступно подключение ко всем экземплярам Tarantool по HOST:PORT из сети Tarantool CDC.

Oracle + OLR

Требования к версиям:

  • Поддерживаемые версии: 11.2, 12.1, 12.2, 18c, 19c, 21c, 23c.

  • Поддерживаемые версии: XE/DE, SE, SE2, PE, EE.

  • Поддерживаемые платформы: Linux64, Solaris x64(не проверялось), Solaris Sparc(не проверялось).

  • Поддерживаемое хранилище: файловая система (ext4, btrfs, xfs, xfs xfs, sshfs).

  • Поддерживаемые размеры блоков базы данных: 2 кб, 4 кб, 8 кб, 16 кб, 32 кб.

  • База данных должна находиться в режиме единственного экземпляра (без RAC).

  • Логи (redo и archive) должны размещаться на файловой системе. Логи в ASM не поддерживаются.

Требования к базе данных (Database Object):

  • Минимальные требования:

    База данных должна работать в режиме архивного журнала (ARCHIVELOG) и включать минимальное дополнительное ведение журнала (MINIMAL SUPPLEMENTAL LOGGING):

    • Режим ARCHIVELOG нужен для сохранения базой полного онлайн-журнала redo logs, который будет читать OpenLogReplicator.

    • MINIMAL SUPPLEMENTAL LOGGING добавляет в redo logs. Строки в этих логах содержат данные только тех столбцов, которые были изменены в операции изменения.

    Подробнее https://github.com/bersler/OpenLogReplicator/blob/master/documentation/installation/installation.adoc#database-configuration

    Пример скрипта:

    SHUTDOWN IMMEDIATE;
    STARTUP MOUNT;
    
    ALTER DATABASE ARCHIVELOG;
    ALTER DATABASE OPEN;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    
    ALTER SYSTEM ARCHIVE LOG CURRENT;
    

    Примечание

    Для Debezium Oracle Connector данного уровня логирования хватает только, чтобы создавать корректные CREATE(insert), READ(insert).

  • Расширенные требования:

    • [OpenLogReplicator] Чтобы захватывать все транзакции из базы данных (Database Object), рекомендуется включить принудительное ведение журнала (FORCE LOGGING).

      Подробнее https://github.com/bersler/OpenLogReplicator/blob/master/documentation/installation/installation.adoc#force-logging

      Пример скрипта:

      ALTER DATABASE FORCE LOGGING;
      ALTER SYSTEM ARCHIVE LOG CURRENT;
      
    • [OpenLogReplicator] Конфигурация FRA. Для OFFLINE-типа считывателя требуется, чтобы была настроена область быстрого восстановления (Fast Recovery Area).

      Подробнее про OFFLINE-чтение (чтение без подключения к БД): https://github.com/bersler/OpenLogReplicator/blob/b8cdd73ca9b646d48670708de68fe516405d7aa8/documentation/user-manual/user-manual.adoc#offline-reader

      Пример скрипта:

      ALTER SYSTEM SET DB_RECOVERY_FILE_DEST_SIZE = 50G;
      -- for oracle database 11
      ALTER SYSTEM SET DB_RECOVERY_FILE_DEST = '/u01/app/oracle/fast_recovery_area' scope = both;
      -- for oracle database 12+
      -- ALTER SYSTEM SET DB_RECOVERY_FILE_DEST = '/opt/oracle/fra';
      

Требования к TABLE (дополнительные требования):

  • [debezium-oracle-connector] FULL SUPPLEMENTAL LOGGING

    Это необходимо для правильного анализа событий обновления debezium (например событие UPDATE, DELETE). Уровень ведения журнала (ДЛЯ ВСЕХ) столбцов гарантирует, что Oracle фиксирует состояние столбца независимо от того, изменился ли столбец при записи записи о повторном запуске в журнал повтора. Включение более высокого уровня ведения журнала позволяет Debezium Oracle Connector генерировать события изменений, которые предоставляют точные данные о состоянии строки «до» и «после».

    Подробнее https://github.com/bersler/OpenLogReplicator/blob/master/documentation/installation/installation.adoc#table-configuration-requirements

    Пример скрипта включения расширенного логирования для отдельной таблицы или для всей базы данных:

    -- ALTER TABLE <SCHEMA_NAME>.<TABLE_NAME> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- e.g. :
    ALTER TABLE USR1.Person ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- or you can alter the whole database:
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    

    Примечание

    Для разных настроек конфигураций в принимающих коннекторах достаточно включить дополнительное логирование только для PRIMARY KEY.

Создание сервисного пользователя в СУБД:

  • Подробнее про требования OpenLogReplicator: https://github.com/bersler/OpenLogReplicator/blob/master/documentation/installation/installation.adoc#user-requirements

  • Подробнее про требования Debezium: https://debezium.io/documentation/reference/stable/connectors/oracle.html#_creating_connector_users

  • Пример скрипта создания пользователя, пригодного для эксплуатации OpenLogReplicator и Debezium Oracle Connector:

-- Create connector user for Debezium and OpenLogReplicator
CREATE TABLESPACE TBLS1 DATAFILE 'TBLS1.dbf' SIZE 100M AUTOEXTEND ON NEXT 100M;
CREATE USER USR1 IDENTIFIED BY USR1PWD DEFAULT TABLESPACE "TBLS1" TEMPORARY TABLESPACE "TEMP";
ALTER USER USR1 QUOTA UNLIMITED ON TBLS1;
---- Grant OpenLogReplicator needed privileges to USR1
GRANT "CONNECT" TO USR1;
GRANT "RESOURCE" TO USR1;
GRANT SELECT, FLASHBACK ON SYS.CCOL$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.CDEF$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.COL$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.DEFERRED_STG$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.ECOL$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.LOB$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.LOBCOMPPART$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.LOBFRAG$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.OBJ$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.TAB$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.TABCOMPART$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.TABPART$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.TABSUBPART$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.TS$ TO USR1;
GRANT SELECT, FLASHBACK ON SYS.USER$ TO USR1;
GRANT SELECT ON SYS.V_$ARCHIVED_LOG TO USR1;
GRANT SELECT ON SYS.V_$DATABASE TO USR1;
GRANT SELECT ON SYS.V_$DATABASE_INCARNATION TO USR1;
GRANT SELECT ON SYS.V_$LOG TO USR1;
GRANT SELECT ON SYS.V_$LOGFILE TO USR1;
GRANT SELECT ON SYS.V_$PARAMETER TO USR1;
GRANT SELECT ON SYS.V_$STANDBY_LOG TO USR1;
GRANT SELECT ON SYS.V_$TRANSPORTABLE_PLATFORM TO USR1;
GRANT SELECT, FLASHBACK ON XDB.XDB$TTSET TO USR1;
GRANT SELECT ANY DICTIONARY TO USR1;
DECLARE
    CURSOR C1 IS SELECT TOKSUF FROM XDB.XDB$TTSET;
    CMD VARCHAR2(2000);
BEGIN
    FOR C IN C1 LOOP
        CMD := 'GRANT SELECT, FLASHBACK ON XDB.X$NM' || C.TOKSUF || ' TO USR1';
        EXECUTE IMMEDIATE CMD;
        CMD := 'GRANT SELECT, FLASHBACK ON XDB.X$QN' || C.TOKSUF || ' TO USR1';
        EXECUTE IMMEDIATE CMD;
        CMD := 'GRANT SELECT, FLASHBACK ON XDB.X$PT' || C.TOKSUF || ' TO USR1';
        EXECUTE IMMEDIATE CMD;
    END LOOP;
END;
/
---- Grant Debezium needed privileges to USR1
GRANT SELECT_CATALOG_ROLE TO USR1 WITH admin OPTION;

Требования к конфигурации OpenLogReplicator для Debezium Oracle Connector:

  • Подробнее про Debezium: https://debezium.io/documentation/reference/stable/connectors/oracle.html#oracle-openlogreplicator-configuration

  • Пример рабочей конфигурации для Oracle Database 11:

{
  "version": "1.6.0",
  "log-level": 4,
  "source": [
    {
      "alias": "S1",
      "name": "XE",
      "reader": {
        "type": "online",
        "path-mapping": [
          "/u01/app/oracle/oradata",
          "/opt/oradata",
          "/u01/app/oracle/fra",
          "/opt/fra"
        ],
        "user": "USR1",
        "password": "USR1PWD",
        "server": "//oracledb:1521/XE"
      },
      "format": {
        "type": "json",
        "column": 2,
        "db": 3,
        "interval-dts": 9,
        "interval-ytm": 4,
        "message": 2,
        "rid": 1,
        "schema": 7,
        "scn-all": 1,
        "timestamp-all": 1
      },
      "filter": {
        "table": [
          {
            "owner": "USR1",
            "table": ".*"
          }
        ]
      }
    }
  ],
  "target": [
    {
      "alias": "DEBEZIUM",
      "source": "S1",
      "writer": {
        "type": "network",
        "uri": "0.0.0.0:27017"
      }
    }
  ]
}

Требования к приемникам данных

PostgreSQL

  • Создан пользователь с правами CONNECT, CREATE, SELECT, INSERT, UPDATE, DELETE, TRUNCATE.

  • Доступно подключение к базе данных по HOST:PORT из сети Tarantool CDC (по умолчанию порт 5432).

  • Созданы таблицы, дублирующие формат исходных.

    Примечание

    Sink-коннектор JDBC создает таблицы самостоятельно, но рекомендуется создать таблицы заранее, до запуска CDC, чтобы избежать проблем c искажением типов.

Tarantool EE

  • Создан пользователь с ролью public и правами session, create, write, execute.

  • Для кластера Tarantool EE должен быть установлен модуль CRUD и включены роли crud-router и crud-storage для соответствующих экземпляров.

  • В зависимости от сценария использования в кластере Tarantool EE могут быть выделены отдельные экземпляры с ролью crud-router для эксклюзивного использования Tarantool CDC и минимизации задержек при работе с кластером для других потребителей.

  • Могут использоваться все экземпляры с ролью crud-router, но в таком случае будет иметь место взаимное влияние между запросами Tarantool CDC и запросами потребителей.

  • Доступно подключение ко всем экземплярам Tarantool c ролью crud-router по HOST:PORT из сети Tarantool CDC, в зависимости от выбранного сценария.

  • Созданы таблицы, дублирующие формат исходных.

Clickhouse

Плоские данные

Данные, поступающие в коннектор Clickhouse, должны быть плоскими относительно мета-структур. Поля записи из источника не должны быть вложены ни в какую другую структуру. В рамках Debezium-коннекторов это означает, что из типичной структуры Debezium нужно убрать первый уровень вложенности:

{
  "before": null,
  "after": {
    "id": 1,
    "message": "artyom",
    ...
  },
  "source": {
    "connector": "postgresql",
    ...
  },
  "op": "c",
  ...
}

И превратить ее в плоскую структуру:

{
  "before": null,
  "after_id": 1,
  "after_message": "artyom",
  ...
  "source_connector": "postgresql",
  ...
  "op": "c",
  ...
}

Это можно сделать с помощью трансформаций, задав нужную конфигурацию обработчика данных. Трансформации можно настроить как на стороне источника (см. пример ниже), так и на стороне приемника.

Пример настроек для превращения данных из источника Postgres в плоские записи:

WORKER_CONNECTOR_TRANSFORMS=flatten
WORKER_CONNECTOR_TRANSFORMS_FLATTEN_DELIMITER="_"
WORKER_CONNECTOR_TRANSFORMS_FLATTEN_TYPE=org.apache.kafka.connect.transforms.Flatten$Value

Временная таблица

Поскольку данные из CDC приходят в формате Debezium, то часть работы по правильной адаптации данных ложится на сторону Clickhouse. В Clickhouse можно указать временную таблицу, которая будет принимать данные в формате Debezium.

Например, данные из источника приходят в формате следующей таблицы:

a. Пример для источника Postgres:

CREATE TABLE IF NOT EXISTS demo_table (
    id SERIAL PRIMARY KEY,
    message VARCHAR(255)
);

b. Пример для источника TDG:

{
    "name": "demo_table",
    "type": "record",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "message", "type": "string" }
    ],
    "indexes": [
        {"name": "pk", "parts": ["id"]}
    ],
    "affinity": [
        "id"
    ]
}

Тогда временная таблица в Clickhouse должна иметь следующий вид:

CREATE TABLE IF NOT EXISTS demo_table (
    `before_id` Nullable(UInt32),
    `before_message` Nullable(String),
    `after_id` Nullable(UInt32),
    `after_message` Nullable(String),
    `op` LowCardinality(String),
    `source_lsn` UInt64
)
ENGINE = Null;

где:

  • Все поля из источника указываются два раза, с префиксами before и after соответственно. Разделитель между префиксом и именем поля должен быть равен тому префиксу, который указан в настройке WORKER_CONNECTOR_TRANSFORMS_FLATTEN_DELIMITER. Тип данных должен быть семантически сохранен с типом данных, указанным на источнике.

  • op – тип операции.

  • source_lsn – номер транзакции записи.

  • ENGINE можно заменить на MergeTree, если нужно сохранять промежуточные значения.

Конечная таблица

Необходимо создать конечную таблицу, с которой в итоге будет работать пользователь Clickhouse. Она будет соответствовать временной таблице, но без дублирования полей данных и полей op и source_lsn.

Для временной таблицы, созданной в качестве примера в предыдущем разделе, нужно создать конечную таблицу следующего вида:

CREATE TABLE demo_table_data
(
    `id` UInt32,
    `message` String,
    `version` UInt64,
    `deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (id)
ORDER BY id;

Важно

Чтобы работали все типы операций в связке с Debezium-коннектором источника, необходимо указать именно движок ReplacingMergeTree, где поля version и deleted будут выступать маркерами для версионирования данных и для поддержки событий обновления и удаления.

Материализованное представление

Чтобы передавать данные из временной таблицы в конечную таблицу, необходимо создать материализованное представление (MATERIALIZED VIEW).

Для временной и конечной таблиц, созданных в качестве примера в предыдущих разделах, нужно создать материализованное представление следующего вида:

CREATE MATERIALIZED VIEW demo_table_mv
TO demo_table_data
AS
SELECT
    if(op = 'd', before_id, after_id) AS id,
    if(op = 'd', before_message, after_message) AS message,
    if(op = 'd', source_lsn, source_lsn) AS version,
    if(op = 'd', 1, 0) AS deleted
FROM demo_table
WHERE (op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd') OR (op = 'us');

Данное представление будет передавать данные из временной таблицы в конечную таблицу, учитывая все типы операций.

Чтобы обеспечить корректную работу с событиями удаления и обновления, представление использует поле номера транзакции (source_lsn) и поле типа операции (op).

Сценарий без поддержки событий обновления и удаления

Если нет необходимости поддерживать события обновления и удаления, то всю конфигурацию создания таблиц можно упростить:

CREATE TABLE IF NOT EXISTS demo_table (
    `after_id` Nullable(UInt32),
    `after_message` Nullable(String)
)
ENGINE = Null;

CREATE MATERIALIZED VIEW demo_table_mv
TO demo_table_data
AS
SELECT
    `after_id` as id,
    `after_message` as message
FROM demo_table;

CREATE TABLE demo_table_data
(
    id UInt32,
    message String
)
ENGINE = MergeTree()
ORDER BY id;
Нашли ответ на свой вопрос?
Обратная связь