Запросы к данным через Kafka-коннектор
В руководстве пошагово демонстрируется пример работы TDG c Kafka - от настройки коннектора до выполнения обмена данными. Пример реализует следующую логику:
- получение объекта из топика (topic) Kafka;
- изменение обработчиком объекта, полученного в TDG;
- сохранение измененного объекта в хранилище;
- отправка измененного объекта в тот же топик.
Для выполнения примера требуются:
- настроенный кластер TDG;
- модель данных, сохраненная в файле
model.avsc. В примере используется модель из раздела по настройке модели данных; - установленные Python и pip;
- установленный и запущенный с помощью ZooKeeper сервер Kafka.
Руководство включает в себя следующие шаги:
Коннектор в TDG можно настроить двумя способами:
- указать параметры коннектора в файле конфигурации
.yml; - добавить или изменить коннектор, используя веб-интерфейс (только для input-коннекторов). Чтобы узнать больше, обратитесь к разделу Вкладка Connectors.
Зададим настройки первым способом, в файле .yml.
Создайте файл конфигурации config.yml
со следующими настройками:
types:__file: model.avscconnector:input:- name: from_kafkatype: kafkabrokers:- localhost:9092topics:- citiesgroup_id: kafkarouting_key: add_kafkaoutput:- name: to_kafkatype: kafkabrokers:- localhost:9092topic: citiesinput_processor:handlers:- key: add_kafkafunction: kafka_input_handler.callstorage:- key: input_keytype: Cityoutput_processor:input_key:handlers:- function: kafka_output_handler.calloutputs:- to_kafka
В файле указываются:
- используемая модель данных;
- секция
connector- настройки input- и output-коннекторов. Разделуinputсоответствует функция consumer, разделуoutput- функция producer. Настройки включают в себя имя коннектора, адрес сервера (брокера), а также название топика (cities), к которому будет обращаться TDG. В разделеinputтакже определен ключ маршрутизацииrouting_keyсо значениемadd_kafka; - секция
input_processor- обработка входящих данных. Здесь заданы ключ для хранилища (input_key), в котором будет сохранен объектCity, а также определен обработчик (kafka_input_handler) для ключа маршрутизацииadd_kafka. Узнать больше про настройкуinput_processorможно в соответствующем разделе справочника; - секция
output_processor- обработка исходящих данных. Здесь определены имя хранилища (input_key) и функция-обработчик (kafka_output_handler). Кроме того, задан параметрoutputs(to_kafka) - это означает, что объект будет отправлен обратно в топик Kafka. Узнать больше про настройкуoutput_processorможно в соответствующем разделе справочника.
Чтобы ознакомиться со всеми доступными параметрами конфигурации для коннектора Kafka, обратитесь к справочнику по настройке коннектора.
Данные в формате JSON, приходящие из Kafka, попадают в обработчики
(handlers), заданные в файле конфигурации в секции input_processor.
В функции обработчика можно модифицировать поступившую информации, а
также обернуть данные в JSON с ключом routing_key для дальнейшей
обработки.
В файле kafka_input_handler.lua укажите функцию, которая будет
запускаться в input-процессоре. Функция увеличит значение population и
задаст ключу routing_key значение input_key:
#!/usr/bin/env tarantoolreturn {call = function(params)params.obj.population = params.obj.population + 1params.routing_key = "input_key"return paramsend}
В разделе output указывается, как объект будет изменен в обработчике
перед отправкой во внешние системы. Обработка выполняется после
успешного сохранения объектов на экземплярах с ролью storage.
Чтобы обработать объект в output_processor, в файле конфигурации
укажите название хранилища (input_key), в котором был сохранен объект,
а затем определите обработчик для него (секция handlers).
Создайте файл kafka_output_handler.lua.
В нем будет записана функция, вызываемая output-обработчиком. Функция
вернет объект City:
#!/usr/bin/env tarantoolreturn {call = function(params)return {obj = params.obj}end}
Чтобы выполнить пример, нужно загрузить архив с моделью данных, файлом конфигурации и функциями обработчиков (handlers) в TDG:
- Поместите файлы со скриптами обработчиков
(
kafka_input_handler.luaиkafka_output_handler.lua) в папкуsrc. - Упакуйте в zip-архив:
- папку
src, внутри которой лежат файлы со скриптами обработчиков; - модель данных
model.avsc; - файл конфигурации
config.yml.
- папку
- Загрузите архив в TDG согласно инструкции.
На сервере (брокере) Kafka создайте новый
топик с
именем cities. Для демонстрации работы примера и просмотра переданных
в топик сообщений напишем простой скрипт на языке Python. Скрипт сыграет
роль consumer Kafka, который получает сообщения из топика cities на
брокере localhost:9092.
Чтобы работать с Kafka средствами Python, установите модуль
kafka-python:
pip install kafka-python
Для запуска чтения сообщений, приходящих из топика cities, подготовьте
следующий скрипт на языке Python:
from kafka import KafkaConsumerconsumer = KafkaConsumer('cities')for message in consumer:print (message)
Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора
или сохраните функцию в файл consumer.py, а затем запустите ее командой python consumer.py.
Оставьте работать запущенный consumer.py, а затем переключитесь на
новую вкладку консоли.
Чтобы запустить отправку сообщений, отправьте в Kafka JSON-объект типа
City с полем population. Для этого подготовьте следующий скрипт на
языке Python:
from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')headers = [("Header Key", b"Header Value")]producer.send('cities', value='{"title": "Moscow", "population": 12655050}'.encode('ascii'), headers=headers)producer.flush()
Скрипт содержит подключение к Kafka в качестве producer и отправку
простого JSON-объекта: {"title": "Moscow", "population": 12655050}.
Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора
или сохраните функцию в файл producer.py, а затем запустите ее командой python producer.py.
В результате TDG получит объект City из топика cities,
обработает его (увеличится значение поля population) и сохранит, а
затем отправит объект в тот же топик Kafka. Это вызовет повторное
получение, обработку и отправку. Пока пример запущен, во вкладке
терминала consumer будут появляться все новые сообщения с постоянно
возрастающим значением поля population.
Чтобы остановить обработку объектов, выполните одно из действий ниже:
- Выключите Kafka или TDG.
- Загрузите в TDG новую конфигурацию.