3.1.6. Взаимодействие с kafka¶
Рассмотрим настройку взаимодействия TDG по протоколу Kafka.
TDG может быть настроен как в качестве consumer
(роль connector, секция input), так и в качестве
producer (роль connector, секция output) в терминологии Kafka.
В целях простой демонстрации возможности взаимодействия с Kafka, в данном примере мы обеспечим получение объекта из заданной темы (topic), его небольшую модификацию и отправку обратно в ту же тему. В результате можно будет наблюдать рекурсивную переотправку модифицируемых объектов.
Для завершения рекурсивной обработки необходимо будет выключить Kafka или TDG, либо загрузить в TDG новую корректную конфигурацию без рекурсивной отправки с получением.
Для выполнения примера нам понадобится кластер TDG, настроенный ранее.
3.1.6.1. Конфигурация¶
Создайте файл config.yml и измените его так, чтобы он содержал следующий
текст:
types:
__file: model.avsc
functions:
kafka_hlr: {__file: kafka_handler.lua}
pipelines:
kafka_handler:
- kafka_hlr
connector:
input:
- name: from_kafka
type: kafka
brokers:
- localhost:9092
topics:
- items
group_id: kafka
pipeline: kafka_handler
routing:
- key: input_key
output: to_kafka
output:
- name: to_kafka
type: kafka
brokers:
- localhost:9092
topic: items
Обратите внимание, в разделе connector имеется описание входа с типом kafka,
реализующего роль consumer в терминологии Kafka, для которого указаны такие
параметры как:
name— имя коннектора;type— тип коннектора;brokers— адреса брокеров Kafka;topics— выбор тем (топиков) для подписки;group_id— consumer group id в терминологии Kafka;pipeline— указание на конвейер (pipeline), в который будет передана обработка получаемых сообщений.
В разделе output появилось описание исходящего маршрута с типом kafka,
реализующего роль producer в терминологии Kafka, для которого указаны
следующие параметры:
name— имя исходящего маршрута;type— тип исходящего маршрута;brokers— адреса брокеров Kafka;topic— в какой теме (topic) публиковать информацию.
Объекты в данном примере не сохраняются локально, поэтому модель данных может
быть пустой. Создайте пустой файл model.avsc.
Вся обработка данных ведется конвейером kafka_handler в файле
kafka_handler.lua.
Создайте файл kafka_handler.lua и наполните его следующим содержанием.
#!/usr/bin/env tarantool
local param = ...
param.version = param.version + 1
local ret = {obj = param, priority = 1, routing_key = 'input_key'}
return ret
Содержимое этого файла — скрипт обработки данных, который получает на вход JSON,
поступивший из Kafka. В этом файле можно выполнить предварительную обработку
поступившей информации, а также обернуть данные в JSON с ключом routing_key
для дальнейшей обработки
(см. подробнее).
Важно
Kafka коннектор в TDG поддерживает только валидные JSON объекты.
Сейчас в данном файле приведен пример простейшей модификации (увеличение числа,
содержащегося в поле version, которое было получено из Kafka JSON) и оборачивания
данного модифицированного JSON в формат для дальнейшей обработки с ключом
routing_key равным input_key.
Закончив с подготовкой файлов, упакуйте их в zip-архив и загрузите его согласно инструкции.
3.1.6.2. Описание процесса обработки запроса¶
Логика обработки данного примера изложена в файле конфигурации config.yml
и состоит в следующем:
Согласно разделу
connector/inputфайла конфигурацииconfig.yml, выполняется подключение в качествеconsumerк теме (topic)itemsc параметром group id равнымkafkaна брокере Kafka, расположенном по указанному адресу. Все получаемые сообщения передаются для обработки в конвейерkafka_handler;Конвейер
kafka_handlerвызывает функциюkafka_hlr, описанную в файлеkafka_handler.lua, которая упаковывает поступивший объект с ключомrouting_keyравным строкеinput_key;Согласно разделу
connector/routingфайла конфигурацииconfig.yml, все объекты с ключомinput_keyпередаются по исходящему маршрутуto_kafka;В секции
outputдля разделаconnectorуказана единственная записьto_kafka, которая описывает отправку запроса в качестве Kafka producer.
Обратите внимание, что сохранения информации в системе не происходит. Обработка объекта прекращается с отправкой его во внешнюю систему, поэтому в ремонтную очередь ничего не добавляется.
3.1.6.3. Запуск рекурсивной отправки Kafka¶
Для выполнения действий с Kafka необходимо установить и запустить сервер
Kafka, а также создать в нем тему (topic) с именем items. Для этого следуйте
инструкции из официальной документации по Kafka (http://kafka.apache.org/quickstart), изменив название
темы на items.
В целях наглядной демонстрации работы примера и просмотра сообщений, передаваемых
в выбранную тему (topic) Kafka, используйте любой consumer Kafka, подключенный
к тому же брокеру и теме items.
Далее мы рассмотрим использование модуля kafka-python, установить который
можно командой pip install kafka-python. Тогда для просмотра сообщений Kafka
выполните следующий скрипт на языке Python (используя интерактивный режим
интерпретатора или сохранив его в файл consumer.py и запустив командой
python consumer.py):
from kafka import KafkaConsumer
consumer = KafkaConsumer('items')
for message in consumer:
print (message)
Примечание
Для запуска скриптов потребуется Python версии 3 и выше, запуск которого в
вашей системе может выполняться командой python3.
Для начала демонстрации примера необходимо отправить в Kafka валидный JSON объект
с полем version. Откройте новую консоль (оставив работать consumer,
запущенный ранее) и выполните следующий скрипт на языке Python (используя
интерактивный режим интерпретатора или сохранив в файл его в файл producer.py
и запустив командой python producer.py):
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('items', b'{"id": "tarantool", "version": 0}')
Данный скрипт содержит подключение к Kafka в качестве producer и отправку
простого JSON объекта: {"id": "tarantool", "version": 0}.
В случае успеха TDG получит данный объект, обработает (увеличив
значение поля «version») и отправит его в ту же тему Kafka, вызывая повторное
получение, обработку, отправку и так далее. В окне с consumer при этом
будут появляться все новые сообщения с постоянно возрастающим значением
поля version.
Примечание
Для остановки рекурсивной отправки сообщений из TDG вам придется загрузить другую корректную конфигурацию в TDG, либо отключить TDG или Kafka.