Версия:

Модуль expirationd

Модуль expirationd

Рассмотрим исходный код expirationd – пример Lua-модуля для промышленной эксплуатации, который работает с Tarantool’ом – Tarantool предоставляет его с лицензией Artistic на GitHub. Программа expirationd.lua довольно объемная (около 500 строк), поэтому здесь мы остановимся на пунктах, знания о которых можно расширить, позднее изучив программу полностью.

task.worker_fiber = fiber.create(worker_loop, task)
log.info("expiration: task %q restarted", task.name)
...
fiber.sleep(expirationd.constants.check_interval)
...

Если в Tarantool’е упоминается «демон», то речь идет об использовании файбера. Программа создает файбер и передает управление так, что он периодически запускается, уходит в режим ожидания, а затем повторяет эти действия.

for _, tuple in scan_space.index[0]:pairs(nil, {iterator = box.index.ALL}) do
...
    expiration_process(task, tuple)
...
    /* expiration_process() contains:
    if task.is_tuple_expired(task.args, tuple) then
    task.expired_tuples_count = task.expired_tuples_count + 1
    task.process_expired_tuple(task.space_id, task.args, tuple) */

Команду «for» можно перевести как «выполнить итерацию по индексу сканируемого спейса», а внутри – если кортеж «неактуален» (например, если в кортеже есть поле метки времени, которое меньше текущего времени), то обработать кортеж как неактуальный кортеж.

-- функция обработки неактуального кортежа по умолчанию
local function default_tuple_drop(space_id, args, tuple)
    box.space[space_id]:delete(construct_key(space_id, tuple))
end
    /* construct_key() contains:
    local function construct_key(space_id, tuple)
        return fun.map(
            function(x) return tuple[x.fieldno] end,
           box.space[space_id].index[0].parts
        ):totable()
    end */

В конечном итоге, обработка неактуального кортежа приводит к default_tuple_drop(), что приводит к удалению кортежа из первоначального спейса. Сначала используется модуль fun, в частности fun.map. Учитывая, что index[0] всегда является первичным ключом спейса, а index[0].parts[N].fieldno всегда является номером поля для компонента ключа N, функция fun.map() создает таблицу из первичных значений кортежа. Результат fun.map() передается в space_object:delete().

local function expirationd_run_task(name, space_id, is_tuple_expired, options)
...

На этом этапе ясно, что expirationd.lua запускает фоновый процесс (файбер), который выполняет итерацию по всем кортежам в спейсе, в рамках кооперативной многозадачности уходит в режим ожидания, чтобы другие файберы могли работать одновременно с ним, а когда находит неактуальный кортеж, удаляет его из спейса. Теперь функцию «expirationd_run_task()» можно использовать в тестировании, где создаются образцы данных, некоторое время работает демон, и выводятся результаты.

Если вы хотите увидеть, как все работает, обратите внимание на нижеприведенные шаги по включению expirationd в тестирование.

  1. Найдите expirationd.lua. Можно воспользоваться стандартным способом, поскольку модуль включен в общий список модулей, но для этой цели просто скопируйте содержимое expirationd.lua в директорию в Lua-пути (введите print(package.path), чтобы увидеть Lua-путь).
  2. Запустите Tarantool-сервер, как описано выше.
  3. Выполните следующие запросы:
fiber = require('fiber')
expd = require('expirationd')
box.cfg{}
e = box.schema.space.create('expirationd_test')
e:create_index('primary', {type = 'hash', parts = {1, 'unsigned'}})
e:replace{1, fiber.time() + 3}
e:replace{2, fiber.time() + 30}
function is_tuple_expired(args, tuple)
  if (tuple[2] < fiber.time()) then return true end
  return false
  end
expd.run_task('expirationd_test', e.id, is_tuple_expired)
retval = {}
fiber.sleep(2)
expd.task_stats()
fiber.sleep(2)
expd.task_stats()
expd.kill_task('expirationd_test')
e:drop()
os.exit()

Запросы в работе с базой данных (cfg, space.create, create_index) уже должны быть вам знакомы.

В expirationd передается функция is_tuple_expired, которая задает следующее условие: если второе поле кортежа меньше текущего времени , вернуть true (правда), в противном случае, вернуть false (ложь).

Ключ к запуску модуля – expd = require('expirationd'). Функция require – это именно то, что выполняет чтение в программе. Она появится и в дальнейших примерах в данном руководстве, когда будет необходимо запустить модуль, который не входит в ядро Tarantool’а, но находится в Lua-пути (package.path) или же C-пути (package.cpath). После того, как Lua-переменной expd присваивается значение модуля expirationd, можно вызвать функцию модуля run_task().

После ухода в режим ожидания на две секунды, когда проводится итерация по спейсам, expd.task_stats() выведет отчет о количестве неактуальных кортежей – «expired_count: 0».

После ожидания в течение еще двух секунд expd.task_stats() выведет отчет о количестве неактуальных кортежей – «expired_count: 1». Это показывает, что функция is_tuple_expired() с течением времени вернула «true» для одного из кортежей, поскольку поле метки времени было дольше трех секунд.

Конечно, expirationd можно настроить на выполнение различных задач с помощью разных параметров, что будет очевидно после более детального изучения исходного кода. В частности, важны опции {options}, которые можно добавить в качестве последнего параметра в expirationd.run_task:

  • force (логическое значение) – выполнение задачи даже на реплике. По умолчанию: force=false, поэтому, как правило, expirationd не учитывает реплики.
  • tuples_per_iteration (целое число) – количество кортежей, которые проверяются за одну итерацию. По умолчанию: tuples_per_iteration=1024.
  • full_scan_time (число) – число секунд на полное сканирование диска. По умолчанию: full_scan_time=3600.
  • vinyl_assumed_space_len (целое число) – предполагаемый размер спейса vinyl’а, используется только для первой итерации. По умолчанию: vinyl_assumed_space_len=10000000.
  • vinyl_assumed_space_len_factor (целое число) – коэффициент перерасчета размера спейса vinyl’а. По умолчанию: vinyl_assumed_space_len_factor=2. (Размер спейса vinyl’а не так легко рассчитать, поэтому для первой итерации используется «предполагаемый» размер, на второй итерации – «предполагаемый» размер, помноженный на «коэффициент», на третьей итерации – «предполагаемый» размер, дважды помноженный на «коэффициент» и так далее.)