Source code for chaino.scheduler.call

import os
import time
import json
import logging
import threading

from ..grouped_multicall import GroupedMulticall
from . import Scheduler


[docs]class CallScheduler(Scheduler): """ Call Scheduler class for Chaino. This scheduler is used to call functions on contracts. """ def __init__(self, state_path, project_name="chaino", block_number=None, *args, **kwargs): "Initialize the scheduler." super().__init__(*args, **kwargs) self.state_path = state_path if not os.path.exists(self.state_path): os.makedirs(self.state_path) self.results = {} self.block_number = block_number self.filename = f"{self.state_path}/{project_name}-{self.timestamp}-results.json"
[docs] def add_task(self, contract_address, function, input_value): "Add one call to the task queue" self.tasks.append((contract_address, function, input_value)) logging.getLogger("chaino").debug(f"Added call {contract_address, function, input_value} to task queue")
[docs] def start(self): "Start the scheduler" logging.getLogger("chaino").info(f"Starting scheduler with {len(self.tasks)} tasks") rpc = self.get_available_rpc() gmc = GroupedMulticall(rpc._w3, self.tasks, block_number=self.block_number, margin=0.6) for mc in gmc(): # wait for a thread to become available while not rpc.any_available_threads(): time.sleep(0.01) # dispatch the task rpc.dispatch_task(self.get_result, mc) logging.getLogger("chaino").info("Waiting for tasks to finish...") while self.any_rpc_running(): time.sleep(0.1) logging.getLogger("chaino").info("All tasks completed") return self.results.copy()
[docs] def get_result(self, w3, mc): "Get the result of a multicall" # must take w3 as the first option, even though we ignore it result = mc() with self.lock: self.results.update(result) with open (self.filename, "w") as f: json.dump(self.results, f) return result
[docs] @classmethod def map_call(cls, rpc, contract_address, function_signature, inputs, block_number=None, state_path="/tmp/chaino"): """ Call one function on one contract for a list of inputs. This is a common pattern when a function is invoked on a list of addresses. """ call_scheduler = cls( block_number=block_number, state_path=state_path, ) call_scheduler.add_rpc(rpc) for input_vector in inputs: call_scheduler.add_task( contract_address=contract_address, function=function_signature, input_value=input_vector, ) return call_scheduler.start()
def parse_address(item): """ Convenience function to parse an address in the special case that the function was called with a single parameter, which was an address. """ return item.split(",")[2].split("'")[1]