Конфигурация бизнес-логики | Tdg

Версия:

2.x
Справочник Конфигурация Tarantool Data Grid Конфигурация бизнес-логики

Конфигурация бизнес-логики

В конфигурацию бизнес-логики можно добавить параметры модели данных, мониторинга, сервисов GraphQL, подсистем и коннекторов.

Примечание

Для простоты эксплуатации рекомендуется хранить различные параметры конфигурации в разных YAML-файлах.

Параметры модели данных

Настроить модель данных можно, используя следующие секции:

  • types – описание типов объектов в модели данных.

  • versioning – настройка параметров времени жизни объектов и ограничения количества версий объектов.

  • archivation – настройка параметров архивации объектов.

  • welcome-message – текст приветственного сообщения.

  • tdg-version – проверка версии TDG при применении конфигурации.

Секция types

В этой секции настраивается модель данных – указываются типы объектов, которые будут сохраняться в TDG. В качестве языка описания модели данных TDG используется Apache Avro Schema.

По умолчанию указывается путь к файлу, где определена модель данных:

types: {__file: model.avsc}

Содержание model.avcs определяется пользователем самостоятельно. Например, в файле может быть указана такая модель данных:

[
    {
        "name": "Country",
        "type": "record",
        "fields": [
            {"name": "title", "type": "string"},
            {"name": "phone_code", "type": ["null", "string"]}
        ],
        "indexes": ["title"],
        "relations": [
        { "name": "city", "to": "City", "count": "many", "from_fields": "title", "to_fields": "country" }
        ]
    },
    {
        "name": "City",
        "type": "record",
        "fields": [
            {"name": "title", "type": "string"},
            {"name": "country", "type": "string"},
            {"name": "population", "type": "int"},
            {"name": "capital", "type": "boolean"}
        ],
        "indexes": [
            {"name":"primary", "parts":["title", "country"]},
            "title",
            "country",
            "population"
        ]
    }
]

Секция versioning

В этой секции настраивается версионирование – сохранение разных версий объектов для последующего извлечения, просмотра и восстановления объектов.

По умолчанию версионирование отключено. Чтобы его включить, укажите enabled: true для нужного типа объекта.

Пример. Включить версионирование для объекта типа Tourists можно так:

versioning:
  - type: Tourists
    enabled: true

Вы также можете настроить следующие параметры:

  • keep_version_count – количество хранимых версий. По умолчанию: 5. Минимальное значение: 1. Если вы не хотите ограничивать количество хранимых версий, удалите этот параметр. Если параметр задан, старые версии будут удаляться. Только новые версии, количество которых меньше или равно заданному значению параметра, будут сохранены. Каждый раз, когда добавляется новая версия, система проводит проверку и удаляет старые версии при необходимости.

  • delay_sec – интервал в секундах, через который запускается новая проверка устаревших объектов. Найденные устаревшие объекты удаляются. Минимальное значение: 1.

  • lifetime_hours – время жизни версии в часах, также может быть задано в днях (lifetime_days) или годах (lifetime_years). По умолчанию не задано, поэтому версии хранятся неограниченное время. Минимальное значение: 1. Если параметр задан, версии, существующие дольше заданного значения параметра, будут удалены.

  • strategy – стратегия удаления предыдущих версий из хранилища (архивирование). Варианты стратегии:

    • dir – вывод в файл;

    • permanent – постоянное удаление версий;

    • cold_storage – хранение на диске с расчетом на то, что данные будут запрашиваться редко.

      Примечание

      Когда включена стратегия cold_storage, данные сохраняются в спейсе на vinyl. Получить доступ к таким данным можно только вручную, с помощью запросов space_object:select().

    Значение по умолчанию: permanent.

    Если параметр strategy задан на вывод в файл или хранение на диске, необходимо указать schedule – расписание запуска задачи на архивацию в формате cron с поддержкой секунд. При выводе в файл также необходимо указать максимальный размер файла, используя параметр file_size_threshold.

  • schedule – расписание в формате cron, по которому проверяется наличие изменений в объектах. Обязательный параметр, если задана стратегия strategy: cold_storage или strategy: file.

  • dir – директория, где в JSON-файлах хранятся прежние версии объектов.

  • file_size_threshold – предельный размер JSON-файла, в котором хранятся прежние версии объектов. Когда предельное значение достигнуто, версии начинают сохраняться в новый файл. Обязательный параметр, если задана стратегия strategy: file.

Пример

Укажем параметры версионирования для типов объектов Country и City.

Country: хранить 7 версий, ограничить время жизни 4 часами, запускать проверку через 1 секунду, использовать холодное хранение (хранить на диске).

City: хранить 3 версии, ограничить время жизни 2 днями, запускать проверку через 1 секунду, архивировать версии в файл.

versioning:
  - type: Country
    enabled: true
    keep_version_count: 7
    lifetime_hours: 4
    delay_sec: 1
    strategy: cold_storage
    schedule: "0 0 0 */1"
  - type: City
    enabled: true
    keep_version_count: 3
    lifetime_days: 2
    delay_sec: 1
    strategy: file
    schedule: "0 0 0 */1 * *"
    dir: "/var/data"
    file_size_threshold: 100001

Секция archivation

В этой секции настраивается архивация – выгрузка объектов из TDG и сохранение их отдельно на жестком диске в файлах формата .jsonl (формат имени – YYYY-MM-DDTHH:MM:SS.jsonl). Объекты записываются в файл построчно: один объект – одна строка вида {"TypeName": {... data ...}}, где TypeName – тип сохраняемого объекта.

Определенные типы объектов могут поступать в TDG в большом количестве, но при этом иметь короткое время активного использования, после чего потребность в обращении к ним возникает редко. Именно для таких объектов полезно настраивать архивацию.

По умолчанию архивация не настроена. Чтобы ее настроить, задайте в секции archivation следующие параметры:

  • type – тип объекта.

  • lifetime_days – время жизни объекта в TDG, указывается в днях. Объекты старше этой величины архивируется.

  • schedule – расписание запуска задачи на архивацию в формате cron с поддержкой секунд.

  • dir – директория для хранения файлов с архивными данными.

  • file_size_threshold – максимальный размер файла с архивными данными, указывается в байтах. При достижении этого размера запись архивируемых данных начинается в новый файл. По умолчанию: 104857600 (100 Мб).

Пример

Укажем настройки архивации для типа объекта Quotation, чтобы повысить производительность TDG за счёт архивирования редко используемых цитат.

archivation:
  - type: Quotation
    lifetime_days: 7
    schedule: "0 0 0 */1 * *"
    dir: "/var/data"
    file_size_threshold: 104857600

Секция welcome-message

В этой секции настраивается текст приветственного сообщения, которое будет появляться при входе в систему. Ограничения на количество символов в сообщении нет.

Полезно изменять секцию welcome-message, например, если вы хотите облегчить работу с TDG новым сотрудникам, дописав им полезные подсказки, или решили оставить важную информацию для опытных коллег.

Пример

Изменим текст приветственного сообщения, чтобы сообщить коллегам о планируемом обновлении сервиса.

welcome-message: |
  Коллеги, сервис может быть недоступен в четверг с 12.00 до 14.00 в связи с плановым обновлением.

Секция tdg-version

В этой секции настраивается проверка версии TDG на совместимость при применении конфигурации. Для параметра указывается условие проверки, и с ним сравнивается текущая версия TDG. Если условие не выполняется, применение конфигурации останавливается.

Примеры

Проверим версию 1.6.0 на совместимость.

tdg-version: "== 1.6.0"

В примере:

  • ==, <=, <, >, >= – оператор.

  • 1.7.0 – версия major.minor.patch (семантическое версионирование) или scm-<число>.

Примечание

Чтобы избежать возможных ошибок при чтении, всегда заключайте выражение из оператора и версии в кавычки. Обязательного помещения в кавычки не требует только выражение с ==.

Синтаксис YAML позволяет указывать многостроковые значения с помощью операторов >, <, >=, >= в первой строке. Чтобы задать версию больше, чем выбранное значение, нужно поместить оператор > и версию в кавычки.

Пример неправильного заполнения, когда версия будет прочитана как 1.6.0:

tdg-version: > 1.6.0

Пример правильного заполнения, когда версия будет прочитана как больше чем 1.6.0:

tdg-version: "> 1.6.0"

Параметры мониторинга

Настроить параметры мониторинга и изменить логику обработки объектов, при необходимости передавая функциональную нагрузку на резервный компонент, можно при помощи секций jobs, tracing, repair_queue, maintenance и metrics.

Секция jobs

В этой секции при помощи параметра max_jobs_in_parallel настраивается максимальное количество отложенных задач (jobs), которые могут выполняться одновременно. Если вы используете отложенные задачи, рекомендуется настроить данную секцию и ограничить количество отложенных задач, чтобы сохранить высокую производительность TDG.

При достижении установленного ограничения отложенные задачи сверх лимита будут поставлены в очередь и будут выполняться последовательно.

Значение по умолчанию – 50:

jobs:
  max_jobs_in_parallel: 50

Секция tracing

В этой секции настраиваются параметры для взаимодействия с системой трассировки, которая позволяет анализировать производительность выполнения кода TDG:

  • lbase_url – адрес API-ресурса (endpoint), куда отсылаются собранные для анализа участки кода (spans).

  • lapi_method – тип HTTP-запроса для обращения к base_url. Для систем трассировки, поддерживающих протокол OpenZipkin, используется POST.

  • lreport_interval – интервал в секундах, через который в систему трассировки отсылаются накопленные участки кода.

  • lspans_limit – максимальное количество участков кода, которое может накапливаться в буфере перед отправкой в систему трассировки.

Пример

Укажем локальный адрес для отправки участков кода с максимальным размером буфера = 100.

tracing:
  base_url: localhost:8080/api/v2/spans
  api_method: POST
  report_interval: 0
  spans_limit: 100

Секция repair_queue

При стандартном поведении TDG, когда объект поступает в систему на обработку, он сразу помещается в ремонтную очередь. Если объект удалось обработать и сохранить, он удаляется из ремонтной очереди. В случае ошибки объекты остаются в ремонтной очереди, и администратор имеет возможность просматривать их, а после устранения источника проблемы – отправлять на повторную обработку.

В этой секции можно изменить порядок обработки объектов, которые попали в ремонтную очередь.

Пример. Настроим репликацию во внешнюю систему для всех объектов ремонтной очереди, которые получили специальное значение ключа __unclassified__, используемое для определения неклассифицированных объектов. При попадании такого объекта в ремонтную очередь, согласно настройкам postprocess_with_routing_key: unclassified, объекту присваивается другой ключ маршрутизации unclassified, и с этим ключом он направляется в подсистему output_processor. Дальнейшая обработка объекта будет выполняться в соответствии с параметрами секции output_processor.

repair_queue:
  on_object_added:
    __unclassified__:
        postprocess_with_routing_key: unclassified

Секция maintenance

В этой секции устанавливаются следующие параметры, сигнализирующие об ошибках системы:

  • clock_delta_threshold_sec – порог рассинхронизации истинного времени (CLOCK_REALTIME) на разных экземплярах Tarantool. Значение указывается в секундах.

  • fragmentation_threshold_critical – порог, при превышении которого система покажет критическое сообщение о фрагментации данных. Принимает относительные значения от 0 до 1.

  • fragmentation_threshold_warning – порог, при превышении которого система покажет предупреждение о фрагментации данных. Принимает относительные значения от 0 до 1.

Если тот или иной параметр в секции maintenance выходит за указанные рамки, об этом появляется сообщение в веб-интерфейсе TDG на вкладке Cluster.

Значения по умолчанию:

maintenance:
  clock_delta_threshold_sec: 5
  fragmentation_threshold_critical: 0.9
  fragmentation_threshold_warning: 0.6

Секция metrics

В этой секции настраиваются параметры, задающие формат и возможность просмотра метрик по нескольким адресам ресурсов (endpoints) для мониторинга работы TDG. Эти параметры указываются внутри подраздела export:

metrics:
  export:
    - path: '/path_for_json_metrics'
      format: 'json'
    - path: '/path_for_prometheus_metrics'
      format: 'prometheus'
    - path: '/health'
      format: 'health'

Параметры:

  • path – путь, по которому будут доступны метрики.

  • format – формат, в котором будут доступны метрики. Может принимать следующие значения:

    • 'json'

    • 'prometheus'

    • 'health' – информация о текущем состоянии экземпляра. Если состояние экземпляра удовлетворительное (healthy), ответом на запрос будет HTTP-код 200, если нет – HTTP-код 500.

При необходимости можно добавить несколько адресов ресурсов одного формата по разным путям. Это полезно, когда есть несколько систем хранения метрик.

По умолчанию значения метрик в формате Prometheus доступны по адресу http://<IP_адрес_экземпляра>/metrics.

После заполнения секции metrics в файле конфигурации метрики по умолчанию в формате Prometheus по адресу http://<IP_адрес_экземпляра>/metrics перестанут быть доступными. Чтобы сделать их доступными, также укажите path: '/metrics' и format: 'prometheus' в секции metrics.

Параметры сервисов GraphQL

Настроить сервисы GraphQL можно, используя следующие секции:

  • services – параметры настройки сервисов.

  • hard-limits – параметры ограничений для запросов GraphQL.

  • force_yield_limits – параметры ограничений на количество сканирований записей.

  • graphql_query_cache_size – параметры размера кэша запросов GraphQL.

  • test-soap-data – текст запроса по умолчанию на вкладке Test.

Секция services

В этой секции настраивается конфигурация сервисов.

Для примера настроим сервис получения цены со следующими параметрами:

  • get_price – имя сервиса. Оно всегда должно начинаться с латинской буквы и может содержать латинские буквы, цифры и символ подчеркивания (_).

  • type – тип запроса GraphQL для вызова сервиса: query или mutation. Если тип – query, параметр можно не указывать.

  • doc – произвольное описание сервиса.

  • function – ссылка на Lua-файл с сервис-функцией.

  • return_type – тип данных, который возвращается в результате выполнения сервиса.

  • args – описание аргументов, передаваемых на вход при выполнении сервиса, в формате имя_аргумента: тип_данных.

services:
  get_price:
    doc: "Get the item price by ID"
    function: {__file: get_price_by_id.lua}
    return_type: ItemPrice
    args:
      item_id: string

Секция hard-limits

В этой секции устанавливаются ограничения для запросов GraphQL. Такие настройки для планировщика запросов позволяют контролировать нагрузку на кластер и предотвращают полное сканирование спейса в TDG. Также данные ограничения учитываются при работе функций Repository API и Sandbox API.

Параметры:

  • scanned – максимальное количество кортежей, которое TDG будет сканировать при выполнении запроса. Значение по умолчанию: 2000.

    Примечание

    Начиная с версии 2.7.0, вместо ограничения scanned для hard-limits TDG использует механизм fiber.slice.

  • returned – максимальное количество кортежей, которое может вернуть одно хранилище. Значение по умолчанию: 100. Например, returned = 100, а в настройках пагинации параметр first равен 5. Тогда запрос вернет 5 кортежей, а остальной массив данных можно обойти с помощью параметра after.

Настройка данных параметров возможна при конфигурации системы или с помощью специальных запросов и мутаций GraphQL. Пример настроек подраздела hard-limits в файле конфигурации:

hard-limits:
  scanned: 2000
  returned: 100

Во встроенном клиенте GraphiQL в веб-интерфейсе TDG для установки данных ограничений используется схема admin. Пример запроса:

mutation {
  config {
    hard_limits(scanned:5000 returned:400) {
      scanned
      returned
    }
  }
}

Данные настройки превентивно требуют качественного написания запросов. Если одно из ограничений превышается, выполнение запроса прекращается и возвращается ошибка. Всего существует два вида ошибок – для параметров scanned и returned соответственно.

Кроме того, ошибка возникает, если:

  • для параметра returned верно first > returned;

  • для параметра scanned верно, что количество записей больше значения scanned, при этом first > scanned.

Пример ошибки: Hard limit for scan rows reached 2000 for space \"storage_MusicBand\"

Такая ошибка при сканировании означает, что при поиске по индексу в спейсе MusicBand было просмотрено более 2000 записей. Для устранения ошибки необходимо добавить индекс, который сократит выборку для данного запроса.

Примечание

Появление данной ошибки сканирования также возможно, когда используются функции repository.find(), repository.update() и repository.delete() из раздела Repository API.

Ошибка может возникнуть

  • независимо от значения параметра first, если не используется индекс.

    Например, есть запрос repository.find("ModelName", {{'key', '==', 'a'}}. Значение параметра scanned = 5000. Первые 5000 записей при этом не подходят по запросу. Такой запрос вызовет ошибку.

    Решение: добавить индекс – "indexes": [..., "key"];

  • если в запросе используются два или больше фильтров.

    Например, есть запрос с несколькими фильтрами, где scanned = 5000. Под первый фильтр подходит 10000 записей. При этом первые 5000 записей не подходят под второй фильтр. Такой запрос вызовет ошибку.

Секция force_yield_limits

В этой секции устанавливается ограничение на количество сканирований записей в спейсе. При достижении порога управление потоком переходит от текущего файбера к другому (выполняется yield).

Актуально при выполнении map_reduce, чтобы избежать зависаний на экземплярах с ролью storage. Также учитывается при работе функций Repository API.

Значение по умолчанию – 1000:

force_yield_limits: 1000

Секция graphql_query_cache_size

В этой секции устанавливается размер кэша для уникальных GraphQL-запросов.

Значение по умолчанию – 3000:

graphql_query_cache_size: 3000

Секция test-soap-data

В этой секции можно задать текст, который будет по умолчанию отображаться на вкладке Test в веб-интерфейсе TDG. Это может также быть путь к файлу, содержащему структуру тестового объекта в формате XML или JSON для имитации входящего запроса.

Пример. Укажем путь к файлу.

test-soap-data: {__file: test_object.json}

Параметры подсистем

Каждая подсистема – это набор определенных функциональных возможностей, входящих в ту или иную кластерную роль.

  • Управляющие подсистемы объединяет в себе роль core.

  • Обрабатывающие подсистемы объединяет в себе роль runner.

  • Принимающие подсистемы объединяет в себе роль connector.

  • Хранящие подсистемы объединяет в себе роль storage.

Ниже рассматриваются параметры, связанные со следующими подсистемами:

  • input_processor – настраивает логику обработки, маршрутизацию и правила хранения объектов. Относится к роли runner.

  • output_processor – настраивает логику обработки объектов, которые будут реплицироваться во внешние системы. Относится к роли runner.

  • account_manager – настраивает работу ролевой модели доступа и связанные с ней функции безопасности. Относится к роли core.

  • notifier – настраивает отправку уведомлений при попадании объекта в ремонтную очередь. Относится к роли core.

  • gc – принудительно запускает сборку мусора в коде Lua. Запускается на экземплярах с любой ролью.

  • sequence_generator – генерирует последовательность случайных чисел. Относится к роли core.

  • logger – настраивает ведение журнала. Запускается на экземплярах с любой ролью.

  • tasks – настраивает задачи, выполняемые при помощи ролей scheduler и task_runner.

  • task_runner – настраивает порог количества выполняемых пайплайнов для задач (tasks) и отложенных задач (jobs). Относится к роли runner.

  • audit_log – настраивает ведение журнала аудита. Запускается на экземплярах с любой ролью.

input_processor

В этой секции настраиваются параметры процессора ввода данных:

  • handlers – определение обработчиков входящего запроса.

    Если обработчик не задан, объект обрабатывается в соответствии со значением, указанным в секции input_processor: routing.

    Вложенные параметры:

    • key: ключ маршрутизации, по которому определяется, как следует обработать объект.

    • function: функция, которую следует применить к объекту.

  • routing – маршрутизация объекта в определенный пайплайн обработки.

    Вложенные параметры:

    • key: ключ маршрутизации.

    • output: система, куда следует направить объект.

  • storage – настройки сохранения объекта на экземплярах роли storage.

    Вложенные параметры:

    • key: ключ маршрутизации.

    • type (необязательно): тип бизнес-объекта (агрегата), куда следует направить объект.

Пример:

input_processor:
  handlers:
    - key: input_processor
      function: handler.call
  routing:
    - key: input_processor
      output: to_input_processor
  storage:
    - key: estate_key

output_processor

В этой секции задается логика обработки объектов, которые будут реплицироваться во внешние системы. Выполняется после успешного сохранения объектов на экземплярах с ролью storage.

Для каждого ключа маршрутизации, присвоенного объекту, определяются следующие подразделы:

  • handlers – обработчики входящих запросов:

    • function: имя функции, которую следует применить к объекту.

    • outputs: параметры, с которым объект будет отправлен во внешнюю систему.

Пример:

output_processor:
  estate_key:
    handlers:
      - function: output.estate_output.call
        outputs:
          - dummy

account_manager

В этой секции настраивается подсистема account_manager, которая обеспечивает работу ролевой модели доступа и связанные с ней функций безопасности.

Параметры:

  • only_one_time_passwords – флаг (true/false). По умолчанию: false. Если указано значение true, нет возможности вручную задавать пароль при создании или импорте профиля пользователя. Вместо этого TDG автоматически генерирует одноразовый пароль и высылает его на адрес электронной почты пользователя. Для отсылки пароля также необходимо иметь работающий сервер SMTP, описание его конфигурации в секции connector (output: type: smtp) и указание на этот output в секции account_manager. Опционально можно указать заголовок письма, в котором высылается одноразовый пароль (options: subject:). Если указано значение only_one_time_passwords: false, то пароль будет генерироваться и отправляться пользователю, даже если поле Password при создании профиля пользователя оставлено пустым.

  • password_change_timeout_seconds – минимальное время, которое должно пройти до следующей смены пароля. Указывается в секундах. Действует только в отношении смены собственного пароля.

  • block_after_n_failed_attempts – количество неудачных попыток входа в систему, после которых пользователь будет заблокирован (получит статус blocked).

  • ban_inactive_more_seconds – максимальное время, в течение которого пользователь может быть неактивен в системе. Указывается в секундах. По истечении этого времени пользователь будет заблокирован. Максимально возможное значение параметра – 45 дней. Если значение превышает 45 дней или параметр не задан, устанавливается значение по умолчанию, равное 45 дням.

  • password_policy – настройки политики в отношении паролей. Эти настройки также можно задать через веб-интерфейс TDG.

  • min_length – минимально допустимая длина пароля. По умолчанию: 8.

  • include – флаги (true/false), определяющие, какие категории символов должны обязательно входить в пароль.

    Допустимые значения:

    • lower – латинские буквы в нижнем регистре. По умолчанию: true.

    • upper – латинские буквы в верхнем регистре. По умолчанию: true.

    • digits – цифры от 0 до 9 включительно. По умолчанию: true.

    • symbols– дополнительные символы !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~. По умолчанию: false.

Пример. Настроим возможность получать одноразовый пароль по электронной почте.

account_manager:
  only_one_time_passwords: true
  output:
    name: to_smtp
    options:
      subject: "Registration"
  password_change_timeout_seconds: 10
  block_after_n_failed_attempts: 5
  ban_inactive_more_seconds: 86400
  password_policy:
    min_length: 8
    include:
      lower: true
      upper: true
      digits: true
      symbols: false

notifier

В этой секции настраивается подсистема notifier для отправки уведомлений:

  • mail_server – секция настроек сервера SMTP, который используется для отправки уведомлений при попадании объекта в ремонтную очередь. Данные параметры также можно задать через веб-интерфейс TDG.

    Вложенные параметры:

    • url – URL сервера SMTP.

    • from – имя отправителя, которое будет показываться в поле «From» при получении уведомления.

    • username – имя пользователя сервера SMTP.

    • password – пароль пользователя сервера SMTP.

    • timeout – тайм-аут запроса к серверу SMTP. Указывается в секундах.

    • skip_verify_host – флаг (true/false): пропустить проверку хоста по протоколу TLS.

  • users – секция, где задаются данные подписчиков (subscribers), которые будут получать уведомления. Подписчиков также можно создать через веб-интерфейс TDG.

    Вложенные параметры:

    • id – идентификатор подписчика.

    • name – имя подписчика.

    • addr – e-mail подписчика.

Пример:

notifier:
  mail_server:
    url: 127.0.0.1:2525
    from: TDG_repair_queue
    username: user
    password: passpass
    timeout: 5
    skip_verify_host: true
  users:
    - id: 1
      name: Petrov
      addr: petrov@mailserver.com

gc

В этой секции настраивается подсистема garbage_collector, которая принудительно запускает сборку мусора в Lua. Подсистема включается неявно на всех экземплярах:

  • forced – включение (true) или отключение (false) принудительной сборки мусора. По умолчанию: false.

  • period_sec – интервал, через который происходит запуск нового цикла сборки мусора. Указывается в секундах.

  • steps – размер шага сборщика мусора.

Пример. Настроим принудительную сборку мусора так, чтобы она происходила каждые три секунды.

gc:
  forced: true
  period_sec: 3
  steps: 20

sequence_generator

В этой секции указываются настройки, используемые при генерации последовательностей уникальных чисел:

  • starts_with – число, с которого начинается последовательность. По умолчанию: 1.

  • range_width – диапазон последовательности. По умолчанию: 100.

Пример. Зададим генерацию последовательности уникальных чисел с 5 при диапазоне 15.

sequence_generator:
  starts_with: 5
  range_width: 15

logger

В этой секции определяются параметры подсистемы логирования logger. Все параметры обязательные:

  • rotate_log – флаг (true/false), осуществлять ли ротацию лога.

  • max_msg_in_log – максимальное количество сообщений, сохраняемых в логе.

  • max_log_size – максимальный размер файла лога, в байтах.

  • delete_by_n_msg – количество одновременно удаляемых сообщений при ротации лога. При превышении значений параметров max_msg_in_log или max_msg_log_size наиболее старые n сообщений в логе удаляются за раз, что повышает производительность по сравнению с режимом, когда старые сообщения удаляются по одному.

logger:
  rotate_log: true
  max_msg_in_log: 500000
  max_log_size: 10485760
  delete_by_n_msg: 1000

tasks

В этом разделе определяется, какие задачи и в какое время будут запускаться в системе.

Пример:

tasks:
  task_1:
    kind: single_shot
    function: single_task
    keep: 5
  task_2:
    kind: continuous
    function: long_task
    pause_sec: 10
    run_as:
      user: username
  task_3:
    kind: periodical
    function: long_task
    schedule: "0 */5 * * * *"
    run_as:
      user: username

В этом примере:

  • task_N – имя задачи.

  • kind – вид задачи: single_shot (однократная), continuous (постоянная) или periodical (периодическая).

  • function – функция, определяющая, что именно делается в рамках задачи.

  • keep – количество завершенных экземпляров задачи, которые сохраняются в истории. По умолчанию: 10.

  • pause_sec – пауза в выполнении задачи вида continuous. Указывается в секундах.

  • schedule – расписание выполнения для задач вида periodical. Задается в формате cron c расширенным синтаксисом, в котором минимальной величиной является секунда:

    * * * * * *
    | | | | | |
    | | | | | ----- День недели (0-6) (Воскресенье = 0)
    | | | | ------- Месяц (1-12)
    | | | --------- День (1-31)
    | | ----------- Час (0-23)
    | ------------- Минута (0-59)
    --------------- Секунда (0-59)
    

    Например, schedule: "0 */5 * * * *" означает, что задача будет запускаться периодически каждые 5 минут.

  • run_as – пользователь, от имени которого выполняется задача. Применяется только для задач видов continuous или periodical. Указанный пользователь должен иметь достаточно прав для выполнения действий задачи. Если режим обязательной аутентификации выключен, параметр run_as не обязателен. Внутри блока run_as передается параметр:

    • user – имя пользователя.

task_runner

В этой секции настраивается running_count_threshold – порог количества функций, выполняемых в рамках задач (tasks) и отложенных задач (jobs).

Значение по умолчанию:

task_runner:
  running_count_threshold: 100

audit_log

В этой секции задаются параметры журнала аудита. Все параметры опциональные:

  • enabled – включить (true) или выключить (false) запись событий в журнал аудита. По умолчанию: false.

  • severity – уровень важности событий, которые будут записываться в журнал аудита. Возможные значения по возрастанию важности: VERBOSE, INFO, WARNING, ALARM. По умолчанию: INFO.

    При указании определенного уровня в журнал аудита будут записываться события этого уровня и выше. Например, если задано INFO, будут записываться события, соответствующие уровням INFO, WARNING и ALARM.

  • remove_older_than_n_hours – максимальное время хранения записей в журнале аудита. Указывается в часах. Записи старше указанного времени удаляются.

Пример. Включим запись событий уровня INFO и более высоких.

audit_log:
  enabled: true
  severity: INFO
  remove_older_than_n_hours: 5

По умолчанию журнал аудита выключен:

audit_log:
  enabled: false

Параметры коннекторов

В этой секции настраиваются параметры коннекторов TDG:

  • input – получение и первоначальная обработка (парсинг) входящих запросов;

  • output – отправка запросов во внешние системы;

  • routing – маршрутизация объектов.

Настройка input

input – параметры коннекторов для получения и первоначальной обработки (парсинга) входящих запросов. У всех типов коннекторов есть общие параметры:

  • name – имя коннектора (произвольное).

  • type – тип коннектора. Поддерживаемые типы:

    • http – для запросов в формате JSON по HTTP;

    • soap – для запросов в формате XML (SOAP) по HTTP;

    • kafka – для интеграции с шиной данных Apache Kafka;

    • tarantool_protocol – для запросов через iproto;

    • file – для загрузки данных из файлов.

  • is_async – режим работы TDG в качестве producer:

    • true (по умолчанию) – асинхронный режим: подтверждение о доставке сообщения отправляется сразу после того, как сообщение добавлено в очередь на отправку.

    • false – синхронный режим: подтверждение о доставке сообщения отправляется только после того, как сообщение было доставлено.

Помимо этих параметров, у некоторых типов коннекторов есть параметры, специфические только для них.

Особые параметры коннектора типа soap

  • wsdl – схема WSDL, описывающая структуру входящего XML.

  • success_response_body – шаблон ответа в случае успешной обработки запроса.

  • error_response_body – шаблон ответа в случае ошибки.

  • handlers – обработчики входящего запроса.

Примечание

Шаблоны ответа (параметры success_response_body и error_response_body) опциональны. Пользователь может задать шаблоны в нужном ему формате, соответствующем системе, в которую отправляется ответ. TDG ограничений на формат не накладывает. В шаблоне error_response_body возможно использовать один спецификатор форматирования %s, чтобы передать в тело ответа текст ошибки.

Пример. Укажем коннектор типа soap.

connector:
  input:
    - name: soap
      type: soap
      wsdl: {__file: Connect.wsdl}
      success_response_body: {__file: success_response_body.xml}
      error_response_body: {__file: error_response_body.xml}
      handlers:
          function: connect_input_handle

Особые параметры коннектора типа kafka

  • brokers – URL-адреса брокеров сообщений.

  • topics – топики (topics) в терминологии Kafka.

  • group_id – идентификатор группы подписчиков.

  • token_name – имя токена приложения. Необходимо сначала сгенерировать токен приложения в системе, а затем указать имя токена (name) в качестве значения параметра. Токен будет использоваться для авторизации при обработке входящих сообщений, которые коннектор забирает из Apache Kafka. Поскольку формат сообщений Kafka не дает возможности передать токен в самом сообщении, токен указывается в конфигурации коннектора.

  • enable_debug – режим отладки для Kafka:

    • true – режим отладки включен. Значение true добавляет параметр debug: "all" в настройки соединения для Kafka. Если в логах не требуются все атрибуты, установите необходимое значение параметра debug в секции options.

    • false (по умолчанию) – опция отключена.

  • options – опции библиотеки librdkafka:

    • enable.auto.offset.store – значение true задает автоматическое обновление параметра смещения (autocommit). Значение по умолчанию: false.

    • enable.partition.eof – значение true не выводит ошибку RD_KAFKA_RESP_ERR__PARTITION_EOF, если consumer доходит до конца сегмента. Значение по умолчанию: false.

    • auto.offset.reset – смещение, используемое при отсутствии другого значения в памяти. Значение по умолчанию: earliest.

    • statistics.interval.ms (integer) – интервал обновления метрик в миллисекундах. Значение по умолчанию: 15000. Диапазон доступных значений: 0–86400000. Значение 0 отключает сбор статистики.

    • security.protocol – протокол для связи с брокерами. Возможные значения: plaintext, ssl, sasl_plaintext, sasl_ssl. Значение по умолчанию: plaintext.

    • debug – выбор режима отладки. Возможные значения: generic, metadata, topic, cgrp, fetch, consumer, all. Значение по умолчанию: all.

  • workers_count – количество читателей сообщений (workers), которые будут работать на коннекторе. Значение по умолчанию: 10. Параметр может быть полезен для случая, когда нужно гарантированно сохранить порядок чтения входящих сообщений. Если читателей сообщений несколько, то будет идти параллельная обработка нескольких сообщений сразу, и их порядок может быть нарушен. Чтобы гарантировать сохранение порядка, необходимо в кластере оставить один connector и один input_processor и задать значение workers_count: 1.

  • logger – выбор режима для журнала событий:

    • stderr (по умолчанию) – в логи TDG выводятся только сообщения об ошибках.

    • tdg – в TDG отправляются все логи Kafka, включая сообщения об ошибках.

Для коннектора типа kafka в конфигурации можно задать более одного входящего коннектора.

Примечание

При подключении к Kafka в логах TDG выводятся input-параметры, с которым был создан consumer Kafka. В логах указаны значения для всех опций библиотеки librdkafka, в том числе выставленные по умолчанию. Отключить вывод логов нельзя.

Пример вывода:

tdg2                | 2023-03-02 16:17:19.810 [1] main/304/main I> [dcb31ae4-ca99-4b1c-995f-a0dd05194fa9] Kafka consumer for "kafka" input configuration: ---
tdg2                | ssl.engine.id: dynamic
tdg2                | socket.blocking.max.ms: '1000'
tdg2                | message.max.bytes: '1000000'
tdg2                | connections.max.idle.ms: '0'
tdg2                | enable_sasl_queue: 'false'
tdg2                | batch.size: '1000000'
...

Пример. Укажем коннектор типа kafka.

connector:
  input:
  - name: kafka-1
    type: kafka
    brokers:
      - localhost:9092
    topics:
      - orders-1
      - items-1
    group_id: kafka
    token_name: kafka_token
    function: connect_input_handle
    workers_count: 1

  - name: kafka-2
    type: kafka
    brokers:
      - localhost:9092
    topics:
      - orders-2
      - items-2
    group_id: kafka
    token_name: kafka_token
    function: connect_input_handle
    workers_count: 1

Особые параметры коннектора типа file

  • format – формат файла для чтения данных: csv или jsonl.

  • filename – имя файла для чтения данных.

  • workdir – директория для поиска файла.

Пример. Укажем коннектор типа file.

connector:
  input:
    - name: json_importer
      type: file
      format: jsonl
      filename: data.json
      routing_key: input_key
      workdir: .

Настройка output

output – параметры коннекторов для отправки исходящих запросов. У всех типов коннекторов есть общие параметры:

  • name – имя коннектора (произвольное).

  • type – тип коннектора. Поддерживаемые типы:

    • input_processor – для отправки объекта в подсистему input_processor;

    • http – для отправки объекта в формате JSON во внешнюю систему по HTTP;

    • soap – для отправки объекта в формате XML (SOAP) во внешнюю систему по HTTP;

    • kafka – для интеграции с шиной данных Apache Kafka;

    • smtp – для отправки запросов через SMTP-сервер;

    • dummy – для игнорирования объектов: пришедший объект никуда не отправляется.

  • headers – заголовки HTTP, которые при необходимости может установить пользователь при отправке данных во внешнюю систему.

  • is_async – режим работы TDG в качестве producer:

    • true (по умолчанию) – асинхронный режим: подтверждение о доставке сообщения отправляется сразу после того, как сообщение добавлено в очередь на отправку.

    • false – синхронный режим: подтверждение о доставке сообщения отправляется только после того, как сообщение было доставлено.

Помимо этих параметров, у некоторых типов коннекторов есть параметры, специфические только для них.

Особые параметры коннектора типа http

  • url – URL внешней системы, куда отправляется объект.

  • format – формат, в котором отправляется объект. Для коннектора типа http формат – всегда JSON.

  • options – опции параметра opts из встроенного модуля Tarantool http).

Пример. Укажем коннектор типа http.

connector:
    output:
        - name: to_external_http_service
          type: http
          url: http://localhost:8021
          format: json
          options:
            timeout: 5
            keepalive_idle: 60
            keepalive_interval: 60
            verify_host: true
            verify_peer: true
          headers:
            hello: world

Особые параметры коннектора типа soap

  • url – URL внешней системы, куда отправляется объект.

Пример. Укажем коннектор типа soap.

connector:
  output:
    - name: to_external_soap_service
      type: soap
      url: http://localhost:8020/test_soap_endpoint
      headers:
        config_header: header_for_soap

Особые параметры коннектора типа kafka

  • brokers – адреса (URL) брокеров сообщений.

  • topic – топик (topic) в терминологии Kafka.

  • format – формат, в котором отправляется сообщение в Kafka. Возможные значения: json и plain. Значение plain может применяться для случая, когда необходимо отправить в Kafka сообщение в формате XML. Значение по умолчанию: json.

  • options – опции библиотеки librdkafka:

    • queue.buffering.max.ms (integer) – задержка в миллисекундах. Используется, чтобы дождаться, пока накопятся сообщения в очереди продюсера, прежде чем будут созданы пакеты сообщений для отправки брокерам. Значение по умолчанию: 5.

    • statistics.interval.ms (integer) – интервал обновления метрик в миллисекундах. Значение по умолчанию: 15000. Диапазон доступных значений: 0–86400000. Значение 0 отключает сбор статистики.

    • debug – выбор режима отладки. Возможные значения: generic, metadata, broker, topic, msg, all. Значение по умолчанию: all.

  • logger – выбор режима для журнала событий:

    • stderr (по умолчанию) – в логи TDG выводятся только сообщения об ошибках.

    • tdg – в TDG отправляются все логи Kafka, включая сообщения об ошибках.

  • set_key – генерация ключа UUID для отправляемого сообщения. Значение по умолчанию: true. При значении false ключ в исходящем сообщении будет пустым.

Примечание

При подключении к Kafka в логах TDG выводятся output-параметры, с которым был создан producer Kafka. В логах указаны значения для всех опций библиотеки librdkafka, в том числе выставленные по умолчанию. Отключить вывод логов нельзя.

Пример вывода:

tdg2                | 2023-03-02 16:17:19.821 [1] main/304/main I> [dcb31ae4-ca99-4b1c-995f-a0dd05194fa9] Kafka producer for "to_kafka" output configuration: ---
tdg2                | ssl.engine.id: dynamic
tdg2                | socket.blocking.max.ms: '1000'
tdg2                | message.max.bytes: '1000000'
tdg2                | connections.max.idle.ms: '0'
tdg2                | enable_sasl_queue: 'false'
tdg2                | batch.size: '1000000'
...

Пример. Укажем коннектор типа kafka.

connector:
  output:
    - name: to_kafka
      type: kafka
      brokers:
        - localhost:9092
      topic: objects
      options:
        debug: "all"
        queue.buffering.max.ms: 1
      is_async: true

Особые параметры коннектора типа smtp

  • url – URL сервера SMTP.

  • from – имя отправителя, которое будет показываться в поле From при получении сообщения.

  • subject – тема отправляемого сообщения.

  • timeout – тайм-аут запроса к серверу SMTP. Указывается в секундах.

  • ssl_cert – путь к SSL-сертификату.

  • ssl_key – путь к приватному ключу для SSL-сертификата.

Пример. Укажем коннектор типа smtp.

connector:
  output:
    - name: to_smtp
      type: smtp
      url: localhost:2525
      from: tdg@localhost
      subject: TDG_Objects
      timeout: 5
      ssl_cert: ssl.crt
      ssl_key: ssl.pem

Настройка routing

routing – маршрутизация объекта для отправки в систему, которая определяется в поле output в зависимости от ключа маршрутизации key. Ключ маршрутизации объект получает в результате обработки функцией, указанной в input. Если в input функция не указана, объект обрабатывается функцией по умолчанию, которая превращает объект вида { "obj_type": { "field_1": 1 } } в объект вида { "field_1": 1 } и задает значение ключа маршрутизации как routing_key = obj_type.

Пример конфигурации:

connector:
  routing:
    - key: input_processor
      output: to_input_processor

    - key: external_http_service
      output: to_external_http_service

    - key: external_soap_service
      output: to_external_soap_service

Пример настройки бизнес-логики

connector:
  input:
    - name: http
      type: http
      routing_key: input_key

  output:
    - name: http_output
      type: http
      url: http://localhost:{HTTP_PORT}
      format: json
      headers:
        hello: world

    - name: invalid_http_output
      type: http
      url: http://localhost:8085
      format: json

    - name: soap_output
      type: soap
      url: http://localhost:{HTTP_PORT}
      headers:
        config_header: header_for_soap

input_processor:
  handlers:
    - key: input_key
      function: classifier.call

  storage:
    - key: test_object_1
      type: TestObject

    - key: test_object_2
      type: TestObject

    - key: test_object_3
      type: TestObject

    - key: soap_object
      type: TestObject

    - key: updatable_object
      type: UpdatableObject

    - key: object_with_entity
      type: ObjectWithEntity

    - key: bad_object
      type: BadObject

repair_queue:
  on_object_added:
    bad_object:
      postprocess_with_routing_key: bad_object_failed

    __unclassified__:
      postprocess_with_routing_key: unclassified

output_processor:
  test_object_1:
    handlers:
      - function: output_processor_handler.call
        outputs:
          - http_output

  test_object_2:
    handlers:
      - function: output_processor_handler.call
        outputs:
          - invalid_http_output

  test_object_3:
    handlers:
      - function: bad_output_processor_handler.call
        outputs:
          - invalid_http_output

  updatable_object:
    handlers:
      - function: output_processor_handler.call
        outputs:
          - http_output

  bad_object_failed:
    handlers:
      - function: output_processor_handler.call
        outputs:
          - http_output

  object_with_entity:
    handlers:
      - function: output_processor_handler.call
        outputs:
          - http_output

  soap_object:
    handlers:
      - function: soap_processor.call
        outputs:
          - soap_output

  unclassified:
    handlers:
      - function: output_processor_handler.call
        outputs:
          - http_output
Нашли ответ на свой вопрос?
Обратная связь