import datetime
import os
from time import sleep

import numpy as np
import pandas as pd
from binance.client import Client
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
class PanBot:
    """Scrape the PancakeSwap prediction page and paper-trade a price model.

    The bot opens the prediction page in Firefox, trains a random forest on
    recent BNBUSDT one-minute prices fetched from Binance, and for every round
    records a predicted direction together with the odds shown on the page.
    Once a round has finished, its result is scraped and ``balance`` is
    updated as if ``bet`` had been staked. Nothing in this class places a bet
    on the page; the balance is purely an in-memory simulation.

    ``data`` maps a round id to a dict with the keys ``"win odd"``,
    ``"lose odd"``, ``"current price"``, ``"last prices"``, ``"bet"``,
    ``"final odd"``, ``"final closed price"``, ``"final locked price"``,
    ``"prediction"``, ``"result"``, ``"computed"`` and, once settled,
    ``"balance"``.
    """

    # Number of one-minute prices fed to the model, and also the look-ahead
    # (in samples) used to label a training example as up or down.
    WINDOW = 5
    # The next round is scraped once the page timer shows at most this value.
    FINAL_SECONDS = 10

    PREDICTION_URL = "https://pancakeswap.finance/prediction"
    TIMER_SELECTOR = ".bVVZDK"
    NEXT_ID_SELECTOR = ".fDqYQz"
    CURRENT_PRICE_SELECTOR = ".sc-dlnjwi"
    NEXT_CARD_ROOT = "div.sc-fnebDD:nth-child(1)"

    def __init__(self, balance, fees, bet):
        """Start Firefox and initialise the bot state.

        Args:
            balance: Starting simulated balance.
            fees: Fee charged per transaction in the gain/loss computation.
            bet: Stake assumed for every round the bot decides to bet on.
        """
        self.balance = balance
        self.fees = fees
        self.driver = webdriver.Firefox()
        self.data = {}
        # Credentials can be supplied through the environment; the original
        # placeholders are kept as fallback so editing them in place still works.
        self.api = {
            'key': os.environ.get('BINANCE_API_KEY', 'PUBLIC_BINANCE_API_KEY'),
            'secret': os.environ.get('BINANCE_API_SECRET', 'SECRET_BINANCE_API_KEY'),
        }
        self.model = None
        self.first_round = True
        self.round_number = 0
        self.scaler = None
        self.bet = bet
        self.next_id = 0
        # nth-child index of the swiper slide holding the last finished round.
        self.last_div = 3
        self.scraped_result = False
        # Binance client, created on first use by get_prices().
        self._client = None

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _div_path(root, indexes):
        """Return ``root`` followed by a ``div:nth-child(k)`` child chain."""
        steps = [root]
        steps.extend("div:nth-child({})".format(k) for k in indexes)
        return " > ".join(steps)

    @staticmethod
    def _parse_odd(text):
        """Parse an odd such as ``"1,85x"``: drop the last character, accept ``,`` as decimal mark."""
        return float(text[:-1].replace(',', '.'))

    @staticmethod
    def _parse_price(text):
        """Parse a price such as ``"$312,45"``: drop the first character, accept ``,`` as decimal mark."""
        return float(text[1:].replace(',', '.'))

    @staticmethod
    def _seconds_left(text):
        """Convert a ``mm:ss`` timer text to seconds, or ``None`` if it is not numeric.

        Non-numeric texts (for example ``"Closing"`` or an empty string) used
        to crash the wait loop with ``ValueError``; callers now treat ``None``
        as "keep waiting".
        """
        total = 0
        try:
            for part in text.strip().split(":"):
                total = total * 60 + int(part)
        except ValueError:
            return None
        return total

    @staticmethod
    def _build_dataset(prices, length):
        """Turn a price series into sliding-window features and up/down labels.

        For every index ``i`` with ``length`` samples available on both sides,
        the feature row is ``prices[i - length:i]`` and the label is 1 when
        ``prices[i] < prices[i + length]``, else 0. Prices are converted to
        float first so that string inputs are compared numerically rather
        than lexicographically.

        Returns:
            A ``(features, labels)`` tuple of plain lists.
        """
        values = [float(price) for price in prices]
        features = []
        labels = []
        for i in range(length, len(values) - length):
            features.append(values[i - length:i])
            labels.append(1 if values[i] < values[i + length] else 0)
        return features, labels

    # ------------------------------------------------------------------
    # Page access helpers
    # ------------------------------------------------------------------
    def _css_text(self, selector):
        """Return the text of the first element matching the CSS selector."""
        return self.driver.find_element(By.CSS_SELECTOR, selector).text

    def _timer_text(self):
        """Return the current text of the round timer element."""
        return self._css_text(self.TIMER_SELECTOR)

    def _wait_while_closing(self, timer):
        """Poll every 5 s while the timer reads "Closing"; return the latest timer text."""
        while timer == "Closing":
            print("Closing...")
            sleep(5)
            timer = self._timer_text()
        return timer

    def _wait_for_final_seconds(self, timer, poll):
        """Poll every ``poll`` seconds until the timer shows at most FINAL_SECONDS."""
        while True:
            remaining = self._seconds_left(timer)
            if remaining is not None and remaining <= self.FINAL_SECONDS:
                return
            print("Waiting for time <10s")
            sleep(poll)
            timer = self._timer_text()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def open(self):
        """Load the prediction page, tick two checkboxes and click a button.

        NOTE(review): the clicked elements are presumably the entry
        disclaimer dialog; confirm against the live page.
        """
        self.driver.get(self.PREDICTION_URL)
        self.driver.find_element(By.XPATH, """//*[@id="checkbox"]""").click()
        self.driver.find_element(By.XPATH, """//*[@id="checkbox1"]""").click()
        self.driver.find_element(By.XPATH, "/html/body/div[1]/div[1]/div[2]/div[2]/button").click()
        # Fixed pause before the page is scraped.
        sleep(8)

    def get_prices(self, window):
        """Return BNBUSDT one-minute close prices as floats.

        Args:
            window: Start of the history as text, e.g. ``"3 day"``;
                ``" UTC"`` is appended before it is passed to the client.

        Returns:
            A list of floats, one per kline returned by the client.
        """
        client = getattr(self, "_client", None)
        if client is None:
            # Create the client once instead of once per call.
            client = Client(self.api['key'], self.api['secret'])
            self._client = client
        klines = client.get_historical_klines("BNBUSDT", Client.KLINE_INTERVAL_1MINUTE, window + " UTC")
        # Binance kline layout: [open time, open, high, low, close, ...] with
        # prices as strings, so the close is column 4 and must be converted.
        return [float(kline[4]) for kline in klines]

    def train(self):
        """Fit the scaler and the random forest on three days of prices.

        Raises:
            ValueError: If too few prices were returned to build one sample.
        """
        print("Creating model...")
        self.model = RandomForestClassifier(max_depth=20, n_estimators=500)
        print("Scraping prices...")
        closep = self.get_prices("3 day")
        features, labels = self._build_dataset(closep, self.WINDOW)
        if not features:
            raise ValueError("Not enough price history to build a training set")
        X = np.array(features)
        Y = np.array(labels)
        scaler = StandardScaler()
        X = scaler.fit_transform(X)
        self.scaler = scaler
        print('Training model...')
        self.model.fit(X, Y)
        print("Model trained")

    def scrape_next(self):
        """Record odds, price and a prediction for the upcoming round.

        Does nothing beyond updating ``next_id`` when the round id is already
        present in ``data``.
        """
        next_id = int(self._css_text(self.NEXT_ID_SELECTOR)[1:])
        self.next_id = next_id
        if next_id in self.data:
            print("Id " + str(next_id) + " already scraped")
            return

        next_win_odd = self._parse_odd(
            self._css_text(self._div_path(self.NEXT_CARD_ROOT, (1, 2, 1, 1, 2, 2, 1, 1))))
        next_lose_odd = self._parse_odd(
            self._css_text(self._div_path(self.NEXT_CARD_ROOT, (1, 2, 3, 1, 2, 1, 1, 1))))
        current_price = self._parse_price(self._css_text(self.CURRENT_PRICE_SELECTOR))

        last_prices_brut = self.get_prices("5 minute")
        while len(last_prices_brut) < self.WINDOW:
            sleep(1)
            print("Error while scraping prices")
            last_prices_brut = self.get_prices("5 minute")
        # The request may return more than WINDOW klines; the scaler was fit
        # on exactly WINDOW features, so keep only the most recent ones.
        recent = last_prices_brut[-self.WINDOW:]
        last_prices = self.scaler.transform(np.array([recent]))

        self.data[next_id] = {
            "win odd": next_win_odd,
            "lose odd": next_lose_odd,
            "current price": current_price,
            "last prices": last_prices,
            "bet": None,
            "final odd": None,
            "final closed price": None,
            "final locked price": None,
            "prediction": None,
            "result": None,
            "computed": False,
        }
        self.predict(next_id)
        print("Id " + str(next_id) + " scraped")

    def scrape_result(self):
        """Scrape the slide at ``last_div`` and settle the matching round.

        The round is settled only if its id is known and it has not been
        computed yet. A round counts as a success when the closed price moved
        strictly in the predicted direction relative to the locked price.
        """
        slide = "div.swiper-slide:nth-child(" + str(self.last_div) + ")"
        last_id = int(self._css_text(self._div_path(slide, (1, 1, 1, 2, 1)))[1:])
        record = self.data.get(last_id)
        if record is not None and not record['computed']:
            final_win_odd = self._parse_odd(
                self._css_text(self._div_path(slide, (1, 1, 2, 1, 1, 2, 2, 1, 1))))
            final_lose_odd = self._parse_odd(
                self._css_text(self._div_path(slide, (1, 1, 2, 3, 1, 2, 1, 1, 1))))
            final_closed_price = self._parse_price(
                self._css_text(self._div_path(slide, (1, 1, 2, 2, 1, 2, 1))))
            final_locked_price = self._parse_price(
                self._css_text(self._div_path(slide, (1, 1, 2, 2, 1, 3, 2))))

            prediction = record["prediction"]
            settled = record.copy()
            settled["final odd"] = final_win_odd if prediction == 1 else final_lose_odd
            settled["final closed price"] = final_closed_price
            settled["final locked price"] = final_locked_price

            went_up = final_closed_price > final_locked_price
            went_down = final_closed_price < final_locked_price
            if (went_up and prediction == 1) or (went_down and prediction == 0):
                settled["result"] = 1
                result = "Success"
            else:
                # Includes the case where the price did not move at all.
                settled["result"] = 0
                result = "Fail"

            self.data[last_id] = settled
            print("Result updated for " + str(last_id) + " (Final price, final odd: " + str(final_closed_price) + ", " + str(settled["final odd"]))
            print(result)
            self.compute_gains(last_id)
        else:
            print("Id " + str(last_id) + " not found or already updated")
        self.scraped_result = True

    def predict(self, id):
        """Predict the direction of round ``id`` and decide whether to bet.

        Stores ``"prediction"`` (0 for down, otherwise up) and ``"bet"``
        (True only when the odd of the predicted side is above 1) in
        ``data[id]``.
        """
        entry = self.data[id]
        prediction = self.model.predict(entry["last prices"])[0]
        if prediction == 0:
            odd = entry["lose odd"]
            if odd > 1:
                entry['bet'] = True
                print("Prediction " + str(id) + " : Down " + str(odd))
            else:
                entry['bet'] = False
                print("Odd too low" + str(id) + " : Down " + str(odd))
        else:
            odd = entry["win odd"]
            if odd > 1:
                entry['bet'] = True
                print("Prediction " + str(id) + " : Up " + str(odd))
            else:
                entry['bet'] = False
                print("Odd too low " + str(id) + " : " + str(odd))
        entry["prediction"] = prediction

    def compute_gains(self, id):
        """Apply the outcome of round ``id`` to the simulated balance.

        A won bet adds ``(final odd - 1) * bet - 2 * fees``; a lost bet
        subtracts ``bet + fees``. Rounds without a bet leave the balance
        untouched. The round is then marked as computed and the resulting
        balance is stored under ``"balance"``. Calling this again for a
        computed round only prints a notice.
        """
        data_id = self.data[id]
        if data_id["computed"]:
            print("Reward already computed for " + str(id))
            return None
        if data_id["bet"] == True:
            if data_id["result"] == 1:
                # NOTE(review): two fees on a win, presumably one for the bet
                # and one for claiming the reward; confirm.
                win = (data_id["final odd"] - 1) * self.bet - 2 * self.fees
                self.balance += win
                print("Earnings " + str(id) + " : " + str(win))
            elif data_id["result"] == 0:
                lose = self.bet + self.fees
                self.balance -= lose
                print("Loss" + str(id) + " : " + str(lose))
        data_id["computed"] = True
        data_id["balance"] = self.balance
        print("Balance:" + str(self.balance))
        print("Round: " + str(self.round_number))
        return None

    def run(self):
        """Run the bot forever; the browser is closed when the loop exits.

        The first iteration opens the page and trains the model. Every
        iteration waits until the timer shows at most FINAL_SECONDS, scrapes
        the upcoming round, and (after the first round) settles the previous
        one.
        """
        print("Init...")
        try:
            while True:
                if self.round_number == 0:
                    self.open()
                    self.train()
                    timer = self._wait_while_closing(self._timer_text())
                    self._wait_for_final_seconds(timer, poll=5)
                    self.scrape_next()
                else:
                    timer = self._timer_text()
                    self.scrape_result()
                    sleep(1)
                    if timer == "Closing":
                        timer = self._wait_while_closing(timer)
                        # NOTE(review): the slide index advances only when a
                        # "Closing" phase was observed; confirm this matches
                        # the intended nesting of the original code.
                        self.last_div += 1
                    self._wait_for_final_seconds(timer, poll=4)
                    self.scrape_next()
                    self.scrape_result()
                self.round_number += 1
                sleep(10)
        finally:
            # Do not leave a Firefox process behind on errors or Ctrl-C.
            self.driver.quit()
if __name__ == "__main__":
    # Run the bot only when executed as a script, so that importing this
    # module does not start a browser and an endless scraping loop.
    bot = PanBot(balance=1, fees=0.001, bet=0.1)
    bot.run()
Hi Abdel, please send me a message on Twitter (@pierre_ia) for more details!
Hi Pierre,
I can't find you on twitter :-(.
My twitter is @GGooglebox.
Thanks in advance for your help.
BR,
Hi again :-),
I have this error:
(pancakeswap_prediction) a@a-VirtualBox:~/environments$ python bot.py
Traceback (most recent call last):
File "/home/a/environments/bot.py", line 254, in
bot = PanBot(1,0.001,0.1)
File "/home/a/environments/bot.py", line 15, in __init__
self.driver = webdriver.Firefox()
File "/home/a/environments/pancakeswap_prediction/lib/python3.9/site-packages/selenium/webdriver/firefox/webdriver.py", line 170, in __init__
RemoteWebDriver.__init__(
File "/home/a/environments/pancakeswap_prediction/lib/python3.9/site-packages/selenium/webdriver/remote/webdriver.py", line 157, in __init__
self.start_session(capabilities, browser_profile)
File "/home/a/environments/pancakeswap_prediction/lib/python3.9/site-packages/selenium/webdriver/remote/webdriver.py", line 252, in start_session
response = self.execute(Command.NEW_SESSION, parameters)
File "/home/a/environments/pancakeswap_prediction/lib/python3.9/site-packages/selenium/webdriver/remote/webdriver.py", line 321, in execute
self.error_handler.check_response(response)
File "/home/a/environments/pancakeswap_prediction/lib/python3.9/site-packages/selenium/webdriver/remote/errorhandler.py", line 242, in check_response
raise exception_class(message, screen, stacktrace)
selenium.common.exceptions.WebDriverException: Message: Process unexpectedly closed with status 1
Do you think this is because the prediction feature on PancakeSwap is paused right now?
Thanks,
BR
And in geckodriver.log I have this:
1628158990182 mozrunner::runner INFO Running command: "/usr/bin/firefox" "--marionette" "-foreground" "-no-remote" "-profile" "/tmp/rust_mozprofileJbPAqW"
Error: no DISPLAY environment variable specified
Hi,
I finally made it work :-), and the prediction feature is back today.
I got an error locating the div.sc-fnebDD element; can you help me find the new selector?
Thanks Pierre
@abdel2991 @pierreia Hi guys, I almost have it running, but the problem I have now is that geckodriver gets detected and Firefox enters Marionette mode, which doesn't let me connect a wallet, etc. Is there any way to bypass this?
Hello,
How can I use this ?
Thanks