VK Docs logo
Помощь
Обновлена 30 декабря 2025 г. в 06:16

Справочники

Документ рассматривает параметры конфигурации, метрики и функции API для Tarantool Queue Enterprise, функциональность MQ (далее по тексту – TQE(MQ)):

Конфигурация

Конфигурация модуля API

Пример YAML-файла конфигурации модуля API:

app_name: MESSAGE_QUEUE_EE_APIapp_version: 1.0core_host: 0.0.0.0core_port: 18184grpc_listen:  - uri: 'tcp://0.0.0.0:18182'producer:  enabled: true  tarantool:    user: user    pass: pass    connections:      routers:        - "localhost:3301"    queues:      queue:        connections:          routers:            - "localhost:3302"consumer:  enabled: true  polling_timeout: 500ms  tarantool:    user: user    pass: pass    connections:      storage-1:        - "localhost:3301"    queues:      queue:        connections:          storage-1:            - "localhost:3302"log:  file: log.jsonl  format: json  level: info

Основные параметры

Основные параметры обеспечивают настройку адресов и портов подключения модуля API:

app_name: MESSAGE_QUEUE_EE_APIapp_version: 1.0core_host: 0.0.0.0core_port: 18184grpc_listen:  - uri: 'tcp://0.0.0.0:18182'grpc_host: 0.0.0.0grpc_port: 18182grpc_options:  initial_conn_window_size: 65535  initial_window_size: 65535  header_table_size: 16777216  max_header_list_size: 128  max_concurrent_streams: 128  num_stream_workers: 0  max_recv_msg_size: 4294967296  max_send_msg_size: 2147483647  read_buffer_size: 262144  write_buffer_size: 262144  shared_write_buffer: false

где:

  • app_name - название приложения;
  • app_version - версия приложения;
  • core_host - адрес сбора метрик и проверки состояния модуля API;
  • core_port - порт сбора метрик и проверки состояния модуля API;
  • grpc_options - набор опций для конфигурации grpc-сервера (подробнее здесь). Опции не изменяются после запуска grpc-сервераы:
    • initial_conn_window_size - начальное окно в байтах http2-соединения, по умолчанию 64kb;
    • initial_window_size - начальное окно http2-стрима в байта, по умолчанию 64kb;
    • header_table_size - размер динамической таблицы с заголовками http2, по умолчанию не задано (динамическая таблица не используется);
    • max_header_list_size - максимальный размер таблицы с заголовками http2, по умолчанию не задано;
    • max_concurrent_streams - количество одновременных http-стримов, по умолчанию 128;
    • num_stream_workers - количество воркеров для обработки входящих запросов, по умолчанию 0 (под каждый http2-стрим создается отдельная горутина);
    • max_recv_msg_size - максимальный размер входящего сообщения в байтах, по умолчанию 4mb;
    • max_send_msg_size - максимальный размер исходящего сообщения в байтах, по умолчанию 2mb;
    • read_buffer_size - размер буфер на чтение, по умолчанию 256kb;
    • write_buffer_size - размер буфер на запись, по умолчанию 256kb;
    • shared_write_buffer - позволяет переиспользовать буфер на запись, а не создавать под каждое подключение, по умолчанию false;
  • grpc_listen - набор интрефейсов, на которых обслуживает gRPC-сервер;
    • uri - адрес прослушивания, например: unix:///tmp/tqe-mq.sock или tcp://localhost:18182
  • grpc_host - (устарело после введения grpc_listen) адрес модуля API;
  • grpc_port - (устарело после введения grpc_listen) порт модуля API.

producer

Параметр producer модуля API обеспечивает настройку параметров публикации.

producer:  enabled: true  tarantool:    user: user    pass: pass    queues:      queue:        connections:          routers:            - "localhost:3301"

Доступные опции параметра producer:

  • enabled - параметр доступности сервиса:
    • true доступен,
    • false - нет;
  • tarantool - секция конфигурации доступа к ядру TQE(MQ):
    • user - логин доступа к очереди (указывается в секции creds конфигурации ядра TQE(MQ));
    • pass - пароль доступа к очереди (указывается в секции creds конфигурации ядра TQE(MQ));
    • connections - секция адресов роутеров (маршрутизаторов) для публикации сообщений.
    • replicasets - список алиасов репликасетов (whitelist), на которые будут публиковаться сообщения.
    • queues - секция доступных очередей ядра TQE(MQ);
      • connections - секция адресов роутеров (маршрутизаторов) для публикации сообщений.

consumer

Параметр consumer модуля API обеспечивает настройку параметров подписки на очередь сообщений.

consumer:  enabled: true  polling_timeout: 500ms  tarantool:    user: user    pass: pass    queues:      queue:        connections:          storage-1:            - "localhost:3301"

Доступные опции параметра consumer:

  • enabled - параметр доступности сервиса:
    • true - доступен,
    • false - нет;
  • polling_timeout - время задержки между запросами новых сообщений, пример: 500ms;
  • tarantool - секция конфигурации доступа к очереди TQE MQ:
    • user - логин доступа к очереди (указывается в секции creds конфигурации ядра TQE(MQ));
    • pass - пароль доступа к очереди (указывается в секции creds конфигурации ядра TQE(MQ));
    • connections - секция адресов хранилищ для подписки на сообщения.
    • queues - секция доступных очередей TQE(MQ);
      • connections - секция адресов хранилищ для подписки на сообщения.

log

Параметр log модуля API обеспечивает настройку параметров журналирования.

log:  file: log.jsonl  format: json  level: info

Доступные опции параметра log:

  • file - имя файла записи журнала (может содержать абсолютный или относительный путь, по умолчанию /dev/stderr либо в файл server.jsonl, при запуске модуля API в фоновом режиме с опцией -d);
  • format - формат вывода сообщений, принимаемые значения: text|json (по умолчанию text);
  • level - уровень журналирования, принимаемые значения: debug|info|warn|error|dpanic(паника в режиме разработки)|panic(паника)|fatal (по умолчанию info).

Конфигурация ядра

Конфигурация ядра Tarantool 3 происходит с помощью обновления файла конфигурации или конфигурации в etcd/Tarantool Config Storage. Подробнее про конфигурацию в Tarantool 3 можно прочитать здесь

Пример YAML-файла конфигурации модуля ядра:

credentials:  users:    user:      roles: [super]      password: passroles_cfg:  app.roles.queue:    features:      metrics_enabled: true      validation_enabled: true    queues:      - name: queue      - name: another_queue        deduplication_mode: basic      - name: archive_queue        storage: disk      - name: queue_disabled_indexe        disabled_filters_by: [routing_key, sharding_key]  app.roles.api:    sharding:      routing:        core-1:            buckets:            - 1            - [2,10]        core-2:            buckets:            - [11,20]        core-3:            buckets:            - [21,1000]

Секция features

Секция features отвечает за настройку параметров функций ядра. Настраивается на уровне ролей app.roles.queue и app.roles.api.

roles_cfg:  app.roles.queue:    features:      metrics_enabled: true      validation_enabled: false  app.roles.api:    features:      metrics_enabled: false      validation_enabled: true

где:

  • metrics_enabled - включение/выключение сбора метрик для методов API ядра. По умолчанию true;
  • validation_enabled - включение/выключение валидации сообщений для методов API ядра. По умолчанию true.

Секция queues

Секция queues отвечает за описание используемых очередей сообщений и настройку их параметров. Настраивается на уровне роли app.roles.queue. Имеет следующую структуру:

roles_cfg:  app.roles.queue:      queues:        - name: some-queue          latency: 1          disabled_filters_by: [sharding_key]      deduplication_mode: basic        - name: other-queue

где:

  • name - название очереди сообщений.
  • latency - задержка в миллисекундах между оповещениями подписчика о новых сообщениях. По умолчанию – 1.
  • deduplication_mode - режим дедупликации (т.н. режим устранения избыточности, дублирования данных). Может принимать значения basic, extended, keep_latest, keep_first. По умолчанию – basic.
  • poll_max_batch – максимальное количество сообщений, которое одно ядро вернет за один запрос подписки. По умолчанию – 100.
  • storage – движок хранения очереди. Допустимые значения: memory для использования memtx, disk для использования vinyl. По умолчанию - memory.
  • poll_yield_every – период (в сообщениях), с которым обработчик запроса будет передавать управление другим обработчикам. По умолчанию – 512.
  • disabled_filters_by - список отключенных фильтров. Отключение фильтра делает невозможным подписку с фильтрацией по указанному полю. Изменение этой опции после создания очереди позволяет включать фильтраю по полю при его удаление из списка, но не позволяет отключать ее. По умолчанию - [] (фильтрация включена по всем полям). Возможные значения - routing_key, sharding_key.

Также существует другой вариант настройки параметров секции queues, который имеет следующую структуру:

roles_cfg:  app.roles.queue:    queues: ["some-queue", "other-queue"]

Секция sharding

Секция sharding отвечает за описание статического шардинга. Настраивается на уровне роли app.roles.api:

roles_cfg:  app.roles.api:    sharding:      routing:        core-1:          buckets:            - 1            - [2,300]        core-2:          buckets:            - [301,500]        core-3:          buckets:            - [501,1000]

где:

  • core-1 - псевдоним набора реплик (replica set) из топологии;
  • buckets - диапазон обслуживаемых бакетов. Может принимать массив значений, где каждое значение - либо id одного бакета, либо диапазон бакетов "от и до".

Метрики

Контроль работоспособности модулей выполняется на основании проверки метрик производительности.

Метрики модулей предоставляются в формате prometheus.

Метрики модуля API

Метрики модуля API доступны на HTTP-эндпоинте /metrics.

Пример частичного вывода метрик модуля API:

# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.# TYPE go_gc_duration_seconds summarygo_gc_duration_seconds{quantile="0"} 5.3002e-05go_gc_duration_seconds{quantile="0.25"} 7.574e-05go_gc_duration_seconds{quantile="0.5"} 9.047e-05go_gc_duration_seconds{quantile="0.75"} 0.000111856go_gc_duration_seconds{quantile="1"} 0.000338263go_gc_duration_seconds_sum 1.305067544go_gc_duration_seconds_count 11608...

Основный метрики модуля API предоставляют следующую диагностическую информацию:

  • mqee_grpc_publishing_request_messages - гистограмма опубликованных сообщений в очередь:
    • grpc_method - название удаленной процедуры;
    • queue - название очереди;
    • routing_key - значение routing_key, по которому будет произведена публикация ("" для batch-запросов);
    • result - "success", если сообщения успешно опубликованны, иначе - "fail".
  • mqee_grpc_requests_size_bytes_total - счетчик общего размера запросов в байтах:
    • grpc_method - название удаленной процедуры;
    • queue - название очереди;
    • routing_key - routing_key обрабатываемоего сообщения ("" для batch-запросов).
  • mqee_grpc_subscribers_count - индикатор текущего количества подписчиков:
    • queue - название очереди;
    • routing_key - значение routing_key, по которому была произведена подписка.
  • mqee_grpc_app_info - информация об очереди (mqee_grpc_app_info{app_name="MESSAGE_QUEUE_EE_API",app_version="test"} 1);
  • mqee_grpc_gomaxprocs - индикатор текущего значения GOMAXPROCS.
  • mqee_grpc_tuples_received - гистограмма количества сообщений, которые были получены от ядра или из кэша; включает в себя mqee_grpc_tuples_received_sum (суммарное количество полученных сообщений) и mqee_grpc_tuples_received_count (число запросов на получение сообщений в ядро или кэш):
    • queue - название очереди;
    • routing_key - значение routing_key, по которому была произведена подписка.
  • mqee_grpc_sent_messages - гистограмма количества сообщений, отправленных потребителям gRPC-сервером; включает в себя mqee_grpc_sent_messages_sum (суммарное количество отправленных сообщений) и mqee_grpc_sent_messages_count (число отправленных ответов):
    • queue - название очереди;
    • routing_key - значение routing_key, по которому была произведена подписка.
  • grpc_server_started_total - счетчик полученных запросов на удаленную процедуру:
    • grpc_type - тип подключения (unary, client_stream, server_stream, bidi_stream);
    • grpc_service - название gRPC-сервиса;
    • grpc_method - название процедуры:
      • Publish;
      • PublishBatch;
      • ServerReflectionInfo;
      • Subscribe.
  • grpc_server_handled_total - счетчик завершенных удаленных процедур:
    • grpc_type - тип подключения (unary, client_stream, server_stream, bidi_stream):
    • grpc_service - название gRPC-сервиса;
    • grpc_method - название процедуры (см. список в grpc_server_started_total).
    • grpc_code - код ответа gRPC-код.
  • grpc_server_msg_received_total - счетчик полученных сообщений:
    • grpc_type - тип подключения(unary, client_stream, server_stream, bidi_stream);
    • grpc_service - название gRPC-сервиса;
    • grpc_method - название процедуры (см. список в grpc_server_started_total).
  • grpc_server_msg_sent_total - счетчик отправленных сообщений:
    • grpc_type - тип подключения(unary, client_stream, server_stream, bidi_stream);
    • grpc_service - название gRPC-сервиса;
    • grpc_method - название процедуры (см. список в grpc_server_started_total).
  • grpc_server_handling_seconds - гистограмма секунд на обработку gRPC-вызова:
    • grpc_type - тип подключения(unary, client_stream, server_stream, bidi_stream);
    • grpc_service - название gRPC-сервиса;
    • grpc_method - название процедуры (см. список в grpc_server_started_total).
  • go_gc_duration_seconds - сводная информация по длительности паузы на цикл сборки мусора окружения Go. Используемый диапазон в секундах: 0, 0.25, 0.5, 0.75, 1;
  • go_goroutines - индикатор текущего количества goroutine;
  • go_info - информация о версии окружения Go;
  • go_memstats_* - набор метрик для отслеживания использования памяти средой выполнения Go;
  • go_threads - индикатор текущего количества потоков (threads) операционной системы, используемых средой выполнения Go.
  • mqee_grpc_processing_time - время на выполнения стадии обработки сообщения в модуле API:
    • point - стадия обработки сообщений, может принимать следующие значения:
      • "produce-request-received";
      • "broadcast-request-received";
      • "produce-request-validated";
      • "broadcast-request-validated";
      • "produce-request-persisted";
      • "broadcast-request-persisted";
      • "subscribe-notifications-sent";
    • queue - название очереди;
    • routing_key - routing_key обрабатываемоего сообщения (пустая строка, если сообщение без routing_key);
    • result - "success" если стадия завершилась успешно, "fail" если с ошибкой.

Метрики ядра TQE(MQ)

Метрики ядра TQE(MQ) доступны на HTTP-эндпоинте /metrics на каждом экземпляре Tarantool.

Пример частичного вывода метрик модуля ядра:

# HELP tnt_vinyl_disk_index_size Amount of index stored in files# TYPE tnt_vinyl_disk_index_size gaugetnt_vinyl_disk_index_size{alias="app"} 0# HELP tnt_read_only Is instance read only# TYPE tnt_read_only gaugetnt_read_only{alias="app"} 0# HELP tnt_vinyl_disk_data_size Amount of data stored in files# TYPE tnt_vinyl_disk_data_size gaugetnt_vinyl_disk_data_size{alias="app"} 0...

С полным списком встроенных метрик можно ознакомиться в справочнике метрик Tarantool

Метрики модуля ядра предоставляют следующую диагностическую информацию:

  • mqee_tnt_latency_seconds - время на выполнения стадии обработки сообщения в модуле ядра:
    • point - стадия обработки сообщений, может принимать следующие значения:
      • "batch-publish-request-tnt-storage-persisted";
      • "broadcast-request-tnt-storage-persisted";
    • queue - название очереди;
    • routing_key - routing_key обрабатываемоего сообщения ("" для batch-запросов);
    • result - "success" если стадия завершилась успешно, "fail" если с ошибкой.
  • mqee_tnt_pollers_total - индикатор текущего количества поллеров:
    • queue - название очереди;

API

В этом разделе описана спецификация API на основе протокола gRPC.

Примеры создания gRPC-клиента и выполнения запросов к очереди приведены в документах:

Producer

Сервер публикации сообщений брокера очередей.

Method Name
Request Type
Response Type
Description
Produce
Публикация группы сообщений в очередь

ProduceRequest

Запрос на публикацию группы сообщений в очередь.

Field
Type
Label
Description
queue
string
Название очереди, в которой необходимо опубликовать сообщения
sharding_key
string
optional
Ключ шардирования. Необходим для распределения данных в системе
messages
repeated
Набор сообщений
metadata
repeated
Произвольные данные в формате списка из пар ключ-значение.

ProduceMessage

Группа сообщений.

Field
Type
Label
Description
routing_key
string
optional
Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах
deduplication_key
string
optional
Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится
payload
bytes
Произвольные данные в бинарном формате (тело сообщения)
metadata
repeated
Произвольные данные в формате списка из пар ключ-значение.

ProduceResponse

Ответ на публикацию группы сообщений.

Field
Type
Label
Description
ids
uint64
repeated
Идентификаторы сообщений
is_duplicates
bool
repeated
Флаги наличия дубликатов

BroadcastRequest

Запрос на рассылку сообщения на указанные шарды.

Field
Type
Label
Description
queue
string
Название очереди, в которую необходимо опубликовать сообщение
routing_key
string
optional
Ключ маршрутизации сообщения (тип сообщения) необходим для фильтрации сообщений из очереди на консьюмерах
deduplication_key
string
optional
Ключ дедупликации необходим для проверки повторных сообщений, если не указан, то проверка не производится
payload
bytes
Произвольные данные в бинарном формате, содержит тело сообщения
metadata
repeated
Произвольные данные в формате списка из пар ключ-значение.
replicasets
string
repeated
Список с названиями репликасетов, на которые нужно опубликовать сообщение. По умолчанию рассылка происходит на все шарды.
timeout
uint64
optional
Максимальное время на рассылку сообщения

BroadcastResponse

Ответ на рассылку сообщения.

Field
Type
Label
Description
code
uint32
Код завершения рассылки: 0 - Успешная публикация 1 - Ошибка на роутере 2 - Ошибка на репликасете
error
string
optional
Сообщение об ошибке
replicasets
repeated
Набор ответов с шардов

BroadcastResponse.ReplicasetsEntry

Field
Type
Label
Description
key
value

ReplicasetResponse

Ответ репликасета на публикацию сообщения.

Field
Type
Label
Description
success
Сообщение об успешной публикации
error
Сообщение об ошибке публикации

Success

Сообщение об успешной публикации.

Field
Type
Label
Description
id
uint64
Идентификатор сообщения добавленного в очередь

Error

Сообщение об ошибке публикации.

Field
Type
Label
Description
code
uint32
Код ошибки
message
string
Сообщение об ошибке

ConsumerService

Сервер подписок на сообщения брокера очередей.

Method Name
Request Type
Response Type
Description
Subscribe
Подписка на сообщения с фильтром

SubscriptionStreamRequest

Поток запросов подписки.

Field
Type
Label
Description
subscribe_request
Запрос на подписку
commit_request
Запрос на обновление состояния подписки.

SubscriptionRequest

Запрос на подписку.

Field
Type
Label
Description
queue
string
Название очереди
routing_key
string
optional
Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди. Если не указан, то подписка происходит на все типы сообщений в очереди
cursor
string
optional
Опциональная строка указатель на последнее полученное сообщение. Необходим для возможности получения истории сообщений или восстановления работы консьюмера после сбоя. Если значение не указано -- подписка с текущего момента. Значение как пустая строка -- подписка с начала очереди. Значение указано -- подписка с указанного сообщения в очереди
sharding_key
string
optional
Ключ шардирования. Необходим для распределения данных в системе. Если не указан, то подписка происходит на все типы сообщений в очереди
sharding_keys
string
repeated
Ключи шардирования позволяют производить фильтрацию по нескольким ключам шардирования в рамках одной подписки
consume_id
string
optional
Уникальный ключ идентификации подписки. По значению этого параметра система может создать персистентную подписку и восстановить сохраненное состояние персистентной подписки, чтобы возобновить передачу сообщений с последней сохраненной позиции
ttl
float
Время жизни состояния подписки.

CommitRequest

Запрос на обновление состояния подписки.

Field
Type
Label
Description
cursor
string
Указатель на последнее обработанное сообщение в очереди

SubscriptionStreamResponse

Поток ответов подписки.

Field
Type
Label
Description
notifications
Сообщения в стриме подписки
commit_response
Ответ на обновление состояния подписки

SubscriptionNotifications

Сообщение в стриме подписки.

Field
Type
Label
Description
notifications
repeated
Новые сообщения в очереди с курсорами

SubscriptionNotification

Уведомление клиента о новых сообщениях в очереди.

Field
Type
Label
Description
cursor
string
Строка-указатель сообщения
message
Сообщение

QueueMessage

Сообщение в очереди.

Field
Type
Label
Description
id
uint64
Идентификатор сообщения. Заполняется автоматически при записи сообщения в очередь
queue
string
Название очереди, в которую необходимо опубликовать сообщение
routing_key
string
optional
Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах
sharding_key
string
optional
Ключ шардирования. Необходим для распределения данных в системе
deduplication_key
string
optional
Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится
payload
bytes
Произвольные данные в бинарном формате (тело сообщения)
metadata
repeated
Произвольные данные в формате списка из пар ключ-значение.
timestamp
int64
Время вставки сообщения в очередь в наносекундах

Pair

Пара ключ-значение.

Field
Type
Label
Description
key
string
Ключ пары
value
string
Значение пары

CommitResponse

Ответ на обновление состояния подписки.

Field
Type
Label
Description
cursor
string
Обновленная строка-указатель сообщения.