Source code for chaino.grouped_multicall

from multicall import Call, Multicall
from web3 import Web3

from .utils import convert_signature_to_abi


[docs]class GroupedMulticall: """ GroupedMulticall is a class that groups multiple multicalls together. It is used to group multiple multicalls together to reduce the number of RPC calls. """ def __init__(self, w3, inputs, block_number=None, margin=0.1): self.w3 = w3 self.inputs = inputs self.margin = margin self.block_number = block_number def __call__(self): "Yield a list of multicalls" contract_calls = [] for contract_address, function, input_value in self.inputs: if len(contract_calls) == self.max_length: yield Multicall(contract_calls, block_id=self.block_number, _w3=self.w3) contract_calls = [] fn_call = [function] fn_call.extend(input_value) checksum_address = Web3.toChecksumAddress(contract_address) contract_calls.append( Call( target=checksum_address, function=fn_call, returns=[(f"[{checksum_address}, {fn_call}]", None)], ) ) # yield the last one if len(contract_calls) > 0: yield Multicall(contract_calls, block_id=self.block_number, _w3=self.w3) @property def max_length(self): "Get the maximum number of items that will fit inside a multicall" if not hasattr(self, "_max_length"): if len(self.inputs) == 0: return 0 # estimate gas used for a single call contract_address, function, input_value = list(self.inputs)[0] function_abi = convert_signature_to_abi(function) contract = self.w3.eth.contract(contract_address, abi=[function_abi]) fn = contract.functions[function_abi["name"]] gas_estimate = fn(*input_value).estimateGas() # obtain gas limit from multicall fn_call = [function] fn_call.extend(input_value) gas_limit = Multicall([Call( contract_address, fn_call, [(str(fn_call), None)] )], _w3=self.w3).gas_limit self._max_length = int(gas_limit / gas_estimate * (1 - self.margin)) return self._max_length
[docs] @classmethod def from_vectors(cls, w3, contract_address_vector, function_vector, input_vector): "auto-replicate function_vector if len(1) and len of another one is longer than 1" max_length = max([ len(function_vector), len(contract_address_vector), len(input_vector), ]) if len(function_vector) == 1: function_vector = function_vector * max_length if len(contract_address_vector) == 1: contract_address_vector = contract_address_vector * max_length if len(input_vector) == 1: input_vector = input_vector * max_length inputs = list(zip( contract_address_vector, function_vector, input_vector, )) return cls(w3=w3, inputs=inputs)