Skip to content

Instantly share code, notes, and snippets.

@supervik
Created November 3, 2023 08:41
Show Gist options
  • Save supervik/f13965c243b50e0bb9037188ffc5192f to your computer and use it in GitHub Desktop.
Save supervik/f13965c243b50e0bb9037188ffc5192f to your computer and use it in GitHub Desktop.
Transfer funds between KuCoin inner accounts via the API
import base64
import hashlib
import hmac
import json
import time
import uuid
from urllib.parse import urlencode

import requests
class KucoinAPI:
    """Small client for signed KuCoin REST calls (API key version 2).

    Only the POST and DELETE verbs are supported. Every request is signed
    with an HMAC-SHA256 of ``timestamp + method + path + body`` keyed by the
    API secret, and the passphrase is sent HMAC-signed with the same secret.
    """

    BASE_URL = "https://api.kucoin.com"

    def __init__(self, api_key, api_secret, api_passphrase, timeout=10):
        """Store the credentials used to sign requests.

        Args:
            api_key: API key sent in the ``KC-API-KEY`` header.
            api_secret: Secret used as the HMAC key for all signatures.
            api_passphrase: Passphrase; sent HMAC-signed, never in clear.
            timeout: Seconds to wait for the server before ``requests``
                raises; prevents a stalled connection from hanging forever.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.api_passphrase = api_passphrase
        self.timeout = timeout

    def _generate_signature(self, data_to_sign):
        """Return the base64-encoded HMAC-SHA256 of ``data_to_sign`` as bytes.

        The HMAC key is the API secret; both inputs are UTF-8 encoded.
        """
        digest = hmac.new(
            self.api_secret.encode('utf-8'),
            data_to_sign.encode('utf-8'),
            hashlib.sha256,
        ).digest()
        return base64.b64encode(digest)

    @staticmethod
    def _prepare_payload(method, endpoint, data=None):
        """Return ``(path, body)`` exactly as they must be signed and sent.

        POST: ``data`` is serialised to JSON once, so the signed text and the
        transmitted body can never differ.
        DELETE: ``data`` is URL-encoded into the query string of the path,
        because the values travel in the URL and the path is what gets
        signed; the body stays empty.

        Raises:
            ValueError: If ``method`` is neither ``'POST'`` nor ``'DELETE'``.
        """
        if method == 'POST':
            return endpoint, (json.dumps(data) if data else "")
        if method == 'DELETE':
            if data:
                # Extend an existing query string instead of starting a second one.
                separator = '&' if '?' in endpoint else '?'
                endpoint = f"{endpoint}{separator}{urlencode(data)}"
            return endpoint, ""
        raise ValueError('Invalid HTTP method')

    def _send_request(self, method, endpoint, data=None):
        """Sign and send a request, returning the decoded JSON response.

        Args:
            method: ``'POST'`` or ``'DELETE'``.
            endpoint: Path starting with ``/``; may already carry a query string.
            data: Optional dict. Sent as the JSON body for POST and as query
                parameters for DELETE.

        Raises:
            ValueError: If ``method`` is not supported (raised before any
                network traffic).
            requests.HTTPError: If the server answers with an error status.
        """
        path, body = self._prepare_payload(method, endpoint, data)
        now = int(time.time() * 1000)
        signature = self._generate_signature(str(now) + method + path + body)
        passphrase = self._generate_signature(self.api_passphrase)
        headers = {
            "KC-API-KEY": self.api_key,
            # Signatures are base64 bytes; decode so all header values are str.
            "KC-API-PASSPHRASE": passphrase.decode('ascii'),
            "KC-API-TIMESTAMP": str(now),
            "KC-API-SIGN": signature.decode('ascii'),
            "KC-API-KEY-VERSION": "2",
            "Content-Type": "application/json"
        }
        url = self.BASE_URL + path
        if method == 'POST':
            # Send the very bytes that were signed rather than re-serialising.
            payload = body.encode('utf-8') if body else None
            response = requests.post(url, headers=headers, data=payload, timeout=self.timeout)
        else:
            response = requests.delete(url, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def inner_transfer(self, currency, amount, from_account, to_account, from_tag=None, to_tag=None):
        """Perform an inner transfer between Kucoin accounts.

        Args:
            currency: Currency code, e.g. ``"USDT"``.
            amount: Amount to move, e.g. ``"0.01"``.
            from_account: Source account type, e.g. ``"main"``.
            to_account: Destination account type, e.g. ``"trade_hf"``.
            from_tag: Optional value sent as ``fromTag`` when truthy.
            to_tag: Optional value sent as ``toTag`` when truthy.

        Returns:
            The decoded JSON response.
        """
        data = {
            # A fresh UUID per call serves as the client-side order id.
            "clientOid": str(uuid.uuid4()),
            "currency": currency,
            "from": from_account,
            "to": to_account,
            "amount": amount
        }
        if from_tag:
            data["fromTag"] = from_tag
        if to_tag:
            data["toTag"] = to_tag
        return self._send_request('POST', '/api/v2/accounts/inner-transfer', data)

    def cancel_all_orders(self, symbol):
        """Cancel all orders for a specific trading pair symbol.

        The symbol is passed as a query parameter so it is URL-escaped and
        included in the signed path.
        """
        return self._send_request('DELETE', '/api/v1/hf/orders', {"symbol": symbol})
def main():
    """Demo: move 0.01 USDT from the main account to trade_hf and print the reply."""
    # Placeholder credentials; replace with real API values before running.
    client = KucoinAPI(api_key="KEY", api_secret="SECRET", api_passphrase="PASSPHRASE")

    # Inner transfer arguments: currency, amount, source account, destination account.
    transfer_args = ("USDT", "0.01", "main", "trade_hf")
    print(client.inner_transfer(*transfer_args))

    # Optional usage example, cancel all orders for a pair:
    # print(client.cancel_all_orders("KCS-USDT"))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment