Created
November 3, 2023 08:41
-
-
Save supervik/f13965c243b50e0bb9037188ffc5192f to your computer and use it in GitHub Desktop.
API transfer to Kucoin inner accounts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
import hashlib | |
import time | |
import base64 | |
import hmac | |
import uuid | |
class KucoinAPI: | |
def __init__(self, api_key, api_secret, api_passphrase): | |
self.api_key = api_key | |
self.api_secret = api_secret | |
self.api_passphrase = api_passphrase | |
def _generate_signature(self, data_to_sign): | |
"""Generate HMAC signature for the provided data.""" | |
return base64.b64encode( | |
hmac.new(self.api_secret.encode('utf-8'), data_to_sign.encode('utf-8'), hashlib.sha256).digest() | |
) | |
def _send_request(self, method, endpoint, data=None): | |
"""Send a request to the Kucoin API.""" | |
url = "https://api.kucoin.com" + endpoint | |
now = int(time.time() * 1000) | |
str_to_sign = str(now) + method + endpoint | |
if data: | |
str_to_sign += json.dumps(data) | |
signature = self._generate_signature(str_to_sign) | |
passphrase = self._generate_signature(self.api_passphrase) | |
headers = { | |
"KC-API-KEY": self.api_key, | |
"KC-API-PASSPHRASE": passphrase, | |
"KC-API-TIMESTAMP": str(now), | |
"KC-API-SIGN": signature, | |
"KC-API-KEY-VERSION": "2", | |
"Content-Type": "application/json" | |
} | |
if method == 'POST': | |
response = requests.post(url, headers=headers, json=data) | |
elif method == 'DELETE': | |
response = requests.delete(url, headers=headers, params=data) | |
else: | |
raise ValueError('Invalid HTTP method') | |
response.raise_for_status() | |
return response.json() | |
def inner_transfer(self, currency, amount, from_account, to_account, from_tag=None, to_tag=None): | |
"""Perform an inner transfer between Kucoin accounts.""" | |
data = { | |
"clientOid": str(uuid.uuid4()), | |
"currency": currency, | |
"from": from_account, | |
"to": to_account, | |
"amount": amount | |
} | |
if from_tag: | |
data["fromTag"] = from_tag | |
if to_tag: | |
data["toTag"] = to_tag | |
return self._send_request('POST', '/api/v2/accounts/inner-transfer', data) | |
def cancel_all_orders(self, symbol): | |
"""Cancel all orders for a specific trading pair symbol.""" | |
endpoint = f"/api/v1/hf/orders?symbol={symbol}" | |
return self._send_request('DELETE', endpoint) | |
def main(): | |
# API keys | |
api_key = "KEY" | |
api_secret = "SECRET" | |
api_passphrase = "PASSPHRASE" | |
api = KucoinAPI(api_key=api_key, api_secret=api_secret, api_passphrase=api_passphrase) | |
# Inner transfer | |
currency = "USDT" | |
amount = "0.01" | |
from_account = "main" | |
to_account = "trade_hf" | |
response_transfer = api.inner_transfer(currency, amount, from_account, to_account) | |
print(response_transfer) | |
# Cancel all orders | |
# response_cancel = api.cancel_all_orders("KCS-USDT") | |
# print(response_cancel) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment