Skip to content

Instantly share code, notes, and snippets.

Created April 17, 2012 12:34
Show Gist options
  • Save Cairnarvon/2405699 to your computer and use it in GitHub Desktop.
Save Cairnarvon/2405699 to your computer and use it in GitHub Desktop.
The original point of progscrape was to back an interface like this one.
import argparse
import errno
import json
import os
import re
import stat
import sys
import time
import urllib2
import fuse #
def unix_time(s):
    """Convert an HTTP-style date string to a Unix timestamp.

    *s* must match the format ``'%a, %d %b %Y %H:%M:%S %Z'``, for example
    ``'Tue, 17 Apr 2012 12:34:00 GMT'``. Returns whole seconds since the
    epoch as an int. ``time.strptime`` raises ValueError when *s* does not
    match the format.

    The parsed fields are interpreted as UTC through ``calendar.timegm``.
    ``time.mktime`` would interpret them as local time and skew the result
    by the machine's UTC offset.
    """
    # Imported locally so this function does not rely on a new module-level
    # import being present.
    import calendar

    parsed = time.strptime(s, '%a, %d %b %Y %H:%M:%S %Z')
    return int(calendar.timegm(parsed))
def parse_path(path):
    """Split a filesystem path into exactly three components.

    Paths are at most three levels deep. This always returns a three-member
    list and fills the blanks with None.

    Empty components (leading, trailing or doubled slashes) are dropped.
    Raises fuse.FuseOSError(errno.ENOENT) when more than three components
    remain.
    """
    # A list comprehension always yields a real list, so len() and extend()
    # work even where filter() would return an iterator.
    parts = [part for part in path.split('/') if part]
    if len(parts) > 3:
        raise fuse.FuseOSError(errno.ENOENT)
    # Pad the missing levels so callers can index [0], [1] and [2] safely.
    parts.extend([None] * (3 - len(parts)))
    return parts
class HeadRequest(urllib2.Request):
"""Makes a HEAD request rather than GET."""
get_method = lambda self: 'HEAD'
# FUSE operations class for browsing a board as a directory tree. Going by
# the methods visible in this file: the root lists thread ids, each thread
# directory holds a `title` file plus one numbered directory per post, and
# each post directory holds `author` and `body` files.
class ProgFS(fuse.LoggingMixIn, fuse.Operations):
# Store the board name and start with empty caches.
# `tmpdir` is accepted but is not used anywhere in the visible code.
def __init__(self, board='prog', tmpdir=None):
self.board = board
# NOTE(review): both format strings below are empty in this copy of the
# file, so `'' % self.board` raises TypeError ("not all arguments converted")
# as written. The real URL templates appear to have been lost and must be
# restored from the upstream file. `post_url` is later formatted with
# (thread, post), so its template presumably keeps two placeholders after
# this first substitution - confirm.
self.subject_url = '' % self.board
self.post_url = '' % self.board
# Not referenced by any other visible code.
self.files = {}
# Timestamp of the last subject.txt seen by _get_subject_txt; 0 = never.
self.last_modified = 0
# Thread id -> dict; _get_subject_txt stores 'title', 'last_modified' and
# 'posts' entries in each dict.
self.threads = {}
def _get_subject_txt(self):
    """Refresh the thread table from the board's subject.txt.

    Checks if subject.txt has changed, and if so, fetches the new file
    and updates the threads table.

    A HEAD request is made first. When its Last-Modified time is not newer
    than ``self.last_modified`` nothing else happens. Otherwise the file is
    downloaded, ``self.last_modified`` is updated, and every parsable line
    creates or updates ``self.threads[<id>]`` with the keys ``'title'``,
    ``'last_modified'`` and ``'posts'``. Lines that do not parse are skipped.

    NOTE(review): the ``<>`` field separators in the pattern, the early
    ``return`` and the two ``int(...)`` conversions were missing from the
    copy under review. They are restored here from the ``split('<>')``
    remark, the "Stop now" comment and the regex group names - confirm
    against the upstream file. In particular confirm that ``'posts'`` is
    the raw ``replies`` field and not ``replies + 1``.
    """
    response = urllib2.urlopen(HeadRequest(self.subject_url))
    try:
        remote_mtime = unix_time(response.headers.getheader('last-modified'))
    finally:
        # Always release the connection, even if the header fails to parse.
        response.close()
    if self.last_modified >= remote_mtime:
        # No change. Stop now.
        return

    response = urllib2.urlopen(self.subject_url)
    try:
        self.last_modified = unix_time(
            response.headers.getheader('last-modified'))
        lines = response.readlines()
    finally:
        response.close()

    regex = re.compile(u"""
        ^(?P<subject>.*)    # Subject
        <>
        .*?                 # Creator's name
        <>
        .*?                 # Thread icon
        <>
        (?P<id>-?\d*)       # Time posted/thread ID
        <>
        (?P<replies>\d*)    # Number of replies
        <>
        .*?                 # ???
        <>
        (?P<last_post>\d*)  # Time of last post
        \\n$""", re.VERBOSE)

    for line in lines:
        # FIXME: this loop is slow.
        # Replacing the regex with split('<>') is faster, but Shiichan is
        # full of corner cases and that loses /prog/ threads.
        match = regex.match(line)
        if match is None:
            # Malformed line: skip it rather than fail on None.groupdict().
            continue
        thread = match.groupdict()
        try:
            # `\d*` may match the empty string, which int() rejects.
            last_post = int(thread['last_post'])
            replies = int(thread['replies'])
        except ValueError:
            continue
        entry = self.threads.setdefault(thread['id'], {})
        entry['title'] = thread['subject']
        entry['last_modified'] = last_post
        entry['posts'] = replies
def getattr(self, path, fh=None):
    """Return the stat dictionary for *path*.

    Recognised paths (components as split by ``parse_path``):

    * ``/``                    board directory (mode 0555)
    * ``/<thread>``            thread directory (mode 0555)
    * ``/<thread>/title``      regular file (mode 0444), st_size 1024
    * ``/<thread>/<n>``        post directory, 1 <= n <= thread's 'posts'
    * ``/<thread>/<n>/author`` regular file (mode 0444), st_size 1024 * 1024
    * ``/<thread>/<n>/body``   regular file (mode 0444), st_size 1024 * 1024

    Timestamps come from ``self.last_modified`` for the root (with the
    current time as st_atime), from the thread id converted to float for the
    title file, and from the thread's ``'last_modified'`` entry otherwise.
    Every entry is owned by the current uid/gid. *fh* is not used.

    Raises fuse.FuseOSError(errno.ENOENT) for an unknown thread, a post
    component that is not an integer or is out of range, or an unknown file
    name.
    """
    def stat_dict(mode, when, atime=None, size=None):
        """Build one stat dictionary; *atime* defaults to *when*."""
        attrs = {'st_mode': mode,
                 'st_ctime': when,
                 'st_mtime': when,
                 'st_atime': when if atime is None else atime,
                 'st_uid': os.getuid(),
                 'st_gid': os.getgid()}
        if size is not None:
            attrs['st_size'] = size
        return attrs

    # `0o...` octal literals are accepted by Python 2.6+ as well as Python 3.
    dir_mode = stat.S_IFDIR | 0o555
    file_mode = stat.S_IFREG | 0o444

    thread_id, entry, leaf = parse_path(path)

    if not thread_id:
        # Board root
        return stat_dict(dir_mode, self.last_modified, atime=time.time())

    if thread_id not in self.threads:
        raise fuse.FuseOSError(errno.ENOENT)
    thread = self.threads[thread_id]

    if not entry:
        # Thread folder
        return stat_dict(dir_mode, thread['last_modified'])

    if entry == 'title' and not leaf:
        # Thread title file; the thread id doubles as its timestamp.
        return stat_dict(file_mode, float(thread_id), size=1024)

    try:
        post = int(entry)
    except ValueError:
        # Not a post folder
        raise fuse.FuseOSError(errno.ENOENT)

    if post < 1 or post > thread['posts']:
        # Post index out of range
        raise fuse.FuseOSError(errno.ENOENT)

    if not leaf:
        # Post folder
        # TODO fetch post for accurate times
        return stat_dict(dir_mode, thread['last_modified'])

    if leaf not in ('author', 'body'):
        raise fuse.FuseOSError(errno.ENOENT)

    # Post data file
    # TODO fetch post for accurate times
    return stat_dict(file_mode, thread['last_modified'], size=1024 * 1024)
# Return the slice [offset : offset + size] of the contents of the file at
# `path`. Raises fuse.FuseOSError(errno.ENOENT) when the path does not name
# a known thread's `title` file or a post's `author`/`body` file.
# `fh` is not used in the visible code.
def read(self, path, size, offset, fh):
path = parse_path(path)
# A readable file needs a known thread and at least one more component.
if path[0] not in self.threads or not path[1]:
raise fuse.FuseOSError(errno.ENOENT)
if path[1] == 'title':
# The title file is the cached thread title plus a trailing newline.
title = self.threads[path[0]]['title'] + '\n'
return title[offset : offset + size]
# Everything else must be a post's `author` or `body` file.
if path[2] not in ('author', 'body'):
raise fuse.FuseOSError(errno.ENOENT)
# NOTE(review): this `except` has no matching `try:` in this copy of the
# file; the `try:` line and its body are missing. Presumably the body
# validated path[1] as an integer, as getattr/readdir do. Whether it also
# replaced path[1] with the int cannot be told from here - confirm upstream.
except ValueError:
raise fuse.FuseOSError(errno.ENOENT)
# Fetch the post from the URL built from the thread and post components.
r = urllib2.urlopen(self.post_url % (path[0], path[1]))
# NOTE(review): the next line is truncated (unbalanced parenthesis, and
# json.loads is handed a list). It presumably decoded the response `r` and
# then indexed the result with path[1] - confirm upstream.
post = json.loads([path[1]]
if path[2] == 'body':
# `body` serves the post's 'com' field plus a trailing newline.
body = post['com'] + '\n'
return body[offset : offset + size]
# Otherwise the file is `author`: the post's 'name' field plus a newline.
name = post['name'] + '\n'
return name[offset : offset + size]
def readdir(self, path, fh):
    """List the entries of the directory at *path*.

    * Board root: ``.``, ``..`` and every known thread id.
    * Thread directory: ``.``, ``..``, ``title`` and one entry per post,
      named ``'1'`` up to the thread's ``'posts'`` count.
    * Post directory: ``.``, ``..``, ``author`` and ``body``.

    Raises fuse.FuseOSError with errno.ENOENT for an unknown thread or a
    post component that is not an integer in ``1..posts``, and with
    errno.ENOTDIR for the ``title`` file. *fh* is not used.
    """
    thread_id, entry, _ = parse_path(path)

    if not thread_id:
        # Contents of the board directory. list() keeps the concatenation
        # valid even where dict.keys() is not a list.
        return ['.', '..'] + list(self.threads)

    if thread_id not in self.threads:
        raise fuse.FuseOSError(errno.ENOENT)
    post_count = self.threads[thread_id]['posts']

    if not entry:
        # Contents of a thread directory
        return ['.', '..', 'title'] + \
               [str(number) for number in range(1, post_count + 1)]

    if entry == 'title':
        raise fuse.FuseOSError(errno.ENOTDIR)

    try:
        post = int(entry)
    except ValueError:
        raise fuse.FuseOSError(errno.ENOENT)

    # Posts are numbered from 1 (see the listing above and getattr), so
    # directory 0 must not be listable either.
    if post < 1 or post > post_count:
        raise fuse.FuseOSError(errno.ENOENT)

    return ['.', '..', 'author', 'body']
if __name__ == "__main__":
    # Command-line entry point: parse the options, then mount the filesystem.
    parser = argparse.ArgumentParser()
    # NOTE(review): the tail of this call was missing from the copy under
    # review; the help text below is a replacement, not the original wording.
    parser.add_argument('-b', '--board', action='store',
                        help='board to mount (default: prog)')
    parser.add_argument('mountpoint', action='store',
                        help='mount point')
    parser.add_argument('-f', '--foreground', action='store_true',
                        help='run in the foreground (useful for debugging)')
    args = parser.parse_args()

    # NOTE(review): the arguments after the ProgFS instance were missing from
    # the copy under review; the mount point and foreground flag parsed above
    # are passed on here - confirm against the upstream file.
    # The result is deliberately not bound to `fuse`, which would shadow the
    # imported module.
    fuse.FUSE(ProgFS(args.board or 'prog'), args.mountpoint,
              foreground=args.foreground)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment