# Source code for graphenecommon.aio.witness
# -*- coding: utf-8 -*-
from ..exceptions import WitnessDoesNotExistsException
from .blockchainobject import BlockchainObject, BlockchainObjects
from ..witness import Witness as SyncWitness, Witnesses as SyncWitnesses
class Witness(BlockchainObject, SyncWitness):
    """ Read data about a witness in the chain

        :param str account_name: Name of the witness
        :param instance blockchain_instance: instance to use when accessing a RPC

        This is the asynchronous variant: ``__init__`` and ``refresh`` are
        coroutines, and the ``account``, ``weight`` and ``is_active``
        properties return awaitables, e.g. ``await witness.weight``.
    """

    async def __init__(self, *args, **kwargs):
        """ Set up the class wiring and delegate to ``BlockchainObject``.

            All positional and keyword arguments are forwarded unchanged to
            ``BlockchainObject.__init__``.
        """
        # define_classes() is provided outside this file; presumably it sets
        # type_id/type_ids and account_class, which are checked right after.
        self.define_classes()
        assert self.type_id or self.type_ids
        assert self.account_class

        await BlockchainObject.__init__(self, *args, **kwargs)

    async def refresh(self):
        """ Load the witness data from the RPC and re-initialize this object.

            How ``self.identifier`` is resolved:

            * a valid object id whose second component is ``6`` is fetched
              directly with ``get_object``
            * any other valid object id is passed to
              ``get_witness_by_account``
            * anything else is first resolved through ``account_class`` and
              the resulting account's ``id`` is passed to
              ``get_witness_by_account``

            :raises WitnessDoesNotExistsException: if the RPC returns nothing
        """
        rpc = self.blockchain.rpc
        if self.test_valid_objectid(self.identifier):
            _, type_part, _ = self.identifier.split(".")
            if int(type_part) == 6:
                witness = await rpc.get_object(self.identifier)
            else:
                witness = await rpc.get_witness_by_account(self.identifier)
        else:
            account = await self.account_class(
                self.identifier, blockchain_instance=self.blockchain
            )
            witness = await rpc.get_witness_by_account(account["id"])

        if not witness:
            raise WitnessDoesNotExistsException(self.identifier)

        await super(Witness, self).__init__(
            witness, blockchain_instance=self.blockchain
        )

    @property
    async def account(self):
        """ Awaitable resolving to the ``account_class`` instance built from
            this witness's ``witness_account`` entry.
        """
        return await self.account_class(
            self["witness_account"], blockchain_instance=self.blockchain
        )

    @property
    async def weight(self):
        """ Awaitable resolving to this witness's share of the
            ``witness-account`` active authority.

            :returns: ``0`` if the witness is not active (or has no entry in
                the authority list), otherwise the witness's authority weight
                divided by the authority's ``weight_threshold``, as a float
        """
        # is_active is an async property: it has to be awaited, otherwise the
        # (always truthy) coroutine object itself would be tested.
        if not await self.is_active:
            return 0

        authority_account = await self.account_class(
            "witness-account", blockchain_instance=self.blockchain
        )
        # self.account is an async property as well; resolve it once before
        # reading its id instead of subscripting the coroutine.
        witness_account = await self.account
        account_id = witness_account["id"]

        active = authority_account["active"]
        entry = next(
            (auth for auth in active["account_auths"] if auth[0] == account_id),
            None,
        )
        if entry is None:
            # The authority list changed between the is_active check and the
            # lookup above: no entry means no weight.
            return 0
        return float(entry[1]) / float(active["weight_threshold"])

    @property
    async def is_active(self):
        """ Awaitable resolving to ``True`` if this witness's account id is
            listed in the ``account_auths`` of the ``witness-account`` active
            authority, ``False`` otherwise.
        """
        authority_account = await self.account_class(
            "witness-account", blockchain_instance=self.blockchain
        )
        # self.account is an async property and must be awaited before its
        # id can be read.
        witness_account = await self.account
        account_id = witness_account["id"]
        return any(
            auth[0] == account_id
            for auth in authority_account["active"]["account_auths"]
        )
class Witnesses(BlockchainObjects, SyncWitnesses):
    """ Obtain a list of **active** witnesses and the current schedule

        :param bool only_active: (False) Only return witnesses that are
            actively producing blocks
        :param bool lazy: (False) Forwarded as ``lazy`` to every witness
            object created during ``refresh``
        :param instance blockchain_instance: instance to use when accessing a RPC
    """

    async def __init__(self, *args, only_active=False, lazy=False, **kwargs):
        """ Store the options and delegate to the parent initializer.

            Remaining positional and keyword arguments are forwarded
            unchanged to ``super().__init__``.
        """
        # define_classes() is provided outside this file; presumably it sets
        # account_class and witness_class, which are checked right after.
        self.define_classes()
        assert self.account_class
        assert self.witness_class

        self.lazy = lazy
        self.only_active = only_active
        await super().__init__(*args, **kwargs)

    async def refresh(self, *args, **kwargs):
        """ Fetch the current witness schedule and store the witness objects.

            Reads ``current_shuffled_witnesses`` from object ``2.12.0``,
            creates one ``witness_class`` instance per entry and, when
            ``only_active`` is set, keeps only those whose
            ``witness_account`` is listed in the ``account_auths`` of the
            ``witness-account`` active authority. The result is handed to
            ``self.store``.

            ``*args`` and ``**kwargs`` are accepted but not used here.
        """
        # Await the RPC call first and only then read from its result:
        # ``await a.b().get(...)`` would call ``.get`` on the awaitable.
        schedule_object = await self.blockchain.rpc.get_object("2.12.0")
        self.schedule = schedule_object.get("current_shuffled_witnesses", [])

        witnesses = [
            await self.witness_class(
                x, lazy=self.lazy, blockchain_instance=self.blockchain
            )
            for x in self.schedule
        ]

        if self.only_active:
            account = await self.account_class(
                "witness-account", blockchain_instance=self.blockchain
            )
            # A set gives constant-time membership tests for the filter below.
            active_ids = {auth[0] for auth in account["active"]["account_auths"]}
            witnesses = [w for w in witnesses if w["witness_account"] in active_ids]

        self.store(witnesses)

    def __contains__(self, item):
        """ Check whether ``item`` refers to one of the stored witnesses.

            :param item: an object id, or an ``account_class`` instance
                (whose ``id`` is used)
            :returns: ``True`` if the id equals the ``id`` or the
                ``witness_account`` of any stored witness
            :raises NotImplementedError: for any other kind of ``item``
        """
        if self.witness_class.objectid_valid(item):
            object_id = item
        elif isinstance(item, self.account_class):
            object_id = item["id"]
        else:
            # Async version does not support querying by name
            raise NotImplementedError("Please use object id or Account class")

        return any(object_id == w["id"] for w in self) or any(
            object_id == w["witness_account"] for w in self
        )