Top.Mail.Ru
Tdg » 1.6 » 3. Запросы из внешних систем » 3.1. Запросы к данным » 3.1.6. Взаимодействие с kafka
 

3.1.6. Взаимодействие с kafka

3.1.6. Взаимодействие с kafka

Рассмотрим настройку взаимодействия TDG по протоколу Kafka.

TDG может быть настроен как в качестве consumer (роль connector, секция input), так и в качестве producer (роль connector, секция output) в терминологии Kafka.

В целях простой демонстрации возможности взаимодействия с Kafka, в данном примере мы обеспечим получение объекта из заданной темы (topic), его небольшую модификацию и отправку обратно в ту же тему. В результате можно будет наблюдать рекурсивную переотправку модифицируемых объектов.

Для завершения рекурсивной обработки необходимо будет выключить Kafka или TDG, либо загрузить в TDG новую корректную конфигурацию без рекурсивной отправки с получением.

Для выполнения примера нам понадобится кластер TDG, настроенный ранее.

3.1.6.1. Конфигурация

Создайте файл config.yml и измените его так, чтобы он содержал следующий текст:

types:
  __file: model.avsc

functions:
  kafka_hlr: {__file: kafka_handler.lua}

pipelines:
  kafka_handler:
    - kafka_hlr

connector:
  input:
    - name: from_kafka
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - items
      group_id: kafka
      pipeline: kafka_handler

  routing:
    - key: input_key
      output: to_kafka

  output:
    - name: to_kafka
      type: kafka
      brokers:
        - localhost:9092
      topic: items

Обратите внимание, в разделе connector имеется описание входа с типом kafka, реализующего роль consumer в терминологии Kafka, для которого указаны такие параметры как:

  • name – имя коннектора;
  • type – тип коннектора;
  • brokers – адреса брокеров Kafka;
  • topics – выбор тем (топиков) для подписки;
  • group_id – consumer group id в терминологии Kafka;
  • pipeline – указание на конвейер (pipeline), в который будет передана обработка получаемых сообщений.

В разделе output появилось описание исходящего маршрута с типом kafka, реализующего роль producer в терминологии Kafka, для которого указаны следующие параметры:

  • name – имя исходящего маршрута;
  • type – тип исходящего маршрута;
  • brokers – адреса брокеров Kafka;
  • topic – в какой теме (topic) публиковать информацию.

Объекты в данном примере не сохраняются локально, поэтому модель данных может быть пустой. Создайте пустой файл model.avsc.

Вся обработка данных ведется конвейером kafka_handler в файле kafka_handler.lua.

Создайте файл kafka_handler.lua и наполните его следующим содержанием.

#!/usr/bin/env tarantool

local param = ...

param.version = param.version + 1

local ret = {obj = param, priority = 1, routing_key = 'input_key'}

return ret

Содержимое этого файла – скрипт обработки данных, который получает на вход JSON, поступивший из Kafka. В этом файле можно выполнить предварительную обработку поступившей информации, а также обернуть данные в JSON с ключом routing_key для дальнейшей обработки (см. Подробнее).

Важно

Kafka коннектор в TDG поддерживает только валидные JSON объекты.

Сейчас в данном файле приведен пример простейшей модификации (увеличение числа, содержащегося в поле version, которое было получено из Kafka JSON) и оборачивания данного модифицированного JSON в формат для дальнейшей обработки с ключом routing_key равным input_key.

Закончив с подготовкой файлов, упакуйте их в zip-архив и загрузите его согласно инструкции.

3.1.6.2. Описание процесса обработки запроса

Логика обработки данного примера изложена в файле конфигурации config.yml и состоит в следующем:

  • Согласно разделу connector / input файла конфигурации config.yml, выполняется подключение в качестве consumer к теме (topic) items c параметром group id равным kafka на брокере Kafka, расположенном по указанному адресу. Все получаемые сообщения передаются для обработки в конвейер kafka_handler.
  • Конвейер kafka_handler вызывает функцию kafka_hlr, описанную в файле kafka_handler.lua, которая упаковывает поступивший объект с ключом routing_key равным строке input_key.
  • Согласно разделу connector / routing файла конфигурации config.yml, все объекты с ключом input_key передаются по исходящему маршруту to_kafka.
  • В секции output для раздела connector указана единственная запись to_kafka, которая описывает отправку запроса в качестве Kafka producer.

Обратите внимание, что сохранения информации в системе не происходит. Обработка объекта прекращается с отправкой его во внешнюю систему, поэтому в ремонтную очередь ничего не добавляется.

3.1.6.3. Запуск рекурсивной отправки Kafka

Для выполнения действий с Kafka необходимо установить и запустить сервер Kafka, а также создать в нем тему (topic) с именем items. Для этого следуйте официальной инструкции, изменив название темы на items.

В целях наглядной демонстрации работы примера и просмотра сообщений, передаваемых в выбранную тему (topic) Kafka, используйте любой consumer Kafka, подключенный к тому же брокеру и теме items.

Далее мы рассмотрим использование модуля kafka-python, установить который можно командой pip install kafka-python. Тогда для просмотра сообщений Kafka выполните следующий скрипт на языке Python (используя интерактивный режим интерпретатора или сохранив его в файл consumer.py и запустив командой python consumer.py):

from kafka import KafkaConsumer
consumer = KafkaConsumer('items')
for message in consumer:
    print (message)

Примечание

Для запуска скриптов потребуется Python версии 3 и выше, запуск которого в вашей системе может выполняться командой python3.

Для начала демонстрации примера необходимо отправить в Kafka валидный JSON объект с полем version. Откройте новую консоль (оставив работать consumer, запущенный ранее) и выполните следующий скрипт на языке Python (используя интерактивный режим интерпретатора или сохранив в файл его в файл producer.py и запустив командой python producer.py):

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('items', b'{"id": "tarantool", "version": 0}')

Данный скрипт содержит подключение к Kafka в качестве producer и отправку простого JSON объекта: {"id": "tarantool", "version": 0}.

В случае успеха TDG получит данный объект, обработает (увеличив значение поля «version») и отправит его в ту же тему Kafka, вызывая повторное получение, обработку, отправку и так далее. В окне с consumer при этом будут появляться все новые сообщения с постоянно возрастающим значением поля version.

Примечание

Для остановки рекурсивной отправки сообщений из TDG вам придется загрузить другую корректную конфигурацию в TDG, либо отключить TDG или Kafka.