Announcing the Solana Elixir SDK

e^{i} Ventures
Nov 16, 2021

Bringing Elixir to the Solana ecosystem, and why you might want to build your next program client in Elixir

By Derek Meer

I'm happy to announce the initial releases of two new Elixir packages: solana, an unofficial Solana API client, and solana_spl, an unofficial Solana Program Library interface.

Why?

Before I started working at a DeFi venture studio, I worked on web applications written in Elixir. Elixir is a great choice for building modern, fault-tolerant, concurrent applications like Web2 apps or data pipelines. I noted several cases at work that could benefit from an Elixir library or language feature. Recently, while doing research for a new project, I found one case which fit Elixir's strengths so well that I decided to kickstart the Elixir-Solana ecosystem myself.

Note: The rest of this article assumes you have some knowledge of Elixir. If you're interested in learning more, the official site and documentation are good places to start.

The Problem

Aggregate all transactions associated with tokens and run proprietary operations on them.

Solana provides two API methods for this: getSignaturesForAddress, which returns a list of transaction signatures for an address, and getTransaction, which returns a transaction's details given its signature. You call getSignaturesForAddress on the token mint you want to analyze, then call getTransaction for each signature in the response.

Some additional constraints:

  • Most official endpoints have a limit of 100 requests every 10 seconds per IP.
  • getSignaturesForAddress returns a maximum of 1000 signatures per call; most large token distributions have orders of magnitude more transactions than that.
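
The second constraint means the signatures have to be fetched page by page. Here is a minimal sketch of that paging loop, not the code used later in this article: PagingSketch and fetch_page are hypothetical names, and fetch_page stands in for a real RPC call by slicing an in-memory list of fake signatures.

```elixir
defmodule PagingSketch do
  @moduledoc "Hypothetical helper: pages backwards through signatures with a `before` cursor."

  # `fetch_page` is an assumed 2-arity function (before_cursor, limit) that
  # returns up to `limit` signatures, newest first, older than the cursor.
  def all_signatures(fetch_page, limit), do: collect(fetch_page, limit, nil, [])

  defp collect(fetch_page, limit, before, pages) do
    page = fetch_page.(before, limit)
    pages = [page | pages]

    if length(page) < limit do
      # a short page means we reached the oldest signature
      pages |> Enum.reverse() |> Enum.concat()
    else
      # the last signature on this page becomes the next cursor
      collect(fetch_page, limit, List.last(page), pages)
    end
  end
end

# Stand-in for the RPC endpoint: 2,500 fake signatures, newest first.
history = 1..2500 |> Enum.reverse() |> Enum.map(&"sig#{&1}")

fetch_page = fn
  nil, limit ->
    Enum.take(history, limit)

  before, limit ->
    history
    |> Enum.drop_while(&(&1 != before))
    |> Enum.drop(1)
    |> Enum.take(limit)
end

signatures = PagingSketch.all_signatures(fetch_page, 1000)
```

With a page size of 1000, the 2,500 fake signatures come back in three pages. Every page is one request against the rate limit, and so is every getTransaction call that follows.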

I don't know the best way to do this in JavaScript, so let's see how I would solve this problem in Elixir.

The Code

The first constraint I listed above is a rate limit. When enforcing rate limits in data pipelines like this, Elixir developers turn to the GenStage package. This is such a common use case that the official repository provides a rate limiter example.

I modified that code slightly to fit my needs:

elixir
defmodule SolanaExample.Client.Producer do
@moduledoc """
This module accepts new API requests and forwards them to the
`SolanaExample.RateLimiter` for eventual execution.
"""
use GenStage

# API
def start_link(config) do
GenStage.start_link(__MODULE__, :ok, name: SolanaExample.Registry.via(name(config)))
end

# This process' name, so that other processes can find it
def name(config) do
Keyword.get(config, :network) <> "_producer"
end

def cast(producer, requests) do
GenStage.cast(producer, {:send, requests, self()})
end

def call(producer, requests), do: GenStage.call(producer, {:send, requests})

# GenStage callbacks
def init(:ok) do
{:producer, %{demand: 0, queue: []}}
end

def handle_cast({:send, requests, from}, state) do
handle_call({:send, requests}, from, state)
end

def handle_call({:send, requests}, from, state) do
{events, queue, demand} =
state.queue
|> add_to_queue(from, requests)
|> fulfill_demand(state.demand)

{:noreply, expand(events), %{queue: queue, demand: demand}}
end

def handle_demand(demand, state) do
{events, queue, demand} =
state.queue
|> fulfill_demand(demand)

{:noreply, expand(events), %{queue: queue, demand: demand}}
end

defp add_to_queue(queue, from, requests) do
List.flatten([queue | [{from, List.wrap(requests)}]])
end

defp expand(events) do
events
|> Enum.map(fn {from, requests} -> Enum.map(requests, &{from, &1}) end)
|> List.flatten()
end

defp fulfill_demand(queue, demand) do
Enum.reduce_while(queue, {[], queue, demand}, fn
{from, requests}, {to_emit, [_ | rest], demand} when length(requests) <= demand ->
{:cont, {[{from, requests} | to_emit], rest, demand - length(requests)}}

_, acc ->
{:halt, acc}
end)
end
end

defmodule SolanaExample.Client.RateLimiter do
@moduledoc """
This module rate limits the API requests from the Producer to 100 every 10
seconds.
"""
use GenStage

alias SolanaExample.{Registry, Client}

# API
def start_link(config) do
GenStage.start_link(__MODULE__, config, name: Registry.via(name(config)))
end

# the registry's name for this process so other processes can find it
def name(config) do
Keyword.get(config, :network) <> "_ratelimiter"
end

# GenStage callbacks
def init(config) do
subscription = Registry.lookup(Client.Producer.name(config))
{:producer_consumer, %{}, subscribe_to: [subscription]}
end

def handle_subscribe(:producer, opts, from, producers) do
limit = Keyword.get(opts, :max_demand, 100)
interval = Keyword.get(opts, :interval, 10_000)

producers =
producers
|> Map.put(from, {limit, interval})
|> ask_and_schedule(from)

{:manual, producers}
end

def handle_subscribe(:consumer, _opts, _from, consumers) do
{:automatic, consumers}
end

def handle_cancel(_, from, producers) do
{:noreply, [], producers |> Map.delete(from)}
end

def handle_events(events, _from, producers) do
{:noreply, events, producers}
end

def handle_info({:ask, from}, producers) do
{:noreply, [], ask_and_schedule(producers, from)}
end

defp ask_and_schedule(producers, from) do
case producers do
%{^from => {limit, interval}} ->
GenStage.ask(from, limit)
Process.send_after(self(), {:ask, from}, interval)
producers

%{} ->
producers
end
end
end

defmodule SolanaExample.Client.Consumer do
@moduledoc """
This module actually executes the rate-limited API requests.
"""
use GenStage

alias Solana.RPC
alias SolanaExample.Client

# API
def start_link(config), do: GenStage.start_link(__MODULE__, config)

# GenStage callbacks
def init(config) do
subscription = SolanaExample.Registry.lookup(Client.RateLimiter.name(config))
adapter = {Tesla.Adapter.Gun, [certificates_verification: true]}
opts = Keyword.merge(config, adapter: adapter)
state = %{network: Keyword.get(config, :network), client: RPC.client(opts)}
{:consumer, state, subscribe_to: [subscription]}
end

def handle_events(events, _from, state = %{client: client}) do
events
|> Enum.reduce(%{}, fn {from, request}, batches ->
Map.update(batches, from, [request], &[request | &1])
end)
|> Enum.each(fn
{from, requests} when is_pid(from) ->
requests = Enum.reverse(requests)
response = Enum.zip(RPC.Request.encode(requests), RPC.send(client, requests))
send(from, {:rpc, state.network, response})

{from, requests} ->
GenStage.reply(from, RPC.send(client, Enum.reverse(requests)))
end)

{:noreply, [], state}
end
end

I'm not going to go over this code in detail, but its basic job is to take new API requests, rate limit them, and send them off to the Solana RPC endpoint. At work, projects consistently ran into API limits once they reached production; this code would prevent that issue.
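
For readers who want one piece of it spelled out, here is a simplified, standalone sketch of the idea behind the Producer's fulfill_demand: release whole batches of requests from the front of the queue for as long as they fit in the outstanding demand. DemandSketch is a hypothetical name, and unlike the version above it uses plain recursion and returns the batches in queue order.

```elixir
defmodule DemandSketch do
  @moduledoc "Hypothetical, simplified version of the Producer's demand handling."

  # Returns {batches_to_emit, remaining_queue, remaining_demand}.
  def fulfill(queue, demand), do: take(queue, demand, [])

  # The batch at the front fits in the remaining demand: emit it and continue.
  defp take([{_from, requests} = batch | rest], demand, acc)
       when length(requests) <= demand do
    take(rest, demand - length(requests), [batch | acc])
  end

  # Queue is empty or the next batch is too big: stop and keep it queued.
  defp take(queue, demand, acc), do: {Enum.reverse(acc), queue, demand}
end

queue = [{:caller_a, [:req1, :req2]}, {:caller_b, [:req3, :req4, :req5]}]
result = DemandSketch.fulfill(queue, 3)
```

Here the first batch (two requests) fits in a demand of 3, but the second (three requests) does not fit in the remaining demand of 1, so it stays queued until the rate limiter asks for more.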

Now all we need is a way to start all these processes and launch our initial query.

To launch all these processes with built-in crash recovery, you can supervise them:

elixir

defmodule SolanaExample.Client do
@moduledoc """
Responsible for supervising the GenStage pipeline for a specific Solana
cluster.
"""
use Supervisor

alias SolanaExample.{Client, Registry}

def start_link(config) do
Supervisor.start_link(__MODULE__, config, name: name(config))
end

def init(config) do
children = [
{Client.Producer, config},
{Client.RateLimiter, config},
{Client.Consumer, config}
]

Supervisor.init(children, strategy: :rest_for_one)
end

def cast(name, requests) when is_binary(name) do
lookup_and_run(name, requests, &cast/2)
end

def cast(client, requests) when is_pid(client) do
find_child_and_run(client, requests, &Client.Producer.cast/2)
end

def call(name, requests) when is_binary(name) do
lookup_and_run(name, requests, &call/2)
end

def call(client, requests) when is_pid(client) do
find_child_and_run(client, requests, &Client.Producer.call/2)
end

defp lookup_and_run(name, requests, fun) do
case Registry.lookup(name) do
nil -> :error
client -> fun.(client, requests)
end
end

defp find_child_and_run(client, requests, fun) do
client
|> Supervisor.which_children()
|> Enum.find(&(elem(&1, 0) == Client.Producer))
|> case do
nil -> :error
{_, producer, _, _} -> fun.(producer, requests)
end
end

defp name(config), do: Registry.via(Keyword.fetch!(config, :network))
end
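
The modules above call SolanaExample.Registry.via/1 and SolanaExample.Registry.lookup/1, which this article doesn't show. The following is one possible sketch of that module, built on Elixir's standard Registry. It is an assumption based on how the functions are used above (lookup returning a pid or nil), not the original implementation.

```elixir
defmodule SolanaExample.Registry do
  @moduledoc """
  Hypothetical sketch of the process registry used by the pipeline modules.
  """

  @me __MODULE__

  # Lets this module be listed directly as a child in a supervision tree.
  def child_spec(_opts) do
    Registry.child_spec(keys: :unique, name: @me)
  end

  # A `:via` tuple, usable as the `name:` option when starting a process.
  def via(name), do: {:via, Registry, {@me, name}}

  # Returns the pid registered under `name`, or nil if there is none.
  def lookup(name) do
    case Registry.lookup(@me, name) do
      [{pid, _value}] -> pid
      [] -> nil
    end
  end
end
```

A registry like this would need to be running before any client starts, for example as the first child of the application's supervisor.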

If you want to track multiple clusters, e.g. devnet and mainnet-beta, you should probably run a separate Client for each one. You can do that with a DynamicSupervisor:

elixir
defmodule SolanaExample.Client.Supervisor do
@moduledoc """
Responsible for managing `SolanaExample.Client` processes.
"""
use DynamicSupervisor

@me __MODULE__

def start_link(_), do: DynamicSupervisor.start_link(@me, :ok, name: @me)

@doc """
retrieves or kicks off a `SolanaExample.Client` process based on the desired
cluster.
"""
def get_or_start_client(config) do
case start_child(config) do
{:error, {:already_started, pid}} -> {:ok, pid}
other -> other
end
end

def start_child(config) do
DynamicSupervisor.start_child(@me, {SolanaExample.Client, config})
end

def init(_), do: DynamicSupervisor.init(strategy: :one_for_one)
end

Finally, here's a way to kick off the asynchronous query:

elixir
defmodule SolanaExample.Runner do
@moduledoc """
Runs various asynchronous queries associated with collecting address
transactions.
"""
use GenServer

@me __MODULE__
@tx_limit 25

alias Solana.RPC
alias SolanaExample.Client

def start_link(_), do: GenServer.start_link(@me, :ok, name: @me)

@doc """
Gets account information for an address, and kicks off an asynchronous
workstream to retrieve every new transaction associated with this account.
This runs in the background until there are no more transactions to process.
"""
def sync(address, network) do
with {:ok, decoded} <- B58.decode58(address),
request = RPC.Request.get_account_info(decoded, encoding: "jsonParsed"),
nil <- get_account_from_database(address, network),
[{:ok, data}] <- Client.call(network, request) do
add_account_to_database(address, network, %{contents: data, updating?: true, last: nil})
# kick off the background sync for this account
GenServer.cast(@me, {:sync, address, network})
{:ok, data}
else
# account already in the database, but not updating: kick off a new update
account = %{updating?: false} ->
edit_account_in_database(address, network, updating?: true)
GenServer.cast(@me, {:sync, address, network, List.wrap(account.last)})
{:ok, account.contents}

# account in database and updating: don't kick off a new update
%{updating?: true, contents: data} ->
{:ok, data}

:error ->
{:error, :invalid_client}

error ->
error
end
end

# GenServer callbacks

def init(:ok) do
{:ok, nil}
end

def handle_cast({:sync, address, network, opts}, state) do
opts = Keyword.merge(opts, limit: @tx_limit)

request =
address
|> B58.decode58!()
|> RPC.Request.get_signatures_for_address(opts)

# kick off an async request to the Solana RPC endpoint
Client.cast(network, request)
{:noreply, state}
end

def handle_cast({:sync, address, network}, state) do
request =
address
|> B58.decode58!()
|> RPC.Request.get_signatures_for_address(limit: @tx_limit)

# kick off an async request to the Solana RPC endpoint
Client.cast(network, request)
{:noreply, state}
end

# handles responses to async requests
def handle_info({:rpc, network, response}, state) do
Enum.reduce_while(response, {:noreply, state}, fn pair, _acc ->
case handle_rpc(pair, network, state) do
{:noreply, _} = result -> {:cont, result}
other -> {:halt, other}
end
end)
end

# get transaction signatures
defp handle_rpc(
{%{method: "getSignaturesForAddress", params: [address, opts]}, {:ok, response}},
network,
state
) do
Enum.each(response, &put_account_signature_in_database(address, &1, network))

response
|> Enum.map(&Map.get(&1, "signature"))
|> Enum.filter(&is_nil(get_transaction_from_database(B58.encode58(&1), network)))
|> Enum.map(&RPC.Request.get_transaction(&1, encoding: "jsonParsed"))
|> case do
[] -> :ok
requests -> Client.cast(network, requests)
end

handle_signature_response(response, address, opts, network, state)
end

# get transaction details
defp handle_rpc(
{%{method: "getTransaction", params: [signature, _]}, {:ok, response}},
network,
state
) do
put_transaction_in_database(signature, response, network)
{:noreply, state}
end

defp handle_signature_response(
response,
address,
%{"before" => _, "until" => until},
network,
state
) do
handle_signature_response(response, address, %{"until" => until}, network, state)
end

# now that we've got all the transactions going back in time, switch to going
# forward from our starting point
defp handle_signature_response(response, address, %{"before" => _}, network, state)
when length(response) < @tx_limit do
latest = get_latest_account_signature_from_database(address, network)
edit_account_in_database(address, network, last: {:until, latest})
handle_cast({:sync, address, network, until: latest}, state)
end

# move on to the previous N transactions
defp handle_signature_response(response, address, %{"before" => _}, network, state) do
oldest = response |> List.last() |> Map.get("signature")
edit_account_in_database(address, network, last: {:before, oldest})
handle_cast({:sync, address, network, before: oldest}, state)
end

# base case: no more transactions to analyze
defp handle_signature_response([], address, %{"until" => _}, network, state) do
latest = get_latest_account_signature_from_database(address, network)
edit_account_in_database(address, network, updating?: false, last: {:until, latest})
{:noreply, state}
end

# move on to the previous N transactions starting at the earliest transaction
# just pulled and going until `until`
defp handle_signature_response(response, address, %{"until" => until}, network, state) do
earliest = response |> List.last() |> Map.get("signature")
edit_account_in_database(address, network, last: [until: until, before: earliest])
handle_cast({:sync, address, network, until: until, before: earliest}, state)
end

# kick off the initial load
defp handle_signature_response(response, address, _, network, state) do
handle_signature_response(response, address, %{"before" => nil}, network, state)
end
end

defmodule SolanaExample do
@doc """
queries the Solana `network` for information about an `address`.
"""
def query(address, opts) do
# reuse the client for this network if it's already running
case SolanaExample.Client.Supervisor.get_or_start_client(opts) do
{:ok, _client} ->
SolanaExample.Runner.sync(address, opts[:network])

other ->
other
end
end
end

Every function that ends in _in_database puts records into or retrieves records from a database of your choice. I'd recommend cubdb.
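
To give a rough idea of what those helpers might look like, here is a minimal in-memory sketch of three of them, backed by an ETS table instead of a real database. StoreSketch, the table name, and the key layout are all assumptions for illustration; a persistent store such as cubdb would replace the :ets calls.

```elixir
defmodule StoreSketch do
  @moduledoc "Hypothetical in-memory stand-in for the `*_in_database` helpers."

  @table :solana_example_store

  # Creates the table; call once before using the other functions.
  def init do
    :ets.new(@table, [:named_table, :public, :set])
    :ok
  end

  # Returns the stored account map, or nil if the account is unknown.
  def get_account_from_database(address, network) do
    case :ets.lookup(@table, {:account, network, address}) do
      [{_key, account}] -> account
      [] -> nil
    end
  end

  def add_account_to_database(address, network, account) do
    :ets.insert(@table, {{:account, network, address}, account})
    :ok
  end

  # Merges a keyword list of changes into the stored account.
  def edit_account_in_database(address, network, changes) do
    account = get_account_from_database(address, network) || %{}
    add_account_to_database(address, network, Map.merge(account, Map.new(changes)))
  end
end
```

Keying each record by network and address keeps devnet and mainnet-beta data separate, which matters once you run one Client per cluster.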

Running SolanaExample.query(address, network: "mainnet-beta") kicks off a process to retrieve all the transactions associated with an address in the background. In the meantime, you can query other addresses or execute other commands in your program. Once the background process finishes its work, you have a database full of transactions to aggregate and process.

Conclusion

That's all the code you'll need to collect transactions associated with an address. You can likely do this in JavaScript, but it might be more complicated than the Elixir solution.

This example is just scratching the surface of what the solana package can do. It also includes helpful utilities for writing and testing your own program clients in Elixir. Check out the documentation to learn more!

P.S. If you need to interact with the Solana Program Library, you can use the solana_spl package. Check out its documentation if you need it!
