[Python-modules-commits] [python-keepkey] 02/06: Import python-keepkey_0.7.0.orig.tar.gz
Tristan Seligmann
mithrandi at moszumanska.debian.org
Sun May 29 14:24:39 UTC 2016
This is an automated email from the git hooks/post-receive script.
mithrandi pushed a commit to branch master
in repository python-keepkey.
commit 147a8be0b5c9d91d20317ed926823e2334e414d5
Author: Tristan Seligmann <mithrandi at mithrandi.net>
Date: Sun May 29 15:50:37 2016 +0200
Import python-keepkey_0.7.0.orig.tar.gz
PKG-INFO | 14 +
README.rst | 88 +
keepkey.egg-info/PKG-INFO | 14 +
keepkey.egg-info/SOURCES.txt | 27 +
keepkey.egg-info/dependency_links.txt | 1 +
keepkey.egg-info/not-zip-safe | 1 +
keepkey.egg-info/requires.txt | 4 +
keepkey.egg-info/top_level.txt | 1 +
keepkeyctl | 432 +++++
keepkeylib/__init__.py | 0
keepkeylib/ckd_public.py | 125 ++
keepkeylib/client.py | 904 ++++++++++
keepkeylib/debuglink.py | 112 ++
keepkeylib/mapping.py | 33 +
keepkeylib/messages_pb2.py | 3059 +++++++++++++++++++++++++++++++++
keepkeylib/protobuf_json.py | 142 ++
keepkeylib/qt/__init__.py | 0
keepkeylib/qt/pinmatrix.py | 110 ++
keepkeylib/tools.py | 98 ++
keepkeylib/transport.py | 133 ++
keepkeylib/transport_fake.py | 24 +
keepkeylib/transport_hid.py | 133 ++
keepkeylib/transport_pipe.py | 59 +
keepkeylib/transport_serial.py | 39 +
keepkeylib/transport_socket.py | 110 ++
keepkeylib/tx_api.py | 55 +
keepkeylib/types_pb2.py | 986 +++++++++++
setup.cfg | 5 +
setup.py | 40 +
29 files changed, 6749 insertions(+)
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..9d02d00
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,14 @@
+Metadata-Version: 1.1
+Name: keepkey
+Version: 0.7.0
+Summary: Python library for communicating with KeepKey Hardware Wallet
+Home-page: https://github.com/keepkey/python-keepkey
+Author: Bitcoin TREZOR and KeepKey
+Author-email: support at keepkey.com
+License: UNKNOWN
+Description: UNKNOWN
+Platform: UNKNOWN
+Classifier: License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)
+Classifier: Operating System :: POSIX :: Linux
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Operating System :: MacOS :: MacOS X
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..f940b43
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,88 @@
+Client side implementation for KeepKey-compatible Bitcoin hardware wallets.
+This is a modified version of python-trezor. The changes made were to
+support KeepKey's protocol, as well as the additional feature set
+of KeepKey. For example, by default, device_recovery command invokes
+KeepKey's style of device recovery using the Recovery Cipher.
+See http://www.keepkey.com for more information.
+also found in ``helloworld.py``
+.. code:: python
+ #!/usr/bin/env python
+ from keepkeylib.client import KeepKeyClient
+ from keepkeylib.transport_hid import HidTransport
+ def main():
+ # List all connected KeepKeys on USB
+ devices = HidTransport.enumerate()
+ # Check whether we found any
+ if len(devices) == 0:
+ print 'No KeepKey found'
+ return
+ # Use first connected device
+ transport = HidTransport(devices[0])
+ # Creates object for manipulating KeepKey
+ client = KeepKeyClient(transport)
+ # Print out KeepKey's features and settings
+ print client.features
+ # Get the first address of first BIP44 account
+ # (should be the same address as shown in KeepKey wallet Chrome extension)
+ bip32_path = client.expand_path("44'/0'/0'/0/0")
+ address = client.get_address('Bitcoin', bip32_path)
+ print 'Bitcoin address:', address
+ client.close()
+ if __name__ == '__main__':
+ main()
+PIN Entering
+When you are asked for PIN, you have to enter scrambled PIN. Follow the numbers shown on KeepKey display and enter the their positions using the numeric keyboard mapping:
+=== === ===
+ 7 8 9
+ 4 5 6
+ 1 2 3
+=== === ===
+Example: your PIN is **1234** and KeepKey is displaying the following:
+=== === ===
+ 2 8 3
+ 5 4 6
+ 7 9 1
+=== === ===
+You have to enter: **3795**
+How to install (Windows)
+* Install Python 2.7 (http://python.org)
+* Run C:\\python27\\scripts\\pip.exe install cython
+* Install Microsoft Visual C++ Compiler for Python 2.7
+* Clone repository (using TortoiseGit) to local directory
+* Run C:\\python27\\python.exe setup.py install (or develop)
+How to install (Debian-Ubuntu)
+* sudo apt-get install python-dev python-setuptools cython libusb-1.0-0-dev libudev-dev git
+* git clone https://github.com/keepkey/python-keepkey.git
+* cd python-keepkey
+* python setup.py install (or develop)
diff --git a/keepkey.egg-info/PKG-INFO b/keepkey.egg-info/PKG-INFO
new file mode 100644
index 0000000..9d02d00
--- /dev/null
+++ b/keepkey.egg-info/PKG-INFO
@@ -0,0 +1,14 @@
+Metadata-Version: 1.1
+Name: keepkey
+Version: 0.7.0
+Summary: Python library for communicating with KeepKey Hardware Wallet
+Home-page: https://github.com/keepkey/python-keepkey
+Author: Bitcoin TREZOR and KeepKey
+Author-email: support at keepkey.com
+License: UNKNOWN
+Description: UNKNOWN
+Platform: UNKNOWN
+Classifier: License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)
+Classifier: Operating System :: POSIX :: Linux
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Operating System :: MacOS :: MacOS X
diff --git a/keepkey.egg-info/SOURCES.txt b/keepkey.egg-info/SOURCES.txt
new file mode 100644
index 0000000..05e1923
--- /dev/null
+++ b/keepkey.egg-info/SOURCES.txt
@@ -0,0 +1,27 @@
\ No newline at end of file
diff --git a/keepkey.egg-info/dependency_links.txt b/keepkey.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/keepkey.egg-info/dependency_links.txt
@@ -0,0 +1 @@
diff --git a/keepkey.egg-info/not-zip-safe b/keepkey.egg-info/not-zip-safe
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/keepkey.egg-info/not-zip-safe
@@ -0,0 +1 @@
diff --git a/keepkey.egg-info/requires.txt b/keepkey.egg-info/requires.txt
new file mode 100644
index 0000000..c587b67
--- /dev/null
+++ b/keepkey.egg-info/requires.txt
@@ -0,0 +1,4 @@
\ No newline at end of file
diff --git a/keepkey.egg-info/top_level.txt b/keepkey.egg-info/top_level.txt
new file mode 100644
index 0000000..15e0ee5
--- /dev/null
+++ b/keepkey.egg-info/top_level.txt
@@ -0,0 +1 @@
diff --git a/keepkeyctl b/keepkeyctl
new file mode 100755
index 0000000..8c23be2
--- /dev/null
+++ b/keepkeyctl
@@ -0,0 +1,432 @@
+#!/usr/bin/env python
+import os
+import binascii
+import argparse
+import json
+import base64
+import urllib
+import tempfile
+from keepkeylib.client import KeepKeyClient, KeepKeyClientDebug
+def parse_args(commands):
+ parser = argparse.ArgumentParser(description='Commandline tool for KeepKey devices.')
+ parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Prints communication to device')
+ parser.add_argument('-t', '--transport', dest='transport', choices=['usb', 'serial', 'pipe', 'socket', 'bridge'], default='usb', help="Transport used for talking with the device")
+ parser.add_argument('-p', '--path', dest='path', default='', help="Path used by the transport (usually serial port)")
+# parser.add_argument('-dt', '--debuglink-transport', dest='debuglink_transport', choices=['usb', 'serial', 'pipe', 'socket'], default='usb', help="Debuglink transport")
+# parser.add_argument('-dp', '--debuglink-path', dest='debuglink_path', default='', help="Path used by the transport (usually serial port)")
+ parser.add_argument('-j', '--json', dest='json', action='store_true', help="Prints result as json object")
+# parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='Enable low-level debugging')
+ cmdparser = parser.add_subparsers(title='Available commands')
+ for cmd in commands._list_commands():
+ func = object.__getattribute__(commands, cmd)
+ try:
+ arguments = func.arguments
+ except AttributeError:
+ arguments = ((('params',), {'nargs': '*'}),)
+ item = cmdparser.add_parser(cmd, help=func.help)
+ for arg in arguments:
+ item.add_argument(*arg[0], **arg[1])
+ item.set_defaults(func=func)
+ item.set_defaults(cmd=cmd)
+ return parser.parse_args()
+def get_transport(transport_string, path, **kwargs):
+ if transport_string == 'usb':
+ from keepkeylib.transport_hid import HidTransport
+ if path == '':
+ try:
+ path = list_usb()[0][0]
+ except IndexError:
+ raise Exception("No KeepKey found on USB")
+ for d in HidTransport.enumerate():
+ # Two-tuple of (normal_interface, debug_interface)
+ if path in d:
+ return HidTransport(d, **kwargs)
+ raise Exception("Device not found")
+ if transport_string == 'serial':
+ from keepkeylib.transport_serial import SerialTransport
+ return SerialTransport(path, **kwargs)
+ if transport_string == 'pipe':
+ from keepkeylib.transport_pipe import PipeTransport
+ return PipeTransport(path, is_device=False, **kwargs)
+ if transport_string == 'socket':
+ from keepkeylib.transport_socket import SocketTransportClient
+ return SocketTransportClient(path, **kwargs)
+ if transport_string == 'fake':
+ from keepkeylib.transport_fake import FakeTransport
+ return FakeTransport(path, **kwargs)
+ raise NotImplemented("Unknown transport")
+class Commands(object):
+ def __init__(self, client):
+ self.client = client
+ @classmethod
+ def _list_commands(cls):
+ return [ x for x in dir(cls) if not x.startswith('_') ]
+ def list(self, args):
+ # Fake method for advertising 'list' command
+ pass
+ def get_address(self, args):
+ address_n = self.client.expand_path(args.n)
+ return self.client.get_address(args.coin, address_n, args.show_display)
+ def get_entropy(self, args):
+ return binascii.hexlify(self.client.get_entropy(args.size))
+ def get_features(self, args):
+ return self.client.features
+ def list_coins(self, args):
+ return [ coin.coin_name for coin in self.client.features.coins ]
+ def ping(self, args):
+ return self.client.ping(args.msg, button_protection=args.button_protection, pin_protection=args.pin_protection, passphrase_protection=args.passphrase_protection)
+ def get_public_node(self, args):
+ address_n = self.client.expand_path(args.n)
+ return self.client.get_public_node(address_n, args.ecdsa_curve_name, args.show_display)
+ def set_label(self, args):
+ return self.client.apply_settings(label=args.label)
+ def clear_session(self, args):
+ return self.client.clear_session()
+ def change_pin(self, args):
+ return self.client.change_pin(args.remove)
+ def wipe_device(self, args):
+ return self.client.wipe_device()
+ def recovery_device(self, args):
+ return self.client.recovery_device(args.use_trezor_method, args.words, args.passphrase_protection,
+ args.pin_protection, args.label, 'english')
+ def load_device(self, args):
+ if not args.mnemonic and not args.xprv:
+ raise Exception("Please provide mnemonic or xprv")
+ if args.mnemonic:
+ mnemonic = ' '.join(args.mnemonic)
+ return self.client.load_device_by_mnemonic(mnemonic, args.pin,
+ args.passphrase_protection, args.label, 'english', args.skip_checksum)
+ else:
+ return self.client.load_device_by_xprv(args.xprv, args.pin,
+ args.passphrase_protection, args.label, 'english')
+ def reset_device(self, args):
+ return self.client.reset_device(True, args.strength, args.passphrase_protection,
+ args.pin_protection, args.label, 'english')
+ def sign_message(self, args):
+ address_n = self.client.expand_path(args.n)
+ ret = self.client.sign_message(args.coin, address_n, args.message)
+ output = {
+ 'message': args.message,
+ 'address': ret.address,
+ 'signature': base64.b64encode(ret.signature)
+ }
+ return output
+ def verify_message(self, args):
+ signature = base64.b64decode(args.signature)
+ return self.client.verify_message(args.address, signature, args.message)
+ def encrypt_message(self, args):
+ pubkey = binascii.unhexlify(args.pubkey)
+ address_n = self.client.expand_path(args.n)
+ ret = self.client.encrypt_message(pubkey, args.message, args.display_only, args.coin, address_n)
+ output = {
+ 'nonce': binascii.hexlify(ret.nonce),
+ 'message': binascii.hexlify(ret.message),
+ 'hmac': binascii.hexlify(ret.hmac),
+ 'payload': base64.b64encode(ret.nonce + ret.message + ret.hmac),
+ }
+ return output
+ def decrypt_message(self, args):
+ address_n = self.client.expand_path(args.n)
+ payload = base64.b64decode(args.payload)
+ nonce, message, msg_hmac = payload[:33], payload[33:-8], payload[-8:]
+ ret = self.client.decrypt_message(address_n, nonce, message, msg_hmac)
+ return ret
+ def encrypt_keyvalue(self, args):
+ address_n = self.client.expand_path(args.n)
+ ret = self.client.encrypt_keyvalue(address_n, args.key, args.value)
+ return binascii.hexlify(ret)
+ def decrypt_keyvalue(self, args):
+ address_n = self.client.expand_path(args.n)
+ ret = self.client.decrypt_keyvalue(address_n, args.key, args.value.decode("hex"))
+ return ret
+ def firmware_update(self, args):
+ if not args.file and not args.url:
+ raise Exception("Must provide firmware filename or URL")
+ if args.file:
+ fp = open(args.file, 'r')
+ elif args.url:
+ print "Downloading from", args.url
+ resp = urllib.urlretrieve(args.url)
+ fp = open(resp[0], 'r')
+ urllib.urlcleanup() # We still keep file pointer open
+ if fp.read(8) == '54525a52':
+ print "Converting firmware to binary"
+ fp.seek(0)
+ fp_old = fp
+ fp = tempfile.TemporaryFile()
+ fp.write(binascii.unhexlify(fp_old.read()))
+ fp_old.close()
+ fp.seek(0)
+ if fp.read(4) != 'KPKY':
+ raise Exception("KeepKey firmware header expected")
+ print "Please confirm action on device..."
+ fp.seek(0)
+ return self.client.firmware_update(fp=fp)
+ list.help = 'List connected KeepKey USB devices'
+ ping.help = 'Send ping message'
+ get_address.help = 'Get bitcoin address in base58 encoding'
+ get_entropy.help = 'Get example entropy'
+ get_features.help = 'Retrieve device features and settings'
+ get_public_node.help = 'Get public node of given path'
+ set_label.help = 'Set new wallet label'
+ clear_session.help = 'Clear session (remove cached PIN, passphrase, etc.)'
+ change_pin.help = 'Change new PIN or remove existing'
+ list_coins.help = 'List all supported coin types by the device'
+ wipe_device.help = 'Reset device to factory defaults and remove all private data.'
+ recovery_device.help = 'Start safe recovery workflow'
+ load_device.help = 'Load custom configuration to the device'
+ reset_device.help = 'Perform device setup and generate new seed'
+ sign_message.help = 'Sign message using address of given path'
+ verify_message.help = 'Verify message'
+ encrypt_message.help = 'Encrypt message'
+ decrypt_message.help = 'Decrypt message'
+ encrypt_keyvalue.help = 'Encrypt value by given key and path'
+ decrypt_keyvalue.help = 'Decrypt value by given key and path'
+ firmware_update.help = 'Upload new firmware to device (must be in bootloader mode)'
+ get_address.arguments = (
+ (('-c', '--coin'), {'type': str, 'default': 'Bitcoin'}),
+ (('-n', '-address'), {'type': str}),
+ (('-d', '--show-display'), {'action': 'store_true', 'default': False}),
+ )
+ get_entropy.arguments = (
+ (('size',), {'type': int}),
+ )
+ get_features.arguments = ()
+ list_coins.arguments = ()
+ ping.arguments = (
+ (('msg',), {'type': str}),
+ (('-b', '--button-protection'), {'action': 'store_true', 'default': False}),
+ (('-p', '--pin-protection'), {'action': 'store_true', 'default': False}),
+ (('-r', '--passphrase-protection'), {'action': 'store_true', 'default': False}),
+ )
+ set_label.arguments = (
+ (('-l', '--label',), {'type': str, 'default': ''}),
+# (('-c', '--clear'), {'action': 'store_true', 'default': False})
+ )
+ change_pin.arguments = (
+ (('-r', '--remove'), {'action': 'store_true', 'default': False}),
+ )
+ wipe_device.arguments = ()
+ recovery_device.arguments = (
+ (('-t', '--use-trezor-method'), {'action': 'store_true', 'default': False}),
+ (('-w', '--words'), {'type': int}),
+ (('-p', '--pin-protection'), {'action': 'store_true', 'default': False}),
+ (('-r', '--passphrase-protection'), {'action': 'store_true', 'default': False}),
+ (('-l', '--label'), {'type': str, 'default': ''}),
+ )
+ load_device.arguments = (
+ (('-m', '--mnemonic'), {'type': str, 'nargs': '+'}),
+ (('-x', '--xprv'), {'type': str}),
+ (('-p', '--pin'), {'type': str, 'default': ''}),
+ (('-r', '--passphrase-protection'), {'action': 'store_true', 'default': False}),
+ (('-l', '--label'), {'type': str, 'default': ''}),
+ (('-s', '--skip-checksum'), {'action': 'store_true', 'default': False}),
+ )
+ reset_device.arguments = (
+ (('-t', '--strength'), {'type': int, 'choices': [128, 192, 256], 'default': 256}),
+ (('-p', '--pin-protection'), {'action': 'store_true', 'default': False}),
+ (('-r', '--passphrase-protection'), {'action': 'store_true', 'default': False}),
+ (('-l', '--label'), {'type': str, 'default': ''}),
+ )
+ sign_message.arguments = (
+ (('-c', '--coin'), {'type': str, 'default': 'Bitcoin'}),
+ (('-n', '-address'), {'type': str}),
+ (('message',), {'type': str}),
+ )
+ encrypt_message.arguments = (
+ (('pubkey',), {'type': str}),
+ (('message',), {'type': str}),
+ (('-d', '--display-only'), {'action': 'store_true', 'default': False}),
+ (('-c', '--coin'), {'type': str, 'default': 'Bitcoin'}),
+ (('-n', '-address'), {'type': str}),
+ )
+ decrypt_message.arguments = (
+ (('-n', '-address'), {'type': str}),
+ (('payload',), {'type': str}),
+ )
+ verify_message.arguments = (
+ (('address',), {'type': str}),
+ (('signature',), {'type': str}),
+ (('message',), {'type': str}),
+ )
+ encrypt_keyvalue.arguments = (
+ (('-n', '-address'), {'type': str}),
+ (('key',), {'type': str}),
+ (('value',), {'type': str}),
+ )
+ decrypt_keyvalue.arguments = (
+ (('-n', '-address'), {'type': str}),
+ (('key',), {'type': str}),
+ (('value',), {'type': str}),
+ )
+ get_public_node.arguments = (
+ (('-n', '-address'), {'type': str}),
+ (('-e', '--ecdsa-curve-name'), {'type': str}),
+ (('-d', '--show-display'), {'action': 'store_true', 'default': False}),
+ )
+ firmware_update.arguments = (
+ (('-f', '--file'), {'type': str}),
+ (('-u', '--url'), {'type': str}),
+ )
+def list_usb():
+ from keepkeylib.transport_hid import HidTransport
+ return HidTransport.enumerate()
+class PinMatrixThread(threading.Thread):
+ # Hacked PinMatrixWidget into command line tool :-).
+ def __init__(self, input_text, message):
+ super(PinMatrixThread, self).__init__()
+ self.input_text = input_text
+ self.message = message
+ self.pin_value = ''
+ def run(self):
+ from keepkeylib.pinmatrix import PinMatrixWidget
+ import sys
+ from PyQt4.Qt import QApplication, QWidget, QVBoxLayout
+ from PyQt4.QtGui import QPushButton, QLabel
+ from PyQt4.QtCore import QObject, SIGNAL
+ a = QApplication(sys.argv)
+ matrix = PinMatrixWidget()
+ def clicked():
+ self.pin_value = str(matrix.get_value())
+ a.closeAllWindows()
+ ok = QPushButton('OK')
+ QObject.connect(ok, SIGNAL('clicked()'), clicked)
+ vbox = QVBoxLayout()
+ vbox.addWidget(QLabel(self.input_text + self.message))
+ vbox.addWidget(matrix)
+ vbox.addWidget(ok)
+ w = QWidget()
+ w.setLayout(vbox)
+ w.move(100, 100)
+ w.show()
+ a.exec_()
+def qt_pin_func(input_text, message=None):
+ # This is a hack to display Qt window in non-qt application.
+ # Qt window just asks for PIN and closes itself, which trigger join().
+ if False: # os.getenv('DISPLAY'):
+ # Let's hope that system is configured properly and this won't crash
+ t = PinMatrixThread(input_text, message)
+ t.start()
+ t.join()
+ return t.pin_value
+ else:
+ # Most likely no X is running,
+ # let's fallback to default pin_func implementation
+ return pin_func(input_text, message)
+def main():
+ args = parse_args(Commands)
+ if args.cmd == 'list':
+ devices = list_usb()
+ if args.json:
+ print json.dumps(devices)
+ else:
+ for dev in devices:
+ if dev[1] != None:
+ print "%s - debuglink enabled" % dev[0]
+ else:
+ print dev[0]
+ return
+ transport = get_transport(args.transport, args.path)
+ if args.verbose:
+ client = KeepKeyClientDebug(transport)
+ else:
+ client = KeepKeyClient(transport)
+ cmds = Commands(client)
+ res = args.func(cmds, args)
+ if args.json:
+ print json.dumps(res, sort_keys=True, indent=4)
+ else:
+ print res
+if __name__ == '__main__':
+ main()
diff --git a/keepkeylib/__init__.py b/keepkeylib/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/keepkeylib/ckd_public.py b/keepkeylib/ckd_public.py
new file mode 100644
index 0000000..64787b7
--- /dev/null
+++ b/keepkeylib/ckd_public.py
@@ -0,0 +1,125 @@
+import struct
+import hmac
+import hashlib
+import ecdsa
+from ecdsa.util import string_to_number, number_to_string
+from ecdsa.curves import SECP256k1
+from ecdsa.ellipticcurve import Point, INFINITY
+import tools
+import types_pb2 as proto_types
+def point_to_pubkey(point):
+ order = SECP256k1.order
+ x_str = number_to_string(point.x(), order)
+ y_str = number_to_string(point.y(), order)
+ vk = x_str + y_str
+ return chr((ord(vk[63]) & 1) + 2) + vk[0:32] # To compressed key
+def sec_to_public_pair(pubkey):
+ """Convert a public key in sec binary format to a public pair."""
+ x = string_to_number(pubkey[1:33])
+ sec0 = pubkey[:1]
+ if sec0 not in (b'\2', b'\3'):
+ raise Exception("Compressed pubkey expected")
+ def public_pair_for_x(generator, x, is_even):
+ curve = generator.curve()
+ p = curve.p()
+ alpha = (pow(x, 3, p) + curve.a() * x + curve.b()) % p
+ beta = ecdsa.numbertheory.square_root_mod_prime(alpha, p)
+ if is_even == bool(beta & 1):
+ return (x, p - beta)
+ return (x, beta)
+ return public_pair_for_x(ecdsa.ecdsa.generator_secp256k1, x, is_even=(sec0 == b'\2'))
+def is_prime(n):
+ return (bool)(n & PRIME_DERIVATION_FLAG)
+def fingerprint(pubkey):
+ return string_to_number(tools.hash_160(pubkey)[:4])
+def get_address(public_node, address_type):
+ return tools.public_key_to_bc_address(public_node.public_key, address_type)
+def public_ckd(public_node, n):
+ if not isinstance(n, list):
+ raise Exception('Parameter must be a list')
+ node = proto_types.HDNodeType()
+ node.CopyFrom(public_node)
+ for i in n:
+ node.CopyFrom(get_subnode(node, i))
+ return node
+def get_subnode(node, i):
+ # Public Child key derivation (CKD) algorithm of BIP32
+ i_as_bytes = struct.pack(">L", i)
+ if is_prime(i):
+ raise Exception("Prime derivation not supported")
+ # Public derivation
+ data = node.public_key + i_as_bytes
+ I64 = hmac.HMAC(key=node.chain_code, msg=data, digestmod=hashlib.sha512).digest()
+ I_left_as_exponent = string_to_number(I64[:32])
+ node_out = proto_types.HDNodeType()
+ node_out.depth = node.depth + 1
+ node_out.child_num = i
+ node_out.chain_code = I64[32:]
+ node_out.fingerprint = fingerprint(node.public_key)
+ # BIP32 magic converts old public key to new public point
+ x, y = sec_to_public_pair(node.public_key)
+ point = I_left_as_exponent * SECP256k1.generator + \
+ Point(SECP256k1.curve, x, y, SECP256k1.order)
+ if point == INFINITY:
+ raise Exception("Point cannot be INFINITY")
+ # Convert public point to compressed public key
+ node_out.public_key = point_to_pubkey(point)
+ return node_out
+def serialize(node, version=0x0488B21E):
+ s = ''
+ s += struct.pack('>I', version)
+ s += struct.pack('>B', node.depth)
+ s += struct.pack('>I', node.fingerprint)
+ s += struct.pack('>I', node.child_num)
+ s += node.chain_code
+ if node.private_key:
+ s += '\x00' + node.private_key
+ else :
+ s += node.public_key
+ s += tools.Hash(s)[:4]
+ return tools.b58encode(s)
+def deserialize(xpub):
+ data = tools.b58decode(xpub, None)
+ if tools.Hash(data[:-4])[:4] != data[-4:]:
+ raise Exception("Checksum failed")
+ node = proto_types.HDNodeType()
+ node.depth = struct.unpack('>B', data[4:5])[0]
+ node.fingerprint = struct.unpack('>I', data[5:9])[0]
+ node.child_num = struct.unpack('>I', data[9:13])[0]
+ node.chain_code = data[13:45]
+ key = data[45:-4]
+ if key[0] == '\x00':
+ node.private_key = key[1:]
+ else:
+ node.public_key = key
+ return node
diff --git a/keepkeylib/client.py b/keepkeylib/client.py
new file mode 100644
index 0000000..98b8c6a
--- /dev/null
+++ b/keepkeylib/client.py
@@ -0,0 +1,904 @@
+import os
+import sys
+import time
+import binascii
+import hashlib
+import unicodedata
+import mapping
+import json
+import getpass
+import tools
+import messages_pb2 as proto
+import types_pb2 as types
+import protobuf_json
+from keepkeylib.debuglink import DebugLink
+from mnemonic import Mnemonic
+# try:
+# from PIL import Image
+# except:
+DEFAULT_CURVE = 'secp256k1'
+# monkeypatching: text formatting of protobuf messages
+def get_buttonrequest_value(code):
+ # Converts integer code to its string representation of ButtonRequestType
+ return [ k for k, v in types.ButtonRequestType.items() if v == code][0]
+def pprint(msg):
+ msg_class = msg.__class__.__name__
+ msg_size = msg.ByteSize()
+ """
+ msg_ser = msg.SerializeToString()
+ msg_id = mapping.get_type(msg)
+ msg_json = json.dumps(protobuf_json.pb2json(msg))
+ """
+ if isinstance(msg, proto.FirmwareUpload):
+ return "<%s> (%d bytes):\n" % (msg_class, msg_size)
+ else:
+ return "<%s> (%d bytes):\n%s" % (msg_class, msg_size, msg)
+def log(msg):
+ sys.stderr.write("%s\n" % msg)
+ sys.stderr.flush()
+def log_cr(msg):
+ sys.stdout.write('\r%s' % msg)
+ sys.stdout.flush()
+def log_backspace(times):
+ for _ in range(times):
+ sys.stdout.write('\b')
+ sys.stdout.flush()
+def format_mnemonic(word_pos, character_pos):
+ return "WORD %d: %s" % (word_pos, character_pos * '*')
+def getch():
+ try:
+ import termios
+ except ImportError:
+ # Non-POSIX. Return msvcrt's (Windows') getch.
+ import msvcrt
+ return msvcrt.getch()
+ # POSIX system. Create and return a getch that manipulates the tty.
+ import sys, tty
+ def _getch():
+ fd = sys.stdin.fileno()
+ old_settings = termios.tcgetattr(fd)
+ try:
+ tty.setraw(fd)
+ ch = sys.stdin.read(1)
+ finally:
+ termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
+ return ch
+ return _getch()
+class CallException(Exception):
+ def __init__(self, code, message):
+ super(CallException, self).__init__()
+ self.args = [code, message]
+class PinException(CallException):
+ pass
+class field(object):
+ # Decorator extracts single value from
+ # protobuf object. If the field is not
+ # present, raises an exception.
+ def __init__(self, field):
+ self.field = field
+ def __call__(self, f):
+ def wrapped_f(*args, **kwargs):
+ ret = f(*args, **kwargs)
+ ret.HasField(self.field)
+ return getattr(ret, self.field)
+ return wrapped_f
+class expect(object):
+ # Decorator checks if the method
+ # returned one of expected protobuf messages
+ # or raises an exception
+ def __init__(self, *expected):
+ self.expected = expected
+ def __call__(self, f):
+ def wrapped_f(*args, **kwargs):
+ ret = f(*args, **kwargs)
+ if not isinstance(ret, self.expected):
+ raise Exception("Got %s, expected %s" % (ret.__class__, self.expected))
+ return ret
+ return wrapped_f
+def normalize_nfc(txt):
+ # Normalize string to UTF8 NFC for sign_message
+ if isinstance(txt, str):
+ utxt = txt.decode('utf8')
+ elif isinstance(txt, unicode):
+ utxt = txt
+ else:
+ raise Exception("String value expected")
+ return unicodedata.normalize('NFC', utxt)
+class BaseClient(object):
+ # Implements very basic layer of sending raw protobuf
+ # messages to device and getting its response back.
+ def __init__(self, transport, **kwargs):
+ self.transport = transport
+ super(BaseClient, self).__init__() # *args, **kwargs)
+ def cancel(self):
+ self.transport.write(proto.Cancel())
+ def call_raw(self, msg):
+ try:
+ self.transport.session_begin()
+ self.transport.write(msg)
+ resp = self.transport.read_blocking()
+ finally:
+ self.transport.session_end()
+ return resp
+ def call(self, msg):
+ try:
+ self.transport.session_begin()
+ resp = self.call_raw(msg)
+ handler_name = "callback_%s" % resp.__class__.__name__
+ handler = getattr(self, handler_name, None)
+ if handler != None:
+ msg = handler(resp)
+ if msg == None:
+ raise Exception("Callback %s must return protobuf message, not None" % handler)
+ resp = self.call(msg)
+ finally:
+ self.transport.session_end()
+ return resp
+ def callback_Failure(self, msg):
+ if msg.code in (types.Failure_PinInvalid,
+ types.Failure_PinCancelled, types.Failure_PinExpected):
+ raise PinException(msg.code, msg.message)
+ raise CallException(msg.code, msg.message)
+ def close(self):
+ self.transport.close()
... 5956 lines suppressed ...
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-keepkey.git
More information about the Python-modules-commits
mailing list