Skip to content

Instantly share code, notes, and snippets.

@akrisanov
Last active November 29, 2022 11:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akrisanov/078b689d3f1303f2c6743c6d9e940406 to your computer and use it in GitHub Desktop.
Save akrisanov/078b689d3f1303f2c6743c6d9e940406 to your computer and use it in GitHub Desktop.
Trading cryptocurrencies
import logging
from http import HTTPStatus
import requests
from mnemonic import Mnemonic
class BcoinClient:
"""
Simple wrapper around Bcoin REST API.
"""
BASE_NODE_URL = "http://localhost:18332/"
BASE_WALLET_URL = "http://localhost:18334/"
BTC_UNIT = 100_000_000
CONFIRMATIONS_REQUIRED = 3
# NOTE: this multiplier may change over time: https://bitcoinfees.earn.com/
SATOSHI_PER_KBYTE = 3 * 1024
def __init__(
self,
node_api_key,
wallet_api_key,
base_node_url=BASE_NODE_URL,
base_wallet_url=BASE_WALLET_URL,
):
if not node_api_key:
raise ValueError("An API key for Bcoin node should be provided")
if not wallet_api_key:
raise ValueError("An API key for Wallet server should be provided")
self._node_api_key = node_api_key
self._wallet_api_key = wallet_api_key
self._base_node_url = base_node_url
self._base_wallet_url = base_wallet_url
self.logger = logging.getLogger()
def __repr__(self):
return f"Bcoin client {id(self)} connected to {self._base_node_url}"
def server_info(self):
"""
Retrieve system information about BTC node.
"""
resp = self.do_node_request().json()
print(
f"Version: {resp['version']} | "
f"Network: {resp['network']} | "
f"Progress: {resp['chain']['progress']} | "
f"Uptime: {resp['time']['uptime']} | "
f"Memory: {resp['memory']['total']}"
)
def create_wallet(self, name, passphrase=None):
"""
Create a new wallet with a specified name.
:param str name: Wallet ID (used for storage)
:param str passphrase: A strong passphrase used to encrypt the wallet. Don't use mnemonic!
"""
wallet = self.do_wallet_request(
f"wallet/{name}", "PUT", {"id": name, "passphrase": passphrase}
).json()
print("Wallet has been successfully created. Don't lose your secret keys!")
return {"id": wallet["id"], "passphare": passphrase, "token": wallet["token"]}
def wallet_accounts(self, wallet_name, wallet_token):
"""
List all account names (array indices map directly to bip44 account indices)
associated with a specific wallet id.
"""
accounts = self.do_wallet_request(
f"wallet/{wallet_name}/account", payload={"token": wallet_token}
).json()
return accounts
def wallet_account(self, wallet_name, wallet_token, account="default"):
"""
Get account info.
"""
account = self.do_wallet_request(
f"wallet/{wallet_name}/account/{account}", payload={"token": wallet_token}
).json()
return account
def wallet_balance(self, wallet_name, wallet_token, account=None):
"""
Get wallet or account balance.
If no account option is passed, the call defaults to wallet balance (with account index of -1).
Balance values for `unconfimred` and `confirmed` are expressed in satoshis.
The balance value returned by this method is a bit confusing:
- `confirmed` refers to the total value of all transactions confirmed on the blockchain.
- `unconfirmed` includes all those same confirmed transactions as well as any
transactions still in the mempool. So the unconfirmed value isn't just the value of
pending transactions, it's what the total balance of the wallet WILL BE once all
pending transactions are confirmed.
"""
payload = {"token": wallet_token}
if account:
payload["account"] = account
balance = self.do_wallet_request(
f"wallet/{wallet_name}/balance", payload=payload
).json()
return balance
def send_transaction(
self, wallet_name, wallet_token, passphrase, amount_in_satoshis, output
):
"""
Create, sign, and send a transaction.
Returns blockchain hash of the created transaction.
Be careful how you enter values and fee rates!
`value` and `rate` are expressed in satoshis.
"""
payload = {
"token": wallet_token,
"passphrase": passphrase,
"depth": self.CONFIRMATIONS_REQUIRED,
"rate": self.SATOSHI_PER_KBYTE,
"outputs": [{"value": amount_in_satoshis, "address": output}],
"subtractFee": False, # whether to subtract fee from outputs (evenly)
}
resp = self.do_wallet_request(
f"wallet/{wallet_name}/send", "POST", payload
).json()
return resp["hash"]
def get_transaction(self, wallet_name, wallet_token, blockchain_hash):
"""
Get wallet transaction details.
"""
resp = self.do_wallet_request(
f"wallet/{wallet_name}/tx/{blockchain_hash}",
payload={"token": wallet_token},
).json()
return resp
def pending_transactions(self, wallet_name, wallet_token):
"""
Get pending wallet transactions.
Returns array of tx details.
"""
resp = self.do_wallet_request(
f"wallet/{wallet_name}/tx/unconfirmed", payload={"token": wallet_token}
).json()
return resp
def do_node_request(self, url_path="", verb="GET", payload=None):
"""
Helper method for doing HTTP request to the node API.
"""
if url_path:
url = f"{self._base_node_url}{url_path}"
else:
url = self._base_node_url
return self.do_request(self._node_api_key, verb, url, payload)
def do_wallet_request(self, url_path="", verb="GET", payload=None):
"""
Helper method for doing HTTP request to the wallet API.
"""
if url_path:
url = f"{self._base_wallet_url}{url_path}"
else:
url = self._base_wallet_url
return self.do_request(self._wallet_api_key, verb, url, payload)
def do_request(self, api_key, verb, url, payload):
"""
Helper method for doing authorized HTTP request.
"""
auth = ("x", api_key)
print(f"Requesting {url}")
if verb in ["POST", "PUT", "PATCH"]:
r = requests.request(verb, url, json=payload, auth=auth)
else:
r = requests.request(verb, url, params=payload, auth=auth)
if r.status_code != HTTPStatus.OK:
r.raise_for_status()
return r
@classmethod
def satoshis_to_btc(cls, amount):
"""
Convert Satoshis to BTC.
Examples:
format(BcoinClient.satoshis_to_btc(1), ".8f") # => 0.00000001
format(BcoinClient.satoshis_to_btc(100_000), ".8f") # => 0.00100000
format(BcoinClient.satoshis_to_btc(100_000_000), ".8f") # => 1.00000000
"""
return amount / cls.BTC_UNIT
@classmethod
def btc_to_satoshis(cls, amount):
"""
Convert BTC to Satoshis.
"""
return amount * cls.BTC_UNIT
@staticmethod
def mnemonic(lang="english", strength=128):
"""
Generate strong mnemonic BIP39 passphrase.
"""
m = Mnemonic(lang)
return m.generate(strength)
import logging
from decimal import ROUND_HALF_UP, Decimal
import requests
class CurrencyConverter:
"""
Currency converter based on European Central Bank rates.
"""
BASE_CURRENCY = "USD"
RATE_API_URL = f"https://api.exchangeratesapi.io/latest?base={BASE_CURRENCY}"
CENTS = Decimal(".01")
def __init__(self):
self.rates = CurrencyConverter.fetch_rates()
self.logger = logging.getLogger()
def __repr__(self):
return f"1 USD = {self.usd_eur()} EUR\n\t{self.usd_rub()} RUB"
def base_currency(self):
return self.BASE_CURRENCY
def usd_eur(self):
return self.rates["rates"]["EUR"]
def usd_rub(self):
return self.rates["rates"]["RUB"]
def from_eur(self, amount):
exchange_rate = self.usd_eur()
amount_usd = self._convert(amount, exchange_rate)
self.logger.info(f"CC: {amount} EUR has been converted to {amount_usd} USD")
return amount_usd
def from_rub(self, amount):
exchange_rate = self.usd_rub()
amount_usd = self._convert(amount, exchange_rate)
self.logger.info(f"CC: {amount} RUB has been converted to {amount_usd} USD")
return amount_usd
def _convert(self, amount, rate):
amount = Decimal(amount) / Decimal(rate)
return amount.quantize(self.CENTS, ROUND_HALF_UP)
@classmethod
def fetch_rates(cls):
r = requests.get(cls.RATE_API_URL)
return r.json()
import base64
import datetime
import hashlib
import hmac
import logging
from http import HTTPStatus
from urllib.parse import urlencode, urlparse
import requests
class HuobiClient:
"""
Simple wrapper around Huobi API.
Each API key can send maximum of 100 https requests within 10 seconds and expires in 90 days.
Contact Huobi support if you believe you need higher limit rate.
TODO: add method for fetching market price
"""
ENCODING = "utf-8"
SCHEMA = "https://"
def __init__(
self, access_key, secret_key, api_host="api.huobi.pro", api_version="/v1/"
):
self._access_key = access_key
self._secret_key = secret_key
self.api_host = api_host
self.api_version = api_version
self.logger = logging.getLogger()
def accounts(self):
"""
Get all accounts of the current client.
Response content:
- id | integer | Unique account id
- state | string | working, lock
- type | string | spot, margin, otc, point
"""
url_path = f"{self.api_version}account/accounts"
params = self._sign_request(url_path)
resp = self.do_request(url_path, params).json()
return resp["data"]
def account_balance(self, account_id, currency="usdt"):
"""
Get account balance of a specific account.
Response content:
- currency | string | usdt
- type | string | spot, margin, otc, point
- balance | number | >= 0, integer or float
"""
url_path = f"{self.api_version}account/accounts/{account_id}/balance"
params = self._sign_request(url_path)
resp = self.do_request(url_path, params).json()
balances = resp["data"]["list"]
usdt_balances = [
balance
for balance in balances
if balance["currency"] == currency and balance["type"] == "trade"
]
if len(usdt_balances) > 1:
raise "There are two active trading accounts! Huobi Client doesn't support that yet!"
return usdt_balances[0]
def aggregate_balance(self):
"""
Return the balances of all the sub-account aggregated.
"""
url_path = f"{self.api_version}subuser/aggregate-balance"
params = self._sign_request(url_path)
resp = self.do_request(url_path, params).json()
return resp["data"]
def open_orders(self):
"""
Get all open orders which have not been filled completely.
"""
url_path = f"{self.api_version}order/openOrders"
params = self._sign_request(url_path)
resp = self.do_request(url_path, params).json()
return resp["data"]
def place_order(self, account_id, amount):
"""
Place a new order and send to the exchange to be matched.
"""
url_path = f"{self.api_version}order/orders/place"
payload = {
"account-id": account_id,
"amount": amount,
"source": "api",
"symbol": "btcusdt",
"type": "buy-market",
}
signature_query_string = urlencode(self._sign_request(url_path, "POST"))
url_path = url_path + "?" + signature_query_string
resp = self.do_request(url_path, payload, "POST").json()
if resp["status"] == "ok":
order_id = resp["data"]
self.logger.info(f"Order {order_id} has been successfully placed")
return order_id
else:
self.logger.error(
f"Order placement has been failed: \"{resp['err-msg']}\". Request params: {payload}"
)
return resp
def order_details(self, order_id):
"""
Return the details of the order.
"""
url_path = f"{self.api_version}order/orders/{order_id}"
params = self._sign_request(url_path)
resp = self.do_request(url_path, params).json()
return resp["data"]
def do_request(self, url_path, params, verb="GET"):
"""
Helper method for doing authorized HTTP request.
"""
url = f"{self.SCHEMA}{self.api_host}{url_path}"
self.logger.info(f"Requesting {url}")
payload_key = "json" if verb in ["POST", "PUT", "PATCH"] else "params"
r = requests.request(verb, url, **{payload_key: params})
if r.status_code != HTTPStatus.OK:
self.logger.error(
f"Status: {r.status_code}, reason: {r.reason}, message: {r.json()}"
)
r.raise_for_status()
return r
def _sign_request(self, url_path, verb="GET"):
"""
To protect API communication from unauthorized change,
all non-public API calls are required to be signed.
https://huobiapi.github.io/docs/v1/en/#authentication
"""
url_params = self._signature_params()
url_params = dict(sorted(url_params.items()))
query_string = urlencode(url_params)
message = self._build_signature_message(verb, url_path, query_string)
url_params["Signature"] = self._build_signature(message)
return url_params
def _signature_params(self):
return {
"AccessKeyId": self._access_key,
"SignatureVersion": "2",
"SignatureMethod": "HmacSHA256",
"Timestamp": self.__class__.utc_now(),
}
def _build_signature_message(self, verb, url_path, query_string):
return f"{verb}\n{self.api_host}\n{url_path}\n{query_string}"
def _build_signature(self, message):
dig = hmac.new(
self._secret_key.encode(self.ENCODING),
msg=message.encode(self.ENCODING),
digestmod=hashlib.sha256,
).digest()
return base64.b64encode(dig).decode()
@staticmethod
def utc_now():
return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment