Шардирование с vshard | Tarantool
Документация на русском языке
поддерживается сообществом
Администрирование Шардирование с vshard

Шардирование с vshard

Sharding in Tarantool is implemented in the vshard module. For a quick start with vshard, refer to Creating a sharded cluster.

Примечание

Starting with the 3.0 version, the recommended way of configuring Tarantool is using a configuration file. The sharding section defines configuration parameters related to sharding. To learn how to configure vshard in code, see Справочник по настройке.

Модуль vshard не входит в основной дистрибутив Tarantool. Чтобы установить модуль, выполните команду:

$ tt rocks install vshard

If you are developing a sharded cluster application, add the vshard module dependency to a *.rockspec file:

dependencies = {
    'vshard == 0.1.26'
}

Примечание

The minimum required version of vshard is 0.1.25.

Configuring settings related to sharding might include the following steps:

  1. Configure connection settings to allow instances within a sharded cluster to communicate with each other.
  2. Specify which role each replica set plays in a sharded cluster.
  3. Configure how data is partitioned across shards.
  4. Specify settings related to data rebalancing.

This section describes connection options that enable communication between instances within a sharded cluster. For general information about connections, see the Connections topic.

To allow a router and rebalancer to connect to storages, a user with the sharding role should be used. The example below shows how to grant the sharding role to the storage user:

credentials:
  users:
    replicator:
      password: 'topsecret'
      roles: [replication]
    storage:
      password: 'secret'
      roles: [sharding]

The sharding role has different privileges depending on a replica set’s sharding role. For replica sets with the storage sharding role, the sharding credential role has the following privileges:

  • All privileges provided by the replication role.
  • Executing vshard.storage.* functions.

If a replica set does not have the storage sharding role, the sharding credential role does not have any privileges.

Each replica set in a sharded cluster can have one of three roles:

  • router: a replica set acts as a router.
  • storage: a replica set acts as a storage.
  • rebalancer: a replica set acts as a rebalancer.

You can use the sharding.roles option to assign a specific role to a replica set or group of replica sets. In the example below, all replica sets in the storages group have the storage role while replica sets in the routers group have the router role.

groups:
  storages:
    sharding:
      roles: [storage]
    # ...
  routers:
    sharding:
      roles: [router]
    # ...

Note that the rebalancer role is optional. If it is not specified, a rebalancer is selected automatically from the master instances of replica sets. To specify the rebalancer manually or turn it off, use the sharding.rebalancer_mode option.

This section describes configuration settings related to data partitioning. Learn how to define spaces to be sharded in Data definition.

To define the total number of buckets in a cluster, configure the sharding.bucket_count option at the global level. In the example below, sharding.bucket_count is set to 1000:

sharding:
  bucket_count: 1000

sharding.bucket_count should be several orders of magnitude larger than the potential number of cluster nodes considering potential scaling out in the future.

If the estimated number of nodes in a cluster is N, then the data set should be divided into 100N or even 1000N buckets depending on the planned scaling out. This number is greater than the potential number of cluster nodes in the system being designed.

Keep in mind that too many buckets can cause a need to allocate more memory to store routing information. On the other hand, an insufficient number of buckets can lead to decreased granularity when rebalancing.

A replica set weight defines the storage capacity of the replica set: the larger the weight, the more buckets the replica set can store. You can configure a replica set weight using the sharding.weight option. This option can be used to store the prevailing amount of data on a replica set with more memory space. You can also assign a zero weight to a replica set to initiate migration of its buckets to the remaining cluster nodes.

In the example below, the storage-a replica set can store twice as much data as storage-b:

# ...
replicasets:
  storage-a:
    sharding:
      weight: 2
    # ...
  storage-b:
    sharding:
      weight: 1
    # ...

Существует эталонное число сегментов в наборе реплик («эталонный» в данном случае значит идеальный). Если во всем наборе реплик это число остается неизменным, то сегменты распределяются равномерно.

The etalon number is calculated automatically considering the number of buckets in the cluster and the weights of the replica sets.

Rebalancing starts if the disbalance threshold of a replica set exceeds the disbalance threshold specified in the configuration (the sharding.rebalancer_disbalance_threshold option).

Предел дисбаланса набора реплик рассчитывается следующим образом:

|эталонное_число_сегментов - текущее_число_сегментов| / эталонное_число_сегментов * 100

For example, a cluster is configured as follows:

In this case, the etalon numbers of buckets for the replica sets are:

  • 1st replica set – 1000.
  • 2nd replica set – 500.
  • 3rd replica set – 1500.

You can set a replica set weight to zero to initiate migration of its buckets to the remaining cluster nodes. You can also add a new replica set with a non-zero weight to initiate migration of the buckets from the existing replica sets.

When a new shard is added, a configuration should be reloaded on each instance to migrate buckets to a new shard:

  • If a centralized configuration storage is used, Tarantool reloads a changed configuration automatically.
  • If a local configuration file is used, you need to reload a configuration on all the routers first and then on all the storages.

Originally, vshard had quite a simple rebalancer – one process on one node that calculated routes that should send buckets, how many, and to whom. The nodes applied these routes one by one sequentially.

К сожалению, такая простая схема работала недостаточно быстро, особенно для Vinyl’а, где затраты ресурсов на чтение диска были сопоставимы с сетевыми затратами. На самом деле, механизм применения маршрутов в балансировщике Vinyl’а большую часть времени был в режиме ожидания.

Теперь каждый узел может параллельно посылать несколько сегментов по кругу в несколько пунктов назначения или всего в один.

To set the degree of parallelism, use the sharding.rebalancer_max_sending option:

sharding:
  rebalancer_max_sending: 5

Примечание

Specifying sharding.rebalancer_max_sending = N probably won’t give N times speed up. It depends on network, disk, number of other fibers in the system.

У вас уже есть 10 наборов реплик, добавили новый. Теперь все 10 наборов реплик будут пытаться отправить сегменты на новый.

Assume that each replica set can send up to 5 buckets at once. In that case, the new replica set will experience a rather big load of 50 buckets being downloaded at once. If the node needs to do some other work, perhaps such a big load is undesirable. Also too, many parallel buckets can cause timeouts in the rebalancing process itself.

To fix the problem, you can set a lower value for rebalancer_max_sending for old replica sets, or decrease rebalancer_max_receiving for the new one. In the latter case, some workers on old nodes will be throttled, and you will see that in the logs.

rebalancer_max_sending is important, if you have restrictions for the maximum number of buckets that can be read only at once in the cluster. As you remember, when a bucket is being sent, it does not accept new write requests.

У вас есть 100 000 сегментов, и каждый сегмент хранит ~ 0,001% ваших данных. В кластере 10 наборов реплик. И нельзя позволить себе заблокировать для записи > 0,1% данных. Таким образом, не следует устанавливать значение rebalancer_max_sending > 10 на этих узлах. Тогда балансировщик не будет посылать более 100 сегментов одновременно по всему кластеру.

If rebalancer_max_sending is too high and rebalancer_max_receiving is too low, then some buckets will try to get relocated – and will fail with that. This problem will consume network resources and time. It is important to configure these parameters to not conflict with each other.

A replica set lock (sharding.lock) makes a replica set invisible to the rebalancer: a locked replica set can neither receive new buckets nor migrate its own buckets.

A bucket pin (vshard.storage.bucket_pin(bucket_id)) blocks a specific bucket from migrating: a pinned bucket stays on the replica set to which it is pinned until it is unpinned.

Закрепление всех сегментов в наборе реплик не означает блокирование набора реплик. Даже после закрепления всех сегментов незаблокированный набор реплик может принимать новые сегменты.

A replica set lock is helpful, for example, to separate a replica set from production replica sets for testing, or to preserve some application metadata that must not be sharded for a while. A bucket pin is used for similar cases but in a smaller scope.

By both locking a replica set and pinning all buckets, you can isolate an entire replica set.

Заблокированные наборы реплик и закрепленные сегменты влияют на алгоритм балансировки, так как балансировщик должен игнорировать заблокированные наборы реплик и учитывать закрепленные сегменты при попытке достичь наилучшего возможного баланса.

Это нетривиальная задача, поскольку пользователь может закрепить слишком много сегментов в наборе реплик, так что становится невозможным достижение идеального баланса. Например, рассмотрим следующий кластер (предположим, что все веса наборов реплик равны 1).

Начальная конфигурация:

rs1: bucket_count = 150 -- число сегментов
rs2: bucket_count = 150, pinned_count = 120 -- число сегментов, число закрепленных сегментов

Добавление нового набора реплик:

rs1: bucket_count = 150
rs2: bucket_count = 150, pinned_count = 120
rs3: bucket_count = 0

Идеальным балансом было бы 100 - 100 - 100, чего невозможно достичь, поскольку набор реплик rs2 содержит 120 закрепленных сегментов. The best possible balance here is the following:

rs1: bucket_count = 90
rs2: bucket_count = 120, pinned_count 120
rs3: bucket_count = 90

The rebalancer moved as many buckets as possible from rs2 to decrease the disbalance. At the same time, it respected equal weights of rs1 and rs3.

Алгоритмы реализации блокировки и закрепления совершенно разные, хотя с точки зрения функций они похожи.

Locked replica sets do not participate in rebalancing. This means that even if the actual total number of buckets is not equal to the etalon number, the disbalance cannot be fixed due to the lock. When the rebalancer detects that one of the replica sets is locked, it recalculates the etalon number of buckets of the non-locked replica sets as if the locked replica set and its buckets did not exist at all.

Балансировка наборов реплик с закрепленными сегментами требует более сложного алгоритма. Здесь pinned_count[o] – это число закрепленных сегментов, а etalon_count – это эталонное число сегментов для набора реплик:

  1. Балансировщик рассчитывает эталонное число сегментов, как если бы все сегменты не были закреплены. Затем балансировщик проверяет каждый набор реплик и сопоставляет эталонное число сегментов с числом закрепленных сегментов в наборе реплик. Если pinned_count < etalon_count, незаблокированные наборы реплик (на данном этапе все заблокированные наборы реплик уже отфильтрованы) с закрепленными сегментами могут получать новые сегменты.
  2. Если же pinned_count > etalon_count, дисбаланс исправить нельзя, так как балансировщик не может вывести закрепленные сегменты из этого набора реплик. В таком случае эталонное число обновляется как равное числу закрепленных сегментов. Наборы реплик с pinned_count > etalon_count не обрабатываются балансировщиком`, а число закрепленных сегментов вычитается из общего числа сегментов. Балансировщик пытается вывести как можно больше сегментов из таких наборов реплик.
  3. Эта процедура перезапускается с шага 1 для наборов реплик с pinned_count >= etalon_count до тех пор, пока не будет выполнено условие pinned_count <= etalon_count для всех наборов реплик. Процедура также перезапускается при изменении общего числа сегментов.

Псевдокод для данного алгоритма будет следующим:

function cluster_calculate_perfect_balance(replicasets, bucket_count)
        -- балансировка сегментов с использованием веса рабочих наборов реплик --
end;

cluster = <all of the non-locked replica sets>;
bucket_count = <the total number of buckets in the cluster>;
can_reach_balance = false
while not can_reach_balance do
        can_reach_balance = true
        cluster_calculate_perfect_balance(cluster, bucket_count);
        foreach replicaset in cluster do
                if replicaset.perfect_bucket_count <
                   replicaset.pinned_bucket_count then
                        can_reach_balance = false
                        bucket_count -= replicaset.pinned_bucket_count;
                        replicaset.perfect_bucket_count =
                                replicaset.pinned_bucket_count;
                end;
        end;
end;
cluster_calculate_perfect_balance(cluster, bucket_count);

Сложность алгоритма составляет O(N^2), где N – количество наборов реплик. На каждом шаге алгоритм либо завершает вычисление, либо игнорирует хотя бы один новый набор реплик, перегруженный закрепленными сегментами, и обновляет эталонное число сегментов в других наборах реплик.

Ссылка в сегменте – это счетчик в оперативной памяти, который похож на закрепление сегмента со следующими отличиями:

  1. Ссылка в сегменте никогда не сохраняется. Ссылки предназначены для запрета передачи сегментов во время выполнения запроса, но при перезапуске все запросы отбрасываются.

  2. Есть 2 типа ссылок в сегменте: только чтение (RO) и чтение-запись (RW).

    Если в сегменте есть ссылки типа RW, его нельзя перемещать. Однако, если балансировщику требуется отправка этого сегмента, он блокирует его для новых запросов на запись, ожидает завершения всех текущих запросов, а затем отправляет сегмент.

    Если в сегменте есть ссылки типа RO, его можно отправить, но нельзя удалить. Такой сегмент может даже перейти в статус мусора GARBAGE или отправки SENT, но его данные сохраняются до тех пор, пока не уйдет последний читатель.

    В одном сегменте могут быть ссылки как типа RO, так и типа RW.

  3. Ссылки в сегменте исчисляются.

The vshard.storage.bucket_ref/unref() methods are called automatically when vshard.router.call() or vshard.storage.call() is used. For raw API like r = vshard.router.route() r:callro/callrw, you should explicitly call the bucket_ref() method inside the function. Also, make sure that you call bucket_unref() after bucket_ref(), otherwise the bucket cannot be moved from the storage until the instance is restarted.

Чтобы узнать количество ссылок в сегменте, используйте vshard.storage.buckets_info([идентификатор_сегмента]) (параметр идентификатор_сегмента необязателен).

Пример:

vshard.storage.buckets_info(1)
---
- 1:
    status: active
    ref_rw: 1
    ref_ro: 1
    ro_lock: true
    rw_lock: true
    id: 1

Sharded spaces should be defined in a storage application inside box.once() and should have a field with bucket id values. This field should meet the following requirements:

  • The field’s data type can be unsigned, number, or integer.
  • The field must be non-nullable.
  • The field must be indexed by the shard_index. The default name for this index is bucket_id.

In the example below, the bands space has the bucket_id field, which is used to partition a dataset across different storage instances:

box.once('bands', function()
    box.schema.create_space('bands', {
        format = {
            { name = 'id', type = 'unsigned' },
            { name = 'bucket_id', type = 'unsigned' },
            { name = 'band_name', type = 'string' },
            { name = 'year', type = 'unsigned' }
        },
        if_not_exists = true
    })
    box.space.bands:create_index('id', { parts = { 'id' }, if_not_exists = true })
    box.space.bands:create_index('bucket_id', { parts = { 'bucket_id' }, unique = false, if_not_exists = true })
end)

Example on GitHub: sharded_cluster

All DML operations with data should be performed via a router using the vshard.router.call functions, such as vshard.router.callrw() or vshard.router.callro(). For example, a storage application has the insert_band function used to insert new tuples:

function insert_band(id, bucket_id, band_name, year)
    box.space.bands:insert({ id, bucket_id, band_name, year })
end

In a router application, you can define the put function that specifies how a router selects the storage to write data:

function put(id, band_name, year)
    local bucket_id = vshard.router.bucket_id_mpcrc32({ id })
    vshard.router.callrw(bucket_id, 'insert_band', { id, bucket_id, band_name, year })
end

Learn more at Обработка запросов.

В случае отказа мастера в наборе реплик рекомендуется:

  1. Переключить одну из реплик в режим мастера, что позволит новому мастеру обрабатывать все входящие запросы.
  2. Обновить конфигурацию всех членов кластера, в результате чего все запросы будут перенаправлены на новый мастер.

In case a whole replica set fails, some part of the dataset becomes inaccessible. Meanwhile, the router tries to reconnect to the master of the failed replica set. This way, once the replica set is up and running again, the cluster is automatically restored.

Для проведения запланированного остановки мастера в наборе реплик рекомендуется:

  1. Update the configuration to use another instance as a master.
  2. Reload the configuration on all the instances. All the requests then are forwarded to a new master.

# Shut down the old master.

Для проведения запланированной остановки набора реплик рекомендуется:

  1. Migrate all the buckets to the other cluster storages. You can do this by assigning a zero weight to a replica set to initiate migration of its buckets to the remaining cluster nodes.
  2. Обновить конфигурацию всех узлов.
  3. Отключить набор реплик.

Поиск сегментов, восстановление сегментов и балансировка сегментов выполняются автоматически и не требуют ручного вмешательства.

С технической точки зрения есть несколько файберов, которые отвечают за различные типы действий:

  • a discovery fiber on the router searches for buckets in the background
  • a failover fiber on the router maintains replica connections
  • a garbage collector fiber on each master storage removes the contents of buckets that were moved
  • a bucket recovery fiber on each master storage recovers buckets in the SENDING and RECEIVING states in case of reboot
  • a rebalancer on a single master storage among all replica sets executes the rebalancing process.

Для получения подробной информации см. разделы Процесс балансировки и Миграция сегментов.

Файбер сборщик мусора работает в фоновом режиме на мастер-хранилищах в каждом наборе реплик. Он начинает удалять содержимое сегмента в состоянии мусора GARBAGE по частям. Когда сегмент пуст, запись о нем удаляется из системного спейса _bucket.

Файбер восстановления сегмента работает на мастер-хранилищах. Он помогает восстановить сегменты в статусах отправки SENDING и получения RECEIVING в случае перезагрузки.

Сегменты в статусе SENDING восстанавливаются следующим образом:

  1. Сначала система ищет сегменты в статусе SENDING.
  2. Если такой сегмент обнаружен, система отправляет запрос в целевой набор реплик.
  3. Если сегмент в целевом наборе реплик находится в активном статусе ACTIVE, исходный сегмент удаляется из исходного узла.

Сегменты в статусе RECEIVING удаляются без дополнительных проверок.

A failover fiber runs on every router. If a master of a replica set becomes unavailable, the failover fiber redirects read requests to the replicas. Write requests are rejected with an error until the master becomes available.

Нашли ответ на свой вопрос?
Обратная связь