Created
March 27, 2010 19:16
-
-
Save Malex/346302 to your computer and use it in GitHub Desktop.
MyIRCBot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
#!/usr/bin/python3 | |
################################ | |
# MyIRCBot v5 | |
# Author: Malex | |
# Version: 5.0 Release Candidate 2 | |
# Description: an idiot bot | |
# for irc idiots | |
# License: GNU/GPL3 | |
# Property of Malexprojects.com | |
# Website: malexprojects.ath.cx | |
######################################################################### | |
# Copyright (C) 2009 Malex <malex[at]0x00[dot]ath[dot]cx> # | |
# # | |
# This program is free software: you can redistribute it and/or modify # | |
# it under the terms of the GNU General Public License as published by # | |
# the Free Software Foundation, either version 3 of the License, or # | |
# (at your option) any later version. # | |
# # | |
# This program is distributed in the hope that it will be useful, # | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of # | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # | |
# GNU General Public License for more details. # | |
# # | |
# You should have received a copy of the GNU General Public License # | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. # | |
######################################################################### | |
# 14/03/2010 0:24 First Draft | |
# 15/03/2010 23:16 Release Candidate (with some plugins) | |
# 17/03/2010 23:23 Major Bugs fixing | |
# 18/03/2010 21:30 Important bug fixed | |
# 18/03/2010 22:40 Bot.plug_names added | |
# 18/03/2010 23:32 Minor optimization | |
# 26/03/2010 19:00 Minor Bug fixing | |
# 27/03/2010 18:00 Thread freezing fixed, minor optimization (rc2) | |
import re,threading,queue,socket,plugins | |
from sys import modules | |
def to_diz(match):
    """Convert a regex match of an IRC line into a plain dict of fields.

    The result always carries the keys ``server``, ``nick``, ``user``,
    ``host``, ``type``, ``chan``, ``sender`` and ``mex``.  A field is the
    empty string when its named group is missing from the pattern *or*
    when the group exists but did not take part in the match, so callers
    can always treat the values as strings.

    Args:
        match: A ``re.Match`` object, or a falsy value (``None``) when the
            line did not match.

    Returns:
        The field dict, or ``""`` when *match* is falsy (a notice is
        printed in that case).  The empty string is kept as the "no
        match" value so existing callers that test or iterate the result
        keep working.
    """
    if not match:
        print("Not matched expression")
        return ""
    # default="" turns groups that did not participate into "" instead of
    # None; .get() covers names the pattern does not define at all.
    found = match.groupdict(default="")
    fields = ('server', 'nick', 'user', 'host', 'type', 'chan', 'sender', 'mex')
    return {name: found.get(name, "") for name in fields}
class Bot:
    """Threaded IRC client that hands every parsed server line to plugins.

    ``connect()`` opens the connection, starts one thread that writes queued
    messages and one that reads and parses incoming lines, and then runs the
    plugin dispatch loop in the calling thread until ``close()`` is called
    or the connection is lost.

    Class attributes ``conf``, ``plugins`` and ``plug_names`` act as
    defaults only: every instance gets its own copy in ``__init__`` so that
    several bots do not overwrite each other's settings or plugin lists.
    """

    # Default settings; overridden per instance through __init__(conf=...).
    conf = {
        'nick': "Kira",
        'nick_password': "",
        'server': "malexprojects.ath.cx",
        'port': 6667,
        'server_pass': "",
        'ssl': False,
        'server_encode': "latin1",
    }
    # Splits one IRC line into the named fields consumed by to_diz().
    b_re = re.compile(r":?(((?P<nick>[\w\[\]\^`]+)!(?P<user>[\w\[\]\^`]+)@(?P<host>.+?))\s+|(?P<server>.*?)|)(?P<type>[A-Z0-9]+)\s(?P<chan>#[\w\[\]\^`]+|)\s?(?P<sender>[\w\[\]\^`]+|)\s*:(?P<mex>.+)")
    plugins = []
    plug_names = []
    # Bytes received after the last complete "\r\n"-terminated line.
    _recv_buffer = b""
    # Seconds a worker waits on an empty queue before re-checking self.die.
    _poll_interval = 0.5

    def __init__(self, conf=None, plugins=None):
        """Prepare the bot; no network traffic happens until connect().

        Args:
            conf: Optional mapping whose entries override the class-level
                defaults in ``Bot.conf`` for this instance only.
            plugins: Optional iterable of plugin names passed to
                ``loadPlugin`` one by one.
        """
        # Copy the class-level defaults so instances never share state.
        self.conf = dict(self.conf)
        self.conf.update(conf or {})
        self.plugins = list(self.plugins)
        self.plug_names = list(self.plug_names)
        for name in plugins or ():
            self.loadPlugin(name)
        self.s = socket.socket()
        if self.conf['ssl']:
            # ssl.wrap_socket() was removed in Python 3.12; an SSLContext is
            # the supported replacement.  NOTE(review): the default context
            # verifies the server certificate and host name, which the old
            # call did not do.
            import ssl
            context = ssl.create_default_context()
            self.s = context.wrap_socket(
                self.s, server_hostname=self.conf['server'])
        self.recved = queue.Queue(0)
        self.to_send = queue.Queue(0)
        self.die = False

    def recv(self, max_data_size=4096):
        """Read once from the socket and yield each complete line received.

        A line that is cut off at the end of the read is kept in an
        internal buffer and yielded by a later call once its terminating
        ``"\\r\\n"`` has arrived.  Empty lines are skipped.

        Args:
            max_data_size: Maximum number of bytes to read from the socket.

        Yields:
            Decoded lines (``conf['server_encode']``) without the line end.

        Raises:
            ConnectionResetError: The socket returned no data, i.e. the
                connection has been closed.
        """
        chunk = self.s.recv(max_data_size)
        if not chunk:
            raise ConnectionResetError("connection closed")
        pieces = (self._recv_buffer + chunk).split(b"\r\n")
        # The last piece is either b"" or the start of an unfinished line.
        self._recv_buffer = pieces.pop()
        encoding = self.conf['server_encode']
        for raw in pieces:
            if raw:
                yield raw.decode(encoding)

    def connect(self):
        """Connect to the configured server and run the bot (blocking)."""
        self.s.connect((self.conf['server'], self.conf['port']))
        self.on_connect()

    def on_connect(self):
        """Register with the server, start the worker threads, dispatch.

        Queues the USER and NICK commands, starts the sender and listener
        threads, then runs ``start_plugins`` in the calling thread.
        """
        nick = self.conf['nick']
        self.to_send.put("USER " + nick + " 0 * :" + nick)
        self.to_send.put("NICK " + nick)
        threading.Thread(target=self.__send).start()
        threading.Thread(target=self.listen).start()
        self.start_plugins()

    def listen(self):
        """Read lines until the bot stops and queue their parsed form.

        Every line is printed, matched against ``b_re`` and converted with
        the module-level ``to_diz`` before being put on ``self.recved``.
        A socket error or a closed connection stops the whole bot, because
        retrying on a dead socket would loop forever.
        """
        while not self.die:
            try:
                for line in self.recv():
                    print(line)
                    self.recved.put_nowait(to_diz(self.b_re.match(line)))
            except socket.timeout:
                # Only possible if a timeout was set on the socket; retry.
                continue
            except OSError:
                if not self.die:
                    print("Connection lost")
                    self.die = True
            except Exception:
                # A single bad line must not take the listener down.
                print("Not managed error")

    def __send(self):
        """Write queued messages to the socket until the bot stops."""
        while not self.die:
            try:
                # Block with a timeout instead of spinning on get_nowait().
                message = self.to_send.get(timeout=self._poll_interval)
            except queue.Empty:
                continue
            data = message + "\r\n"
            print(data)
            try:
                # sendall() retries partial writes, unlike send().
                self.s.sendall(data.encode(self.conf['server_encode']))
            except OSError:
                if not self.die:
                    print("Connection lost")
                    self.die = True

    def start_plugins(self):
        """Run every loaded plugin on each parsed message, one thread each.

        Plugins are called as ``plugin(bot, message)``.  Empty messages
        (lines that did not match ``b_re``) are ignored.
        """
        while not self.die:
            try:
                message = self.recved.get(timeout=self._poll_interval)
            except queue.Empty:
                continue
            if not message:
                continue
            # Iterate a snapshot: a plugin may load or unload plugins.
            for plugin in list(self.plugins):
                worker = threading.Thread(
                    target=plugin, args=(self, message), name=str(plugin))
                worker.start()

    def loadPlugin(self, plugin):
        """Load ``plugins.<plugin>`` and register its entry point.

        The entry point is the module attribute that has the same name as
        the module.  Loading a plugin that is already registered moves it
        to the end of the list without duplicating its name.

        Returns:
            ``"Empty Value"``, ``"Plugin not Found"`` or
            ``"Successfully loaded"``.
        """
        if not plugin:
            return "Empty Value"
        try:
            # Forget the cached package so it is imported afresh.
            modules.pop('plugins', None)
            __import__("plugins")
            module = __import__("plugins." + plugin, fromlist=[plugin])
            handler = vars(module)[plugin]
        except Exception:
            # Importing runs arbitrary plugin code, so anything may be
            # raised; every failure is reported the same way.
            return "Plugin not Found"
        if handler in self.plugins:
            self.plugins.remove(handler)
        self.plugins.append(handler)
        if plugin not in self.plug_names:
            self.plug_names.append(plugin)
        return "Successfully loaded"

    def unloadPlugin(self, plugin):
        """Unregister the entry point of ``plugins.<plugin>``.

        Returns:
            ``"Successfully Unloaded"``, ``"Not Loaded Plugin"`` or
            ``"Not such Plugin"``.
        """
        try:
            module = __import__("plugins." + plugin, fromlist=[plugin])
            handler = vars(module)[plugin]
        except Exception:
            return "Not such Plugin"
        if handler not in self.plugins:
            return "Not Loaded Plugin"
        self.plugins.remove(handler)
        if plugin in self.plug_names:
            self.plug_names.remove(plugin)
        return "Successfully Unloaded"

    def send(self, string):
        """Queue *string* (without line ending) for sending to the server."""
        self.to_send.put_nowait(string)

    def close(self):
        """Stop all worker loops and close the connection."""
        self.die = True
        try:
            # Wakes a listener thread that is blocked in recv().
            self.s.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # never connected, or already shut down
        self.s.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment