Source code for chaino.rpc

import time
import logging
import threading

from web3 import Web3, HTTPProvider
from web3.middleware import simple_cache_middleware, geth_poa_middleware
from requests.exceptions import HTTPError

from .utils import convert_signature_to_abi
from nested_filestore.exceptions import AlreadyExistsError


[docs]class RPC: """ RPC class for Chaino. """ def __init__(self, w3=None, url=None, chain=None, poa=False, tick_delay=0.1, slow_timeout=30, num_threads=2): # provide some defaults for a few chains if not w3 and not url and chain: if chain in rpc_defaults.keys(): url = rpc_defaults[chain] else: raise Exception(f"Unknown chain {chain}") if w3: self._w3 = w3 elif url: self._w3 = Web3(HTTPProvider(url)) self._w3.middleware_onion.add(simple_cache_middleware) if 'bsc' in url or 'binance' in url or 'avax' in url or 'avalanche' in url: poa = True if poa is True: self._w3.middleware_onion.inject(geth_poa_middleware, layer=0) else: raise Exception("Either w3 or url must be provided") self.halt_event = threading.Event() self.lock = threading.Lock() self.running_threads = set() self.tick_delay = tick_delay self.num_threads = num_threads self.slow_mode = False self.slow_timeout = slow_timeout self.good_runs = 0 self.good_runs_reset = 200 self.too_fast = [] self.task_counter = 0 @property def w3(self): "Get the web3 instance." # enforce a delay by calling tick before returning w3 self.tick() return self._w3
[docs] def any_available_threads(self): "Check if any threads are available." with self.lock: return len(self.running_threads) < self.num_threads
[docs] def any_threads_running(self): "Check if any threads are running." with self.lock: return len(self.running_threads) > 0
[docs] def run_slow_if_necessary(self): "Run slowly if necessary." # if another thread received a 429, this will be set while self.slow_mode is True: time.sleep(0.01)
[docs] def slow_down(self): "Slow down the RPC." with self.lock: check_slow_mode = self.slow_mode if check_slow_mode is False: # other threads on this RPC need to stop self.slow_mode = True logging.getLogger("chaino").warning(f"{self} too fast: {self.tick_delay}") self.too_fast.append(self.tick_delay) self.tick_delay += 0.005 time.sleep(self.slow_timeout) self.slow_mode = False
[docs] def consider_speedup(self): "Consider speeding up the RPC." with self.lock: self.good_runs += 1 if self.good_runs >= self.good_runs_reset: self.good_runs = 0 self.good_runs_reset = int(self.good_runs_reset * 1.1) self.tick_delay -= 0.001 if self.tick_delay <= 0: self.tick_delay = 0.001 logging.getLogger("chaino").info(f"{self} faster: {self.tick_delay}")
[docs] def tick(self): "Advance in time by one tick." self.run_slow_if_necessary() self.consider_speedup() time.sleep(self.tick_delay)
[docs] def fetch_result(self, task_id, task_fn, *args): "Fetch the result of a task." result = None while result is None: self.run_slow_if_necessary() try: # this implements the tick delay implicitly result = task_fn(self.w3, *args) self.running_threads.remove(task_id) return True except AlreadyExistsError: logging.getLogger("chaino").warning(f"{self} {task_id} already exists") self.running_threads.remove(task_id) return True except HTTPError as e: logging.getLogger("chaino").warning(f"{self} HTTP error {task_id}: {e}") self.slow_down() except Exception as e: logging.getLogger("chaino").warning(f"{self} failed {task_id}: {type(e)} {e}") self.slow_down() self.running_threads.remove(task_id) return False
[docs] def dispatch_task(self, task_fn, *args): "Dispatch a task to the RPC." task_id = f"{task_fn.__name__}-{self.task_counter}" self.task_counter += 1 logging.getLogger("chaino").debug(f"{self} {task_id}") # # for debugging, launch the task in the current thread # self.fetch_result(task_id, task_fn, *args) thread = threading.Thread(target=self.fetch_result, args=(task_id, task_fn, *args)) self.running_threads.add(task_id) thread.start() return thread
def __repr__(self): rpc_name = self._w3.provider.endpoint_uri.replace("https://", "") return f"<RPC {rpc_name[:25]}>"
[docs] def eth_contract_function(self, address, function_signature): "Get a contract function for an address." checksum_address = Web3.toChecksumAddress(address) function_abi = convert_signature_to_abi(function_signature) contract = self._w3.eth.contract(address, abi=[function_abi]) return contract.functions[function_abi["name"]]
[docs] def eth_call(self, address, function_signature, block_number=None, *vargs): "Call a function on a contract." fn = self.eth_contract_function(address, function_signature) if block_number is None: return fn(*args).call() else: return fn(*args).call(block_identifier=block_number)
rpc_defaults = { "fantom": "https://rpc.ftm.tools", "binance": "https://bsc-dataseed1.binance.org/", "arbitrum": "https://arb1.arbitrum.io/rpc", "avalanche": "https://api.avax.network/ext/bc/C/rpc", "ethereum": "https://eth.llamarpc.com", } synonyms = { "bsc": "binance", "eth": "ethereum", "ftm": "fantom", "arb": "arbitrum", "avax": "avalanche", } for synonym in synonyms: rpc_defaults[synonym] = rpc_defaults[synonyms[synonym]]