Skip to content

Instantly share code, notes, and snippets.

@jongan69
Created July 8, 2024 05:27
Show Gist options
  • Save jongan69/d615720f647e2ffea4113ed93203cb2f to your computer and use it in GitHub Desktop.
Save jongan69/d615720f647e2ffea4113ed93203cb2f to your computer and use it in GitHub Desktop.
/lock_in buy bot
TELEGRAM_BOT_TOKEN=your-telegram-bot-token
PRIVATE_KEY=your-wallet-private-key
SOLANA_RPC_ENDPOINT=https://api.mainnet-beta.solana.com
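FIXED_AMOUNT=1 # Integer amount of the input token (SOL) to spend on every purchase, in its smallest unit (lamports; 1 SOL = 1000000000 lamports). Replace with your own value.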
import base64
import json
import os
from dotenv import load_dotenv
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Processed
from solana.rpc.types import TxOpts
from solders.keypair import Keypair
from solders import message
from solders.transaction import VersionedTransaction
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
from jupiter_python_sdk.jupiter import Jupiter
import base58
def _require_env(name: str) -> str:
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty, so that a missing
            setting is reported by name instead of surfacing as an opaque
            error from ``int(None)`` or ``b58decode(None)``.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Load environment variables
load_dotenv()
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# NOTE: despite its name, PRIVATE_KEY holds the decoded Keypair object built
# from the base58-encoded secret in the environment, not the raw string.
PRIVATE_KEY = Keypair.from_bytes(base58.b58decode(_require_env("PRIVATE_KEY")))
SOLANA_RPC_ENDPOINT = os.getenv("SOLANA_RPC_ENDPOINT")
# Passed unchanged as the swap's input `amount`; must parse as an integer.
FIXED_AMOUNT = int(_require_env("FIXED_AMOUNT"))

# Mint addresses
# Input side of the swap: the wrapped SOL mint.
INPUT_TOKEN_MINT = "So11111111111111111111111111111111111111112"
# Output side of the swap: the token being bought
# (presumably the "lock in" token -- verify the address before use).
OUTPUT_TOKEN_MINT = "8Ki8DpuWNxu9VsS3kQbarsCWMcFGWkzzA8pUPto9zBd5"

# Initialize AsyncClient and Jupiter
async_client = AsyncClient(SOLANA_RPC_ENDPOINT)
jupiter = Jupiter(
    async_client=async_client,
    keypair=PRIVATE_KEY,
    quote_api_url="https://quote-api.jup.ag/v6/quote?",
    swap_api_url="https://quote-api.jup.ag/v6/swap",
    open_order_api_url="https://jup.ag/api/limit/v1/createOrder",
    cancel_orders_api_url="https://jup.ag/api/limit/v1/cancelOrders",
    query_open_orders_api_url="https://jup.ag/api/limit/v1/openOrders?wallet=",
    query_order_history_api_url="https://jup.ag/api/limit/v1/orderHistory",
    query_trade_history_api_url="https://jup.ag/api/limit/v1/tradeHistory",
)
async def buy_token(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the /lock_in command: swap FIXED_AMOUNT of the input token.

    Requests a swap transaction from Jupiter (returned base64-encoded),
    signs its message with the wallet keypair, submits it through the RPC
    client and replies in the chat with an explorer link, or with the
    error text if any step fails.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # Function-scope import so this handler does not depend on a
    # module-level `import logging` being present.
    import logging

    # NOTE(review): there is no authorization check here -- any user who can
    # message the bot can trigger a spend from the configured wallet.

    # CommandHandler also dispatches *edited* messages by default; for those
    # `update.message` is None. Ignore them so that editing an old /lock_in
    # message neither triggers a second swap nor crashes on reply_text.
    if update.message is None:
        return

    try:
        # Execute the swap transaction
        # NOTE(review): slippage_bps=1 is one basis point (0.01%); this looks
        # very tight and may cause swaps to be rejected -- confirm intent.
        transaction_data = await jupiter.swap(
            input_mint=INPUT_TOKEN_MINT,
            output_mint=OUTPUT_TOKEN_MINT,
            amount=FIXED_AMOUNT,
            slippage_bps=1,
        )
        raw_transaction = VersionedTransaction.from_bytes(
            base64.b64decode(transaction_data))
        # Sign the serialized versioned message and rebuild the transaction
        # with that signature attached.
        signature = PRIVATE_KEY.sign_message(
            message.to_bytes_versioned(raw_transaction.message))
        signed_txn = VersionedTransaction.populate(raw_transaction.message,
                                                   [signature])
        opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
        result = await async_client.send_raw_transaction(txn=bytes(signed_txn),
                                                         opts=opts)
        transaction_id = json.loads(result.to_json())['result']
        # Respond with the transaction details
        # NOTE(review): the transaction has only been submitted at this
        # point; nothing here waits for confirmation.
        await update.message.reply_text(
            f"Purchased token. Transaction ID: https://explorer.solana.com/tx/{transaction_id}"
        )
    except Exception as e:
        # Top-level handler boundary: keep the traceback in the logs and
        # still tell the user that the purchase failed.
        logging.getLogger(__name__).exception("Swap via /lock_in failed")
        await update.message.reply_text(f"An error occurred: {str(e)}")
def main():
    """Build the Telegram application, register /lock_in and start polling."""
    # Initialize the Telegram bot application
    application = Application.builder().token(BOT_TOKEN).build()
    # Add command handler for /lock_in
    application.add_handler(CommandHandler("lock_in", buy_token))
    # Start the bot; the message names the command that is actually
    # registered above (it previously advertised a non-existent /buy).
    print("Bot started with /lock_in command functionality!")
    application.run_polling()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment