Skip to content

Instantly share code, notes, and snippets.

@dev4Fun
dev4Fun / block7.py
Last active February 7, 2020 04:25
self.passed_submissions = read_pickled_set(f"submissions-{username}.pickle")
@dev4Fun
dev4Fun / block8.py
Last active February 7, 2020 04:26
def _process_submission(self, submission):
submission_id = submission.id
if submission_id not in self.passed_submissions:
...
# store.py
import os
import pickle
from pathlib import Path
persistence_dir = Path(os.path.dirname(os.path.abspath(__file__)))
def read_pickled_set(filename):
path = persistence_dir.joinpath(filename)
@dev4Fun
dev4Fun / block5.py
Last active February 7, 2020 04:26
submission_reply_list = ["Nice", "up up", "nice job, friend",
"I like your creativity", "Very nice"]
@dev4Fun
dev4Fun / block4.py
Last active February 7, 2020 04:27
def _process_submission(self, submission):
comments = submission.comments.list()
if len(comments) < 12:
submission_reply = random.choice(submission_reply_list)
submission.upvote()
submission.reply(submission_reply)
@dev4Fun
dev4Fun / block3.py
Last active February 7, 2020 04:27
def work_on_subreddit(self, subreddit: str, **generator_kwargs):
if 'limit' not in generator_kwargs:
generator_kwargs['limit'] = 60
submissions = list(self.reddit_client.subreddit(subreddit).new(**generator_kwargs))
for submission in submissions:
self._process_submission(submission)
# bot.py
import praw
class RedditBot:
def __init__(self, credentials: dict):
self.credentials = credentials
username = credentials['username']
self.credentials['user_agent'] = f"testscript by /u/{username}"
[{
"username": "YOUR_USERNAME",
"password": "YOUR_PASSWORD",
"client_id": "PASTE_CLIENT_ID",
"client_secret": "PASTE_CLIENT_SECRET"
}]
def process_trade(bot, update, user_data):
query = update.callback_query
if query.data == CONFIRM:
trade = TelegramBot.build_trade(user_data)
self._execute_trade(trade)
update.callback_query.message.reply_text(f'Scheduled: {trade}')
else:
show_help(bot, update)
def start_bot(self):
self.updater.start_polling()
@run_async
def _execute_trade(self, trade):
loop = asyncio.new_event_loop()
task = loop.create_task(self.trade_executor.execute_trade(trade))
loop.run_until_complete(task)
@staticmethod