Skip to content

Instantly share code, notes, and snippets.

@ELI7VH
Created May 13, 2020 19:15
Show Gist options
  • Save ELI7VH/cb6330f26f4b8e57aed995be84590220 to your computer and use it in GitHub Desktop.
Save ELI7VH/cb6330f26f4b8e57aed995be84590220 to your computer and use it in GitHub Desktop.
The Dankest of twitch bots!!
#!/usr/bin/env python
import socket,sys,subprocess,os,json,pygame,time,bs4,requests,random,fnmatch,urllib,tweepy,datetime,sqlite3
from glob import glob
print 'dankbot is born...'
#establish working and database paths
homedir = '/home/pi/dankbot/'
dirsb = '/home/pi/soundboard/'
db_file = '/var/www/dank.db'
counter = time.time()
#print(time.strftime('%Y-%m-%d %H:%M:%S'))
with open('/home/pi/dankbot/serverinfo/serverinfo.json') as data_file:
data = json.load(data_file)
#irc login info
server = data["server"]
channel = data["channel"]
nickname = data["nickname"]
password = data["password"]
clientid = data["clientid"]
#twitch oauth for vapsquad
clientsecret = data["clientsecret"]
#twitter variables
consumer_key = data["CONSUMER_KEY_HERE"]
consumer_secret = data["CONSUMER_SECRET_HERE"]
access_token = data["ACCESS_TOKEN_HERE"]
access_token_secret = data["ACCESS_TOKEN_SECRET"]
#set api variables
irc = socket .socket(socket.AF_INET, socket.SOCK_STREAM)
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
twitterapi = tweepy.API(auth)
#init sound and load soundboard filenames but not subdirectories
pygame.mixer.init(frequency=44100, size=-16, channels=1, buffer=4096)
soundlist = [f for f in os.listdir(dirsb) if os.path.isfile(os.path.join(dirsb, f))]
soundlist.sort()
#list subfolders and emotes
dankMemes = [p[20:-1] for p in glob(dirsb+'*/')]
dankEmote = [l for l in dankMemes if 'dank' in l]
#get sub names, admins, laughtrack text and random quotes
dankSubs = open(homedir + 'subs.txt', 'r').read().splitlines()
dankAdmin = open(homedir + 'admin.txt', 'r').read().splitlines()
laughtrack = open(homedir + 'lol.txt', 'r').read().splitlines()
adminCommands = open(homedir + 'admincommands.txt', 'r').read().splitlines()
dankChatters = []
#initialize strings and shit
dankRando = []
subsonly = 1
dankBoard = {}
soundboard = {}
dankMemeDict = {}
numcheck = {}
num=0
#create dictionary of dankMemes, dirsb+dankmemes+'/' - add this to new thing
def createdankmemes():
#get list of dankmemes from directories
txtfile=open(homedir+"dankMemes.txt","w+")
print "Checking File names and updating dictionary ...",
#for each dank meme
for m in dankMemes:
num=0
txtfile.write(m+'\r\n')
memedir = dirsb+m+"/"
dankRando.extend([m])
#and each file in each meme folder
for fname in os.listdir(memedir):
if fname.find(m) != -1:
num+=1
else:
continue
for fname in os.listdir(memedir):
if fname.find(m) != -1:
continue
#print m+' file skipped'
else:
num+=1
os.rename(memedir+fname,memedir+m+str(num)+'.wav')
print fname+'file renamed to ' +memedir+m+str(num)+'.wav'
#crete dankrando dictionary.
fileNames = [f for f in os.listdir(memedir) if os.path.isfile(os.path.join(memedir, f))]
dankMemeDict[m] = fileNames
print "Done!"
txtfile.close()
createdankmemes()
#define database functions and connect to database
conn = sqlite3.connect(db_file)
c = conn.cursor()
#create new table
def createtable():
c.execute("""CREATE TABLE chatlog (
username text,
message text,
timeofday text
)""")
return
#createtable()
def updatechatlog(name, messagebody, tod):
c.execute("INSERT INTO chatlog VALUES (:username, :message, :timeofday)",
{'username': name,'message': messagebody,'timeofday': tod})
return
def updatechatranks():
return
def dankConnect():
print"Connecting to server ...",
irc.connect((server, 6667))
irc.send("USER "+ nickname + "\n")
irc.send("PASS "+ password +"\n")
irc.send("NICK "+ nickname + "\n")
irc.send("JOIN "+ channel +"\n")
print"Connected!"
return
def dankbotmsg(output):
irc.send('PRIVMSG '+ channel +' :' + output + '\n')
return
dankConnect()
#add to dankRando from various sources
print "loading soundboards ...",
time.sleep(0.1)
# define and load the main soundboards
def loadsoundboards():
x = open(homedir + 'dankMemes.txt', 'r').read().splitlines()
for m in x:
dankRando.extend([m])
x = open(homedir + 'vapsquadidle.txt', 'r').read().splitlines()
for m in x:
dankRando.extend([m])
for sounditem in soundlist:
soundboard[sounditem] = pygame.mixer.Sound(dirsb+sounditem)
#make website
with open('/var/www/html/soundboard.html','w') as file:
file.write('<html><link href="https://fonts.googleapis.com/css?family=PT+Sans" rel="stylesheet"><link rel="stylesheet" href="./style.css" /><body><p>')
file.writelines(['<div class="emote">' + emoteName + '</div>' for emoteName in dankEmote])
for memeName in set(dankMemes).difference(dankEmote):
file.writelines(['<div class="memes">' + memeName + '</div>'])
file.writelines(['<div class="sound">!sb ' + sound[0:-4] + '</div>' for sound in soundlist])
file.write('</p></body></html>')
#load dankBoards
for k,v in dankMemeDict.iteritems():
for f in v:
ddir = dirsb+k+'/'
dankBoard[f] = pygame.mixer.Sound(ddir+f)
print "done!"
return
loadsoundboards()
#future integration of twitch API - get followers etc.
def getsubs():
return
def getfollowers():
return
def takepicandtweet():
return
print 'dankbot waits...'
while 1:
if time.time() - counter > 300:
#keepalive()
output = random.choice(dankRando)
irc.send('PRIVMSG '+ channel +' :' + output + '\n')
counter = time.time()
if output in dankMemes:
dankBoard[random.choice(dankMemeDict[output])].play()
buffer = 2048
text = irc.recv(buffer)
if text.find('PING') != -1:
irc.send('PONG ' + text.split() [1] + '\r\n')
print 'dankbot waits...'
if text is None:
dankConnect()
continue
if text.find('PRIVMSG') == -1:
continue
#after privmsg tag, split message into the parts
parts = text.rstrip().split(':', 2)
messagebody = parts[2]
words = messagebody.split()
#get user name to check againts subscriber
namestr = parts[1]
namepart = namestr.split('!', 1)
name = namepart[0]
tod = time.strftime('%Y-%m-%d %H:%M:%S')
#enter message into db
updatechatlog(name, messagebody, tod)
conn.commit()
#print name and message for logging purposes.
print '>> ' + name + ': ' + messagebody
#playsoundboard()
#trigger soundboard
if messagebody.find('!sb ') != -1:
messagesplit = messagebody.split(' ', 1)
soundfile = messagesplit[1]
wavename = soundfile+".wav"
if soundfile == 'rando':
#print soundboard
soundboard[random.choice(soundlist)].play()
print "rando"
continue
if subsonly == 1 and name in dankSubs or subsonly == 0:
if wavename in soundlist:
#print wavename
soundboard[wavename].play()
print "sb play"
continue
if wavename not in soundlist:
dankbotmsg("That sound is not in our soundboard, visit vapsquad.club:8080/soundboard/ to see our list or request one a subscription or donation.")
print "not in list"
if subsonly == 1 and wavename in soundlist:
dankbotmsg("Want to hit specific sounds? Just hit that sub button and you get access to the dankest soundboard on the internet!!! if you want a taste? type !sb rando")
print "sub, bitch"
continue
#dank memes have been spoken
for k in words:
if k in laughtrack:
dankBoard[random.choice(dankMemeDict['lol'])].play()
break
if k in dankMemes and name in dankSubs:
dankBoard[random.choice(dankMemeDict[k])].play()
#print random.choice(dankMemeDict[k])
if k in dankMemes:
break
#get camera to take pic and upload to twitter
if messagebody.find('!saycheese') != -1 and name in dankSubs:
dankSelfie = "dankSelfie.jpg"
os.system('gphoto2 --capture-image')
os.system('gphoto2 --capture-image-and-download --filename=dankSelfie.jpg --force-overwrite')
twitterapi.update_with_media(dankSelfie, status="posted by @" +name+" and DANKBOT! #selfie #vapsquad #chatbot #dankbot #audiencepics http://www.twitch.tv/elijahlucian")
#twitterapi.update_status("DANKBOT LIIIIVES! #vapsquad come say hi to our new bot! http://www.twitch.tv/elijahlucian")
print 'shit has been tweeted'
dankbotmsg('https://twitter.com/vapsquad to see the pic!')
continue
#admin - renew soundlist - have to fix
if messagebody.find('!renew') != -1:
loadsoundboards()
dankbotmsg("Yes maaaasta! Soundboards have been re-initialized!")
continue
#admin - submode switch
if messagebody.find('!submode') != -1 and name in dankAdmin:
subsonly = 0 if subsonly else 1
if subsonly == 1:
print "sub only mode is ON"
output = 'soundboard is sub only'
else:
print "sub only mode is OFF"
output = 'commence mayhem'
dankbotmsg(output)
if messagebody.find('!uptime') != -1:
dankbotmsg(subprocess.check_output("uptime", shell=True))
if messagebody.find('!rank') != -1:
messagesplit = messagebody.split(' ', 1)
c.execute("SELECT * FROM chatlog WHERE username=:username", {'username': name})
dankRank=c.fetchall()
#print dankRank
ranking = []
for m in dankRank:
if m[1] in adminCommands:
continue
else:
ranking.append(m[1])
#print ranking
dankbotmsg(name +' message count: ' + str(len(ranking)))
#add check against all other chatters
conn.commit()
#query database for shit
#emotes 5 points
#message 1 point
#dankbot roam
if messagebody.find('!roam') != -1 and name in dankAdmin:
#trigger dankcrawler and join some other channels.
continue
#disconnect dankbot from db
if messagebody.find('!goodbye') != -1 and name in dankAdmin:
conn.close()
print "database closed"
dankbotmsg("later, skater.")
quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment