Created
April 17, 2012 12:34
-
-
Save Cairnarvon/2405699 to your computer and use it in GitHub Desktop.
The original point of progscrape was to back an interface like this one.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import argparse | |
import errno | |
import json | |
import os | |
import re | |
import stat | |
import sys | |
import time | |
import urllib2 | |
import fuse # http://code.google.com/p/fusepy/ | |
def unix_time(s):
    """
    Convert an HTTP-style date header (e.g. 'Tue, 17 Apr 2012 12:34:00 GMT')
    into integer seconds since the Unix epoch.

    The parsed fields are interpreted as UTC via calendar.timegm().  Using
    time.mktime() here would treat them as local time and shift the result
    by the host's UTC offset.

    Raises ValueError if `s` does not match the expected format.
    """
    # Local import so this function does not depend on edits to the
    # file-level import block.
    import calendar

    parsed = time.strptime(s, '%a, %d %b %Y %H:%M:%S %Z')
    return int(calendar.timegm(parsed))
def parse_path(path):
    """
    Break a slash-separated path into its non-empty components.

    The result is always a list of exactly three items; levels that are
    absent from the path are represented by None.  A path with more than
    three components does not exist in this filesystem and raises
    FuseOSError(ENOENT).
    """
    parts = [piece for piece in path.split('/') if piece]
    depth = len(parts)
    if depth > 3:
        raise fuse.FuseOSError(errno.ENOENT)
    # Pad the missing trailing levels with None.
    return parts + [None] * (3 - depth)
class HeadRequest(urllib2.Request): | |
"""Makes a HEAD request rather than GET.""" | |
get_method = lambda self: 'HEAD' | |
class ProgFS(fuse.LoggingMixIn, fuse.Operations):
    """
    Read-only FUSE view of one board, built from its subject.txt index.

    Directory layout exposed by the handlers below:

        /<thread id>/title          subject of the thread
        /<thread id>/<n>/author     'name' field of post n (1-based)
        /<thread id>/<n>/body       'com' field of post n (1-based)
    """

    # One record of subject.txt; fields are separated by "<>".
    # Compiled once here instead of on every refresh.
    _SUBJECT_LINE = re.compile(u"""
        ^(?P<subject>.*)        # Subject
        <>
        .*?                     # Creator's name
        <>
        .*?                     # Thread icon
        <>
        (?P<id>-?\d*)           # Time posted/thread ID
        <>
        (?P<replies>\d*)        # Number of replies
        <>
        .*?                     # ???
        <>
        (?P<last_post>\d*)      # Time of last post
        \\n$""", re.VERBOSE)

    def __init__(self, board='prog', tmpdir=None):
        """
        board  -- board name interpolated into the subject.txt and JSON URLs.
        tmpdir -- accepted for compatibility; not used by this class.

        Fetches subject.txt once so the thread table is populated before
        the filesystem serves its first request.
        """
        self.board = board
        self.subject_url = 'http://dis.4chan.org/%s/subject.txt' % self.board
        # '%%s' survives this formatting step as '%s' placeholders for the
        # thread id and post number, filled in by read().
        self.post_url = 'http://dis.4chan.org/json/%s/%%s/%%s' % self.board
        self.files = {}
        self.last_modified = 0
        # thread id (str) -> {'title', 'last_modified', 'posts'}
        self.threads = {}
        self._get_subject_txt()

    @staticmethod
    def _attrs(mode, mtime, atime=None, size=None):
        """
        Build a getattr() result dict.  ctime and mtime are both `mtime`;
        atime defaults to `mtime` too.  'st_size' is only included when
        `size` is given.
        """
        attrs = {'st_mode': mode,
                 'st_ctime': mtime,
                 'st_mtime': mtime,
                 'st_atime': mtime if atime is None else atime,
                 'st_uid': os.getuid(),
                 'st_gid': os.getgid()}
        if size is not None:
            attrs['st_size'] = size
        return attrs

    def _get_subject_txt(self):
        """
        Checks if subject.txt has changed, and if so, fetches the new file
        and updates the threads table.

        Lines that do not match the expected record format, or whose
        numeric fields are empty or malformed, are skipped rather than
        aborting the whole refresh.
        """
        head = urllib2.urlopen(HeadRequest(self.subject_url))
        try:
            remote_stamp = unix_time(head.headers.getheader('last-modified'))
        finally:
            head.close()
        if self.last_modified >= remote_stamp:
            # No change. Stop now.
            return

        r = urllib2.urlopen(self.subject_url)
        try:
            fetched_stamp = unix_time(r.headers.getheader('last-modified'))
            for line in r.readlines():
                # FIXME this loop is slow as balls
                # Replacing the regex with split('<>') is faster, but
                # Shiichan is full of corner cases and that loses /prog/
                # threads.
                match = self._SUBJECT_LINE.match(line)
                if match is None:
                    continue
                fields = match.groupdict()
                try:
                    # The id is later fed to float() in getattr(), so make
                    # sure it is numeric before accepting the record.
                    int(fields['id'])
                    last_post = float(fields['last_post'])
                    replies = int(fields['replies'])
                except ValueError:
                    continue
                entry = self.threads.setdefault(fields['id'], {})
                entry['title'] = fields['subject']
                entry['last_modified'] = last_post
                entry['posts'] = replies
        finally:
            r.close()
        # Only record the new timestamp after a complete pass, so that a
        # failed refresh is retried on the next call.
        self.last_modified = fetched_stamp

    def getattr(self, path, fh=None):
        """
        Return the stat dict for `path`.

        Raises FuseOSError(ENOENT) for unknown threads, non-numeric or
        out-of-range post numbers, and unknown file names.
        """
        thread_id, post, leaf = parse_path(path)

        if not thread_id:
            # Board root
            return self._attrs(stat.S_IFDIR | 0o555, self.last_modified,
                               atime=time.time())

        if thread_id not in self.threads:
            raise fuse.FuseOSError(errno.ENOENT)
        thread = self.threads[thread_id]

        if not post:
            # Thread folder
            return self._attrs(stat.S_IFDIR | 0o555, thread['last_modified'])

        if post == 'title' and not leaf:
            # Thread title file; the thread id doubles as its timestamp.
            # The size is a fixed placeholder, not the real title length.
            return self._attrs(stat.S_IFREG | 0o444, float(thread_id),
                               size=1024)

        try:
            post_number = int(post)
        except ValueError:
            # Not a post folder
            raise fuse.FuseOSError(errno.ENOENT)

        if post_number < 1 or post_number > thread['posts']:
            # Post index out of range
            raise fuse.FuseOSError(errno.ENOENT)

        if not leaf:
            # Post folder
            # TODO fetch post for accurate times
            return self._attrs(stat.S_IFDIR | 0o555, thread['last_modified'])

        if leaf not in ('author', 'body'):
            raise fuse.FuseOSError(errno.ENOENT)

        # Post data file; the size is a fixed placeholder.
        # TODO fetch post for accurate times
        return self._attrs(stat.S_IFREG | 0o444, thread['last_modified'],
                           size=1024 * 1024)

    def read(self, path, size, offset, fh):
        """
        Return up to `size` bytes starting at `offset` from the file at
        `path`.  Title files are served from the cached thread table;
        author/body files trigger a fetch of the post's JSON.

        Raises FuseOSError(ENOENT) for paths that are not readable files.
        """
        thread_id, post, leaf = parse_path(path)
        if thread_id not in self.threads or not post:
            raise fuse.FuseOSError(errno.ENOENT)

        if post == 'title' and not leaf:
            title = self.threads[thread_id]['title'] + '\n'
            return title[offset : offset + size]

        if leaf not in ('author', 'body'):
            raise fuse.FuseOSError(errno.ENOENT)
        try:
            int(post)
        except ValueError:
            raise fuse.FuseOSError(errno.ENOENT)

        r = urllib2.urlopen(self.post_url % (thread_id, post))
        try:
            # NOTE(review): assumes the JSON object is keyed by the post
            # number as a string -- confirm against the service.
            post_data = json.loads(r.read())[post]
        finally:
            r.close()

        field = 'com' if leaf == 'body' else 'name'
        # json yields unicode text; encode before slicing so that offset
        # and size count bytes rather than characters.
        data = (post_data[field] + u'\n').encode('utf-8')
        return data[offset : offset + size]

    def readdir(self, path, fh):
        """
        List the entries of the directory at `path`, refreshing the thread
        table from subject.txt first.

        Raises FuseOSError(ENOENT) for unknown threads or posts and
        FuseOSError(ENOTDIR) for the title file.
        """
        self._get_subject_txt()
        thread_id, post, _ = parse_path(path)

        if not thread_id:
            # Contents of the board directory
            return ['.', '..'] + list(self.threads)

        if thread_id not in self.threads:
            raise fuse.FuseOSError(errno.ENOENT)
        post_count = self.threads[thread_id]['posts']

        if not post:
            # Contents of a thread directory
            return ['.', '..', 'title'] + \
                   [str(n) for n in range(1, post_count + 1)]

        if post == 'title':
            raise fuse.FuseOSError(errno.ENOTDIR)

        try:
            post_number = int(post)
        except ValueError:
            raise fuse.FuseOSError(errno.ENOENT)

        # Posts are numbered from 1, matching the check in getattr().
        if post_number < 1 or post_number > post_count:
            raise fuse.FuseOSError(errno.ENOENT)

        return ['.', '..', 'author', 'body']
if __name__ == "__main__":
    # Command-line entry point: mount the chosen board at the mount point.
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--board', action='store',
                        help='board')
    parser.add_argument('mountpoint', action='store',
                        help='mount point')
    parser.add_argument('-f', '--foreground', action='store_true',
                        help='run in the foreground (useful for debugging)')
    args = parser.parse_args()

    # The return value is not needed.  Binding it to the name ``fuse``
    # would replace the imported module at module scope, so it is simply
    # discarded.
    fuse.FUSE(ProgFS(args.board or 'prog'),
              args.mountpoint,
              foreground=args.foreground)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment