
@havaker
Last active April 7, 2022 17:15

In a previous lesson, Rust and Scylla, we explained how to use the Rust driver to create applications that interact with Scylla. However, we did not cover more advanced topics such as prepared statements, paging, or retries. They are the subject of this lesson.

Example application

Let's create a console application that reads messages from standard input, inserts them into a table in Scylla, and finally prints all stored messages.

Here is the boilerplate CQL that creates the keyspace and the table:

CREATE KEYSPACE IF NOT EXISTS log WITH REPLICATION = {
  'class': 'SimpleStrategy',
  'replication_factor': 1
};

CREATE TABLE IF NOT EXISTS log.messages (
  id bigint,
  message text,
  PRIMARY KEY (id)
);

Now let's look at sample code for such an application.

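use chrono::Utc;
use scylla::{Session, SessionBuilder};
use tokio::io::{stdin, AsyncBufReadExt, BufReader};

// Assumes the log keyspace and the log.messages table above already exist.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a single local Scylla node.
    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042".to_string())
        .build()
        .await?;

    // Store every line read from stdin, keyed by the current time in milliseconds.
    // Note: two lines read within the same millisecond get the same id,
    // so the later INSERT overwrites the earlier row.
    let mut lines_from_stdin = BufReader::new(stdin()).lines();
    while let Some(line) = lines_from_stdin.next_line().await? {
        let id: i64 = Utc::now().timestamp_millis();

        session
            .query(
                "INSERT INTO log.messages (id, message) VALUES (?, ?)",
                (id, line),
            )
            .await?;
    }

    // Once stdin is closed, read back and print all stored messages.
    let rows = session
        .query("SELECT id, message FROM log.messages", &[])
        .await?
        .rows_typed::<(i64, String)>()?;

    for row in rows {
        let (id, message) = row?;
        println!("{}: {}", id, message);
    }

    Ok(())
}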

Prepared statements

In every iteration of the while loop, we insert new data into the log.messages table. Unfortunately, doing it this way is inefficient: every call to session.query sends the entire query string to the database, which then has to parse it again. To avoid this unnecessary server-side work, a query can be prepared in advance with the session.prepare method. A call to this method returns a PreparedStatement object, which can later be passed to session.execute to run the query.

What exactly are prepared statements?

A prepared statement is a query that is parsed by Scylla and then saved for later use. A valuable benefit is that you can keep reusing that query while binding different values to its variables, such as names, addresses, and locations. Let's dive a little deeper to see how it works.

When asked to prepare a CQL statement, a client library sends the statement to Scylla. Scylla then creates a unique fingerprint for it by computing an MD5 hash of the statement, and uses this hash to check whether the statement is already in its query cache. If Scylla has already seen that statement, it sends back a reference to the cached statement. If the hash is not in its cache, Scylla parses the query and inserts the parsed output into the cache.

The client can then send an execute request that specifies the statement id (encapsulated in the PreparedStatement object) and provides the bound values, as we will see next.
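The cache behaviour described above can be sketched with a toy, dependency-free model. This is not Scylla's implementation: PreparedCache, prepare, execute and parse_count are hypothetical names, std's DefaultHasher stands in for MD5, and "parsing" merely splits the statement at its ? placeholders.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical statement id. Scylla uses an MD5 hash; this sketch
/// substitutes std's DefaultHasher to avoid external crates.
pub type StatementId = u64;

/// Toy model of a server-side prepared statement cache.
#[derive(Default)]
pub struct PreparedCache {
    /// Statement fingerprint -> "parsed" statement (text split at `?`).
    entries: HashMap<StatementId, Vec<String>>,
    /// How many times the (expensive) parse step actually ran.
    pub parse_count: usize,
}

impl PreparedCache {
    /// Fingerprints the statement text.
    fn fingerprint(cql: &str) -> StatementId {
        let mut hasher = DefaultHasher::new();
        cql.hash(&mut hasher);
        hasher.finish()
    }

    /// PREPARE: parses only on a cache miss and always returns the id.
    pub fn prepare(&mut self, cql: &str) -> StatementId {
        let id = Self::fingerprint(cql);
        if !self.entries.contains_key(&id) {
            self.parse_count += 1;
            let parsed: Vec<String> = cql.split('?').map(str::to_owned).collect();
            self.entries.insert(id, parsed);
        }
        id
    }

    /// EXECUTE: looks the statement up by id and binds the values.
    /// Returns None for an unknown id or a wrong number of values.
    /// Rendering the bound statement as text is only for illustration;
    /// a real server does not substitute values into the query string.
    pub fn execute(&self, id: StatementId, values: &[&str]) -> Option<String> {
        let segments = self.entries.get(&id)?;
        // A statement with n placeholders splits into n + 1 segments.
        if values.len() + 1 != segments.len() {
            return None;
        }
        let mut bound = segments[0].clone();
        for (value, segment) in values.iter().zip(&segments[1..]) {
            bound.push_str(value);
            bound.push_str(segment);
        }
        Some(bound)
    }
}
```

Preparing the same string twice returns the same id while the parse step runs only once; that saving is what the insert loop gains from prepared statements.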

Adding prepared statements to our app

Let's go back to the sample code above and modify it to use prepared statements.

The first step is to create a prepared statement (with session.prepare) before the while loop. Next, we replace session.query with session.execute inside the while loop.

+    let insert_message = session
+        .prepare("INSERT INTO log.messages (id, message) VALUES (?, ?)")
+        .await?;
+
     let mut lines_from_stdin = BufReader::new(stdin()).lines();
     while let Some(line) = lines_from_stdin.next_line().await? {
         let id: i64 = Utc::now().timestamp_millis();
 
-        session
-            .query(
-                "INSERT INTO log.messages (id, message) VALUES (?, ?)",
-                (id, line),
-            )
-            .await?;
+        session.execute(&insert_message, (id, line)).await?;
     }

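After these two steps, our app reuses the prepared statement insert_message instead of sending raw queries: Scylla parses the INSERT once, and each loop iteration sends only the statement id and the bound values. Avoiding a parse on every insert is where the performance gain comes from.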

Paging

Let's focus on the last lines of our application.

let rows = session
    .query("SELECT id, message FROM log.messages", &[])
    .await?
    .rows_typed::<(i64, String)>()?;

for row in rows {
    let (id, message) = row?;
    println!("{}: {}", id, message);
}

These lines call Session::query, which sends an unprepared SELECT query. Since this query is executed only once, it isn't worth preparing. However, if we expect the result to be large, it is better to use paging.

What is paging?

Paging is a way to return a large result in manageable chunks (pages). With paging disabled, the coordinator has to build a single result that holds all of the data before returning it. This can have a significant performance impact, and memory usage may rise excessively on both the driver and the Scylla side.
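To make the idea of pages concrete, here is a minimal sketch of paged fetching over an in-memory table. It is a simplified model, not the driver's or Scylla's code: Page, fetch_page and for_each_row are hypothetical, and the paging state is a plain row index, whereas a real paging state is an opaque value produced by the server.

```rust
/// One page of results plus the state needed to request the next page.
pub struct Page<T> {
    pub rows: Vec<T>,
    /// None means there are no more pages.
    pub paging_state: Option<usize>,
}

/// Toy "coordinator": returns at most `page_size` rows starting at the
/// position encoded in `paging_state` (here simply an index into the table).
pub fn fetch_page<T: Clone>(table: &[T], paging_state: Option<usize>, page_size: usize) -> Page<T> {
    assert!(page_size > 0, "page_size must be positive");
    let start = paging_state.unwrap_or(0);
    let end = (start + page_size).min(table.len());
    let rows = table[start..end].to_vec();
    let paging_state = if end < table.len() { Some(end) } else { None };
    Page { rows, paging_state }
}

/// Client side: keeps requesting pages until the paging state runs out,
/// handing each row to `consume`, so only one page is held at a time.
/// Returns the number of pages fetched.
pub fn for_each_row<T: Clone>(table: &[T], page_size: usize, mut consume: impl FnMut(&T)) -> usize {
    let mut state = None;
    let mut pages = 0;
    loop {
        let page = fetch_page(table, state, page_size);
        pages += 1;
        for row in &page.rows {
            consume(row);
        }
        state = page.paging_state;
        if state.is_none() {
            return pages;
        }
    }
}
```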

Adding paging to our app

As you may have guessed by now, Session::query does not use paging: it fetches the whole result into memory in one go. An alternative Session method, Session::query_iter, uses paging under the hood (Session::execute_iter is its counterpart for prepared statements). It takes a query and a list of values as arguments and returns an async iterator (a stream) over the result rows. Calling .next() on that stream requires the futures::StreamExt trait to be in scope (use futures::StreamExt;). Let's see what using it looks like:

-    let rows = session
-        .query("SELECT id, message FROM log.messages", &[])
+    let mut row_stream = session
+        .query_iter("SELECT id, message FROM log.messages", &[])
         .await?
-        .rows_typed::<(i64, String)>()?;
+        .into_typed::<(i64, String)>();
 
-    for row in rows {
+    while let Some(row) = row_stream.next().await {
         let (id, message) = row?;
         println!("{}: {}", id, message);
     }

Once query_iter is invoked, the driver starts a background task that fetches subsequent pages of rows. The caller task (the one that invoked query_iter) consumes newly fetched rows through an iterator-like stream interface. The caller and the background task run concurrently, so the background task can fetch new rows while the caller is consuming the ones already received.
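The producer/consumer arrangement described above can be sketched with the standard library. This is only an analogy under stated assumptions: the driver uses an async background task rather than an OS thread, and spawn_fetcher and collect_rows are hypothetical helpers that page through an in-memory Vec instead of querying Scylla.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Spawns a background "fetcher" that sends pages of `page_size` rows
/// through a bounded channel (capacity 1), so it can only run a little
/// ahead of the consumer instead of buffering the whole result.
pub fn spawn_fetcher(rows: Vec<i64>, page_size: usize) -> Receiver<Vec<i64>> {
    assert!(page_size > 0, "page_size must be positive");
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        for page in rows.chunks(page_size) {
            // Stand-in for a round trip to the coordinator for the next page.
            if tx.send(page.to_vec()).is_err() {
                // The caller dropped the receiver; stop fetching.
                break;
            }
        }
        // `tx` is dropped here, which ends the caller's iteration.
    });
    rx
}

/// The caller side: flattens the stream of pages into individual rows.
pub fn collect_rows(rx: Receiver<Vec<i64>>) -> Vec<i64> {
    rx.into_iter().flatten().collect()
}
```

The bounded channel is the design choice that matters here: the fetcher can prepare the next page while the caller is still processing the current one, but it can get only a couple of pages ahead, which preserves the memory savings of paging.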

With paging in place, our app no longer needs the entire result in memory at once: rows are processed page by page as they arrive, which keeps memory usage bounded even when the table grows large.

Retries

After a query fails, the driver may decide to retry it, based on its retry policy and on the query itself. The retry policy can be configured for the whole Session or for a single query.

Provided retry policies

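The driver offers two policies to choose from:

  • Fallthrough retry policy (FallthroughRetryPolicy): never retries and returns errors straight to the user.
  • Default retry policy (DefaultRetryPolicy): retries when there is a high chance that a retry might help; its behaviour is based on the DataStax Java Driver.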

It is possible to provide a custom retry policy by implementing the RetryPolicy and RetrySession traits.
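To show why the work is split between a policy and a per-query session, here is a hypothetical, simplified version of that design. It does not reproduce the driver's actual trait signatures or error types: Failure, Decision, MaxRetriesPolicy and MaxRetriesSession are made-up names. The policy is shared configuration; each query gets a fresh session that remembers how many retries it has already used.

```rust
/// Hypothetical, simplified failure kinds (not the driver's error types).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Failure {
    /// No reply in time: the query may or may not have been applied.
    Timeout,
    /// Rejected before execution (not enough live replicas).
    Unavailable,
    /// The query itself is invalid; retrying cannot help.
    SyntaxError,
}

/// What to do after a failed attempt.
#[derive(Debug, PartialEq)]
pub enum Decision {
    RetryNextNode,
    DontRetry,
}

/// Plays the role of a retry policy: shared, stateless configuration
/// that hands out a fresh per-query session.
pub struct MaxRetriesPolicy {
    pub max_retries: u32,
}

impl MaxRetriesPolicy {
    pub fn new_session(&self) -> MaxRetriesSession {
        MaxRetriesSession { max_retries: self.max_retries, retries_so_far: 0 }
    }
}

/// Plays the role of a retry session: state for one query across attempts.
pub struct MaxRetriesSession {
    max_retries: u32,
    retries_so_far: u32,
}

impl MaxRetriesSession {
    pub fn decide(&mut self, failure: Failure, is_idempotent: bool) -> Decision {
        if self.retries_so_far >= self.max_retries {
            return Decision::DontRetry;
        }
        let decision = match failure {
            // Unknown outcome: resending is only safe if applying twice is harmless.
            Failure::Timeout if is_idempotent => Decision::RetryNextNode,
            Failure::Timeout => Decision::DontRetry,
            // The query was not executed, so another node can be tried.
            Failure::Unavailable => Decision::RetryNextNode,
            Failure::SyntaxError => Decision::DontRetry,
        };
        if decision != Decision::DontRetry {
            self.retries_so_far += 1;
        }
        decision
    }
}
```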

Using retry policies

To benefit from retry policies, you need to tell the driver whether a query is idempotent. A query is idempotent if it can be applied multiple times without changing the result of the initial application. For most kinds of errors, the driver will not retry a failed query unless it is marked as idempotent, because retrying a non-idempotent query could apply it twice. Marking queries as idempotent is up to the user, as the driver does not parse query strings.
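The difference between idempotent and non-idempotent writes can be illustrated with an in-memory stand-in for a table. insert_message, increment_counter and apply_with_lost_ack are hypothetical helpers; the HashMaps only mimic the semantics of a CQL INSERT (an upsert on the primary key) and of a counter update.

```rust
use std::collections::HashMap;

/// Like `INSERT INTO log.messages (id, message) VALUES (?, ?)`:
/// writing the same row again leaves the table unchanged (idempotent).
pub fn insert_message(table: &mut HashMap<i64, String>, id: i64, message: &str) {
    table.insert(id, message.to_string());
}

/// Like `UPDATE ... SET views = views + 1` on a counter column:
/// every application changes the result (not idempotent).
pub fn increment_counter(counters: &mut HashMap<i64, i64>, id: i64) {
    *counters.entry(id).or_insert(0) += 1;
}

/// Simulates a write whose first attempt was applied but whose reply was
/// lost, so a retry applies it a second time.
pub fn apply_with_lost_ack<S>(state: &mut S, op: impl Fn(&mut S)) {
    op(state); // original attempt reaches the server
    op(state); // retry after the client saw a timeout
}
```

Replaying the insert leaves the table unchanged, so a retry is harmless; replaying the increment counts twice, which is why such a query should not be marked as idempotent.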

Let's mark our app's SELECT statement as idempotent (Query here is scylla::query::Query, which needs to be imported):

+    let mut select_query = Query::new("SELECT id, message FROM log.messages");
+    select_query.set_is_idempotent(true);
+
     let mut row_stream = session
-        .query_iter("SELECT id, message FROM log.messages", &[])
+        .query_iter(select_query, &[])
         .await?
         .into_typed::<(i64, String)>();

With this change, if executing the SELECT statement fails, the driver can retry it as allowed by the default retry policy.
