Skip to content

Instantly share code, notes, and snippets.

@Malex
Created March 27, 2010 19:16
Show Gist options
  • Save Malex/346302 to your computer and use it in GitHub Desktop.
Save Malex/346302 to your computer and use it in GitHub Desktop.
MyIRCBot
# -*- coding: utf-8 -*-
#!/usr/bin/python3
################################
# MyIRCBot v5
# Author: Malex
# Version: 5.0 Release Candidate 2
# Description: an idiot bot
# for irc idiots
# License: GNU/GPL3
# Property of Malexprojects.com
# Website: malexprojects.ath.cx
#########################################################################
# Copyright (C) 2009 Malex <malex[at]0x00[dot]ath[dot]cx> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
#########################################################################
# 14/03/2010 0:24 First Draft
# 15/03/2010 23:16 Release Candidate (with some plugins)
# 17/03/2010 23:23 Major Bugs fixing
# 18/03/2010 21:30 Important bug fixed
# 18/03/2010 22:40 Bot.plug_names added
# 18/03/2010 23:32 Minor optimization
# 26/03/2010 19:00 Minor Bug fixing
# 27/03/2010 18:00 Thread freezing fixed, minor optimization (rc2)
import re,threading,queue,socket,plugins
from sys import modules
def to_diz(match):
    """Turn a regex match into a dict of the IRC message fields.

    The returned dict always has the keys ``server``, ``nick``, ``user``,
    ``host``, ``type``, ``chan``, ``sender`` and ``mex``.  A key whose named
    group does not exist in the pattern maps to ``""``; a group that exists
    but did not take part in the match maps to ``None`` (the value
    ``match.group`` reports for it).

    When *match* is falsy (the expression did not match) a notice is printed
    and the empty string is returned instead of a dict.
    """
    # Guard clause: nothing to convert when the line did not match.
    if not match:
        print("Not matched expression")
        return ""

    fields = ('server', 'nick', 'user', 'host', 'type', 'chan', 'sender', 'mex')
    # Start from the "group missing" value and overwrite what the match has.
    parsed = dict.fromkeys(fields, "")
    for field in fields:
        try:
            parsed[field] = match.group(field)
        except IndexError:
            # The pattern has no group with this name; keep the "" default.
            pass
    return parsed
class Bot:
    """Small threaded IRC client that hands parsed server lines to plugins.

    Typical use is ``Bot(conf, plugins).connect()``.  ``connect`` opens the
    socket, queues the ``USER``/``NICK`` registration lines, starts one
    sender thread and one listener thread, and then runs the plugin dispatch
    loop in the calling thread until ``close`` is called or the connection
    is lost.

    A plugin is a callable named like its module inside the ``plugins``
    package (``plugins.<name>.<name>``).  It is invoked in its own thread as
    ``plugin(bot, message)`` where *message* is the dict built by ``to_diz``.
    """

    # Default settings.  Every instance works on its own copy, so overriding
    # a value for one bot no longer changes the defaults of the others.
    # 'nick_password' and 'server_pass' are not read by this class.
    # 'ssl_verify' defaults to False to keep the behaviour of the former
    # ssl.wrap_socket() call (no certificate validation); set it to True to
    # validate the server certificate and host name.
    conf = {
        'nick': "Kira",
        'nick_password': "",
        'server': "malexprojects.ath.cx",
        'port': 6667,
        'server_pass': "",
        'ssl': False,
        'ssl_verify': False,
        'server_encode': "latin1",
    }
    # Parser for one IRC line: optional "nick!user@host" or server prefix,
    # the command/numeric, optional channel and sender, then the trailing text.
    b_re = re.compile(r":?(((?P<nick>[\w\[\]\^`]+)!(?P<user>[\w\[\]\^`]+)@(?P<host>.+?))\s+|(?P<server>.*?)|)(?P<type>[A-Z0-9]+)\s(?P<chan>#[\w\[\]\^`]+|)\s?(?P<sender>[\w\[\]\^`]+|)\s*:(?P<mex>.+)")
    # Class-level defaults; __init__ copies them into per-instance lists.
    plugins = []
    plug_names = []
    # Seconds a worker loop blocks on its queue before re-checking self.die.
    _POLL_INTERVAL = 0.5

    def __init__(self, conf=None, plugins=None):
        """Create the bot, load the requested plugins and prepare the socket.

        conf    -- optional mapping of settings overriding the class defaults.
        plugins -- optional iterable of plugin names passed to ``loadPlugin``.

        The socket is created (and wrapped for TLS when ``conf['ssl']`` is
        true) but not connected; call ``connect`` for that.
        """
        # Per-instance copies: the class attributes stay pristine defaults.
        self.conf = dict(self.conf)
        if conf:
            self.conf.update(conf)
        self.plugins = list(self.plugins)
        self.plug_names = list(self.plug_names)

        self.recved = queue.Queue(0)
        self.to_send = queue.Queue(0)
        self.die = False
        # Bytes received but not yet terminated by CRLF.
        self._recv_buffer = b""
        # Set by recv() once the peer has closed the connection.
        self._eof = False

        for name in (plugins or ()):
            self.loadPlugin(name)

        self.s = socket.socket()
        if self.conf['ssl']:
            # Imported lazily so that plain connections do not need ssl.
            import ssl
            context = ssl.create_default_context()
            if not self.conf.get('ssl_verify', False):
                # check_hostname has to be disabled before verify_mode.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
            self.s = context.wrap_socket(
                self.s, server_hostname=self.conf['server'])

    def recv(self, max_data_size=4096):
        """Read one chunk from the socket and yield every complete line.

        Lines are split on CRLF and decoded with ``conf['server_encode']``.
        A trailing fragment without CRLF is kept and completed by the next
        call, so a line spanning two reads is yielded once, in full.  Empty
        lines are skipped.  When the peer has closed the connection nothing
        is yielded and ``self._eof`` is set.
        """
        chunk = self.s.recv(max_data_size)
        if not chunk:
            self._eof = True
            return
        self._recv_buffer += chunk
        # Everything before the last CRLF is complete; the rest is carried over.
        *complete, self._recv_buffer = self._recv_buffer.split(b"\r\n")
        for raw in complete:
            if raw:
                yield raw.decode(self.conf['server_encode'])

    def connect(self):
        """Connect to ``conf['server']:conf['port']`` and run ``on_connect``."""
        self.s.connect((self.conf['server'], self.conf['port']))
        self.on_connect()

    def on_connect(self):
        """Register with the server, start the worker threads, run plugins.

        Blocks in ``start_plugins`` until the bot is stopped.
        """
        nick = self.conf['nick']
        self.to_send.put("USER " + nick + " 0 * :" + nick)
        self.to_send.put("NICK " + nick)
        threading.Thread(target=self.__send).start()
        threading.Thread(target=self.listen).start()
        self.start_plugins()

    def listen(self):
        """Receive lines until the bot stops and queue them for the plugins.

        Every line is printed, parsed with ``b_re`` and put on ``recved`` via
        ``to_diz``.  End of stream or a socket error stops the whole bot
        (``self.die`` is set) instead of looping on a dead connection; any
        other error on a single chunk is reported and the loop continues.
        """
        while not self.die:
            try:
                for line in self.recv():
                    print(line)
                    self.recved.put_nowait(to_diz(self.b_re.match(line)))
            except OSError:
                # The socket is unusable; report it unless close() caused it.
                if not self.die:
                    print("Not managed error")
                    self.die = True
            except Exception:
                print("Not managed error")
            else:
                if self._eof:
                    self.die = True

    def __send(self):
        """Sender thread body: write queued lines to the socket."""
        while not self.die:
            try:
                # Block with a timeout rather than spinning on an empty queue.
                line = self.to_send.get(timeout=self._POLL_INTERVAL) + "\r\n"
            except queue.Empty:
                continue
            print(line)
            try:
                # sendall: a plain send() may transmit only part of the line.
                self.s.sendall(line.encode(self.conf['server_encode']))
            except OSError:
                if not self.die:
                    print("Not managed error")
                    self.die = True

    def start_plugins(self):
        """Dispatch every parsed message to each loaded plugin.

        Each plugin call runs in its own thread as ``plugin(self, message)``.
        Falsy entries (lines that did not match ``b_re``) are dropped.
        """
        while not self.die:
            try:
                message = self.recved.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                continue
            if not message:
                continue
            # Iterate over a snapshot: plugins may load/unload other plugins.
            for plugin in list(self.plugins):
                worker = threading.Thread(
                    target=plugin, args=(self, message), name=str(plugin))
                worker.start()

    def _import_plugin(self, plugin):
        """Return the callable ``plugins.<plugin>.<plugin>`` (may raise)."""
        import importlib
        module = importlib.import_module("plugins." + plugin)
        return vars(module)[plugin]

    def loadPlugin(self, plugin):
        """Load (or move to the end of the list) the plugin named *plugin*.

        Returns "Empty Value" for a falsy name, "Plugin not Found" when the
        module or its entry point cannot be imported, and
        "Successfully loaded" otherwise.
        """
        if not plugin:
            return "Empty Value"
        try:
            import importlib
            # Drop the cached package so that its __init__ is executed again,
            # as before; an already imported plugin module stays cached.
            modules.pop('plugins', None)
            importlib.import_module("plugins")
            handler = self._import_plugin(plugin)
        except Exception:
            return "Plugin not Found"
        if handler in self.plugins:
            self.plugins.remove(handler)
        self.plugins.append(handler)
        # Re-loading must not list the same name twice.
        if plugin not in self.plug_names:
            self.plug_names.append(plugin)
        return "Successfully loaded"

    def unloadPlugin(self, plugin):
        """Remove the plugin named *plugin* from the dispatch list.

        Returns "Successfully Unloaded", "Not Loaded Plugin" when it was not
        in the list, or "Not such Plugin" when it cannot be imported.
        """
        try:
            handler = self._import_plugin(plugin)
        except Exception:
            return "Not such Plugin"
        if handler not in self.plugins:
            return "Not Loaded Plugin"
        self.plugins.remove(handler)
        if plugin in self.plug_names:
            self.plug_names.remove(plugin)
        return "Successfully Unloaded"

    def send(self, string):
        """Queue *string* (without line terminator) for the sender thread."""
        self.to_send.put_nowait(string)

    def close(self):
        """Stop all loops and close the connection."""
        self.die = True
        try:
            # Wakes a listener thread that is blocked inside recv().
            self.s.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Never connected or already closed: nothing to shut down.
            pass
        self.s.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment