Write-ahead logging
Write-ahead logging (WAL) is one of the most important parts of a database: it is what implements atomicity and durability. The basic idea of logging is to write data to persistent storage before making it accessible to others.
We will reference the 2.x series of the Tarantool source code.
Journal subsystem
The journal subsystem provides the following API calls:

- journal_create to create a new journal;
- journal_set to destroy the old journal and set a new one;
- journal_entry_new to create a new journal entry;
- journal_write to write a journal entry.
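To give a feel for how these calls fit together, here is a minimal usage sketch. The signatures are simplified (the real prototypes in box/journal.h take more arguments), and my_journal, my_write_cb, on_write_cb, cb_data, n_rows and region are illustrative names:

/* A simplified sketch of the journal API usage; not the exact
 * prototypes from box/journal.h. */
struct journal my_journal;
/* Create a journal backed by the my_write_cb implementation. */
journal_create(&my_journal, my_write_cb);
/* Make it the single active journal (current_journal). */
journal_set(&my_journal);
/* Allocate an entry with a completion callback and its data... */
struct journal_entry *entry =
    journal_entry_new(n_rows, region, on_write_cb, cb_data);
/* ...and submit it for writing. */
journal_write(entry);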
There might be only one active journal at a time, which is referenced via the current_journal variable (thus we can create as many journals as we need with journal_create, but only one must be activated via journal_set).
The journal_entry_new call requires a callback to be associated with the journal entry; it is called when entry processing is complete (with success or failure). Since we use the journal to process database transactions, we pass txn_on_journal_write as the callback and a pointer to the transaction itself as the data.
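We will see this call again in the 2.x commit path below; in box/txn.c the allocation looks roughly like this (trimmed to the relevant arguments):

/* box/txn.c (trimmed) */
req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
                        txn_region, txn_on_journal_write, txn);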
To explain in detail how transactions are processed at the journal level we need to look at the transaction structure (a short version to be precise, with only the members we're interested in).
/* box/txn.h */
struct txn {
    /* Transaction cache */
    struct stailq_entry in_txn_cache;
    /* Memory to keep transaction data */
    struct region region;
    /* Transaction ID */
    int64_t id;
    /* LSN assigned by WAL or -1 on error */
    int64_t signature;
    /* The fiber that initiated the transaction */
    struct fiber *fiber;
};
A transaction is allocated via the txn_new call. The memory allocation is a bit tricky here: we request a memory region from the main cord where we allocate the txn itself, and the memory usage may grow if the txn needs to carry additional data. On txn_free the additional memory gets freed and we put the txn into a cache for fast reuse.
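A rough sketch of this pattern, simplified from box/txn.c (the real txn_new does more bookkeeping, and error handling is omitted; txn_cache here stands for the module-level cache list):

/* box/txn.c (simplified sketch) */
struct txn *
txn_new(void)
{
    /* Reuse a cached transaction if one is available. */
    if (!stailq_empty(&txn_cache))
        return stailq_shift_entry(&txn_cache, struct txn, in_txn_cache);
    /* Otherwise create a region and carve the txn out of it. */
    struct region region;
    region_create(&region, &cord()->slabc);
    struct txn *txn = region_alloc_object(&region, struct txn);
    txn->region = region;
    return txn;
}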
The txn_on_journal_write callback is what completes the transaction once the WAL thread reports the write result; both the callback and the way transactions use the journal are described in the transaction processing sections below.
WAL thread
Because writing data to real storage, such as a hardware drive, is way slower than plain memory access, the storage operations are implemented in a separate thread.
/* box/wal.c */
int
wal_init(enum wal_mode wal_mode, const char *wal_dirname,
         int64_t wal_max_size, const struct tt_uuid *instance_uuid,
         wal_on_garbage_collection_f on_garbage_collection,
         wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
    /* Initialize the state. */
    struct wal_writer *writer = &wal_writer_singleton;
    wal_writer_create(writer, wal_mode, wal_dirname,
                      wal_max_size, instance_uuid,
                      on_garbage_collection,
                      on_checkpoint_threshold);
    /* Start WAL thread. */
    if (cord_costart(&writer->cord, "wal", wal_writer_f, NULL) != 0)
        return -1;
    /* Create a pipe to WAL thread. */
    cpipe_create(&writer->wal_pipe, "wal");
    cpipe_set_max_input(&writer->wal_pipe, IOV_MAX);
    return 0;
}
Here writer points to the statically allocated wal_writer_singleton, and writer->cord is the thread handle embedded in it. In cord_costart we start a new cord. Once the new cord is initialized, its event loop runs wal_writer_f:
/* box/wal.c */
static int
wal_writer_f(va_list ap)
{
    struct wal_writer *writer = &wal_writer_singleton;
    /* Initialize eio in this thread */
    coio_enable();
    // This is a new thread and a new cord, thus we need our own fiber scheduler
    struct cbus_endpoint endpoint;
    cbus_endpoint_create(&endpoint, "wal", fiber_schedule_cb, fiber());
    // This one is the event producer from the wal thread to the main thread.
    cpipe_create(&writer->tx_prio_pipe, "tx_prio");
    // Enter the event loop
    cbus_loop(&endpoint);
    ...
We're running a new thread with its own event loop and fiber scheduler. To communicate with this cord and others we use the communication bus (cbus) engine. Two objects are created: the "wal" endpoint, which will be used by other cords to send messages to the WAL thread, and writer->tx_prio_pipe, which serves as a way for the WAL thread to communicate with the main Tarantool thread.
/* box/box.cc */
box_cfg_xc(void)
    ...
    cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb...);
Back to the wal_writer_f code: we enter the event loop cbus_loop and wait for events to appear (via traditional libev delivery).
Now back to wal_init. The WAL thread is running, but we need to push messages to it from our side. For that we create a communication pipe (cpipe).
/* box/wal.c */
wal_init
    ...
    /* Create a pipe to WAL thread. */
    cpipe_create(&writer->wal_pipe, "wal");
Since the endpoint name is "wal", this cpipe will be notifying the WAL thread.
In summary we have:

- endpoint "tx_prio", which listens for events inside the main Tarantool thread;
- endpoint "wal", for events inside the WAL thread;
- cpipe tx_prio_pipe to notify the main thread from inside the WAL thread;
- cpipe wal_pipe to notify the WAL thread from inside the main thread.
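Schematically, the wiring between the two threads looks like this:

/*
 * main thread (endpoint "tx_prio")      WAL thread (endpoint "wal")
 *        |                                        |
 *        |--- wal_pipe: journal entries --------->|
 *        |<-- tx_prio_pipe: completion events ----|
 */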
Write data to WAL
When we need to issue a real write we allocate a journal entry which has a complete set of data to be written in one pass.
/* box/journal.h */
struct journal_entry {
    // To link entries
    struct stailq_entry fifo;
    // vclock or error code
    int64_t res;
    // transaction completion
    journal_write_async_f write_async_cb;
    void *complete_data;
    bool is_complete;
    // real user data to write
    size_t approx_len;
    int n_rows;
    struct xrow_header *rows[];
};
We are not interested in the specific data associated with the write, but we need to point out that entries are chained via the fifo member and come in strict order, so that we are able to roll back if something goes wrong.
Once allocated, the entry is passed to wal_write_async:
/* box/wal.c */
static int
wal_write_async(struct journal *journal, struct journal_entry *entry)
{
    ...
    if (!stailq_empty(&writer->wal_pipe.input) &&
        (batch = wal_msg(stailq_first_entry(&writer->wal_pipe.input,
                                            struct cmsg, fifo)))) {
        stailq_add_tail_entry(&batch->commit, entry, fifo);
    } else {
        batch = (struct wal_msg *)mempool_alloc(&writer->msg_pool);
        wal_msg_create(batch);
        stailq_add_tail_entry(&batch->commit, entry, fifo);
        cpipe_push(&writer->wal_pipe, &batch->base);
    }
    ...
    cpipe_flush_input(&writer->wal_pipe);
    ...
}
Here we allocate the communication record (wal_msg_create), bind the journal entry into it, push it into writer->wal_pipe, and notify the WAL thread that there is data to handle. Note that the notification does not mean the data is going to be handled immediately; it gets queued into the event loop. The loop here is our main cord loop (remember that we created writer->wal_pipe in wal_init).
If a batch intended for the WAL thread already exists in the message queue, we just add the entry to it.
After that, the notification is pushed to the WAL thread. Once the notification is received, it runs the callback which has been initialized earlier in wal_write_async:
/* box/wal.c */
wal_write_async(struct journal *journal, struct journal_entry *entry)
    ...
    wal_msg_create(batch);
    ...
where:
/* box/wal.c */
static struct cmsg_hop wal_request_route[] = {
    {wal_write_to_disk, &wal_writer_singleton.tx_prio_pipe},
    {tx_schedule_commit, NULL},
};

static void
wal_msg_create(struct wal_msg *batch)
{
    cmsg_init(&batch->base, wal_request_route);
    ...
}
The cbus_loop inside the WAL thread wakes up, fetches the message (we're sharing memory between the main Tarantool thread and the WAL thread), and runs the "route" functions one by one in order.
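As a general pattern, a cbus route is an array of hops: each hop's function runs in the thread owning the current endpoint, and the pipe in the hop forwards the message to the thread that runs the next hop. A sketch with illustrative names (my_msg, step_one, step_two and the pipes are not from the source tree):

/* A generic cmsg route sketch with illustrative names. */
struct my_msg {
    struct cmsg base;   /* must be the first member */
    /* ... payload ... */
};

static struct cmsg_hop route[] = {
    /* step_one runs in the receiving thread, then the message
     * is forwarded through pipe_to_next_thread. */
    {step_one, &pipe_to_next_thread},
    /* step_two runs in the next thread; the NULL pipe ends the route. */
    {step_two, NULL},
};

/* Producer side: attach the route and push the message. */
cmsg_init(&msg->base, route);
cpipe_push(&pipe_to_first_thread, &msg->base);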
First, wal_write_to_disk tries to write the journal entries in a batch to the storage. Actually it does way more than simply writing to the disk, but we're not going to consider that right now. What is important is that each journal entry gets a vclock value assigned to its res member (which is set to -1 on failure).
Once everything is written, the tx_prio_pipe is notified and then tx_schedule_commit runs inside the main thread.
/* box/wal.c */
static void
tx_schedule_queue(struct stailq *queue)
{
    struct journal_entry *req, *tmp;
    stailq_foreach_entry_safe(req, tmp, queue, fifo)
        journal_entry_complete(req);
}

static void
tx_schedule_commit(struct cmsg *msg)
{
    struct wal_writer *writer = &wal_writer_singleton;
    struct wal_msg *batch = (struct wal_msg *) msg;
    if (!stailq_empty(&batch->rollback)) {
        stailq_concat(&writer->rollback, &batch->rollback);
    }
    vclock_copy(&replicaset.vclock, &batch->vclock);
    tx_schedule_queue(&batch->commit);
    mempool_free(&writer->msg_pool, ...);
}
Here we call the callback associated with each journal entry (it has been assigned during entry allocation; we will talk about it later) and then drop the cbus message back to the free pool.
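The journal_entry_complete helper called from tx_schedule_queue is essentially a thin wrapper that marks the entry complete and invokes the callback bound at allocation time; a simplified view of box/journal.h:

/* box/journal.h (simplified) */
static inline void
journal_entry_complete(struct journal_entry *entry)
{
    entry->is_complete = true;
    entry->write_async_cb(entry);
}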
In summary:

- we notify the WAL thread via wal_pipe;
- the WAL thread runs wal_write_to_disk and notifies the main thread via tx_prio_pipe;
- the main thread runs tx_schedule_commit.
Transaction processing
Transaction processing in the 1.x series
In this series all transactions are processed in a synchronous way; the journal entry carries no callbacks. We allocate the journal entry and bind the transaction into it from inside the main cord:
/* box/txn.c */
struct txn *
txn_begin(bool is_autocommit)
{
    static int64_t txn_id = 0;
    struct txn *txn = region_alloc_object(&fiber()->gc, struct txn);
    ...
    txn->id = ++txn_id;
    txn->signature = -1;
    txn->engine = NULL;
    txn->engine_tx = NULL;
    fiber_set_txn(fiber(), txn);
    return txn;
}
which implies that the fiber which issued the transaction must not be freed until the transaction processing is finished.
The txn->signature is set to -1, indicating that the transaction has not yet been processed (though the same value is used in case the transaction has failed). The signature is set to a vclock value by the WAL engine upon transaction completion.
/* box/txn.c */
int
txn_commit(struct txn *txn)
{
    if (txn->engine != NULL) {
        if (engine_prepare(txn->engine, txn) != 0)
            goto fail;
    }
    if (txn->n_rows > 0) {
        txn->signature = txn_write_to_wal(txn);
        if (txn->signature < 0)
            goto fail;
    }
    ...
    if (txn->engine != NULL)
        engine_commit(txn->engine, txn);
    fiber_gc();
    fiber_set_txn(fiber(), NULL);
    return 0;
fail:
    txn_rollback();
    return -1;
}
The key moment here is the txn_write_to_wal function, which sends the transaction to the journal engine, which in turn passes it to the WAL thread.
/* box/txn.c */
static int64_t
txn_write_to_wal(struct txn *txn)
{
    struct journal_entry *req = journal_entry_new(txn->n_rows);
    ...
    int64_t res = journal_write(req);
    ...
    if (res < 0)
        txn_rollback();
    ...
    return res;
}
journal_write sends the entry to the WAL thread and, which is important, yields the current fiber. Unlike in the 2.x series, there are no callbacks associated with the journal entry; we just wake up the fiber which has initiated the transaction (the fiber initiating the transaction saves a pointer to itself in the journal_entry structure).
/* box/wal.c */

// main cord thread
wal_write
    // notify wal thread about queued data
    cpipe_flush_input(&writer->wal_pipe);
    ...
    bool cancellable = fiber_set_cancellable(false);
    fiber_yield();
    fiber_set_cancellable(cancellable);
    return entry->res;

// wal thread
wal_write_to_disk
    ...
    // notify main thread
    wal_writer_singleton.tx_prio_pipe

// main thread
tx_schedule_queue
    stailq_foreach_entry(req, queue, fifo)
        fiber_wakeup(req->fiber);
Thus the fiber is woken up once the WAL thread has finished processing the transaction and written the entry to the storage.
Then the fiber waiting in wal_write is woken up, tests the write result via txn->signature, and either passes the commit to the engine or calls txn_rollback to roll back the transaction on failure.
Transaction processing in the 2.x series
The transaction processing in the 2.x series is almost the same as in 1.x, with one significant exception: journal writes became asynchronous. We bind the txn_on_journal_write callback to the journal entry, and it completes the transaction. This has been done for the sake of the parallel applier (which is heavily used in the replication engine).
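In a simplified form the callback propagates the WAL result into the transaction and wakes the waiting fiber, if any; a sketch (the real box/txn.c code also handles rollback and engine completion):

/* box/txn.c (simplified sketch) */
static void
txn_on_journal_write(struct journal_entry *entry)
{
    struct txn *txn = entry->complete_data;
    /* vclock value on success, -1 on failure. */
    txn->signature = entry->res;
    ...
    /* Wake up the fiber waiting in txn_commit, if any. */
    if (txn->fiber != NULL && txn->fiber != fiber())
        fiber_wakeup(txn->fiber);
}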
Thus the txn_write routine does not wait for the transaction to complete; still, for synchronous transactions we wait explicitly until the journal callback has finished:
/* box/txn.c */
int
txn_commit(struct txn *txn)
{
    struct journal_entry *req;
    ...
    req = txn_journal_entry_new(txn);
    /* txn_journal_entry_new code */
    ...
    req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
                            txn_region, txn_on_journal_write, txn);
    ...
    if (journal_write(req) != 0)
        goto rollback_io;
    ...
}
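The explicit wait happens in the synchronous journal write path. A simplified sketch of box/wal.c's wal_write (details vary between 2.x releases):

/* box/wal.c (simplified sketch) */
static int
wal_write(struct journal *journal, struct journal_entry *entry)
{
    /* Queue the entry for the WAL thread. */
    if (wal_write_async(journal, entry) != 0)
        return -1;
    /* Wait until the entry is complete, i.e. the
     * txn_on_journal_write callback has been run. */
    bool cancellable = fiber_set_cancellable(false);
    while (!entry->is_complete)
        fiber_yield();
    fiber_set_cancellable(cancellable);
    return 0;
}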