Task Manager for Tarantool Enterprise


The task manager module allows you to automate several types of background jobs:

  • periodical, which need to be launched on a cron-like schedule;

  • continuous, which need to be running at all times;

  • single_shot, which are launched manually by the operations team.

You get the following features out-of-the-box:

  • configurable schedule of periodic tasks;

  • guarding and restarting continuous tasks;

  • stored log of task launches;

  • API and UI for launching tasks and observing launch results.

Task manager comes with several built-in cartridge roles:

  • task.roles.scheduler, a module which allows you to configure launchable tasks;

  • task.roles.runner, a cluster-aware stateless task runner;

  • task.roles.storage, a cluster-aware dedicated task contents storage;

  • a plugin-based API which allows you to provide your own storage module (e.g. distributed, or external to the Tarantool cluster) or your own runner module, providing more tooling for your needs.

Basic usage (single-node application)

  1. Add the following to the instance file (init.lua):

...
local task = require('task')
require('my_module')
...
local ok, err = cartridge.cfg({
   roles = {'my_role', ...}
})
assert(ok, tostring(err))

task.init_webui()
  2. Add a dependency on the task scheduler, runner, and storage roles to your role:

return {
    ...
    dependencies = {
        'task.roles.storage',
        'task.roles.scheduler',
        'task.roles.runner'
    }
}
  3. Add a tasks section to your clusterwide configuration:

tasks:
  my_task:
    kind: periodical
    func_name: my_module.my_task
    schedule: "*/* 1 * * * *"
  4. That’s it! The my_task function from the my_module module will be launched every minute.

Advanced usage (multi-node installation)

  1. Add the following to the instance file:

...
local task = require('task')
require('my_module')
local ok, err = cartridge.cfg({
   roles = {
    ...
    'task.roles.scheduler',
    'task.roles.storage',
    'task.roles.runner'
   }
})
assert(ok, tostring(err))

task.init_webui()

Important: task.init_webui() should be called on all nodes. This is safe and necessary even if the Tarantool Cartridge Web UI is disabled on some nodes.

  2. Enable the task scheduler role on a dedicated node in your cluster (after deployment). If you set up a big cluster, don’t set up more than one replica set with the scheduler.

  3. Enable the task storage role on a dedicated node in your cluster (after deployment), possibly on the same node as the task scheduler. If you set up a big cluster, don’t set up more than one replica set with the storage.

  4. Enable the task runner role on dedicated stateless nodes in your cluster (after deployment), as many as you may need.

Advanced usage (sharded storage)

  1. Add the following to the instance file:

...
local task = require('task')
require('my_module')
local ok, err = cartridge.cfg({
   roles = {
    ...
    'task.roles.sharded.scheduler',
    'task.roles.sharded.storage',
    'task.roles.sharded.runner'
   }
})
assert(ok, tostring(err))

task.init_webui()

Important: task.init_webui() should be called on all nodes. This is safe and necessary even if the Tarantool Cartridge Web UI is disabled on some nodes.

  2. Enable the task scheduler role on a dedicated node in your cluster (after deployment). If you set up a big cluster, don’t set up more than one replica set with the scheduler.

  3. Enable the task storage role on the nodes of some vshard group (or on all storage nodes). Set up the cartridge built-in vshard-storage role on these nodes.

  4. Enable the task runner role on dedicated stateless nodes in your cluster (after deployment), as many as you may need.

Tasks configuration

Tasks are configured via the scheduler cluster role. An example of a valid role configuration:

tasks:
    my_reload:
      kind: periodical
      func_name: my_module.cache_reload
      schedule: "*/* 1 * * * *"
      time_to_resolve: 180
    my_flush:
      kind: single_shot
      func_name: my_module.cache_flush
      args:
        - some_string1
        - some_string2
    push_metrics:
      kind: continuous
      func_name: my_module.push_metrics
      pause_sec: 30

task_storage:
  task_ttr: 60 # default time_to_resolve, if no other value is specified in task
  task_log_max_size: 100 # number of task history records (excluding pending or running ones) to be kept on a single storage node (per task name)

task_runner:
  capacity: 128 # number of tasks that can be executed simultaneously on a single runner node
  • Every task must have a unique name (subsection name in config).

  • Every Lua module containing a task function (e.g. my_module) should be explicitly require'd somewhere in your application. The task module won’t load any new Lua code for safety reasons.

  • Each task must have a kind: periodical, continuous, single_shot.

  • Each task must have a func_name - name of the function (preceded by the name of the module) which will be invoked.

  • Each task may have time_to_resolve - timeout after which a running task is considered lost (failed). Default value is 60 seconds.

  • Each task may have max_attempts, delay and delay_factor (which default to 1, 0, and 1 respectively), which allow retrying failed and lost tasks.

For example:

  • delay = 60, max_attempts = 5 will lead to a retry every 60 seconds, up to 5 attempts,

  • delay = 8, delay_factor = 1.5, max_attempts = 8 will lead to retries after 8, 12, 18, 27, 40, 60 and 90 seconds,

    with the last attempt happening about 260 seconds after the first one.

Note: be careful when setting up retries for periodical tasks: retries may lead to overlaps between task launches.

  • Each task may have args - an array of arguments which will be passed to the function (to allow basic parametrization).

  • Periodical tasks must also have a schedule, conforming to ccronexpr (basically, cron with seconds).

  • Continuous tasks may have a pause_sec - a pause between launches (60 seconds by default).
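Putting several of these options together, a periodical task with retries might be configured like the following sketch (the task name, module, and values are illustrative):

```yaml
tasks:
  my_import:
    kind: periodical
    func_name: my_module.import_data  # the module must be require'd in your application
    schedule: "0 */5 * * * *"         # every 5 minutes (note the leading seconds field)
    time_to_resolve: 120              # consider a launch lost after 2 minutes
    max_attempts: 3
    delay: 10                         # first retry after 10 seconds
    delay_factor: 2                   # subsequent delays double: 20 seconds, and so on
```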

You may set up a default task config for your application in the task.init() call:

task.init({
    default_config = {
        my_task = {
            kind = 'single_shot',
            func_name = 'dummy_task.dummy',
        }
    },
    task_storage = {
        task_ttr = 3600,
        task_log_max_size = 10000
    },
    task_runner = {
        capacity = 1
    }
})

The default config will be applied if the respective sections (tasks, task_runner, task_storage) are not set in the clusterwide config. task.init() should be called prior to cartridge.cfg().

Advanced usage

Monitoring

Task storage nodes expose a current breakdown of tasks by status for your monitoring plugins of choice:

> require('task.roles.storage').statistics()

---
- statuses:
    stopped: 0
    failed: 0
    lost: 0
    total: 6
    completed: 3
    pending: 1
    did not start: 0
    running: 2
    unknown task: 0

The same goes for the sharded storage role.
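For example, a sketch of reporting these counters into the instance log (the statistics() call and status names are taken from the output above; the reporting itself is illustrative):

```lua
local log = require('log')
local storage = require('task.roles.storage')

-- Report task status counters; a sketch for wiring the breakdown
-- into whatever monitoring pipeline you use.
local function report_task_stats()
    local statuses = storage.statistics().statuses
    log.info('tasks: running=%d pending=%d failed=%d lost=%d',
        statuses.running, statuses.pending, statuses.failed, statuses.lost)
end
```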

Running a task via API

Everything visible in the UI is also available via the API. You may look up the requests in the UI or in the Cartridge GraphQL schema.

curl -w "\n" -X POST http://127.0.0.1:8080/admin/api --fail -d@- <<'QUERY'
{"query": "mutation { task { start(name: \"my_reload\") } }"}
QUERY

Running massive amounts of tasks

If you need to run a large number of background tasks (say, more than several per second) in parallel, there are two options:

Option A - registered tasks

require('task').start(
    'my_task', -- task_name from the config, probably of the `single_shot` kind
    {...} -- arguments
)

Option B - anonymous

Or, if you don’t want to bother with registering tasks in the clusterwide config, you may do the following:

require('task').create(
    'your_module.some_background_job', -- func_name
    {...}, -- arguments
    { time_to_resolve = 1 } -- options
)
In both cases, such a task will be put into the storage and executed on a runner as usual. Remember that:
  • If you’re using the sharded task storage, you need to have vshard-router enabled on the instance where you call task.create;

  • You may want to increase task_storage.task_log_max_size appropriately, if you intend to launch many tasks simultaneously.
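As a sketch of the anonymous-task option at some volume, you can enqueue a batch in a loop (the function name and arguments are illustrative; task.create is as shown above):

```lua
local task = require('task')

-- Enqueue one anonymous task per item; each is put into the task
-- storage and picked up by a runner, like any other task.
for _, id in ipairs({'a', 'b', 'c'}) do
    task.create(
        'your_module.some_background_job', -- func_name, must be require'd already
        { id },                            -- arguments
        { time_to_resolve = 10 }           -- options
    )
end
```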

Supplying custom runner and storage

Add the following to your instance file:

task.init({
    runner = 'my_tasks.my_runner',
    storage = 'my_tasks.my_storage'
})
...
local ok, err = cartridge.cfg({...})
assert(ok, tostring(err))
task.init_webui()

Be sure to call task.init() prior to cartridge.cfg(), so that the custom options are in place by the time role initialization starts.

You may then set up only the task scheduler role and handle storage and runners yourself.

Writing your own runner and storage

The runner module must expose an api member with the following functions:
  • stop_task

The storage module must expose an api member with the following functions:
  • select

  • get

  • delete

  • put

  • complete

  • take

  • cancel

  • wait

  • (optional) a set_ddl function, which should be (safely) called on each node of the cluster

For more details, refer to the built-in runner and storage documentation.
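A structural sketch of a custom storage module is shown below. Only the shape is illustrated: function bodies, argument lists, and return values are omitted and must follow the contract of the built-in storage module.

```lua
-- my_tasks/my_storage.lua: a structural sketch of a custom task storage.
local api = {}

function api.put(...) end      -- store a new task
function api.take(...) end     -- hand a pending task to a runner
function api.complete(...) end -- record a finished task
function api.get(...) end
function api.select(...) end
function api.delete(...) end
function api.cancel(...) end
function api.wait(...) end

-- optional: set up the schema; should be safe to call on each node
function api.set_ddl(...) end

return { api = api }
```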
