Source code for grapheneapi.api

# -*- coding: utf-8 -*-
import logging
from collections import Counter
from itertools import cycle
from time import sleep
from .exceptions import RPCError, NumRetriesReached

from .websocket import Websocket
from .http import Http

log = logging.getLogger(__name__)


class Api:
    def __init__(self, urls, user=None, password=None, connect=True, **kwargs):

        # Some internal variables
        self._connections = dict()
        self._url_counter = Counter()

        # Let's store user and password in kwargs as well
        self.user = user
        self.password = password
        kwargs.update(dict(user=user, password=password))
        self._kwargs = kwargs

        # How often do we accept retries?
        self.num_retries = kwargs.pop("num_retries", 1)

        if not isinstance(urls, list):
            urls = [urls]

        for url in urls:
            self._url_counter[url] = 0

        self.url = urls[0]
        self._active_url = None

        # Let's also be able to deal with infinite connection
        self.urls = cycle(urls)
        self._cnt_retries = 0
        self._network = None

        # Connect!
        if connect:
            self.connect()
            self._network = self.get_network()

    # Get chain parameters
    @property
    def chain_params(self):
        if self._network is None:
            self._network = self.get_network()
        return self._network

[docs] def get_network(self): return self.get_chain_properties()
[docs] def updated_connection(self): if self.url[:2] == "ws": return Websocket(self.url, **self._kwargs) elif self.url[:4] == "http": return Http(self.url, **self._kwargs) else: raise ValueError("Only support http(s) and ws(s) connections!")
@property def connection(self): if self._active_url != self.url: log.debug( "Updating connection from {} to {}".format(self._active_url, self.url) ) self._active_connection = self.updated_connection() self._active_url = self.url return self._active_connection
[docs] def connect(self): try: self.connection.connect() except Exception as e: log.warning(str(e)) self.error_url() self.next() self.register_apis()
[docs] def find_next(self): """ Find the next url in the list """ if int(self.num_retries) < 0: # pragma: no cover self._cnt_retries += 1 sleeptime = (self._cnt_retries - 1) * 2 if self._cnt_retries < 10 else 10 if sleeptime: log.warning( "Lost connection to node during rpcexec(): %s (%d/%d) " % (self.url, self._cnt_retries, self.num_retries) + "Retrying in %d seconds" % sleeptime ) sleep(sleeptime) return next(self.urls) urls = [ k for k, v in self._url_counter.items() if ( # Only provide URLS if num_retries is bigger equal 0, # i.e. we want to do reconnects at all int(self.num_retries) >= 0 # the counter for this host/endpoint should be smaller than # num_retries and v <= self.num_retries # let's not retry with the same URL *if* we have others # available and (k != self.url or len(self._url_counter) == 1) ) ] if not len(urls): raise NumRetriesReached url = urls[0] return url
[docs] def reset_counter(self): """ reset the failed connection counters """ self._cnt_retries = 0 for i in self._url_counter: self._url_counter[i] = 0
[docs] def error_url(self): # pragma: no cover if self.url in self._url_counter: self._url_counter[self.url] += 1 else: self._url_counter[self.url] = 1
[docs] def next(self): self.connection.disconnect() self.url = self.find_next() self.connect()
def post_process_exception(self, exception): raise exception @property def api_id(self): """ This allows to list api_ids, if they have been registered through api_register() -- LEGACY In previous API version, one would connect and register to APIs like this .. code-block:: python self.api_id["database"] = self.database(api_id=1) self.api_id["history"] = self.history(api_id=1) self.api_id["network_broadcast"] = self.network_broadcast( api_id=1) """ return self.connection.api_id
[docs] def register_apis(self): # pragma: no cover """ This method is called right after connection and has previously been used to register to different APIs within the backend that are considered default. The requirement to register to APIs has been removed in some systems. """ pass
def __getattr__(self, name): """ Proxies RPC calls to actual Websocket or Http instance. Connection-related errors catched here and handled. """ def func(*args, **kwargs): while True: try: # RPC method called on actual Websocket or Http class func = self.connection.__getattr__(name) r = func(*args, **kwargs) self.reset_counter() break except KeyboardInterrupt: # pragma: no cover raise except RPCError as e: # pragma: no cover """ When the backend actual returns an error """ self.post_process_exception(e) # the above line should raise. Let's be sure to at least # break break # pragma: no cover except IOError: # pragma: no cover import traceback log.debug(traceback.format_exc()) log.warning("Connection was closed remotely.") log.warning("Reconnecting ...") self.error_url() self.next() except Exception as e: # pragma: no cover """ When something fails talking to the backend """ import traceback log.debug(traceback.format_exc()) log.warning(str(e)) log.warning("Reconnecting ...") self.error_url() self.next() return r return func