Запросы к данным через Kafka-коннектор¶
В руководстве пошагово демонстрируется пример работы TDG c Kafka – от настройки коннектора до выполнения обмена данными. Пример реализует следующую логику:
получение объекта из топика (topic) Kafka;
изменение обработчиком объекта, полученного в TDG;
сохранение измененного объекта в хранилище;
отправка измененного объекта в тот же топик.
Для выполнения примера требуются:
модель данных, сохраненная в файле
model.avsc. В примере используется модель из раздела по настройке модели данных;установленный и запущенный с помощью ZooKeeper сервер Kafka.
Руководство включает в себя следующие шаги:
Настройка коннектора¶
Коннектор в TDG можно настроить двумя способами:
указать параметры коннектора в файле конфигурации
.yml;добавить или изменить коннектор, используя веб-интерфейс (только для input-коннекторов). Чтобы узнать больше, обратитесь к разделу Вкладка Connectors.
Зададим настройки первым способом, в файле .yml.
Создайте файл конфигурации config.yml со следующими настройками:
types:
__file: model.avsc
connector:
input:
- name: from_kafka
type: kafka
brokers:
- localhost:9092
topics:
- cities
group_id: kafka
routing_key: add_kafka
output:
- name: to_kafka
type: kafka
brokers:
- localhost:9092
topic: cities
input_processor:
handlers:
- key: add_kafka
function: kafka_input_handler.call
storage:
- key: input_key
type: City
output_processor:
input_key:
handlers:
- function: kafka_output_handler.call
outputs:
- to_kafka
В файле указываются:
используемая модель данных;
секция
connector– настройки input- и output-коннекторов. Разделуinputсоответствует функция consumer, разделуoutput– функция producer. Настройки включают в себя имя коннектора, адрес сервера (брокера), а также название топика (cities), к которому будет обращаться TDG. В разделеinputтакже определен ключ маршрутизацииrouting_keyсо значениемadd_kafka;секция
input_processor– обработка входящих данных. Здесь заданы ключ для хранилища (input_key), в котором будет сохранен объектCity, а также определен обработчик (kafka_input_handler) для ключа маршрутизацииadd_kafka. Узнать больше про настройкуinput_processorможно в соответствующем разделе справочника;секция
output_processor– обработка исходящих данных. Здесь определены имя хранилища (input_key) и функция-обработчик (kafka_output_handler). Кроме того, задан параметрoutputs(to_kafka) – это означает, что объект будет отправлен обратно в топик Kafka. Узнать больше про настройкуoutput_processorможно в соответствующем разделе справочника.
Чтобы ознакомиться со всеми доступными параметрами конфигурации для коннектора Kafka, обратитесь к справочнику по настройке коннектора.
Реализация обработчиков¶
Обработка входящих данных¶
Данные в формате JSON, приходящие из Kafka, попадают в обработчики (handlers), заданные в файле
конфигурации в секции input_processor.
В функции обработчика можно модифицировать поступившую информации, а также обернуть данные
в JSON с ключом routing_key для дальнейшей обработки.
В файле kafka_input_handler.lua укажите функцию, которая будет запускаться в input-процессоре.
Функция увеличит значение population и задаст ключу routing_key значение input_key:
#!/usr/bin/env tarantool
return {
call = function(params)
params.obj.population = params.obj.population + 1
params.routing_key = "input_key"
return params
end
}
Обработка исходящих данных¶
В разделе output указывается, как объект будет изменен в обработчике перед отправкой во внешние системы.
Обработка выполняется после успешного сохранения объектов на экземплярах с ролью storage.
Чтобы обработать объект в output_processor, в файле конфигурации укажите название хранилища (input_key),
в котором был сохранен объект, а затем определите обработчик для него (секция handlers).
Создайте файл kafka_output_handler.lua. В нем будет записана функция, вызываемая output-обработчиком.
Функция вернет объект City:
#!/usr/bin/env tarantool
return {
call = function(params)
return {obj = params.obj}
end
}
Загрузка конфигурации¶
Чтобы выполнить пример, нужно загрузить архив с моделью данных, файлом конфигурации и функциями обработчиков (handlers) в TDG:
Поместите файлы со скриптами обработчиков (
kafka_input_handler.luaиkafka_output_handler.lua) в папкуsrc.Упакуйте в zip-архив:
папку
src, внутри которой лежат файлы со скриптами обработчиков;модель данных
model.avsc;файл конфигурации
config.yml.
Загрузите архив в TDG согласно инструкции.
Запуск и настройка Kafka¶
На сервере (брокере) Kafka создайте новый топик
с именем cities.
Для демонстрации работы примера и просмотра переданных в топик сообщений напишем простой скрипт на языке Python.
Скрипт сыграет роль consumer Kafka, который получает сообщения из топика cities на брокере localhost:9092.
Чтобы работать с Kafka средствами Python, установите модуль kafka-python:
pip install kafka-python
Для запуска чтения сообщений, приходящих из топика cities, подготовьте следующий скрипт на языке Python:
from kafka import KafkaConsumer
consumer = KafkaConsumer('cities')
for message in consumer:
print (message)
Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора или сохраните функцию в файл consumer.py,
а затем запустите ее командой python consumer.py.
Оставьте работать запущенный consumer.py, а затем переключитесь на новую вкладку консоли.
Запуск обработки объектов¶
Чтобы запустить отправку сообщений, отправьте в Kafka JSON-объект типа City с полем population.
Для этого подготовьте следующий скрипт на языке Python:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
headers = [("Header Key", b"Header Value")]
producer.send('cities', value='{"title": "Moscow", "population": 12655050}'.encode('ascii'), headers=headers)
producer.flush()
Скрипт содержит подключение к Kafka в качестве producer и отправку простого JSON-объекта:
{"title": "Moscow", "population": 12655050}.
Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора или сохраните функцию в файл producer.py,
а затем запустите ее командой python producer.py.
В результате TDG получит объект City из топика cities, обработает его (увеличится значение поля
population) и сохранит, а затем отправит объект в тот же топик Kafka.
Это вызовет повторное получение, обработку и отправку.
Пока пример запущен, во вкладке терминала consumer будут появляться все новые сообщения с постоянно возрастающим
значением поля population.
Чтобы остановить обработку объектов, выполните одно из действий ниже:
Выключите Kafka или TDG.
Загрузите в TDG новую конфигурацию.