Subxt: deep dive into `reconnecting/retry functionality`

Hey, recently subxt added new functionality to deal with the fact that WebSocket connections may be suffering from spurious disconnects and in this post I’ll explain how it works and how to use it.

At a high level, a subxt client consists of a set of high level APIs on top of something implementing a Backend trait, which exposes a common interface that all high level APIs require. We have two backend implementations:

  1. LegacyBackend: legacy RPC methods
  2. UnstableBackend: new RPC spec which needs to be tested more thoroughly and gain more user feedback until it is stabilized.

It is possible to avoid a backend and directly make RPC calls via subxt by constructing and calling methods on “LegacyRpcMethods” and “UnstableRpcMethods”; however, it is worth noting that the logic for reconnecting/retry is baked into each of the backend implementations and not into the RPC calls themselves.

Introduction

Previously, the user was responsible for dealing with reconnecting, restarting subscriptions, and
so on when the connection is lost. This is tricky because subxt has its own
abstraction on top of the underlying RPC client where it was only possible to detect when the connection was lost by the following:

  1. Downcast the subxt::RpcError to a jsonrpsee::Error::RestartNeeded to figure out if the connection was closed.
  2. Create a WsClient and call WsClient::on_disconnect, on_disconnect is an async call that returns when the connection is lost.

In both cases, the subxt client has to be re-instantiated to get a new connection state.

It is not an ideal solution, but the reason for that is that the RPC method calls and subscriptions
may have “side effects” and when reconnecting, it can end up with some edge cases
such as “submitting a transaction twice”. Thus, to avoid complexity, subxt left this to users to handle.

Reconnect/retry functionality

subxt v0.36 introduces a new feature called unstable-reconnecting-rpc-client that takes care of
reconnecting according to the specified retry policy and retrying RPC calls and subscriptions (except submitting transactions) when the connection is lost.

It’s possible to create a reconnecting client by doing the following:

// requires the feature `unstable-reconnecting-rpc-client` 
use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a new client with with a reconnecting RPC client.
    let rpc = Client::builder()
        // We can configure the retry policy; here to an exponential backoff.
        // This API accepts an iterator of retry delays, and here we use `take`
        // to limit the number of retries.
        .retry_policy(
            ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_secs(10))
                .take(3),
        )
        .build("ws://somenode.io".to_string())
        .await?;

    // Use this reconnecting client when instantiating a Subxt client:
    let api: OnlineClient = OnlineClient::from_rpc_client(rpc.clone()).await?;

How it works in subxt, is that once the connection is lost a specific error is emitted
by the RPC client called DisconnectWillReconnect that the backend will then try to automatically handle any reconnection internally if possible, and if not possible, it will forward an error to the user so that they can handle it themselves.

Retry legacy methods

Since reconnection logic is baked into the backend implementations, you’ll have to manually handle reconnection if you are directly calling the low level RPC methods. We expose a couple of utility functions, subxt::utils::retry and subxt::utils::retry_stream to take care of this for you. Here is a small example of how to retry such RPC call:

async fn retry_legacy_rpc_call<T: subxt::Config>(api: &LegacyRpcMethods<T>) {
    // Retry if the connection is lost.
    //
    // You can provide your own retry function or use the default one provided by subxt
    let version = subxt::backend::utils::retry(|| api.system_version())
        .await
        .unwrap();
}

Submitting transactions

Submitting transactions is not currently automatically retried, because we currently rely on subscription events to tell us the state of the submitted transaction. In the future, we will probably check for transactions in blocks manually, allowing us to resume these checks when we are reconnected in most cases.

Here’s a simple example of how you can manually retry a transaction on disconnect (however, it’s not possible to know if the transaction made it into a block using this method):

async fn retry_tx(api: &OnlineClient, tx: &[u8]) {
    // This closure below is retried when the connection is closed
    // and it may be possible to submit the transaction more than once
    //
    // It's not possible to know whether the transaction was submitted to the chain
    // or not by using this solely.
    let sub = subxt::backend::utils::retry_stream(move || {
        let api2 = api.clone();
        Box::pin(async move { api2.backend().submit_transaction(tx).await })
    })
    .await
    .unwrap();
    
    // poll retry_sub.
}

Next steps

  • Get more users
  • Stablize the feature
  • More robust testing

We hope this feature will cover most use-cases to deal with WebSocket disconnects
and any feedback is appreciated!

7 Likes