Created
January 21, 2011 22:49
-
-
Save e000/790588 to your computer and use it in GitHub Desktop.
youtube module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet.task import LoopingCall | |
from twisted.web.client import getPage | |
from twisted.internet.defer import DeferredLock, inlineCallbacks, returnValue | |
from Core.ZalgoUtil.CmdWraps import command, hook | |
from Core.ZalgoUtil.ModUtil import dateDiff, utf8 | |
from Core.ZalgoBase.BaseModule import BaseModule | |
from Core.ZalgoUtil.DeferredCache import deferred_lfu_cache | |
from Core.ZalgoHooks import Hooks | |
from xml.etree.cElementTree import fromstring | |
import re | |
from time import time, mktime | |
from iso8601 import parse_date | |
class YoutubeVideo:
    """Plain record holding the metadata read from one YouTube feed entry.

    Instances are normally created through ``fromXml`` or
    ``fromTreeObject``; the attributes set there are ``id``, ``published``,
    ``title``, ``author``, ``rating``, ``views``, ``duration`` and ``got``.
    """

    @staticmethod
    def fromTreeObject(xml):
        """Build a ``YoutubeVideo`` from an already parsed entry element.

        ``xml`` must offer an ElementTree-style ``find``. Every element
        looked up below is required: if one is absent ``find`` returns
        ``None`` and the attribute access on it raises ``AttributeError``.
        """
        video = YoutubeVideo()
        lookup = xml.find

        video.id = lookup('{http://www.w3.org/2005/Atom}id').text

        published_node = lookup('{http://www.w3.org/2005/Atom}published')
        video.published = parse_date(published_node.text)

        video.title = lookup('{http://www.w3.org/2005/Atom}title').text

        author_node = lookup('{http://www.w3.org/2005/Atom}author/{http://www.w3.org/2005/Atom}name')
        video.author = author_node.text

        # Keep every attribute of the rating element as a plain dict;
        # __repr__ reads the 'average' key from it.
        rating_node = lookup('{http://schemas.google.com/g/2005}rating')
        video.rating = dict(rating_node.items())

        stats_node = lookup('{http://gdata.youtube.com/schemas/2007}statistics')
        video.views = int(stats_node.get('viewCount'))

        duration_node = lookup('{http://search.yahoo.com/mrss/}group/{http://gdata.youtube.com/schemas/2007}duration')
        video.duration = int(duration_node.get('seconds'))

        # Wall-clock time at which this record was built.
        video.got = time()
        return video

    @staticmethod
    def fromXml(xml):
        """Parse the XML document text ``xml`` and build a ``YoutubeVideo`` from its root."""
        root = fromstring(xml)
        return YoutubeVideo.fromTreeObject(root)

    def __repr__(self):
        """Return a short debugging summary of the video."""
        shown = (
            self.title,
            self.author,
            self.duration,
            self.rating['average'],
            self.views,
        )
        return '<YoutubeVideo title: %s, author: %s, duration: %s, rating: %s, views: %s>' % shown
class Module(BaseModule):
    """Bot module that announces details of YouTube links posted in a channel.

    A PRIVMSG hook scans every line for ``youtube.com/watch?v=<id>`` links,
    fetches each video's metadata through a cached lookup and posts a
    one-line summary. Two level-1 commands expose cache statistics and
    allow the cache to be cleared by hand.
    """

    parseCommands = True
    parseHooks = True

    # A video id is exactly 11 characters from [0-9A-Za-z_-]. The range is
    # spelled out because ``A-z`` would also accept ``[ \ ] ^ `` and the
    # backtick, which sit between 'Z' and 'a' in ASCII.
    regex = re.compile(r'youtube\.com/watch\?v=([0-9A-Za-z_-]{11})')
    match = regex.search
    # NOTE: despite its name this is ``findall``; with a single capture
    # group it returns the list of captured id strings.
    finditer = regex.findall

    def init(self):
        """Start the periodic task that empties the video cache."""
        # Clear the cache every hour; now=False skips the immediate first run.
        self.lc = LoopingCall(self.getYoutubeVideo.clear)
        self.lc.start(60 * 60, now=False)

    def onUnload(self):
        """Stop the periodic cache clearer and drop the reference to it."""
        if self.lc.running:
            self.lc.stop()
        del self.lc

    @hook(Hooks.PRIVMSG)
    def isYtLink(self, hook, user, channel, line, params):
        """Announce every distinct YouTube video linked in ``line``.

        Returns ``Hooks.STOP`` when the line contained at least one link and
        ``None`` otherwise.
        """
        videoIds = self.finditer(line)
        if not videoIds:
            return None

        # Announce each id once, in order of first appearance, even when the
        # same link is repeated within the line.
        seen = set()
        for vid in videoIds:
            if vid in seen:
                continue
            seen.add(vid)
            # Fire and forget: findYoutubeLink reports its own failures.
            self.findYoutubeLink(channel, vid)
        return Hooks.STOP

    @inlineCallbacks
    def findYoutubeLink(self, channel, videoId):
        """Look up ``videoId`` and post a summary, or a "not found" notice, to ``channel``.

        Any ordinary error while fetching, parsing, formatting or sending is
        reported to the channel as "not found" (deliberate best effort).
        """
        try:
            # The cached lookup is unpacked as (video, isCached), which is
            # presumably what deferred_lfu_cache delivers -- TODO confirm.
            video, isCached = yield self.getYoutubeVideo(videoId)
            minutes, seconds = divmod(video.duration, 60)
            out = "\x02\x037,01[\x030You\x034Tube\x037] \x0307%s \x0315(%i:%02i) \x0308by \x0311%s \x0308%.2f\x0315/\x03095 \x0308stars\x038, \x0309%i\x038 views\x0315" % (
                video.title, minutes, seconds, video.author,
                float(video.rating['average']), video.views,
            )
            if isCached:
                out += ' cached about %s ago.' % (dateDiff(time() - video.got))
            else:
                out += '.'
            self.bot.privmsg(channel, utf8(out))
        except Exception:
            # ``Exception`` rather than a bare ``except`` so GeneratorExit,
            # KeyboardInterrupt and SystemExit are not swallowed and turned
            # into a bogus "not found" message.
            self.bot.privmsg(channel, "\x02\x037,01[\x030You\x034Tube\x037] \x034Video \x0308%s\x034 not found." % videoId)

    @deferred_lfu_cache(maxsize=25)
    def getYoutubeVideo(self, videoID):
        """Fetch the feed entry for ``videoID`` and parse it into a ``YoutubeVideo``.

        Returns the Deferred produced by ``getPage`` with the XML parsing
        chained on as a callback; the decorator caches up to 25 results.
        """
        # NOTE(review): the gdata.youtube.com v2 feed endpoint and Twisted's
        # getPage are both believed to be retired/deprecated upstream --
        # confirm before relying on this lookup.
        d = getPage(
            'http://gdata.youtube.com/feeds/api/videos/%s' % videoID,
            agent='Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.634.0 Safari/534.16'
        )
        return d.addCallback(YoutubeVideo.fromXml)

    @command('.youtube-cachestats', level=1)
    def stats(self, cmd, user, channel, line, params):
        """Post the video cache's hit/miss/size/waiting/maxsize figures to ``channel``."""
        f = self.getYoutubeVideo
        self.bot.privmsg(
            channel,
            "[\x02YouTubeParser\x02] deferred_lfu_cache hits: %s, misses: %s, size: %s, waiting: %s, maxsize: %s" %
            (f.hits, f.misses, f.size(), f.waiting(), f.maxsize))

    @command('.youtube-cacheclear', level=1)
    def clear(self, cmd, user, channel, line, params):
        """Empty the video cache and confirm in ``channel``."""
        self.getYoutubeVideo.clear()
        self.bot.privmsg(channel, "[\x02YouTubeParser\x02] deferred_lfu_cache cleared.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment