Коннекторы¶
Коннекторы – это функциональные составляющие Tarantool CDC, при помощи которых присходит взаимодействие с источниками и приемниками данных. Коннектор для источника (source connector) и коннектор для приемника (sink connector) совместно организуют конвейер по извлечению, трансформации и выгрузке данных.
В поставку Tarantool CDC входят:
Коннекторы для источника и приемника данных PostgreSQL.
Коннекторы для источника и приемника данных Oracle.
Коннекторы для источника и приемника данных Kafka.
Коннекторы для источника и приемника данных Tarantool DB 1.x и 2.x.
Коннекторы для источника и приемника данных Tarantool Data Grid 2.x.
Коннекторы для источника и приемника данных Tarantool Queue Enterprise.
Коннектор для приемника данных Clickhouse.
Коннектор для приемника данных Elasticsearch.
В файле настроек конкретный коннектор указывается при помощи обязательного параметра class. Параметр должен
быть прописан в секциях connector в разделах параметров источника данных (source) и приемника данных (sink):
connector:
class: io.tarantool.connector.KafkaSourceConnector
Кроме параметра class, в секции connector указываются и другие параметры коннекторов.
Коннекторы Kafka¶
Больше информации о параметрах Kafka, перечисленных в статьях о коннекторах для источника и приемника, смотрите в официальной документации Kafka.
Коннектор для источника данных Kafka¶
Коннектор для источника Kafka позволяет Tarantool CDC получать данные из брокеров Kafka.
Параметры коннектора указываются в секции connector:
key.converter– название класса конвертера для ключа. Необязательный параметр, который может принимать 2 значения:org.apache.kafka.connect.json.JsonConverter(по умолчанию);io.apicurio.registry.utils.converter.AvroConverter.
key.converter.schemas.enable– определяет, включать ли схему в каждое сериализованное значение ключа. Необязательный параметр, значение по умолчанию:true.key.converter.schemas.cache.size– максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию:1000.key.converter.decimal.format– задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию:BASE64.key.converter.replace.null.with.default– определяет, заменять ли поля, имеющие значениеNULL, на значение по умолчанию. Приtrueприводит к заменеNULLна значение по умолчанию. ПриfalseзаменыNULLне происходит. Необязательный параметр, значение по умолчанию:true.key.converter.apicurio.registry.url– обязателен приkey.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для ключа. См. подробнее.value.converter– название класса конвертера для значения. Необязательный параметр, который может принимать 2 значения:org.apache.kafka.connect.json.JsonConverter(по умолчанию);io.apicurio.registry.utils.converter.AvroConverter.
value.converter.schemas.enable– определяет, включать ли схему в каждое сериализованное значение. Необязательный параметр, значение по умолчанию:true.value.converter.schemas.cache.size– максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию:1000.value.converter.decimal.format– задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию:BASE64.value.converter.replace.null.with.default– определяет, заменять ли поля, имеющие значениеNULL, на значение по умолчанию. Приtrueприводит к заменеNULLна значение по умолчанию. ПриfalseзаменыNULLне происходит. Необязательный параметр, значение по умолчанию:true.value.converter.apicurio.registry.url– обязателен приvalue.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для значения. См. подробнее.bootstrap.servers– список адресов брокеров, к которым подключаетсяkafka-consumer. См. подробнее. Параметр так же может быть указан в секцииconsumer. При одновременном указании параметра в 2х местах с отличными друг от друга значениями, приоритетным будет значение в секцииconsumer. Параметр обязательно должен быть указан хотя бы в 1 из допустимых мест.topics– динамическая карта, в которой ключ - название топика, из которогоkafka-consumerбудет забирать данные, а значение – номер партиций, из которыхkafka-consumerбудет забираться данные для текущего топика. Параметр обязателен. Пример карты:topic-1: 0–kafka-consumerбудет забирать данные из 0 партиции топикаtopic-1.second-topic: 1-4–kafka-consumerбудет забирать данные из [1;4) партиций топикаsecond-topic(не включая правую границу). Т.е. будет забирать данные из 1,2,3 партиции.topic-name: 0,1-4–kafka-consumerбудет забирать данные из 0 и [1;4) партиций топикаtopic-name(не включая правую границу). Т.е. будет забирать данные из 0,1,2,3 партиции.
consumer– секция определяет параметрыkafka-consumer. Все параметры, описанные ниже, относятся к этой секции и не являются обязательными.fetch.max.bytes– cм. подробнее.max.poll.records– cм. подробнее.enable.auto.commit– определяет, будет ли потребитель автоматически периодически фиксировать смещения потребленных сообщений. Может принимать только предустановленное значение:false.key.deserializer– класс десериализатора, используемый для преобразования байтового представления ключа сообщения из формата хранения в Kafka в объект целевого языка программирования. Может принимать только предустановленное значение:org.apache.kafka.common.serialization.ByteArrayDeserializer.value.deserializer– класс десериализатора, используемый для преобразования байтового представления значения сообщения из формата хранения в Kafka в объект целевого языка программирования. Может принимать только предустановленное значение:org.apache.kafka.common.serialization.ByteArrayDeserializer.auto.offset.reset– Определяет поведение потребителя при отсутствии сохраненного смещения для партиции в группе потребителей. Может принимать только предустановленное значение:earliest.
Пример настройки:
connector:
class: io.tarantool.connector.KafkaSourceConnector
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
key.converter.schemas.cache.size: 1000
key.converter.decimal.format: BASE64
key.converter.replace.null.with.default: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
value.converter.schemas.cache.size: 1000
value.converter.decimal.format: BASE64
value.converter.replace.null.with.default: true
bootstrap.servers: broker-1:9092,broker-2:9092
topics:
topic-1: 0
second-topic: 1-4
topic-N: 0,1-4
consumer:
fetch.max.bytes: 52428800
max.poll.records: 500
Коннектор для приемника данных Kafka¶
Коннектор для приемника данных Kafka позволяет Tarantool CDC передавать данные в брокеры Kafka.
Параметры коннектора указываются в секции connector.
key.converter– название класса конвертера для ключа. Необязательный параметр, который может принимать 2 значения:org.apache.kafka.connect.json.JsonConverter(по умолчанию);io.apicurio.registry.utils.converter.AvroConverter.
key.converter.schemas.enable– определяет, включать ли схему в каждое сериализованное значение ключа. Необязательный параметр, значение по умолчанию:true.key.converter.schemas.cache.size– максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию:1000.key.converter.decimal.format– задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию:BASE64.key.converter.replace.null.with.default– определяет, заменять ли поля, имеющие значениеNULL, на значение по умолчанию. Приtrueприводит к заменеNULLна значение по умолчанию. ПриfalseзаменыNULLне происходит. Необязательный параметр, значение по умолчанию:true.key.converter.apicurio.registry.url– обязателен приkey.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для ключа. См. подробнее.value.converter– название класса конвертера для значения. Необязательный параметр, который может принимать 2 значения:org.apache.kafka.connect.json.JsonConverter(по умолчанию);io.apicurio.registry.utils.converter.AvroConverter.
value.converter.schemas.enable– определяет, включать ли схему в каждое сериализованное значение. Необязательный параметр, значение по умолчанию:true.value.converter.schemas.cache.size– максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию:1000.value.converter.decimal.format– задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию:BASE64.value.converter.replace.null.with.default– определяет, заменять ли поля, имеющие значениеNULL, на значение по умолчанию. Приtrueприводит к заменеNULLна значение по умолчанию. ПриfalseзаменыNULLне происходит. Необязательный параметр, значение по умолчанию:true.value.converter.apicurio.registry.url– обязателен приvalue.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для значения. См. подробнее.bootstrap.servers– список адресов брокеров, к которым подключаетсяkafka-producerиadmin-producer. См. подробнее. Параметр может быть указан в секцияхproducerиadmin. При одновременном указании параметра в нескольких местах с отличными друг от друга значениями, приоритетными будут значения параметра в секцияхproducerдляkafka-producerи/илиadminдляadmin-producer. Параметр обязательно должен быть указан хотя бы в 1 из допустимых мест.producer– секция определяет настройкуkafka-producer, cм. подробнее. Параметры этой секции не являются обязательными. Ниже представлены некоторые из возможных параметров секции:key.serializer– указывает класс сериализатора, используемый для преобразования объекта ключа сообщения из формата приложения в байтовое представление для хранения в брокере Kafka. Значение по умолчанию:org.apache.kafka.common.serialization.ByteArrayDeserializer.value.serializer– указывает класс сериализатора, используемый для преобразования объекта значения сообщения из формата приложения в байтовое представление для хранения в брокере Kafka. Значение по умолчанию:org.apache.kafka.common.serialization.ByteArrayDeserializer.max.block.ms– определяет максимальное время блокировки при вызове методовsend()иpartitionsFor(), после которого выбрасывается исключениеTimeoutException. Значение по умолчанию:Long.MAX_VALUE.enable.idempotence– регулирут идемпотентный режим работы. Этот режим гарантирует, что отправка сообщений будет выполнена ровно один раз в рамках сессии продюсера, даже при повторных попытках отправки. Значение по умолчанию:false.acks– определяет количество подтверждений, которые продюсер должен получить от лидеров реплик партиций перед тем, как запись будет считаться завершенной. Значение по умолчанию:all.max.in.flight.requests.per.connection– определяет максимальное количество не подтвержденных запросов отправки, которые могут быть отправлены на один брокер без получения ответа. Значение по умолчанию:1.delivery.timeout.ms– определяет максимальное время ожидания подтверждения доставки сообщения, после которого операция отправки считается завершенной (успешно или с ошибкой). Значение по умолчанию:Integer.MAX_VALUE.
admin– секция параметров для настройкиadmin-producer. Секция не является обязательной. Особенности настройки см. подробнееtopics– секция параметров для настройки топиков. Секция не является обязательной. Особенности настройки см. подробнее.
Пример настройки:
connector:
class: io.tarantool.connector.KafkaSinkConnector
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
key.converter.schemas.cache.size: 1000
key.converter.decimal.format: BASE64
key.converter.replace.null.with.default: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
value.converter.schemas.cache.size: 1000
value.converter.decimal.format: BASE64
value.converter.replace.null.with.default: true
bootstrap.servers: broker-1:9092,broker-2:9092
producer:
batch.size: 512
admin:
bootstrap.servers: broker-1:9092,broker-2:9092
topic.creation.default:
partitions: 1
replication.factor: 1