Универсальный Обработчик | Cdc

Версия:

latest
Руководство администратора Универсальный Обработчик

Универсальный Обработчик

Универсальный Обработчик – это приложение, обеспечивающее перенос информации от источника к приемнику данных. Настройка параметров переноса данных задается через файл конфигурации.

Настройка Универсального Обработчика

Универсальный Обработчик настраивается через конфигурационный файл application.yaml. Вся конфигурация может быть собрана в одном файле или разделена на несколько файлов. В последнем случае каждый файл должен называться application.yaml и располагаться на соответствующем логическом уровне. Пример разбиения конфигурации на несколько файлов:

│  ...
└─ cdc/
   ├─ cdc-worker.jar         # исполняемый файл Универсального Обработчика
   ├─ application.yaml       # хранит основные настройки, например, настройки сервера и другие, связанные со Spring
   │
   └─ config/
      ├─ application.yaml    # хранит общие настройки Обработчика, например, для работы с контрольными точками
      │
      ├─ sink/
      │  └─ application.yaml # хранит настройки подключения к приемнику данных
      └─ source/
         └─ application.yaml # хранит настройки подключения к источнику данных

Параметры в файле настроек application.yaml представлены в виде иерархии объектов формата YAML.

Пример:

spring:
  application:
    name: CDCWorker
  profiles:
    active: default

Эту же иерархическую структуру можно представить в виде строки, где уровни разделяются точками:

spring.application.name: CDCWorker
spring.profiles.active: default

Или в комбинированном виде:

spring:
  application.name: CDCWorker
  profiles.active: default

Комбинированная форма удобна, если на каком-то уровне определяется всего один потомок в дереве настроек. В этом случае можно упростить само дерево, соединяя узлы-потомки через точку.

Примечание

Для настройки Универсального Обработчика можно использовать файлы формата properties. В этом случае конфигурационный файл (или файлы) должен называться application.properties и соответствовать ранее описанным правилам расположения и представления. Подробнее о формате properties можно прочитать по ссылкам:

Файл настроек Универсального Обработчика состоит из нескольких разделов:

Примечание

Разделы настроек содержат ряд параметров Spring, подробнее о которых можно почитать в официальной документации Spring. При этом значения параметров по умолчанию в Универсальном Обработчике могут отличаться от значений, указанных в документации Spring.

Раздел настроек spring

Раздел spring содержит конфигурацию Spring Boot. Включает в себя следующие параметры:

  • application.name – имя приложения. Значение по умолчанию: cdc-worker.

  • profiles.active – имя профиля. Значение по умолчанию: default.

Пример:

spring:
  application.name: CDCWorker
  profiles.active: default

Раздел настроек logging

Раздел logging содержит настройки логирования. Включает в себя следующие параметры:

  • logging.file.level – управление уровнем логирования модулей. Всего можно управлять двумя модулями, задавая для них различные уровни логирования при помощи значений: DEBUG, VERBOSE, INFO, WARNING, ERROR. Значение OFF полностью отключает логирование событий модуля. По умолчанию логирование для модулей отключено:

    • org.springframework.boot.SpringApplication – логирование самого процесса запуска приложения.

    • org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLogger – детальный отчет об автоконфигурации Spring Boot.

Отключение логирования модулей позволяет сократить как общий объем логов, так и количество лишней информации в них. Это ускоряет и упрощает анализ логов.

Пример:

logging:
  file:
    name: logs/cdc-worker.log
  level:
    org.springframework.boot.SpringApplication: OFF
    org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLogger: OFF

Примечание

Параметр logging.file.name позволяет включить запись логов в указанный файл в дополнение к стандартному выводу логов. По умолчанию логирование в файл не производится.

Раздел настроек server

Раздел server содержит настройки сервера. Включает в себя следующие параметры:

  • server.port – порт сервера для подключения приложения. Значение по умолчанию: 8000

Пример:

server:
  port: 8000

Раздел настроек management

Раздел настроек management содержит настройки Spring Boot Actuator. Включает в себя следующие параметры:

  • management.endpoints.web.exposure.include – ID адресов мониторинга. По умолчанию сюда включены health и prometheus.

  • management.endpoint – группа параметров управления мониторингом:

    • prometheus.enabled – предоставление данных мониторинга в Prometheus. По умолчанию: true.

    • health.probes.enabled – настройка Spring Boot Actuator для интеграции с Kubernetes. По умолчанию: true.

    • health.show-details – определяет уровень детализации. Значение по умолчанию: always, указывает на полную информацию.

  • management.health.livenessState.enabled – механизм внутреннего отслеживания и управления состоянием жизнеспособности приложения. По умолчанию: true.

  • management.health.readinessState.enabled – механизм внутреннего отслеживания и управления состоянием готовности приложения к работе. По умолчанию: true.

Пример:

management:
  endpoints.web.exposure.include:
  - health
  - prometheus

  endpoint:
    prometheus.enabled: true
    health:
      probes.enabled: true
      show-details: always

  health:
    livenessState.enabled: true
    readinessState.enabled: true

Раздел настроек cdc

Раздел содержит общие настройки приложения Универсального Обработчика. Включает в себя следующие параметры:

  • cdc.worker.id – идентификатор процесса в системе.

  • cdc.shutdown.timeout.ms – максимальное время мягкого останова процесса в миллисекундах. По истечении времени происходит жёсткий останов процесса.

Пример:

cdc:
  worker:
    id: cdc-task
  shutdown:
    timeout:
      ms: 30000

Раздел настроек source

Раздел содержит настройки подключения к источнику данных. Включает в себя следующие параметры:

  • source.plugin.path – путь, где расположены динамически загружаемые модули для подключения к источнику данных. Путь должен указывать на папку, внутри которой находятся модули, каждый из которых распакован в свою папку. Просканированным будет только один модуль, что ускорит запуск приложения. См. подробнее.

  • source.connector.* – раздел параметров настройки выбранного коннектора для источника данных.

  • source.connector.class – наименование класса модуля подключения к источнику данных. Обязательный параметр, который определяет, какой из просканированных и загруженных модулей подключения к источнику данных нужно настроить и запустить.

  • source.transforms.names – список имён преобразований событий, заданных в разделе source.transforms.config. Универсальный Обработчик применяет преобразования к событиям в том порядке, в каком они указаны в source.transforms.names. По умолчанию является пустым списком.

  • source.transforms.config.* – пространство для задания настроек преобразований. Каждое преобразование задаётся отдельным ключом в пространстве, который и является именем преобразования, указываемым в source.transforms.names. Значением этого ключа является объект в формате YAML, содержащий настройки преобразования, включая его класс. По умолчанию пространство source.transforms.config является пустым объектом.

Пример:

source:
  plugin.path: /libs/connect
  connector:
    class: io.tarantool.connector.TarantoolConnector
    group.id: source-worker-1
    topic:
      prefix: cdc
    tarantool:
      topic:
        name: cdc
        strategy: per_space
      connect:
        username: admin
        password: secret-cluster-cookie
        host: localhost:3605
        timeout: 2000
      batch:
        size: 2048
      replication:
        idle:
          timeout: 2000
        type: anonymous
      spaces:
        fields:
          exclude: bucket_id
          include:
        include: User,all_types,intTest,arrays,all_types_tcs
        exclude:
      datetime:
        zone:
          enabled: true

Настройка коннекторов для подключения к PostgreSQL и Oracle из экосистемы Debezium также описывается в разделе source.connector.*. Документация по настройкам коннекторов из экосистемы Debezium:

Определение пути расположения модулей подключения к источнику данных

Предположим следующую схему расположения модулей:

/
│  ...
└─ libs/
    └─ connect/
       └─ source/
          │  ...
          ├─ postgresql-connector/
          │  │  ...
          │  └─ postgresql-connector.jar
          │  ...
          └─ tarantool-source-connector/
             │ ...
             └─ tarantool-source-connector.jar

Тогда путь должен быть указан как /libs/connect/source. При этом Универсальным Обработчиком будут загружены, но не запущены два модуля для подключения к источнику данных. Поскольку сканирование всех модулей подключения может занимать время, рекомендуется следующая форма расположения модулей:

/
│  ...
└─ libs/
    └─ connect/
       └─ source/
          │  ...
          ├─ postgresql-connector/
          │   └─ postgresql-connector/
          │      │  ...
          │      └─ tarantool-source-connector.jar
          │  ...
          └─ tarantool-source-connector/
             └─ tarantool-source-connector/
                │  ...
                └─ tarantool-source-connector.jar

А в параметре source.plugin.path нужно указать либо /libs/connect/source/postgresql-connector, либо /libs/connect/source/tarantool-connector. То есть путь всегда должен указывать на папку уровнем выше относительно папки с модулем подключения к источнику данных.

Раздел настроек sink

Раздел содержит настройки подключения к приемнику данных. Включает в себя следующие параметры:

  • sink.plugin.path – путь, где расположены динамически загружаемые модули для подключения к приемнику данных. Путь должен указывать на папку, внутри которой находятся модули, кждый из которых распакован в свою папку. Просканированным будет только один модуль, что ускорит запуск приложения. См. подробнее.

  • sink.connector.* – раздел для указания параметров настройки выбранного коннектора для приемника данных.

  • sink.connector.class – наименование класса модуля подключения к приемнику данных. Обязательный параметр, который определяет, какой из просканированных и загруженных модулей подключения к приемнику данных нужно настроить и запустить.

  • sink.transforms.names – список имён преобразований событий, заданных в разделе sink.transforms.config. Универсальный Обработчик применяет преобразования к событиям в том порядке, в каком они указаны в sink.transforms.names. По умолчанию задаёт два преобразования для извлечения имени очереди (топика) из событий.

  • sink.transforms.config.* – пространство для задания настроек преобразований. Каждое преобразование задаётся отдельным ключом в пространстве, который и является именем преобразования, указываемым в sink.transforms.names. Значением этого ключа является объект в формате YAML, содержащий настройки преобразования, включая его класс. По умолчанию пространство sink.transforms.config содержит настройки для двух преобрзования, извлекающих имя очереди (топика) из событий.

  • sink.retry.* – задаёт правила обработки ошибки RetriableException при работе с приемником данных.

  • sink.retry.count – количество повторных попыток отправки сообщений в приемник данных. По истечении этих попыток будет выброшена ошибка и Обработчик завершит свою работу.

  • sink.retry.backoff – пауза между повторными попытками отправки сообщений в приемник данных. Может принимать разные единицы измерения при указании соответствующего суффикса: секунды - s, миллисекунды - ms.

  • sink.retry.timeout – общее время на выполнение всех попыток отправки сообщений в приемник данных. По истечении этого времени, если сообщения не будут записаны, выбросится ошибка и обработчик завершит свою работу. Может принимать разные единицы измерения при указании соответствующего суффикса: секунды - s, миллисекунды - ms.

Пример:

sink:
  plugin.path: /libs/connect
  connector:
    class: io.debezium.connector.jdbc.JdbcSinkConnector
    connection:
      url: jdbc:postgresql://target-postgres:5432/iot?reWriteBatchedInserts=true
      username: iot
      password: iot
    insert.mode: upsert
    primary.key.mode: record_key
    tasks.max: 1
    delete.enabled: true
    schema.evolution: basic
    database.time.zone: UTC
    auto.create: true
    quote.identifiers: true
    truncate.enabled: false
    batch.size: 500
    use.reduction.buffer: true
  retry:
    count: 5
    backoff: 200ms
    timeout: 3s
  transforms:
    names:
    - topic-from-source-table
    - topic-from-source-space-name
    config:
      topic-from-source-table:
        type: io.tarantool.cdc.transforms.smt.ExtractTopic$Value
        field.path: source.table
        skip.missing.or.null: true
      topic-from-source-space-name:
        type: io.tarantool.cdc.transforms.smt.ExtractTopic$Value
        field.path: source.space_name
        skip.missing.or.null: true

Определение пути расположения модулей подключения к приемнику данных

Предположим следующую схему расположения модулей:

/
│  ...
└─ libs/
   └─ connect/
      └─ sink/
         │  ...
         ├─ jdbc-sink-connector/
         │  │  ...
         │  └─ jdbc-sink-connector.jar
         │  ...
         └─ tarantool-sink-connector/
            │  ...
            └─ tarantool-sink-connector.jar

Тогда путь должен быть /libs/connect/sink. При этом универсальным обработчиком будут загружены, но не запущены два модуля для подключения к приемнику данных. Поскольку сканирование всех модулей подключения может занимать время, рекомендуется следующая форма расположения модулей:

/
│  ...
└─ libs/
   └─ connect/
      └─ sink/
         │  ...
         ├─ jdbc-sink-connector/
         │  └─ jdbc-sink-connector/
         │     │  ...
         │     └─ jdbc-sink-connector.jar
         │  ...
         └─ tarantool-sink-connector/
            └─ tarantool-sink-connector/
               │  ...
               └─ tarantool-sink-connector.jar

А в параметре sink.plugin.path указать либо /libs/connect/sink/jdbc-sink-connector, либо /libs/connect/sink/tarantool-sink-connector. То есть, путь всегда должен указывать на папку уровнем выше относительно папки с модулем подключения к приемнику данных.

Раздел настроек offset

Раздел содержит настройки контрольных точек. Включает в себя следующие параметры:

  • offset.flush.interval.ms – интервал сохранения контрольных точек Обработчика в миллисекундах.

  • offset.flush.timeout.ms – максимальное время выполнения операции записи контрольных точек в миллисекундах. При превышении времени выбрасывается ошибка записи контрольных точек.

  • offset.failures.max.count – допустимое количество ошибок записи контрольных точек. При превышении этого количества работа Универсального Обработчика завершается. По умолчанию: 0. При этом значении работа универсального обработчика завершится после первой же ошибки. Для полного игнорирования ошибок необходимо выставить значение -1.

  • offset.storage.* – пространство для настройки хранилища контрольных точек.

  • offset.storage.type – наименование модуля хранения контрольных точек. Поддерживаемые модули:

    • tqe – хранение контрольных точек в Tarantool Queue Enterprise.

    • file – хранение контрольных точек в файле.

    • kafka – хранение контрольных точек в Kafka.

Пример:

offset:
  flush:
    interval.ms: 5000
    timeout.ms: 2500
  storage:
    type: tqe
    address: offsets-grpc-server:18182
    queue: offsets
    connection.id: pg-task-1 # ключ, по которому Обработчик будет сохранять свой снимок контрольных точек.

Раздел настроек throttle

Раздел содержит настройки ограничителя трафика. Включает в себя следующие параметры:

  • throttler.max.eps – задаёт ограничение по количеству событий в секунду. По умолчанию: -1 (ограничение не задано).

Пример:

throttle:
  throttler.max.eps: -1
Нашли ответ на свой вопрос?
Обратная связь