-
-
Save pierreia/967feaf15c62d6fc53c2173071928ddb to your computer and use it in GitHub Desktop.
PancakeBot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from selenium import webdriver | |
from selenium.webdriver.common.keys import Keys | |
from time import sleep | |
import datetime | |
import pandas as pd | |
import numpy as np | |
from binance.client import Client | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.ensemble import RandomForestClassifier | |
class PanBot: | |
def __init__(self,balance,fees,bet): | |
self.balance = balance | |
self.fees = fees | |
self.driver = webdriver.Firefox() | |
self.data = {} | |
self.api = {'key':'PUBLIC_BINANCE_API_KEY','secret':'SECRET_BINANCE_API_KEY'} | |
self.model = None | |
self.first_round = True | |
self.round_number = 0 | |
self.scaler = None | |
self.bet = bet | |
self.next_id = 0 | |
self.last_div = 3 | |
self.scraped_result = False | |
def open(self): | |
self.driver.get("https://pancakeswap.finance/prediction") | |
self.driver.find_element_by_xpath("""//*[@id="checkbox"]""").click() | |
self.driver.find_element_by_xpath("""//*[@id="checkbox1"]""").click() | |
self.driver.find_element_by_xpath("/html/body/div[1]/div[1]/div[2]/div[2]/button").click() | |
sleep(8) | |
def get_prices(self,window): | |
client = Client(self.api['key'], self.api['secret']) | |
klines = client.get_historical_klines("BNBUSDT", Client.KLINE_INTERVAL_1MINUTE, window + " UTC") | |
klines_close = [kline[1] for kline in klines] # la colomne des valeurs de clotures est la colonne 1 | |
return klines_close | |
def train(self): | |
print("Creating model...") | |
self.model = RandomForestClassifier(max_depth=20, n_estimators=500) | |
print("Scraping prices...") | |
closep = self.get_prices("3 day") | |
X_l = [] | |
Y_l = [] | |
length = 5 | |
for i in range(length, len(closep) - length): | |
if closep[i] < closep[i + length]: | |
Y_l.append(1) | |
else: | |
Y_l.append(0) | |
X_l.append(closep[i - length:i]) | |
X = np.array(X_l) | |
Y = np.array(Y_l) | |
scaler = StandardScaler() | |
X = scaler.fit_transform(X) | |
self.scaler = scaler | |
print('Training model...') | |
self.model.fit(X,Y) | |
print("Model trained") | |
def scrape_next(self): | |
next_id = int(self.driver.find_element_by_css_selector(".fDqYQz").text[1:]) | |
self.next_id = next_id | |
next_win_odd_str = self.driver.find_element_by_css_selector("div.sc-fnebDD:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(2) > div:nth-child(1) > div:nth-child(1)").text | |
next_win_odd = float(next_win_odd_str[:-1].replace(',','.')) | |
next_lose_odd_str = self.driver.find_element_by_css_selector("div.sc-fnebDD:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(3) > div:nth-child(1) > div:nth-child(2) > div:nth-child(1) > div:nth-child(1) > div:nth-child(1)").text | |
next_lose_odd = float(next_lose_odd_str[:-1].replace(',','.')) | |
current_price = float(self.driver.find_element_by_css_selector(".sc-dlnjwi").text[1:].replace(',','.')) | |
last_prices_brut = self.get_prices("5 minute") | |
while len(last_prices_brut) < 5: | |
sleep(1) | |
print("Error while scraping prices") | |
last_prices_brut = self.get_prices("5 minute") | |
last_prices = self.scaler.transform(np.array([last_prices_brut])) | |
if next_id not in self.data: | |
self.data[next_id] = { "win odd" : next_win_odd, | |
"lose odd": next_lose_odd, | |
"current price": current_price, | |
"last prices": last_prices, | |
"bet": None, | |
"final odd": None, | |
"final closed price": None, | |
"final locked price": None, | |
"prediction": None, | |
"result": None, | |
"computed": False, | |
} | |
self.predict(next_id) | |
print("Id " + str(next_id) + " scraped") | |
else: | |
print("Id " + str(next_id) + " already scraped") | |
def scrape_result(self): | |
last_id = int(self.driver.find_element_by_css_selector("div.swiper-slide:nth-child("+ str(self.last_div) +") > div:nth-child(1) > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(1)").text[1:]) | |
if last_id in self.data and self.data[last_id]['computed'] == False: | |
final_win_odd_str = self.driver.find_element_by_css_selector("div.swiper-slide:nth-child("+ str(self.last_div) +") > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(2) > div:nth-child(1) > div:nth-child(1)").text | |
#final_win_odd = float(final_win_odd_str[:-1].replace(',','.')) | |
final_lose_odd_str = self.driver.find_element_by_css_selector("div.swiper-slide:nth-child("+ str(self.last_div) +") > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(3) > div:nth-child(1) > div:nth-child(2) > div:nth-child(1) > div:nth-child(1) > div:nth-child(1)").text | |
#final_lose_odd = float(final_lose_odd_str[:-1].replace(',','.')) | |
final_closed_price_str = self.driver.find_element_by_css_selector("div.swiper-slide:nth-child("+ str(self.last_div) +") > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(2) > div:nth-child(1) > div:nth-child(2) > div:nth-child(1)").text | |
#final_closed_price = float(final_closed_price_str[:-1].replace(',','.')) | |
final_locked_price_str = self.driver.find_element_by_css_selector("div.swiper-slide:nth-child("+ str(self.last_div) +") > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(2) > div:nth-child(1) > div:nth-child(3) > div:nth-child(2)").text | |
final_win_odd = float(final_win_odd_str[:-1].replace(',', '.')) | |
final_lose_odd = float(final_lose_odd_str[:-1].replace(',', '.')) | |
final_closed_price = float(final_closed_price_str[1:].replace(',', '.')) | |
final_locked_price = float(final_locked_price_str[1:].replace(',', '.')) | |
prediction = self.data[last_id]["prediction"] | |
temp_dic = self.data[last_id].copy() | |
if prediction == 1 : | |
temp_dic["final odd"] = final_win_odd | |
else: | |
temp_dic["final odd"] = final_lose_odd | |
temp_dic["final closed price"] = final_closed_price | |
temp_dic["final locked price"] = final_locked_price | |
if final_closed_price > final_locked_price and prediction == 1: | |
temp_dic["result"] = 1 | |
result = "Success" | |
elif final_closed_price < final_locked_price and prediction == 0: | |
temp_dic["result"] = 1 | |
result = "Success" | |
else: | |
temp_dic["result"] = 0 | |
result = "Fail" | |
self.data[last_id] = temp_dic | |
print("Result updated for " + str(last_id) + " (Final price, final odd: " + str(final_closed_price) + ", " + str(temp_dic["final odd"])) | |
print(result) | |
self.compute_gains(last_id) | |
else: | |
print("Id " + str(last_id) + " not found or already updated") | |
self.scraped_result = True | |
def predict(self,id): | |
X = self.data[id]["last prices"] | |
prediction = self.model.predict(X)[0] | |
if prediction == 0: | |
if self.data[id]["lose odd"] > 1: | |
self.data[id]['bet'] = True | |
print("Prediction " + str(id) + " : Down " + str(self.data[id]["lose odd"])) | |
else: | |
self.data[id]['bet'] = False | |
print("Odd too low" + str(id) + " : Down " + str(self.data[id]["lose odd"])) | |
else: | |
if self.data[id]["win odd"] > 1: | |
self.data[id]['bet'] = True | |
print("Prediction " + str(id) + " : Up " + str(self.data[id]["win odd"])) | |
else: | |
self.data[id]['bet'] = False | |
print("Odd too low " + str(id) + " : " + str(self.data[id]["win odd"])) | |
self.data[id]["prediction"] = prediction | |
def compute_gains(self,id): | |
data_id = self.data[id] | |
if data_id["computed"] == True: | |
return print("Reward already computed for " + str(id)) | |
if data_id["result"] == 1 and data_id["bet"] == True: | |
odd = data_id["final odd"] | |
win = (odd-1)*self.bet - 2*self.fees | |
self.balance += win | |
print("Earnings " + str(id) + " : " + str(win)) | |
elif data_id["result"] == 0 and data_id["bet"] == True: | |
lose = (self.bet + self.fees) | |
self.balance -= lose | |
print("Loss" + str(id) + " : " + str(lose)) | |
data_id["computed"] = True | |
data_id["balance"] = self.balance | |
print("Balance:" + str(self.balance)) | |
print("Round: " + str(self.round_number)) | |
def run(self): | |
print("Init...") | |
while True: | |
round_number = self.round_number | |
changed_div = False | |
if round_number == 0: | |
self.open() | |
self.train() | |
time = self.driver.find_element_by_css_selector(".bVVZDK").text | |
if time == "Closing": | |
while time == "Closing": | |
print("Closing...") | |
sleep(5) | |
time = self.driver.find_element_by_css_selector(".bVVZDK").text | |
while int(time.replace(":","")) > 10: | |
print("Waiting for time <10s") | |
sleep(5) | |
time = self.driver.find_element_by_css_selector(".bVVZDK").text | |
self.scrape_next() | |
self.round_number += 1 | |
sleep(10) | |
else: | |
time = self.driver.find_element_by_css_selector(".bVVZDK").text | |
self.scrape_result() | |
sleep(1) | |
if time == "Closing": | |
while time == "Closing": | |
print("Closing...") | |
sleep(5) | |
time = self.driver.find_element_by_css_selector(".bVVZDK").text | |
self.last_div += 1 | |
while int(time.replace(":","")) > 10: | |
print("Waiting for time <10s") | |
sleep(4) | |
time = self.driver.find_element_by_css_selector(".bVVZDK").text | |
self.scrape_next() | |
self.scrape_result() | |
self.round_number += 1 | |
sleep(10) | |
bot = PanBot(1,0.001,0.1) | |
bot.run() |
@abdel2991 @pierreia Hi Guys , i'm almost running it but the problem i have now is that the gecko driver gets detected and firefox enters in a marionette mode, which doesn't let me connect wallet etc. Any way to bypass this?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
Finaly I made it work :-), the prediction is back today yeah.
I had an error on this div.sc-fnebDD element location, can you help to locate the new one ?
Thanks Pierre