Skip to content

Instantly share code, notes, and snippets.

@bigbes
Created November 13, 2017 13:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bigbes/35bdc4008e6a3ce3b5630804203b8158 to your computer and use it in GitHub Desktop.
Save bigbes/35bdc4008e6a3ce3b5630804203b8158 to your computer and use it in GitHub Desktop.
sharding algo
------------------------------------Шардинг-------------------------------------
Есть некоторое заранее зафиксированное множество виртуальных бакетов (vbacket),
каждый из которых - число от 0 до количества vbackets. Есть множество
репликасетов - они же реальные бакеты (backet). Каждый backet хранит некоторое
уникальное подмножество vbackets. То есть один vbacket не разделяется между
backets.
Пример: 11 vbackets и 4 backets.
+-----------------------------------------------------------------+
| vb1 | vb2 | vb3 | vb4 | vb5 | vb6 | vb7 | vb8 | vb9 | vb10| vb11|
+-----------------------------------------------------------------+
| b1 | b2 | b3 | b4 |
+-----------------------------------------------------------------+
Каждый тапл при создании получает номер виртуального бакета, в котором он будет
всегда храниться. При вставке тапла по номеру его vbacket получается номер
backet, и на этот backet отправляется тапл в тот спейс, для которого он был
создан. При этом номер vbacket, посчитанный для тапла, сохраняется в нем, как
поле shard_key, которое также должно присутствовать в формате любого спейса,
который должен участвовать в решардинге.
Функция вычисления номера виртуального бакета может быть разной для разных
спейсов, и вовсе не обязана быть функцией от первичного ключа или вообще от
ключа какого-либо индекса. Рассмотрим пример:
CREATE TABLE customer (customer_id PRIMARY KEY, name, date, email, phone);
CREATE TABLE customer_service (customer_id, service_id,
PRIMARY KEY (customer_id, service_id));
Есть пользователь и его тарифы. Нужно хранить все тарифы пользователя на том же
узле, что и самого пользователя. Но первичный ключ у этих спейсов разный, как
минимум по количеству колонок. Однако у них есть одна общая колонка -
customer_id, которая фактически является FOREIGN KEY. И хочется хранить на одном
узле записи с одинаковым customer_id.
То есть требуется возможность явно указать для каждого спейса, по каким его
колонкам требуется считать номер виртуального бакета. Такая возможность может
быть реализована через дополнительную опцию в спейсе: shard_columns. Итого,
каждый спейс может управлять своим шардингом через следующие опции:
- is_shardable - нужно ли вообще этот спейс шардить;
- shard_columns - массив номеров колонок, которые должны составлять аргументы
шардфункции;
- колонка shard_key в формате спейса, в которую нужно сохранять вычисленное
значение шардфункции (оно же - номер vbacket). Понятно, что сделать поле
shard_key, как одну из shard_columns нельзя - шардфункция не может принимать
аргументом свой будущий результат.
Колонки shard_columns и shard_key должны быть readonly, так как изменение
результата шардфункции тапла может привести к тому, что его нужно переносить
на другой backet, а это недопустимо в ручном режиме и должно выполняться только
во время решардинга.
Репликасет сам хранит номера своих vbackets в специальном спейсе
_vbackets{vbacket = 'number'}. Спейс нужен для того, чтобы 1) персистить номера
vbackets, 2) распространять номера vbackets по всеми репликасету, чтобы при
переключении на другого мастера он уже знал, какие vbackets принадлежат его
репликасету.
Из-за наличия колонки shard_key во всех спейсах на пользователя ложится
ответственность за то, чтобы посчитать это значение для каждого тапла. Причем
посчитать его можно, только уже сформировав все остальные колонки. Предлагается
следующая схема заполнения shard_key: пользователь создает таплы, заполняя поле
shard_key нулем и вызывает над ним функцию
shard.space[space_name]:set_shard_key(tuple). Эта функция посчитает шардфункцию
и заменит ноль в поле shard_key на вычисленное значение. Или можно не давать эту
функцию пользователю, а любые DML запросы будут сами проверять, что если
shard_key поле равно 0, то нужно его посчитать и заполнить перед выполнением
DML. Но есть еще call-запросы, которые в качестве аргумента должны принимать уже
вычисленное значение шардфункции. Для этого предлагается сделать глобальную
функцию shard.shard_function() от произвольного числа аргументов, которая
считает шардфункцию так же, как space:set_shard_key, но только set_shard_key
проверяет, что поданный ему тапл имеет все колонки, указанные в shard_columns и
меняет поле shard_key в них (изменение поля очень дешево, поскольку тапл здесь -
это луашная таблица), а shard.shard_function этого не делает, и считает все, что
ему было подано.
Почему не стоит избавляться от shard_columns и просто всегда использовать
shard.shard_function()? Плюс явного задания номеров колонок-аргументов
шардфункции в том, что при выполнении select(EQ, fullkey)-запроса по ключу и
некоторому индексу можно определить, что если колонки этого индекса совпадают с
shard_columns, то можно посчитать шардфункцию от ключа и сразу отправится на
правильный репликасет. То же самое с update, delete, upsert, get.
Помимо репликасетов в кластере присутствуют роутеры, через которые идут все
клиентские запросы. Роутер, в отличие от узлов-стораджей, не хранит никакого
состояния кластера, кроме таблицы отображения vbackets на backets. Но даже ее он
не персистит, она хранится полностью в памяти. При создании роутера его таблица
отображений пуста, и чтобы ее построить, он обходит мастеров всех репликасетов
и получает их vbackets. Построив таким образом таблицу отображений бакетов,
роутер принимает клиентские запросы.
-----------------------------------Решардинг------------------------------------
Спустя какое-то время может получиться так, что на некоторых backets суммарное
место, занимаемое их vbackets, стало слишком велико. Тогда пользователь
добавляет новый backet (он же репликасет) и запускает процедуру ребалансировки
vbackets.
Пример:
+------->---------------->----------------->----------------+
^ ^ ^ |
| | | |
+--+-----------------+-----------------+-----------------------V--------+
| vb1 | vb2 | vb3 | vb4 | vb5 | vb6 | vb7 | vb8 | vb9 | |
+-----------------------------------------------------------------------+
| b1 | b2 | b3 | b4 |
+-----------------------------------------------------------------------+
При добавлении нового бакета роутеры постепенно узнают об этом от глобального
оркестратора. При изменении конфигурации роутер замечает, что у одного из
бакетов нет виртуальных бакетов - это дисбаланс. Тогда роутер по заданному
алгоритму запускает процедуру ребалансировки. Алгоритм выбора того, какие
виртуальные бакеты на какие реальные бакеты нужно перенести, определен заранее и
одинаков на всех роутерах.
Роутер посылает на определенные бакеты сообщение с запросом на перенос некоторых
виртуальных бакетов по указанному адресу (ip, порт). Разослав эти сообщения, он
продолжает слать клиентские запросы так, будто новый бакет все еще пустой (то
есть туда никакие запросы не идут). Когда какой-нибудь бакет закончит перенос
своего виртуального бакета на другой реальный бакет, он начнет отклонять все
запросы по тому виртуальному бакету, который он перенес. Роутер, получив такой
отказ, понимает, что расположение запрошенного виртуального бакета изменилось,
и он заново ищет его в мастерах кластера. Найдя этот vbacket на другом backet,
роутер продолжит слать запросы уже на этот новый backet, обновив свою таблицу
vbacket->backet.
Рассмотрим подробнее, как каждый бакет переносит свои vbackets на новые бакеты.
Процесс переноса является гранулярным. То есть vbackets переносятся по одному, и
после переноса каждого vbacket этот бакет перестает принимать запросы
перенесенного vbacket. Кроме того, пока перенос не закончен, никакие данные
этого vbacket не удаляются со старого backet.
Vbackets переносятся последовательно. В спейсе _vbackets кроме номеров vbackets
хранятся состояния всех vbackets:
- STORED - vbacket хранится на своем месте и никуда не собирается переезжать;
- IN_TRANSFER - vbacket сейчас в процессе переноса;
- SCHEDULED_FOR_TRANSFER - vbacket сейчас не переносится, но он будет
переноситься.
У vbackets в состоянии IN_TRANSFER и SCHEDULED_FOR_TRANSFER хранится адрес узла,
на который надо перенести vbacket. Это нужно затем, что за раз могло быть
добавлено более одного бакета, и тогда с уже существующих бакетов их vbackets
могу переезжать на разные бакеты.
Пример:
+------------->---------------->----------->----------------+
| +------->---------------->-----------+ |
| ^ ^ | |
| | | | |
+--+-----+-----------+------------------------V----------------V--------+
| vb1 | vb2 | vb3 | vb4 | vb5 | vb6 | | |
+-----------------------------------------------------------------------+
| b1 | b2 | b3 | b4 |
+-----------------------------------------------------------------------+
Здесь vb1 переезжает в b4, а vb2 переезжает в b3.
Рассмотрим детали переноса одного конкретно взятого vbacket.
Любой vbacket может содержать относительно большие объемы данных, из-за чего
переносить его одним махом может быть невозможно. Поэтому перенос vbacket
должен производиться частями. Каждая часть отправляется на новый бакет, там она
применяется, после чего переносится следующая часть и т.д., пока не будет
перенесен весь vbacket. Но пока vbacket переносится, над его уже перенесенными
диапазонами или над переносимым в данный момент могут производиться DML
операции. Обновления таплов, вызванные этими операциями, нужно тоже как-то
отправлять. Делать это предлагается так: если DML операция меняет данные,
которые перенесены или в процессе переноса, то она выполняется, а ее номер
спейса, тип и измененный тапл (или ключ для DELETE) запоминаются в специальную
таблицу в памяти - dml_log. Эта таблица не является спейсом и не персистится.
Когда приходит время слать очередной диапазон данных vbacket, к нему
прекрепляются накопленные операции из dml_log. Когда батч перенесен, записи из
dml_log удаляются. Но как удалить их, если их много? Полный проход по этой
таблице даже в памяти может быть не быстрым. Для этого предлагается
многоуровневая структура dml_log: {
'number' = [array of
space_id = [array of
{key = 'array' or tuple = 'array', type = 'number'}
]
]
}
Каждый элемент dml_log - батч. И в памяти есть возрастающий счетчик для
получения идентификаторов DML батчей. На отправку батчи уходят по одному
начиная с самого старого. Батч состоит из операций, сгруппированных по спейсам.
Новые DML запросы добавляются в самый новый батч. Когда батч собирается
отправляться, его идентификатор запоминается, а в dml_log создается новый пустой
батч. Когда становится известно, что батч успешно дошел и применился на новом
бакете, то он удалется одним махом: dml_log[id] = nil.
Когда оказалось, что для переносимого vbacket уже кончились все диапазоны, и
остались только dml_log операции, то все DML запросы на этот vbacket
блокируются путем сброса любых on_replace, где shard_key тапла равен этому
vbacket. Когда dml_log становится пустым, то запускается процедура 2pc для
установки номера перенесенного vbacket на новом backet и удаления этого vbacket
со старого backet. Когда vbacket перенесен уже на уровне _vbacket спейса, то
запросы по этому vbacket уже не блокируются, а отклоняются по механизму,
описанному в разделе "Как делать отлуп". Тогда роутер находит его процедурой,
описанной в начале данной статьи, и обновляет свою таблицу vbacket->backet.
Записи переносимых vbacket могут находиться в разных спейсах. Предлагается
переносить спейсы последовательно, создавая итератор для текущего спейса и
итерируясь по его индексу по shard_key. Чтобы отслеживать все DML операции над
перенесенными данными, для уже перенесенных спейсов нужно ставить on_replace
триггер, который все таплы клал бы в dml_log. А для спейса в процессе переноса
нужно поставить триггер on_replace, в котором производить проверку, попадает ли
ключ dml операции в уже перенесенную часть.
Для этого на время переноса vbacket создается еще одна таблица (не спейс):
spaces = {
space_id = {state = 'number'}
};
Каждая запись таблицы - информация о переносе спейса. У спейса может быть два
состояния:
- IN_TRANSFER - спейс сейчас переносится;
- TRANSFERRED - спейс еще не начал перенос, но будет переноситься.
Когда спейс перенесен, он удаляется из этой таблицы. Когда таблица кончилась,
перенос vbacket окончен.
-------------------Алгоритм вычисления передвижений vbackets--------------------
При добавлении backet и запуске ребалансировки нужно как-то вычислить, какие
vbacket попадут на этот новый backet. Для этого предлагается следующий алгоритм.
Упорядочить таблицу vbacket->backet по двум числам: количество vbacket в
одном backet и номер этого backet. Получим таблицу: {a1, a2, a3, a4, ..., aN},
где a_i - количество vbackets в бакете i.
Рассмотрим добавление новых k бакетов:
/----- k ----\
a = {a1, a2, a3, a4, ..., aN, 0, 0, ... 0, 0}
Посчитаем, сколько в новой конфигурации должно быть минимум vbackets на один
backet:
b = max(1, bottom(sum(a_i, i = 1, N) / (N + k))). Из-за того, что можно хранить
только целое число vbackets, на некоторых бакетах может быть b + 1 vbackets.
При переносе vbackets нужно запоминать, какие vbackets куда переехали, чтобы
послать соответствующие запросы на разные бакеты. Это хранится в специальной
таблице move_vbackets = {
backet = {
backet = [array of vbackets]
},
}.
В ней для каждого бакета хранится массив того, какому backet какие его vbackets
нужно двинуть. Алгоритм заполнения таблицы move_vbackets:
// Позиция бакетов, на которые надо двигать vbackets.
j = N + 1
// Итерация по тем бакетам, с которых надо двигать.
foreach a_i, i = 1, N do
// Скидываем с каждого бакета все vbackets, пока не останется b.
foreach a_l, l = j, N + k do
needed = b - a_l
available = a_i - b
if available >= needed then
to_move = needed
else
to_move = available
end
a_i = a_i - to_move
a_l = a_l + to_move
Move first 'to_move' vbackets from a_i to a_l in move_vbackets table;
if available == 0 then
break
end
end
end
По окончании работы алгоритма между всеми backet не будет разницы в количестве
vbackets более чем 1.
Что делать с тем, что разные роутеры начнут слать на один бакет одни и те же
запросы на передвижки vbackets? Ведь они все будут считать описанные выше вещи
параллельно, как только на них изменится конфигурация. На этот случай на уровне
стораджа предлагается проверять, что если пришел запрос на передвижение vbacket
VB по адресу, а vbacket с таким адресом и состоянием IN_TRANSFER или
SCHEDULED_FOR_TRANSFER уже есть в _vbackets, то не делать ничего. Если же такого
vbacket вообще нет в таблице, то значит, что он уже перенесен - тоже ничего не
делать.
--------------------------------Как делать отлуп--------------------------------
Как определить, что тапл не подходит к данному репликасету? Для этого нужно
проверять у него поле shard_key для каждого спейса и каждой DML операции. Но
тогда это увеличит время выполнения запроса, ведь нужно установить триггер
on_replace на каждый спейс на луа и в нем проверять поле. Это
1) дополнительный вызов луа функции (при том, что в идеале вся штатная DML
работа ведется только через сишный iproto);
2) получение филда тапла, его декодинг и проверка (это еще вызовы между луа и
С).
Вместо этого предлагается переиспользовать механизм версий схемы из netbox.
Метод работы этого механизма таков, что с каждым запросом пересылается версия
схемы БД, которая меняется на каждое обновление системных спейсов. Она
проверяется на сервере, и если она не совпала у БД и запроса к БД, то в ответ
отправляется ошибка несовпадения версий. В ответ на это netbox выкачивает всю
схему и обновляет свою версию.
Использовать эту особенность для обновления списков vbacket на роутерах можно
так: когда backet перенес vbacket на другой backet (запись о vbacket появилась
на новом backet), он удаляет запись об этом vbacket из своего спейса _vbacket, и
перевставляет запись об _vbacket в _space - это не меняет ничего, кроме версии
схемы. И когда следующий нетбокс запрос придет на этот backet, он получит отказ,
который вызовет обновление схемы. В ответ на изменение схемы будет вызван
триггер на стороне роутеров on_schema_change. В нем можно выкачать спейс
_vbacket из того коннекшена, у которого поменялась схема.
Поскольку перебалансировка будет делаться не слишком часто, это будет работать.
-------------------Структура пакетов, отсутствующих в iproto--------------------
* Роутер посылает на бакет запрос на передвижение vbackets на другие бакеты:
{
type = "MOVE_VBACKETS",
destination_backets = {
backet = [array of vbackets],
...
}
}
* Бакет шлет запрос другому бакету на применение батча:
{
type = "APPLY_BATCH",
range = [array of tuples to replace],
dml = [array of {space_id = [array of {key or tuple, type}]}],
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment