Универсальный Обработчик¶
Универсальный Обработчик – это приложение, обеспечивающее перенос информации от источника к приемнику данных. Настройка параметров переноса данных задается через файл конфигурации.
Настройка Универсального Обработчика¶
Универсальный Обработчик настраивается через конфигурационный файл application.yaml.
Вся конфигурация может быть собрана в одном файле или разделена на несколько файлов. В последнем случае
каждый файл должен называться application.yaml и располагаться на соответствующем логическом уровне.
Пример разбиения конфигурации на несколько файлов:
│ ...
└─ cdc/
├─ cdc-worker.jar # исполняемый файл Универсального Обработчика
├─ application.yaml # хранит основные настройки, например, настройки сервера и другие, связанные со Spring
│
└─ config/
├─ application.yaml # хранит общие настройки Обработчика, например, для работы с контрольными точками
│
├─ sink/
│ └─ application.yaml # хранит настройки подключения к приемнику данных
└─ source/
└─ application.yaml # хранит настройки подключения к источнику данных
Параметры в файле настроек application.yaml представлены в виде иерархии объектов формата YAML.
Пример:
spring:
application:
name: CDCWorker
profiles:
active: default
Эту же иерархическую структуру можно представить в виде строки, где уровни разделяются точками:
spring.application.name: CDCWorker
spring.profiles.active: default
Или в комбинированном виде:
spring:
application.name: CDCWorker
profiles.active: default
Комбинированная форма удобна, если на каком-то уровне определяется всего один потомок в дереве настроек. В этом случае можно упростить само дерево, соединяя узлы-потомки через точку.
Примечание
Для настройки Универсального Обработчика можно использовать файлы формата properties.
В этом случае конфигурационный файл (или файлы) должен называться application.properties и соответствовать
ранее описанным правилам расположения и представления.
Подробнее о формате properties можно прочитать по ссылкам:
Файл настроек Универсального Обработчика состоит из нескольких разделов:
spring– конфигурация Spring Boot.logging– настройки логирования.server– настройки сервера.management– настройки Spring Actuator.cdc– общие настройки приложения Универсального Обработчика.source– настройки подключения к источнику данных.sink– настройки подключения к приемнику данных.offset– настройки работы с контрольными точками.throttle– настройки для ограничителя трафика.
Примечание
Разделы настроек содержат ряд параметров Spring, подробнее о которых можно почитать в официальной документации Spring. При этом значения параметров по умолчанию в Универсальном Обработчике могут отличаться от значений, указанных в документации Spring.
Раздел настроек spring¶
Раздел spring содержит конфигурацию Spring Boot. Включает в себя следующие параметры:
application.name– имя приложения. Значение по умолчанию:cdc-worker.profiles.active– имя профиля. Значение по умолчанию:default.
Пример:
spring:
application.name: CDCWorker
profiles.active: default
Раздел настроек logging¶
Раздел logging содержит настройки логирования. Включает в себя следующие параметры:
logging.file.level– управление уровнем логирования модулей. Всего можно управлять двумя модулями, задавая для них различные уровни логирования при помощи значений:DEBUG,VERBOSE,INFO,WARNING,ERROR. ЗначениеOFFполностью отключает логирование событий модуля. По умолчанию логирование для модулей отключено:org.springframework.boot.SpringApplication– логирование самого процесса запуска приложения.org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLogger– детальный отчет об автоконфигурации Spring Boot.
Отключение логирования модулей позволяет сократить как общий объем логов, так и количество лишней информации в них. Это ускоряет и упрощает анализ логов.
Пример:
logging:
file:
name: logs/cdc-worker.log
level:
org.springframework.boot.SpringApplication: OFF
org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLogger: OFF
Примечание
Параметр logging.file.name позволяет включить запись логов в указанный файл в дополнение к стандартному выводу логов.
По умолчанию логирование в файл не производится.
Раздел настроек server¶
Раздел server содержит настройки сервера. Включает в себя следующие параметры:
server.port– порт сервера для подключения приложения. Значение по умолчанию:8000
Пример:
server:
port: 8000
Раздел настроек management¶
Раздел настроек management содержит настройки Spring Boot Actuator. Включает в себя следующие параметры:
management.endpoints.web.exposure.include– ID адресов мониторинга. По умолчанию сюда включеныhealthиprometheus.management.endpoint– группа параметров управления мониторингом:prometheus.enabled– предоставление данных мониторинга в Prometheus. По умолчанию:true.health.probes.enabled– настройка Spring Boot Actuator для интеграции с Kubernetes. По умолчанию:true.health.show-details– определяет уровень детализации. Значение по умолчанию:always, указывает на полную информацию.
management.health.livenessState.enabled– механизм внутреннего отслеживания и управления состоянием жизнеспособности приложения. По умолчанию:true.management.health.readinessState.enabled– механизм внутреннего отслеживания и управления состоянием готовности приложения к работе. По умолчанию:true.
Пример:
management:
endpoints.web.exposure.include:
- health
- prometheus
endpoint:
prometheus.enabled: true
health:
probes.enabled: true
show-details: always
health:
livenessState.enabled: true
readinessState.enabled: true
Раздел настроек cdc¶
Раздел содержит общие настройки приложения Универсального Обработчика. Включает в себя следующие параметры:
cdc.worker.id– идентификатор процесса в системе.cdc.shutdown.timeout.ms– максимальное время мягкого останова процесса в миллисекундах. По истечении времени происходит жёсткий останов процесса.
Пример:
cdc:
worker:
id: cdc-task
shutdown:
timeout:
ms: 30000
Раздел настроек source¶
Раздел содержит настройки подключения к источнику данных. Включает в себя следующие параметры:
source.plugin.path– путь, где расположены динамически загружаемые модули для подключения к источнику данных. Путь должен указывать на папку, внутри которой находятся модули, каждый из которых распакован в свою папку. Просканированным будет только один модуль, что ускорит запуск приложения. См. подробнее.source.connector.*– раздел параметров настройки выбранного коннектора для источника данных.source.connector.class– наименование класса модуля подключения к источнику данных. Обязательный параметр, который определяет, какой из просканированных и загруженных модулей подключения к источнику данных нужно настроить и запустить.source.transforms.names– список имён преобразований событий, заданных в разделеsource.transforms.config. Универсальный Обработчик применяет преобразования к событиям в том порядке, в каком они указаны вsource.transforms.names. По умолчанию является пустым списком.source.transforms.config.*– пространство для задания настроек преобразований. Каждое преобразование задаётся отдельным ключом в пространстве, который и является именем преобразования, указываемым вsource.transforms.names. Значением этого ключа является объект в формате YAML, содержащий настройки преобразования, включая его класс. По умолчанию пространствоsource.transforms.configявляется пустым объектом.
Пример:
source:
plugin.path: /libs/connect
connector:
class: io.tarantool.connector.TarantoolConnector
group.id: source-worker-1
topic:
prefix: cdc
tarantool:
topic:
name: cdc
strategy: per_space
connect:
username: admin
password: secret-cluster-cookie
host: localhost:3605
timeout: 2000
batch:
size: 2048
replication:
idle:
timeout: 2000
type: anonymous
spaces:
fields:
exclude: bucket_id
include:
include: User,all_types,intTest,arrays,all_types_tcs
exclude:
datetime:
zone:
enabled: true
Настройка коннекторов для подключения к PostgreSQL и Oracle из экосистемы Debezium также описывается в
разделе source.connector.*. Документация по настройкам коннекторов из экосистемы Debezium:
Определение пути расположения модулей подключения к источнику данных¶
Предположим следующую схему расположения модулей:
/
│ ...
└─ libs/
└─ connect/
└─ source/
│ ...
├─ postgresql-connector/
│ │ ...
│ └─ postgresql-connector.jar
│ ...
└─ tarantool-source-connector/
│ ...
└─ tarantool-source-connector.jar
Тогда путь должен быть указан как /libs/connect/source. При этом Универсальным Обработчиком будут загружены, но
не запущены два модуля для подключения к источнику данных. Поскольку сканирование всех модулей подключения может
занимать время, рекомендуется следующая форма расположения модулей:
/
│ ...
└─ libs/
└─ connect/
└─ source/
│ ...
├─ postgresql-connector/
│ └─ postgresql-connector/
│ │ ...
│ └─ tarantool-source-connector.jar
│ ...
└─ tarantool-source-connector/
└─ tarantool-source-connector/
│ ...
└─ tarantool-source-connector.jar
А в параметре source.plugin.path нужно указать либо
/libs/connect/source/postgresql-connector, либо /libs/connect/source/tarantool-connector. То есть путь всегда
должен указывать на папку уровнем выше относительно папки с модулем подключения к источнику данных.
Раздел настроек sink¶
Раздел содержит настройки подключения к приемнику данных. Включает в себя следующие параметры:
sink.plugin.path– путь, где расположены динамически загружаемые модули для подключения к приемнику данных. Путь должен указывать на папку, внутри которой находятся модули, кждый из которых распакован в свою папку. Просканированным будет только один модуль, что ускорит запуск приложения. См. подробнее.sink.connector.*– раздел для указания параметров настройки выбранного коннектора для приемника данных.sink.connector.class– наименование класса модуля подключения к приемнику данных. Обязательный параметр, который определяет, какой из просканированных и загруженных модулей подключения к приемнику данных нужно настроить и запустить.sink.transforms.names– список имён преобразований событий, заданных в разделеsink.transforms.config. Универсальный Обработчик применяет преобразования к событиям в том порядке, в каком они указаны вsink.transforms.names. По умолчанию задаёт два преобразования для извлечения имени очереди (топика) из событий.sink.transforms.config.*– пространство для задания настроек преобразований. Каждое преобразование задаётся отдельным ключом в пространстве, который и является именем преобразования, указываемым вsink.transforms.names. Значением этого ключа является объект в формате YAML, содержащий настройки преобразования, включая его класс. По умолчанию пространствоsink.transforms.configсодержит настройки для двух преобрзования, извлекающих имя очереди (топика) из событий.sink.retry.*– задаёт правила обработки ошибкиRetriableExceptionпри работе с приемником данных.sink.retry.count– количество повторных попыток отправки сообщений в приемник данных. По истечении этих попыток будет выброшена ошибка и Обработчик завершит свою работу.sink.retry.backoff– пауза между повторными попытками отправки сообщений в приемник данных. Может принимать разные единицы измерения при указании соответствующего суффикса: секунды -s, миллисекунды -ms.sink.retry.timeout– общее время на выполнение всех попыток отправки сообщений в приемник данных. По истечении этого времени, если сообщения не будут записаны, выбросится ошибка и обработчик завершит свою работу. Может принимать разные единицы измерения при указании соответствующего суффикса: секунды -s, миллисекунды -ms.
Пример:
sink:
plugin.path: /libs/connect
connector:
class: io.debezium.connector.jdbc.JdbcSinkConnector
connection:
url: jdbc:postgresql://target-postgres:5432/iot?reWriteBatchedInserts=true
username: iot
password: iot
insert.mode: upsert
primary.key.mode: record_key
tasks.max: 1
delete.enabled: true
schema.evolution: basic
database.time.zone: UTC
auto.create: true
quote.identifiers: true
truncate.enabled: false
batch.size: 500
use.reduction.buffer: true
retry:
count: 5
backoff: 200ms
timeout: 3s
transforms:
names:
- topic-from-source-table
- topic-from-source-space-name
config:
topic-from-source-table:
type: io.tarantool.cdc.transforms.smt.ExtractTopic$Value
field.path: source.table
skip.missing.or.null: true
topic-from-source-space-name:
type: io.tarantool.cdc.transforms.smt.ExtractTopic$Value
field.path: source.space_name
skip.missing.or.null: true
Определение пути расположения модулей подключения к приемнику данных¶
Предположим следующую схему расположения модулей:
/
│ ...
└─ libs/
└─ connect/
└─ sink/
│ ...
├─ jdbc-sink-connector/
│ │ ...
│ └─ jdbc-sink-connector.jar
│ ...
└─ tarantool-sink-connector/
│ ...
└─ tarantool-sink-connector.jar
Тогда путь должен быть /libs/connect/sink. При этом универсальным обработчиком будут загружены, но не запущены
два модуля для подключения к приемнику данных. Поскольку сканирование всех модулей подключения может занимать
время, рекомендуется следующая форма расположения модулей:
/
│ ...
└─ libs/
└─ connect/
└─ sink/
│ ...
├─ jdbc-sink-connector/
│ └─ jdbc-sink-connector/
│ │ ...
│ └─ jdbc-sink-connector.jar
│ ...
└─ tarantool-sink-connector/
└─ tarantool-sink-connector/
│ ...
└─ tarantool-sink-connector.jar
А в параметре sink.plugin.path указать либо
/libs/connect/sink/jdbc-sink-connector, либо /libs/connect/sink/tarantool-sink-connector. То есть, путь
всегда должен указывать на папку уровнем выше относительно папки с модулем подключения к приемнику данных.
Раздел настроек offset¶
Раздел содержит настройки контрольных точек. Включает в себя следующие параметры:
offset.flush.interval.ms– интервал сохранения контрольных точек Обработчика в миллисекундах.offset.flush.timeout.ms– максимальное время выполнения операции записи контрольных точек в миллисекундах. При превышении времени выбрасывается ошибка записи контрольных точек.offset.failures.max.count– допустимое количество ошибок записи контрольных точек. При превышении этого количества работа Универсального Обработчика завершается. По умолчанию:0. При этом значении работа универсального обработчика завершится после первой же ошибки. Для полного игнорирования ошибок необходимо выставить значение-1.offset.storage.*– пространство для настройки хранилища контрольных точек.offset.storage.type– наименование модуля хранения контрольных точек. Поддерживаемые модули:tqe– хранение контрольных точек в Tarantool Queue Enterprise.file– хранение контрольных точек в файле.kafka– хранение контрольных точек в Kafka.
Пример:
offset:
flush:
interval.ms: 5000
timeout.ms: 2500
storage:
type: tqe
address: offsets-grpc-server:18182
queue: offsets
connection.id: pg-task-1 # ключ, по которому Обработчик будет сохранять свой снимок контрольных точек.
Раздел настроек throttle¶
Раздел содержит настройки ограничителя трафика. Включает в себя следующие параметры:
throttler.max.eps– задаёт ограничение по количеству событий в секунду. По умолчанию:-1(ограничение не задано).
Пример:
throttle:
throttler.max.eps: -1