Module expirationd

expirationd - data expiration with custom quirks.

Task instance methods

task.start (self) Start a task.
task.stop (self) Stop a task.
task.restart (self) Restart a task.
task.kill (self) Kill a task.
task.statistics (self) Get statistics about a task.

Module constants

_VERSION Current module version in format MAJOR.MINOR.PATCH.

Module functions

cfg (options) Configure expirationd.
start (name, space, is_tuple_expired[, options]) Run a scheduled task to check and process (expire) tuples in a given space.
kill (name) Kill an existing task.
tasks () Return a list of task names.
stats ([name]) Return task statistics as a table.
task (name) Get task by name.
update () Reload module.


Task instance methods

NOTE: A task object contains a number of properties that are accessible to users. However, these properties are not part of the expirationd API. A property name can be changed, or the property itself can be removed, in a future version. Be careful!

task.start (self)

Start a task. It continues processing from the last processed tuple if the task was previously stopped by task.stop and the task uses the default start_key and iterate_with functions.

Parameters:

  • self

    Task instance.

Returns:

    None
task.stop (self)

Stop a task.

Parameters:

  • self

    Task instance.

Returns:

    None
task.restart (self)

Restart a task.

Parameters:

  • self

    Task instance.

Returns:

    None
task.kill (self)

Kill a task.

Stop a task and delete it from list of tasks.

Parameters:

  • self

    Task instance.

Returns:

    None
task.statistics (self)

Get statistics about a task.

Parameters:

  • self

    Task instance.

Returns:

    Response of the following structure:

     {
         checked_count = number,
         expired_count = number,
         restarts = number,
         working_time = number,
     }

    where:

    checked_count is the number of tuples checked for expiration (expired + skipped).

    expired_count is the number of expired tuples.

    restarts is the number of restarts since the task was started. Right after the first start, restarts is equal to 1.

    working_time is the task's operation time.

Module constants

_VERSION

Current module version in format MAJOR.MINOR.PATCH.

Module functions

cfg (options)

Configure expirationd.

Since version 1.2.0.

How to set up a configuration option:

 expirationd.cfg({metrics = true})

How to get an option value:

 print(expirationd.cfg.metrics)
 true

Parameters:

  • options (table)

    • metrics (bool, optional)

      Enable or disable stats collection by the metrics module. metrics >= 0.11.0 is required. It is enabled by default.

      If enabled, it creates four counter collectors (see task.statistics):

      1. expirationd_checked_count

      2. expirationd_expired_count

      3. expirationd_restarts

      4. expirationd_working_time

      Labeled with name = task_name.

Returns:

    None
start (name, space, is_tuple_expired[, options])

Run a scheduled task to check and process (expire) tuples in a given space.

How expirationd works in general:

1. Process min(space_length, tuples_per_iteration) tuples at once.

2. Sleep tuples_per_iteration × full_scan_time / space_length (but not beyond 1 second).

3. Repeat 1-2 until the whole space has been traversed.

4. Sleep 1 second.

5. Repeat 1-4.
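
The pause in step 2 can be sketched in plain Lua. This only illustrates the formula above and is not the module's code: the function name iteration_sleep and its parameters are hypothetical, the 1-second cap corresponds to the default iteration_delay, and the handling of an empty space is an assumption.

```lua
-- Hypothetical helper: how long a task sleeps between two batches.
local function iteration_sleep(space_length, tuples_per_iteration,
                               full_scan_time, iteration_delay)
    iteration_delay = iteration_delay or 1 -- cap in seconds (default value)
    if space_length <= 0 then
        -- Assumption: with nothing to scan, just wait for the cap.
        return iteration_delay
    end
    local delay = tuples_per_iteration * full_scan_time / space_length
    return math.min(delay, iteration_delay)
end

-- A large space: the pause is a small fraction of a second.
local short_pause = iteration_sleep(100000000, 1024, 3600)
-- A small space: the computed pause exceeds the cap, so the cap is used.
local capped_pause = iteration_sleep(10000, 1024, 3600)
```

With the default tuples_per_iteration (1024) and full_scan_time (3600), the cap is reached for any space with fewer than 3,686,400 tuples.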

NOTE: By default, expirationd does not start tasks on a read-only instance for non-local persistent spaces, see the force option.

NOTE: By default, expirationd continues processing from the last processed tuple if a task with the same name has not been killed properly with task.kill or kill. This behavior only works with the default start_key and iterate_with functions. Set at least one of these functions if a different behavior is needed, see the start_key and iterate_with options.

Parameters:

  • name (string)

    Task name.

  • space (string)

    Space to look in for expired tuples. It can be a space id (number) or a space name (string).

  • is_tuple_expired (func)

    Function that decides whether a tuple is expired. It receives args and tuple as arguments and must return true (expired) or false (not expired).

    Example of function:

     local fiber = require('fiber')

     local function is_tuple_expired(args, tuple)
         -- args.field_no is the number of the field holding the expiration time
         local tuple_expire_time = tuple[args.field_no]
         local current_time = fiber.time()
         return current_time >= tuple_expire_time
     end

  • options (table, optional)

    Table with named options.

    • args (optional)

      Passed to is_tuple_expired() and process_expired_tuple() as an additional context.

    • atomic_iteration (boolean, optional)

      If false (default), each tuple is processed in its own transaction. If true, all tuples of a batch are processed in a single transaction.

    • force (boolean, optional)

      By default, expirationd processes tasks for all types of spaces only on a writable instance. It does not process tasks on a read-only instance for non-local persistent spaces. This means that expirationd will not start the task processing on a replica for regular spaces. Set the option to true to force the task processing anyway.

    • force_allow_functional_index (boolean, optional)

      By default, expirationd returns an error on iteration through a functional index for Tarantool < 2.8.4 because it may cause a crash, see https://github.com/tarantool/expirationd/issues/101. You can skip the error using this option if you know what you are doing (for example, if you implement your own iterate_with).

    • full_scan_delay (number, optional)

      Sleep time between full scans (in seconds). It is allowed to pass an FFI number: 1LL, 1ULL etc. Default value is 1 sec.

    • full_scan_time (number, optional)

      Time required for a full index scan (in seconds). It is allowed to pass an FFI number: 1LL, 1ULL etc. full_scan_time is used to calculate how long the fiber sleeps between iterations. Default value is 3600.

    • index (string, optional)

      Name or id of the index to iterate on. If omitted, the primary index is used. If there is no index with this name, an error is thrown. Supported index types are TREE and HASH; using other types results in an error.

    • iterate_with (func, optional)

      Function that returns an iterator object providing the tuples to check, taking start_key, process_while and other options into account. If the option is nil, the default function is used. The function must accept a task instance object. The default function returns the iterator returned by index_object:pairs(), where the index is the primary index or the index specified with options.index.

      Example of function:

        -- index and option stand for the task's index object and its options
        local function iterate_with(task)
            return index:pairs(option.start_key(), {
                iterator = option.iterator_type
            }):take_while(
                function()
                    return option.process_while()
                end
            )
        end

    • iteration_delay (number, optional)

      Max sleep time between iterations (in seconds). It is allowed to pass an FFI number: 1LL, 1ULL etc. Default value is 1 sec. Fiber sleeps min(tuples_per_iteration × full_scan_time / space_length, iteration_delay).

    • iterator_type (string, optional)

      Type of the iterator to use, as a string or a box.index constant, for example, 'EQ' or box.index.EQ. Default is box.index.ALL. See more about index iterators in index_object:pairs().

    • on_full_scan_complete (func, optional)

      Function to call after completing a full scan iteration. Default value is a function that does nothing.

    • on_full_scan_error (func, optional)

      Function to call after terminating a full scan due to an error. Default value is a function that does nothing.

      Example of function:

       local function on_full_scan_error()
           pcall(fiber.sleep, 1)
       end

    • on_full_scan_start (func, optional)

      Function to call before starting a full scan iteration. Default value is a function that does nothing.

    • on_full_scan_success (func, optional)

      Function to call after successfully completing a full scan iteration. Default value is a function that does nothing.

    • process_expired_tuple (func, optional)

      Function applied to each expired tuple. It receives space, args and tuple as arguments. If process_expired_tuple is omitted (or nil is passed), expired tuples are removed.

      Example of function:

       local function put_tuple_to_archive(space, args, tuple)
           box.space[space]:delete{tuple[1]}
           local email = tuple[2]
           if args.archive_space ~= nil and email ~= nil then
               box.space[args.archive_space]:replace{email, fiber.time()}
           end
       end

    • process_while (func, optional)

      Function to call before checking each tuple. If it returns false, the task stops until the next full scan. Default is a function that always returns true.

      Example of function:

       local function process_while()
           return false
       end

    • start_key (optional)

      Start iterating from the tuple with this index value, or, when the iterator is 'EQ', iterate only over tuples with this index value. Must be a value of the same data type as the index field or fields, or a function which returns such a value. If omitted or nil, all tuples will be checked.

    • tuples_per_iteration (number, optional)

      Number of tuples to check in one batch (iteration). It is allowed to pass an FFI number: 1LL, 1ULL etc. Default value is 1024.

    • vinyl_assumed_space_len_factor (number, optional)

      Factor for recalculation of the assumed vinyl space size. The size of a vinyl space can't be counted exactly (many operations, upsert for example, are applied only when the data is accessed), so an approximate tuple count has to be assumed. vinyl_assumed_space_len is the approximate count for the first run and vinyl_assumed_space_len_factor is the multiplier for the next milestone: once the assumed length is reached, it is multiplied by the factor, and so on. It is allowed to pass an FFI number: 1LL, 1ULL etc. Default value is 2.

    • vinyl_assumed_space_len (number, optional)

      Assumed size of a vinyl space (in the first iteration). The size of a vinyl space can't be counted exactly (many operations, upsert for example, are applied only when the data is accessed), so an approximate tuple count has to be assumed. vinyl_assumed_space_len is the approximate count for the first run and vinyl_assumed_space_len_factor is the multiplier for the next milestone: once the assumed length is reached, it is multiplied by the factor, and so on. It is allowed to pass an FFI number: 1LL, 1ULL etc. Default value is 10^7.
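
The interplay of the two vinyl options can be sketched in plain Lua. This is an illustration under an assumption, not the module's code: it assumes the assumed length starts at vinyl_assumed_space_len and is multiplied by vinyl_assumed_space_len_factor whenever more tuples have been checked than currently assumed. The function name assumed_len_after is hypothetical.

```lua
-- Hypothetical helper: assumed space length once `checked` tuples were seen.
local function assumed_len_after(checked, assumed_len, factor)
    assumed_len = assumed_len or 10^7 -- documented default
    factor = factor or 2              -- documented default
    assert(factor > 1, 'factor must be greater than 1 for the length to grow')
    -- Move to the next milestone until it covers the checked tuples.
    while checked > assumed_len do
        assumed_len = assumed_len * factor
    end
    return assumed_len
end

-- Milestones for an initial assumption of 10 tuples: 10, 20, 40, 80, ...
local first = assumed_len_after(5, 10, 2)   -- still the initial assumption
local later = assumed_len_after(45, 10, 2)  -- grown past 45 checked tuples
```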

Returns:

    task instance

Usage:

    local expirationd = require('expirationd')
    
    box.cfg{}
    
    local space = box.space.old
    local job_name = "clean_all"
    
    local function is_expired(args, tuple)
        return true
    end
    
    local function delete_tuple(space, args, tuple)
        box.space[space]:delete{tuple[1]}
    end
    
    expirationd.start(job_name, space.id, is_expired, {
        process_expired_tuple = delete_tuple,
        args = nil,
        tuples_per_iteration = 50,
        full_scan_time = 3600
    })
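
The callbacks passed to start are ordinary Lua functions, so their logic can be tried out without a running task. The sketch below makes several assumptions for the sake of illustration: plain Lua tables stand in for tuples, os.time stands in for fiber.time, and the names make_ttl_check and now are hypothetical. The field_no key inside args is a user-chosen name, as in the is_tuple_expired example.

```lua
-- Build an is_tuple_expired callback; `now` is injectable to ease testing.
local function make_ttl_check(now)
    now = now or os.time
    return function(args, tuple)
        local expire_at = tuple[args.field_no]
        -- A tuple without an expiration time is treated as not expired.
        return expire_at ~= nil and now() >= expire_at
    end
end

-- Freeze the clock at 1000 so the result does not depend on the wall clock.
local is_expired = make_ttl_check(function() return 1000 end)
local args = { field_no = 3 }

local old_tuple = { 1, 'a@example.com', 900 }
local fresh_tuple = { 2, 'b@example.com', 1100 }
local old_is_expired = is_expired(args, old_tuple)     -- true
local fresh_is_expired = is_expired(args, fresh_tuple) -- false
```

In a real task, the function returned by make_ttl_check() would be passed as is_tuple_expired, with args = {field_no = 3} in the options table.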
kill (name)

Kill an existing task.

Parameters:

  • name (string)

    Task name.

Returns:

    None
tasks ()

Return a list of task names.

Returns:

    Response of the following structure:

     {
         "expirationd-1",
         "expirationd-2",
         "expirationd-3",
     }
stats ([name])

Return task statistics as a table.

Parameters:

  • name (string, optional)

    Task name. If name is nil, a map of name:stats for all tasks is returned; otherwise the stats map of the named task is returned.

Returns:

    Response of the following structure:

     {
         checked_count = number,
         expired_count = number,
         restarts = number,
         working_time = number,
     }

    where:

    checked_count is the number of tuples checked for expiration (expired + skipped).

    expired_count is the number of expired tuples.

    restarts is the number of restarts since the task was started. Right after the first start, restarts is equal to 1.

    working_time is the task's operation time.
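
When name is nil, the result is a map of name:stats, which lends itself to simple aggregation. The following is a minimal sketch in plain Lua: the helper name summarize is hypothetical, and the sample data is made up to match the structure described above.

```lua
-- Hypothetical helper: sum per-task counters from a name -> stats map.
local function summarize(all_stats)
    local total = { checked_count = 0, expired_count = 0 }
    for _, task_stats in pairs(all_stats) do
        total.checked_count = total.checked_count + task_stats.checked_count
        total.expired_count = total.expired_count + task_stats.expired_count
    end
    return total
end

-- Sample data shaped like the documented response (values are invented).
local sample = {
    ['expirationd-1'] = {
        checked_count = 100, expired_count = 40,
        restarts = 1, working_time = 12,
    },
    ['expirationd-2'] = {
        checked_count = 50, expired_count = 10,
        restarts = 2, working_time = 7,
    },
}
local total = summarize(sample)
-- total.checked_count is 150 and total.expired_count is 50
```

With the module loaded, the corresponding call would be summarize(expirationd.stats()).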

task (name)

Get task by name.

Parameters:

  • name (string)

    Task name.

Returns:

    task instance
update ()

Reload module.

Update the expirationd version in a running Tarantool and restart all tasks. The reload proceeds step by step: remove the expirationd module from package.loaded, load the new version of expirationd using require, and finally restart all tasks.

Returns:

    None