Коннекторы | Cdc

Version:

latest

Коннекторы

Коннекторы – это функциональные составляющие Tarantool CDC, при помощи которых присходит взаимодействие с источниками и приемниками данных. Коннектор для источника (source connector) и коннектор для приемника (sink connector) совместно организуют конвейер по извлечению, трансформации и выгрузке данных.

В поставку Tarantool CDC входят:

  • Коннекторы для источника и приемника данных PostgreSQL.

  • Коннекторы для источника и приемника данных Oracle.

  • Коннекторы для источника и приемника данных Kafka.

  • Коннекторы для источника и приемника данных Tarantool DB 1.x и 2.x.

  • Коннекторы для источника и приемника данных Tarantool Data Grid 2.x.

  • Коннекторы для источника и приемника данных Tarantool Queue Enterprise.

  • Коннектор для приемника данных Clickhouse.

  • Коннектор для приемника данных Elasticsearch.

В файле настроек конкретный коннектор указывается при помощи обязательного параметра class. Параметр должен быть прописан в секциях connector в разделах параметров источника данных (source) и приемника данных (sink):

connector:
  class: io.tarantool.connector.KafkaSourceConnector

Кроме параметра class, в секции connector указываются и другие параметры коннекторов.

Коннекторы Kafka

Больше информации о параметрах Kafka, перечисленных в статьях о коннекторах для источника и приемника, смотрите в официальной документации Kafka.

Коннектор для источника данных Kafka

Коннектор для источника Kafka позволяет Tarantool CDC получать данные из брокеров Kafka.

Параметры коннектора указываются в секции connector:

  • key.converter – название класса конвертера для ключа. Необязательный параметр, который может принимать 2 значения:

    • org.apache.kafka.connect.json.JsonConverter (по умолчанию);

    • io.apicurio.registry.utils.converter.AvroConverter.

  • key.converter.schemas.enable – определяет, включать ли схему в каждое сериализованное значение ключа. Необязательный параметр, значение по умолчанию: true.

  • key.converter.schemas.cache.size – максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию: 1000.

  • key.converter.decimal.format – задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию: BASE64.

  • key.converter.replace.null.with.default – определяет, заменять ли поля, имеющие значение NULL, на значение по умолчанию. При true приводит к замене NULL на значение по умолчанию. При false замены NULL не происходит. Необязательный параметр, значение по умолчанию: true.

  • key.converter.apicurio.registry.url – обязателен при key.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для ключа. См. подробнее.

  • value.converter – название класса конвертера для значения. Необязательный параметр, который может принимать 2 значения:

    • org.apache.kafka.connect.json.JsonConverter (по умолчанию);

    • io.apicurio.registry.utils.converter.AvroConverter.

  • value.converter.schemas.enable – определяет, включать ли схему в каждое сериализованное значение. Необязательный параметр, значение по умолчанию: true.

  • value.converter.schemas.cache.size – максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию: 1000.

  • value.converter.decimal.format – задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию: BASE64.

  • value.converter.replace.null.with.default – определяет, заменять ли поля, имеющие значение NULL, на значение по умолчанию. При true приводит к замене NULL на значение по умолчанию. При false замены NULL не происходит. Необязательный параметр, значение по умолчанию: true.

  • value.converter.apicurio.registry.url – обязателен при value.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для значения. См. подробнее.

  • bootstrap.servers – список адресов брокеров, к которым подключается kafka-consumer. См. подробнее. Параметр так же может быть указан в секции consumer. При одновременном указании параметра в 2х местах с отличными друг от друга значениями, приоритетным будет значение в секции consumer. Параметр обязательно должен быть указан хотя бы в 1 из допустимых мест.

  • topics – динамическая карта, в которой ключ - название топика, из которого kafka-consumer будет забирать данные, а значение – номер партиций, из которых kafka-consumer будет забираться данные для текущего топика. Параметр обязателен. Пример карты:

    • topic-1: 0kafka-consumer будет забирать данные из 0 партиции топика topic-1.

    • second-topic: 1-4kafka-consumer будет забирать данные из [1;4) партиций топика second-topic (не включая правую границу). Т.е. будет забирать данные из 1,2,3 партиции.

    • topic-name: 0,1-4kafka-consumer будет забирать данные из 0 и [1;4) партиций топика topic-name (не включая правую границу). Т.е. будет забирать данные из 0,1,2,3 партиции.

  • consumer – секция определяет параметры kafka-consumer. Все параметры, описанные ниже, относятся к этой секции и не являются обязательными.

    • fetch.max.bytescм. подробнее.

    • max.poll.recordscм. подробнее.

    • enable.auto.commit – определяет, будет ли потребитель автоматически периодически фиксировать смещения потребленных сообщений. Может принимать только предустановленное значение: false.

    • key.deserializer – класс десериализатора, используемый для преобразования байтового представления ключа сообщения из формата хранения в Kafka в объект целевого языка программирования. Может принимать только предустановленное значение: org.apache.kafka.common.serialization.ByteArrayDeserializer.

    • value.deserializer – класс десериализатора, используемый для преобразования байтового представления значения сообщения из формата хранения в Kafka в объект целевого языка программирования. Может принимать только предустановленное значение: org.apache.kafka.common.serialization.ByteArrayDeserializer.

    • auto.offset.reset – Определяет поведение потребителя при отсутствии сохраненного смещения для партиции в группе потребителей. Может принимать только предустановленное значение: earliest.

Пример настройки:

connector:
  class: io.tarantool.connector.KafkaSourceConnector
  key.converter: org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable: true
  key.converter.schemas.cache.size: 1000
  key.converter.decimal.format: BASE64
  key.converter.replace.null.with.default: true
  value.converter: org.apache.kafka.connect.json.JsonConverter
  value.converter.schemas.enable: true
  value.converter.schemas.cache.size: 1000
  value.converter.decimal.format: BASE64
  value.converter.replace.null.with.default: true
  bootstrap.servers: broker-1:9092,broker-2:9092
  topics:
    topic-1: 0
    second-topic: 1-4
    topic-N: 0,1-4
  consumer:
    fetch.max.bytes: 52428800
    max.poll.records: 500

Коннектор для приемника данных Kafka

Коннектор для приемника данных Kafka позволяет Tarantool CDC передавать данные в брокеры Kafka.

Параметры коннектора указываются в секции connector.

  • key.converter – название класса конвертера для ключа. Необязательный параметр, который может принимать 2 значения:

    • org.apache.kafka.connect.json.JsonConverter (по умолчанию);

    • io.apicurio.registry.utils.converter.AvroConverter.

  • key.converter.schemas.enable – определяет, включать ли схему в каждое сериализованное значение ключа. Необязательный параметр, значение по умолчанию: true.

  • key.converter.schemas.cache.size – максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию: 1000.

  • key.converter.decimal.format – задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию: BASE64.

  • key.converter.replace.null.with.default – определяет, заменять ли поля, имеющие значение NULL, на значение по умолчанию. При true приводит к замене NULL на значение по умолчанию. При false замены NULL не происходит. Необязательный параметр, значение по умолчанию: true.

  • key.converter.apicurio.registry.url – обязателен при key.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для ключа. См. подробнее.

  • value.converter – название класса конвертера для значения. Необязательный параметр, который может принимать 2 значения:

    • org.apache.kafka.connect.json.JsonConverter (по умолчанию);

    • io.apicurio.registry.utils.converter.AvroConverter.

  • value.converter.schemas.enable – определяет, включать ли схему в каждое сериализованное значение. Необязательный параметр, значение по умолчанию: true.

  • value.converter.schemas.cache.size – максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию: 1000.

  • value.converter.decimal.format – задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию: BASE64.

  • value.converter.replace.null.with.default – определяет, заменять ли поля, имеющие значение NULL, на значение по умолчанию. При true приводит к замене NULL на значение по умолчанию. При false замены NULL не происходит. Необязательный параметр, значение по умолчанию: true.

  • value.converter.apicurio.registry.url – обязателен при value.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для значения. См. подробнее.

  • bootstrap.servers – список адресов брокеров, к которым подключается kafka-producer и admin-producer. См. подробнее. Параметр может быть указан в секциях producer и admin. При одновременном указании параметра в нескольких местах с отличными друг от друга значениями, приоритетными будут значения параметра в секциях producer для kafka-producer и/или admin для admin-producer. Параметр обязательно должен быть указан хотя бы в 1 из допустимых мест.

  • producer – секция определяет настройку kafka-producer, cм. подробнее. Параметры этой секции не являются обязательными. Ниже представлены некоторые из возможных параметров секции:

    • key.serializer – указывает класс сериализатора, используемый для преобразования объекта ключа сообщения из формата приложения в байтовое представление для хранения в брокере Kafka. Значение по умолчанию: org.apache.kafka.common.serialization.ByteArrayDeserializer.

    • value.serializer – указывает класс сериализатора, используемый для преобразования объекта значения сообщения из формата приложения в байтовое представление для хранения в брокере Kafka. Значение по умолчанию: org.apache.kafka.common.serialization.ByteArrayDeserializer.

    • max.block.ms – определяет максимальное время блокировки при вызове методов send() и partitionsFor(), после которого выбрасывается исключение TimeoutException. Значение по умолчанию: Long.MAX_VALUE.

    • enable.idempotence – регулирут идемпотентный режим работы. Этот режим гарантирует, что отправка сообщений будет выполнена ровно один раз в рамках сессии продюсера, даже при повторных попытках отправки. Значение по умолчанию: false.

    • acks – определяет количество подтверждений, которые продюсер должен получить от лидеров реплик партиций перед тем, как запись будет считаться завершенной. Значение по умолчанию: all.

    • max.in.flight.requests.per.connection – определяет максимальное количество не подтвержденных запросов отправки, которые могут быть отправлены на один брокер без получения ответа. Значение по умолчанию: 1.

    • delivery.timeout.ms – определяет максимальное время ожидания подтверждения доставки сообщения, после которого операция отправки считается завершенной (успешно или с ошибкой). Значение по умолчанию: Integer.MAX_VALUE.

  • admin – секция параметров для настройки admin-producer. Секция не является обязательной. Особенности настройки см. подробнее

  • topics – секция параметров для настройки топиков. Секция не является обязательной. Особенности настройки см. подробнее.

Пример настройки:

connector:
  class: io.tarantool.connector.KafkaSinkConnector
  key.converter: org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable: true
  key.converter.schemas.cache.size: 1000
  key.converter.decimal.format: BASE64
  key.converter.replace.null.with.default: true
  value.converter: org.apache.kafka.connect.json.JsonConverter
  value.converter.schemas.enable: true
  value.converter.schemas.cache.size: 1000
  value.converter.decimal.format: BASE64
  value.converter.replace.null.with.default: true
  bootstrap.servers: broker-1:9092,broker-2:9092
  producer:
    batch.size: 512
  admin:
    bootstrap.servers: broker-1:9092,broker-2:9092
  topic.creation.default:
    partitions: 1
    replication.factor: 1
Found what you were looking for?
Feedback