Версия:

Module fiber

Module fiber

Overview

С помощью модуля fiber можно:

  • создавать, запускать и управлять файберами,
  • отправлять и получать сообщения для различных процессов (например, разные соединения, сессии или файберы) по каналам, а также
  • использовать механизм синхронизации для файберов, аналогично работе «условных переменных» и функций операционных систем, таких как pthread_cond_wait() плюс pthread_cond_signal().

Index

Ниже приведен перечень всех функций и элементов модуля fiber.

Name Use
fiber.create() Создание и запуск файбера
fiber.new() Создание файбера без запуска
fiber.self() Получение объекта файбера
fiber.find() Получение объекта файбера по ID
fiber.sleep() Перевод файбера в режим ожидания
fiber.yield() Передача управления
fiber.status() Получение статуса активного файбера
fiber.info() Получение информации о всех файберах
fiber.kill() Отмена файбера
fiber.testcancel() Проверка отмены действующего файбера
fiber_object:id() Получение ID файбера
fiber_object:name() Получение имени файбера
fiber_object:name(name) Назначение имени файбера
fiber_object:status() Получение статуса файбера
fiber_object:cancel() Отмена файбера
fiber_object.storage Локальное хранилище в пределах файбера
fiber_object:set_joinable() Создание возможности подключения нового файбера
fiber_object:join() Ожидание статуса „dead“ (недоступен) для файбера
fiber.time() Получение системного времени в секундах
fiber.time64() Получение системного времени в микросекундах
fiber.channel() Создание канала связи
channel_object:put() Отправка сообщения по каналу связи
channel_object:close() Закрытие канала
channel_object:get() Перехват сообщения из канала
channel_object:is_empty() Проверка пустоты канала
channel_object:count() Подсчет сообщений в канале
channel_object:is_full() Проверка заполненности канала
channel_object:has_readers() Проверка пустого канала на наличие читателей в состоянии ожидания
channel_object:has_writers() Проверка полного канала на наличие писателей в состоянии ожидания
channel_object:is_closed() Проверка закрытия канала
fiber.cond() Создание условной переменной
cond_object:wait() Перевод файбера в режим ожидания до пробуждения другим файбером
cond_object:signal() Пробуждение отдельного файбера
cond_object:broadcast() Пробуждение всех файберов

Fibers

Файбер – это набор инструкций, которые выполняются по принципу кооперативной многозадачности. Файберы, управление которых происходит с помощью модуля fiber, связаны с функцией под названием функция для файбера, которую задает пользователь.

Существуют три возможных состояния файбера: running (активен), suspended (приостановлен) или dead (недоступен). После создания файбера с помощью fiber.create() он сразу активен. После создания файбера с помощью fiber.new() или передачи управления с помощью fiber.sleep() файбер будет приостановлен. По окончании работы (по причине окончания работы соответствующей функции) файбер становится недоступен.

Все файберы составляют часть реестра файберов. Можно производить поиск по реестру с помощью fiber.find() по ID файбера (fid), который представляет собой числовой идентификатор.

Неконтролируемый файбер можно остановить с помощью fiber_object.cancel. Однако, функция fiber_object.cancel консультативна, то есть сработает только в том случае, если неконтролируемый файбер случайно вызовет fiber.testcancel(). Большинство функций типа box.*, например box.space…delete() или box.space…update(), действительно вызывают fiber.testcancel(), а box.space…select{} не вызовет. В действительности неконтролируемый файбер может перестать отвечать, если он производит большое количество вычислений и не проверяет вероятность отмены.

Другой потенциальной проблемой могут стать файберы, которые не включаются в расписание, поскольку они не подписаны ни на какие события, или потому что соответствующие события не происходят. Такие файберы можно в любое принудительно остановить с помощью fiber.kill(), потому что функция fiber.kill() отправляет асинхронное событие пробуждения на файбер, а fiber.testcancel() проверяет наступление такого события пробуждения.

Сборщик мусора собирает недоступные файберы так же, как и все Lua-объекты: сборщик мусора в Lua освобождает память выделенного для файбера пула, сбрасывает все данные файбера и возвращает файбер (который теперь называется каркасом файбера) в пул файберов. Каркас можно использовать повторно при создании другого файбера.

У файбера есть все возможности сопрограммы (coroutine) на языке Lua, и все принципы программирования, которые применяются к сопрограммам на Lua, применимы и к файберам. Однако Tarantool расширил возможности файберов для внутреннего использования. Поэтому, несмотря на возможность и поддержку использования сопрограмм, рекомендуется использовать файберы.

fiber.create(function[, function-arguments])

Создание и запуск файбера. Происходит создание файбера, который незамедлительно начинает работу.

Параметры:
  • function – функция, которая будет связана с файбером
  • function-arguments – what will be passed to function
Возвращается:

созданный объект файбера

Тип возвращаемого значения:
 

userdata

Example:

tarantool> fiber = require('fiber')
 ---
 ...
 tarantool> function function_name()
          >   fiber.sleep(1000)
          > end
 ---
 ...
 tarantool> fiber_object = fiber.create(function_name)
 ---
 ...
fiber.new(function[, function-arguments])

Создание файбера без запуска: файбер создается, но не запускается сразу же, а ожидает, пока создатель файбера (то есть задача, которая вызывает fiber.new()) не передаст управление согласно правилам контроля транзакций. Файбер создается со статусом „suspended“ (приостановлен). Таким образом, логика fiber.new() слегка отличается от fiber.create().

Как правило, fiber.new() используется вместе с fiber_object:set_joinable() и fiber_object:join().

Параметры:
  • function – функция, которая будет связана с файбером
  • function-arguments – what will be passed to function
Возвращается:

созданный объект файбера

Тип возвращаемого значения:
 

userdata

Example:

tarantool> fiber = require('fiber')
---
...
tarantool> function function_name()
         >   fiber.sleep(1000)
         > end
---
...
tarantool> fiber_object = fiber.new(function_name)
---
...
fiber.self()
Возвращается:объект файбера для запланированного на данный момент файбера.
Тип возвращаемого значения:
 userdata

Example:

tarantool> fiber.self()
 ---
 - status: running
   name: interactive
   id: 101
 ...
fiber.find(id)
Параметры:
  • id – числовой идентификатор файбера.
Возвращается:

объект файбера для указанного файбера.

Тип возвращаемого значения:
 

userdata

Example:

tarantool> fiber.find(101)
 ---
 - status: running
   name: interactive
   id: 101
 ...
fiber.sleep(time)

Передача управления планировщику и переход в режим ожидания на указанное количество секунд. Только текущий файбер можно перевести в режим ожидания.

Параметры:
  • time – количество секунд в режиме ожидания.

Example:

tarantool> fiber.sleep(1.5)
 ---
 ...
fiber.yield()

Передача управления планировщику. Работает аналогично fiber.sleep(0), только fiber.sleep(0) зависит от таймера, fiber.yield() – нет.

Example:

tarantool> fiber.yield()
 ---
 ...
fiber.status([fiber_object])

Возврат статуса текущего файбера. Или же, если передается необязательный параметр fiber_object, возврат статуса указанного файбера.

Возвращается:статус файбера: “dead” (недоступен), “suspended” (приостановлен) или “running” (активен).
Тип возвращаемого значения:
 string

Example:

tarantool> fiber.status()
 ---
 - running
 ...
fiber.info()

Возврат информации о всех файберах.

Возвращается:количество переключений контекста, обратная трассировка, ID, общий объем памяти, объем используемой памяти, имя каждого файбера.
Тип возвращаемого значения:
 table

Example:

tarantool> fiber.info()
 ---
 - 101:
     csw: 7
     backtrace: []
     fid: 101
     memory:
       total: 65776
       used: 0
     name: interactive
 ...
fiber.kill(id)

Поиск файбера по числовому идентификатору и его отмена. Другими словами, fiber.kill() объединяет в себе fiber.find() и fiber_object:cancel().

Параметры:
  • id – ID файбера для отмены.
Исключение:

указанный файбер отсутствует, или отмена невозможна.

Example:

tarantool> fiber.kill(fiber.id()) -- функция с self может вызвать окончание программы
 ---
 - error: fiber is cancelled
 ...
fiber.testcancel()

Проверка отмены действующего файбера и выдача исключения, если файбер отменен.

Примечание

Even if you catch the exception, the fiber will remain cancelled. Most types of calls will check fiber.testcancel(). However, some functions (id, status, join etc.) will return no error. We recommend application developers to implement occasional checks with fiber.testcancel() and to end fiber’s execution as soon as possible in case it has been cancelled.

Example:

tarantool> fiber.testcancel()
 ---
 - error: fiber is cancelled
 ...
object fiber_object
fiber_object:id()
Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
Возвращается:

ID файбера.

Тип возвращаемого значения:
 

number

fiber.self():id() может также быть выражен как fiber.id().

Example:

tarantool> fiber_object = fiber.self()
 ---
 ...
 tarantool> fiber_object:id()
 ---
 - 101
 ...
fiber_object:name()
Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
Возвращается:

имя файбера.

Тип возвращаемого значения:
 

string

fiber.self():name() может также быть выражен как fiber.name().

Example:

tarantool> fiber.self():name()
 ---
 - interactive
 ...
fiber_object:name(name)

Изменение имени файбера. По умолчанию, файбер в интерактивном режиме экземпляра Tarantool’а называется „interactive“, а новые файберы, созданные с помощью fiber.create, называются „lua“. Переименование файберов позволяет легче различать их при использовании fiber.info.

Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
  • name (string) – новое имя файбера.
Возвращается:

nil

Example:

tarantool> fiber.self():name('non-interactive')
 ---
 ...
fiber_object:status()

Возврат статуса указанного файбера.

Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
Возвращается:

статус файбера: “dead” (недоступен), “suspended” (приостановлен) или “running” (активен).

Тип возвращаемого значения:
 

string

fiber.self():status( может также быть выражен как fiber.status().

Example:

tarantool> fiber.self():status()
 ---
 - running
 ...
fiber_object:cancel()

Cancel a fiber. Running and suspended fibers can be cancelled. After a fiber has been cancelled, attempts to operate on it will cause errors, for example fiber_object:name() will cause error: the fiber is dead.

Параметры:
  • fiber_object – как правило, это объект, полученный в результате вызова fiber.create, fiber.self или fiber.find
Возвращается:

nil

Возможные ошибки: нельзя отменить указанный объект файбера.

Example:

tarantool> fiber.self():cancel() -- функция с self может вызвать окончание программы
---
- error: fiber is cancelled
...
fiber_object.storage

Локальное хранилище в пределах файбера. Хранилище может содержать любое количество именованных значений при соблюдении ограничений памяти. Правила именования: объект_файбера.storage.имя, либо объект_файбера.storage['имя']., либо с числом объект_файбера.storage[число]. Значения могут быть числовыми или строковыми. Сборщик мусора в Lua отметит или освободит локальное хранилище при вызове fiber_object:cancel().

Example:

tarantool> fiber = require('fiber')
---
...
tarantool> function f () fiber.sleep(1000); end
---
...
tarantool> fiber_function = fiber.create(f)
---
...
tarantool> fiber_function.storage.str1 = 'string'
---
...
tarantool> fiber_function.storage['str1']
---
- string
...
tarantool> fiber_function:cancel()
---
...
tarantool> fiber_function.storage['str1']
---
- error: '[string "return fiber_function.storage[''str1'']"]:1: the fiber is dead'
...

См. также box.session.storage.

fiber_object:set_joinable(true_or_false)

fiber_object:set_joinable(true) делает файбер доступным для присоединения; fiber_object:set_joinable(false) делает файбер недоступным для присоединения; по умолчанию, false.

Присоединяемый файбер можно ожидать с помощью fiber_object:join().

Лучше всего вызвать fiber_object:set_joinable() до начала выполнения функции с файбером, поскольку в противном случае файбер может стать недоступен до того, как сработает fiber_object:set_joinable(). Правильная последовательность может быть такой:

  1. Вызов fiber.new() вместо fiber.create() для создания нового объекта файбера fiber_object.

    Не передавать управление, поскольку это приведет к началу работы функции с файбером.

  2. Вызов fiber_object:set_joinable(true), чтобы сделать новый объект файбера fiber_object присоединяемым.

    Сейчас можно передать управление.

  3. Вызов fiber_object:join().

    Как правило, следует вызвать fiber_object:join(), в противном случае, статус файбера может перейти в „suspended“ (приостановлен) после выполнения функции, а не „dead“ (недоступен).

Параметры:
  • true_or_false – логическое значение, которое изменяет флаг set_joinable
Возвращается:

nil

Example:

Результат следующего ряда запросов:

  • глобальная переменная d получит значение 6 (что доказывает, что функция не выполнялась до тех пор, пока значение d не стало 1, когда fiber.sleep(1) вызвал передачу управления);
  • fiber.status(fi2) будет приостановлен „suspended“ (что доказывает, что после выполнения функции статус файбера не изменился на недоступный „dead“).
fiber=require('fiber')
d=0
function fu2() d=d+5 end
fi2=fiber.new(fu2) fi2:set_joinable(true) d=1 fiber.sleep(1)
print(d)
fiber.status(fi2)
fiber_object:join()

«Присоединение» присоединяемого файбера. То есть возможность запуска функции с файбером и ожидание перехода файбера в статус недоступности „dead“ (как правило, статус переходит в „dead“, когда заканчивается выполнение функции). Присоединение вызовет передачу управления, таким образом, если файбер находится в приостановленном состоянии, выполнение функции файбера возобновится.

Такое ожидание более удобно, чем переход в цикл с периодической проверкой статуса; тем не менее, это работает, только если файбер был создан с помощью fiber.new() и стал доступным для присоединения путем fiber_object:set_joinable().

Возвращается:two values. The first value is boolean. If the first value is true, then the join succeeded because the fiber’s function ended normally and the second result has the return value from the fiber’s function. If the first value is false, then the join succeeded because the fiber’s function ended abnormally and the second result has the details about the error, which one can unpack in the same way that one unpacks a pcall result.
Тип возвращаемого значения:
 boolean +result type, or boolean + struct error

Example:

Результат следующего ряда запросов:

  • первый вызов fiber.status() возвращает „suspended“ (приостановлен),
  • вызов join() возвращает true (правда),
  • как правило, проходит 5 секунд, и
  • второй вызов fiber.status() возвращает „dead“ (недоступен).

Это доказывает, что join() не возвращает результат, пока функция, которая находится в режиме ожидания в течение 5 секунд, недоступна („dead“).

fiber=require('fiber')
function fu2() fiber.sleep(5) end
fi2=fiber.new(fu2) fi2:set_joinable(true)
start_time = os.time()
fiber.status(fi2)
fi2:join()
print('elapsed = ' .. os.time() - start_time)
fiber.status(fi2)
fiber.time()
Возвращается:текущее системное время (в секундах с начала отсчета) в виде Lua-числа. Время берется из часов событийного цикла, поэтому вызов полезен лишь для создания искусственных ключей кортежа.
Тип возвращаемого значения:
 num

Example:

tarantool> fiber.time(), fiber.time()
 ---
 - 1448466279.2415
 - 1448466279.2415
 ...
fiber.time64()
Возвращается:текущее системное время (в микросекундах с начала отсчета) в виде 64-битного целого числа. Время берется из часов событийного цикла.
Тип возвращаемого значения:
 num

Example:

tarantool> fiber.time(), fiber.time64()
 ---
 - 1448466351.2708
 - 1448466351270762
 ...

Example

Создание функции, которая будет связана с файбером. Такая функция содержит бесконечный цикл. Каждая итерация цикла прибавляет 1 к глобальной переменной под названием gvar, а затем уходит в режим ожидания на 2 секунды. Ожидание вызывает неявную передачу управления fiber.yield().

tarantool> fiber = require('fiber')
tarantool> function function_x()
         >   gvar = 0
         >   while true do
         >     gvar = gvar + 1
         >     fiber.sleep(2)
         >   end
         > end
---
...

Создание файбера, ассоциация функции function_x с файбером и запуск function_x. Она сразу же «отсоединится», то есть будет работать отдельно от вызывающего метода.

tarantool> gvar = 0

 tarantool> fiber_of_x = fiber.create(function_x)
 ---
 ...

Получение ID файбера (fid) для последующего вывода.

tarantool> fid = fiber_of_x:id()
 ---
 ...

Небольшая остановка, пока работает отсоединенная функция. Затем … Отображение идентификатора файбера, статуса файбера и переменной gvar (значение gvar немного увеличится в зависимости от длительности паузы). Статус будет «suspended» (приостановлен), потому что файбер практически всё время проводит в режиме ожидания или передачи управления.

tarantool> print('#', fid, '. ', fiber_of_x:status(), '. gvar=', gvar)
 # 102 .  suspended . gvar= 399
 ---
 ...

Небольшая остановка, пока работает отсоединенная функция. Затем … Отмена файбера. Затем снова отображение идентификатора файбера, статуса файбера и переменной gvar (значение gvar немного увеличится в зависимости от длительности паузы). На этот раз статус будет «dead» (недоступен), потому что произошла отмена.

tarantool> fiber_of_x:cancel()
 ---
 ...
 tarantool> print('#', fid, '. ', fiber_of_x:status(), '. gvar=', gvar)
 # 102 .  dead . gvar= 421
 ---
 ...

Каналы

Вызов fiber.channel() для выделения спейса и получение объекта канала, который будет называться «channel» в примерах данного раздела.

Вызов других процедур по каналу для отправки сообщений, получения сообщений или проверки статуса канала.

Обмен сообщения происходит синхронно. Сборщик мусора в Lua отмечает или освобождает канал, когда его никто не использует, как и любой другой Lua-объект. Используйте объектно-ориентированный синтаксис, например channel:put(message), а не fiber.channel.put(message).

fiber.channel([capacity])

Создание нового канала связи.

Параметры:
  • capacity (int) – максимальное количество слотов (спейсы для сообщений channel:put), которые можно использовать одновременно. По умолчанию, 0.
Return:

новый канал.

Rtype:

пользовательские данные, возможно включая строку «channel …».

object channel_object
channel_object:put(message[, timeout])

Отправка сообщения по каналу связи. Если канал заполнен, channel:put() ожидает, пока не освободится слот в канале.

Параметры:
  • message (lua-value) – то, что отправляется, как правило, строка, число или таблица
  • timeout (number) – максимальное количество секунд ожидания, чтобы слот освободился
Return:

Если указан параметр времени ожидания timeout, и в канале нет свободного слота в течение указанного времени, возвращается значение false (ложь). Если канал закрыт, возвращается значение false. В остальных случаях возвращается значение true (правда), которое указывает на успешную отправку.

Rtype:

boolean

channel_object:close()

Закрытие канала. Все, кто находится в режиме ожидания в канале, отключаются. Все последующие операции channel:get() вернут нулевое значение nil, а все последующие операции channel:put() вернут false (ложь).

channel_object:get([timeout])

Перехват и удаление сообщения из канала. Если канал пуст, channel:get() будет ожидать сообщения.

Параметры:
  • timeout (number) – максимальное количество секунд ожидания сообщения
Return:

Если указан параметр времени ожидания timeout, и в канале нет сообщения в течение указанного времени, возвращается нулевое значение nil. Если канал закрыт, возвращается значение nil. В остальных случаях возвращается сообщение, отправленное на канал с помощью channel:put().

Rtype:

как правило, строка, число или таблица, как определяет channel:put()

channel_object:is_empty()

Проверка пустоты канала (отсутствие сообщений).

Return:true (правда), если канал пуст. В противном случае, false (ложь).
Rtype:boolean
channel_object:count()

Определение количества сообщений в канале.

Return:количество сообщений.
Rtype:number
channel_object:is_full()

Проверка заполненности канала.

Return:true (правда), если канал заполнен (количество сообщений в канале равно количеству слотов, то есть нет места для новых сообщений). В противном случае, false (ложь).
Rtype:boolean
channel_object:has_readers()

Проверка пустого канала на наличие читателей в состоянии ожидания сообщения после отправки запросов channel:get().

Return:true (правда), если на канале есть читатели в ожидании сообщения. В противном случае, false (ложь).
Rtype:boolean
channel_object:has_writers()

Проверка полного канала на наличие писателей в состоянии ожидания после отправки запросов channel:put().

Return:true (правда), если на канале есть писатели в состоянии ожидании. В противном случае, false (ложь).
Rtype:boolean
channel_object:is_closed()
Return:true (правда), если канал уже закрыт. В противном случае, false (ложь).
Rtype:boolean

Example

В данном примере дается примерное представление о том, как должны выглядеть функции для файберов. Предполагается, что на функции ссылается fiber.create().

fiber = require('fiber')
 channel = fiber.channel(10)
 function consumer_fiber()
     while true do
         local task = channel:get()
         ...
     end
 end

 function consumer2_fiber()
     while true do
         -- 10 секунд
         local task = channel:get(10)
         if task ~= nil then
             ...
         else
             -- время ожидания
         end
     end
 end

 function producer_fiber()
     while true do
         task = box.space...:select{...}
         ...
         if channel:is_empty() then
             -- канал пуст
         end

         if channel:is_full() then
             -- канал полон
         end

         ...
         if channel:has_readers() then
             -- есть файберы
             -- которые ожидают данные
         end
         ...

         if channel:has_writers() then
             -- есть файберы
             -- которые ожидают читателей
         end
         channel:put(task)
     end
 end

 function producer2_fiber()
     while true do
         task = box.space...select{...}
         -- 10 секунд
         if channel:put(task, 10) then
             ...
         else
             -- время ожидания
         end
     end
 end

Условные переменные

Вызов fiber.cond() используется для создания именованной условной переменной, которая будет называться „cond“ для примеров данного раздела.

Вызов cond:wait() используется, чтобы заставить файбер ожидать сигнал, с помощью условной переменной.

Вызов cond:signal() используется, чтобы отправить сигнал для пробуждения отдельного файбера, который выполнил запрос cond:wait().

Вызов cond:broadcast() используется для отправки сигнала всем файберам, которые выполнили cond:wait().

fiber.cond()

Создание новой условной переменной.

Return:новая условная переменная.
Rtype:Lua object
object cond_object
cond_object:wait([timeout])

Перевод файбера в режим ожидания до пробуждения другим файбером с помощью метода signal() или broadcast(). Переход в режим ожидания вызывает неявную передачу управления fiber.yield().

Параметры:
  • timeout – количество секунд ожидания, по умолчанию = всегда.
Return:

Если указан параметр времени ожидания timeout, и сигнал не передается в течение указанного времени, wait() вернет значение false (ложь). Если передается signal() или broadcast(), wait() вернет true (правда).

Rtype:

boolean

cond_object:signal()

Пробуждение отдельного файбера, который выполнил wait() для той же переменной.

Rtype:nil
cond_object:broadcast()

Пробуждение всех файберов, которые выполнили wait() для той же переменной.

Rtype:nil

Example

Предположим, что запущен экземпляр Tarantool’а на прослушивание на localhost по порту 3301. Предположим, что у пользователя guest есть права на подключение. Используем утилиту tarantoolctl для запуска двух клиентов.

В первом терминале введите:

$ tarantoolctl connect '3301'
 tarantool> fiber = require('fiber')
 tarantool> cond = fiber.cond()
 tarantool> cond:wait()

Задача повиснет, поскольку cond:wait() – без дополнительного аргумента времени ожидания timeout – уйдет в режим ожидания до изменения условной переменной.

Во втором терминале введите:

$ tarantoolctl connect '3301'
 tarantool> cond:signal()

Теперь снова взгляните на терминал №1. Он покажет, что ожидание прекратилось, и функция cond:wait() вернула значение true.

В данном примере показана зависимость от использования глобальной условной переменной с произвольным именем cond. В реальной жизни разработчики следят за использованием различных имен для условных переменных в разных приложениях.