Процессоры входящих и исходящих данных | Tdg

Версия:

2.x
Руководство разработчика Разработка бизнес-логики Процессоры входящих и исходящих данных

Процессоры входящих и исходящих данных

Процессор (processor) – это подсистема, которая задает в TDG логику обработки объектов и автоматически выполняет пользовательский код при поступлении нового объекта или при его отправке через заданный коннектор. Для исполнения пользовательского кода используются обработчики.

Обработчик (handler) – это пользовательская функция, которая вызывается процессором и обрабатывает входящие или исходящие данные. В TDG такие функции представляют собой функции на языке Lua.

Существует два вида процессоров, оба они выполняются на экземплярах TDG с ролью runner:

  • input-процессор (input processor) – процессор входящих данных. Этот процессор обрабатывает объект, поступивший на вход через коннектор, настраивает для него маршрутизацию и правила хранения полученных объектов.

  • output-процессор (output processor) – процессор исходящих данных. Этот процессор готовит объект к отправке во внешнюю систему и передает его в коннектор.

В этом руководстве рассмотрим пример создания input-процессора и обработчика в TDG. В примере на HTTP-коннектор придет новый объект в формате JSON, который после преобразования в Lua-объект будет обработан input-процессором, а затем сохранен в хранилище – на экземплярах с ролью storage.

Для выполнения примера требуются:

Руководство включает в себя следующие шаги:

Конфигурация коннектора

Указать конфигурацию входящего коннектора можно:

  • в общем конфигурационном файле (секция connector в файле config.yml);

  • в отдельном файле connector.yml.

Задайте конфигурацию входящего HTTP-коннектора:

# config.yml

connector:
  input:
    - name: http
      type: http
      routing_key: input_key

При получении нового объекта в формате JSON через HTTP-коннектор:

  1. JSON-объект преобразовывается в Lua-таблицу (объект Lua).

  2. Для объекта генерируется UUID, по которому можно проследить дальнейший путь объекта (например, найти его в ремонтной очереди).

  3. Объектам, пришедшим на коннектор, присваивается определенный ключ маршрутизации (секция routing_key в конфигурации коннектора).

Ключ маршрутизации – это строковый ключ, который задается объекту и определяет для него процесс дальнейшей обработки. На коннекторе такой ключ определяет, в какой обработчик на input-процессоре будет передан объект. Если ключ маршрутизации не задан, объект обрабатывается функцией по умолчанию, которая превращает объект вида { "obj_type": { "field_1": 1 } } в объект вида { "field_1": 1 } и задает значение ключа маршрутизации как routing_key = obj_type. После получения ключа маршрутизации объект передается на input-процессор.

Выше в конфигурации определены имя коннектора http, его тип http и ключ маршрутизации input_key, с которым объект отправится в обработчик, заданный в файле конфигурации в секции input_processor.

Подробная информация о параметрах конфигурации коннектора приведена в справочнике по конфигурации бизнес-логики.

Конфигурация input-процессора

Чтобы задать процессор для входящих данных в TDG, необходимо добавить соответствующую секцию в конфигурацию TDG. Input-процессор можно описывать:

  • в общем конфигурационном файле (секция input processor в файле config.yml);

  • в отдельном файле input_processor.yml.

Конфигурация input-процессора включает такие параметры, как вызываемый обработчик, ключ маршрутизации и система, куда следует направить объект. Полная информация о параметрах конфигурации input-процессора приведена в справочнике по конфигурации бизнес-логики.

Задайте конфигурацию процессора для входящих данных:

# config.yml

input_processor:
  handlers:
    - key: input_key
      function: handle_band.newband
  storage:
    - key: band
      type: MusicBand

Здесь в секции handlers задан ключ маршрутизации input_key. Объекты с таким ключом, пришедшие из HTTP-коннектора на input-процессор, передаются в функцию newband из модуля handle_band.

Кроме того, в секции storage в параметре key задается еще один ключ маршрутизации band. После настройки такого ключа маршрутизации:

  1. TDG определяет по ключу нужный тип объекта из модели данных.

  2. Объект, пришедший на input-процессор, валидируется в соответствии с моделью данных.

  3. Объект преобразовывается в тип из модели данных.

  4. Преобразованный объект сохраняется в хранилище.

Объекты, которым в обработчике handle_band.newband был присвоен ключ band, будут провалидированы по этому ключу в соответствии с моделью данных, преобразованы в тип MusicBand и сохранены на экземплярах с ролью storage.

Примечание

Если при обработке объекта input-процессором возникает ошибка, объект отправляется в ремонтную очередь. Информация об объектах в ремонтной очереди с описанием ошибок доступна администратору в веб-интерфейсе TDG на вкладке Repair Queues.

После задания всех необходимых настроек общий файл конфигурации будет выглядеть следующим образом:

# config.yml

types:
  __file: model.avsc

connector:
  input:
    - name: http
      type: http
      routing_key: input_key

input_processor:
  handlers:
    - key: input_key
      function: handle_band.newband
  storage:
    - key: band
      type: MusicBand

Реализация обработчика

Как указано в конфигурации процессора, код обработчика должен храниться в файле handle_band.lua. Создайте обработчик с функцией newband(obj), которая принимает на вход новый Lua-объект с музыкальной группой:

-- handle_band.lua

local log = require('log')
local json = require('json')

log.info('Starting handle_band.lua')
function newband(obj)
    log.info('Received new object from the HTTP connector: %s', json.encode(obj))
    obj.routing_key = "band"
    log.info('The routing key is set to "%s" value', obj.routing_key)
    return obj
end

return {
    newband = newband
}

Здесь функция newband(obj) выводит информацию об объекте obj, пришедшем из HTTP-коннектора, в логах TDG в формате JSON. После этого функция присваивает объекту новый ключ маршрутизации band, а затем возвращает этот объект.

Загрузка конфигурации

Чтобы выполнить пример, нужно загрузить архив с файлом конфигурации, моделью данных и функциями обработчиков в TDG. Для этого:

  1. Поместите Lua-файл с кодом обработчика в папку src.

  2. Упакуйте в zip-архив:

    • папку src, внутри которой лежит файл с кодом обработчика handle_band.lua;

    • модель данных model.avsc;

    • файл конфигурации config.yml.

  3. Загрузите архив в TDG согласно инструкции.

Добавление объекта через input-процессор

Чтобы проверить добавленную конфигурацию, добавьте новую музыкальную группу в TDG через HTTP-коннектор. Для этого переключитесь на вкладку Test и выберите для запроса формат JSON.

Введите в поле запрос в формате JSON и затем нажмите Submit:

{
  "name":"Beatles",
  "genre":["Rock-n-roll"],
  "wasformed": 1965
}

При успешном добавлении объекта в поле Response вернется ответ «OK».

Проверить добавленные данные можно на вкладке Graphql. Убедитесь, что выбрана схема default, и напишите запрос на выборку всех музыкальных групп:

query {
  MusicBand {
    name
    wasformed
    genre
  }
}

Так как добавлена всего одна группа, ответ будет выглядеть следующим образом:

{
  "data": {
    "MusicBand": [
      {
        "genre": [
          "Rock-n-roll"
        ],
        "name": "Beatles",
        "wasformed": 1965
      }
    ]
  }
}

Также информацию о добавленном объекте можно проверить в логах TDG. В логах TDG видно, что сначала HTTP-коннектор получил на вход новый объект, а затем был запущен обработчик handle_band.lua.

Нашли ответ на свой вопрос?
Обратная связь