Source code for graphenecommon.wallet

# -*- coding: utf-8 -*-
import logging

from graphenestorage import InRamPlainKeyStore, SqliteEncryptedKeyStore
from graphenecommon.exceptions import (
from .instance import AbstractBlockchainInstanceProvider

log = logging.getLogger(__name__)

class Wallet(AbstractBlockchainInstanceProvider):
    """ The wallet is meant to maintain access to private keys for
        your accounts. It either uses manually provided private keys
        or uses a SQLite database managed by

        :param array,dict,string keys: Predefine the wif keys to shortcut the
               wallet database

        Three wallet operation modes are possible:

        * **Wallet Database**: Here, the library loads the keys from the
          locally stored wallet SQLite database (see ````).
        * **Providing Keys**: Here, you can provide the keys for
          your accounts manually. All you need to do is add the wif
          keys for the accounts you want to use as a simple array
          using the ``keys`` parameter to your blockchain instance.
        * **Force keys**: This mode is for advanced users and
          requires that you know what you are doing. Here, the
          ``keys`` parameter is a dictionary that overwrites the
          ``active``, ``owner``, ``posting`` or ``memo`` keys for
          any account. This mode is only used for *foreign*

    def __init__(self, *args, **kwargs):
        assert self.privatekey_class
        assert self.default_key_store_app_name

        # Compatibility after name change from wif->keys
        if "wif" in kwargs and "keys" not in kwargs:
            kwargs["keys"] = kwargs["wif"]

        if "keys" in kwargs:
   = InRamPlainKeyStore()
            if "appname" not in kwargs:
                kwargs["appname"] = self.default_key_store_app_name
   = kwargs.get(
                SqliteEncryptedKeyStore(config=self.blockchain.config, **kwargs),

def privatekey(self, key):
    """ Wrap ``key`` in the private key class configured for this wallet

        :param key: Key material, handed unchanged to
               ``self.privatekey_class``
        :returns: ``self.privatekey_class(key, prefix=self.prefix)``
    """
    key_class = self.privatekey_class
    return key_class(key, prefix=self.prefix)
def publickey_from_wif(self, wif):
    """ Return the public key belonging to a private key, as a string

        :param wif: Private key; it is converted with ``str()`` before
               being passed to :func:`privatekey`
        :returns: ``str()`` of the ``pubkey`` attribute of the resulting
               private key object
    """
    private = self.privatekey(str(wif))
    return str(private.pubkey)
@property
def prefix(self):
    """ Address prefix of the blockchain this wallet works with

        Taken from the blockchain instance when it is connected and from
        the ``"prefix"`` entry of its configuration otherwise.
    """
    if not self.blockchain.is_connected():
        # Offline: fall back to the prefix stored in the config
        return self.blockchain.config["prefix"]
    return self.blockchain.prefix


@property
def rpc(self):
    """ RPC handle of the connected blockchain instance

        :raises OfflineHasNoRPCException: if the blockchain instance is
               not connected
    """
    if self.blockchain.is_connected():
        return self.blockchain.rpc
    raise OfflineHasNoRPCException("No RPC available in offline mode!")
[docs] def setKeys(self, loadkeys): """ This method is strictly only for in memory keys that are passed to Wallet with the ``keys`` argument """ log.debug("Force setting of private keys. Not using the wallet database!") if isinstance(loadkeys, dict): loadkeys = list(loadkeys.values()) elif not isinstance(loadkeys, (list, set)): loadkeys = [loadkeys] for wif in loadkeys: pub = self.publickey_from_wif(wif), pub)
[docs] def is_encrypted(self): """ Is the key store encrypted? """ return
[docs] def unlock(self, pwd): """ Unlock the wallet database """ if return
[docs] def lock(self): """ Lock the wallet database """ if return else: return False
[docs] def unlocked(self): """ Is the wallet database unlocked? """ if return not else: return True
[docs] def locked(self): """ Is the wallet database locked? """ if return
def changePassphrase(self, new_pwd):
    """ Change the passphrase for the wallet database

        Delegates to ``self.masterpwd.changePassword``.

        :param new_pwd: The new passphrase
    """
    master_password = self.masterpwd
    master_password.changePassword(new_pwd)
[docs] def created(self): """ Do we have a wallet database already? """ if len( # Already keys installed return True else: return False
def create(self, pwd):
    """ Alias for newWallet()

        :param pwd: Passphrase forwarded to :func:`newWallet`
        :returns: ``None``; whatever :func:`newWallet` returns is discarded
    """
    self.newWallet(pwd)
    return None
def newWallet(self, pwd):
    """ Create a new wallet database

        :param pwd: Passphrase for the new wallet
        :raises WalletExists: if :func:`created` reports that a wallet
               already exists
    """
    already_exists = self.created()
    if already_exists:
        raise WalletExists("You already have created a wallet!")
    # NOTE(review): ``pwd`` is not used by any statement visible in this
    # block -- confirm whether the key store initialisation belongs here.
[docs] def addPrivateKey(self, wif): """ Add a private key to the wallet database """ try: pub = self.publickey_from_wif(wif) except Exception: raise InvalidWifError("Invalid Key format!") if str(pub) in raise KeyAlreadyInStoreException("Key already in the store"), str(pub))
[docs] def getPrivateKeyForPublicKey(self, pub): """ Obtain the private key for a given public key :param str pub: Public Key """ if str(pub) not in raise KeyNotFound return
[docs] def removePrivateKeyFromPublicKey(self, pub): """ Remove a key from the wallet database """
[docs] def removeAccount(self, account): """ Remove all keys associated with a given account """ accounts = self.getAccounts() for a in accounts: if a["name"] == account:["pubkey"])
def getOwnerKeyForAccount(self, name):
    """ Obtain the owner private key for an account from the wallet
        database

        Every key listed in the account's ``owner`` authority is tried
        in order and the first private key found in the wallet is
        returned.

        :param str name: Account name, resolved via ``rpc.get_account``
        :returns: The first owner private key available in the wallet
        :raises KeyNotFound: if none of the owner keys is available
    """
    account = self.rpc.get_account(name)
    for authority in account["owner"]["key_auths"]:
        try:
            key = self.getPrivateKeyForPublicKey(authority[0])
        except KeyNotFound:
            # This public key is not in the wallet; a multi-key
            # authority may still list one that is, so keep looking.
            continue
        if key:
            return key
    raise KeyNotFound
def getMemoKeyForAccount(self, name):
    """ Obtain the memo private key for an account from the wallet
        database

        :param str name: Account name, resolved via ``rpc.get_account``
        :returns: The private key stored for the account's
               ``options.memo_key``, or ``False`` if the lookup yields a
               falsy value
    """
    account = self.rpc.get_account(name)
    memo_pub = account["options"]["memo_key"]
    return self.getPrivateKeyForPublicKey(memo_pub) or False
def getActiveKeyForAccount(self, name):
    """ Obtain the active private key for an account from the wallet
        database

        The keys of the account's ``active`` authority are tried in
        order; a key whose lookup raises is skipped.

        :param str name: Account name, resolved via ``rpc.get_account``
        :returns: The result of the first lookup that does not raise,
               or ``False`` if every lookup raised
    """
    account = self.rpc.get_account(name)
    key_auths = account["active"]["key_auths"]
    for entry in key_auths:
        try:
            private = self.getPrivateKeyForPublicKey(entry[0])
        except Exception:
            # Best effort: any failure for this key moves on to the next
            continue
        else:
            return private
    return False
def getAccountFromPrivateKey(self, wif):
    """ Obtain account name from private key

        The public key is derived with :func:`publickey_from_wif` and
        then resolved with :func:`getAccountFromPublicKey`.

        :param wif: Private key
        :returns: Whatever :func:`getAccountFromPublicKey` returns for
               the derived public key
    """
    return self.getAccountFromPublicKey(self.publickey_from_wif(wif))
def getAccountsFromPublicKey(self, pub):
    """ Obtain all accounts associated with a public key

        This is a generator; the RPC call ``get_key_references`` is only
        made once iteration starts.

        :param pub: Public key, converted with ``str()`` for the query
        :returns: Generator over the entries of the first result of
               ``get_key_references``
    """
    references = self.rpc.get_key_references([str(pub)])
    yield from references[0]
def getAccountFromPublicKey(self, pub):
    """ Obtain the first account name from public key

        :param pub: Public key, converted with ``str()`` for the lookup
        :returns: The first entry yielded by
               :func:`getAccountsFromPublicKey`, or ``None`` if there
               is none
    """
    # FIXME: only the first associated account is returned. A key that
    # is used by several accounts will lead to undesired behavior.
    candidates = list(self.getAccountsFromPublicKey(str(pub)))
    return candidates[0] if candidates else None
def getAllAccounts(self, pub):
    """ Deprecated: get the account data for a public key (all accounts
        found for this public key)

        This method no longer resolves any accounts. It emits a
        ``DeprecationWarning`` through the :mod:`warnings` machinery and,
        for backward compatibility with callers that inspect the return
        value, still returns that warning instance.

        :param pub: Public key (unused)
        :returns: The ``DeprecationWarning`` instance that was issued
    """
    import warnings

    notice = DeprecationWarning(
        "Use 'getAccountsFromPublicKey' instead and resolve with your own "
        "Account() class!"
    )
    # Returning the instance alone went unnoticed by callers; issue it
    # as a real warning attributed to the caller's line.
    warnings.warn(notice, stacklevel=2)
    return notice
def getKeyType(self, account, pub):
    """ Get key type

        :param account: Account data providing ``key_auths`` lists under
               ``"owner"`` and ``"active"`` and a ``"memo_key"`` under
               ``"options"``
        :param pub: Public key to classify; compared as ``str(pub)``
        :returns: ``"owner"``, ``"active"`` or ``"memo"`` (checked in
               that order), or ``None`` if the key matches none of them
    """
    wanted = str(pub)
    for role in ("owner", "active"):
        if any(wanted == entry[0] for entry in account[role]["key_auths"]):
            return role
    if wanted == account["options"]["memo_key"]:
        return "memo"
    return None
def getAccounts(self):
    """ Return all accounts installed in the wallet database

        Only public keys that start with the prefix of the current
        network are resolved; keys of other networks are skipped.

        :returns: List with the entries of
               :func:`getAccountsFromPublicKey` for every matching key,
               in the order of :func:`getPublicKeys`
    """
    # ``prefix`` is a property that queries the blockchain instance on
    # every access, so read it once instead of twice per key.
    prefix = self.prefix
    accounts = []
    for pubkey in self.getPublicKeys():
        # Filter those keys not for our network
        if pubkey.startswith(prefix):
            accounts.extend(self.getAccountsFromPublicKey(pubkey))
    return accounts
[docs] def getPublicKeys(self, current=False): """ Return all installed public keys :param bool current: If true, returns only keys for currently connected blockchain """ pubkeys = if not current: return pubkeys pubs = [] for pubkey in pubkeys: # Filter those keys not for our network if pubkey[: len(self.prefix)] == self.prefix: pubs.append(pubkey) return pubs
[docs] def wipe(self, sure=False): if not sure: log.error( "You need to confirm that you are sure " "and understand the implications of " "wiping your wallet!" ) return else: