Chaino

Chaino is an EVM blockchain research tool to rapidly:

Overview

Chaino can use multiple RPCs in parallel, each with multiple threads. Chaino attempts to automatically maximize its speed without abusing the RPC.

Blocks are stored as web3.py objects inside Python pickle files. The block files are archived with NestedFilestore <https://github.com/0xidm/nested-filestore>, which can manage millions of files on a filesystem.

Calls are bundled with GroupedMulticall, which is a wrapper around Multicall. A GroupedMulticall can be executed against the current head block or any historical block, as the RPC permits.

Installation

Python 3.9 is required.

pip install 'git+https://github.com/0xidm/chaino'

Online resources

Usage

Python example: Block download with BlockScheduler

Download the first 1000 blocks of the Fantom DAG with BlockScheduler.

from chaino.scheduler.block import BlockScheduler
from chaino.rpc import RPC

scheduler = BlockScheduler(filestore_path="chaino-example")
scheduler.add_rpc(RPC(url="https://rpc.ftm.tools"))
for block_number in range(1, 1000):
   scheduler.add_task(block_number=block_number)
scheduler.start()

Results will be available as a NestedFilestore in a directory called chaino-example.

Python example: Calling contract functions with CallScheduler

Get ERC-20 balances for a list of addresses on Fantom with CallScheduler

from chaino.scheduler.call import CallScheduler, parse_address
from chaino.rpc import RPC

result = CallScheduler.map_call(
   rpc=RPC(url="https://rpc.ftm.tools"),
   contract_address="0x21Ada0D2aC28C3A5Fa3cD2eE30882dA8812279B6",
   function_signature="balanceOf(address)(uint256)",
   inputs=[
      ["0x5aa1039D09330DF607F88e72bb9C1E0F66C96AA0"],
      ["0x18Bf8D51f7695AA3E63fEA9E99416530c1420511"],
   ]
)
[(parse_address(key), value) for key, value in result.items()]

which produces these results:

[('0x5aa1039D09330DF607F88e72bb9C1E0F66C96AA0', 5925106268789833088835427),
('0x18Bf8D51f7695AA3E63fEA9E99416530c1420511', 2765360879676247594525586)]

Command Line Example

The example script blockchain.py demonstrates some simple chaino tasks.

On the command line, download the first 1000 Arbitrum blocks. Then, extract all transactions as CSV to a separate filestore. Finally, combine all transactions into a single CSV file.

mkdir -p var
blockchain.py download arbitrum 1 1000 var/arbitrum-blocks
blockchain.py extract-txs var/arbitrum-blocks var/arbitrum-txs 3,3
blockchain.py transactions-csv var/arbitrum-txs 3,3 ./var/arbitrum-txs.csv.gz

Docker

Chaino can also run in a Docker container.

docker build -t chaino https://raw.githubusercontent.com/0xidm/chaino/main/Dockerfile
docker volume create chaino
docker run --rm -it --name chaino -v chaino:/mnt/chaino chaino

To provide a custom RPC configuration file, add another -v option:

docker run --rm -it --name chaino -v chaino:/mnt/chaino -v /path/to/rpc.json:/home/chaino/.config/chaino/rpc.json chaino

To monitor progress inside a chaino container:

docker exec -it chaino tail -f /tmp/chaino.log

API

BlockScheduler

class chaino.scheduler.block.BlockScheduler(filestore_path, hierarchy_order=[3, 3, 3], *args, **kwargs)[source]

Bases: Scheduler

BlockScheduler downloads blocks from a blockchain. Each task for the scheduler to simply download a single block. Block results are stored in a NestedFilestore as pickled web3.py objects.

add_task(block_number, check_existing=True)[source]

Add one task to be executed

get_block(w3, block_number)[source]

Get a block from the blockchain and save it to disk

start()[source]

Start the scheduler

CallScheduler

class chaino.scheduler.call.CallScheduler(state_path, project_name='chaino', block_number=None, *args, **kwargs)[source]

Bases: Scheduler

Call Scheduler class for Chaino.

This scheduler is used to call functions on contracts.

add_task(contract_address, function, input_value)[source]

Add one call to the task queue

get_result(w3, mc)[source]

Get the result of a multicall

classmethod map_call(rpc, contract_address, function_signature, inputs, block_number=None, state_path='/tmp/chaino')[source]

Call one function on one contract for a list of inputs. This is a common pattern when a function is invoked on a list of addresses.

start()[source]

Start the scheduler

Scheduler

class chaino.scheduler.Scheduler(chain=None)[source]

Bases: object

Scheduler class for Chaino.

A Scheduler attempts to send a task to an RPC that has available threads - as quickly as possible, without abusing the RPC. The Scheduler implements a “tick” function that introduces a delay between each task, which is used to remain under RPC rate limits. After a certain number of successful requests to an RPC, the Scheduler reduces the tick delay. If the RPC throws an exception, Scheduler pauses that RPC and increases the tick delay.

add_rpc(rpc)[source]

Add an RPC to the scheduler.

add_rpc_config(chain, config_filename='~/.config/chaino/rpc.json')[source]
add_rpc_default(chain, tick_delay=0.15, slow_timeout=120, num_threads=2)[source]

Add a default RPC to the scheduler.

add_rpcs(chain)[source]

Add a quick RPC to the scheduler.

add_task()[source]

Add one task to be executed

any_rpc_running()[source]

Check if any RPC is running.

get_available_rpc()[source]

Get an RPC that has available threads.

get_result()[source]

Get the result of a task

start()[source]

Start the scheduler

RPC

class chaino.rpc.RPC(w3=None, url=None, chain=None, poa=False, tick_delay=0.1, slow_timeout=30, num_threads=2)[source]

Bases: object

RPC class for Chaino.

any_available_threads()[source]

Check if any threads are available.

any_threads_running()[source]

Check if any threads are running.

consider_speedup()[source]

Consider speeding up the RPC.

dispatch_task(task_fn, *args)[source]

Dispatch a task to the RPC.

eth_call(address, function_signature, block_number=None, *vargs)[source]

Call a function on a contract.

eth_contract_function(address, function_signature)[source]

Get a contract function for an address.

fetch_result(task_id, task_fn, *args)[source]

Fetch the result of a task.

run_slow_if_necessary()[source]

Run slowly if necessary.

slow_down()[source]

Slow down the RPC.

tick()[source]

Advance in time by one tick.

property w3

Get the web3 instance.

GroupedMulticall

class chaino.grouped_multicall.GroupedMulticall(w3, inputs, block_number=None, margin=0.1)[source]

Bases: object

GroupedMulticall is a class that groups multiple multicalls together.

It is used to group multiple multicalls together to reduce the number of RPC calls.

classmethod from_vectors(w3, contract_address_vector, function_vector, input_vector)[source]

auto-replicate function_vector if len(1) and len of another one is longer than 1

property max_length

Get the maximum number of items that will fit inside a multicall