Top.Mail.Ru
Модуль expirationd | Tarantool
Tarantool
Узнайте содержание релиза 2.8

Модуль expirationd

For a commercial-grade example of a Lua rock that works with Tarantool, let us look at the source code of expirationd, which Tarantool supplies on GitHub with an Artistic license. The expirationd.lua program is lengthy (about 500 lines), so here we will only highlight the matters that will be enhanced by studying the full source later.

task.worker_fiber = fiber.create(worker_loop, task)
log.info("expiration: task %q restarted", task.name)
...
fiber.sleep(expirationd.constants.check_interval)
...

Если в Tarantool’е упоминается «демон», то речь идет об использовании файбера. Программа создает файбер и передает управление так, что он периодически запускается, уходит в режим ожидания, а затем повторяет эти действия.

for _, tuple in scan_space.index[0]:pairs(nil, {iterator = box.index.ALL}) do
...
    expiration_process(task, tuple)
...
    /* expiration_process() contains:
    if task.is_tuple_expired(task.args, tuple) then
    task.expired_tuples_count = task.expired_tuples_count + 1
    task.process_expired_tuple(task.space_id, task.args, tuple) */

Команду «for» можно перевести как «выполнить итерацию по индексу сканируемого спейса», а внутри – если кортеж «неактуален» (например, если в кортеже есть поле метки времени, которое меньше текущего времени), то обработать кортеж как неактуальный кортеж.

-- функция обработки неактуального кортежа по умолчанию
local function default_tuple_drop(space_id, args, tuple)
    box.space[space_id]:delete(construct_key(space_id, tuple))
end
    /* construct_key() contains:
    local function construct_key(space_id, tuple)
        return fun.map(
            function(x) return tuple[x.fieldno] end,
           box.space[space_id].index[0].parts
        ):totable()
    end */

Ultimately the tuple-expiry process leads to default_tuple_drop() which does a «delete» of a tuple from its original space. First the fun fun module is used, specifically fun.map. Remembering that index[0] is always the space’s primary key, and index[0].parts[N].fieldno is always the field number for key part N, fun.map() is creating a table from the primary-key values of the tuple. The result of fun.map() is passed to space_object:delete().

local function expirationd_run_task(name, space_id, is_tuple_expired, options)
...

На этом этапе ясно, что expirationd.lua запускает фоновый процесс (файбер), который выполняет итерацию по всем кортежам в спейсе, в рамках кооперативной многозадачности уходит в режим ожидания, чтобы другие файберы могли работать одновременно с ним, а когда находит неактуальный кортеж, удаляет его из спейса. Теперь функцию «expirationd_run_task()» можно использовать в тестировании, где создаются образцы данных, некоторое время работает демон, и выводятся результаты.

Если вы хотите увидеть, как все работает, обратите внимание на нижеприведенные шаги по включению expirationd в тестирование.

  1. Найдите expirationd.lua. Можно воспользоваться стандартным способом, поскольку модуль включен в общий список модулей, но для этой цели просто скопируйте содержимое expirationd.lua в директорию в Lua-пути (введите print(package.path), чтобы увидеть Lua-путь).
  2. Запустите Tarantool-сервер, как описано выше.
  3. Выполните следующие запросы:
fiber = require('fiber')
expd = require('expirationd')
box.cfg{}
e = box.schema.space.create('expirationd_test')
e:create_index('primary', {type = 'hash', parts = {1, 'unsigned'}})
e:replace{1, fiber.time() + 3}
e:replace{2, fiber.time() + 30}
function is_tuple_expired(args, tuple)
  if (tuple[2] < fiber.time()) then return true end
  return false
  end
expd.run_task('expirationd_test', e.id, is_tuple_expired)
retval = {}
fiber.sleep(2)
expd.task_stats()
fiber.sleep(2)
expd.task_stats()
expd.kill_task('expirationd_test')
e:drop()
os.exit()

Запросы в работе с базой данных (cfg, space.create, create_index) уже должны быть вам знакомы.

В expirationd передается функция is_tuple_expired, которая задает следующее условие: если второе поле кортежа меньше текущего времени , вернуть true (правда), в противном случае, вернуть false (ложь).

The key for getting the rock rolling is expd = require('expirationd'). The require function is what reads in the program; it will appear in many later examples in this manual, when it’s necessary to get a module that’s not part of the Tarantool kernel, but is on the Lua path (package.path) or the C path (package.cpath). After the Lua variable expd has been assigned the value of the expirationd module, it’s possible to invoke the module’s run_task() function.

После ухода в режим ожидания на две секунды, когда проводится итерация по спейсам, expd.task_stats() выведет отчет о количестве неактуальных кортежей – «expired_count: 0».

После ожидания в течение еще двух секунд expd.task_stats() выведет отчет о количестве неактуальных кортежей – «expired_count: 1». Это показывает, что функция is_tuple_expired() с течением времени вернула «true» для одного из кортежей, поскольку поле метки времени было дольше трех секунд.

Конечно, expirationd можно настроить на выполнение различных задач с помощью разных параметров, что будет очевидно после более детального изучения исходного кода. В частности, важны опции {options}, которые можно добавить в качестве последнего параметра в expirationd.run_task:

  • force (логическое значение) – выполнение задачи даже на реплике. По умолчанию: force=false, поэтому, как правило, expirationd не учитывает реплики.
  • tuples_per_iteration (целое число) – количество кортежей, которые проверяются за одну итерацию. По умолчанию: tuples_per_iteration=1024.
  • full_scan_time (число) – число секунд на полное сканирование диска. По умолчанию: full_scan_time=3600.
  • vinyl_assumed_space_len (целое число) – предполагаемый размер спейса vinyl’а, используется только для первой итерации. По умолчанию: vinyl_assumed_space_len=10000000.
  • vinyl_assumed_space_len_factor (целое число) – коэффициент перерасчета размера спейса vinyl’а. По умолчанию: vinyl_assumed_space_len_factor=2. (Размер спейса vinyl’а не так легко рассчитать, поэтому для первой итерации используется «предполагаемый» размер, на второй итерации – «предполагаемый» размер, помноженный на «коэффициент», на третьей итерации – «предполагаемый» размер, дважды помноженный на «коэффициент» и так далее.)