VK Docs logo
Помощь
Обновлена 26 мая 2026 г. в 12:20

Руководство администратора

Настоящее Руководство содержит описание функционала, доступного для администратора Tarantool Change Data Capture (TCDC).

Универсальный Обработчик

Универсальный Обработчик - это приложение для переноса информации от Источника к Приемнику данных. Настройка параметров переноса данных задается через файл конфигурации.

Настройка Универсального Обработчика

Универсальный Обработчик настраивается через конфигурационный файл application.yaml. Вся конфигурация может быть собрана в одном файле или разделена на несколько файлов. В последнем случае каждый файл должен называться application.yaml и располагаться на соответствующем логическом уровне. Пример разбиения конфигурации на несколько файлов:

│  ...└─ cdc/   ├─ cdc-worker.jar         # исполняемый файл Универсального Обработчика   ├─ application.yaml       # хранит основные настройки, например, настройки сервера и другие, связанные со Spring   └─ config/      ├─ application.yaml    # хранит общие настройки Обработчика, например, для работы с контрольными точками      ├─ sink/      │  └─ application.yaml # хранит настройки подключения к Приемнику данных      └─ source/         └─ application.yaml # хранит настройки подключения к Источнику данных

Параметры в файле настроек application.yaml представлены в виде иерархии объектов формата YAML.

Пример:

spring:  application:    name: CDCWorker  profiles:    active: default

Эту же иерархическую структуру можно представить в виде строки, где уровни разделяются точками:

spring.application.name: CDCWorkerspring.profiles.active: default

Или в комбинированном виде:

spring:  application.name: CDCWorker  profiles.active: default

Комбинированная форма удобна, если на каком-то уровне определяется всего один потомок в дереве настроек. В этом случае можно упростить само дерево, соединяя узлы-потомки через точку.

Файл настроек Универсального Обработчика состоит из нескольких разделов:

Раздел настроек spring

Раздел spring содержит конфигурацию Spring Boot. Параметры раздела:

  • application.name - имя приложения. Значение по умолчанию: cdc-worker.
  • profiles.active - имя профиля. Значение по умолчанию: default.

Пример:

spring:  application.name: CDCWorker  profiles.active: default

Раздел настроек logging

Раздел logging содержит настройки логирования. Параметры раздела:

  • logging.file.level - управление уровнем логирования модулей. Всего можно управлять двумя модулями, задавая для них различные уровни логирования при помощи значений: DEBUG, VERBOSE, INFO, WARNING, ERROR. Значение OFF полностью отключает логирование событий модуля. По умолчанию логирование для модулей отключено:
    • org.springframework.boot.SpringApplication - логирование самого процесса запуска приложения.
    • org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLogger - детальный отчет об автоконфигурации Spring Boot.

Отключение логирования модулей позволяет сократить как общий объем логов, так и количество лишней информации в них. Это ускоряет и упрощает анализ логов.

Пример:

logging:  file:    name: logs/cdc-worker.log  level:    org.springframework.boot.SpringApplication: OFF    org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLogger: OFF

Раздел настроек server

Раздел server содержит настройки сервера. Параметры раздела:

  • server.port - порт сервера для подключения приложения. Значение по умолчанию: 8000

Пример:

server:  port: 8000

Раздел настроек management

Раздел настроек management содержит настройки Spring Boot Actuator. Параметры раздела:

  • management.endpoints.web.exposure.include - ID адресов мониторинга. По умолчанию сюда включены health и prometheus.
  • management.endpoint - группа параметров управления мониторингом:
    • prometheus.enabled - предоставление данных мониторинга в Prometheus. По умолчанию: true.
    • health.probes.enabled - настройка Spring Boot Actuator для интеграции с Kubernetes. По умолчанию: true.
    • health.show-details - определяет уровень детализации. Значение по умолчанию: always, указывает на полную информацию.
  • management.health.livenessState.enabled - механизм внутреннего отслеживания и управления состоянием жизнеспособности приложения. По умолчанию: true.
  • management.health.readinessState.enabled - механизм внутреннего отслеживания и управления состоянием готовности приложения к работе. По умолчанию: true.

Пример:

management:  endpoints.web.exposure.include:  - health  - prometheus  endpoint:    prometheus.enabled: true    health:      probes.enabled: true      show-details: always  health:    livenessState.enabled: true    readinessState.enabled: true

Раздел настроек cdc

Раздел содержит общие настройки приложения Универсального Обработчика. Параметры раздела:

  • cdc.worker.id - идентификатор процесса в системе.
  • cdc.shutdown.timeout.ms - максимальное время мягкого останова процесса. По истечении времени происходит жесткий останов процесса.

Пример:

cdc:  worker:    id: cdc-task  shutdown:    timeout:      ms: 30000

Раздел настроек source

Раздел содержит настройки подключения к Источнику данных. Параметры раздела:

  • source.plugin.path - путь, где расположены динамически загружаемые модули для подключения к Источнику данных. Путь должен указывать на папку, внутри которой находятся модули, каждый из которых распакован в свою папку. Просканированным будет только один модуль, что ускорит запуск приложения. См. подробнее.
  • source.connector.* - раздел параметров настройки выбранного коннектора к Источнику данных.
  • source.connector.class - наименование класса подключения к Источнику данных. Определяет, какой из просканированных и загруженных модулей подключения к Источнику данных нужно настроить и запустить. Обязательный параметр.
  • source.transforms - раздел для настройки параметров преобразователей.

Пример:

source:  plugin.path: /libs/connect  connector:    class: io.tarantool.connector.TarantoolConnector    topic.prefix: cdc    max.batch.size: 2048    connect:      username: admin      password: secret-cluster-cookie      targets: localhost:3605      timeout: 2000    replication:      idle.timeout: 2000      anonymous: true      id: source-worker-1    fields.exclude: bucket_id    spaces.include: User,all_types,intTest,arrays,all_types_tcs    datetime.zone.enabled: true

Настройка коннекторов для подключения к PostgreSQL и Oracle из экосистемы Debezium также описывается в разделе source.connector.*. Документация по настройкам коннекторов из экосистемы Debezium:

Определение пути расположения модулей подключения к Источнику данных

Предположим следующую схему расположения модулей:

/│  ...└─ libs/    └─ connect/       └─ source/          │  ...          ├─ postgresql-connector/          │  │  ...          │  └─ postgresql-connector.jar          │  ...          └─ tarantool-source-connector/             │ ...             └─ tarantool-source-connector.jar

Тогда путь должен быть указан как /libs/connect/source. При этом Универсальным Обработчиком будут загружены, но не запущены два модуля для подключения к Источнику данных. Поскольку сканирование всех модулей подключения может занимать время, рекомендуется следующая форма расположения модулей:

/│  ...└─ libs/    └─ connect/       └─ source/          │  ...          ├─ postgresql-connector/          │   └─ postgresql-connector/          │      │  ...          │      └─ tarantool-source-connector.jar          │  ...          └─ tarantool-source-connector/             └─ tarantool-source-connector/                │  ...                └─ tarantool-source-connector.jar

А в параметре source.plugin.path нужно указать либо /libs/connect/source/postgresql-connector, либо /libs/connect/source/tarantool-connector. То есть путь всегда должен указывать на папку уровнем выше относительно папки с модулем подключения к Источнику данных.

Раздел настроек sink

Раздел содержит настройки подключения к Приемнику данных. Параметры раздела:

  • sink.plugin.path - путь, где расположены динамически загружаемые модули для подключения к Приемнику данных. Путь должен указывать на папку, внутри которой находятся модули, каждый из которых распакован в свою папку. Просканированным будет только один модуль, что ускорит запуск приложения. См. подробнее.
  • sink.connector.* - раздел для указания параметров настройки выбранного коннектора к Приемнику данных.
  • sink.connector.class - наименование класса подключения к Приемнику данных. Определяет, какой из просканированных и загруженных модулей подключения к Приемнику данных нужно настроить и запустить. Обязательный параметр
  • sink.transforms - раздел для настройки параметров преобразователей.
  • sink.retry.* - задает правила обработки ошибки RetriableException при работе с Приемником данных.
  • sink.retry.count - количество повторных попыток отправки сообщений в Приемник данных. По истечении этих попыток будет выброшена ошибка и Обработчик завершит свою работу.
  • sink.retry.backoff - пауза между повторными попытками отправки сообщений в Приемник данных. Может принимать разные единицы измерения при указании соответствующего суффикса: секунды - s, миллисекунды - ms.
  • sink.retry.timeout - общее время на выполнение всех попыток отправки сообщений в Приемник данных. По истечении этого времени, если сообщения не будут записаны, выбросится ошибка и обработчик завершит свою работу. Может принимать разные единицы измерения при указании соответствующего суффикса: секунды - s, миллисекунды - ms.

Пример:

sink:  plugin.path: /libs/connect  connector:    class: io.debezium.connector.jdbc.JdbcSinkConnector    connection:      url: jdbc:postgresql://target-postgres:5432/iot?reWriteBatchedInserts=true      username: iot      password: iot    insert.mode: upsert    primary.key.mode: record_key    tasks.max: 1    delete.enabled: true    schema.evolution: basic    database.time.zone: UTC    auto.create: true    quote.identifiers: true    truncate.enabled: false    batch.size: 500    use.reduction.buffer: true  retry:    count: 5    backoff: 200ms    timeout: 3s  transforms:    names:    - topic-from-source-table    - topic-from-source-space-name    config:      topic-from-source-table:        type: io.tarantool.cdc.transforms.smt.ExtractTopic$Value        field.path: source.table        skip.missing.or.null: true      topic-from-source-space-name:        type: io.tarantool.cdc.transforms.smt.ExtractTopic$Value        field.path: source.space_name        skip.missing.or.null: true

Определение пути расположения модулей подключения к Приемнику данных

Предположим следующую схему расположения модулей:

/│  ...└─ libs/   └─ connect/      └─ sink/         │  ...         ├─ jdbc-sink-connector/         │  │  ...         │  └─ jdbc-sink-connector.jar         │  ...         └─ tarantool-sink-connector/            │  ...            └─ tarantool-sink-connector.jar

Тогда путь должен быть /libs/connect/sink. При этом универсальным обработчиком будут загружены, но не запущены два модуля для подключения к Приемнику данных. Поскольку сканирование всех модулей подключения может занимать время, рекомендуется следующая форма расположения модулей:

/│  ...└─ libs/   └─ connect/      └─ sink/         │  ...         ├─ jdbc-sink-connector/         │  └─ jdbc-sink-connector/         │     │  ...         │     └─ jdbc-sink-connector.jar         │  ...         └─ tarantool-sink-connector/            └─ tarantool-sink-connector/               │  ...               └─ tarantool-sink-connector.jar

А в параметре sink.plugin.path указать либо /libs/connect/sink/jdbc-sink-connector, либо /libs/connect/sink/tarantool-sink-connector. То есть, путь всегда должен указывать на папку уровнем выше относительно папки с модулем подключения к Приемнику данных.

Раздел настроек offset

Раздел содержит настройки контрольных точек. Параметры раздела:

  • offset.flush.interval.ms - интервал сохранения контрольных точек Обработчика.
  • offset.flush.timeout.ms - максимальное время выполнения операции записи контрольных точек. При превышении времени выбрасывается ошибка записи контрольных точек.
  • offset.failures.max.count - допустимое количество ошибок записи контрольных точек. При превышении этого количества работа Универсального Обработчика завершается. По умолчанию: 0. При этом значении работа универсального обработчика завершится после первой же ошибки. Для полного игнорирования ошибок необходимо выставить значение -1.
  • offset.storage.* - пространство для настройки хранилища контрольных точек.
  • offset.storage.type - наименование модуля хранения контрольных точек. Поддерживаемые модули:
    • tqe - хранение контрольных точек в Tarantool Queue Enterprise 2.x/ 3.x. При указании этого модуля необходимо указать параметры:
      • address - адрес и порт сервера, на котором установлен TQE.
      • queue - имя очереди для записи контрольных точек.
      • connection.id - ключ, по которому Обработчик будет сохранять свой снимок контрольных точек. Здесь необходимо указать значение task-${worker-id} для того, чтобы Helm-чарт автоматически присваивал уникальный ключ Обработчику. При подключении более чем одного Обработчика к очереди TQE уникальность ключа критически важна, потому что по этому ключу каждый Обработчик сохраняет свой снимок контрольных точек отдельно, не перезаписывая снимки других Обработчиков.
    • file - хранение контрольных точек в файле. При указании этого модуля необходимо указать параметры:
      • file.name - путь к файлу с контрольными точками.
    • kafka - хранение контрольных точек в Kafka. При указании этого модуля необходимо указать параметры:
      • servers - список пар адрес:порт серверов Kafka.
      • topic - название топика, в который будут сохраняться контрольные точки. Если топика до этого не существовало, он будет создан.
      • partitions.count - число сегментов указанного топика при создании (значение учитывается только если топика до этого не существовало).
      • replication.factor - сколько копий топика должно существовать.

Пример:

offset:  flush:    interval.ms: 5000    timeout.ms: 2500  storage:    type: tqe    address: offsets-grpc-server:18182    queue: offsets    connection.id: pg-task-1 # ключ, по которому Обработчик будет сохранять свой снимок контрольных точек.

Раздел настроек throttle

Раздел содержит настройки ограничителя трафика. Параметры раздела:

  • throttler.max.eps - задает ограничение по количеству событий в секунду. По умолчанию: -1 (ограничение не задано).

Пример:

throttle:  throttler.max.eps: -1

Коннекторы

Чтобы читать данные из Источников и записывать их в Приемники, Tarantool CDC использует коннекторы. Данные из Источника попадают в Tarantool CDC через коннектор к Источнику (source connector), проходят через ряд преобразований и записываются в Приемник через Коннектор к Приемнику (sink connector).

В поставку Tarantool CDC входят:

  • Коннекторы к Источнику и Приемнику данных PostgreSQL.
  • Коннекторы к Источнику и Приемнику данных Oracle.
  • Коннекторы к Источнику и Приемнику данных Kafka.
  • Коннекторы к Источнику и Приемнику данных Tarantool DB 1.x и 2.x.
  • Коннекторы к Источнику и Приемнику данных Tarantool Data Grid 2.x.
  • Коннекторы к Источнику и Приемнику данных Tarantool Queue Enterprise 2.x и 3.x.
  • Коннектор к Приемнику данных Tarantool Column Storage 1.x.
  • Коннектор к Приемнику данных Clickhouse.
  • Коннектор к Приемнику данных Elasticsearch.
  • Подключение сторонних коннекторов.

В файле настроек конкретный коннектор указывается при помощи обязательного параметра class. Параметр должен быть прописан в разделах connector параметров Источника данных (source) и Приемника данных (sink):

connector:  class: io.tarantool.connector.KafkaSourceConnector

Кроме параметра class, в разделе connector указываются и другие параметры коннекторов.

Коннекторы Kafka

Больше информации о параметрах Kafka, перечисленных в статьях о коннекторах к Источнику и Приемнику, смотрите в официальной документации Kafka.

Коннектор к Источнику данных Kafka

Коннектор к Источнику Kafka позволяет Tarantool CDC получать данные из брокеров Kafka.

Параметры коннектора указываются в разделе connector:

  • key.converter - название класса конвертера для ключа. Необязательный параметр, который может принимать 2 значения:
    • org.apache.kafka.connect.json.JsonConverter (по умолчанию);
    • io.apicurio.registry.utils.converter.AvroConverter.
  • key.converter.schemas.enable - определяет, включать ли схему в каждое сериализованное значение ключа. Необязательный параметр, значение по умолчанию: true.
  • key.converter.schemas.cache.size - максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию: 1000.
  • key.converter.decimal.format - задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию: BASE64.
  • key.converter.replace.null.with.default - определяет, заменять ли поля, имеющие значение NULL, на значение по умолчанию. При true приводит к замене NULL на значение по умолчанию. При false замены NULL не происходит. Необязательный параметр, значение по умолчанию: true.
  • key.converter.apicurio.registry.url - обязателен при key.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для ключа. См. подробнее.
  • value.converter - название класса конвертера для значения. Необязательный параметр, который может принимать 2 значения:
    • org.apache.kafka.connect.json.JsonConverter (по умолчанию);
    • io.apicurio.registry.utils.converter.AvroConverter.
  • value.converter.schemas.enable - определяет, включать ли схему в каждое сериализованное значение. Необязательный параметр, значение по умолчанию: true.
  • value.converter.schemas.cache.size - максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию: 1000.
  • value.converter.decimal.format - задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию: BASE64.
  • value.converter.replace.null.with.default - определяет, заменять ли поля, имеющие значение NULL, на значение по умолчанию. При true приводит к замене NULL на значение по умолчанию. При false замены NULL не происходит. Необязательный параметр, значение по умолчанию: true.
  • value.converter.apicurio.registry.url - обязателен при value.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для значения. См. подробнее.
  • bootstrap.servers - список адресов брокеров, к которым подключается kafka-consumer, cм. подробнее. Параметр так же может быть указан в разделе consumer. При одновременном указании параметра в 2х местах с отличными друг от друга значениями, приоритетным будет значение в разделе consumer. Параметр обязательно должен быть указан хотя бы в 1 из допустимых мест.
  • topics - динамическая карта, в которой ключ - название топика, из которого kafka-consumer будет забирать данные, а значение - номер партиций, из которых kafka-consumer будет забираться данные для текущего топика. Параметр обязателен. Пример карты:
    • topic-1: 0 -- kafka-consumer будет забирать данные из 0 партиции топика topic-1.
    • second-topic: 1-4 -- kafka-consumer будет забирать данные из (1;4) партиций топика second-topic (не включая правую границу). Т.е. будет забирать данные из 1,2,3 партиции.
    • topic-name: 0,1-4 -- kafka-consumer будет забирать данные из 0 и [1;4) партиций топика topic-name (не включая правую границу). Т.е. будет забирать данные из 0,1,2,3 партиции.
  • consumer - раздел определяет параметры kafka-consumer. Все параметры, описанные ниже, относятся к этому разделу и не являются обязательными.
    • fetch.max.bytes - cм. подробнее.
    • max.poll.records - cм. подробнее.
    • enable.auto.commit - определяет, будет ли потребитель автоматически периодически фиксировать смещения потребленных сообщений. Может принимать только предустановленное значение: false.
    • key.deserializer - класс десериализатора, используемый для преобразования байтового представления ключа сообщения из формата хранения в Kafka в объект целевого языка программирования. Может принимать только предустановленное значение: org.apache.kafka.common.serialization.ByteArrayDeserializer.
    • value.deserializer - класс десериализатора, используемый для преобразования байтового представления значения сообщения из формата хранения в Kafka в объект целевого языка программирования. Может принимать только предустановленное значение: org.apache.kafka.common.serialization.ByteArrayDeserializer.
    • auto.offset.reset - Определяет поведение потребителя при отсутствии сохраненного смещения для партиции в группе потребителей. Может принимать только предустановленное значение: earliest.

Пример настройки:

connector:  class: io.tarantool.connector.KafkaSourceConnector  key.converter: org.apache.kafka.connect.json.JsonConverter  key.converter.schemas.enable: true  key.converter.schemas.cache.size: 1000  key.converter.decimal.format: BASE64  key.converter.replace.null.with.default: true  value.converter: org.apache.kafka.connect.json.JsonConverter  value.converter.schemas.enable: true  value.converter.schemas.cache.size: 1000  value.converter.decimal.format: BASE64  value.converter.replace.null.with.default: true  bootstrap.servers: broker-1:9092,broker-2:9092  topics:    topic-1: 0    second-topic: 1-4    topic-N: 0,1-4  consumer:    fetch.max.bytes: 52428800    max.poll.records: 500

Коннектор к Приемнику данных Kafka

Коннектор к Приемнику данных Kafka позволяет Tarantool CDC передавать данные в брокеры Kafka.

Параметры коннектора указываются в разделе connector.

  • key.converter - название класса конвертера для ключа. Необязательный параметр, который может принимать 2 значения:
    • org.apache.kafka.connect.json.JsonConverter (по умолчанию);
    • io.apicurio.registry.utils.converter.AvroConverter.
  • key.converter.schemas.enable - определяет, включать ли схему в каждое сериализованное значение ключа. Необязательный параметр, значение по умолчанию: true.
  • key.converter.schemas.cache.size - максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию: 1000.
  • key.converter.decimal.format - задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию: BASE64.
  • key.converter.replace.null.with.default - определяет, заменять ли поля, имеющие значение NULL, на значение по умолчанию. При true приводит к замене NULL на значение по умолчанию. При false замены NULL не происходит. Необязательный параметр, значение по умолчанию: true.
  • key.converter.apicurio.registry.url - обязателен при key.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для ключа. См. подробнее.
  • value.converter - название класса конвертера для значения. Необязательный параметр, который может принимать 2 значения:
    • org.apache.kafka.connect.json.JsonConverter (по умолчанию);
    • io.apicurio.registry.utils.converter.AvroConverter.
  • value.converter.schemas.enable - определяет, включать ли схему в каждое сериализованное значение. Необязательный параметр, значение по умолчанию: true.
  • value.converter.schemas.cache.size - максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию: 1000.
  • value.converter.decimal.format - задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию: BASE64.
  • value.converter.replace.null.with.default - определяет, заменять ли поля, имеющие значение NULL, на значение по умолчанию. При true приводит к замене NULL на значение по умолчанию. При false замены NULL не происходит. Необязательный параметр, значение по умолчанию: true.
  • value.converter.apicurio.registry.url - обязателен при value.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для значения. См. подробнее.
  • bootstrap.servers - список адресов брокеров, к которым подключаются kafka-producer и admin-producer. См. подробнее. Параметр может быть указан в разделах producer и admin. При одновременном указании параметра в нескольких местах с отличными друг от друга значениями, приоритетными будут значения параметра в разделах producer для kafka-producer и/или admin для admin-producer. Параметр обязательно должен быть указан хотя бы в 1 из допустимых мест.
  • producer - раздел определяет настройку kafka-producer, cм. подробнее. Параметры этого раздела не являются обязательными. Ниже представлены некоторые из возможных параметров раздела:
    • key.serializer - указывает класс сериализатора, используемый для преобразования объекта ключа сообщения из формата приложения в байтовое представление для хранения в брокере Kafka. Значение по умолчанию: org.apache.kafka.common.serialization.ByteArrayDeserializer.
    • value.serializer - указывает класс сериализатора, используемый для преобразования объекта значения сообщения из формата приложения в байтовое представление для хранения в брокере Kafka. Значение по умолчанию: org.apache.kafka.common.serialization.ByteArrayDeserializer.
    • max.block.ms - определяет максимальное время блокировки при вызове методов send() и partitionsFor(), после которого выбрасывается исключение TimeoutException. Значение по умолчанию: Long.MAX_VALUE.
    • enable.idempotence - регулирут идемпотентный режим работы. Этот режим гарантирует, что отправка сообщений будет выполнена ровно один раз в рамках сессии продюсера, даже при повторных попытках отправки. Значение по умолчанию: false.
    • acks - определяет количество подтверждений, которые продюсер должен получить от лидеров реплик партиций перед тем, как запись будет считаться завершенной. Значение по умолчанию: all.
    • max.in.flight.requests.per.connection - определяет максимальное количество не подтвержденных запросов отправки, которые могут быть отправлены на один брокер без получения ответа. Значение по умолчанию: 1.
    • delivery.timeout.ms - определяет максимальное время ожидания подтверждения доставки сообщения, после которого операция отправки считается завершенной (успешно или с ошибкой). Значение по умолчанию: Integer.MAX_VALUE.
  • admin - раздел параметров для настройки admin-producer. Раздел не является обязательным. Особенности настройки см. подробнее.
  • topics - раздел параметров для настройки топиков. Раздел не является обязательным. Особенности настройки см. подробнее.

Пример настройки:

connector:  class: io.tarantool.connector.KafkaSinkConnector  key.converter: org.apache.kafka.connect.json.JsonConverter  key.converter.schemas.enable: true  key.converter.schemas.cache.size: 1000  key.converter.decimal.format: BASE64  key.converter.replace.null.with.default: true  value.converter: org.apache.kafka.connect.json.JsonConverter  value.converter.schemas.enable: true  value.converter.schemas.cache.size: 1000  value.converter.decimal.format: BASE64  value.converter.replace.null.with.default: true  bootstrap.servers: broker-1:9092,broker-2:9092  producer:    batch.size: 512  admin:    bootstrap.servers: broker-1:9092,broker-2:9092  topic.creation.default:    partitions: 1    replication.factor: 1

Коннекторы TCS

Коннектор к Приемнику данных TCS

Коннектор к Tarantool Column Storage (TCS) позволяет TCDC передавать данные в TCS через JDBC-драйвер Arrow Flight SQL.

{note:important} DDL-команды нельзя передавать через коннектор. При получении такого события произойдет ошибка и Универсальный Обработчик остановится. Все команды по добавлению новых или изменению существующих таблиц, в которые планируется запись, необходимо выполнять вне коннектора. {/note}

Параметры коннектора указываются в разделе connector:

  • class - имя класса коннектора для записи в TCS по JDBC. Обязательный параметр. Может принимать только одно значение:
    • io.debezium.connector.jdbc.JdbcSinkConnector.
  • connection.url - строка подключения Arrow Flight SQL. Обязательный параметр. Вид:
    • jdbc:arrow-flight-sql://<хост>:<порт>?useEncryption=0. Параметр useEncryption=0 отключает шифрование (например, для локальной сборки).
  • connection.username - имя пользователя TCS. Обязательный параметр.
  • connection.password - пароль пользователя TCS. Обязательный параметр.
  • insert.mode - режим записи строк. Должен быть согласован с фактической схемой таблиц в TCS. Необязательный параметр. Принимает значения:
    • INSERT — вставка новой строки (нужен уникальный первичный ключ).
    • UPDATE — обновление по ключу.
  • primary.key.mode - способ получения первичного ключа записи для ее сохранения в Приемнике. Необязательный параметр. Принимает значения:
    • none — первичный ключ не требуется.
    • kafka — в качестве первичного ключа используются контрольные точки Kafka (__connect_topic, __connect_partition, __connect_offset).
    • record_key — первичный ключ извлекается из ключа события (record.key).
    • record_value — первичный ключ извлекается из самой записи (record.value).
    • record_header — ключ извлекается из заголовков события (ConnectRecordheaders()). См. подробнее. Набор и порядок полей первичных ключей событий должны соответствовать ключам тех таблиц в TCS, в которые коннектор записывает события.
  • primary.key.fields - определяет список полей, используемых в качестве первичного ключа. Его поведение зависит от типа ключа в сообщении:
    • Когда ключ имеет простой тип — параметр становится обязательным, иначе коннектор выбросит исключение. В этом случае берется первое значение из свойства primary.key.fields.
    • Когда ключ сообщения представляет собой структуру, параметр можно не задавать: используются все поля ключа. Если параметр задан, используются только перечисленные поля из структуры ключа.
  • delete.enabled - определяет, обрабатывать ли события удаления. Необязательный параметр. При значении true события удаления обрабатываются, при false игнорируются. Также, при значении true, перед обработкой событий DELETE таблицы сбрасывается буфер накопленных событий UPDATE для той же таблицы, чтобы не потерять порядок операций. Значение по умолчанию: false.
  • schema.evolution - поддержка режима изменения схемы таблиц Приемника. Необязательный параметр. Принимает знаения:
    • none - события, приводящие к изменениям схемы таблиц (DDL-команды) не передаются в Приемник коннектором. Это рекомендованное значение при работе с Приемником данных TCS.
    • basic - события, приводящие к изменениям схемы таблиц (DDL-команды) передаются в Приемник коннектором. Это значение установлено по умолчанию.
  • database.time.zone - часовой пояс для типов даты/времени при записи в БД. Необязательный параметр. Значение по умолчанию: UTC.
  • auto.create - автоматическое создание таблиц в Приемнике при необходимости. Необязательный параметр. Значение по умолчанию: true.
  • quote.identifiers - экранирование идентификаторов в SQL (имен таблиц и колонок). Необязательный параметр. Значение по умолчанию: true.
  • truncate.enabled - разрешение операций очистки таблиц, если они приходят в потоке событий. Необязательный параметр. Значение по умолчанию: false.
  • batch.size - размер пакета накапливаемого пакета записей для передачи в БД. Необязательный параметр. Значение по умолчанию: 500.
  • use.reduction.buffer - использование буфера сжатия событий. При работе буфер накапливает несколько событий, относящихся к одной и той же записи (таблица + ключ), и оставляет только последнее из них. Это сокращает количество операций записи, заменяя серию промежуточных обновлений одним, самым новым. Необязательный параметр. Значение по умолчанию: true.
  • hibernate.dialect - диалект Hibernate для генерации SQL. Значение зависит от версии TCS и установленного драйвера JDBC. Необязательный параметр.

Пример настройки:

sink:  plugin.path: plugins/sink  connector:    class: io.debezium.connector.jdbc.JdbcSinkConnector    connection:      url: jdbc:arrow-flight-sql://localhost:50051      username: tcs      password: tcs    insert.mode: update    primary.key.mode: record_key    delete.enabled: true    schema.evolution: none    database.time.zone: UTC    auto.create: true    quote.identifiers: true    truncate.enabled: false    batch.size: 500    use.reduction.buffer: true    hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect

Коннектор к приемнику данных Elasticsearch

Коннектор к приемнику данных Elasticsearch позволяет TCDC передавать события изменения данных в индексы Elasticsearch. Коннектор преобразует каждое событие (вставку, обновление, замену или удаление записи в источнике) в соответствующее действие над документом в Elasticsearch: индексацию, обновление или удаление. Таким образом, документы в Elasticsearch отражают актуальное состояние данных, переданных через CDC.

Параметры коннектора:

  • elastic.connections — список URL для подключения к узлам Elasticsearch. Обязательный параметр. Каждый элемент списка - строка с URL вида <elasticsearch://>[<login>[:<password>]@]<host>:<port>[/][?] или <elasticsearchs://>[<login>[:<password>]@]<host>:<port>[/][?] для подключения по TLS(SSL). Элементы списка перечисляются через запятую.

    Внимание:

    • На данный момент в строке подключения не поддерживаются параметры запроса (например, ?timeout=30s).
    • При подключении к узлу с использованием TLS(SSL) доверенные сертификаты в форматах PEM и PKCS12 можно задать через параметры tls.trust.pkcs12.certificates и tls.trust.pem.certificates. Цепочка доверия для каждого узла Elasticsearch будет проверяться на основе этих сертификатов.
  • insert.mode — режим вставки записей. Необязательный параметр. Принимает значения:

    • insert — вставка происходит, если в индексе нет документа с таким же _id. Если документ существует, коннектор выбрасывает исключение. Это значение используется по умолчанию.
    • replace — при наличии документа с таким же _id старый документ заменяется новым.
  • primary.key.mode — способ получения первичного ключа записи для ее сохранения в Elasticsearch. Значение первичного ключа сохраняется в поле документа _id. Необязательный параметр. Принимает значения:

    • record_value — первичный ключ извлекается из самой записи (value). Это значение используется по умолчанию.
    • record_key — первичный ключ извлекается из ключа события (key).
  • primary.key.fields — список имен полей, которые входят в состав значения первичного ключа события, в Elasticsearch. Коннектор проверяет, какие из указанных полей присутствуют в событии, и формирует из них массив в формате JSON, перечисляя их через запятую. Порядок элементов в массиве соответствует порядку перечисления полей в схеме события. Поля, отсутствующие в событии, игнорируются. Если поле присутствует, но его значение равно null, то в составном ключе (несколько полей) null допускается, а в несоставном ключе (одно поле) коннектор выбрасывает исключение. Пустая строка записывается как "". Полученный массив в формате JSON записывается в поле _id сохраняемого документа. Необязательный параметр:

    • Если параметр не задан или пуст, первичным ключом считаются все поля записи.
    • Если параметр задан, коннектор ищет в записи поля с указанными именами. Найденные поля объединяются в состав _id. Если не найдено ни одного поля, коннектор выбрасывает исключение.
  • delete.enabled — включает или выключает обработку событий DELETE. Необязательный параметр. Возможные значения:

    • true - события обрабатываются.
    • false — события отбрасываются с записью в лог (уровень DEBUG). Это значение используется по умолчанию.
  • decimal.handling.mode — режим преобразования полей типа Decimal (включая io.debezium.data.VariableScaleDecimal). Необязательный параметр. Принимает значения:

    • precise, engineer_string — значение представляется в виде экспоненциальной записи (например, 123.45 преобразуется в "1.2345e2"). Тип в Elasticsearch — text. Значение precise используется по умолчанию.
    • plain_string — значение всегда представляется в виде строки десятичной дроби (тип — text).
    • double — значение представляется в виде числа двойной точности (тип — double). Этот тип не подходит для данных, требующих абсолютной точности.
  • datetime.handling.mode — режим преобразования полей типа дата‑время. Необязательный параметр. Принимает значения:

    • datetime — преобразуется в тип date Elasticsearch. Это значение используется по умолчанию.
    • string — преобразуется в текстовую строку ISO‑8601 (тип text).
    • numeric — преобразуется в целое число, разрядность которого зависит от исходного типа Kafka Connect:
      • Для Timestamp - 64-битное (long) - миллисекунды, отсчитываемые с момента 01 января 1970 года 00 часов 00 минут 00 секунд.
      • Для Date — 32-битное целое (int) с количеством дней с 01 января 1970 года.
      • Для Time — 32-битное целое (int) с миллисекундами с полуночи.
  • datetime.infinity.handling.enabled — включает или выключает обработку значений -infinity/+infinity для логического типа io.debezium.time.ZonedTimestamp. Необязательный параметр. Принимает значения:

    • true:
      • При datetime.handling.mode: string — выходной тип text со значением -infinity/+infinity;
      • При datetime.handling.mode: datetime или datetime.handling.mode: numeric — выходной тип date с преобразованием в граничные даты (+5879610-12-31... и -5879609-12-31...).
    • false:
      • При datetime.handling.mode: datetime или datetime.handling.mode: numeric — коннектор выбрасывает исключение.
      • При datetime.handling.mode: string — коннектор работает в штатном режиме. Значение по умолчанию: false.
  • io.threads.count — количество потоков обработки операций сетевого ввода‑вывода. Необязательный параметр. Значение 0 означает отсутствие ограничений количества потоков. По умолчанию используется значение 0.

  • futures.processing.threads.count — количество потоков для обработки ответов от Elasticsearch. Низкое количество потоков относительно количества событий снижает скорость обработки событий. Увеличение количества потоков повышает скорость обработки, но требует дополнительных ресурсов процессора. Значение 0 означает использование всех доступных ресурсов хоста. По умолчанию используется значение 0.

  • tls.trust.pem.certificates — список полных путей к доверенным сертификатам в формате PEM (расширения .pem, .crt, .ca-bundle) или путей к папкам, где хранятся такие сертификаты (поиск выполняется рекурсивно). Необязательный параметр.

  • tls.trust.pkcs12.certificates — список записей вида путь_к_файлу.p12:пароль, где указывается путь к контейнеру PKCS12 (.p12, .pfx) и пароль от него. Контейнеры без пароля не допускаются. Необязательный параметр.

  • auth.api-key — Строка в формате Base64 для аутентификации в API по ключу. Если параметр задан, он используется вместо аутентификации по логину и паролю. Необязательный параметр.

При получении события для индекса, который отсутствует в кластере Elasticsearch, коннектор автоматически создает этот индекс. Параметры из разделе index применяются только в момент создания индекса. Если индекс уже существует, эти параметры не используются. Если параметры не заполнены - используются значения по умолчанию. Больше информации о настройках индексов Elasticsearch см. по ссылке.

Параметры настройки индексов указываются в виде *.index.<параметр>: индекс1: значение1, индекс2: значение2, __common__: значение_по_умолчанию. Ключ __common__ задает значение для всех индексов, у которых не указано свое конкретное значение. Если __common__ не указан, то для всех индексов для которых значения не указаны, будет использоваться значение по умолчанию.

  • index.number-of-shards — количество первичных шардов индекса. Значение по умолчанию устанавливается на стороне Elasticsearch. Необязательный параметр.
  • index.number-of-routing-shards — количество виртуальных шардов, используемых в ключе маршрутизации записи события в Elasticsearch. Значение по умолчанию: 1. Необязательный параметр.
  • index.mapping.total-fields.limit — максимальное количество полей в индексе (включая сопоставления полей, объектов, псевдонимов и вычисляемых полей). Превышение лимита может привести к ухудшению производительности и перегрузке памяти узлов кластера. Необязательный параметр. Значение по умолчанию: 1000.
  • index.mapping.dynamic — управляет добавлением новых полей в сопоставление индекса. Принимает значения:
    • true — новые поля автоматически добавляются в сопоставление. Это значение используется по умолчанию.
    • runtime — новые поля добавляются в сопоставление как вычисляемые поля. Такие поля не индексируются и загружаются из исходного документа (_source) во время выполнения запроса.
    • false — новые поля не добавляются в сопоставление. Они не индексируются и не участвуют в поиске, но остаются в исходном документе ответа.
    • strict — при обнаружении нового поля выбрасывается исключение, и документ отклоняется.
  • index.translog.durability — режим сброса журнала транзакций на диск. Необязательный параметр. Принимает значения:
    • request — синхронный сброс после каждого запроса. Это значение используется по умолчанию.
    • async — асинхронный сброс в фоне с интервалом sync_interval.
  • index.translog.sync-interval — интервал принудительной записи журнала транзакций на диск. Необязательный параметр. Используется только при durability: async. Значение по умолчанию: 5s. Не может быть менее 100ms.
  • index.translog.flush-threshold-size — максимальный общий размер операций в translog, после которого выполняется принудительный сброс (flush). Необязательный параметр. Значение по умолчанию: 10 GB.
  • index.refresh-interval — интервал выполнения операции refresh (делает последние изменения видимыми для поиска). Необязательный параметр. Значение по умолчанию различается для разных типов установок Elasticsearch:
    • Для облачной установки EalsticSearch (Serverless) - 5s.
    • Для установки Elasticsearch в Elastic Stack - 1s. При значении -1 операция refresh отключается.
  • index.auto-expand-replicas — автоматическое расширение числа реплик в зависимости от количества узлов данных в кластере. Формат: нижняя_граница-верхняя_граница (например, 0-5) или 0-all для всех узлов. Необязательный параметр. Значение по умолчанию: false (отключено).
  • index.codec — метод сжатия хранимых данных. Необязательный параметр. Принимает значения:
    • default — сжатие LZ4.
    • best_compression — сжатие ZSTD (более высокая степень сжатия за счет скорости чтения).
  • index.max-result-window — ограничение на глубину поиска в Elasticsearch. Этот параметр не влияет на передачу данных коннектором, но ограничивает поисковые запросы к уже проиндексированным данным со стороны других сервисов. Необязательный параметр. Значение по умолчанию: 10000.
  • index.routing.allocation.enable — разрешение на размещение шардов при их создании или восстановлении. Параметр определяет, какие типы шардов могут быть размещены на узлах кластера. Необязательный параметр. Принимает значения:
    • all — можно размещать любые шарды (первичные и реплики). Это значение используется по умолчанию.
    • primaries — можно размещать только первичные шарды.
    • new_primaries — можно размещать только вновь создаваемые первичные шарды.
    • none — нельзя размещать никакие шарды. При этом значении создание индекса невозможно.
  • index.routing.rebalance.enable — разрешение на перераспределение шардов. Параметр определяет, какие типы шардов могут быть автоматически перемещены между узлами кластера для выравнивания нагрузки. Необязательный параметр. Принимает значения:
    • all — разрешено перераспределение любых шардов (первичных и реплик). Это значение используется по умолчанию.
    • primaries — разрешено перераспределение только первичных шардов.
    • replicas — разрешено перераспределение только реплик.
    • none — перераспределение запрещено.
  • index.routing-partition-size — количество шардов, на которые направляются документы при использовании пользовательской маршрутизации. Значение должно быть меньше общего количества шардов индекса. Необязательный параметр. Значение по умолчанию: 1 (все документы с одинаковым значением пользовательской маршрутизации попадают в один шард).
  • index.soft-deletes.enabled — включение механизма мягких удалений, при которм удаленные документы не удаляются физически, а помечаются как удаленные и временно хранятся для нужд репликации и восстановления. Необязательный параметр. Принимает значения:
    • true - режим мягкого удаления включен. Это значение используется по умолчанию.
    • false - режим мягкого удаления выключен, при удалении документы стираются сразу. Параметр устарел начиная с Elasticsearch 7.6.0, так как мягкие удаления включены по умолчанию и не отключаются.
  • index.soft-deletes.retention-lease.period — максимальный срок, на который ведущий шард гарантирует хранение истории мягких удалений для реплик. Если реплика не синхронизируется в течение этого периода, история считается утерянной. Необязательный параметр. Значение по умолчанию: 12h.
  • index.search.idle.after — время бездействия шарда без поисковых или get-запросов, после которого шард считается простаивающим. Необязательный параметр. Значение по умолчанию: 30s.

Пример настройки:

connector:  elastic.connections: elasticsearch://user:pass@localhost:9200, elasticsearchs://10.0.0.1:9200  insert.mode: replace  primary.key:    mode: record_value    fields: id, order_number  delete.enabled: true  decimal.handling.mode: precise  datetime:    handling.mode: datetime    infinity.handling.enabled: true  io.threads.count: 4  futures.processing.threads.count: 4  tls.trust:    pem.certificates: /etc/certs/elastic-ca.crt, /etc/certs/trusted/    pkcs12.certificates: /etc/certs/client.p12:secret123, /etc/certs/node.p12:pass456  auth.api-key: dXNlcm5hbWU6cGFzc3dvcmQ=  index:    number-of-shards: users:5, orders:10, __common__:3    number-of-routing-shards: users:5, orders:10, __common__:3    mapping:      total-fields.limit: users:2000, __common__:1000      dynamic: __common__:true, users:runtime, logs:false    translog:      durability: users:async, __common__:request      sync-interval: users:10s, __common__:5s      flush-threshold-size: users:5gb, __common__:10gb    refresh-interval: users:-1, orders:30s, __common__:1s    auto-expand-replicas: users:0-3, orders:0-all, __common__:false    codec: users:best_compression, __common__:default    max-result-window: users:50000, __common__:10000    routing:      allocation.enable: users:primaries, __common__:all      rebalance.enable: users:replicas, __common__:all    routing-partition-size: users:3, __common__:1    soft-deletes:      retention-lease.period: users:24h, __common__:12h      enabled: users:true, __common__:true    search.idle.after: users:60s, __common__:30s

Коннекторы Tarantool

Коннекторы Tarantool позволяют TCDC подключаться к Tarantool DataBase (TDB) и Tarantool Data Grid 2.x (TDG2) для:

  • Получения событий изменения данных из TDB и TDG2 при помощи коннектора к Источнику данных Tarantool.
  • Передачи событий изменения данных в TDB Коннекторы Tarantool позволяют TCDC подключаться к Tarantool DataBase (TDB) для:
  • Получения событий изменения данных при помощи коннектора к Источнику данных Tarantool.
  • Передачи событий изменения данных при помощи коннектора к Приемнику данных Tarantool.

Коннекторы к Источнику данных Tarantool

Коннектор к Источнику данных Tarantool позволяет TCDC получать события изменения данных в Tarantool DataBase 1.x/ 2.x (TDB1/ TDB2) и Tarantool Data Grid 2.x (TDG2). Событие может быть вставкой, обновлением или удалением записи.

Параметры коннектора:

  • topic.prefix — приставка в теме сообщения. Необязательный параметр. Значение по умолчанию не установлено.
  • poll.interval.ms - время, в течение которого коннектор накапливает события в пакете, который будет передан для дальнейшей обработки. Если размер пакета не достиг размера, указанного в max.batch.size, то пакет передается по истечении указанного времени. Необязательный параметр. Значение по умолчанию: 500.
  • max.batch.size - максимальное количество событий, которое вмещает пакет передачи данных. По достижении указанного количества пакет передается для дальнейшей обработки. Необязательный параметр. Значение по умолчанию: 2048.
  • connect.username — имя пользователя для подключения к Источнику. Обязательный параметр.
  • connect.password — пароль пользователя для подключения к Источнику. Необязательный параметр. Пустой пароль или отсутствие параметра допускается только с именем пользователя guest.
  • connect.timeout — время ожидания подключения к Источнику в миллисекундах. Если подключение не установлено за это время, операция прерывается. Необязательный параметр. Допустимые значения: от 1 до 2147483647 (Integer.MAX_VALUE). Значение по умолчанию: 2000.
  • connect.adapter - режим обработки входящих событий. Определяет, как именно коннектор должен читать и преобразовывать кортежи, полученные из Tarantool. Принимает значения:
    • tarantool - события обрабатываются как обычные кортежи Tarantool. Коннектор извлекает и отправляет их в том виде, в котором они хранятся в базе. Этот режим не рекомендуется для подключения к Приемнику TDG2, из-за особенностей хранения данных.
    • tdg2 - события обрабатываются как сложные документы TDG2. Этот режим рекомендуется для подключения к Приемнику TDG2.
  • connect.targets - список узлов, к которым необходимо подключить коннектор. Обязательный параметр. Значение по умолчанию не установлено.
  • replication.anonymous - включает режим анонимной реплики. Необязательный параметр. Принимает значения:
    • false - коннектор подключается как неанонимная реплика.
    • true - коннектор подключается как анонимная реплика. Анонимная реплика не может изменять данные в кластере Tarantool и не учитывается кластером Tarantool в его внутренних расчетах. Это значение используется по умолчанию.
  • replication.id - произвольный идентификатор сеанса репликации, используемый для генерации уникального экземпляра коннектора в кластере Tarantool. Необязательный параметр.
  • replication.idle.timeout — максимальное время ожидания в миллисекундах, после последнего сигнала (события или пульса) от Tarantool. По истечении этого времени подключение к Источнику прерывается. Необязательный параметр. Допустимые значения: от 0 до 9223372036854775807 (Long.MAX_VALUE). Значение по умолчанию: 2000.
  • spaces.include — список названий пространств, которые коннектор будет обрабатывать. Названия перечисляются через запятую. Необязательный параметр. Если параметр не указан или пуст, обрабатываются все пространства Источника.
  • spaces.exclude — список названий пространств, которые коннектор не будет обрабатывать. Необязательный параметр. Названия перечисляются через запятую. Параметр имеет приоритет над spaces.include - при указании пространства в обоих параметрах, коннектор не будет обрабатывать это пространство.
  • fields.include — список названий полей, которые коннектор будет обрабатывать. Названия перечисляются через запятую. Необязательный параметр. Под действие параметра попадают поля пространств Источника, указанных в spaces.include или всех пространств, если spaces.include не указан или пуст. Также можно указывать поля конкретных пространств в формате [space_name].[field_name], где space_name — название пространства, field_name — имя поля. Названия полей, которые не принадлежат ни одному пространству, игнорируются.
  • fields.exclude — список названий полей, которые коннектор не будет обрабатывать. Необязательный параметр. Названия перечисляются через запятую. Также можно указывать поля конкретных пространств в формате [space_name].[field_name], где space_name — название пространства, field_name — имя поля. Параметр имеет приоритет над fields.include - при указании поля в обоих параметрах, коннектор не будет обрабатывать это поле.
  • datetime.zone.enabled — включает или выключает обработку временных зон типов данных datetime (для TDB), DateTime и Timestamp (для TDG2). Необязательный параметр. Принимает значения:
    • false — указанные типы данных кодируются в io.debezium.time.MicroTimestamp в виде количества микросекунд, отсчитываемые с момента 01 января 1970 года 00 часов 00 минут 00 секунд. Это значение используется по умолчанию.
    • true — указанные типы данных кодируются в io.debezium.time.ZonedTimestamp в виде ISO‑8601 строки со смещением.
  • integer.handling.mode — режим преобразования полей типа integer или unsigned. Необязательный параметр. Принимает значения:
    • int64 — все поля указанных типов преобразуются в обычное целое число (со знаком, до 9 квинтиллионов). Это значение используется по умолчанию.
    • decimal — все поля указанных типов преобразуются в десятичное число без дробной части.

Пример настройки:

connector:  topic.prefix: cdc  poll.interval.ms: 100  max.batch.size: 2048  connect:    username: admin    password: secret-cluster-cookie    timeout: 2000    adapter: tarantool    targets: source-tdb-storage-1-master:3301  replication:    anonymous: true    id: cdc-source-worker-1    idle.timeout: 2000  spaces:    include: User    exclude: intTest  fields:    include: all_types    exclude: bucket_id  datetime.zone.enabled: false  integer.handling.mode: int64

Коннекторы к Приемнику данных Tarantool

Коннектор к Приемнику данных Tarantool позволяет TCDC записывать события в эти СУБД. Событие может быть вставкой, обновлением или удалением записи из Источника данных.

Параметры коннектора:

  • tarantool.connection.groups — список URL с настройками для подключения к TDB. Обязательный параметр. Каждый элемент списка - строка с URL вида tarantool://<host><:><port>/?<param1=value1&param2=value2...>, tarantool://<host><:><port>/?<param1=value1&param2=value2...>. Элементы списка перечисляются через запятую. Доступные параметры:
    • host — имя узла Tarantool. Значение по умолчанию: localhost
    • port — порт для подключения. Значение по умолчанию: 3301.
    • size — количество подключений в группе. Значение по умолчанию: 1.
    • user — имя пользователя для подключения. Значение по умолчанию: guest.
    • password — пароль для подключения. Значение по умолчанию отсутствует.
    • auth_type — тип аутентификации группы подключения. Доступные значения:
      • CHAP_SHA1 - результат преобразования пароля уникален для каждого подключения, что снижает риски перехвата пароля. Это значение используется по умолчанию.
      • PAP_SHA256 - результат преобразования пароля одинаков при каждом подключении.
    • tag — имя группы подключения, уникальное. Значение по умолчанию отсутствует.
  • insert.mode — режим вставки записей. Необязательный параметр. Принимает значения:
    • insert — вставка происходит если в пространстве нет кортежа с идентичным первичным ключом. В ином случае коннектор завершает работу с исключением. Это значение используется по умолчанию.
    • replace — при наличии кортежа с таким же первичным ключом перезаписывает старый кортеж новым.
  • primary.key.mode — способ получения первичного ключа. Необязательный параметр. Принимает значения:
    • record_value (по умолчанию) — поиск первичного ключа производится в самой записи
    • record_key — поиск первичного ключа производится в структуре record.key
  • primary.key.fields — список имен полей, потенциально входящих в состав первичного ключа. Параметр связан с primary.key.mode и определяет части первичного ключа по именам полей. Необязательный параметр. Поведение зависит от значения primary.key.mode:
    • При record_value:
      • Когда список полей пустой или отсутствует, первичным ключом считаются все поля в записи.
      • Когда список полей не пустой, производится поиск полей записи по названиям из этого списка. При нахождении поля оно считается частью первичного ключа.
      • Когда в записи не найдено ни одного поля с названием, указанным в списке полей, коннектор выбрасывает исключение.
    • При record_key:
      • Когда ключ имеет простой тип — параметр становится обязательным, иначе коннектор выбросит исключение. В этом случае берется первое значение из свойства primary.key.fields.
      • Когда ключ сообщения представляет собой структуру, параметр можно не задавать: используются все поля ключа. Если параметр задан, используются только перечисленные поля из структуры ключа.
  • delete.enabled — включает или выключает обработку событий DELETE. Необязательный параметр. Возможные значения:
    • true - события обрабатываются.
    • false — события отбрасываются с записью в журнал (уровень DEBUG). Это значение используется по умолчанию.
  • truncate.enabled — включает или выключает обработку событий TRUNCATE. Необязательный параметр. Возможные значения:
    • true - события обрабатываются.
    • false — события отбрасываются с записью в журнал (уровень DEBUG). Это значение используется по умолчанию. Внимание: Включать обработку событий TRUNCATE не рекомендуется. Порядок обработки сообщений такого рода в общем потоке данных не гарантирован, что может привести к непредвиденным потерям данных в Приемнике.
  • decimal.handling.mode — режим преобразования полей типа io.debezium.data.VariableScaleDecimal и org.apache.kafka.connect.data.Decimal. Необязательный параметр. Принимает значения:
    • precise, engineer_string — значение представляется в виде экспоненциальной записи (например, 123.45 преобразуется в "1.2345e2"). Значение precise используется по умолчанию.
    • plain_string — значение всегда представляется в виде строки десятичной дроби.
    • double — значение представляется в виде числа двойной точности. Этот тип не подходит для данных, требующих абсолютной точности.
  • datetime.handling.mode — режим преобразования полей типа дата‑время. Необязательный параметр. Принимает значения:
    • datetime — преобразуется в тип date Elasticsearch. Это значение используется по умолчанию.
    • string — преобразуется в текстовую строку ISO‑8601 (тип text).
    • numeric — преобразуется в целое число, разрядность которого зависит от исходного типа Kafka Connect:
      • Для Timestamp - 64-битное (long) - миллисекунды, отсчитываемые с момента 01 января 1970 года 00 часов 00 минут 00 секунд.
      • Для Date — 32-битное целое (int) с количеством дней с 01 января 1970 года.
      • Для Time — 32-битное целое (int) с миллисекундами с полуночи.
  • datetime.infinity.handling.enabled — включает или выключает обработку значений -infinity/+infinity для логического типа io.debezium.time.ZonedTimestamp. Необязательный параметр. Принимает значения:
    • true:
      • При datetime.handling.mode: string — выходной тип text со значением -infinity/+infinity;
      • При datetime.handling.mode: datetime или datetime.handling.mode: numeric — выходной тип date с преобразованием в граничные даты (+5879610-12-31... и -5879609-12-31...).
    • false:
      • При datetime.handling.mode: datetime или datetime.handling.mode: numeric — коннектор выбрасывает исключение.
      • При datetime.handling.mode: string — коннектор работает в штатном режиме. Значение по умолчанию: false.

Пример настройки:

connector:  tarantool.connection.groups: tarantool://host1:3301/?user=admin&password=secret&size=2, tarantool://host2:3301/?user=admin&password=secret  insert.mode: replace  primary.key:    mode: record_value    fields: id  delete.enabled: true  truncate.enabled: false  decimal.handling.mode: precise  datetime:    handling.mode: datetime    infinity.handling.enabled: false

Коннекторы Debezium

К коннекторам Debezium относятся:

  • Коннекторы к Источнику и Приемнику данных PostgreSQL.
  • Коннекторы к Источнику и Приемнику данных Oracle.

Механизм Пульс в коннекторах Debezium

Пульс (heartbeat) — механизм, обеспечивающий периодическое обновление позиции чтения в журнале упреждающей записи (WAL) источника данных независимо от наличия изменений в отслеживаемых коннектором таблицах. Благодаря этому механизму контрольные точки сохраняются с заданной периодичностью и сокращается объем WAL, который требуется повторно прочитать при восстановлении коннектора после сбоя.

Коннектор к Источнику сохраняет контрольные точки записи только при обработке событий из отслеживаемых таблиц. Если в этих таблицах нет изменений, позиция чтения журнала упреждающей записи не обновляется.

При перезапуске коннектор продолжает чтение журнала упреждающей записи с последней сохраненной контрольной точки. Если между этой точкой и текущей позицией в журнале находится большой объем записей из других таблиц, коннектор просматривает их все, прежде чем дойдет до конца. Время восстановления работы напрямую зависит от объема журнала, который необходимо прочитать.

Пульс создает искусственные изменения в отдельной созданной служебной таблице, которые попадают в журнал упреждающей записи, прочитываются и отдаются коннектором в обработчик. При обработке таких событий их контрольные точки сохраняются, что позволяет не перечитывать большие куски журнала репликации при возобновлении переноса данных.

В результате при перезапуске коннектору требуется прочитать часть журнала от последней контрольной точки до момента восстановления. Это сокращает время восстановления коннектора после сбоев и помогает периодически очищать журнал упреждающей записи, снижая потребление дисковой памяти на стороне базы данных Источника.

Настройка Пульса

Для правильной работы Пульса в базе данных Источника необходимо создать служебную таблицу, в которую коннектор будет вносить изменения.

Пример создания служебной таблицы для PostgreSQL:

CREATE TABLE public.heartbeat (    id SERIAL PRIMARY KEY,    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP);

Пример создания служебной таблицы для Oracle:

CREATE TABLE heartbeat_table (    id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP);

Работа Пульса настраивается параметрами:

  • heartbeat.action.query - SQL-запрос, выполняемый на Источнике данных. Используется для внесения изменений в служебную таблицу. При получении этого запроса контрольная точка в WAL Источника обновляется. Таблица, указываемая в запросе heartbeat.action.query должна быть включена в список наблюдаемых таблиц table.include.list коннектора, иначе коннектор не увидит изменения и контрольная точка не обновится.
  • heartbeat.interval.ms - интервал отправки Пульс-сообщений. Значение 0 отключает Пульс. Рекомендуемое значение: 10000 (10 секунд).

Пример для PostgreSQL:

connector:  heartbeat.interval.ms: 30000  heartbeat.action.query: "INSERT INTO heartbeat (ts) VALUES (NOW())"

Пример для Oracle с LogMiner:

connector:  heartbeat.interval.ms: 30000  heartbeat.action.query: "INSERT INTO heartbeat_table VALUES (SYSDATE)"

Особенности коннекторов Oracle с OLR адаптером

При использовании коннектором Oracle OLR (OpenLogReplicator) адаптера с флагом SHOW_CHECKPOINT (flags = 4096 (0x1000)) запрос heartbeat.action.query не нужен при включенном Пульсе, т.к.:

  1. OLR отправляет события CHECKPOINT на границах LWN (Log Writer Number) в redo log - независимо от наличия изменений в наблюдаемых таблицах.
  2. При получении событий CHECKPOINT и COMMIT от OLR, коннектор сразу отправляет Пульс, не дожидаясь окончания интервала времени.
  3. Это гарантирует, что контрольные точки обновляются регулярно даже при отсутствии изменений в наблюдаемых таблицах.

Важно: heartbeat.interval.ms должен быть включен (значение > 0), иначе Пульс не будет работать даже с контрольными точками OLR.

Пример конфигурации OLR:

{  "source": [{    "flags": 4096  }]}

Пример конфигурации Debezium с OLR:

connector:  heartbeat.interval.ms: 10000  # heartbeat.action.query не требуется

Рекомендации по использованию Пульса

СУБД/коннектор
Поток изменений в наблюдаемые таблицы
Поток изменений в ненаблюдаемые таблицы
heartbeat.interval.ms
heartbeat.action.query
Примечания
PostgreSQL
Большой
Большой
Не нужен
Не нужен
Нормальная работа
PostgreSQL
Большой
Малый
Не нужен
Не нужен
Нормальная работа
PostgreSQL
Малый
Большой
10000-30000
Обязателен
WAL растет
PostgreSQL
Малый
Малый
10000-30000
Не обязателен
Мониторинг
SQL Server
Малый
Большой
10000-30000
Обязателен
Записи журнала растут
Oracle + OLR (флаг 4096)
Малый
Большой
10000-30000
Не нужен
OLR шлет контрольную точку
Oracle + LogMiner
Малый
Большой
10000-30000
Обязателен

Сторонние коннекторы

Tarantool CDC поддерживает подключение сторонних коннекторов, которые не входят в основную поставку TCDC. Коннектор может быть представлен как один или несколько взаимосвязанных JAR-файлов. Эти файлы нужно добавить на тот же уровень, на котором находятся коннекторы из основной поставки.

Добавление сторонних коннекторов различается для экземпляров Tarantool CDC, развернутых:

Добавление сторонних коннекторов через Kubernetes

Для того, чтобы добавить сторонние коннекторы к установке Tarantool CDC, развернутой в среде Kubernetes:

  1. В реестр контейнеров (Container Registry) вашего образа добавьте образ, который содержит один или несколько JAR-файлов вашего коннектора.
  1. В файле конфигурации ссылка values.yml укажите этот образ в разделе connectorPackages:

    connectorPackages:  debezium-mysql:    image:      repository: registry.example.local/cdc-connectors/extra_connetcor # адрес в реестре контейнеров      tag: "1.0"      pullPolicy: IfNotPresent    classes:      - io.debezium.connector.mysql.MySqlConnector                      # класс коннектора
  2. В файле конфигурации Helm-чарта values.yml укажите класс этого коннектора в разделе sinks или sources:

    sources:                                                                # или sinks, в зависимости от типа коннектора  mysql:    common:      connector:        class: io.debezium.connector.mysql.MySqlConnector
  3. Проверьте, что Helm чарт корректно генерирует шаблоны с указанными параметрами:

    helm template my-release ./helm-chart-cdc --values values.yml | grep -A5 "initContainers"helm template my-release ./helm-chart-cdc --values values.yml | grep -A3 "mountPath: /libs/connect"
  4. Перезапустите ваш экземпляр Tarantool CDC:

    helm upgrade my-release ./helm-chart-cdc --values values.ymlkubectl rollout restart deployment -l app.kubernetes.io/instance=my-releasekubectl rollout status deployment -l app.kubernetes.io/instance=my-release
  5. После перезапуска в журнале Обработчика должна появиться строка с перечнем доступных коннекторов, в которой присутствует класс из вашего коннектора. Пример:

    io.tarantool.cdc.source.SourceConfiguration : available source connectors: [  PluginDesc{klass=class io.debezium.connector.mysql.MySqlConnector, name='io.debezium.connector.mysql.MySqlConnector',    version='2.7.0.Final', encodedVersion=2.7.0.Final, type=source,    typeName='source', location='file:/libs/connect/source/debezium-mysql/'},  ...]

    Если в записи перечислены только коробочные коннекторы, проверьте параметр source.plugin.path / sink.plugin.path.

Пример добавления сторонних коннекторов через Kubernetes

1. Сборка и публикация образ коннектора.

Один JAR:

FROM busybox:1.36COPY my-connector-1.0.jar /connectors/

Несколько JAR:

Файлы .jar всех коннекторов должны быть собраны внутри папки /connectors/. В файле pom.xml должны быть перечислены артефакты коннектора. Далее артефакты и их транзитивные зависимости устанавливаются в одну общую папку.

FROM maven:3.9-eclipse-temurin-17 AS builderWORKDIR /buildCOPY pom.xml .RUN mvn -q dependency:copy-dependencies -DoutputDirectory=/connectorsFROM busybox:1.36COPY --from=builder /connectors /connectors/

Проверка содержимого:

docker run --rm registry.example.local/cdc-connectors/debezium-mysql:1.0 ls -la /connectors/docker run --rm registry.example.local/cdc-connectors/debezium-mysql:1.0 sh -c "cp -r /connectors/. /tmp/ && ls /tmp/"

2. Внесение изменений в values.yml Helm-чарта.

Добавьте раздел верхнего уровня connectorPackages, укажите в нем образ коннектора. Затем добавьте в раздел sources описание подключения к Источнику, или в раздел sinks описание подключения к Приемнику, с нужным классом коннектора. в sources / sinks тот же connector.class, что перечислен в classes пакета.

3. Проверка Helm values перед установкой:

См. 4 шаг в Добавление сторонних коннекторов через Kubernetes

Проверка журнала после установки выполняется так же, как в шаге 6 общей инструкции — в строке available source connectors: [...] (или available sink connectors: [...]) должен присутствовать указанный класс коннектора.

Добавление сторонних коннекторов через ATE

Для того, чтобы добавить сторонние коннекторы к установке Tarantool CDC, развернутой при помощи инструментов Ansible Tarantool Enterprise, выполните скрипт scripts/install-connector.sh с указанием класса коннектора и пути к архиву или папке, в которой находится коннектор. Скрипт может содержать следующие параметры и команды:

  • --class <FQCN> - полное имя класса коннектора, например com.example.MysqlSourceConnector. Обязательный параметр.
  • --name <имя> - имя коннектора. По умолчанию определяется из имени JAR-файла, в котором найден указанный класс. Если класс находится в нескольких JAR-файлах и --name не задан, для выбора JAR используется --version (см. ниже); если этого недостаточно, скрипт остановится и предложит указать --name явно. Если класс не удалось найти ни в одном JAR, имя берется из имени архива/папки.
  • --version <версия> - версия коннектора. По умолчанию определяется из имени JAR-файла, в котором найден указанный класс. Если класс найден в нескольких JAR-файлах, --version используется как фильтр для выбора нужного JAR.
  • --install-dir <путь> - папка для установки сторонних коннекторов. Значение по умолчанию: /opt/tarantool-cdc/custom-connectors. Рекомендуется устанавливать сторонние коннекторы вне папки с основным архивом Tarantool CDC /releases, чтобы обновление продукта не приводило к их изменению или удалению.
  • --tcdc-dir <путь> - папка, в которой установлен экземпляр Tarantool CDC. Значение по умолчанию: /opt/tarantool-cdc/releases/current.
  • --role source\|sink - проверка, что коннектор соответствует требуемой роли (Источник или Приемник).
  • --validate-only - выполняет статические проверки коннектора. Команда проверяет:
    • Целостность переданного архива (.zip, .tar.gz);
    • Наличие JAR-файлов;
    • Отсутствие дублирующихся артефактов;
    • Наличие указанного класса коннектора внутри JAR-файлов;
    • Совместимость байткода (не выше Java 17);
    • Роль коннектора (SourceConnector или SinkConnector).
* `--dry-run` - выводит планируемые действия, но не выполняет их.

Пример:

install-connector.sh \  --class io.debezium.connector.mysql.MySqlConnector \  --role source \  debezium-connector-mysql-2.7.0.Final-plugin.zip

После успешной установки скрипт создает следующую структуру:

/opt/tarantool-cdc/custom-connectors/└── <имя-коннектора>/    └── <версия>/        ├── connector.jar        ├── dependency-1.jar        └── ...          # все JAR-файлы из архива (не более 2 уровней вложенности)

И выводит готовый фрагмент для файла настроек:

=== Add to worker.yaml ===source:                         # или 'sink:', если это коннектор к Приемнику данных  plugin.path: /opt/tarantool-cdc/custom-connectors/<имя>/<версия>  connector.class: <FQCN>

После установки необходимо указать путь к коннектору в конфигурации Универсального Обработчика. В параметре source.plugin.path (или sink.plugin.path для коннектора к Приемнику) нужно указать путь до папки с версией коннектора:

source:  plugin.path: /opt/tarantool-cdc/custom-connectors/debezium-connector-mysql/2.7.0.Final  connector:    class: io.debezium.connector.mysql.MySqlConnector    # параметры коннектора...

После изменения конфигурации необходимо перезапустить Универсальный Обработчик через ATE.

Примеры добавления сторонних коннекторов через ATE

Проверка архива без установки:

install-connector.sh \  --class io.debezium.connector.mysql.MySqlConnector \  --validate-only \  debezium-connector-mysql-2.7.0.Final-plugin.zip

Установка в нестандартную директорию с выводом планируемых действий:

install-connector.sh \  --class io.debezium.connector.mysql.MySqlConnector \  --install-dir /app/cdc/custom-connectors \  --dry-run \  debezium-connector-mysql-2.7.0.Final-plugin.zip

Преобразователи

Одиночные преобразователи (Single Message Transformations - SMT) - это модули, с помощью которых можно изменять события, обрабатываемые в Tarantool CDC. Изменения могут включать в себя: преобразование структуры тела или ключа события, добавление, удаление, переименование полей; смена типов полей, смена темы событий, изменение заголовков событий.

Настройка преобразователей

Преобразователи могут быть настроены как для коннектора к Источнику, так и для коннектора к Приемнику. Первые, если они определены, применяются к событиям после того, как они прочитаны из Источника. Вторые применяются перед отправкой событий в целевую систему. Преобразователи настраиваются в конфигурации коннекторов в разделах transforms.

Параметры преобразователей:

  • chain — список имен преобразователей в том порядке, в котором они должны применяться к событию. Имена преобразователей задаются пользователем произвольно.
  • config — словарь, где каждый ключ — имя преобразователя, а значение — словарь с его настройками. Имена преобразователей должны совпадать с именами в chain.
    • type - класс преобразователя. Остальные параметры зависят от конкретного типа преобразователя и передаются в него как настройки.

В целом в установке TCDC может использоваться ни одного, один, или несколько преобразователей. В последнем случае преобразователи объединяются в последовательную цепочку для обработки событий: сначала преобразователи коннектора к Источнику, в порядке, указанном в параметре source.transforms.chain; затем преобразователи коннектора к Приемнику данных, в порядке, указанном в параметре sink.transforms.chain.

Пример настройки:

source:  connector:  ...                     # параметры коннектора  transforms:    chain:                # порядок применения преобразователей    - mongodb-unwrap    config:               # настройки каждого преобразователя коннектора к Источнику      mongodb-unwrap:        type: io.debezium.connector.mongodb.transforms.ExtractNewDocumentState        drop.tombstones: falsesink:  connector:  ...                     # параметры коннектора  transforms:    chain:                # порядок применения преобразователей    - remap    - module_2    - ...    config:               # настройки каждого преобразователя коннектора к Приемнику      remap:        type: io.tarantool.cdc.transforms.smt.Remap        messages:        - topic: cdc.inventory.users          key:            type: struct          ...             # остальные настройки преобразователя Remap      module_2:        type: org.apache.kafka.connect.transforms.RegexRouter        regex: storage_(.*)      ...

Модуль Remap

Remap — это модуль для преобразования событий путем создания нового выходящего события с заданной структурой тела и ключа с последующим заполнением их значениями из входящего события. К значениям могут быть применены различные преобразования: из строки в число, из строки с датой и временем в логический тип Timestamp, из числа в строку и так далее.

На входе могут быть как типизированные записи, структура которых описывается постоянной схемой, так и нетипизированные, без схемы. На выходе запись всегда типизирована.

Общий алгоритм работы модуля:

  1. По теме (топику, topic) входящей записи Remap ищет в своей конфигурации объявление структуры новой записи. Если для данной темы объявление отсутствует, запись передается дальше без изменений.
  2. По найденной структуре создается новая запись.
  3. Значения извлекаются из входящей записи, проходят через заданные преобразования и присваиваются соответствующим полям новой записи. Полям, которые обозначены как необязательные и для которых не указаны ссылки на значения, присваивается null.
  4. Новая запись передается дальше. Входящая запись отбрасывается.

Конфигурация модуля Remap

На верхнем уровне конфигурации модуля Remap определяется набор тем (топиков) с заданными описаниями выходных событий. Каждое описание преобразования состоит из двух частей:

  • Объявление компонента данных ключа записи key;
  • Объявление компонента данных тела записи value.

Эти объявления могут быть составными, включать в себя вложенные части, что позволяет описывать произвольные древовидные структуры Объявления компонентов данных ключа и тела записи задаются по одним и тем же правилам.

Атрибуты объявления компонента данных:

  • type - тип компонента данных. Обязательный параметр. В зависимости от его значения другие атрибуты могут стать обязательными.
  • optional - необязательность компонента данных:
    • true - компонент данных необязательный. Если компонент данных не удается вычислить, то модуль присвоит ему значение null.
    • false - компонент данных обязательный. Это значение используется по умолчанию. Если при этом значении компонент данных, или вложенный обязательный компонент любого уровня не является вычислимым, преобразователь Remap выбросит ошибку конфигурации при запуске обработчика. Правила вычислимости компонентов данных описаны отдельно для каждого типа.
  • params - набор строковых пар ключ-значение для тонкой настройки логических типов. Состав набора пар зависит от логического типа.
  • hints - набор строковых пар ключ-значение для тонкой настройки преобразования извлекаемых данных. Состав набора зависит от применяемого преобразования.
  • modifiers - набор строковых пар ключ-значение, в который входит обязательный элемент chain, для указания последовательности применяемых действий над полученным конечным значением перед записью его в поле выходного сообщения. Набор действий зависит от типа компонента данных.
  • default - значение по умолчанию. Не является обязательным, если задан атрибут path. Поддерживается только для простых типов.
  • path - ссылка на компонент данных во входящей записи. Может быть необязательным, если задан атрибут default. При type: struct атрибут path не должен задаваться.
  • version - версия схемы, не является обязательной.

Общий вид объявления компонента данных:

messages:- topic: <Тема сообщения>  key:    # Объявление компонента данных ключа новой записи    type: string    path: $key  value:  # Объявление компонента данных тела новой записи    type: <Тип>[/<Логический тип>]    version: <Версия схемы>    params:      <Параметр-1>: <Значение-Параметра-1>      <Параметр-...>: <Значение-Параметра-...>      <Параметр-N>: <Значение-Параметра-N>    optional: true | false    name: <Наименование поля в структуре>    fields:    - <Объявление поля 1>    - <Объявление поля ...>    - <Объявление поля N>    items: <Объявление компонента данных элементов массива или значений словаря>    key: <Объявление компонента данных ключей словаря>    path: <Путь к значению во входящей записи>    default: <Значение по умолчанию>    hints:      <Подсказка-1>: <Значение-Подсказки-1>      <Подсказка-...>: <Значение-подсказки-...>      <Подсказка-N>: <Значение-подсказки-N>    modifiers:      chain: <Цепочка действий над значением>      <Действие-1>: <Аргумент-Действия-1>      <Действие-...>: <Аргумент-Действия-...>      <Действие-N>: <Аргумент-Действия-N>- topic: topic_2  key: <Объявление компонента данных ключа новой записи>  value: <Объявление компонента данных тела новой записи>- topic: topic_N  key: <Объявление компонента данных ключа новой записи>  value: <Объявление компонента данных тела новой записи>

Особенности атрибутов компонентов данных в модуле Remap

Атрибут компонентов данных type

Тип struct (структура):

Структурный тип объединяет разнородные данные в единую структуру с постоянным набором полей. Для структурного типа обязательно наличие не пустого списка полей fields. Объявления полей подобны объявлениям компонентов данных, но для каждого поля должен быть определен атрибут name. Тип поля может быть структурой, массивом или словарем, что позволяет создавать древовидные структуры произвольной степени вложенности.

Пример:

type: structoptional: truefields: # Описания полей- name: a  type: int32  path: $value.a- name: b  type: int64  path: $key.b- name: c  type: struct  fields:  - name: d    type: boolean    path: $value.enabled  - name: e    type: string    path: $value.label

Компонент структурного типа является вычислимым при соблюдении хотя бы одного из следующих условий:

  • Необязательная структура (optional: true), у которой нет вычислимых полей — при вычислении компоненту будет присвоен null.
  • Все поля структуры являются вычислимыми (рекурсивно).

Тип array (массив):

Этот тип объединяет однородные данные в ряд переменной длины.
Объявление в атрибуте items является объявлением компонента данных и не должно быть пустым. В нем и в его вложениях атрибуты path могут отсутствовать или же указывать на значения. При этом первый элемент path может ссылаться как на текущий обрабатываемый элемент входного массива ($), так и на входящую запись и ее служебные поля ($value, $key, $header, $timestamp, $partition). Тип описываемого значения также может быть структурой, массивом или словарем, что позволяет создавать вложенные структуры произвольной глубины.

Атрибут path объявления массива указывает на расположение во входящей записи массива, на основе которого создается выходной массив в поле новой записи. Выходной массив получается путем преобразования элементов входного массива в соответствии с объявлением из атрибута items. Если path отсутствует, то должен быть объявлен атрибут optional: true.

Пример:

type: arrayitems: # Объявление компонента данных элементов массива  type: string  path: $.a

Значение компонента данных структурного типа может быть вычислено, если соблюдается хотя бы одно условие из следующих:

  • У структуры нет вычислимых полей, значение атрибута optional равно true - значением этого компонента будет null.
  • Все поля структуры являются вычислимыми.

Тип map (словарь):

Этот тип данных объединяет пары ключ-значение в контейнер данных переменной длины, где ключ и значение могут иметь свои типы. Объявление словаря подобно объявлению массива, добавляется только атрибут key, описывающий структуру ключа. Объявления в атрибутах key и items описывают компоненты данных и не должны быть пустыми. Для ключей словаря допустимы только простые типы, а также не должен указываться атрибут path - ключи выходного словаря получаются из значений ключей входного словаря. В объявлении из атрибута items и в его вложениях атрибуты path могут отсутствовать или же указывать на значения. При этом первый элемент path может ссылаться как на текущее обрабатываемое значение входного словаря ($), так и на входящую запись и ее служебные поля ($value, $key, $header, $timestamp, $partition). Тип описываемого значения также может быть структурой, массивом или словарем, что позволяет создавать вложенные структуры произвольной глубины.

Атрибут path объявления словаря указывает на расположение во входящей записи словаря, на основе которого создается выходной словарь в поле новой записи. Выходной словарь получается путем преобразования всех ключей и значений входного словаря в соответствии с объявлениями из атрибутов key и items. Если path отсутствует, то должен быть объявлен атрибут optional: true.

Пример:

type: mapkey: # Объявление компонента данных ключей словаря  type: int32items: # Объявление компонента данных значений словаря  type: string  path: $.a # Путь к исходному словарю во входящей записи

Значение компонента данных типа "словарь" может быть вычислено, если соблюдается хотя бы одного из следующих условий:

  • Задан path и значения ключей (key) и значений (items) являются вычислимыми.
  • Не задан path, но при этом optional: true — при вычислении компоненту может быть присвоен null.

Простые типы:

К простым типам относятся все остальные типы, которые представляют простые (атомарные или скалярные) значения:

  • int8 - целое число со знаком, разрядность - 8 бит.
  • int16 - целое число со знаком, разрядность - 16 бит.
  • int32 - целое число со знаком, разрядность - 32 бита.
  • int64 - целое число со знаком, разрядность - 64 бита.
  • float32 - число с плавающей запятой и знаком, разрядность - 32 бита.
  • float64 - число с плавающей запятой и знаком, разрядность - 64 бита.
  • string - строка (unicode).
  • bytes - байтовая строка.
  • boolean - двоичный тип (true, false).

Компонент простого типа является вычислимым при соблюдении хотя бы одного из следующих условий:

  • Не заданы атрибуты path, default, но задан optional: true — при вычислении такому компоненту может быть присвоен null.
  • Задан default, задан optional: false — при вычислении такому компоненту будет присвоено значение по умолчанию.
  • Задан path, задан optional: false — значение компонента будет извлечено из входящей записи.

Атрибут компонентов данных path

Атрибут path отвечает за указание значения во входящей записи. Представляет собой последовательность имен полей, разделенных точкой. Точка обозначает путь до нужного значения в исходной структуре. Первым элементом пути должен быть служебный символ, указывающий на корневую структуру для поиска значения. Описание служебных символов:

  • $ - в качестве корня для поиска значения по пути берется текущий обрабатываемый элемент - элемент входного массива или значение входного словаря.
  • $key - в качестве корня для поиска значения по пути берется ключ входящей записи.
  • $value - в качестве корня для поиска значения по пути берется тело входящей записи.
  • $header - указывает на поиск значения в заголовках входящей записи. Путь, начинающийся с $header, состоит только из двух частей: самого служебного символа и названия заголовка, откуда брать значение.
  • $timestamp - указывает на временную метку входящей записи. После этого служебного символа никаких других символов быть не должно.
  • $partition - указывает на номер части топика (partition) входящей записи. После этого служебного символа никаких других символов быть не должно.

Примеры:

  • $ - значение текущего элемента массива или словаря берется целиком. Например, для массивов данных простого типа (например, массив строк), необходимо просто взять строку как значение целиком.
  • $.person.birthdate - дата рождения извлекается из поля birthdate структуры, вложенной в поле person структуры текущего обрабатываемого элемента входящего массива.
  • $value.before.price - цена извлекается из поля price структуры, вложенной в поле before структуры тела записи.
  • $key.user_id - номер пользователя расположен в поле user_id структуры ключа записи.
  • $header.x-timestamp - временная метка извлекается из заголовка входящей записи x-timestamp.
  • $timestamp - в качестве значения используется временная метка входящей записи.
  • $partition - в качестве значения используется номер части топика входящей записи.

Атрибут компонентов данных params

Нужен для точного описания выходных типов Kafka Connect в схеме конечной записи. Kafka Connect использует набор базовых типов, поверх которого наложено множество логических типов, для каждого из которых может быть задан свой набор параметров. Например, для логического типа org.apache.kafka.connect.data.Decimal необходимо задать параметр scale. Любые другие логические типы из других экосистем, поддерживающих соглашения Kafka Connect, используют эти наборы параметров для выполнения внутренних преобразований.

Модификаторы преобразованных значений

Модификаторы modifiers задают набор действий над конечным значением. Эти действия выполняются после извлечения значения из исходной записи и приведения к целевому типу, указанному в схеме.

Модификаторы определены только для простых типов. Для каждого типа поддерживается свой набор модификаторов.

Последовательность действий задается особым атрибутом chain, где строкой через запятую перечисляются действия. Каждое действие представлено именем и аргументом, тем самым, является функцией от двух аргументов, где первым аргументом всегда является текущее значение.

Исключение составляет только двоичный тип: для него не требуется указание chain, только указание флага negate.

Общий алгоритм работы модификаторов:

  1. Действие берется из списка chain.
  2. Указанное действие применяется к текущему значению (извлеченному и преобразованному). Получается новое значение.
  3. Переход к следующему действию из списка chain:
    • Если действий больше нет, полученное значение является итоговым.
    • Иначе повторяются шаги 1-3.

Модификаторы типа boolean:

Для двоичного типа есть смысл только в одном действии - в обращении исходного значения.

  1. negate - логическое отрицание. Не требует указания аргумента.

Модификаторы типа int8:

Для байтового типа реализованы побитовые операции.

  1. r_shift - сдвинуть все биты вправо на указанное число шагов.
  2. l_shift - сдвинуть все биты влево на указанное число шагов.
  3. b_and - двоичное "И".
  4. b_or - двоичное "ИЛИ".
  5. b_xor - двоичное исключающее "ИЛИ".
  6. b_not - двоичное отрицание. Не требует указания аргумента.

Модификаторы для типов int16, int32, int64, float32, float64:

  1. add - прибавить к текущему значению некоторое число.
  2. sub - вычесть из текущего значения некоторое число.
  3. mul - умножить текущее значение на некоторое число.
  4. div - разделить текущее значение на некоторое число.

Пример:

type: int32path: $value.pricemodifiers:  chain: add,mul  add: 100  mul: 2

Пример показывает вычисление выражения ((X + 100) * 2), где X - извлеченное и приведенное к конечному типу значение.

Подсказки преобразования значений

Подсказки hints дают дополнительную информацию для точного преобразования значений. Например, для преобразования строки в логические типы Kafka Connect для работы с датой и временем, подсказки позволяют указать формат для разбора строки, которая может содержать дату и время в формате, отличном от ISO8601. При приведении строк и числовых данных к логическому типу Decimal в подсказках можно указать правило округления.

Поддерживаемые подсказки:

  • math-round - для логического типа Decimal, указывающая на то, какое правило округления должно быть применено к получаемому значению типа Decimal. Допустимые значения (см. подробнее документацию):
    • CEILING;
    • DOWN;
    • FLOOR;
    • HALF_DOWN;
    • HALF_EVEN;
    • HALF_UP;
    • UNNECESSARY;
    • UP.
  • date-format - применяется для преобразований строк в тип даты org.apache.kafka.connect.data.Date. В подсказке указывается формат даты для разбора строки.
  • time-format - применяется для преобразований строк в тип времени org.apache.kafka.connect.data.Time. В подсказке указывается формат времени для разбора строки.
  • datetime-format - применяется для преобразований строк в тип даты и времени org.apache.kafka.connect.data.Timestamp. В подсказке указывается формат даты и времени для разбора строки.

Логические типы Kafka Connect

Логические типы - механизм системы типов Kafka Connect, позволяющий расширить множество поддерживаемых типов данных. Но полноценной поддержкой обладают только логические типы от самого Kafka Connect. Встроенная поддержка этих типов означает, что при сериализации данных в транспортный формат (AVRO или JSON) будет использовано значение основного типа, но при десериализации данных значение будет представлено экземпляром некоторого класса Java, представляющего значение логического типа.

Поддерживаемые логические типы Kafka Connect:

  • org.apache.kafka.connect.data.Decimal - значения представлены классом java.math.BigDecimal;
  • org.apache.kafka.connect.data.Date - значения представлены классом java.util.Date;
  • org.apache.kafka.connect.data.Time - значения представлены классом java.util.Date;
  • org.apache.kafka.connect.data.Timestamp - значения представлены классом java.util.Date.

В преобразователе Remap они также поддержаны на низком уровне. Другие логические типы не поддерживаются, но их можно задать в качестве указания на, что значения можно обработать соответствующими классами, например, из экосистемы Debezium. Логический тип указывается через косую черту (/) после названия основного типа.

Пример:

type: int32/org.apache.kafka.connect.data.Datepath: $value.birthdate

В примере значение $value.birthdate приводится к логическому типу и представляется экземпляром класса java.util.Date. Способ преобразования зависит от типа исходного значения. Если исходное значение - число (int8, int16, int32, int64, float32, float64), то оно будет приведено к типу int32 (в зависимости от начального типа - с потерями или без), а затем на основе полученного числа будет создан экземпляр класса java.util.Date. Если исходное значение - строка и указана подсказка формата (format), то преобразование выполняется прямым разбором строки по указанному формату. В случае без указания формата строка приводится к числу int32 с последующим созданием экземпляра класса java.util.Date.

Пример преобразования

Ниже представлен пример преобразования типизированного сообщения для уменьшения количества полей.

Ключ исходного и итогового сообщений из топика city_architects:

Примечание: описание схемы опущено для краткости для краткости.

{    "schema": {        ...    },    "payload": "250d635d-5fea-4e13-ab71-05058398b690"}

Тело исходного сообщения:

Примечание: описание схемы опущено для краткости.

{    "schema": {       ...    },    "payload": {        "id": 0,        "name": "Петр",        "surname": "Берш",        "patronymic": "Петрович",        "birthdate": "16.12.1933"    }}

Преобразование:

messages:- topic: city_architects  key:    type: string    path: $key  value:    type: struct    fields:    - name: id      type: int64      path: $value.id    - name: name      type: string      path: $value.name    - name: surname      type: string      path: $value.surname

Выходное сообщение:

Примечание: описание схемы опущено для краткости.

{    "schema": {       ...    },    "payload": {        "id": 0,        "name": "Петр",        "surname": "Берш",    }}

Поддерживаемые преобразования

Начальный тип
Конечный тип
Параметры
Описание преобразования
boolean
boolean
string
boolean
Если входящая строка равна "true", то выходное значение равно ИСТИНА. Во всех остальных случаях выходное значение равно ЛОЖЬ.
int8
boolean
Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ.
int16
boolean
Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ.
int32
boolean
Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ.
int64
boolean
Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ.
Decimal
boolean
Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ.
bytes
bytes
Выходное значение равно входному значению.
string
bytes
К строке будет применяться преобразование из строки формата Base64 в байтовую строку. Если строка не является Base64, будет выброшено исключение.
boolean
bytes
Если входное значение равно ИСТИНА, то выходное значение - байтовая строка с одним байтом: 0 в случае значения ЛОЖЬ или 1 в случае значения ИСТИНА.
int8
bytes
Байтовая строка, содержащая один байт, равный входному значению.
int16
bytes
Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему.
int32
bytes
Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему.
int64
bytes
Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему.
float32
bytes
Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему.
float64
bytes
Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему.
int8
float32
Приведение целочисленного типа к типу с плавающей запятой.
int16
float32
Приведение целочисленного типа к типу с плавающей запятой.
int32
float32
Приведение целочисленного типа к типу с плавающей запятой.
int64
float32
Приведение целочисленного типа к типу с плавающей запятой.
Decimal
float32
Приведение типа к типу с плавающей запятой с потерей точности.
boolean
float32
Если входящее значение - ИСТИНА, то выходящее значение - 1.0, иначе - 0.0.
string
float32
Разбор строкового представления значения с плавающей запятой. Если строка сформирована неправильно, будет выброшено исключение.
int8
float64
Приведение целочисленного типа к типу с плавающей запятой.
int16
float64
Приведение целочисленного типа к типу с плавающей запятой.
int32
float64
Приведение целочисленного типа к типу с плавающей запятой.
int64
float64
Приведение целочисленного типа к типу с плавающей запятой.
org.apache.kafka.connect.data.Decimal
float64
Приведение к типу с плавающей запятой с потерей точности.
boolean
float64
Если входящее значение - ИСТИНА, то выходящее значение - 1.0, иначе - 0.0.
string
float64
Разбор строкового представления значения с плавающей запятой. Если строка сформирована неправильно, будет выброшено исключение.
int8
int8
Выходное значение равно входному значению.
int16
int8
Выходное значение - младший байт входящего значения (возможна потеря данных).
int32
int8
Выходное значение - младший байт входящего значения (возможна потеря данных).
int64
int8
Выходное значение - младший байт входящего значения (возможна потеря данных).
org.apache.kafka.connect.data.Decimal
int8
Выходное значение - младший байт значения, представляющего значение BigDecimal.
boolean
int8
Если входящее значение - ИСТИНА, исходящее - 1. В противном случае - 0.
string
int8
Разбор строкового представления байтового значения. Корректное строковое представление - множество строк, представляющих значения от -128 до 127.
int8
int16
Приведение к типу int16 (расширение разрядности).
int16
int16
Выходное значение равно входному значению.
int32
int16
Выходное значение - 2 младших байта входного значения (уменьшение разрядности с потерей данных).
int64
int16
Выходное значение - 2 младших байта входного значения (уменьшение разрядности с потерей данных).
org.apache.kafka.connect.data.Decimal
int16
Выходное значение - 2 младших байта значения, представляющего значение BigDecimal.
boolean
int16
Если входное значение - ИСТИНА, то выходное - 1. В противном случае - 0.
string
int16
Разбор строкового представления короткого целого значения. Корректное строковое представление - множество строк, представляющих значения от -32768 до 32767. В противном случае будет выброшено исключение.
int8
int32
Приведение к типу int32 (расширение разрядности).
int16
int32
Приведение к типу int32 (расширение разрядности).
int32
int32
Выходное значение равно входному значению.
int64
int32
Выходное значение - 4 младших байта входного значения (уменьшение разрядности с потерей данных).
org.apache.kafka.connect.data.Decimal
int32
Выходное значение - 4 младших байта значения, представляющего значение BigDecimal.
boolean
int32
Если входное значение - ИСТИНА, то выходное - 1. В противном случае - 0.
string
int32
Разбор строкового представления целого значения. Корректное строковое представление - множество строк, представляющих значения от -2147483648 до 2147483647. В противном случае будет выброшено исключение.
int8
int64
Приведение к типу int64 (расширение разрядности).
int16
int64
Приведение к типу int64 (расширение разрядности).
int32
int64
Приведение к типу int64 (расширение разрядности).
int64
int64
Выходное значение равно входному значению.
org.apache.kafka.connect.data.Decimal
int64
Выходное значение - 8 байт значения, представляющего значение BigDecimal.
boolean
int64
Если входное значение - ИСТИНА, то выходное - 1. В противном случае - 0.
string
int64
Разбор строкового представления целого значения. Корректное строковое представление - множество строк, представляющих значения от -9223372036854775808 до 9223372036854775807. В противном случае будет выброшено исключение.
org.apache.kafka.connect.data.Date
org.apache.kafka.connect.data.Timestamp
Расширение типа c org.apache.kafka.connect.data.Date до org.apache.kafka.connect.data.Timestamp
org.apache.kafka.connect.data.Time
org.apache.kafka.connect.data.Timestamp
Расширение типа c org.apache.kafka.connect.data.Time до org.apache.kafka.connect.data.Timestamp
org.apache.kafka.connect.data.Timestamp
org.apache.kafka.connect.data.Timestamp
Выходное значение равно входному значению.
boolean
org.apache.kafka.connect.data.Timestamp
Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp.
int8
org.apache.kafka.connect.data.Timestamp
Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp.
int16
org.apache.kafka.connect.data.Timestamp
Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp.
int32
org.apache.kafka.connect.data.Timestamp
Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp.
int64
org.apache.kafka.connect.data.Timestamp
Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp.
org.apache.kafka.connect.data.Decimal
org.apache.kafka.connect.data.Timestamp
Входное значение, приведенное к типу int64, трактуется как количество миллисекунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp.
string
org.apache.kafka.connect.data.Timestamp
hints.datetime-format
Если задан параметр hints.datetime-format, то строка будет разбираться в соответствии с указанным форматом даты-времени напрямую в экземпляр java.util.Date. В противном случае строка будет приводиться к типу int64 (см. преобразования к int64).
org.apache.kafka.connect.data.Date
org.apache.kafka.connect.data.Date
Выходное значение равно входному значению.
org.apache.kafka.connect.data.Time
org.apache.kafka.connect.data.Date
Преобразование не имеет смысла, т.к. данные и по времени и дате обнуляются.
org.apache.kafka.connect.data.Timestamp
org.apache.kafka.connect.data.Date
Усекаются временные данные, остается только дата.
boolean
org.apache.kafka.connect.data.Date
Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date.
int8
org.apache.kafka.connect.data.Date
Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date.
int16
org.apache.kafka.connect.data.Date
Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date.
int32
org.apache.kafka.connect.data.Date
Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date.
int64
org.apache.kafka.connect.data.Date
Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date.
string
org.apache.kafka.connect.data.Date
hints.date-format
Если задан параметр hints.date-format, то строка будет разбираться в соответствии с указанным форматом даты-времени напрямую в экземпляр java.util.Date. В противном случае строка будет приводиться к типу int32 (см. преобразования к int32).
org.apache.kafka.connect.data.Date
org.apache.kafka.connect.data.Time
Преобразование не имеет смысла, т.к. данные и по времени и дате обнуляются.
org.apache.kafka.connect.data.Time
org.apache.kafka.connect.data.Time
Выходное значение равно входному значению.
org.apache.kafka.connect.data.Timestamp
org.apache.kafka.connect.data.Time
Время напрямую переводится как время с начала "эпохи".
boolean
org.apache.kafka.connect.data.Time
Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time.
int8
org.apache.kafka.connect.data.Time
Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time.
int16
org.apache.kafka.connect.data.Time
Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time.
int32
org.apache.kafka.connect.data.Time
Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time.
int64
org.apache.kafka.connect.data.Time
Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time.
string
org.apache.kafka.connect.data.Time
hints.time-format
Если задан параметр hints.time-format, то строка будет разбираться в соответствии с указанным форматом даты-времени напрямую в экземпляр java.util.Date. В противном случае строка будет приводиться к типу int32 (см. преобразования к int32).
org.apache.kafka.connect.data.Decimal
org.apache.kafka.connect.data.Decimal
Выходное значение равно входному значению.
boolean
org.apache.kafka.connect.data.Decimal
hints.math-round
Если входное значение - ИСТИНА, то выходное значение - 1. В противном случае 0.
Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления.
float32
org.apache.kafka.connect.data.Decimal
hints.math-round
Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.
Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления.
float64
org.apache.kafka.connect.data.Decimal
hints.math-round
Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.
Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления.
int8
org.apache.kafka.connect.data.Decimal
hints.math-round
Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.
Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления.
int16
org.apache.kafka.connect.data.Decimal
hints.math-round
Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.
Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления.
int32
org.apache.kafka.connect.data.Decimal
hints.math-round
Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.
Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления.
int64
org.apache.kafka.connect.data.Decimal
hints.math-round
Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.
Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления.
string
org.apache.kafka.connect.data.Decimal
hints.math-round
Разбор строкового представления числа с ограниченной точностью.
Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления.
bytes
org.apache.kafka.connect.data.Decimal
Разбор байтовой строки, являющейся физическим представлением Decimal.
org.apache.kafka.connect.data.Date
string
hints.date-format
Дата кодируется в строку. В качестве формата кодирования используется либо iso8601, либо формат, переданный в параметре hints.date-format.
org.apache.kafka.connect.data.Time
string
hints.time-format
Дата кодируется в строку. В качестве формата кодирования используется либо iso8601, либо формат, переданный в параметре hints.time-format.
org.apache.kafka.connect.data.Timestamp
string
hints.datetime-format
Дата кодируется в строку. В качестве формата кодирования используется либо iso8601, либо формат, переданный в параметре hints.datetime-format.
bytes
string
Преобразование байтовой строки в формат Base64.
int8
string
Строковое представление.
int16
string
Строковое представление.
int32
string
Строковое представление.
int64
string
Строковое представление.
float32
string
Строковое представление.
float64
string
Строковое представление.

Мониторинг

Средства мониторинга Tarantool CDC включают панель Grafana с показателями.

Панель Grafana

Для наблюдения за работой Tarantool CDC можно использовать специальную панель Grafana. Панель состоит из графиков и таблиц с показателями, которые обновляются в режиме реального времени. Графики и таблицы представлены в группе "Общая информация" и в группах метрик Универсальных Обработчиков, которые входят в установку. Ниже представлено описание элементов панели и нормальное поведение показателей. О том, на какие проблемы может указывать отход показателей от нормы, см. в статье.

В группу "Общая информация" входят следующие графики и таблицы:

  • Workers - Показывает состав текущей установки TCDC. Количество запущенных Обработчиков и какие источники и приемники данных они обслуживают. В норме информация должна соответствовать заданной топологии установки Tarantool CDC.
  • Total EPS - Общее количество событий в секунду, полученных из Источника и отправленных в Приемник. В норме значение возрастает.
  • Events Per Second - Количество событий в секунду, полученных из Источника и отправленных в Приемник для каждого Обработчика. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Average Lag - Средняя задержка при переносе событий от Источника в Приемник для всех Обработчиков. Измеряется как разница во времени от появления события в Источнике до окончания его записи в Приемник Обработчиком. На точность показателя влияет рассинхронизация часов в распределенной системе. Для вычисления общего показателя используется среднее от максимальных значений среди всех компонентов системы. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Errors - Общая частота возникновения ошибок при обработке событий. В норме графики имеют значение 0. Ошибки бывают следующих видов:
    • Внутренние ошибки - любые ошибки, возникающие внутри Обработчиков во время их работы;
    • Ошибки записи контрольных точек - ошибки, возникающие в моменты записи контрольных точек в хранилище состояния.
  • Source to Connector Lag - Среднее время переноса событий от источников до коннекторов. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Connector to Worker Lag - Среднее время удержания событий в очередях коннекторов. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Worker to Target Lag - Среднее время записи событий в приемники. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.

Общая информация установки TCDC в Grafana

Для каждого из Универсальных Обработчиков представлены следующие графики:

  • Events Per Second - Количество событий в секунду, полученных из Источника и отправленных в Приемник Обработчиком. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Errors - Частота возникновения ошибок на Обработчике. В норме графики имеют значение 0. Ошибки бывают следующих видов:
    • Внутренние ошибки - любые ошибки, возникающие внутри Обработчиков во время их работы;
    • Ошибки записи контрольных точек - ошибки, возникающие в моменты записи контрольных точек в хранилище состояния.
  • Throttling Time - Время, затраченное на ограничение трафика. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Lag - Задержка переноса событий от Источника в Приемник для Обработчика. Измеряется как разница во времени от появления события в Источнике до окончания его записи в Приемник Обработчиком. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Latencies - Сводный график задержек, из которых составляется график Lag. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Source Task Poll Time Percentiles - Время опроса задачи Источника, в миллисекундах. В норме при небольшом трафике 50, 95 и 99 перцентили принимают значение времени, заданное в настройках Обработчика (параметр worker.connector.poll.interval.ms), чаще всего это 500 миллисекунд. При большом трафике, если Обработчик успевает вычитывать сообщения из коннектора, 95 и 99 перцентили остаются на прежнем уровне или кратковременно снижаются, 50-й перцентиль может упасть до миллисекунд. Когда Обработчик не успевает вычитывать сообщения, все перцентили, а также среднее время ожидания, снижаются до нескольких десятков миллисекунд.
  • Worker Batch Processing - Время, потраченное на обработку пакета событий Обработчиком. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Source to Connector Lag - Время переноса события с момента его появления в Источнике до момента его обработки коннектором к Источнику. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Connector to Worker Lag - Время удержания событий в очереди коннекторов Обработчика. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Worker to Target Lag - Время записи событий в Приемник. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
  • Offsets - Количство событий в секунду, которое подтверждается Обработчиком. В норме уровень графика должен совпадать с уровнем графика Events Per Second Обработчика.
  • Memory usage - Потребление памяти виртуальной машиной Java для Обработчика. В норме графики стабильные, их уровень колеблется вокруг некоторого значения. Отображается три графика:
    • Использовано - количество памяти в байтах, используемое обработчиками.
    • Зарезервировано - количество памяти, зарезервированной виртуальной машиной Java для дальнейшего использования.
    • Доступно - количество памяти, доступной для резервирования и использования виртуальной машиной Java.
  • CPU Usage - Потребление процессорных ресурсов Обработчиком. В норме графики стабильные, их уровень колеблется вокруг некоторого значения. Потребление показывается с трех точек зрения:
    • На уровне процесса - сколько процессорного времени потребляется отдельно взятыми процессами Обработчика.
    • На уровне системы - сколько процессорного времени потребляется процессами обработчика на уровне операционной системы.
    • Среднее на уровне процесса за 15 минут - среднее потребление процессорных ресурсов на уровне отдельно взятых процессов за последние 15 минут.

Метрики Обработчика в Grafana

Инциденты и сценарии реагирования

Данная статья описывает возможные инциденты, их признаки и рекомендуемые действия для устранения неисправностей Tarantool CDC.

Признаки инцидентов на панели Grafana

Возможные инциденты, отражающиеся в группе "Общая информация":

  • Workers

    • Признак:
      • Количество каких либо компонентов отличается от заданного в топологии.
    • Реагирование:
      • Проверить работу соответствующего компонента, получить логи.
  • Total EPS, Events Per Second

    • Признак:
      • Снижение или увеличение количества обрабатываемых событий.
    • Реагирование:
      • Необходимо проверить работу Обработчиков:

        • Ошибки репликации;
        • Перезапуски - каждый перезапуск требует времени на восстановление работы и увеличивает задержку событий;
        • Потребление памяти - высокое потребление памяти может привести к проблемам в сборке мусора;
        • Сборщик мусора - пауза Stop The World;
        • Сборщик мусора - частота;
        • Сборщик мусора - поколения.

        Также необходимо проверить:

        • Состояние сети и инфраструктуры;
        • Состояние очереди (если используется).

        Если значения упали до 0 при работающей репликации, следует проверить синхронизацию часов в системе. Если показатели отсутствуют, следует проверить состояние Обработчиков, так как метрики, на основе которых считаются данные показатели, рассчитываются на них.

  • Average Lag

    • Признак:
      • Рост времени обработки события (устойчивый тренд или превышение порога в течение заданного окна).
      • Показатели принимают значение 0 при работающей репликации.
      • Отсутствие показателей.
    • Реагирование:
      • См. описание реагирования на симптомы графиков Total EPS и Events Per Second выше.
  • Errors

    • Признак:
      • Значение больше 0.
    • Реагирование:
      • При выбросах ошибок Сети или ошибок превышения времени ожидания ответа нужно проверить состояние инфраструктуры и сети.
      • При наличии внутренних ошибок CDC нужно проверить ресурсы, потребляемые Обработчиками.
      • При наличии ошибок контрольных точек нужно проверить состояние очереди (если используется), инфраструктуры и сети.
  • Source to Connector Lag, Connector to Worker Lag, Worker to Target Lag

    • Признак:
      • Рост времени обработки события (устойчивый тренд или превышение порога в течение заданного окна).
    • Реагирование:
      • См. описание реагирования на симптомы графиков Total EPS и Events Per Second выше.

Возможные инциденты, отражающиеся в группе графиков отдельного Универсального Обработчика:

  • Events Per Second

    • Признак:
      • Снижение или увеличение количества обрабатываемых событий.
    • Реагирование:
      • Необходимо проверить работу Обработчика:

        • Ошибки репликации;
        • Перезапуски - каждый перезапуск требует времени на восстановление работы и увеличивает задержку событий;
        • Потребление памяти - высокое потребление памяти может привести к проблемам в сборке мусора;
        • Сборщик мусора - пауза Stop The World;
        • Сборщик мусора - частота;
        • Сборщик мусора - поколения.

        Также необходимо проверить:

        • Состояние сети и инфраструктуры;
        • Состояние очереди (если используется).

        Если значения упали до 0 при работающей репликации, следует проверить синхронизацию часов в системе. Если показатели отсутствуют, следует проверить состояние Обработчика, так как метрики, на основе которых считаются данные показатели, рассчитываются на нем.

  • Errors

    • Признак:
      • Значение больше 0.
    • Реагирование:
      • При выбросах ошибок Сети или ошибок превышения времени ожидания ответа нужно проверить состояние инфраструктуры и сети.
      • При наличии внутренних ошибок CDC нужно проверить ресурсы, потребляемые Обработчиком.
      • При наличии ошибок контрольных точек нужно проверить состояние очереди (если используется), инфраструктуры и сети.
  • Throttling Time

    • Признак:
      • Превышение значений показателей установленных пределов.
    • Реагирование:
      • См. описание реагирования на симптомы графика Events Per Second выше.
  • Lag

    • Признак:
      • Рост времени обработки события (устойчивый тренд или превышение порога в течение заданного окна).
      • Показатели принимают значение 0 при работающей репликации.
      • Отсутствие показателей.
    • Реагирование:
      • См. описание реагирования на симптомы графика Events Per Second выше.
  • Latencies

    • Признак:
      • Порог срабатывания сигнализации расчитывается экспериментально с учетом результатов нагрузочного тестирования.
    • Реагирование:
      • При превышении тех или иных зедержек допустимых значений, проверить остальные метрики, связанные со времением и ресурсами.
  • Source Task Poll Time Percentiles

    • Признак:
      • Порог срабатывания сигнализации расчитывается экспериментально с учетом результатов нагрузочного тестирования.
    • Реагирование:
      • Информационный показатель для разбора инцидентов. Время обработки ожидания, превышающее время для ожидания, заданное в настройках коннектора, означает медленную работу Обработчика.
  • Worker Batch Processing

    • Признак:
      • Рост времени обработки события (устойчивый тренд или превышение порога в течение заданного окна).
    • Реагирование:
      • См. описание реагирования на симптомы графика Events Per Second выше.
  • Source to Connector Lag

    • Признак:
      • Какой либо из показателей вырос до некоторого значения выше допустимого предела.
      • Рост показателей выше некоторого значения.
      • Показатели равны 0.
    • Реагирование:
      • Стабильный, но высокий показатель времени переноса означает медленную сеть, необхоимо проверить нагрузку на систему и сетевые настройки.
      • Рост показателя обозначает, что Обработчик не успевает вычитывать сообщения из коннектора. Необходимо подобрать более оптимальные значения параметров коннектора, учитывая данные нагрузочного тестирования.
      • Если показатели равны 0 для повторного подключения к репликации, то необходимо проверить синхронизацию часов в системе. Для первого подключения к репликации начало обработки будет выдавать нулевые значения времени переноса.
  • Connector to Worker Lag

    • Признак:
      • Порог срабатывания сигнализации расчитывается экспериментально с учетом результатов нагрузочного тестирования.
      • Какой либо из показателей вырос до некоторого значения выше допустимого предела.
      • Рост показателей выше некоторого значения.
    • Реагирование:
      • Большое время удержания событий в очереди коннектора означает, медленную работу Обработчика. Необходимо подобрать более оптимальные значения параметров коннектора, учитывая данные нагрузочного тестирования.
      • Если показатели равны 0 для повторного подключения к репликации, то необходимо проверить синхронизацию часов в системе. Для первого подключения к репликации начало обработки будет выдавать нулевые значения времени переноса.
  • Worker to Target Lag

    • Признак:
      • Порог срабатывания сигнализации расчитывается экспериментально с учетом результатов нагрузочного тестирования.
      • Какой либо из показателей вырос до некоторого значения выше допустимого предела.
      • Рост показателей выше некоторого значения.
    • Реагирование:
      • Большое время записи событий в Приемник может влиять на работу Обработчика, замедляя его, и соответственно, приводить к увеличению отставания всего процесса репликации. Необходимо проверить:
        • Общую нагрузку на систему;
        • Состояние сети.
  • Offsets

    • Признак:
      • Средний уровень количества сохраненных контрольных точек за некоторый период наблюдения (5 минут, 10 минут, 15 минут) ниже уровня количества обрабатываемых событий.
    • Реагирование:
      • Проверить ошибки контрольных точек, состояние очереди (если используется), инфраструктуры и сети.
  • Memory usage

    • Признак:
      • Превышение использованной памяти выше расчетного значения (в т.ч. по всем Обработчикам).
      • Неограниченный рост использованной и зарезервированной памяти.
    • Реагирование:
      • Проверить показатели памяти для Обработчиков. Имеет смысла также проверить средний размер событий - количество потребляемой памяти может расти в зависимости от размера и количества обрабатываемых событий.
      • Рост использованной и зарезервированной памяти указывает на утечку ресурсов.
  • CPU Usage

    • Признак:
      • Суммарное потребление ресурсов процессоров выше расчитанной по топологии квоты ядер.
      • Превышение среднего потребления процессора за 15 минут за допустимый предел.
    • Реагирование:
      • Проверить показатели потребления ресурсов процессора для Воркеров. Имеет смысл проверить потребление памяти - при высоком потреблении памяти и большом количестве обрабатываемых событий в секунду может наблюдаться возрастание нагрузки на сборщик мусора. При наблюдении проблем на Обработчиках необходимо сбалансировать параметры Обработчика:
        • source.connector.max.batch.size;
        • source.connector.batch.split.groups.
      • При наблюдении проблем на Обработчике, у которого в качестве Источника данных выступает TQE, необходимо сбалансировать параметр подписчика TQE fetch_batch_size и скорость работы Обработчика, уменьшив значение параметра при необходимости.

Метрики Tarantool CDC

Чтобы контролировать работоспособность системы, в Tarantool CDC осуществляется сбор двух групп метрик:

Типы метрик:

  • counter - монотонно возрастающий счетчик значений. Не может быть уменьшен, но может быть сброшен до 0.
  • gauge - изменяющееся значение. Может как увеличиваться, так и уменьшаться.
  • histogram - распределение значений по заранее определенным группам (buckets).
  • summary - агрегация гистограмм. Используется в случаях, когда невозможно заранее выделить группы, по которым необходимо распределить значение.

Метрики Универсального Обработчика

cdc_connector_hold_time

  • Тип метрики: summary
  • Описание: Время, проведенное событием в очереди самого коннектора.
  • Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
  • Аномальное поведение метрики: Значение индикатора возрастает.
  • Дата последнего обновления: 2026-03-31

cdc_connector_trip_time

  • Тип метрики: summary
  • Описание: Время, потраченное на перенос события из Журнала Упреждающей Записи (ЖУЗ/WAL) Источника данных до коннектора к Источнику.
  • Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
  • Аномальное поведение метрики: Значение индикатора возрастает.
  • Дата последнего обновления: 2026-03-31

cdc_errors

  • Тип метрики: counter
  • Описание: Количество ошибок, возникающих при работе Универсального Обработчика.
  • Ожидаемое поведение метрики: Счетчик не увеличивается.
  • Аномальное поведение метрики: Скорость возрастания счетчика ненулевая.
  • Дата последнего обновления: 2026-03-31

cdc_offsets_commitable

  • Тип метрики: gauge
  • Описание: Текущее количество подтвержденных контрольных точек.
  • Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
  • Аномальное поведение метрики: Значение индикатора возрастает или постоянно равно 0.
  • Дата последнего обновления: 2026-03-31

cdc_offsets_commited_total

  • Тип метрики: counter
  • Описание: Количество успешно записанных контрольных точек. Счетчик увеличивается на количество успешно подтвержденных сообщений каждый раз при успешной записи контрольных точек.
  • Ожидаемое поведение метрики: Монотонно возрастает. Если брать скорость возрастания метрики на интервале бОльшем или равном периоду записи контрольных точек и сравнить со скоростью возрастания количества записанных событий в очередь на том же интервале, то эти метрики должны совпадать.
  • Аномальное поведение метрики: Счетчик не возрастает.
  • Дата последнего обновления: 2026-03-31

cdc_offsets_errors_total

  • Тип метрики: counter
  • Описание: Количество ошибок записи контрольных точек. При возникновенни ошибки при записи пакета контрольных точек, счетчик ошибок увеличивается на количество точек, которые не были записаны из за ошибки.
  • Ожидаемое поведение метрики: Равен 0 или не возрастает.
  • Аномальное поведение метрики: Счетчик возрастает.
  • Дата последнего обновления: 2026-03-31

cdc_offsets_uncommitable

  • Тип метрики: gauge
  • Описание: Текущее количество неподтвержденных контрольных точек.
  • Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
  • Аномальное поведение метрики: Значение индикатора возрастает.
  • Дата последнего обновления: 2026-03-31

cdc_poll_time

  • Тип метрики: histogram
  • Описание: Время, затраченное на выполнение операций чтения данных из коннектора к Источнику данных. Метрика замеряется на уровне приложения.
  • Метки:
    • quantile:
      • 0.5;
      • 0.95;
      • 0.99.
  • Ожидаемое поведение метрики: Время чтения колеблется вокруг некоторого значения, в небольших пределах. Время чтения не должно превышать расчетных показателей больше, чем на некоторый процент. При увеличении размера сообщений время чтения также может увеличиваться.
  • Аномальное поведение метрики: Возрастание без существенного изменения размера сообщений может указывать на следующие причины:
    • Деградация работы Источника.
    • Деградация сети на участке между Источником данных и коннектором к Источнику.
    • Проблемы коннектора к Источнику данных.
    • Перегрузка процессора.
    • Утечка, переполнение памяти.
  • Дата последнего обновления: 2026-03-31

cdc_records

  • Тип метрики: counter
  • Описание: Количество событий, прошедших через Универсальный Обработчик, с детализацией по статусу.
  • Метки:
    • status:
      • received - Количество событий, полученных из Источника данных.
      • retried - Количество событий, которые потребовали повторной попытки отправки.
      • sent - Количество событий, успешно записанных в Приемник данных.
  • Ожидаемое поведение метрики: Скорость возрастания счетчика стабильна или увеличивается. При небольшом или отсутствующем потоке репликации, снижении потока сообщений в Приемник данных скорость возрастания счетчика близка к или равна 0 на некоторых отрезках времени.
  • Аномальное поведение метрики: Скорость возрастания счетчика уменьшается или счетчик не увеличивается, причины:
    • Ошибки получения данных.
    • Увеличение времени чтения или записи данных.
    • Деградация сети на участке между коннектором к Приемнику и Приемникмом данных.
    • Перегрузка процессора.
    • Утечка, переполнение памяти.
  • Дата последнего обновления: 2026-03-31

cdc_task

  • Тип метрики: gauge
  • Описание: Индикатор активности процесса с соответствующими значениями тегов connector и type. Значение этой метрики всегда равно 1, когда работает процесс. Суммированием этой метрики можно посчитать количество работающих экземпляров. Тег connector обозначает загруженный данным процессом KafkaConnect-процессор.
  • Метки:
    • connector:
      • source - Информация о запущенном коннекторе к Источнику данных.
      • sink - Информация о запущенном коннекторе к Приемнику данных.
  • Ожидаемое поведение метрики: Для отдельного экземпляра - значение равно 1.
  • Аномальное поведение метрики: Значение, равное 0 или отсутствие этой метрики в выборке при запросе к системе мониторинга означает, что ни один из экземпляров системы не работает.
  • Дата последнего обновления: 2026-03-31

cdc_trip_time

  • Тип метрики: summary
  • Описание: Общее время, затраченное на перенос события от Источника до Приемника данных.
  • Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
  • Аномальное поведение метрики: Значение индикатора возрастает.
  • Дата последнего обновления: 2026-03-31

cdc_write_time

  • Тип метрики: summary
  • Описание: Время, затраченное на выполнение операции записи данных в Приемник данных.
  • Метки:
    • quantile:
      • 0.5;
      • 0.95;
      • 0.99.
  • Ожидаемое поведение метрики: Время записи колеблется вокруг некоторого значения, в небольших пределах. Время записи не должно превышать расчетных показателей больше, чем на некоторый процент. При увеличении размера сообщений время записи также может увеличиваться.
  • Аномальное поведение метрики: Возрастание времени записи сообщений в Приемник может указывать на следующие причины:
    • Деградация сети на участке между коннектором к Приемнику и Приемникмом данных.
    • Деградация работы Приемника.
    • Проблемы коннектора к Приемнику.
    • Перегрузка процессора.
    • Утечка, переполнение памяти.
  • Дата последнего обновления: 2026-03-31

Метрики Java

application_ready_time_seconds

  • Тип метрики: gauge
  • Описание: Время запуска приложения.
  • Метки:
    • main_application_class:
      • io.tarantool.worker.* - Java класс.
  • Ожидаемое поведение метрики: Время запуска колеблется в небольших пределах вокруг некоторого значения, не занимает существенный процент от общей работы Tarantool CDC.
  • Аномальное поведение метрики: При стабильном возрастании времени запуска приложения, если процессы уходят на перезапуск, это является негативной динамикой. Означает перегруз системы.
  • Дата последнего обновления: 2026-03-31

jvm_gc_overhead_percent

  • Тип метрики: gauge
  • Описание: Приблизительный процент использования процессора сборщиком мусора относительно использования процессора за предыдущий период или с начала запуска процесса.
  • Ожидаемое поведение метрики: Показатель стабилен, его значение колеблется с небольшой амплитудой вокруг некоторого значения.
  • Аномальное поведение метрики: Возрастание данного показателя является признаком проблем с использованием памяти и сборкой мусора.
  • Дата последнего обновления: 2026-03-31

jvm_gc_pause_seconds

  • Тип метрики: summary
  • Описание: Время, затраченное на паузы для сборщика мусора.
  • Ожидаемое поведение метрики: Значение показателя стабильно, уменьшается или колеблется с небольшой амплитудой вокруг некоторого значения.
  • Аномальное поведение метрики: Возрастание затрачиваемого времени на сборку мусора указывает на проблемы с использованием памяти и сборкой мусора. Большие и частые паузы могут влиять на общую пропускную способность системы.
  • Дата последнего обновления: 2026-03-31

jvm_gc_pause_seconds_max

  • Тип метрики: gauge
  • Описание: Максимальная пауза в работе сборщика мусора.
  • Ожидаемое поведение метрики: Значение показателя стабильно, уменьшается или колеблется с небольшой амплитудой вокруг некоторого значения.
  • Аномальное поведение метрики: Возрастание затрачиваемого времени на сборку мусора указывает на проблемы с использованием памяти и сборкой мусора. Большие и частые паузы могут влиять на общую пропускную способность системы.
  • Дата последнего обновления: 2026-03-31

jvm_memory_used_bytes

  • Тип метрики: gauge
  • Описание: Количество использованной памяти.
  • Метки:
    • area:
      • nonheap;
      • heap.
    • id:
      • CodeHeap;
      • G1 Survivor Space;
      • G1 Old Gen;
      • G1 Eden Space;
      • Metaspace;
      • Compressed Class Space.
  • Ожидаемое поведение метрики: При возрастании потока сообщений и их размера, может наблюдаться пропорциональное увеличение использования памяти. Использование памяти при стабильном потоке сообщений должно быть также стабильно.
  • Аномальное поведение метрики: Увеличение использования памяти при отсутствии соответствующих изменений в потоке данных может указывать на утечку памяти.
  • Дата последнего обновления: 2026-03-31

jvm_threads_daemon_threads

  • Тип метрики: gauge
  • Описание: Текущее количество потоков, запущенных в режиме daemon.
  • Ожидаемое поведение метрики: Количество запущенных потоков в режиме daemon стабильно или колеблется в небольших пределах вокруг некоторого значения.
  • Аномальное поведение метрики: Возрастание количества работающих потоков в режиме daemon может указывать на утечку ресурсов, перегрузку системы.
  • Дата последнего обновления: 2026-03-31

jvm_threads_live_threads

  • Тип метрики: gauge
  • Описание: Текущее количество запущенных потоков в системе.
  • Ожидаемое поведение метрики: Количество запущенных потоков стабильно или колеблется в небольших пределах вокруг некоторого значения.
  • Аномальное поведение метрики: Возрастание количества работающих потоков может указывать на утечку ресурсов, перегрузку системы.
  • Дата последнего обновления: 2026-03-31

jvm_threads_peak_threads

  • Тип метрики: gauge
  • Описание: Максимальное количество потоков, когда либо работавших в системе одновременно с момента запуска или с сброса индикатора.
  • Ожидаемое поведение метрики: Показатель может возрастать в начале работы Tarantool CDC, но в остальное время должен оставаться без изменений.
  • Аномальное поведение метрики: Возрастание максимального количества работающих потоков может указывать на утечку ресурсов, перегрузку системы.
  • Дата последнего обновления: 2026-03-31

jvm_threads_started_threads_total

  • Тип метрики: counter
  • Описание: Общее количество потоков в системе, запущенных и отработанных.
  • Ожидаемое поведение метрики: Скорость возрастания метрики нулевая или близка к нулю.
  • Аномальное поведение метрики: Возрастание общего количества потоков может указывать на утечку ресурсов, перегрузку системы.
  • Дата последнего обновления: 2026-03-31

jvm_threads_states_threads

  • Тип метрики: gauge
  • Описание: Текущее количество потоков, запущенных Java-машиной.
  • Метки:
    • state:
      • blocked;
      • runnable;
      • waiting;
      • terminated;
      • timed-waiting;
      • new.
  • Ожидаемое поведение метрики: Количество потоков стабильно в течении длительного периода работы Tarantool CDC, или колеблется с небольшой амплитудой вокруг некоторого значения. Количество потоков является примерной суммой следующих показателей:
    • Общие потоки Java Runtime.
    • Потоки, запущенные коннекторами для обработки данных.
    • Потоки сборщика мусора.
  • Аномальное поведение метрики: Возрастание количества потоков может указывать на утечку ресурсов, перегрузку системы с последующим возрастанием нагрузки на сборщик мусора.
  • Дата последнего обновления: 2026-03-31

process_cpu_usage

  • Тип метрики: gauge
  • Описание: Текущее использование процессора Java-машиной.
  • Ожидаемое поведение метрики: Низкий показатель использования процессора указывает на стабильную работу системы. Также снижение использования процессора может быть связано со снижением потока данных.
  • Аномальное поведение метрики: Возрастание использования процессора может указывать на перегрузку системы.
  • Дата последнего обновления: 2026-03-31

process_files_open_files

  • Тип метрики: gauge
  • Описание: Количество открытых файловых дескрипторов. Носит информационный характер. Может понадобиться при расследовании причин падений и ошибок, случающихся в компонентах.
  • Ожидаемое поведение метрики: Количество открытых файловых дескрипторов (сетевых сокетов) стабильно в течение продолжительного времени работы Tarantool CDC или колеблется с незначительной амплитудой вокруг некоторого значения.
  • Аномальное поведение метрики: Стабильное увеличение данной метрики является аномальным, указывает на утечку ресурсов, перегрузку системы.
  • Дата последнего обновления: 2026-03-31

system_load_average_1m

  • Тип метрики: gauge
  • Описание: Средняя загрузка системы в течение одной минуты. С точки зрения JVM это интерпретируется как количество готовых к исполнению объектов, запланированных к выполнению плюс количество таких объектов, выполняющихся на процессоре в данный момент. Временное окно - 1 минута.
  • Ожидаемое поведение метрики: Показатель средней загрузки стабилен или колеблется в небольших пределах вокруг некоторого значения, или пропорционально соответствует потоку сообщений.
  • Аномальное поведение метрики: Стабильное возрастание средней загрузки может указывать на утечку ресурсов, перегрузку системы.
  • Дата последнего обновления: 2026-03-31