# -*- coding: utf-8 -*-
import json
import time
import signal
import logging
import threading
import websocket
import traceback
from itertools import cycle
from events import Events
from .exceptions import NumRetriesReached
# Restore the default SIGINT (Ctrl+C) handler at import time; the default
# action simply kills the process.
signal.signal(signal.SIGINT, signal.SIG_DFL)
# Module-level logger named after this module; used by BitSharesWebsocket.
log = logging.getLogger(__name__)
# Uncomment the following line to enable debug output while developing:
# logging.basicConfig(level=logging.DEBUG)
class BitSharesWebsocket(Events):
    """
    Create a websocket connection and request push notifications.

    :param str urls: Either a single Websocket URL, or a list of URLs
    :param str user: Username for Authentication
    :param str password: Password for Authentication
    :param list accounts: list of account names or ids to get push
        notifications for
    :param list markets: list of asset_ids, e.g. ``[['1.3.0', '1.3.121']]``
    :param list objects: list of object ids you'd like to be notified about
        when they change
    :param on_tx: optional callable added to the ``on_tx`` slot
    :param on_object: optional callable added to the ``on_object`` slot
    :param on_block: optional callable added to the ``on_block`` slot
    :param on_account: optional callable added to the ``on_account`` slot
    :param on_market: optional callable added to the ``on_market`` slot
    :param int keep_alive: seconds between pings to the backend (defaults
        to 25 seconds)
    :param int num_retries: number of connection attempts after which
        :meth:`run_forever` raises ``NumRetriesReached`` on a websocket
        exception; a negative value (the default) retries indefinitely

    After instantiating this class, you can add event slots for:

    * ``on_tx``
    * ``on_object``
    * ``on_block``
    * ``on_account``
    * ``on_market``

    which will be called accordingly with the notification
    message received from the BitShares node:

    .. code-block:: python

        ws = BitSharesWebsocket(
            "wss://node.testnet.bitshares.eu",
            objects=["2.0.x", "2.1.x", "1.3.x"]
        )
        ws.on_object += print
        ws.run_forever()

    Notices:

    * ``on_account``:

        .. code-block:: js

            {'id': '2.6.29',
             'lifetime_fees_paid': '44257768405',
             'most_recent_op': '2.9.1195638',
             'owner': '1.2.29',
             'pending_fees': 0,
             'pending_vested_fees': 100,
             'total_core_in_orders': '6788960277634',
             'total_ops': 505865}

    * ``on_block``:

        .. code-block:: js

            '0062f19df70ecf3a478a84b4607d9ad8b3e3b607'

    * ``on_tx``:

        .. code-block:: js

            {'expiration': '2017-02-23T09:33:22',
             'extensions': [],
             'operations': [[0,
                             {'amount': {'amount': 100000, 'asset_id': '1.3.0'},
                              'extensions': [],
                              'fee': {'amount': 100, 'asset_id': '1.3.0'},
                              'from': '1.2.29',
                              'to': '1.2.17'}]],
             'ref_block_num': 62001,
             'ref_block_prefix': 390951726,
             'signatures': ['20784246dc1064ed5f87dbbb9aaff3fcce052135269a8653fb500da46e7068bec56e85ea997b8d250a9cc926777c700eed41e34ba1cabe65940965ebe133ff9098']}

    * ``on_market``:

        .. code-block:: js

            ['1.7.68612']
    """

    # The position of a name in this list doubles as the callback number
    # that is registered with the backend for that kind of notification.
    __events__ = ["on_tx", "on_object", "on_block", "on_account", "on_market"]

    def __init__(
        self,
        urls,
        user="",
        password="",
        *args,
        accounts=None,
        markets=None,
        objects=None,
        on_tx=None,
        on_object=None,
        on_block=None,
        on_account=None,
        on_market=None,
        keep_alive=25,
        num_retries=-1,
        **kwargs
    ):
        """Store the connection settings and register the given callbacks."""
        self.num_retries = num_retries
        self.keepalive = None
        self._request_id = 0
        self.ws = None
        # Set explicitly so that these names never fall through to
        # ``__getattr__`` (which would hand back an RPC closure instead).
        self.url = None
        self.api_id = {}
        self.user = user
        self.password = password
        self.keep_alive = keep_alive
        self.run_event = threading.Event()

        if isinstance(urls, cycle):
            self.urls = urls
        elif isinstance(urls, (list, tuple)):
            self.urls = cycle(urls)
        else:
            self.urls = cycle([urls])

        # Instantiate Events
        Events.__init__(self)
        self.events = Events()

        # Store the objects we are interested in
        self.subscription_accounts = accounts or []
        self.subscription_markets = markets or []
        self.subscription_objects = objects or []

        if on_tx:
            self.on_tx += on_tx
        if on_object:
            self.on_object += on_object
        if on_block:
            self.on_block += on_block
        if on_account:
            self.on_account += on_account
        if on_market:
            self.on_market += on_market

    def cancel_subscriptions(self):
        """Send the ``cancel_all_subscriptions`` call to the backend."""
        self.cancel_all_subscriptions()

    def on_open(self, *args, **kwargs):
        """
        This method will be called once the websocket connection is
        established. It will

        * login,
        * register to the database api,
        * subscribe to the objects defined if there is a callback/slot
          available for callbacks, and
        * make sure a keep-alive (ping) thread is running.
        """
        self.login(self.user, self.password, api_id=1)
        self.database(api_id=1)
        self.__set_subscriptions()
        # The ping thread always talks to the current ``self.ws``, so one
        # thread is enough across reconnects; only start a new one if there
        # is none running (otherwise every reconnect would leak a thread).
        if self.keepalive is None or not self.keepalive.is_alive():
            self.keepalive = threading.Thread(target=self._ping)
            self.keepalive.start()

    def reset_subscriptions(self, accounts=None, markets=None, objects=None):
        """
        Replace the stored subscriptions and re-register them.

        :param list accounts: new list of accounts (``None`` means empty)
        :param list markets: new list of markets (``None`` means empty)
        :param list objects: new list of object ids (``None`` means empty)
        """
        self.subscription_accounts = accounts or []
        self.subscription_markets = markets or []
        self.subscription_objects = objects or []
        self.__set_subscriptions()

    def __set_subscriptions(self):
        """Cancel all subscriptions and register the configured ones."""
        self.cancel_all_subscriptions()

        # Subscribe to events on the Backend and give them a
        # callback number that allows us to identify the event
        if self.on_object or self.subscription_accounts:
            self.set_subscribe_callback(self.__events__.index("on_object"), False)

        if self.subscription_accounts and self.on_account:
            # Unfortunately, account subscriptions don't have their own
            # callback number
            log.debug("Subscribing to accounts %s", self.subscription_accounts)
            self.get_full_accounts(self.subscription_accounts, True)

        if self.subscription_markets and self.on_market:
            log.debug("Subscribing to markets %s", self.subscription_markets)
            for market in self.subscription_markets:
                # Technically, every market could have its own
                # callback number
                self.subscribe_to_market(
                    self.__events__.index("on_market"), market[0], market[1]
                )

        if self.on_tx:
            self.set_pending_transaction_callback(self.__events__.index("on_tx"))

        if self.on_block:
            self.set_block_applied_callback(self.__events__.index("on_block"))

    def _ping(self):
        """Request a short object every ``keep_alive`` seconds.

        Runs until ``run_event`` is set (see :meth:`close`).
        """
        # We keep the connection alive by requesting a short object
        while not self.run_event.wait(self.keep_alive):
            log.debug("Sending ping")
            self.get_objects(["2.8.0"])

    def process_notice(self, notice):
        """
        This method is called on notices that need processing.

        Here, we call ``on_object`` and ``on_account`` slots: ``on_object``
        if the object id, or its ``a.b.x`` wildcard form, is listed in
        ``subscription_objects``; otherwise ``on_account`` for ids starting
        with ``2.6.``.

        :param dict notice: object carrying at least an ``id`` key
        :raises ValueError: if the id does not consist of three
            dot-separated parts
        """
        object_id = notice["id"]
        space, type_, _ = object_id.split(".")
        wildcard = ".".join([space, type_, "x"])
        if (
            object_id in self.subscription_objects
            or wildcard in self.subscription_objects
        ):
            self.on_object(notice)
        elif object_id.startswith("2.6."):
            # Treat account updates separately
            self.on_account(notice)

    def on_message(self, reply, *args, **kwargs):
        """
        This method is called by the websocket connection on every message
        that is received.

        If we receive a ``notice``, we hand over post-processing and
        signalling of events to ``process_notice`` (object notices) or to
        the event slot that matches the callback number. Every other
        message is ignored.

        :raises ValueError: if the message is not valid JSON
        """
        # The callback may be invoked as ``(message)`` or ``(ws, message)``.
        if isinstance(reply, websocket.WebSocketApp):
            reply = args[0]

        log.debug("Received message: %s", reply)
        try:
            data = json.loads(reply, strict=False)
        except ValueError as exc:
            raise ValueError(
                "API node returned invalid format. Expected JSON!"
            ) from exc

        if not isinstance(data, dict) or data.get("method") != "notice":
            return

        callback_id = data["params"][0]
        if callback_id >= len(self.__events__):
            log.critical("Received an id that is out of range\n\n" + str(data))
            return

        # This is a "general" object change notification
        if callback_id == self.__events__.index("on_object"):
            # Let's see if a specific object has changed
            for notice in data["params"][1]:
                try:
                    if "id" in notice:
                        self.process_notice(notice)
                    else:
                        for obj in notice:
                            if "id" in obj:
                                self.process_notice(obj)
                except Exception as e:
                    log.critical(
                        "Error in process_notice: {}\n\n{}".format(
                            str(e), traceback.format_exc()
                        )
                    )
            return

        # Resolved outside the ``try`` so the name is always bound when the
        # error message below is built.
        callbackname = self.__events__[callback_id]
        try:
            log.debug("Patching through to call %s", callbackname)
            slot = getattr(self.events, callbackname)
            for item in data["params"][1]:
                slot(item)
        except Exception as e:
            log.critical(
                "Error in {}: {}\n\n{}".format(
                    callbackname, str(e), traceback.format_exc()
                )
            )

    def on_error(self, error, *args, **kwargs):
        """Called on websocket errors."""
        # Same calling-convention handling as in ``on_message``: the
        # callback may be invoked as ``(error)`` or ``(ws, error)``.
        if isinstance(error, websocket.WebSocketApp) and args:
            error = args[0]
        log.exception(error)

    def on_close(self, *args, **kwargs):
        """Called when websocket connection is closed."""
        log.debug("Closing WebSocket connection with {}".format(self.url))

    def run_forever(self, *args, **kwargs):
        """
        This method is used to run the websocket app continuously.

        It will execute callbacks as defined and try to stay connected with
        the provided APIs, moving on to the next URL after every attempt,
        until ``run_event`` is set (see :meth:`close`).

        :raises NumRetriesReached: if a websocket exception is raised and
            the number of attempts exceeds a non-negative ``num_retries``
        """
        cnt = 0
        while not self.run_event.is_set():
            cnt += 1
            self.url = next(self.urls)
            log.debug("Trying to connect to node %s", self.url)
            try:
                # websocket.enableTrace(True)
                self.ws = websocket.WebSocketApp(
                    self.url,
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                    on_open=self.on_open,
                )
                self.ws.run_forever()
            except websocket.WebSocketException:
                if self.num_retries >= 0 and cnt > self.num_retries:
                    raise NumRetriesReached()

                sleeptime = (cnt - 1) * 2 if cnt < 10 else 10
                if sleeptime:
                    log.warning(
                        "Lost connection to node during wsconnect(): %s (%d/%d) "
                        % (self.url, cnt, self.num_retries)
                        + "Retrying in %d seconds" % sleeptime
                    )
                    time.sleep(sleeptime)
            except KeyboardInterrupt:
                # ``self.ws`` is still None if the very first connection
                # object has not been created yet.
                if self.ws is not None:
                    self.ws.keep_running = False
                return
            except Exception as e:
                log.critical("{}\n\n{}".format(str(e), traceback.format_exc()))

    def close(self, *args, **kwargs):
        """Closes the websocket connection and waits for the ping thread to close."""
        self.run_event.set()
        # Nothing to close if ``run_forever`` has never created a connection.
        if self.ws is not None:
            self.ws.close()
        if self.keepalive and self.keepalive.is_alive():
            self.keepalive.join()

    def get_request_id(self):
        """Increment the request counter and return its new value."""
        self._request_id += 1
        return self._request_id

    # RPC calls

    def rpcexec(self, payload):
        """
        Execute a call by sending the payload.

        The payload is serialized to JSON and sent over the current
        websocket. This method only sends; it does not wait for, parse or
        return a reply.

        :param dict payload: Payload data
        """
        log.debug(json.dumps(payload))
        self.ws.send(json.dumps(payload, ensure_ascii=False).encode("utf8"))

    def __getattr__(self, name):
        """
        Map all methods to RPC calls and pass through the arguments.

        Names listed in ``__events__`` resolve to the matching event slot.
        Any other unknown attribute yields a function that sends a ``call``
        request named after the attribute. The function accepts the
        keyword arguments ``api_id`` (numeric API id, default ``0``),
        ``api`` (name looked up in ``self.api_id``) and ``num_retries``.

        :raises AttributeError: for dunder names, so that protocol probes
            (``copy``, ``pickle``, ``hasattr`` checks, ...) are not mistaken
            for RPC calls
        """
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)

        if name in self.__events__:
            return getattr(self.events, name)

        def method(*args, **kwargs):
            # Specify the api to talk to
            if "api_id" in kwargs:
                api_id = kwargs["api_id"]
            elif "api" in kwargs:
                api = kwargs["api"]
                if api in self.api_id and self.api_id[api]:
                    api_id = self.api_id[api]
                else:
                    raise ValueError(
                        "Unknown API! "
                        "Verify that you have registered to %s" % api
                    )
            else:
                api_id = 0

            # let's be able to define the num_retries per query
            # NOTE: this overwrites the instance attribute, so the value
            # stays in effect for later calls as well.
            self.num_retries = kwargs.get("num_retries", self.num_retries)

            query = {
                "method": "call",
                "params": [api_id, name, list(args)],
                "jsonrpc": "2.0",
                "id": self.get_request_id(),
            }
            return self.rpcexec(query)

        return method