Руководство по администрированию
Настояшее Руководство содержит описание и сценарии эксплуатации и администрирования TQE(MQ).
TQE(MQ) выступает в качестве брокера, ответственного за передачу сообщений потребителям. Потребители получают только те сообщения, на которые подписаны. Такой подход снижает ненужный трафик данных, повышает эффективность обработки и обеспечивает максимально высокую пропускную способность.
Каждое сообщение состоит из нескольких полей:
id- идентификатор сообщения. Монотонно возрастающий уникальный ключ, который генерируется мастером набора реплик. При асинхронной репликации монотонность и уникальность поддерживаются только на уровне одной реплики.sharding_key- ключ шардирования; Опциональное поле, если оно указано, то на его основе выбирается соответствующий целевой шард для сообщения. Если оно не указано, то система самостоятельно проставляет ключ с целью равномерной загрузки шарда. При статическом шардированииsharding_keyобязателен.routing_key- ключ фильтрации. Опциональное поле, если оно указано, то очередь будет отвечать на запрос сообщений только сообщениями с указаннымrouting_key.deduplication_key- ключ дедупликации. Опциональное поле, если оно указано, то перед каждой вставкой очередь проверяет наличие в индексе такогоdeduplication_key. По умолчанию очередь отвечает ошибкой, если такой ключ уже существует.payload- содержимое сообщения;metadata- метаданные;timestamp- время вставки.
Система гарантирует строгий порядок сообщений в пределах одного шарда — это значит, что сообщения с одинаковым ключом шардирования (sharding_key) обрабатываются строго друг за другом.
Публикация. На каждое опубликованное сообщение система возвращает
уникальный идентификатор. Чтобы экономить сетевые ресурсы, можно отправлять сообщения пакетами. В этом случае
для всего пакета указывается единый sharding_key, а запись в шард происходит транзакционно. Однако если хотя бы одно
сообщение из пакета не проходит валидацию, то весь пакет не записывается. Система вернет ошибку с указанием идентификатора
первого проблемного сообщения. В случае успеха вы получите список идентификаторов всех сообщений в том же порядке,
в каком они отправлялись в запросе.
Чтение. Читать сообщения можно только в пределах одного шарда. Доступны три режима навигации:
- с самого первого сообщения;
- с указанного курсора;
- с последнего сообщения в очереди.
В ответе на запрос будет содержаться поле cursor - его можно использовать для того, чтобы возобновить чтение с последнего
прочитанного сообщения (например для восстановления подписки).
Если выставить таймаут ожидания polling_timeout (в миллисекундах), очередь будет ждать появления новых сообщений
и ответит сразу, как только они появятся. Это избавляет от пустых опросов, когда сообщений временно нет.
Фильтрация и консистентность. При чтении можно указать routing_key, и очередь вернет только сообщения с этим ключом.
Также поддерживается фильтрация по нескольким sharding_key. Если какая-то реплика отстала от лидера и получила запрос
на чтение с курсора, которого у нее еще нет, она задержит ответ до момента получения актуальных данных
(в пределах указанного таймаута) — так обеспечивается консистентное чтение.
Подписка на сообщения (например, через gRPC) реализуется на уровне коннектора, а не самой очереди.
Шардирование. Система поддерживает два типа шардирования:
- Стандартное шардирование tarantool,
реализованное через
vshard. - Статическое шардирование — его настройка и принципы работы описаны ниже.
Отказоустойчивость. Используется стандартная процедура tarantool без дополнительных модификаций.
Журналирование. Настройка журналов ведется через параметры конфигурационного файла. См. подробнее.
Метрики. Система предоставляет набор метрик для мониторинга состояния. См. подробнее.
Сценарии использования в данном руководстве разделены согласно компонентам TQE(MQ).
Сценарии охватывают следующие роли пользователей TQE(MQ):
- инженер по эксплуатации;
- администратор.
Инженер по эксплуатации выполняет следующие сценарии:
Администратор выполняет следующие сценарии:
- Установка компонентов TQE(MQ);
- Конфигурация компонентов TQE(MQ);
- Резервное копирование;
- Удержание сообщений.
Проверка статуса ядра выполняется с помощью команды:
$ curl localhost:8081/health
В случае штатного режима работы ядра возвращаемый ответ:
app is OK
Проверка статуса модуля API выполняется с помощью команды:
$ curl localhost:18184/readyz
В случае штатного режима работы модуля API возвращаемый ответ:
{"consumer-tarantool": "OK","producer-tarantool": "OK","started": "OK"}
Где:
startedобязательное значение. Указывает на статус работы модуля API. Принимает значения "OK" или текст ошибки.consumer-tarantoolнеобязательное значение. Указывает на статус подключения сервиса подписки на сообщения к ядру TQE(MQ). Принимает значения "OK" или текст ошибки.producer-tarantoolнеобязательное значение. Указывает на статус подключения сервиса публикации сообщений к ядру TQE(MQ). Принимает значения "OK" или текст ошибки.
Встроенный инструментарий TQE(MQ) предоставляет метрики для оценки процесса работы с сообщениями в очередях.
Эти метрики предоставляют диагностическую информацию для использования в стороннем программном обеспечении.
Метрики ядра TQE(MQ) соответствуют стандартному набору метрик tarantool.
С перечнем метрик можно ознакомиться здесь
В дополнение реализованы метрики, специфичные для TQE(MQ):
mqee_tnt_duplicates_total– счетчик дубликатов
Метрики ядра TQE(MQ) доступны по HTTP-адресу:
$ curl localhost:8081/metrics
Ожидаемый ответ содержит перечень метрик, отражающих текущее состояние очереди сообщений TQE(MQ). Пример частичного вывода метрик ядра:
# HELP tnt_vinyl_disk_index_size Amount of index stored in files# TYPE tnt_vinyl_disk_index_size gaugetnt_vinyl_disk_index_size{alias="app"} 0# HELP tnt_read_only Is instance read only# TYPE tnt_read_only gaugetnt_read_only{alias="app"} 0# HELP tnt_vinyl_disk_data_size Amount of data stored in files# TYPE tnt_vinyl_disk_data_size gaugetnt_vinyl_disk_data_size{alias="app"} 0...
Полный список метрик компонента ядра приведен в документе Справочники.
TQE(MQ) использует механизм виртуального шардирования (vshard) для организации распределения данных.
Базовой атомарной единицей в vshard является бакет, логический контейнер данных с уникальным идентификатором.
Если два различных сообщения принадлежат одному бакету, то они гарантированно
хранятся на одном наборе реплик с ролью storage.
В TQE(MQ) доступен режим статического шардирования — это режим ручной настройки карты распределения бакетов по наборам реплик в кластере. Настройка задается с помощью глобальной конфигурации кластера, после чего необходимо выполнить начальную загрузку. В результате в наборах реплик кластера создаются бакеты в соответствии с заданной картой.
{note} После загрузки кластера изменять конфигурацию нельзя. {/note}
Конфигурация статического шардирования осуществляется посредством настройки роли roles.tqe-router в
разделе roles_cfg:
- в разделе
sharding.routingнеобходимо указать карту бакетов в форматеreplicaset-alias:bucket-range;bucket-range- массив с перечислением бакетов или диапазонов бакетов.
Пример:
roles_cfg:roles.tqe-router:sharding:routing:core-1:buckets:- 1 # Можно задать идентификатор бакета точечно.- 2- [3,300] # Можно указать диапазон бакетов.core-2:buckets:- [301,500]core-3:buckets:- [501,1000]
По окончании настройки необходимо выполнить загрузку кластера стандартной командой tarantool:
$ tt replicaset vshard bootstrap .
Метрики модуля API доступны по адресу (endpoint):
$ curl localhost:18184/metrics
Ожидаемый ответ содержит перечень метрик, отражающих текущее состояние модуля API. Пример частичного вывода метрик модуля API:
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.# TYPE go_gc_duration_seconds summarygo_gc_duration_seconds{quantile="0"} 5.3002e-05go_gc_duration_seconds{quantile="0.25"} 7.574e-05go_gc_duration_seconds{quantile="0.5"} 9.047e-05go_gc_duration_seconds{quantile="0.75"} 0.000111856go_gc_duration_seconds{quantile="1"} 0.000338263go_gc_duration_seconds_sum 1.305067544go_gc_duration_seconds_count 11608...
Полный список метрик компонента модуля API приведен в документе Справочники.
Для поддержки мониторинга TQE(MQ) можно использовать базовый дашборд Tarantool Queue Enterprise (MQ) Overview.
Дашборд находится в архиве поставки проекта в директории ./monitoring. Для запуска дашборда необходимо выполнить команду:
docker compose -f ./monitoring/docker-compose.yml up -d
В базовый дашборд включены следующие группы панелей:
- gRPC server queue info - общее состояние очереди: количество сообщений, объем в байтах;
- Tarantool queue info - состояние очереди со стороны Tarantool: число задач в спейсе, время жизни сообщений, данные о дедупликации;
- gRPC server queue detailed info - детализация по каждой очереди: глубина, скорость приема и выдачи, задержка операций;
- gRPC server requests - интенсивность и длительность gRPC-вызовов, распределение по статусам и методам;
- gRPC server Go runtime - метрики среды исполнения Go для компонентов, реализованных на Go;
- gRPC server process info - системные метрики процесса: CPU, память и пр;
- Tarantool cluster overview - общая информация о кластере: состояние экземпляров, нагрузка, память, статус
конфигурации, режим
read-onlyи выборы лидера; - Tarantool replication overview - информация о репликации: задержка, состояние, статистика синхронной очереди;
- Tarantool network activity - сетевая активность экземпляров: входящий/исходящий трафик, соединения, детализация запросов и потребление памяти;
- Tarantool memtx allocation overview - память
memtx: аллокация кортежей и индексов, краткая инструкция по мониторингу; - Tarantool MVCC overview - статистика менеджера транзакций движка
memtx; - Tarantool space statistics - статистика по спейсам: количество кортежей, объем данных, операции чтения/записи, фрагментация;
- Tarantool vinyl statistics - память и дисковое пространство
vinyl, планировщик и статистика транзакций; - Tarantool CPU statistics - загрузка процессора по экземплярам (пользовательское и системное время);
- Tarantool runtime overview - Метрики рантайма: память Lua и транзакций, статистика файберов, цикл событий;
- Tarantool LuaJit statistics - детальная информация о LuaJIT: аллокация объектов, сборка мусора, потребление памяти, трассы JIT;
- Tarantool operations statistics - операции над спейсами (
select,updateи др.) и другие вызовы (eval,call,auth, ошибки, SQL); - Tarantool expirationd overview - cтатистика задач
expirationd: количество кортежей, число перезапусков, время работы.

Установка компонентов TQE(MQ) выполняется с помощью инсталлятора Ansible Tarantool Enterprise (ATE). Подготовка ATE к работе и процедуры установки компонентов TQE(MQ) описаны в документации ATE.
Конфигурация компонентов TQE(MQ) включает в себя раздельные настройки для ядра и для модуля API. Настройки описываются в формате YAML.
Конфигурацию выполняют сначала для ядра, а затем для модуля API.
Конфигурация ядра состоит из нескольких секций, описываемых далее.
Конфигурация ядра происходит стандартными средствами Tarantool 3: с помощью обновления файла конфигурации или конфигурации в etcd/Tarantool Config Storage. Подробнее про конфигурацию в Tarantool 3 можно прочитать здесь
Основная конфигурация очереди задается на уровне ролей roles.tqe-router и roles.tqe-storage, например:
roles_cfg:roles.tqe-storage:queues: ["queue1", "queue2"]
Расширенная дедупликация в TQE(MQ) — это режим, при котором проверяется не только уникальность
deduplication_key сообщения, но и полное совпадение его содержимого. Если при одинаковом deduplication_key
содержимое сообщений различается — система возвращает ошибку.
Такой режим позволяет безопасно работать нескольким независимым публикаторам (основному и резервному):
дублирование одинаковых сообщений допускается, а расхождение в данных обнаруживается сразу.
Для включения режима расширенной дедупликации необходимо добавить параметр
deduplication_mode: extended в конфигурацию очереди в роль roles.tqe-storage:
roles_cfg:roles.tqe-storage:queues:- name: dedupdeduplication_mode: extended
При включенном режиме расширенной дедупликации обнаружение дубликатов с разным содержимым в одиночном сообщении или при
публикации в рамках пакетной отправки приводит к
ошибке failed to publish messages to tarantool: storage.produce error: id (*): payload may be corrupted.
YAML-файл конфигурации модуля API имеет несколько секций. Полный перечень настроек представлен в следующем примере:
app_name: MESSAGE_QUEUE_EE_APIapp_version: testcore_host: 0.0.0.0core_port: 18184grpc_host: 0.0.0.0grpc_port: 18182producer:enabled: truetarantool:user: userpass: passconnections:routers:- "localhost:3301"consumer:enabled: truepolling_timeout: 500mscursor_serilizer: 'yaml'buffer: 12concurrency: 2access_mode: 'prefer_ro'tarantool:user: userpass: passconnections:storage-1:- "localhost:3301"log:file: log.jsonlformat: jsonlevel: info
Для резервного копирования данных TQE(MQ) необходимо выполнить процедуру бэкапа ядра (кластерного приложения) c помощью инсталлятора Ansible Tarantool Enterprise.
Так как объем памяти, доступный для хранения сообщений, ограничен местом на диске, может возникнуть потребность удаления ненужных сообщений.
Для периодической очистки очереди сообщений TQE(MQ) использует модуль expirationd.
Соблюдение условий для удаления сообщений контролируется функцией удержания сообщений (data retention).
Функция удержания сообщений работает в соответствии с установленной политикой (policy) очистки ненужных сообщений.
Политика может быть включена (policy: cleanup) или отключена (policy: disabled). Во включенном состоянии политика
удаляет сообщения в зависимости от установленных параметров:
time- допустимое время жизни сообщения в очереди в секундах. Значение 0 или отсутствие параметра отключает проверку, сообщения могут жить в очереди "бесконечно" долго.length- допустимая длина очереди. Значение 0 или отсутствие параметра отключает проверку, очередь становится "бесконечно" длинной.consumers_required- включает очистку по прочитанным сообщениям. При включенной очистке политика сравнивает указанное значение с количеством персистентных подписчиков:- Если количество персистентных подписчиков превышает или совпадает со значением
сonsumers_required, то сообщение будет удалено только после того, как его прочтут все персистентные подписчики. - Если количество персистентных подписчиков ниже значения
сonsumers_required, то сообщение не будет удалено очисткой по прочитанным сообщениям. - При
consumers_required: 0и каком-то количестве персистентных подписчиков очистка будет ждать, пока все персистентные подписчики прочтут сообщение. - При
consumers_required: 0и без персистентных подписчиков очистка считает все сообщения прочитанными и удаляет их сразу.
- Если количество персистентных подписчиков превышает или совпадает со значением
Политика очистки ненужных сообщений и смежные параметры настраиваются в секции retention в роли roles.tqe-storage. Секцию retention
можно настраивать как на глобальном уровне, так и на уровне отдельной очереди. При наличии разницы в параметрах, приоритетной
считается настройка отдельной очереди. Подробнее о настройках секции retention смотрите в
документации по expirationd.
По умолчанию конфигурация для всех очередей выглядит так:
roles_cfg:roles.tqe-storage:retention:time: 3153600000 # Допустимое время жизни сообщений всех очередей в секундах.length: 0 # Проверка на максимальную длину очереди не выполняетсяpolicy: cleanup # Политика очистки включена.options: # Конфигурационные параметры для expirationd.start.iteration_delay: 0 # Максимальное время в секундах для fiber.sleep между итерациями.
Переопределить конфигурацию для определенной очереди можно указав секцию retention с отличными значениями параметров
в секции конфигурации конкретной очереди example_queue:
roles_cfg:roles.tqe-storage:queues:- name: example_queueretention:time: 100 # Допустимое время жизни сообщений конкретной очереди в секундах.length: 1000 # Максимальная длина очереди указана.policy: cleanup # Политика очистки включена.consumers_required: 2 # Количество постоянных подписчиков, после прочтения которыми сообщение может быть удалено.options:tuples_per_iteration: 512
Следующий пример конфигурации позволяет:
- Hастроить глобальное время удержания сообщений по всем очередям в
120секунд и указать максимальную длину очереди в1024сообщения; - Для очереди
input:- Переопределить время удержания в
60секунд; - Увеличить максимальную длину очереди до
2048сообщений;
- Переопределить время удержания в
- Для вызова
expirationd.start:- Указать опцию
full_scan_time;
- Указать опцию
- Для очереди
output:- Отключить очистку очереди от устаревших сообщений.
В подобной конфигурации секция роли roles.tqe-storage будет выглядеть так:
roles_cfg:roles.tqe-storage:retention:time: 120length: 1024queues:- name: inputretention:time: 60length: 2048options:full_scan_time: 1024- name: outputretention:policy: disabled # Политика очистки отключена.
Получение сообщений в TQE(MQ) возможно только при наличии подписки.
Для получения подписки, выполните запрос SubscriptionRequest без указания consume_id.
Также вы можете получить персистентную подписку. Система хранит состояние персистентной подписки и данные о прочитанных сообщениях, что позволяет возобновлять чтение сообщений с последней сохраненной позиции при переподключении.
Для получения персистентной подписки, выполните запрос SubscriptionRequest
с указанием параметра consume_id.
С каждым сообщением персистентным подписчикам приходит значение параметра cursor, соответствующее позиции сообщения в очереди.
Значение cursor критически важно при восстановлении состояния подписки после отключения.
Восстановить состояние подписки можно при переподключении. Для этого после отключения нужно выполнить запрос
SubscriptionRequest с теми же значениями
параметров queue, routing_key, sharding_key, consume_id,
которые были использованы при инициализации подписки. В запрос обязательно нужно включить параметр cursor, который был использован
непосредственно перед отключением.
При совпадении параметров в запросе с данными системы, подписка восстанавливается и сообщения отправляются с последней сохраненной позиции. Если же любой параметр не соотвтетствует данным из хранилища, то система вернет в ответе ошибку.
При разрыве соединения можно повторно инициировать подписку, передав свой consume_id.
Сервер автоматически восстановит состояние подписки из хранилища и возобновит чтение с последней известной позиции.
После перезапуска сервер теряет состояние в памяти. Вы должны переподключиться и отправить
запрос SubscriptionRequest,
включающий consume_id и cursor последнего полученного сообщения. Сервер автоматически восстановит состояние из персистентного
хранилища и возобновит передачу сообщений.
При перезапуске реплики gRPC-сервер автоматически обнаруживает сбой, выбирает другую доступную реплику и возобновляет обработку запросов с последней известной позиции.
После переключения на новую мастер-реплику gRPC-сервер автоматически устанавливает соединение и проверяет состояние подписок. Для персистентных подписок их состояние полностью восстанавливается на новой реплике.