Справочники
Документ рассматривает параметры конфигурации, метрики и функции API для Tarantool Queue Enterprise, функциональность MQ (далее по тексту – TQE(MQ)):
Пример YAML-файла конфигурации модуля API:
app_name: MESSAGE_QUEUE_EE_APIapp_version: 1.0core_host: 0.0.0.0core_port: 18184grpc_listen:- uri: 'tcp://0.0.0.0:18182'producer:enabled: truetarantool:user: userpass: passconnections:routers:- "localhost:3301"queues:queue:connections:routers:- "localhost:3302"consumer:enabled: truepolling_timeout: 500mstarantool:user: userpass: passconnections:storage-1:- "localhost:3301"queues:queue:connections:storage-1:- "localhost:3302"log:file: log.jsonlformat: jsonlevel: info
Основные параметры обеспечивают настройку адресов и портов подключения модуля API:
app_name: MESSAGE_QUEUE_EE_APIapp_version: 1.0core_host: 0.0.0.0core_port: 18184grpc_listen:- uri: 'tcp://0.0.0.0:18182'grpc_host: 0.0.0.0grpc_port: 18182grpc_options:initial_conn_window_size: 65535initial_window_size: 65535header_table_size: 16777216max_header_list_size: 128max_concurrent_streams: 128num_stream_workers: 0max_recv_msg_size: 4294967296max_send_msg_size: 2147483647read_buffer_size: 262144write_buffer_size: 262144shared_write_buffer: false
где:
app_name- название приложения;app_version- версия приложения;core_host- адрес сбора метрик и проверки состояния модуля API;core_port- порт сбора метрик и проверки состояния модуля API;grpc_options- набор опций для конфигурации grpc-сервера (подробнее здесь). Опции не изменяются после запуска grpc-сервераы:initial_conn_window_size- начальное окно в байтах http2-соединения, по умолчанию 64kb;initial_window_size- начальное окно http2-стрима в байта, по умолчанию 64kb;header_table_size- размер динамической таблицы с заголовками http2, по умолчанию не задано (динамическая таблица не используется);max_header_list_size- максимальный размер таблицы с заголовками http2, по умолчанию не задано;max_concurrent_streams- количество одновременных http-стримов, по умолчанию 128;num_stream_workers- количество воркеров для обработки входящих запросов, по умолчанию 0 (под каждый http2-стрим создается отдельная горутина);max_recv_msg_size- максимальный размер входящего сообщения в байтах, по умолчанию 4mb;max_send_msg_size- максимальный размер исходящего сообщения в байтах, по умолчанию 2mb;read_buffer_size- размер буфер на чтение, по умолчанию 256kb;write_buffer_size- размер буфер на запись, по умолчанию 256kb;shared_write_buffer- позволяет переиспользовать буфер на запись, а не создавать под каждое подключение, по умолчаниюfalse;
grpc_listen- набор интрефейсов, на которых обслуживает gRPC-сервер;uri- адрес прослушивания, например:unix:///tmp/tqe-mq.sockилиtcp://localhost:18182
grpc_host- (устарело после введенияgrpc_listen) адрес модуля API;grpc_port- (устарело после введенияgrpc_listen) порт модуля API.
Параметр producer модуля API обеспечивает настройку параметров публикации.
producer:enabled: truetarantool:user: userpass: passqueues:queue:connections:routers:- "localhost:3301"
Доступные опции параметра producer:
enabled- параметр доступности сервиса:trueдоступен,false- нет;
tarantool- секция конфигурации доступа к ядру TQE(MQ):user- логин доступа к очереди (указывается в секцииcredsконфигурации ядра TQE(MQ));pass- пароль доступа к очереди (указывается в секцииcredsконфигурации ядра TQE(MQ));connections- секция адресов роутеров (маршрутизаторов) для публикации сообщений.replicasets- список алиасов репликасетов (whitelist), на которые будут публиковаться сообщения.queues- секция доступных очередей ядра TQE(MQ);connections- секция адресов роутеров (маршрутизаторов) для публикации сообщений.
Параметр consumer модуля API обеспечивает настройку параметров подписки на очередь сообщений.
consumer:enabled: truepolling_timeout: 500mstarantool:user: userpass: passqueues:queue:connections:storage-1:- "localhost:3301"
Доступные опции параметра consumer:
enabled- параметр доступности сервиса:true- доступен,false- нет;
polling_timeout- время задержки между запросами новых сообщений, пример: 500ms;tarantool- секция конфигурации доступа к очереди TQE MQ:user- логин доступа к очереди (указывается в секцииcredsконфигурации ядра TQE(MQ));pass- пароль доступа к очереди (указывается в секцииcredsконфигурации ядра TQE(MQ));connections- секция адресов хранилищ для подписки на сообщения.queues- секция доступных очередей TQE(MQ);connections- секция адресов хранилищ для подписки на сообщения.
Параметр log модуля API обеспечивает настройку параметров журналирования.
log:file: log.jsonlformat: jsonlevel: info
Доступные опции параметра log:
file- имя файла записи журнала (может содержать абсолютный или относительный путь, по умолчанию /dev/stderr либо в файл server.jsonl, при запуске модуля API в фоновом режиме с опцией-d);format- формат вывода сообщений, принимаемые значения: text|json (по умолчанию text);level- уровень журналирования, принимаемые значения: debug|info|warn|error|dpanic(паника в режиме разработки)|panic(паника)|fatal (по умолчанию info).
Конфигурация ядра Tarantool 3 происходит с помощью обновления файла конфигурации или конфигурации в etcd/Tarantool Config Storage. Подробнее про конфигурацию в Tarantool 3 можно прочитать здесь
Пример YAML-файла конфигурации модуля ядра:
credentials:users:user:roles: [super]password: passroles_cfg:app.roles.queue:features:metrics_enabled: truevalidation_enabled: truequeues:- name: queue- name: another_queuededuplication_mode: basic- name: archive_queuestorage: disk- name: queue_disabled_indexedisabled_filters_by: [routing_key, sharding_key]app.roles.api:sharding:routing:core-1:buckets:- 1- [2,10]core-2:buckets:- [11,20]core-3:buckets:- [21,1000]
Секция features отвечает за настройку параметров функций ядра. Настраивается на уровне ролей app.roles.queue и app.roles.api.
roles_cfg:app.roles.queue:features:metrics_enabled: truevalidation_enabled: falseapp.roles.api:features:metrics_enabled: falsevalidation_enabled: true
где:
metrics_enabled- включение/выключение сбора метрик для методов API ядра. По умолчаниюtrue;validation_enabled- включение/выключение валидации сообщений для методов API ядра. По умолчаниюtrue.
Секция queues отвечает за описание используемых очередей сообщений и настройку их параметров. Настраивается на уровне роли app.roles.queue. Имеет следующую структуру:
roles_cfg:app.roles.queue:queues:- name: some-queuelatency: 1disabled_filters_by: [sharding_key]deduplication_mode: basic- name: other-queue
где:
name- название очереди сообщений.latency- задержка в миллисекундах между оповещениями подписчика о новых сообщениях. По умолчанию –1.deduplication_mode- режим дедупликации (т.н. режим устранения избыточности, дублирования данных). Может принимать значенияbasic,extended,keep_latest,keep_first. По умолчанию –basic.poll_max_batch– максимальное количество сообщений, которое одно ядро вернет за один запрос подписки. По умолчанию –100.storage– движок хранения очереди. Допустимые значения:memoryдля использования memtx,diskдля использования vinyl. По умолчанию -memory.poll_yield_every– период (в сообщениях), с которым обработчик запроса будет передавать управление другим обработчикам. По умолчанию –512.disabled_filters_by- список отключенных фильтров. Отключение фильтра делает невозможным подписку с фильтрацией по указанному полю. Изменение этой опции после создания очереди позволяет включать фильтраю по полю при его удаление из списка, но не позволяет отключать ее. По умолчанию - [] (фильтрация включена по всем полям). Возможные значения -routing_key,sharding_key.
Также существует другой вариант настройки параметров секции queues, который имеет следующую структуру:
roles_cfg:app.roles.queue:queues: ["some-queue", "other-queue"]
Секция sharding отвечает за описание статического шардинга. Настраивается на уровне роли app.roles.api:
roles_cfg:app.roles.api:sharding:routing:core-1:buckets:- 1- [2,300]core-2:buckets:- [301,500]core-3:buckets:- [501,1000]
где:
core-1- псевдоним набора реплик (replica set) из топологии;buckets- диапазон обслуживаемых бакетов. Может принимать массив значений, где каждое значение - либоidодного бакета, либо диапазон бакетов "от и до".
Контроль работоспособности модулей выполняется на основании проверки метрик производительности.
Метрики модулей предоставляются в формате prometheus.
Метрики модуля API доступны на HTTP-эндпоинте /metrics.
Пример частичного вывода метрик модуля API:
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.# TYPE go_gc_duration_seconds summarygo_gc_duration_seconds{quantile="0"} 5.3002e-05go_gc_duration_seconds{quantile="0.25"} 7.574e-05go_gc_duration_seconds{quantile="0.5"} 9.047e-05go_gc_duration_seconds{quantile="0.75"} 0.000111856go_gc_duration_seconds{quantile="1"} 0.000338263go_gc_duration_seconds_sum 1.305067544go_gc_duration_seconds_count 11608...
Основный метрики модуля API предоставляют следующую диагностическую информацию:
- mqee_grpc_publishing_request_messages - гистограмма опубликованных сообщений в очередь:
- grpc_method - название удаленной процедуры;
- queue - название очереди;
- routing_key - значение
routing_key, по которому будет произведена публикация (""для batch-запросов); - result - "success", если сообщения успешно опубликованны, иначе - "fail".
- mqee_grpc_requests_size_bytes_total - счетчик общего размера запросов в байтах:
- grpc_method - название удаленной процедуры;
- queue - название очереди;
- routing_key - routing_key обрабатываемоего сообщения (
""для batch-запросов).
- mqee_grpc_subscribers_count - индикатор текущего количества подписчиков:
- queue - название очереди;
- routing_key - значение
routing_key, по которому была произведена подписка.
- mqee_grpc_app_info - информация об очереди (
mqee_grpc_app_info{app_name="MESSAGE_QUEUE_EE_API",app_version="test"} 1); - mqee_grpc_gomaxprocs - индикатор текущего значения
GOMAXPROCS. - mqee_grpc_tuples_received - гистограмма количества сообщений, которые были получены от ядра или из кэша;
включает в себя
mqee_grpc_tuples_received_sum(суммарное количество полученных сообщений) иmqee_grpc_tuples_received_count(число запросов на получение сообщений в ядро или кэш):- queue - название очереди;
- routing_key - значение
routing_key, по которому была произведена подписка.
- mqee_grpc_sent_messages - гистограмма количества сообщений, отправленных потребителям gRPC-сервером;
включает в себя
mqee_grpc_sent_messages_sum(суммарное количество отправленных сообщений) иmqee_grpc_sent_messages_count(число отправленных ответов):- queue - название очереди;
- routing_key - значение
routing_key, по которому была произведена подписка.
- grpc_server_started_total - счетчик полученных запросов на удаленную процедуру:
- grpc_type - тип подключения (
unary,client_stream,server_stream,bidi_stream); - grpc_service - название gRPC-сервиса;
- grpc_method - название процедуры:
- Publish;
- PublishBatch;
- ServerReflectionInfo;
- Subscribe.
- grpc_type - тип подключения (
- grpc_server_handled_total - счетчик завершенных удаленных процедур:
- grpc_type - тип подключения (
unary,client_stream,server_stream,bidi_stream): - grpc_service - название gRPC-сервиса;
- grpc_method - название процедуры (см. список в grpc_server_started_total).
- grpc_code - код ответа gRPC-код.
- grpc_type - тип подключения (
- grpc_server_msg_received_total - счетчик полученных сообщений:
- grpc_type - тип подключения(
unary,client_stream,server_stream,bidi_stream); - grpc_service - название gRPC-сервиса;
- grpc_method - название процедуры (см. список в grpc_server_started_total).
- grpc_type - тип подключения(
- grpc_server_msg_sent_total - счетчик отправленных сообщений:
- grpc_type - тип подключения(
unary,client_stream,server_stream,bidi_stream); - grpc_service - название gRPC-сервиса;
- grpc_method - название процедуры (см. список в grpc_server_started_total).
- grpc_type - тип подключения(
- grpc_server_handling_seconds - гистограмма секунд на обработку gRPC-вызова:
- grpc_type - тип подключения(
unary,client_stream,server_stream,bidi_stream); - grpc_service - название gRPC-сервиса;
- grpc_method - название процедуры (см. список в grpc_server_started_total).
- grpc_type - тип подключения(
- go_gc_duration_seconds - сводная информация по длительности паузы на цикл сборки мусора окружения Go. Используемый диапазон в секундах: 0, 0.25, 0.5, 0.75, 1;
- go_goroutines - индикатор текущего количества goroutine;
- go_info - информация о версии окружения Go;
- go_memstats_* - набор метрик для отслеживания использования памяти средой выполнения Go;
- go_threads - индикатор текущего количества потоков (threads) операционной системы, используемых средой выполнения Go.
- mqee_grpc_processing_time - время на выполнения стадии обработки сообщения в модуле API:
- point - стадия обработки сообщений, может принимать следующие значения:
- "produce-request-received";
- "broadcast-request-received";
- "produce-request-validated";
- "broadcast-request-validated";
- "produce-request-persisted";
- "broadcast-request-persisted";
- "subscribe-notifications-sent";
- queue - название очереди;
- routing_key - routing_key обрабатываемоего сообщения (пустая строка, если сообщение без routing_key);
- result - "success" если стадия завершилась успешно, "fail" если с ошибкой.
- point - стадия обработки сообщений, может принимать следующие значения:
Метрики ядра TQE(MQ) доступны на HTTP-эндпоинте /metrics на каждом экземпляре Tarantool.
Пример частичного вывода метрик модуля ядра:
# HELP tnt_vinyl_disk_index_size Amount of index stored in files# TYPE tnt_vinyl_disk_index_size gaugetnt_vinyl_disk_index_size{alias="app"} 0# HELP tnt_read_only Is instance read only# TYPE tnt_read_only gaugetnt_read_only{alias="app"} 0# HELP tnt_vinyl_disk_data_size Amount of data stored in files# TYPE tnt_vinyl_disk_data_size gaugetnt_vinyl_disk_data_size{alias="app"} 0...
С полным списком встроенных метрик можно ознакомиться в справочнике метрик Tarantool
Метрики модуля ядра предоставляют следующую диагностическую информацию:
- mqee_tnt_latency_seconds - время на выполнения стадии обработки сообщения в модуле ядра:
- point - стадия обработки сообщений, может принимать следующие значения:
- "batch-publish-request-tnt-storage-persisted";
- "broadcast-request-tnt-storage-persisted";
- queue - название очереди;
- routing_key - routing_key обрабатываемоего сообщения (
""для batch-запросов); - result - "success" если стадия завершилась успешно, "fail" если с ошибкой.
- point - стадия обработки сообщений, может принимать следующие значения:
- mqee_tnt_pollers_total - индикатор текущего количества поллеров:
- queue - название очереди;
В этом разделе описана спецификация API на основе протокола gRPC.
Примеры создания gRPC-клиента и выполнения запросов к очереди приведены в документах:
Сервер публикации сообщений брокера очередей.
Method Name | Request Type | Response Type | Description |
|---|---|---|---|
Produce | Публикация группы сообщений в очередь |
Запрос на публикацию группы сообщений в очередь.
Field | Type | Label | Description |
|---|---|---|---|
queue | string | Название очереди, в которой необходимо опубликовать сообщения | |
sharding_key | string | optional | Ключ шардирования. Необходим для распределения данных в системе |
messages | repeated | Набор сообщений | |
metadata | repeated | Произвольные данные в формате списка из пар ключ-значение. |
Группа сообщений.
Field | Type | Label | Description |
|---|---|---|---|
routing_key | string | optional | Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах |
deduplication_key | string | optional | Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится |
payload | bytes | Произвольные данные в бинарном формате (тело сообщения) | |
metadata | repeated | Произвольные данные в формате списка из пар ключ-значение. |
Ответ на публикацию группы сообщений.
Field | Type | Label | Description |
|---|---|---|---|
ids | uint64 | repeated | Идентификаторы сообщений |
is_duplicates | bool | repeated | Флаги наличия дубликатов |
Запрос на рассылку сообщения на указанные шарды.
Field | Type | Label | Description |
|---|---|---|---|
queue | string | Название очереди, в которую необходимо опубликовать сообщение | |
routing_key | string | optional | Ключ маршрутизации сообщения (тип сообщения) необходим для фильтрации сообщений из очереди на консьюмерах |
deduplication_key | string | optional | Ключ дедупликации необходим для проверки повторных сообщений, если не указан, то проверка не производится |
payload | bytes | Произвольные данные в бинарном формате, содержит тело сообщения | |
metadata | repeated | Произвольные данные в формате списка из пар ключ-значение. | |
replicasets | string | repeated | Список с названиями репликасетов, на которые нужно опубликовать сообщение. По умолчанию рассылка происходит на все шарды. |
timeout | uint64 | optional | Максимальное время на рассылку сообщения |
Ответ на рассылку сообщения.
Field | Type | Label | Description |
|---|---|---|---|
code | uint32 | Код завершения рассылки: 0 - Успешная публикация 1 - Ошибка на роутере 2 - Ошибка на репликасете | |
error | string | optional | Сообщение об ошибке |
replicasets | repeated | Набор ответов с шардов |
Field | Type | Label | Description |
|---|---|---|---|
key | |||
value |
Ответ репликасета на публикацию сообщения.
Field | Type | Label | Description |
|---|---|---|---|
success | Сообщение об успешной публикации | ||
error | Сообщение об ошибке публикации |
Сообщение об успешной публикации.
Field | Type | Label | Description |
|---|---|---|---|
id | uint64 | Идентификатор сообщения добавленного в очередь |
Сообщение об ошибке публикации.
Field | Type | Label | Description |
|---|---|---|---|
code | uint32 | Код ошибки | |
message | string | Сообщение об ошибке |
Сервер подписок на сообщения брокера очередей.
Method Name | Request Type | Response Type | Description |
|---|---|---|---|
Subscribe | Subscription Notifications stream | Подписка на сообщения с фильтром |
Поток запросов подписки.
Field | Type | Label | Description |
|---|---|---|---|
subscribe_request | Запрос на подписку | ||
commit_request | Запрос на обновление состояния подписки. |
Запрос на подписку.
Field | Type | Label | Description |
|---|---|---|---|
queue | string | Название очереди | |
routing_key | string | optional | Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди. Если не указан, то подписка происходит на все типы сообщений в очереди |
cursor | string | optional | Опциональная строка указатель на последнее полученное сообщение. Необходим для возможности получения истории сообщений или восстановления работы консьюмера после сбоя. Если значение не указано -- подписка с текущего момента. Значение как пустая строка -- подписка с начала очереди. Значение указано -- подписка с указанного сообщения в очереди |
sharding_key | string | optional | Ключ шардирования. Необходим для распределения данных в системе. Если не указан, то подписка происходит на все типы сообщений в очереди |
sharding_keys | string | repeated | Ключи шардирования позволяют производить фильтрацию по нескольким ключам шардирования в рамках одной подписки |
consume_id | string | optional | Уникальный ключ идентификации подписки. По значению этого параметра система может создать персистентную подписку и восстановить сохраненное состояние персистентной подписки, чтобы возобновить передачу сообщений с последней сохраненной позиции |
ttl | float | Время жизни состояния подписки. |
Запрос на обновление состояния подписки.
Field | Type | Label | Description |
|---|---|---|---|
cursor | string | Указатель на последнее обработанное сообщение в очереди |
Поток ответов подписки.
Field | Type | Label | Description |
|---|---|---|---|
notifications | Сообщения в стриме подписки | ||
commit_response | Ответ на обновление состояния подписки |
Сообщение в стриме подписки.
Field | Type | Label | Description |
|---|---|---|---|
notifications | repeated | Новые сообщения в очереди с курсорами |
Уведомление клиента о новых сообщениях в очереди.
Field | Type | Label | Description |
|---|---|---|---|
cursor | string | Строка-указатель сообщения | |
message | Сообщение |
Сообщение в очереди.
Field | Type | Label | Description |
|---|---|---|---|
id | uint64 | Идентификатор сообщения. Заполняется автоматически при записи сообщения в очередь | |
queue | string | Название очереди, в которую необходимо опубликовать сообщение | |
routing_key | string | optional | Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах |
sharding_key | string | optional | Ключ шардирования. Необходим для распределения данных в системе |
deduplication_key | string | optional | Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится |
payload | bytes | Произвольные данные в бинарном формате (тело сообщения) | |
metadata | repeated | Произвольные данные в формате списка из пар ключ-значение. | |
timestamp | int64 | Время вставки сообщения в очередь в наносекундах |
Пара ключ-значение.
Field | Type | Label | Description |
|---|---|---|---|
key | string | Ключ пары | |
value | string | Значение пары |
Ответ на обновление состояния подписки.
Field | Type | Label | Description |
|---|---|---|---|
cursor | string | Обновленная строка-указатель сообщения. |