Версия:

Модуль fiber

Модуль fiber

Общие сведения

С помощью модуля fiber можно:

  • создавать, запускать и управлять файберами,
  • отправлять и получать сообщения для различных процессов (например, разные соединения, сессии или файберы) по каналам, а также
  • использовать механизм синхронизации для файберов, аналогично работе «условных переменных» и функций операционных систем, таких как pthread_cond_wait() плюс pthread_cond_signal().

Индекс

Ниже приведен перечень всех функций и элементов модуля fiber.

Имя Использование
fiber.create() Создание и запуск файбера
fiber.new() Создание файбера без запуска
fiber.self() Получение объекта файбера
fiber.find() Получение объекта файбера по ID
fiber.sleep() Перевод файбера в режим ожидания
fiber.yield() Передача управления
fiber.status() Получение статуса активного файбера
fiber.info() Получение информации о всех файберах
fiber.kill() Отмена файбера
fiber.testcancel() Проверка отмены действующего файбера
fiber_object:id() Получение ID файбера
fiber_object:name() Получение имени файбера
fiber_object:name(name) Назначение имени файбера
fiber_object:status() Получение статуса файбера
fiber_object:cancel() Отмена файбера
fiber_object.storage Локальное хранилище в пределах файбера
fiber_object:set_joinable() Создание возможности подключения нового файбера
fiber_object:join() Ожидание статуса „dead“ (недоступен) для файбера
fiber.time() Получение системного времени в секундах
fiber.time64() Получение системного времени в микросекундах
fiber.channel() Создание канала связи
channel_object:put() Отправка сообщения по каналу связи
channel_object:close() Закрытие канала
channel_object:get() Перехват сообщения из канала
channel_object:is_empty() Проверка пустоты канала
channel_object:count() Подсчет сообщений в канале
channel_object:is_full() Проверка заполненности канала
channel_object:has_readers() Проверка пустого канала на наличие читателей в состоянии ожидания
channel_object:has_writers() Проверка полного канала на наличие писателей в состоянии ожидания
channel_object:is_closed() Проверка закрытия канала
fiber.cond() Создание условной переменной
cond_object:wait() Перевод файбера в режим ожидания до пробуждения другим файбером
cond_object:signal() Пробуждение отдельного файбера
cond_object:broadcast() Пробуждение всех файберов

Файберы

Файбер – это набор инструкций, которые выполняются по принципу кооперативной многозадачности. Файберы, управление которых происходит с помощью модуля fiber, связаны с функцией под названием функция для файбера, которую задает пользователь.

Существуют три возможных состояния файбера: running (активен), suspended (приостановлен) или dead (недоступен). После создания файбера с помощью fiber.create() он сразу активен. После создания файбера с помощью fiber.new() или передачи управления с помощью fiber.sleep() файбер будет приостановлен. По окончании работы (по причине окончания работы соответствующей функции) файбер становится недоступен.

Все файберы составляют часть реестра файберов. Можно производить поиск по реестру с помощью fiber.find() по ID файбера (fid), который представляет собой числовой идентификатор.

Неконтролируемый файбер можно остановить с помощью fiber_object.cancel. Однако, функция fiber_object.cancel консультативна, то есть сработает только в том случае, если неконтролируемый файбер случайно вызовет fiber.testcancel(). Большинство функций типа box.*, например box.space…delete() или box.space…update(), действительно вызывают fiber.testcancel(), а box.space…select{} не вызовет. В действительности неконтролируемый файбер может перестать отвечать, если он производит большое количество вычислений и не проверяет вероятность отмены.

Другой потенциальной проблемой могут стать файберы, которые не включаются в расписание, поскольку они не подписаны ни на какие события, или потому что соответствующие события не происходят. Такие файберы можно в любое принудительно остановить с помощью fiber.kill(), потому что функция fiber.kill() отправляет асинхронное событие пробуждения на файбер, а fiber.testcancel() проверяет наступление такого события пробуждения.

Сборщик мусора собирает недоступные файберы так же, как и все Lua-объекты: сборщик мусора в Lua освобождает память выделенного для файбера пула, сбрасывает все данные файбера и возвращает файбер (который теперь называется каркасом файбера) в пул файберов. Каркас можно использовать повторно при создании другого файбера.

У файбера есть все возможности сопрограммы (coroutine) на языке Lua, и все принципы программирования, которые применяются к сопрограммам на Lua, применимы и к файберам. Однако Tarantool расширил возможности файберов для внутреннего использования. Поэтому, несмотря на возможность и поддержку использования сопрограмм, рекомендуется использовать файберы.

fiber.create(function[, function-arguments])

Создание и запуск файбера. Происходит создание файбера, который незамедлительно начинает работу.

Параметры:
  • function – функция, которая будет связана с файбером
  • function-arguments – что передается в функцию
Возвращается:

созданный объект файбера

Тип возвращаемого значения:
 

пользовательские данные

Пример:

tarantool> fiber = require('fiber')
---
...
tarantool> function function_name()
         >   fiber.sleep(1000)
         > end
---
...
tarantool> fiber_object = fiber.create(function_name)
---
...
fiber.new(function[, function-arguments])

Создание файбера без запуска: файбер создается, но не запускается сразу же, а ожидает, пока создатель файбера (то есть задача, которая вызывает fiber.new()) не передаст управление согласно правилам контроля транзакций. Файбер создается со статусом „suspended“ (приостановлен). Таким образом, логика fiber.new() слегка отличается от fiber.create().

Как правило, fiber.new() используется вместе с fiber_object:set_joinable() и fiber_object:join().

Параметры:
  • function – функция, которая будет связана с файбером
  • function-arguments – что передается в функцию
Возвращается:

созданный объект файбера

Тип возвращаемого значения:
 

пользовательские данные

Пример:

tarantool> fiber = require('fiber')
---
...
tarantool> function function_name()
         >   fiber.sleep(1000)
         > end
---
...
tarantool> fiber_object = fiber.new(function_name)
---
...
fiber.self()
Возвращается:объект файбера для запланированного на данный момент файбера.
Тип возвращаемого значения:
 пользовательские данные

Пример:

tarantool> fiber.self()
---
- status: running
  name: interactive
  id: 101
...
fiber.find(id)
Параметры:
  • id – числовой идентификатор файбера.
Возвращается:

объект файбера для указанного файбера.

Тип возвращаемого значения:
 

пользовательские данные

Пример:

tarantool> fiber.find(101)
---
- status: running
  name: interactive
  id: 101
...
fiber.sleep(time)

Передача управления планировщику и переход в режим ожидания на указанное количество секунд. Только текущий файбер можно перевести в режим ожидания.

Параметры:
  • time – количество секунд в режиме ожидания.

Пример:

tarantool> fiber.sleep(1.5)
---
...
fiber.yield()

Передача управления планировщику. Работает аналогично fiber.sleep(0), только fiber.sleep(0) зависит от таймера, fiber.yield() – нет.

Пример:

tarantool> fiber.yield()
---
...
fiber.status([fiber_object])

Возврат статуса текущего файбера. Или же, если передается необязательный параметр fiber_object, возврат статуса указанного файбера.

Возвращается:статус файбера: “dead” (недоступен), “suspended” (приостановлен) или “running” (активен).
Тип возвращаемого значения:
 string (строка)

Пример:

tarantool> fiber.status()
---
- running
...
fiber.info()

Возврат информации о всех файберах.

Возвращается:количество переключений контекста, обратная трассировка, ID, общий объем памяти, объем используемой памяти, имя каждого файбера.
Тип возвращаемого значения:
 таблица

Пример:

tarantool> fiber.info()
---
- 101:
    csw: 7
    backtrace: []
    fid: 101
    memory:
      total: 65776
      used: 0
    name: interactive
...
fiber.kill(id)

Поиск файбера по числовому идентификатору и его отмена. Другими словами, fiber.kill() объединяет в себе fiber.find() и fiber_object:cancel().

Параметры:
  • id – ID файбера для отмены.
Исключение:

указанный файбер отсутствует, или отмена невозможна.

Пример:

tarantool> fiber.kill(fiber.id()) -- функция с self может вызвать окончание программы
---
- error: fiber is cancelled
...
fiber.testcancel()

Проверка отмены действующего файбера и выдача исключения, если файбер отменен.

Примечание

Even if you catch the exception, the fiber will remain cancelled. Most types of calls will check fiber.testcancel(). However, some functions (id, status, join etc.) will return no error. We recommend application developers to implement occasional checks with fiber.testcancel() and to end fiber’s execution as soon as possible in case it has been cancelled.

Пример:

tarantool> fiber.testcancel()
---
- error: fiber is cancelled
...
object fiber_object
fiber_object:id()
Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
Возвращается:

ID файбера.

Тип возвращаемого значения:
 

число

fiber.self():id() может также быть выражен как fiber.id().

Пример:

tarantool> fiber_object = fiber.self()
---
...
tarantool> fiber_object:id()
---
- 101
...
fiber_object:name()
Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
Возвращается:

имя файбера.

Тип возвращаемого значения:
 

string (строка)

fiber.self():name() может также быть выражен как fiber.name().

Пример:

tarantool> fiber.self():name()
---
- interactive
...
fiber_object:name(name)

Изменение имени файбера. По умолчанию, файбер в интерактивном режиме экземпляра Tarantool’а называется „interactive“, а новые файберы, созданные с помощью fiber.create, называются „lua“. Переименование файберов позволяет легче различать их при использовании fiber.info.

Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
  • name (string) – новое имя файбера.
Возвращается:

nil

Пример:

tarantool> fiber.self():name('non-interactive')
---
...
fiber_object:status()

Возврат статуса указанного файбера.

Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
Возвращается:

статус файбера: “dead” (недоступен), “suspended” (приостановлен) или “running” (активен).

Тип возвращаемого значения:
 

string (строка)

fiber.self():status( может также быть выражен как fiber.status().

Пример:

tarantool> fiber.self():status()
---
- running
...
fiber_object:cancel()

Cancel a fiber. Running and suspended fibers can be cancelled. After a fiber has been cancelled, attempts to operate on it will cause errors, for example fiber_object:name() will cause error: the fiber is dead.

Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
Возвращается:

nil

Возможные ошибки: нельзя отменить указанный объект файбера.

Пример:

tarantool> fiber.self():cancel() -- функция с self может вызвать окончание программы
---
- error: fiber is cancelled
...
fiber_object.storage

Локальное хранилище в пределах файбера. Хранилище может содержать любое количество именованных значений при соблюдении ограничений памяти. Правила именования: объект_файбера.storage.имя, либо объект_файбера.storage['имя']., либо с числом объект_файбера.storage[число]. Значения могут быть числовыми или строковыми. Сборщик мусора в Lua отметит или освободит локальное хранилище при вызове fiber_object:cancel().

Пример:

tarantool> fiber = require('fiber')
---
...
tarantool> function f () fiber.sleep(1000); end
---
...
tarantool> fiber_function = fiber.create(f)
---
...
tarantool> fiber_function.storage.str1 = 'string'
---
...
tarantool> fiber_function.storage['str1']
---
- string
...
tarantool> fiber_function:cancel()
---
...
tarantool> fiber_function.storage['str1']
---
- error: '[string "return fiber_function.storage[''str1'']"]:1: the fiber is dead'
...

См. также box.session.storage.

fiber_object:set_joinable(true_or_false)

fiber_object:set_joinable(true) делает файбер доступным для присоединения; fiber_object:set_joinable(false) делает файбер недоступным для присоединения; по умолчанию, false.

Присоединяемый файбер можно ожидать с помощью fiber_object:join().

Лучше всего вызвать fiber_object:set_joinable() до начала выполнения функции с файбером, поскольку в противном случае файбер может стать недоступен до того, как сработает fiber_object:set_joinable(). Правильная последовательность может быть такой:

  1. Вызов fiber.new() вместо fiber.create() для создания нового объекта файбера fiber_object.

    Не передавать управление, поскольку это приведет к началу работы функции с файбером.

  2. Вызов fiber_object:set_joinable(true), чтобы сделать новый объект файбера fiber_object присоединяемым.

    Сейчас можно передать управление.

  3. Вызов fiber_object:join().

    Как правило, следует вызвать fiber_object:join(), в противном случае, статус файбера может перейти в „suspended“ (приостановлен) после выполнения функции, а не „dead“ (недоступен).

Параметры:
  • true_or_false – логическое значение, которое изменяет флаг set_joinable
Возвращается:

nil

Пример:

Результат следующего ряда запросов:

  • глобальная переменная d получит значение 6 (что доказывает, что функция не выполнялась до тех пор, пока значение d не стало 1, когда fiber.sleep(1) вызвал передачу управления);
  • fiber.status(fi2) будет приостановлен „suspended“ (что доказывает, что после выполнения функции статус файбера не изменился на недоступный „dead“).
fiber=require('fiber')
d=0
function fu2() d=d+5 end
fi2=fiber.new(fu2) fi2:set_joinable(true) d=1 fiber.sleep(1)
print(d)
fiber.status(fi2)
fiber_object:join()

«Присоединение» присоединяемого файбера. То есть возможность запуска функции с файбером и ожидание перехода файбера в статус недоступности „dead“ (как правило, статус переходит в „dead“, когда заканчивается выполнение функции). Присоединение вызовет передачу управления, таким образом, если файбер находится в приостановленном состоянии, выполнение функции файбера возобновится.

Такое ожидание более удобно, чем переход в цикл с периодической проверкой статуса; тем не менее, это работает, только если файбер был создан с помощью fiber.new() и стал доступным для присоединения путем fiber_object:set_joinable().

Возвращается:два значения. Первое значение логическое. Если первое значение = true (правда), значит присоединение прошло успешно, поскольку функция файбера была выполнена нормально, а второй результат – это возвращаемое значение функции файбера. Если же первое значение = false (ложь), значит присоединение не было осуществлено, поскольку выполнение функции файбера было прервано, а второй результат содержит подробную информацию об ошибке, которую можно распаковать так же, как результат вызова pcall.
Тип возвращаемого значения:
 логическое значение +тип результата, или логическое значение + ошибка структуры

Пример:

Результат следующего ряда запросов:

  • первый вызов fiber.status() возвращает „suspended“ (приостановлен),
  • вызов join() возвращает true (правда),
  • как правило, проходит 5 секунд, и
  • второй вызов fiber.status() возвращает „dead“ (недоступен).

Это доказывает, что join() не возвращает результат, пока функция, которая находится в режиме ожидания в течение 5 секунд, недоступна („dead“).

fiber=require('fiber')
function fu2() fiber.sleep(5) end
fi2=fiber.new(fu2) fi2:set_joinable(true)
start_time = os.time()
fiber.status(fi2)
fi2:join()
print('elapsed = ' .. os.time() - start_time)
fiber.status(fi2)
fiber.time()
Возвращается:текущее системное время (в секундах с начала отсчета) в виде Lua-числа. Время берется из часов событийного цикла, поэтому вызов полезен лишь для создания искусственных ключей кортежа.
Тип возвращаемого значения:
 num

Пример:

tarantool> fiber.time(), fiber.time()
---
- 1448466279.2415
- 1448466279.2415
...
fiber.time64()
Возвращается:текущее системное время (в микросекундах с начала отсчета) в виде 64-битного целого числа. Время берется из часов событийного цикла.
Тип возвращаемого значения:
 num

Пример:

tarantool> fiber.time(), fiber.time64()
---
- 1448466351.2708
- 1448466351270762
...

Пример

Создание функции, которая будет связана с файбером. Такая функция содержит бесконечный цикл. Каждая итерация цикла прибавляет 1 к глобальной переменной под названием gvar, а затем уходит в режим ожидания на 2 секунды. Ожидание вызывает неявную передачу управления fiber.yield().

tarantool> fiber = require('fiber')
tarantool> function function_x()
         >   gvar = 0
         >   while true do
         >     gvar = gvar + 1
         >     fiber.sleep(2)
         >   end
         > end
---
...

Создание файбера, ассоциация функции function_x с файбером и запуск function_x. Она сразу же «отсоединится», то есть будет работать отдельно от вызывающего метода.

tarantool> gvar = 0

tarantool> fiber_of_x = fiber.create(function_x)
---
...

Получение ID файбера (fid) для последующего вывода.

tarantool> fid = fiber_of_x:id()
---
...

Небольшая остановка, пока работает отсоединенная функция. Затем … Отображение идентификатора файбера, статуса файбера и переменной gvar (значение gvar немного увеличится в зависимости от длительности паузы). Статус будет «suspended» (приостановлен), потому что файбер практически всё время проводит в режиме ожидания или передачи управления.

tarantool> print('#', fid, '. ', fiber_of_x:status(), '. gvar=', gvar)
# 102 .  suspended . gvar= 399
---
...

Небольшая остановка, пока работает отсоединенная функция. Затем … Отмена файбера. Затем снова отображение идентификатора файбера, статуса файбера и переменной gvar (значение gvar немного увеличится в зависимости от длительности паузы). На этот раз статус будет «dead» (недоступен), потому что произошла отмена.

tarantool> fiber_of_x:cancel()
---
...
tarantool> print('#', fid, '. ', fiber_of_x:status(), '. gvar=', gvar)
# 102 .  dead . gvar= 421
---
...

Каналы

Вызов fiber.channel() для выделения спейса и получение объекта канала, который будет называться «channel» в примерах данного раздела.

Вызов других процедур по каналу для отправки сообщений, получения сообщений или проверки статуса канала.

Обмен сообщения происходит синхронно. Сборщик мусора в Lua отмечает или освобождает канал, когда его никто не использует, как и любой другой Lua-объект. Используйте объектно-ориентированный синтаксис, например channel:put(message), а не fiber.channel.put(message).

fiber.channel([capacity])

Создание нового канала связи.

Параметры:
  • capacity (int) – максимальное количество слотов (спейсы для сообщений channel:put), которые можно использовать одновременно. По умолчанию, 0.
возвращается:

новый канал.

тип возвращаемого значения:
 

пользовательские данные, возможно включая строку «channel …».

object channel_object
channel_object:put(message[, timeout])

Отправка сообщения по каналу связи. Если канал заполнен, channel:put() ожидает, пока не освободится слот в канале.

Параметры:
  • message (lua-value) – то, что отправляется, как правило, строка, число или таблица
  • timeout (number) – максимальное количество секунд ожидания, чтобы слот освободился
возвращается:

Если указан параметр времени ожидания timeout, и в канале нет свободного слота в течение указанного времени, возвращается значение false (ложь). Если канал закрыт, возвращается значение false. В остальных случаях возвращается значение true (правда), которое указывает на успешную отправку.

тип возвращаемого значения:
 

boolean (логический)

channel_object:close()

Закрытие канала. Все, кто находится в режиме ожидания в канале, отключаются. Все последующие операции channel:get() вернут нулевое значение nil, а все последующие операции channel:put() вернут false (ложь).

channel_object:get([timeout])

Перехват и удаление сообщения из канала. Если канал пуст, channel:get() будет ожидать сообщения.

Параметры:
  • timeout (number) – максимальное количество секунд ожидания сообщения
возвращается:

Если указан параметр времени ожидания timeout, и в канале нет сообщения в течение указанного времени, возвращается нулевое значение nil. Если канал закрыт, возвращается значение nil. В остальных случаях возвращается сообщение, отправленное на канал с помощью channel:put().

тип возвращаемого значения:
 

как правило, строка, число или таблица, как определяет channel:put()

channel_object:is_empty()

Проверка пустоты канала (отсутствие сообщений).

возвращается:true (правда), если канал пуст. В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)
channel_object:count()

Определение количества сообщений в канале.

возвращается:количество сообщений.
тип возвращаемого значения:
 число
channel_object:is_full()

Проверка заполненности канала.

возвращается:true (правда), если канал заполнен (количество сообщений в канале равно количеству слотов, то есть нет места для новых сообщений). В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)
channel_object:has_readers()

Проверка пустого канала на наличие читателей в состоянии ожидания сообщения после отправки запросов channel:get().

возвращается:true (правда), если на канале есть читатели в ожидании сообщения. В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)
channel_object:has_writers()

Проверка полного канала на наличие писателей в состоянии ожидания после отправки запросов channel:put().

возвращается:true (правда), если на канале есть писатели в состоянии ожидании. В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)
channel_object:is_closed()
возвращается:true (правда), если канал уже закрыт. В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)

Пример

В данном примере дается примерное представление о том, как должны выглядеть функции для файберов. Предполагается, что на функции ссылается fiber.create().

fiber = require('fiber')
 channel = fiber.channel(10)
 function consumer_fiber()
     while true do
         local task = channel:get()
         ...
     end
 end

 function consumer2_fiber()
     while true do
         -- 10 секунд
         local task = channel:get(10)
         if task ~= nil then
             ...
         else
             -- время ожидания
         end
     end
 end

 function producer_fiber()
     while true do
         task = box.space...:select{...}
         ...
         if channel:is_empty() then
             -- канал пуст
         end

         if channel:is_full() then
             -- канал полон
         end

         ...
         if channel:has_readers() then
             -- есть файберы
             -- которые ожидают данные
         end
         ...

         if channel:has_writers() then
             -- есть файберы
             -- которые ожидают читателей
         end
         channel:put(task)
     end
 end

 function producer2_fiber()
     while true do
         task = box.space...select{...}
         -- 10 секунд
         if channel:put(task, 10) then
             ...
         else
             -- время ожидания
         end
     end
 end

Условные переменные

Вызов fiber.cond() используется для создания именованной условной переменной, которая будет называться „cond“ для примеров данного раздела.

Вызов cond:wait() используется, чтобы заставить файбер ожидать сигнал, с помощью условной переменной.

Вызов cond:signal() используется, чтобы отправить сигнал для пробуждения отдельного файбера, который выполнил запрос cond:wait().

Вызов cond:broadcast() используется для отправки сигнала всем файберам, которые выполнили cond:wait().

fiber.cond()

Создание новой условной переменной.

возвращается:новая условная переменная.
тип возвращаемого значения:
 Lua-объект
object cond_object
cond_object:wait([timeout])

Перевод файбера в режим ожидания до пробуждения другим файбером с помощью метода signal() или broadcast(). Переход в режим ожидания вызывает неявную передачу управления fiber.yield().

Параметры:
  • timeout – количество секунд ожидания, по умолчанию = всегда.
возвращается:

Если указан параметр времени ожидания timeout, и сигнал не передается в течение указанного времени, wait() вернет значение false (ложь). Если передается signal() или broadcast(), wait() вернет true (правда).

тип возвращаемого значения:
 

boolean (логический)

cond_object:signal()

Пробуждение отдельного файбера, который выполнил wait() для той же переменной.

тип возвращаемого значения:
 nil
cond_object:broadcast()

Пробуждение всех файберов, которые выполнили wait() для той же переменной.

тип возвращаемого значения:
 nil

Пример

Предположим, что запущен экземпляр Tarantool’а на прослушивание на localhost по порту 3301. Предположим, что у пользователя guest есть права на подключение. Используем утилиту tarantoolctl для запуска двух клиентов.

В первом терминале введите:

$ tarantoolctl connect '3301'
tarantool> fiber = require('fiber')
tarantool> cond = fiber.cond()
tarantool> cond:wait()

Задача повиснет, поскольку cond:wait() – без дополнительного аргумента времени ожидания timeout – уйдет в режим ожидания до изменения условной переменной.

Во втором терминале введите:

$ tarantoolctl connect '3301'
tarantool> cond:signal()

Теперь снова взгляните на терминал №1. Он покажет, что ожидание прекратилось, и функция cond:wait() вернула значение true.

В данном примере показана зависимость от использования глобальной условной переменной с произвольным именем cond. В реальной жизни разработчики следят за использованием различных имен для условных переменных в разных приложениях.