Skip to content

Instantly share code, notes, and snippets.

@Cairnarvon
Created April 17, 2012 12:34
Show Gist options
  • Save Cairnarvon/2405699 to your computer and use it in GitHub Desktop.
Save Cairnarvon/2405699 to your computer and use it in GitHub Desktop.
The original point of progscrape was to back an interface like this one.
#!/usr/bin/python
import argparse
import errno
import json
import os
import re
import stat
import sys
import time
import urllib2
import fuse # http://code.google.com/p/fusepy/
def unix_time(s):
    """
    Convert an HTTP date header such as 'Tue, 17 Apr 2012 12:34:00 GMT'
    into an integer Unix timestamp.

    Raises ValueError if s does not match the expected format.
    """
    # Local import so this block does not depend on the file's import list.
    import calendar
    parsed = time.strptime(s, '%a, %d %b %Y %H:%M:%S %Z')
    # HTTP dates are expressed in GMT. time.mktime() would interpret the
    # parsed fields as *local* time and skew the result by the host's UTC
    # offset; calendar.timegm() treats them as UTC.
    return int(calendar.timegm(parsed))
def parse_path(path):
    """
    Split a filesystem path into its non-empty components.

    Paths are at most three levels deep. This always returns a three-member
    list and fills the blanks with None.

    Raises fuse.FuseOSError(ENOENT) if the path has more than three
    components.
    """
    # Build a real list rather than relying on filter() returning one:
    # that only holds on Python 2, and len()/append() below need a list.
    parts = [part for part in path.split('/') if part]
    if len(parts) > 3:
        raise fuse.FuseOSError(errno.ENOENT)
    parts.extend([None] * (3 - len(parts)))
    return parts
class HeadRequest(urllib2.Request):
"""Makes a HEAD request rather than GET."""
get_method = lambda self: 'HEAD'
class ProgFS(fuse.LoggingMixIn, fuse.Operations):
    """
    Read-only filesystem view of a dis.4chan.org text board.

    Layout exposed to the mount point:

        /<thread id>/title
        /<thread id>/<post number>/author
        /<thread id>/<post number>/body

    The thread table comes from the board's subject.txt; individual posts
    are fetched from the JSON interface each time they are read.
    """

    # One line of subject.txt. Compiled once here instead of on every
    # refresh. Whitespace and '#' comments are ignored (re.VERBOSE).
    _SUBJECT_LINE = re.compile(u"""
        ^(?P<subject>.*)        # Subject
        <>
        .*?                     # Creator's name
        <>
        .*?                     # Thread icon
        <>
        (?P<id>-?\\d*)          # Time posted/thread ID
        <>
        (?P<replies>\\d*)       # Number of replies
        <>
        .*?                     # ???
        <>
        (?P<last_post>\\d*)     # Time of last post
        \\n$""", re.VERBOSE)

    def __init__(self, board='prog', tmpdir=None):
        """
        board is the board name interpolated into the subject.txt and JSON
        URLs. tmpdir is accepted but not used by this class.

        Fetches subject.txt immediately so the thread table is populated
        before the filesystem is served.
        """
        self.board = board
        self.subject_url = 'http://dis.4chan.org/%s/subject.txt' % self.board
        # The two remaining %s are filled with thread id and post number.
        self.post_url = 'http://dis.4chan.org/json/%s/%%s/%%s' % self.board
        self.files = {}
        self.last_modified = 0
        self.threads = {}
        self._get_subject_txt()

    @staticmethod
    def _as_bytes(text):
        """Return text as a byte string, encoding unicode as UTF-8."""
        if not isinstance(text, bytes):
            text = text.encode('utf-8')
        return text

    @staticmethod
    def _stat(mode, when, size=None, atime=None):
        """
        Build a getattr() result dict.

        when is used for ctime, mtime and (unless atime is given) atime.
        st_size is only included when size is given.
        """
        attrs = {'st_mode': mode,
                 'st_ctime': when,
                 'st_mtime': when,
                 'st_atime': when if atime is None else atime,
                 'st_uid': os.getuid(),
                 'st_gid': os.getgid()}
        if size is not None:
            attrs['st_size'] = size
        return attrs

    def _post_number(self, thread_id, name):
        """
        Convert a post directory name to its integer index.

        Raises fuse.FuseOSError(ENOENT) if name is not an integer or lies
        outside 1..<number of posts in the thread>.
        """
        try:
            number = int(name)
        except ValueError:
            # Not a post folder
            raise fuse.FuseOSError(errno.ENOENT)
        if number < 1 or number > self.threads[thread_id]['posts']:
            # Post index out of range
            raise fuse.FuseOSError(errno.ENOENT)
        return number

    def _title_data(self, thread_id):
        """Return the byte content of a thread's title file."""
        return self._as_bytes(self.threads[thread_id]['title']) + b'\n'

    def _get_subject_txt(self):
        """
        Checks if subject.txt has changed, and if so, fetches the new file
        and updates the threads table.
        """
        r = urllib2.urlopen(HeadRequest(self.subject_url))
        try:
            modified = unix_time(r.headers.getheader('last-modified'))
        finally:
            # Close even if the header is missing or unparsable.
            r.close()
        if self.last_modified >= modified:
            # No change. Stop now.
            return

        r = urllib2.urlopen(self.subject_url)
        try:
            self.last_modified = \
                unix_time(r.headers.getheader('last-modified'))
            lines = r.readlines()
        finally:
            r.close()

        for line in lines:
            # FIXME this loop is slow as balls
            # Replacing the regex with split('<>') is faster, but Shiichan is
            # full of corner cases and that loses /prog/ threads.
            match = self._SUBJECT_LINE.match(line)
            if match is None:
                # Malformed line: skip it rather than abort the refresh.
                continue
            thread = match.groupdict()
            try:
                last_post = float(thread['last_post'])
                replies = int(thread['replies'])
            except ValueError:
                # The pattern allows empty numeric fields; such a line
                # carries no usable thread data.
                continue
            entry = self.threads.setdefault(thread['id'], {})
            entry['title'] = thread['subject']
            entry['last_modified'] = last_post
            entry['posts'] = replies

    def getattr(self, path, fh=None):
        """
        Return the stat dict for path.

        Raises fuse.FuseOSError(ENOENT) for anything outside the layout
        described on the class.
        """
        thread_id, post, leaf = parse_path(path)
        if not thread_id:
            # Board root
            return self._stat(stat.S_IFDIR | 0o555, self.last_modified,
                              atime=time.time())
        if thread_id not in self.threads:
            raise fuse.FuseOSError(errno.ENOENT)
        last_modified = self.threads[thread_id]['last_modified']
        if not post:
            # Thread folder
            return self._stat(stat.S_IFDIR | 0o555, last_modified)
        if post == 'title' and not leaf:
            # Thread title file. The thread id doubles as its creation
            # time; the size is the exact length of what read() serves.
            return self._stat(stat.S_IFREG | 0o444, float(thread_id),
                              size=len(self._title_data(thread_id)))
        self._post_number(thread_id, post)
        if not leaf:
            # Post folder
            # TODO fetch post for accurate times
            return self._stat(stat.S_IFDIR | 0o555, last_modified)
        if leaf not in ('author', 'body'):
            raise fuse.FuseOSError(errno.ENOENT)
        # Post data file
        # TODO fetch post for accurate times
        # The real size is unknown without fetching the post, so report a
        # generous upper bound.
        return self._stat(stat.S_IFREG | 0o444, last_modified,
                          size=1024 * 1024)

    def read(self, path, size, offset, fh):
        """
        Return up to size bytes of the file at path, starting at offset.

        Title files are served from the thread table; author/body files
        trigger a fetch of the post from the JSON interface. Raises
        fuse.FuseOSError(ENOENT) for paths that are not readable files.
        """
        thread_id, post, leaf = parse_path(path)
        if thread_id not in self.threads or not post:
            raise fuse.FuseOSError(errno.ENOENT)
        if post == 'title' and not leaf:
            return self._title_data(thread_id)[offset : offset + size]
        if leaf not in ('author', 'body'):
            raise fuse.FuseOSError(errno.ENOENT)
        # Same validation as getattr(); the canonical number is also used
        # for the URL and lookup key so aliases like "01" resolve to "1".
        number = self._post_number(thread_id, post)
        r = urllib2.urlopen(self.post_url % (thread_id, number))
        try:
            posts = json.loads(r.read())
        finally:
            r.close()
        try:
            entry = posts[str(number)]
        except KeyError:
            # The response does not contain the requested post.
            raise fuse.FuseOSError(errno.ENOENT)
        field = 'com' if leaf == 'body' else 'name'
        # offset and size are byte counts, so slice the encoded bytes
        # rather than the decoded unicode text.
        data = self._as_bytes(entry[field]) + b'\n'
        return data[offset : offset + size]

    def readdir(self, path, fh):
        """
        Return the entry names of the directory at path, refreshing the
        thread table first.

        Raises fuse.FuseOSError with ENOENT for unknown paths and ENOTDIR
        for paths that name a file.
        """
        self._get_subject_txt()
        thread_id, post, leaf = parse_path(path)
        if not thread_id:
            # Contents of the board directory
            return ['.', '..'] + list(self.threads)
        if thread_id not in self.threads:
            raise fuse.FuseOSError(errno.ENOENT)
        if not post:
            # Contents of a thread directory
            count = self.threads[thread_id]['posts']
            return ['.', '..', 'title'] + \
                   [str(i) for i in range(1, count + 1)]
        if post == 'title':
            raise fuse.FuseOSError(errno.ENOTDIR)
        # Posts are numbered from 1, matching getattr().
        self._post_number(thread_id, post)
        if leaf:
            # Third-level names are files (or do not exist at all).
            if leaf in ('author', 'body'):
                raise fuse.FuseOSError(errno.ENOTDIR)
            raise fuse.FuseOSError(errno.ENOENT)
        return ['.', '..', 'author', 'body']
if __name__ == "__main__":
    # Command line: [-b BOARD] [-f] MOUNTPOINT
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--board', action='store',
                        help='board')
    parser.add_argument('mountpoint', action='store',
                        help='mount point')
    parser.add_argument('-f', '--foreground', action='store_true',
                        help='run in the foreground (useful for debugging)')
    args = parser.parse_args()
    # Mount the filesystem; an empty or missing board falls back to 'prog'.
    # The result is deliberately not bound to a name: assigning it to
    # `fuse` would replace the imported module that the rest of the file
    # uses.
    fuse.FUSE(ProgFS(args.board or 'prog'),
              args.mountpoint,
              foreground=args.foreground)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment