Skip to content

Latest commit

 

History

History
 
 

async

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 

Asynchronous programming

The driver exposes an asynchronous API that allows you to write programs in a fully-non blocking manner. Asynchronous methods return instances of Guava's ListenableFuture, that can be conveniently chained and composed.

Here is a short example that opens a session and runs a query asynchronously:

import com.google.common.util.concurrent.*;

ListenableFuture<Session> session = cluster.connectAsync();

// Use transform with an AsyncFunction to chain an async operation after another:
ListenableFuture<ResultSet> resultSet = Futures.transform(session,
    new AsyncFunction<Session, ResultSet>() {
        public ListenableFuture<ResultSet> apply(Session session) throws Exception {
            return session.executeAsync("select release_version from system.local");
        }
    });

// Use transform with a simple Function to apply a synchronous computation on the result:
ListenableFuture<String> version = Futures.transform(resultSet,
    new Function<ResultSet, String>() {
        public String apply(ResultSet rs) {
            return rs.one().getString("release_version");
        }
    });

// Use a callback to perform an action once the future is complete:
Futures.addCallback(version, new FutureCallback<String>() {
    public void onSuccess(String version) {
        System.out.printf("Cassandra version: %s%n", version);
    }

    public void onFailure(Throwable t) {
        System.out.printf("Failed to retrieve the version: %s%n",
            t.getMessage());
    }
});

Async paging

If you consume a ResultSet in a callback, be aware that iterating the rows will trigger synchronous queries as you page through the results. To avoid this, use getAvailableWithoutFetching to limit the iteration to the current page, and fetchMoreResults to get a future to the next page (see also the section on paging). Here is a full example:

Statement statement = new SimpleStatement("select * from foo").setFetchSize(20);
ListenableFuture<ResultSet> future = Futures.transform(
    session.executeAsync(statement),
    iterate(1));

private static AsyncFunction<ResultSet, ResultSet> iterate(final int page) {
    return new AsyncFunction<ResultSet, ResultSet>() {
        @Override
        public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {

            // How far we can go without triggering the blocking fetch:
            int remainingInPage = rs.getAvailableWithoutFetching();

            System.out.printf("Starting page %d (%d rows)%n", page, remainingInPage);

            for (Row row : rs) {
                System.out.printf("[page %d - %d] row = %s%n", page, remainingInPage, row);
                if (--remainingInPage == 0)
                    break;
            }
            System.out.printf("Done page %d%n", page);

            boolean wasLastPage = rs.getExecutionInfo().getPagingState() == null;
            if (wasLastPage) {
                System.out.println("Done iterating");
                return Futures.immediateFuture(rs);
            } else {
                ListenableFuture<ResultSet> future = rs.fetchMoreResults();
                return Futures.transform(future, iterate(page + 1));
            }
        }
    };
}

Good practices

If your callback is slow, consider providing a separate executor. Otherwise the callback might run on one of the driver's I/O threads, blocking I/O operations for other requests while it is running:

ListenableFuture<String> result = Futures.transform(resultSet,
    new Function<ResultSet, String>() {
        public String apply(ResultSet rs) {
            return someVeryLongComputation(rs);
        }
    }, myCustomExecutor);

Avoid blocking operations in callbacks, especially if you don't provide a separate executor. This could easily lead to deadlock if the thread that's supposed to complete the blocking call is also the thread that's waiting on it:

ListenableFuture<ResultSet> resultSet = Futures.transform(session,
    new Function<Session, ResultSet>() {
        public ResultSet apply(Session session) {
            // Synchronous operation in a callback.
            // DON'T DO THIS! It might deadlock.
            return session.execute("select release_version from system.local");
        }
    });

Known limitations

There are still a few places where the driver will block internally (mainly for historical reasons):

  • Cluster#init performs blocking I/O operations. To avoid issues, you should create your Cluster instances while bootstrapping your application, and call init immediately. If you need to create new instances at runtime, make sure this does not happen on an I/O thread.
  • if a connection pool is busy, the driver will block until a connection becomes available. To avoid this, set PoolingOptions.poolTimeoutMillis to 0; the driver will not block, just move to the next host immediately.
  • if the session is set to a specific keyspace (either at startup or by issuing a USE statement on a running session), the keyspace needs to be propagated to any newly created connection. This is done when the connection is first borrowed from the connection pool, and currently blocks. To avoid any issue, only use a session with no keyspace set (i.e. created by Cluster#connect()). This will be addressed in JAVA-893.
  • trying to read fields from a query trace will block if the trace hasn't been fetched already.