1.4. Запрос на сложную обработку кластером¶
Если требуется выполнить обработку всех данных в кластере, то её необходимо
выполнять на всех экземплярах типа storage. Для этого можно использовать
функцию map_reduce.
Сложная кластерная обработка по модели MapReduce в TDG состоит из трех этапов:
Map — выполняется на экземплярах с ролью
storage;Combine — выполняется на экземплярах с ролью
storage;Reduce — выполняется на экземпляре, где была вызвана функция
map_reduce.
1.4.1. Синтаксис запроса¶
Функция
map_reduce(type_name, filter, version, map_fn, combine_fn, reduce_fn, opts)
имеет следующие аргументы:
type_name — имя агрегата;
filter — булевы выражения / предикаты;
version — номер версии;
map_fn — указатель на функцию для этапа Map;
combine_fn — указатель на функцию для этапа Combine;
reduce_fn — указатель на функцию для этапа Reduce;
opts — необязательные аргументы:
opts.map_args — дополнительные аргументы для этапа Map;
opts.combine_args — дополнительные аргументы для этапа Combine;
opts.combine_initial_state — исходное состояние для этапа Combine;
opts.reduce_args — дополнительные аргументы для этапа Reduce;
opts.reduce_initial_state — исходное состояние для этапа Reduce.
1.4.2. Этап Map¶
Сначала на каждом экземпляре c ролью storage будет вызвана функция map_fn,
столько раз, сколько на этом экземпляре будет найдено подходящих объектов, соответствующих типу
type_name и условиям filter.
Функция map_fn применяется к каждому найденному кортежу.
local map_res, err = map_fn(tuple, opts.map_args)
Эта функция может вернуть любые данные или nil.
Если функция вернет данные, то на следующем этапе эти данные будут использованы
для запуска combine_fn. combine_fn будет вызвана столько раз, сколько раз
вызовы map вернут данные.
1.4.3. Этап Combine¶
Эта стадия опциональна и нужна для того, чтобы по возможности уменьшить количество данных, полученных на стадии Map, перед их передачей по сети и дальнейшей обработкой.
Результат каждого вызова map_fn передаётся в combine_fn.
Каждый вызов combine_fn осуществляется с передачей исходного состояния,
равного результату предыдущего вызова combine_fn. Для первого вызова исходное
состояние равно opts.combine_initial_state, либо равно пустой таблице, если этот параметр не задан.
local combine_res = opts.combine_initial_state or {}
local combine_res, err = combine_fn(combine_res, map_res, opts.combine_args)
Функция combine_fn также выполняется на роли storage.
Результат выполнения combine_fn аккумулируется в переменной combine_res
и передается на ту роль, откуда был вызван map_reduce — в функцию reduce_fn.
1.4.4. Этап Reduce¶
Данный этап предназначен для завершения обработки данных со всего кластера.
Данные передаются по сети на экземпляр, откуда была вызвана функция map_reduce.
Там будет вызвана функция reduce_fn. Эта функция будет вызвана столько
раз, сколько экземпляров типа storage в кластере найдет подходящие объекты и,
соответственно, вернет результат в combine_res.
local reduce_res, err = reduce_fn(combine_res, opts.reduce_initial_state, opts.reduce_args)
Результат reduce_fn возвращается как результат всего map_reduce.