1.4. Запрос на сложную обработку кластером¶
Если требуется выполнить обработку всех данных в кластере, то её необходимо
выполнять на всех экземплярах типа storage
. Для этого можно использовать
функцию map_reduce
.
Сложная кластерная обработка по модели MapReduce в TDG состоит из трех этапов:
Map — выполняется на экземплярах с ролью
storage
;Combine — выполняется на экземплярах с ролью
storage
;Reduce — выполняется на экземпляре, где была вызвана функция
map_reduce
.
1.4.1. Синтаксис запроса¶
Функция
map_reduce(type_name, filter, version, map_fn, combine_fn, reduce_fn, opts)
имеет следующие аргументы:
type_name — имя агрегата;
filter — булевы выражения / предикаты;
version — номер версии;
map_fn — указатель на функцию для этапа Map;
combine_fn — указатель на функцию для этапа Combine;
reduce_fn — указатель на функцию для этапа Reduce;
opts — необязательные аргументы:
opts.map_args — дополнительные аргументы для этапа Map;
opts.combine_args — дополнительные аргументы для этапа Combine;
opts.combine_initial_state — исходное состояние для этапа Combine;
opts.reduce_args — дополнительные аргументы для этапа Reduce;
opts.reduce_initial_state — исходное состояние для этапа Reduce.
1.4.2. Этап Map¶
Сначала на каждом экземпляре c ролью storage
будет вызвана функция map_fn
,
столько раз, сколько на этом экземпляре будет найдено подходящих объектов, соответствующих типу
type_name
и условиям filter
.
Функция map_fn
применяется к каждому найденному кортежу.
local map_res, err = map_fn(tuple, opts.map_args)
Эта функция может вернуть любые данные или nil.
Если функция вернет данные, то на следующем этапе эти данные будут использованы
для запуска combine_fn
. combine_fn
будет вызвана столько раз, сколько раз
вызовы map
вернут данные.
1.4.3. Этап Combine¶
Эта стадия опциональна и нужна для того, чтобы по возможности уменьшить количество данных, полученных на стадии Map, перед их передачей по сети и дальнейшей обработкой.
Результат каждого вызова map_fn
передаётся в combine_fn
.
Каждый вызов combine_fn
осуществляется с передачей исходного состояния,
равного результату предыдущего вызова combine_fn
. Для первого вызова исходное
состояние равно opts.combine_initial_state
, либо равно пустой таблице, если этот параметр не задан.
local combine_res = opts.combine_initial_state or {}
local combine_res, err = combine_fn(combine_res, map_res, opts.combine_args)
Функция combine_fn
также выполняется на роли storage
.
Результат выполнения combine_fn
аккумулируется в переменной combine_res
и передается на ту роль, откуда был вызван map_reduce
— в функцию reduce_fn
.
1.4.4. Этап Reduce¶
Данный этап предназначен для завершения обработки данных со всего кластера.
Данные передаются по сети на экземпляр, откуда была вызвана функция map_reduce
.
Там будет вызвана функция reduce_fn
. Эта функция будет вызвана столько
раз, сколько экземпляров типа storage
в кластере найдет подходящие объекты и,
соответственно, вернет результат в combine_res
.
local reduce_res, err = reduce_fn(combine_res, opts.reduce_initial_state, opts.reduce_args)
Результат reduce_fn
возвращается как результат всего map_reduce
.