3.1.6. Взаимодействие с kafka¶
Рассмотрим настройку взаимодействия TDG по протоколу Kafka.
TDG может быть настроен как в качестве consumer
(роль connector
, секция input
), так и в качестве
producer (роль connector
, секция output
) в терминологии Kafka.
В целях простой демонстрации возможности взаимодействия с Kafka, в данном примере мы обеспечим получение объекта из заданной темы (topic), его небольшую модификацию и отправку обратно в ту же тему. В результате можно будет наблюдать рекурсивную переотправку модифицируемых объектов.
Для завершения рекурсивной обработки необходимо будет выключить Kafka или TDG, либо загрузить в TDG новую корректную конфигурацию без рекурсивной отправки с получением.
Для выполнения примера нам понадобится кластер TDG, настроенный ранее.
3.1.6.1. Конфигурация¶
Создайте файл config.yml
и измените его так, чтобы он содержал следующий
текст:
types:
__file: model.avsc
functions:
kafka_hlr: {__file: kafka_handler.lua}
pipelines:
kafka_handler:
- kafka_hlr
connector:
input:
- name: from_kafka
type: kafka
brokers:
- localhost:9092
topics:
- items
group_id: kafka
pipeline: kafka_handler
routing:
- key: input_key
output: to_kafka
output:
- name: to_kafka
type: kafka
brokers:
- localhost:9092
topic: items
Обратите внимание, в разделе connector
имеется описание входа с типом kafka,
реализующего роль consumer
в терминологии Kafka, для которого указаны такие
параметры как:
name
— имя коннектора;type
— тип коннектора;brokers
— адреса брокеров Kafka;topics
— выбор тем (топиков) для подписки;group_id
— consumer group id в терминологии Kafka;pipeline
— указание на конвейер (pipeline), в который будет передана обработка получаемых сообщений.
В разделе output
появилось описание исходящего маршрута с типом kafka,
реализующего роль producer
в терминологии Kafka, для которого указаны
следующие параметры:
name
— имя исходящего маршрута;type
— тип исходящего маршрута;brokers
— адреса брокеров Kafka;topic
— в какой теме (topic) публиковать информацию.
Объекты в данном примере не сохраняются локально, поэтому модель данных может
быть пустой. Создайте пустой файл model.avsc
.
Вся обработка данных ведется конвейером kafka_handler
в файле
kafka_handler.lua
.
Создайте файл kafka_handler.lua
и наполните его следующим содержанием.
#!/usr/bin/env tarantool
local param = ...
param.version = param.version + 1
local ret = {obj = param, priority = 1, routing_key = 'input_key'}
return ret
Содержимое этого файла — скрипт обработки данных, который получает на вход JSON,
поступивший из Kafka. В этом файле можно выполнить предварительную обработку
поступившей информации, а также обернуть данные в JSON с ключом routing_key
для дальнейшей обработки
(см. подробнее).
Важно
Kafka коннектор в TDG поддерживает только валидные JSON объекты.
Сейчас в данном файле приведен пример простейшей модификации (увеличение числа,
содержащегося в поле version
, которое было получено из Kafka JSON) и оборачивания
данного модифицированного JSON в формат для дальнейшей обработки с ключом
routing_key
равным input_key
.
Закончив с подготовкой файлов, упакуйте их в zip-архив и загрузите его согласно инструкции.
3.1.6.2. Описание процесса обработки запроса¶
Логика обработки данного примера изложена в файле конфигурации config.yml
и состоит в следующем:
Согласно разделу
connector
/input
файла конфигурацииconfig.yml
, выполняется подключение в качествеconsumer
к теме (topic)items
c параметром group id равнымkafka
на брокере Kafka, расположенном по указанному адресу. Все получаемые сообщения передаются для обработки в конвейерkafka_handler
;Конвейер
kafka_handler
вызывает функциюkafka_hlr
, описанную в файлеkafka_handler.lua
, которая упаковывает поступивший объект с ключомrouting_key
равным строкеinput_key
;Согласно разделу
connector
/routing
файла конфигурацииconfig.yml
, все объекты с ключомinput_key
передаются по исходящему маршрутуto_kafka
;В секции
output
для разделаconnector
указана единственная записьto_kafka
, которая описывает отправку запроса в качестве Kafka producer.
Обратите внимание, что сохранения информации в системе не происходит. Обработка объекта прекращается с отправкой его во внешнюю систему, поэтому в ремонтную очередь ничего не добавляется.
3.1.6.3. Запуск рекурсивной отправки Kafka¶
Для выполнения действий с Kafka необходимо установить и запустить сервер
Kafka, а также создать в нем тему (topic) с именем items
. Для этого следуйте
инструкции из официальной документации по Kafka (http://kafka.apache.org/quickstart
), изменив название
темы на items
.
В целях наглядной демонстрации работы примера и просмотра сообщений, передаваемых
в выбранную тему (topic) Kafka, используйте любой consumer
Kafka, подключенный
к тому же брокеру и теме items
.
Далее мы рассмотрим использование модуля kafka-python
, установить который
можно командой pip install kafka-python
. Тогда для просмотра сообщений Kafka
выполните следующий скрипт на языке Python (используя интерактивный режим
интерпретатора или сохранив его в файл consumer.py
и запустив командой
python consumer.py
):
from kafka import KafkaConsumer
consumer = KafkaConsumer('items')
for message in consumer:
print (message)
Примечание
Для запуска скриптов потребуется Python версии 3 и выше, запуск которого в
вашей системе может выполняться командой python3
.
Для начала демонстрации примера необходимо отправить в Kafka валидный JSON объект
с полем version
. Откройте новую консоль (оставив работать consumer
,
запущенный ранее) и выполните следующий скрипт на языке Python (используя
интерактивный режим интерпретатора или сохранив в файл его в файл producer.py
и запустив командой python producer.py
):
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('items', b'{"id": "tarantool", "version": 0}')
Данный скрипт содержит подключение к Kafka в качестве producer
и отправку
простого JSON объекта: {"id": "tarantool", "version": 0}
.
В случае успеха TDG получит данный объект, обработает (увеличив
значение поля «version») и отправит его в ту же тему Kafka, вызывая повторное
получение, обработку, отправку и так далее. В окне с consumer
при этом
будут появляться все новые сообщения с постоянно возрастающим значением
поля version
.
Примечание
Для остановки рекурсивной отправки сообщений из TDG вам придется загрузить другую корректную конфигурацию в TDG, либо отключить TDG или Kafka.