Relay
Introduction
Relay serves as a counterpart to the replication procedure. When the replication parameter, which specifies the replication source(s), is set in the box configuration, we connect to the remote peer(s) from which we will fetch database updates, to keep all nodes up to date. These remote peers run the relay service in order to send all local changes to other nodes.
Relay start up
An applier fiber is started for every URI in the replication parameter. It creates a connection to this URI, performs the joining procedure if needed, and subscribes for changes on the remote node. The applier_join() function sends the IPROTO_JOIN code, which initiates the JOIN procedure on the remote node. The full description of the message exchange, the details of how applier_f works, and an overview of the JOIN procedure from the applier's point of view can be found in the Replication internals documents.
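For reference, the sending side looks roughly like this (a condensed sketch of applier_join() from box/applier.cc; exact signatures vary between versions):

/* box/applier.cc (simplified) */
static void
applier_join(struct applier *applier)
{
    /* Encode IPROTO_JOIN with our instance UUID and send it. */
    struct xrow_header row;
    xrow_encode_join_xc(&row, &INSTANCE_UUID);
    coio_write_xrow(&applier->io, &row);
    /* ... then read the initial and final join streams. */
    ...
}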
To see how the relay is initialized, let us consider how IPROTO_JOIN is processed on the node to which the applier sent it:
/* box/iproto.cc */
static void
tx_process_replication(struct cmsg *m)
{
    ...
    switch (msg->header.type) {
    case IPROTO_JOIN:
        box_process_join(&io, &msg->header);
        break;
box_process_join() is invoked on receipt of the join code. First, we initialize the joining procedure:
/* box/box.cc */
void
box_process_join(struct ev_io *io, struct xrow_header *header)
{
    ...
    xrow_decode_join_xc(header, &instance_uuid, &replica_version_id);

    /* Check that bootstrap has been finished */
    if (!is_box_configured)
        tnt_raise(ClientError, ER_LOADING);
    ...
    /* Check permissions */
    access_check_universe_xc(PRIV_R);
    ...
    /* Add to _cluster space */
    struct replica *replica = replica_by_uuid(&instance_uuid);
    if (replica == NULL || replica->id == REPLICA_ID_NIL) {
        box_check_writable_xc();
        struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
        access_check_space_xc(space, PRIV_W);
    }

    /* Forbid replication with disabled WAL */
    if (wal_mode() == WAL_NONE) {
        tnt_raise(ClientError, ER_UNSUPPORTED, ...);
    }

    /*
     * Register the replica as a WAL consumer so that WAL files
     * that are not yet relayed to it are not garbage collected.
     */
    struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
                "replica %s", tt_uuid_str(&instance_uuid));
    auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); });
We make sure that the replica has enough permissions and that our node is ready for the join (its WAL is enabled, configuration has finished), and we register a consumer whose purpose is to guarantee that WAL files are not rotated away while they still contain records that have not yet been sent to that replica. This way the final stage of the join can resume where the initial one ends.
Then we create the relay instance for the initial JOIN itself:
/* box/box.cc */
void
box_process_join(struct ev_io *io, struct xrow_header *header)
{
    ...
    struct vclock start_vclock;
    relay_initial_join(io->fd, header->sync, &start_vclock);

    /* relay_initial_join() code */
    struct relay *relay = relay_new(NULL);
    relay_start(relay, io, sync, relay_send_initial_join_row,
                relay_yield, UINT64_MAX);
    ...
    /* Freeze a read view in engines. */
    struct engine_join_ctx ctx;
    engine_prepare_join_xc(&ctx);
    ...
    wal_sync(vclock);
    ...
    txn_limbo_wait_confirm(&txn_limbo);
    ...
    txn_limbo_checkpoint(&txn_limbo, &req);
    box_raft_checkpoint_local(&raft_req);
    ...
    /* Respond to the JOIN request with the current vclock. */
    xrow_encode_vclock_xc(&row, vclock);
    coio_write_xrow(relay->io, &row);

    /* JOIN_META */
    if (replica_version_id > 0) {
        xrow_encode_type(&row, IPROTO_JOIN_META);
        xstream_write(&relay->stream, &row);

        xrow_encode_raft(&row, &fiber()->gc, &raft_req);
        xstream_write(&relay->stream, &row);

        char body[XROW_SYNCHRO_BODY_LEN_MAX];
        xrow_encode_synchro(&row, body, &req);
        row.replica_id = req.replica_id;
        xstream_write(&relay->stream, &row);

        xrow_encode_type(&row, IPROTO_JOIN_SNAPSHOT);
        xstream_write(&relay->stream, &row);
    }

    /* Send read view to the replica. */
    engine_join_xc(&ctx, &relay->stream);
relay_initial_join sends a stream of engine-specific rows (e.g. snapshot rows for memtx or dirty cursor data fed from a read view for vinyl) to the remote replica. To do so, the function creates a new relay structure and prepares the data to be sent. First we create the relay structure and take a read view from the engines. After that we save the state of the limbo (which stores synchronous transactions still collecting ACKs from replicas) and the state of the Raft protocol into messages, which will be sent later in the JOIN_META stage if the replica's protocol version supports this stage. In the end we send the read view together with all the meta information we gathered, and free the relay instance upon completion.
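Putting it on the wire, the initial join stream sent by the code above looks schematically like this:

/*
 * Initial JOIN, master -> replica (when the replica supports JOIN_META):
 *
 *   1. the start vclock of the frozen read view;
 *   2. IPROTO_JOIN_META - opens the meta stage;
 *   3. the Raft state checkpoint (term, vote);
 *   4. the limbo (synchro) state checkpoint;
 *   5. IPROTO_JOIN_SNAPSHOT - closes the meta stage;
 *   6. the read-view rows themselves (engine_join_xc()).
 */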
Then we continue the joining procedure:
/* box/box.cc */
void
box_process_join(struct ev_io *io, struct xrow_header *header)
{
    ...
    /* Check for the replica id or register a new one */
    box_on_join(&instance_uuid);
    ...
    /* Master's vclock */
    struct vclock stop_vclock;
    vclock_copy(&stop_vclock, &replicaset.vclock);

    /* Send it to the peer */
    struct xrow_header row;
    xrow_encode_vclock_xc(&row, &stop_vclock);
    row.sync = header->sync;
    coio_write_xrow(io, &row);

    /* The WAL range (start_vclock; stop_vclock) with rows */
    relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock);

    /* End of WAL marker */
    xrow_encode_vclock_xc(&row, &replicaset.vclock);
    row.sync = header->sync;
    coio_write_xrow(io, &row);

    /* Advance the consumer position */
    gc_consumer_advance(gc, &stop_vclock);
    ...
We fetch the master's vclock (replicaset.vclock is updated by the WAL engine on commit, when the data has already been written to storage) and send it out. Then we send the rows bound to the vclock range from start_vclock to stop_vclock, and finish by sending the end-of-WAL marker.
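To summarize, the whole JOIN exchange as seen by the master is:

/*
 * 1. initial join: send start_vclock, then the frozen read view
 *    (relay_initial_join());
 * 2. register the replica in the _cluster space (box_on_join());
 * 3. send stop_vclock, i.e. replicaset.vclock at this moment;
 * 4. final join: send the WAL rows in the (start_vclock; stop_vclock)
 *    range (relay_final_join());
 * 5. send the current replicaset.vclock as the end-of-WAL marker;
 * 6. advance the gc consumer to stop_vclock.
 */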
The relay_final_join
is a bit tricky:
/* box/relay.cc */
void
relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
                 struct vclock *stop_vclock)
{
    struct relay *relay = relay_new(NULL);
    ...
    relay_start(relay, fd, sync, relay_send_row);
    ...
    relay->state = RELAY_FOLLOW;
    ...
    relay->r = recovery_new(cfg_gets("wal_dir"), false, start_vclock);
    vclock_copy(&relay->stop_vclock, stop_vclock);

    int rc = cord_costart(&relay->cord, "final_join",
                          relay_final_join_f, relay);
    ...
}
It runs relay_final_join_f in a separate thread and waits for its completion. This function runs recover_remaining_wals, which scans the WAL files (they can rotate) for the rows in the (start_vclock; stop_vclock) range and sends them all to the remote peer.
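Roughly, the thread function is as simple as this (a simplified sketch of relay_final_join_f from box/relay.cc; error handling is omitted and the exact recover_remaining_wals arguments may differ between versions):

/* box/relay.cc (simplified) */
static int
relay_final_join_f(va_list ap)
{
    struct relay *relay = va_arg(ap, struct relay *);
    coio_enable();
    ...
    /* Send the WAL rows up to relay->stop_vclock. */
    recover_remaining_wals(relay->r, &relay->stream,
                           &relay->stop_vclock, true);
    return 0;
}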
After this stage our node is joined, and any further communication will be initiated by the replica once it sends the SUBSCRIBE command (see applier_f() in Replication). Once it is received, we prepare our node to send local updates to the peer.
/* box/iproto.cc */
static void
tx_process_replication(struct cmsg *m)
{
    ...
    switch (msg->header.type) {
    ...
    case IPROTO_SUBSCRIBE:
        box_process_subscribe(&io, &msg->header);
        break;
    ...
box_process_subscribe() never returns; instead it watches for local changes and sends them out, much like applier_subscribe() behaves on the other side. The handler is pretty self-explanatory:
/* box/box.cc */
void
box_process_subscribe(struct ev_io *io, struct xrow_header *header)
{
    ...
    /* Get the vclock of the remote peer */
    xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
                             &replica_version_id, &anon, &id_filter);
    ...
    /* Remember the current WAL clock */
    vclock_create(&vclock);
    vclock_copy(&vclock, &replicaset.vclock);

    /* Send it to the peer */
    struct xrow_header row;
    xrow_encode_subscribe_response_xc(&row, &REPLICASET_UUID, &vclock);

    /* Send the replica id to the peer */
    struct replica *self = replica_by_uuid(&INSTANCE_UUID);
    row.replica_id = self->id;
    row.sync = header->sync;
    coio_write_xrow(io, &row);

    if (replica_version_id >= version_id(2, 6, 0) && !anon) {
        /* Send the raft state */
        struct raft_request req;
        box_raft_checkpoint_remote(&req);
        xrow_encode_raft(&row, &fiber()->gc, &req);
        coio_write_xrow(io, &row);
        sent_raft_term = req.term;
    }

    /* Set the 0 component to our own 0 component value */
    vclock_reset(&replica_clock, 0, vclock_get(&replicaset.vclock, 0));

    /* Initiate the subscription procedure */
    relay_subscribe(replica, io->fd, header->sync, &replica_clock,
                    replica_version_id, id_filter);
}
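The reset of the 0 component deserves a note: component 0 of the vclock accounts for local, non-replicated rows (e.g. changes to local spaces), so its value differs on every instance and must not take part in comparing positions. A hedged illustration with made-up values:

/*
 * Suppose the replica sent replica_clock = {0: 5, 1: 100} and our own
 * replicaset.vclock has 42 in component 0. After
 *
 *     vclock_reset(&replica_clock, 0, vclock_get(&replicaset.vclock, 0));
 *
 * replica_clock becomes {0: 42, 1: 100}: local rows of both sides are
 * excluded from deciding which WAL rows must be relayed.
 */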
The subscription routine runs until explicitly cancelled:
/* box/relay.cc */
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
                struct vclock *replica_clock, uint32_t replica_version_id,
                uint32_t replica_id_filter)
{
    struct relay *relay = replica->relay;
    ...
    relay_start(relay, fd, sync, relay_send_row);
    ...
    vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
    relay->r = recovery_new(cfg_gets("wal_dir"), false, replica_clock);
    vclock_copy(&relay->tx.vclock, replica_clock);
    ...
    int rc = cord_costart(&relay->cord, "subscribe",
                          relay_subscribe_f, relay);
    ...
}
The relay->r = recovery_new call provides access to the WAL files, while relay_subscribe_f runs inside a separate thread.
/* box/relay.cc */
static int
relay_subscribe_f(va_list ap)
{
    struct relay *relay = va_arg(ap, struct relay *);

    coio_enable();
    relay_set_cord_name(relay->io->fd);

    cbus_endpoint_create(&relay->tx_endpoint,
                         tt_sprintf("relay_tx_%p", relay),
                         fiber_schedule_cb, fiber());
    cbus_pair("tx", relay->tx_endpoint.name, &relay->tx_pipe,
              &relay->relay_pipe, relay_thread_on_start, relay,
              cbus_process);
    cbus_endpoint_create(&relay->wal_endpoint,
                         tt_sprintf("relay_wal_%p", relay),
                         fiber_schedule_cb, fiber());
    ...
    /* Setup the WAL watcher for sending new rows to the replica. */
    wal_set_watcher(&relay->wal_watcher, relay->wal_endpoint.name,
                    relay_process_wal_event, cbus_process);

    /* Start a fiber for receiving replica acks. */
    char name[FIBER_NAME_MAX];
    snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
    struct fiber *reader = fiber_new_xc(name, relay_reader_f);
    fiber_set_joinable(reader, true);
    fiber_start(reader, relay, fiber());

    relay_send_heartbeat(relay);
    ...
}
First, we create the relay->tx_endpoint endpoint and pair it with the tx endpoint. Once they are paired we have relay->tx_pipe, which serves as a uni-directional queue from the relay thread to tx, and relay->relay_pipe, which goes from tx to relay. relay_thread_on_start() sets relay->tx.is_raft_enabled, telling tx that the relay is ready to accept raft pushes (see relay_push_raft() below).
Then we set up a watcher on WAL changes. On every new commit relay_process_wal_event will be called, which in turn calls the recover_remaining_wals helper to advance the xlog cursor in the WAL file and send the new rows to the remote replica. wal_set_watcher pairs the relay's wal_endpoint with wal, which works inside the WAL thread (see wal_writer_f in Write-ahead logging).
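The watcher callback itself is small. Roughly (a simplified sketch of relay_process_wal_event from box/relay.cc; error handling omitted):

/* box/relay.cc (simplified) */
static void
relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
{
    struct relay *relay = container_of(watcher, struct relay, wal_watcher);
    if (fiber_is_cancelled())
        return;
    /* Advance the xlog cursor and send the new rows to the replica. */
    recover_remaining_wals(relay->r, &relay->stream, NULL,
                           (events & WAL_EVENT_ROTATE) != 0);
}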
The reader of new acks coming from the remote node is implemented by the relay_reader_f fiber. One of the key points is that all replicas send heartbeat messages to each other to indicate that they are alive.
In summary, on the cbus side we have:
- endpoint "tx_endpoint", which listens inside the relay thread for events from tx;
- cpipe tx_pipe to notify the tx thread from inside the relay thread;
- cpipe relay_pipe to notify the relay thread from inside the tx thread;
- endpoint "wal_endpoint" for events from the wal thread;
- cpipes from and to the wal thread, which are located inside the wal watcher.
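The general shape of a round trip over these pipes (the pattern used below for status and raft messages) is worth spelling out. A minimal sketch with hypothetical names (my_msg, do_in_tx, back_in_relay):

/* sketch: a two-hop cbus round trip (hypothetical names) */
struct my_msg {
    struct cmsg base;          /* must be the first member */
    struct cmsg_hop route[2];  /* must outlive the whole trip */
    int payload;
};

/* Initiated in the relay thread: */
msg->route[0].f = do_in_tx;               /* executed in the tx thread */
msg->route[0].pipe = &relay->relay_pipe;  /* then forwarded back to relay */
msg->route[1].f = back_in_relay;          /* executed in the relay thread */
msg->route[1].pipe = NULL;                /* NULL ends the route */
cmsg_init(&msg->base, msg->route);
cpipe_push(&relay->tx_pipe, &msg->base);  /* deliver the 1st hop to tx */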
Relay lifecycle
relay_subscribe_f sends the current recovery vector clock as a marker of the "current" state of the master. When the replica fetches rows up to this position, it knows it is in sync with the master. Heartbeats are also sent from this fiber.
/* box/relay.cc */
static int
relay_subscribe_f(va_list ap)
{
    ...
    while (!fiber_is_cancelled()) {
        /*
         * Wait for incoming data from the remote
         * peer (an Ack/Heartbeat message).
         */
        double timeout = replication_timeout;
        ...
        fiber_cond_wait_deadline(&relay->reader_cond,
                                 relay->last_row_time + timeout);
        ...
        cbus_process(&relay->tx_endpoint);
        cbus_process(&relay->wal_endpoint);

        /* Send the heartbeat if needed */
        relay_send_heartbeat_on_timeout(relay);

        /*
         * Check that the vclock has been updated and the previous
         * status message has been delivered.
         */
        if (relay->status_msg.msg.route != NULL)
            continue;

        struct vclock *send_vclock;
        if (relay->version_id < version_id(1, 7, 4))
            send_vclock = &relay->r->vclock;
        else
            send_vclock = &relay->recv_vclock;

        /* Collect xlog files received by the replica. */
        relay_schedule_pending_gc(relay, send_vclock);

        /* Nothing to do */
        double tx_idle = ev_monotonic_now(loop()) - relay->tx_seen_time;
        if (vclock_sum(&relay->status_msg.vclock) ==
            vclock_sum(send_vclock) && tx_idle <= timeout &&
            relay->status_msg.vclock_sync == relay->recv_vclock_sync)
            continue;

        static const struct cmsg_hop route[] = {
            {tx_status_update, NULL}
        };
        cmsg_init(&relay->status_msg.msg, route);
        vclock_copy(&relay->status_msg.vclock, send_vclock);
        relay->status_msg.txn_lag = relay->txn_lag;
        relay->status_msg.relay = relay;
        relay->status_msg.vclock_sync = relay->recv_vclock_sync;
        cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
    }
    ...
}
First, we wait for a heartbeat packet from the remote peer (relay_reader_f will wake us up via relay->reader_cond). Then we send our own heartbeat message if the tx thread is responsive. Finally, we push the vclock last received from the remote peer to the tx thread. At the same time we notify the xlog engine about the WAL files we no longer need, since they have been propagated.
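The status message makes a round trip as well: tx_status_update runs in the tx thread and re-routes the message back, and on return the route pointer is reset, which is exactly what the relay->status_msg.msg.route != NULL check above waits for. Roughly (a simplified sketch per box/relay.cc; the body of tx_status_update is reduced to its essence):

/* box/relay.cc (simplified) */
static void
relay_status_update(struct cmsg *msg)
{
    /* The message is back home and idle again. */
    msg->route = NULL;
}

static void
tx_status_update(struct cmsg *msg)
{
    struct relay_status_msg *status = (struct relay_status_msg *)msg;
    /* Let tx know how far the replica has advanced. */
    vclock_copy(&status->relay->tx.vclock, &status->vclock);
    ...
    /* Send the message back to the relay thread. */
    static const struct cmsg_hop route[] = {
        {relay_status_update, NULL}
    };
    cmsg_init(msg, route);
    cpipe_push(&status->relay->relay_pipe, msg);
}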
Note that WAL commits trigger relay_process_wal_event by themselves; still, the event is delivered to the main event loop first and only then to the relay thread.
Relay Raft
The only function related to Raft and exported in the public relay API is relay_push_raft(). It is used to send a raft update request from the tx thread to the relay, after which the request is forwarded to the remote peer, and the message is then returned to tx.
Let's consider the way it's done. Similar constructions can be found between the "applier thread" and the applier fiber in the tx thread, and between iproto and tx (the iproto kharon).
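The round trip, in overview (details below):

/*
 * tx:    relay_push_raft()       -- fill the idle message, mark it pending
 *        relay_push_raft_msg()   -- push it to relay_pipe if allowed
 * relay: relay_raft_msg_push()   -- encode and send the request to the
 *                                   peer, forward the message back to tx
 * tx:    tx_raft_msg_return()    -- mark that nothing is en route; if an
 *                                   update arrived meanwhile, push again
 */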
Frequently, cbus serves as a means to notify one thread about news happening in another thread. Nothing limits the pace at which the notifications appear. For example, relay_push_raft may be triggered tens of times a second if raft terms are bumped fast enough. We don't want to dynamically allocate tens of cbus messages in such cases, and we are fine with losing older messages as long as we deliver newer ones.
/* box/relay.cc */
struct relay {
    ...
    struct {
        ...
        struct relay_raft_msg raft_msgs[2];
        int raft_ready_msg;
        bool is_raft_push_sent;
        bool is_raft_push_pending;
        ...
    } tx;
};
We pre-allocate two messages (here, in the relay structure: relay->tx.raft_msgs). At any given point in time at least one of the two messages resides in the sender thread (tx); it receives any updates that arrive while the other message is somewhere between sender and receiver (tx and relay).
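Since this is the heart of the pattern, here is a minimal self-contained model of it in plain C (hypothetical code, no cbus: ready plays the role of tx.raft_ready_msg, in_flight of tx.is_raft_push_sent, pending of tx.is_raft_push_pending):

/* sketch: the two pre-allocated messages pattern, modeled in plain C */
#include <stdbool.h>
#include <stdio.h>

struct slot { int term; };

static struct slot slots[2];
static int ready = 0;          /* index of the idle slot */
static bool in_flight = false; /* a slot is travelling to the receiver */
static bool pending = false;   /* the idle slot holds a fresh update */

static void
try_send(void)
{
    if (in_flight || !pending)
        return;
    printf("send term %d\n", slots[ready].term);
    ready ^= 1; /* the other slot becomes the idle one */
    in_flight = true;
    pending = false;
}

/* Called by the sender for every new update; may overwrite. */
static void
push(int term)
{
    slots[ready].term = term; /* an older pending update is lost */
    pending = true;
    try_send();
}

/* Called when the sent slot returns from the receiver. */
static void
returned(void)
{
    in_flight = false;
    if (pending)
        try_send();
}

int
main(void)
{
    push(1);    /* sent immediately */
    push(2);    /* stored in the idle slot */
    push(3);    /* overwrites 2: only the newest update matters */
    returned(); /* prints "send term 3" */
    returned();
    return 0;
}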
/* box/relay.cc */
void
relay_push_raft(struct relay *relay, const struct raft_request *req)
{
    /* Choose the idle message */
    struct relay_raft_msg *msg =
        &relay->tx.raft_msgs[relay->tx.raft_ready_msg];

    /* Update the request, overwrite if needed */
    msg->req = *req;
    ...
    /* Send to the remote peer */
    msg->route[0].f = relay_raft_msg_push;
    msg->route[0].pipe = &relay->tx_pipe;

    /* Retry if needed */
    msg->route[1].f = tx_raft_msg_return;
    msg->route[1].pipe = NULL;

    cmsg_init(&msg->base, msg->route);
    msg->relay = relay;
    relay->tx.is_raft_push_pending = true;
    relay_push_raft_msg(relay);

    /* relay_push_raft_msg() code */
    if (!relay->tx.is_raft_enabled || relay->tx.is_raft_push_sent)
        return;
    struct relay_raft_msg *msg =
        &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
    cpipe_push(&relay->relay_pipe, &msg->base);
    relay->tx.raft_ready_msg = (relay->tx.raft_ready_msg + 1) % 2;
    relay->tx.is_raft_push_sent = true;
    relay->tx.is_raft_push_pending = false;
}
First, we choose the message that is idling in the tx thread and is ready to store Raft requests. Then we update the request in it, overwriting whatever data it held before. As soon as the route of the message (see cbus) is set, it is pushed to the cpipe directed to the relay thread. The message can be pushed only if the is_raft_enabled flag is set, which means tx_pipe and relay_pipe have already been created. is_raft_push_sent shows whether one of the messages is en route between the threads, so it must be false; otherwise there would be no idle message left in tx to store incoming updates.
As soon as the message is delivered to the relay thread, relay_raft_msg_push(), which sends the request to the remote peer over the network (the 1st hop of the route), is executed:
/* box/relay.cc */
static void
relay_raft_msg_push(struct cmsg *base)
{
    struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
    struct xrow_header row;
    xrow_encode_raft(&row, &fiber()->gc, &msg->req);
    try {
        /* Send via network */
        relay_send(msg->relay, &row);
        msg->relay->sent_raft_term = msg->req.term;
    } catch (Exception *e) {
        relay_set_error(msg->relay, e);
        fiber_cancel(fiber());
    }
}
This function sends the request to the remote peer. After that the message is returned to the tx thread, where we check whether the other message has accumulated new updates: the is_raft_push_sent flag was blocking the sending of new messages; now it is released, and a new message stored at the other index of raft_msgs (not the one that was just returned) may already be ready to be pushed to the relay thread:
/* box/relay.cc */
static void
tx_raft_msg_return(struct cmsg *base)
{
    struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
    /* no message is en route */
    msg->relay->tx.is_raft_push_sent = false;
    /* if the other message was already saved into raft_msgs[] */
    if (msg->relay->tx.is_raft_push_pending)
        relay_push_raft_msg(msg->relay);
}