Streams

The streams and interactive transactions feature, added in Tarantool 2.10.0, allows two things: sequential processing and interleaving.

Sequential processing: With streams there is a guarantee that the server instance will not handle the next request in a stream until it has completed the previous one.

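Interleaving: Requests that belong to different streams can be mixed on the same connection, and each stream keeps its own transaction. For example, a series of requests can include “begin for stream #1”, “begin for stream #2”, “insert for stream #1”, “insert for stream #2”, “delete for stream #1”, “commit for stream #1”, “rollback for stream #2”.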
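The interleaved series above can be pictured with a toy model. This is not how the server is implemented; it is a minimal Python sketch with hypothetical names (`apply_requests`, the tuple layout, the values "a", "b", "z") in which each stream buffers its own changes until it commits or rolls back.

```python
def apply_requests(requests, initial=()):
    """Toy model: replay (stream_id, action, value) requests in arrival order.

    Each stream buffers its own changes. 'commit' publishes them and
    'rollback' discards them. Illustration only, not server behavior.
    """
    committed = set(initial)  # data visible after commits
    pending = {}              # stream_id -> buffered (action, value) pairs
    for stream_id, action, value in requests:
        if action == "begin":
            pending[stream_id] = []
        elif action in ("insert", "delete"):
            pending[stream_id].append((action, value))
        elif action == "commit":
            for op, v in pending.pop(stream_id):
                if op == "insert":
                    committed.add(v)
                else:
                    committed.discard(v)
        elif action == "rollback":
            pending.pop(stream_id)
        else:
            raise ValueError("unknown action: " + action)
    return committed


# The series from the text, with hypothetical values.
series = [
    (1, "begin", None),
    (2, "begin", None),
    (1, "insert", "a"),
    (2, "insert", "b"),
    (1, "delete", "z"),
    (1, "commit", None),
    (2, "rollback", None),
]
result = apply_requests(series, initial={"z"})
```

In this model only the changes of stream #1 survive: "a" is added and "z" is removed, while the insert made in stream #2 is discarded by its rollback.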

To work with stream transactions using iproto, the following is required:

  • The engine should be vinyl or memtx with mvcc.
  • The client is responsible for putting the stream identifier, the unsigned integer IPROTO_STREAM_ID, in the request header. IPROTO_STREAM_ID can be any positive 64-bit number and should be unique within the connection. If IPROTO_STREAM_ID equals zero, the server instance ignores it.
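As a minimal sketch of the identifier rule, a client might assemble the request header as follows. The function name `build_header` is hypothetical, and the key codes for IPROTO_REQUEST_TYPE (0x00) and IPROTO_SYNC (0x01) come from the general iproto key list rather than from this page. The header is shown as a Python dict before MessagePack encoding.

```python
IPROTO_REQUEST_TYPE = 0x00
IPROTO_SYNC = 0x01
IPROTO_STREAM_ID = 0x0a


def build_header(request_type, sync, stream_id=0):
    """Return the header map for a request (hypothetical helper).

    A stream_id of zero is left out, since the server ignores a zero
    identifier anyway. Any other value must fit in 64 unsigned bits.
    """
    if not 0 <= stream_id < 2 ** 64:
        raise ValueError("stream_id must be an unsigned 64-bit integer")
    header = {IPROTO_REQUEST_TYPE: request_type, IPROTO_SYNC: sync}
    if stream_id != 0:
        header[IPROTO_STREAM_ID] = stream_id
    return header


stream_header = build_header(0x0e, sync=7, stream_id=1)  # IPROTO_BEGIN in stream 1
plain_header = build_header(0x0e, sync=8)                # no stream
```

Keeping identifiers unique within the connection is left to the caller here; a simple counter that starts at 1 would be one way to do it.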

Name             Code   Description
IPROTO_BEGIN     0x0e   Begin a transaction in the specified stream
IPROTO_COMMIT    0x0f   Commit the transaction in the specified stream
IPROTO_ROLLBACK  0x10   Roll back the transaction in the specified stream

IPROTO_BEGIN

Code: 0x0e.

Begin a transaction in the specified stream. See stream:begin(). The body is optional and can contain two items:

IPROTO_BEGIN
  Size:   MP_UINT
  Header: IPROTO_REQUEST_TYPE: IPROTO_BEGIN
          IPROTO_SYNC: MP_UINT
          IPROTO_STREAM_ID: MP_UINT
  Body:   IPROTO_TIMEOUT: MP_DOUBLE
          IPROTO_TXN_ISOLATION: MP_UINT

IPROTO_TIMEOUT is an optional timeout (in seconds). After it expires, the transaction will be rolled back automatically.
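To make the layout concrete, here is a hedged Python sketch that builds the bytes of an IPROTO_BEGIN request using only the standard library. The helper names are hypothetical. The key codes 0x56 (IPROTO_TIMEOUT) and 0x59 (IPROTO_TXN_ISOLATION) are assumptions that should be checked against the iproto key reference. Encoding the size as a fixed 5-byte MP_UINT and omitting an empty body are choices made for this sketch.

```python
import struct

IPROTO_REQUEST_TYPE = 0x00
IPROTO_SYNC = 0x01
IPROTO_STREAM_ID = 0x0a
IPROTO_BEGIN = 0x0e
IPROTO_TIMEOUT = 0x56        # assumed key code, verify before use
IPROTO_TXN_ISOLATION = 0x59  # assumed key code, verify before use


def mp_uint(n):
    """Encode a non-negative integer as the shortest MessagePack uint."""
    if n < 0 or n >= 2 ** 64:
        raise ValueError("not an unsigned 64-bit integer")
    if n <= 0x7f:
        return bytes([n])
    if n <= 0xff:
        return b"\xcc" + bytes([n])
    if n <= 0xffff:
        return b"\xcd" + struct.pack(">H", n)
    if n <= 0xffffffff:
        return b"\xce" + struct.pack(">I", n)
    return b"\xcf" + struct.pack(">Q", n)


def mp_double(x):
    """Encode a float as a MessagePack float64."""
    return b"\xcb" + struct.pack(">d", x)


def mp_map(items):
    """Encode (int_key, encoded_value) pairs as a MessagePack fixmap."""
    if len(items) > 15:
        raise ValueError("sketch supports at most 15 map entries")
    return bytes([0x80 | len(items)]) + b"".join(
        mp_uint(key) + value for key, value in items
    )


def encode_begin(sync, stream_id, timeout=None, isolation=None):
    """Build size + header + optional body for IPROTO_BEGIN."""
    header = mp_map([
        (IPROTO_REQUEST_TYPE, mp_uint(IPROTO_BEGIN)),
        (IPROTO_SYNC, mp_uint(sync)),
        (IPROTO_STREAM_ID, mp_uint(stream_id)),
    ])
    body_items = []
    if timeout is not None:
        body_items.append((IPROTO_TIMEOUT, mp_double(float(timeout))))
    if isolation is not None:
        body_items.append((IPROTO_TXN_ISOLATION, mp_uint(isolation)))
    body = mp_map(body_items) if body_items else b""
    payload = header + body
    # Size prefix: MP_UINT written here as 0xce followed by a 32-bit length.
    return b"\xce" + struct.pack(">I", len(payload)) + payload


packet = encode_begin(sync=1, stream_id=1, timeout=2.5)
```

With these inputs the header is the 7-byte map `83 00 0e 01 01 0a 01`, and the body is a one-entry map that holds the timeout as a float64.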

IPROTO_COMMIT

Code: 0x0f.

Commit the transaction in the specified stream. See stream:commit().

IPROTO_COMMIT
  Size:   MP_UINT
  Header: IPROTO_REQUEST_TYPE: IPROTO_COMMIT
          IPROTO_SYNC: MP_UINT
          IPROTO_STREAM_ID: MP_UINT

IPROTO_ROLLBACK

Code: 0x10.

Roll back the transaction in the specified stream. See stream:rollback().

IPROTO_ROLLBACK
  Size:   MP_UINT
  Header: IPROTO_REQUEST_TYPE: IPROTO_ROLLBACK
          IPROTO_SYNC: MP_UINT
          IPROTO_STREAM_ID: MP_UINT

Suppose that the client has started a stream with the net.box module:

net_box = require('net.box')
conn = net_box.connect('localhost:3302')
stream = conn:new_stream()

At this point the stream object will look like a duplicate of the conn object, with just one additional member: stream_id. Now, using stream instead of conn, the client sends two requests:

stream.space.T:insert{1}
stream.space.T:insert{2}

The header and body of these requests will be the same as in non-stream IPROTO_INSERT requests, except that the header will contain one additional item: the key IPROTO_STREAM_ID (0x0a) with the MP_UINT value 0x01. The value is 1 in this example because each call to conn:new_stream() assigns a new number, starting with 1.
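The extra header item can be seen by decoding a sample header. The bytes below are a hand-written illustration, not a capture from a real connection: the sync value 5 is hypothetical, a real header may carry further keys, and `decode_small_header` is a hypothetical helper that only understands a MessagePack fixmap of small positive integers.

```python
IPROTO_REQUEST_TYPE = 0x00
IPROTO_SYNC = 0x01
IPROTO_STREAM_ID = 0x0a
IPROTO_INSERT = 0x02


def decode_small_header(data):
    """Decode a fixmap whose keys and values are all positive fixints."""
    first = data[0]
    if first & 0xf0 != 0x80:
        raise ValueError("not a MessagePack fixmap")
    count = first & 0x0f
    items = data[1:]
    if len(items) != 2 * count or any(b > 0x7f for b in items):
        raise ValueError("sketch handles only one-byte keys and values")
    return {items[i]: items[i + 1] for i in range(0, len(items), 2)}


# 0x83 = map of 3 items: request type, sync, stream id.
header_bytes = bytes([0x83, 0x00, 0x02, 0x01, 0x05, 0x0a, 0x01])
decoded = decode_small_header(header_bytes)
is_stream_request = decoded.get(IPROTO_STREAM_ID, 0) != 0
```

Dropping the final two bytes and changing the first byte to 0x82 would give the corresponding non-stream header in this simplified picture.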

The client makes stream transactions by sending, in order:

  1. IPROTO_BEGIN with an optional transaction timeout in the IPROTO_TIMEOUT field of the request body.
  2. The transaction data-change and query requests.
  3. IPROTO_COMMIT or IPROTO_ROLLBACK.

All these requests must contain the same IPROTO_STREAM_ID value.
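The ordering rule can be expressed as a small client-side check. This is a sketch under stated assumptions: `is_valid_stream_transaction` is a hypothetical helper, requests are reduced to `(request_type, stream_id)` pairs, and the codes 0x01 (IPROTO_SELECT) and 0x02 (IPROTO_INSERT) come from the general iproto request list.

```python
IPROTO_SELECT = 0x01
IPROTO_INSERT = 0x02
IPROTO_BEGIN = 0x0e
IPROTO_COMMIT = 0x0f
IPROTO_ROLLBACK = 0x10

TXN_CONTROL = (IPROTO_BEGIN, IPROTO_COMMIT, IPROTO_ROLLBACK)


def is_valid_stream_transaction(requests):
    """Check one transaction given as (request_type, stream_id) pairs.

    Valid means: starts with IPROTO_BEGIN, ends with IPROTO_COMMIT or
    IPROTO_ROLLBACK, has no transaction control in between, and every
    request carries the same non-zero stream identifier.
    """
    if len(requests) < 2:
        return False
    first_type, stream_id = requests[0]
    last_type = requests[-1][0]
    if stream_id == 0 or first_type != IPROTO_BEGIN:
        return False
    if last_type not in (IPROTO_COMMIT, IPROTO_ROLLBACK):
        return False
    if any(rtype in TXN_CONTROL for rtype, _ in requests[1:-1]):
        return False
    return all(sid == stream_id for _, sid in requests)


good = [(IPROTO_BEGIN, 1), (IPROTO_INSERT, 1), (IPROTO_SELECT, 1), (IPROTO_COMMIT, 1)]
mixed_ids = [(IPROTO_BEGIN, 1), (IPROTO_INSERT, 2), (IPROTO_COMMIT, 1)]
```

Here `good` passes the check, while `mixed_ids` fails because the insert carries a different stream identifier than the begin and commit.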

A rollback will happen automatically if a disconnect occurs or the transaction timeout expires before the commit is possible.

Thus there are now multiple ways to do transactions:

  • with net.box stream:begin() and stream:commit() or stream:rollback(), which send IPROTO_BEGIN and IPROTO_COMMIT or IPROTO_ROLLBACK with the current value of stream.stream_id;
  • with box.begin() and box.commit() or box.rollback();
  • with SQL START TRANSACTION and COMMIT or ROLLBACK.

An application can use any or all of these ways.
