Процессоры входящих и исходящих данных¶
Процессор (processor) – это подсистема, которая задает в TDG логику обработки объектов и автоматически выполняет пользовательский код при поступлении нового объекта или при его отправке через заданный коннектор. Для исполнения пользовательского кода используются обработчики.
Обработчик (handler) – это пользовательская функция, которая вызывается процессором и обрабатывает входящие или исходящие данные. В TDG такие функции представляют собой функции на языке Lua.
Существует два вида процессоров, оба они выполняются на экземплярах TDG с ролью runner:
input-процессор (input processor) – процессор входящих данных. Этот процессор обрабатывает объект, поступивший на вход через коннектор, настраивает для него маршрутизацию и правила хранения полученных объектов.
output-процессор (output processor) – процессор исходящих данных. Этот процессор готовит объект к отправке во внешнюю систему и передает его в коннектор.
В этом руководстве рассмотрим пример создания input-процессора и обработчика в TDG.
В примере на HTTP-коннектор придет новый объект в формате JSON, который после преобразования в Lua-объект будет
обработан input-процессором, а затем сохранен в хранилище – на экземплярах с ролью storage
.
Для выполнения примера требуются:
модель данных, сохраненная в файле
model.avsc
. В примере используется модель данных из руководства Hello world на Lua.
Руководство включает в себя следующие шаги:
Конфигурация коннектора¶
Указать конфигурацию входящего коннектора можно:
в общем конфигурационном файле (секция
connector
в файлеconfig.yml
);в отдельном файле
connector.yml
.
Задайте конфигурацию входящего HTTP-коннектора:
# config.yml
connector:
input:
- name: http
type: http
routing_key: input_key
При получении нового объекта в формате JSON через HTTP-коннектор:
JSON-объект преобразовывается в Lua-таблицу (объект Lua).
Для объекта генерируется UUID, по которому можно проследить дальнейший путь объекта (например, найти его в ремонтной очереди).
Объектам, пришедшим на коннектор, присваивается определенный ключ маршрутизации (секция
routing_key
в конфигурации коннектора).
Ключ маршрутизации – это строковый ключ, который задается объекту и определяет для него процесс дальнейшей обработки.
На коннекторе такой ключ определяет, в какой обработчик на input-процессоре будет передан объект.
Если ключ маршрутизации не задан, объект обрабатывается функцией по умолчанию, которая превращает объект вида
{ "obj_type": { "field_1": 1 } }
в объект вида { "field_1": 1 }
и задает значение ключа маршрутизации как
routing_key = obj_type
.
После получения ключа маршрутизации объект передается на input-процессор.
Выше в конфигурации определены имя коннектора http
, его тип http
и ключ маршрутизации input_key
,
с которым объект отправится в обработчик, заданный в файле конфигурации в секции input_processor
.
Подробная информация о параметрах конфигурации коннектора приведена в справочнике по конфигурации бизнес-логики.
Конфигурация input-процессора¶
Чтобы задать процессор для входящих данных в TDG, необходимо добавить соответствующую секцию в конфигурацию TDG. Input-процессор можно описывать:
в общем конфигурационном файле (секция
input processor
в файлеconfig.yml
);в отдельном файле
input_processor.yml
.
Конфигурация input-процессора включает такие параметры, как вызываемый обработчик, ключ маршрутизации и система, куда следует направить объект. Полная информация о параметрах конфигурации input-процессора приведена в справочнике по конфигурации бизнес-логики.
Задайте конфигурацию процессора для входящих данных:
# config.yml
input_processor:
handlers:
- key: input_key
function: handle_band.newband
storage:
- key: band
type: MusicBand
Здесь в секции handlers
задан ключ маршрутизации input_key
.
Объекты с таким ключом, пришедшие из HTTP-коннектора на input-процессор, передаются в функцию newband
из модуля handle_band
.
Кроме того, в секции storage
в параметре key
задается еще один ключ маршрутизации band
.
После настройки такого ключа маршрутизации:
TDG определяет по ключу нужный тип объекта из модели данных.
Объект, пришедший на input-процессор, валидируется в соответствии с моделью данных.
Объект преобразовывается в тип из модели данных.
Преобразованный объект сохраняется в хранилище.
Объекты, которым в обработчике handle_band.newband
был присвоен ключ band
, будут провалидированы по этому
ключу в соответствии с моделью данных, преобразованы в тип MusicBand
и сохранены на экземплярах с ролью storage
.
Примечание
Если при обработке объекта input-процессором возникает ошибка, объект отправляется в ремонтную очередь. Информация об объектах в ремонтной очереди с описанием ошибок доступна администратору в веб-интерфейсе TDG на вкладке Repair Queues.
После задания всех необходимых настроек общий файл конфигурации будет выглядеть следующим образом:
# config.yml
types:
__file: model.avsc
connector:
input:
- name: http
type: http
routing_key: input_key
input_processor:
handlers:
- key: input_key
function: handle_band.newband
storage:
- key: band
type: MusicBand
Реализация обработчика¶
Как указано в конфигурации процессора, код обработчика должен храниться в файле handle_band.lua
.
Создайте обработчик с функцией newband(obj)
, которая принимает на вход новый Lua-объект с музыкальной группой:
-- handle_band.lua
local log = require('log')
local json = require('json')
log.info('Starting handle_band.lua')
function newband(obj)
log.info('Received new object from the HTTP connector: %s', json.encode(obj))
obj.routing_key = "band"
log.info('The routing key is set to "%s" value', obj.routing_key)
return obj
end
return {
newband = newband
}
Здесь функция newband(obj)
выводит информацию об объекте obj
, пришедшем из HTTP-коннектора, в логах
TDG в формате JSON.
После этого функция присваивает объекту новый ключ маршрутизации band
, а затем возвращает этот объект.
Загрузка конфигурации¶
Чтобы выполнить пример, нужно загрузить архив с файлом конфигурации, моделью данных и функциями обработчиков в TDG. Для этого:
Поместите Lua-файл с кодом обработчика в папку
src
.Упакуйте в zip-архив:
папку
src
, внутри которой лежит файл с кодом обработчикаhandle_band.lua
;модель данных
model.avsc
;файл конфигурации
config.yml
.
Загрузите архив в TDG согласно инструкции.
Добавление объекта через input-процессор¶
Чтобы проверить добавленную конфигурацию, добавьте новую музыкальную группу в TDG через HTTP-коннектор. Для этого переключитесь на вкладку Test и выберите для запроса формат JSON.
Введите в поле запрос в формате JSON и затем нажмите Submit:
{
"name":"Beatles",
"genre":["Rock-n-roll"],
"wasformed": 1965
}
При успешном добавлении объекта в поле Response вернется ответ «OK».
Проверить добавленные данные можно на вкладке Graphql. Убедитесь, что выбрана схема
default
, и напишите запрос на выборку всех музыкальных групп:
query {
MusicBand {
name
wasformed
genre
}
}
Так как добавлена всего одна группа, ответ будет выглядеть следующим образом:
{
"data": {
"MusicBand": [
{
"genre": [
"Rock-n-roll"
],
"name": "Beatles",
"wasformed": 1965
}
]
}
}
Также информацию о добавленном объекте можно проверить в логах TDG.
В логах TDG видно, что сначала HTTP-коннектор получил на вход новый объект, а затем был
запущен обработчик handle_band.lua
.