Запросы к данным через Kafka-коннектор | Tdg

Версия:

2.x
Руководство разработчика Взаимодействие с Kafka Запросы к данным через Kafka-коннектор

Запросы к данным через Kafka-коннектор

В руководстве пошагово демонстрируется пример работы TDG c Kafka – от настройки коннектора до выполнения обмена данными. Пример реализует следующую логику:

  • получение объекта из топика (topic) Kafka;

  • изменение обработчиком объекта, полученного в TDG;

  • сохранение измененного объекта в хранилище;

  • отправка измененного объекта в тот же топик.

Для выполнения примера требуются:

Руководство включает в себя следующие шаги:

Настройка коннектора

Коннектор в TDG можно настроить двумя способами:

  • указать параметры коннектора в файле конфигурации .yml;

  • добавить или изменить коннектор, используя веб-интерфейс (только для input-коннекторов). Чтобы узнать больше, обратитесь к разделу Вкладка Connectors.

Зададим настройки первым способом, в файле .yml.

Создайте файл конфигурации config.yml со следующими настройками:

types:
  __file: model.avsc

connector:
  input:
    - name: from_kafka
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - cities
      group_id: kafka
      routing_key: add_kafka

  output:
    - name: to_kafka
      type: kafka
      brokers:
        - localhost:9092
      topic: cities

input_processor:
  handlers:
    - key: add_kafka
      function: kafka_input_handler.call
  storage:
     - key: input_key
       type: City

output_processor:
  input_key:
    handlers:
      - function: kafka_output_handler.call
        outputs:
          - to_kafka

В файле указываются:

  • используемая модель данных;

  • секция connector – настройки input- и output-коннекторов. Разделу input соответствует функция consumer, разделу output – функция producer. Настройки включают в себя имя коннектора, адрес сервера (брокера), а также название топика (cities), к которому будет обращаться TDG. В разделе input также определен ключ маршрутизации routing_key со значением add_kafka;

  • секция input_processor – обработка входящих данных. Здесь заданы ключ для хранилища (input_key), в котором будет сохранен объект City, а также определен обработчик (kafka_input_handler) для ключа маршрутизации add_kafka. Узнать больше про настройку input_processor можно в соответствующем разделе справочника;

  • секция output_processor – обработка исходящих данных. Здесь определены имя хранилища (input_key) и функция-обработчик (kafka_output_handler). Кроме того, задан параметр outputs (to_kafka) – это означает, что объект будет отправлен обратно в топик Kafka. Узнать больше про настройку output_processor можно в соответствующем разделе справочника.

Чтобы ознакомиться со всеми доступными параметрами конфигурации для коннектора Kafka, обратитесь к справочнику по настройке коннектора.

Реализация обработчиков

Обработка входящих данных

Данные в формате JSON, приходящие из Kafka, попадают в обработчики (handlers), заданные в файле конфигурации в секции input_processor.

В функции обработчика можно модифицировать поступившую информации, а также обернуть данные в JSON с ключом routing_key для дальнейшей обработки.

В файле kafka_input_handler.lua укажите функцию, которая будет запускаться в input-процессоре. Функция увеличит значение population и задаст ключу routing_key значение input_key:

#!/usr/bin/env tarantool

return {
    call = function(params)
        params.obj.population = params.obj.population + 1
        params.routing_key = "input_key"
        return params
    end
}

Обработка исходящих данных

В разделе output указывается, как объект будет изменен в обработчике перед отправкой во внешние системы. Обработка выполняется после успешного сохранения объектов на экземплярах с ролью storage.

Чтобы обработать объект в output_processor, в файле конфигурации укажите название хранилища (input_key), в котором был сохранен объект, а затем определите обработчик для него (секция handlers).

Создайте файл kafka_output_handler.lua. В нем будет записана функция, вызываемая output-обработчиком. Функция вернет объект City:

#!/usr/bin/env tarantool

return {
    call = function(params)
        return {obj = params.obj}
    end
}

Загрузка конфигурации

Чтобы выполнить пример, нужно загрузить архив с моделью данных, файлом конфигурации и функциями обработчиков (handlers) в TDG:

  1. Поместите файлы со скриптами обработчиков (kafka_input_handler.lua и kafka_output_handler.lua) в папку src.

  2. Упакуйте в zip-архив:

    • папку src, внутри которой лежат файлы со скриптами обработчиков;

    • модель данных model.avsc;

    • файл конфигурации config.yml.

  3. Загрузите архив в TDG согласно инструкции.

Запуск и настройка Kafka

На сервере (брокере) Kafka создайте новый топик с именем cities. Для демонстрации работы примера и просмотра переданных в топик сообщений напишем простой скрипт на языке Python. Скрипт сыграет роль consumer Kafka, который получает сообщения из топика cities на брокере localhost:9092.

Чтобы работать с Kafka средствами Python, установите модуль kafka-python:

pip install kafka-python

Для запуска чтения сообщений, приходящих из топика cities, подготовьте следующий скрипт на языке Python:

from kafka import KafkaConsumer
consumer = KafkaConsumer('cities')
for message in consumer:
    print (message)

Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора или сохраните функцию в файл consumer.py, а затем запустите ее командой python consumer.py.

Оставьте работать запущенный consumer.py, а затем переключитесь на новую вкладку консоли.

Запуск обработки объектов

Чтобы запустить отправку сообщений, отправьте в Kafka JSON-объект типа City с полем population. Для этого подготовьте следующий скрипт на языке Python:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
headers = [("Header Key", b"Header Value")]
producer.send('cities', value='{"title": "Moscow", "population": 12655050}'.encode('ascii'), headers=headers)
producer.flush()

Скрипт содержит подключение к Kafka в качестве producer и отправку простого JSON-объекта: {"title": "Moscow", "population": 12655050}.

Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора или сохраните функцию в файл producer.py, а затем запустите ее командой python producer.py.

В результате TDG получит объект City из топика cities, обработает его (увеличится значение поля population) и сохранит, а затем отправит объект в тот же топик Kafka. Это вызовет повторное получение, обработку и отправку. Пока пример запущен, во вкладке терминала consumer будут появляться все новые сообщения с постоянно возрастающим значением поля population.

Чтобы остановить обработку объектов, выполните одно из действий ниже:

  • Выключите Kafka или TDG.

  • Загрузите в TDG новую конфигурацию.

Нашли ответ на свой вопрос?
Обратная связь