Запросы к данным через Kafka-коннектор¶
В руководстве пошагово демонстрируется пример работы TDG c Kafka – от настройки коннектора до выполнения обмена данными. Пример реализует следующую логику:
получение объекта из топика (topic) Kafka;
изменение обработчиком объекта, полученного в TDG;
сохранение измененного объекта в хранилище;
отправка измененного объекта в тот же топик.
Для выполнения примера требуются:
модель данных, сохраненная в файле
model.avsc
. В примере используется модель из раздела по настройке модели данных;установленный и запущенный с помощью ZooKeeper сервер Kafka.
Руководство включает в себя следующие шаги:
Настройка коннектора¶
Коннектор в TDG можно настроить двумя способами:
указать параметры коннектора в файле конфигурации
.yml
;добавить или изменить коннектор, используя веб-интерфейс (только для input-коннекторов). Чтобы узнать больше, обратитесь к разделу Вкладка Connectors.
Зададим настройки первым способом, в файле .yml
.
Создайте файл конфигурации config.yml
со следующими настройками:
types:
__file: model.avsc
connector:
input:
- name: from_kafka
type: kafka
brokers:
- localhost:9092
topics:
- cities
group_id: kafka
routing_key: add_kafka
output:
- name: to_kafka
type: kafka
brokers:
- localhost:9092
topic: cities
input_processor:
handlers:
- key: add_kafka
function: kafka_input_handler.call
storage:
- key: input_key
type: City
output_processor:
input_key:
handlers:
- function: kafka_output_handler.call
outputs:
- to_kafka
В файле указываются:
используемая модель данных;
секция
connector
– настройки input- и output-коннекторов. Разделуinput
соответствует функция consumer, разделуoutput
– функция producer. Настройки включают в себя имя коннектора, адрес сервера (брокера), а также название топика (cities
), к которому будет обращаться TDG. В разделеinput
также определен ключ маршрутизацииrouting_key
со значениемadd_kafka
;секция
input_processor
– обработка входящих данных. Здесь заданы ключ для хранилища (input_key
), в котором будет сохранен объектCity
, а также определен обработчик (kafka_input_handler
) для ключа маршрутизацииadd_kafka
. Узнать больше про настройкуinput_processor
можно в соответствующем разделе справочника;секция
output_processor
– обработка исходящих данных. Здесь определены имя хранилища (input_key
) и функция-обработчик (kafka_output_handler
). Кроме того, задан параметрoutputs
(to_kafka
) – это означает, что объект будет отправлен обратно в топик Kafka. Узнать больше про настройкуoutput_processor
можно в соответствующем разделе справочника.
Чтобы ознакомиться со всеми доступными параметрами конфигурации для коннектора Kafka, обратитесь к справочнику по настройке коннектора.
Реализация обработчиков¶
Обработка входящих данных¶
Данные в формате JSON, приходящие из Kafka, попадают в обработчики (handlers), заданные в файле
конфигурации в секции input_processor
.
В функции обработчика можно модифицировать поступившую информации, а также обернуть данные
в JSON с ключом routing_key
для дальнейшей обработки.
В файле kafka_input_handler.lua
укажите функцию, которая будет запускаться в input-процессоре.
Функция увеличит значение population
и задаст ключу routing_key
значение input_key
:
#!/usr/bin/env tarantool
return {
call = function(params)
params.obj.population = params.obj.population + 1
params.routing_key = "input_key"
return params
end
}
Обработка исходящих данных¶
В разделе output
указывается, как объект будет изменен в обработчике перед отправкой во внешние системы.
Обработка выполняется после успешного сохранения объектов на экземплярах с ролью storage
.
Чтобы обработать объект в output_processor
, в файле конфигурации укажите название хранилища (input_key
),
в котором был сохранен объект, а затем определите обработчик для него (секция handlers
).
Создайте файл kafka_output_handler.lua
. В нем будет записана функция, вызываемая output-обработчиком.
Функция вернет объект City
:
#!/usr/bin/env tarantool
return {
call = function(params)
return {obj = params.obj}
end
}
Загрузка конфигурации¶
Чтобы выполнить пример, нужно загрузить архив с моделью данных, файлом конфигурации и функциями обработчиков (handlers) в TDG:
Поместите файлы со скриптами обработчиков (
kafka_input_handler.lua
иkafka_output_handler.lua
) в папкуsrc
.Упакуйте в zip-архив:
папку
src
, внутри которой лежат файлы со скриптами обработчиков;модель данных
model.avsc
;файл конфигурации
config.yml
.
Загрузите архив в TDG согласно инструкции.
Запуск и настройка Kafka¶
На сервере (брокере) Kafka создайте новый топик
с именем cities
.
Для демонстрации работы примера и просмотра переданных в топик сообщений напишем простой скрипт на языке Python.
Скрипт сыграет роль consumer Kafka, который получает сообщения из топика cities
на брокере localhost:9092
.
Чтобы работать с Kafka средствами Python, установите модуль kafka-python
:
pip install kafka-python
Для запуска чтения сообщений, приходящих из топика cities
, подготовьте следующий скрипт на языке Python:
from kafka import KafkaConsumer
consumer = KafkaConsumer('cities')
for message in consumer:
print (message)
Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора или сохраните функцию в файл consumer.py
,
а затем запустите ее командой python consumer.py
.
Оставьте работать запущенный consumer.py
, а затем переключитесь на новую вкладку консоли.
Запуск обработки объектов¶
Чтобы запустить отправку сообщений, отправьте в Kafka JSON-объект типа City
с полем population
.
Для этого подготовьте следующий скрипт на языке Python:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
headers = [("Header Key", b"Header Value")]
producer.send('cities', value='{"title": "Moscow", "population": 12655050}'.encode('ascii'), headers=headers)
producer.flush()
Скрипт содержит подключение к Kafka в качестве producer и отправку простого JSON-объекта:
{"title": "Moscow", "population": 12655050}
.
Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора или сохраните функцию в файл producer.py
,
а затем запустите ее командой python producer.py
.
В результате TDG получит объект City
из топика cities
, обработает его (увеличится значение поля
population
) и сохранит, а затем отправит объект в тот же топик Kafka.
Это вызовет повторное получение, обработку и отправку.
Пока пример запущен, во вкладке терминала consumer
будут появляться все новые сообщения с постоянно возрастающим
значением поля population
.
Чтобы остановить обработку объектов, выполните одно из действий ниже:
Выключите Kafka или TDG.
Загрузите в TDG новую конфигурацию.