Skip to content

Instantly share code, notes, and snippets.

@mikekap
Last active November 4, 2016 03:25
Show Gist options
  • Save mikekap/640f7908e3f7c08ce3f6 to your computer and use it in GitHub Desktop.
Save mikekap/640f7908e3f7c08ce3f6 to your computer and use it in GitHub Desktop.
CFFI PCRE Implementation
from __future__ import unicode_literals
from cffi import FFI
from collections import OrderedDict
import string
import sys
assert sys.maxunicode == 0x10FFFF
ffi = FFI()
ffi.cdef("""
typedef ... pcre32;
typedef struct pcre32_extra {
unsigned long int flags; /* Bits for which fields are set */
void *study_data; /* Opaque data from pcre_study() */
unsigned long int match_limit; /* Maximum number of calls to match() */
void *callout_data; /* Data passed back in callouts */
const unsigned char *tables; /* Pointer to character tables */
unsigned long int match_limit_recursion; /* Max recursive calls to match() */
wchar_t **mark; /* For passing back a mark pointer */
void *executable_jit; /* Contains a pointer to a compiled jit code */
} pcre32_extra;
#define PCRE_CASELESS ...
#define PCRE_INFO_CAPTURECOUNT ...
#define PCRE_ANCHORED ...
#define PCRE_MULTILINE ...
#define PCRE_DOTALL ...
#define PCRE_EXTENDED ...
#define PCRE_UTF32 ...
#define PCRE_DUPNAMES ...
#define PCRE_ERROR_NOMATCH ...
#define PCRE_STUDY_JIT_COMPILE ...
#define PCRE_STUDY_JIT_PARTIAL_SOFT_COMPILE ...
#define PCRE_STUDY_JIT_PARTIAL_HARD_COMPILE ...
#define PCRE_STUDY_EXTRA_NEEDED ...
pcre32 *pcre32_compile(const wchar_t pattern[], int options,
const char **errptr, int *erroffset,
const unsigned char *tableptr);
void pcre32_free(pcre32* code);
pcre32_extra *pcre32_study(const pcre32 *code, int options,
const char **errptr);
void pcre32_free_study(pcre32_extra* study);
int pcre32_exec(const pcre32 *code, const pcre32_extra *extra,
const wchar_t subject[], int length, int startoffset,
int options, int *ovector, int ovecsize);
int pcre32_dfa_exec(const pcre32 *code, const pcre32_extra *extra,
const wchar_t subject[], int length, int startoffset,
int options, int* ovector, int ovecsize,
int* workspace, int wscount);
int pcre32_fullinfo(const pcre32 *code, const pcre32_extra *extra,
int what, void *where);
int pcre32_get_stringnumber(const pcre32 *code, const wchar_t *name);
""")
pcre = ffi.verify("""
#include <pcre.h>
""", libraries=[b'pcre32'])
pcre_compile = pcre.pcre32_compile
pcre_exec = pcre.pcre32_exec
pcre_dfa_exec = pcre.pcre32_dfa_exec
pcre_fullinfo = pcre.pcre32_fullinfo
pcre_get_stringnumber = pcre.pcre32_get_stringnumber
pcre_study = pcre.pcre32_study
pcre_free_study = pcre.pcre32_free_study
pcre_free = pcre.pcre32_free
DEBUG = 0 # TODO
IGNORECASE = I = pcre.PCRE_CASELESS
MULTILINE = M = pcre.PCRE_MULTILINE
DOTALL = S = pcre.PCRE_DOTALL
VERBOSE = X = pcre.PCRE_EXTENDED
MULTILINE = M = pcre.PCRE_MULTILINE
ASCII = A = 0x40000000
UNICODE = U = 0x80000000
# Character escape sequences.
CHARACTER_ESCAPES = {
"a": "\a",
"b": "\b",
"f": "\f",
"n": "\n",
"r": "\r",
"t": "\t",
"v": "\v",
}
ALPHA = frozenset(string.ascii_letters)
DIGITS = frozenset(string.digits)
HEX_ESCAPES = {"x": 2, "u": 4, "U": 8}
HEX_DIGITS = frozenset(string.hexdigits)
def parse_repl_hex_escape(source, expected_len):
"Parses a hex escape sequence in a replacement string."
digits = []
current_pos = -100
for _ in xrange(expected_len):
current_pos, ch = next(source, (None, None))
if ch not in HEX_DIGITS:
raise RuntimeError("bad hex escape at position %d" % source.pos)
digits.append(ch)
return (current_pos, int("".join(digits), 16))
class PCREMatch(object):
__slots__ = (b're', b'string', b'offsets', b'pos', b'endpos', b'last_index')
def __init__(self, pattern, s, offsets, pos=0, endpos=None, last_index=None):
self.re = pattern
self.string = s
self.offsets = offsets
self.pos = pos
self.endpos = endpos
self.last_index = last_index
def expand(self, template):
if not isinstance(template, unicode):
raise NotImplementedError("String needs to be unicode")
l = []
literal_start = [0]
current_pos = 0
def flush_literal():
assert literal_start[0] >= 0
l.append(template[literal_start[0]:current_pos])
literal_start[0] = -1
iterator = iter(enumerate(template))
for current_pos, c in iterator:
if c == '\\':
flush_literal()
current_pos, c = next(iterator, (None, None))
if c in ALPHA:
value = CHARACTER_ESCAPES.get(c)
if c in CHARACTER_ESCAPES:
# \n, \r, etc
literal_start[0] = current_pos + 1
l.append(value)
elif c in HEX_ESCAPES:
current_pos, value = parse_repl_hex_escape(iterator, HEX_ESCAPES[c])
literal_start[0] = current_pos + 1
l.append(unichr(value))
elif c == 'g':
current_pos, c = next(iterator, (None, None))
if c != '<':
raise RuntimeError("Bad replacement pattern - expected < for replacement group")
name_start_pos = current_pos + 1
all_digits = True
while c != '>' and c is not None:
current_pos, c = next(iterator, (None, None))
if c not in DIGITS:
all_digits = False
if c != '>':
raise RuntimeError("Bad replacement pattern - expected > for replacement group end")
name = template[name_start_pos:current_pos]
if all_digits:
name = int(name)
literal_start[0] = current_pos + 1
l.append(self.group(name))
elif c == 'N':
# Unicode named character.
raise NotImplementedError()
else:
# Just a bad escape
literal_start[0] = current_pos - 1
elif c == '0':
# Octal escape
raise NotImplementedError()
elif c in DIGITS:
# Group reference
if template[current_pos + 1] in DIGITS:
index = int(template[current_pos:current_pos + 2])
current_pos, c = next(iterator, (None, None))
else:
index = int(template[current_pos:current_pos + 1])
literal_start[0] = current_pos + 1
l.append(self.group(index))
elif c == '\\':
literal_start[0] = current_pos
elif c is None:
raise RuntimeError("Unexpected end of replacement pattern")
else:
# Just a bad escape
literal_start[0] = current_pos - 1
current_pos += 1
flush_literal()
return ''.join(l)
def group(self, *n, **kwargs):
if len(n) <= 1:
default = kwargs.pop('default', None)
assert not kwargs
start, end = self.span(n[0] if n else 0)
if start == -1:
return default
return self.string[start:end]
return tuple(self.group(i, **kwargs) for i in n)
def groups(self, default=None):
return tuple(self.group(i + 1, default=default) for i in xrange(self.re.groups))
def groupdict(self, default=None):
return {
k: self.group(v, default=default)
for k, v in self.re.groupindex
}
def _group_to_index(self, g):
if not isinstance(g, (int, long)):
g = self.re._index_for_capture_group(g)
if not 0 <= g <= self.re.groups:
raise RuntimeError('Bad group index: %s' % (g))
return g
def start(self, n=0):
n = self._group_to_index(n)
value = self.offsets[n * 2]
return value
def end(self, n=0):
n = self._group_to_index(n)
value = self.offsets[n * 2 + 1]
return value
def span(self, n=0):
n = self._group_to_index(n)
return tuple(self.offsets[n * 2:n * 2 + 2])
@property
def lastgroup(self):
raise NotImplementedError()
class PCREPattern(object):
def __init__(self, pattern, flags=0):
self.pattern = pattern
self.flags = flags
flags &= ~(ASCII | UNICODE)
flags |= pcre.PCRE_UTF32
errptr = ffi.new("char*[1]")
erroffset = ffi.new("int[1]")
regex = pcre_compile(pattern, flags, errptr, erroffset, ffi.NULL)
if not regex:
raise RuntimeError("Regexp compilation failure: %s" % (ffi.string(errptr[0])))
regex = ffi.gc(regex, pcre_free)
study = pcre_study(regex, pcre.PCRE_STUDY_JIT_COMPILE, errptr)
if study:
study = ffi.gc(study, pcre_free_study)
self.study = study
buf = ffi.new("int[1]")
rv = pcre_fullinfo(regex, ffi.NULL, pcre.PCRE_INFO_CAPTURECOUNT, ffi.cast("void *", buf))
if rv < 0:
raise RuntimeError('Regex error %s' % (rv))
self.obj = regex
self.groups = buf[0]
self._capture_group_index_cache = {}
self.ov = ffi.new("int[]", 3 * (self.groups + 1))
self.use_dfa = False
if self.use_dfa:
self.wssize = 300
self.workspace = ffi.new("int[]", self.wssize)
@property
def groupindex(self):
raise NotImplementedError()
def _index_for_capture_group(self, name):
rv = self._capture_group_index_cache.get(name)
if rv is not None:
return rv
rv = pcre_get_stringnumber(self.obj, name)
if rv < 0:
if rv == pcre.PCRE_ERROR_NOSUBSTRING:
raise RuntimeError("Bad capture group: %s" % (name))
raise RuntimeError("PCRE Error %s" % (rv))
self._capture_group_index_cache[name] = rv
return rv
def search(self, s, pos=0, endpos=sys.maxint, _flags=0):
if not isinstance(s, unicode):
raise NotImplementedError()
if endpos < 0 or endpos > len(s):
endpos = len(s)
if pos > endpos:
return None
if self.use_dfa:
num = pcre_dfa_exec(self.obj, self.study, s, endpos, pos, _flags,
self.ov, 3 * (self.groups + 1),
self.workspace, self.wssize)
else:
num = pcre_exec(self.obj, self.study, s, endpos, pos, _flags,
self.ov, 3 * (self.groups + 1))
if num == pcre.PCRE_ERROR_NOMATCH:
return None
elif num < 0:
raise RuntimeError("Re error %s" % (num,))
# We don't care how many matching subexpressions we got, we
# care only about total # of capturing ones (including empty)
return PCREMatch(self, s, self.ov, pos=pos, endpos=endpos, last_index=num)
def match(self, s, pos=0, endpos=sys.maxint):
return self.search(s, pos, endpos, pcre.PCRE_ANCHORED)
def finditer(self, s, pos=0, endpos=sys.maxint):
while True:
m = self.search(s, pos=pos, endpos=endpos)
if not m:
break
pos = m.start() + 1
yield m
def findall(self, s, pos=0, endpos=sys.maxint):
res = []
while True:
m = self.search(s, pos=pos, endpos=endpos)
if not m:
break
if self.groups <= 1:
res.append(m.group(self.groups)) # aka 0 if 0, 1 if 1
else:
res.append(m.groups(default=b''))
if m.start() == m.end():
pos = m.end() + 1
else:
pos = m.end()
return res
def subn(self, repl, s, count=0):
subs_made = 0
res = []
match_iter = self.finditer(s)
last_end = 0
for m in match_iter:
if (count > 0 and subs_made >= count):
break
beg, end = m.span()
res.append(s[last_end:beg])
if callable(repl):
res.append(repl(m))
else:
res.append(m.expand(repl))
last_end = end
subs_made += 1
res.append(s[last_end:])
return (''.join(res), subs_made)
def sub(self, repl, s, count=0):
return self.subn(repl, s, count)[0]
def split(self, s, maxsplit=0):
splits_made = 0
res = []
match_iter = self.finditer(s)
last_end = 0
for m in match_iter:
if (maxsplit > 0 and splits_made >= maxsplit):
break
if not m.group():
continue # empty match
beg, end = m.span(0)
res.append(s[last_end:beg])
if self.groups > 0:
res.extend(m.groups())
last_end = end
splits_made += 1
res.append(s[last_end:])
return res
def _compile(pattern, flags=0):
if not isinstance(pattern, unicode):
raise NotImplementedError()
return PCREPattern(pattern, flags)
_MAXCACHE = 500
_cache = OrderedDict()
def compile(pattern, flags=0):
try:
cache_key = (pattern,)
return _cache[pattern]
except KeyError:
pass
if flags & ASCII:
raise NotImplementedError("ASCII Regexps not supported")
# TODO: Something something thread safety.
result = _cache[cache_key] = _compile(pattern, flags)
if len(_cache) >= _MAXCACHE:
_cache.popitem(last=False)
return result
def purge():
_cache.clear()
def search(pattern, string, flags=0):
r = compile(pattern, flags)
return r.search(string)
def match(pattern, string, flags=0):
r = compile(pattern, flags)
return r.match(string)
def findall(pattern, string, flags=0):
r = compile(pattern, flags)
return r.findall(string)
def finditer(pattern, string, flags=0):
r = compile(pattern, flags)
return r.finditer(string)
def sub(pattern, repl, s, count=0, flags=0):
r = compile(pattern, flags)
return r.sub(repl, s, count=count)
def subn(pattern, repl, s, count=0, flags=0):
r = compile(pattern, flags)
return r.subn(repl, s, count=count)
def split(pattern, s, maxsplit=0, flags=0):
r = compile(pattern, flags)
return r.split(s, maxsplit)
def escape(s):
res = ""
for c in s:
if '0' <= c <= '9' or 'A' <= c <= 'Z' or 'a' <= c <= 'z' or c == '_':
res += c
else:
res += "\\" + c
return res
if __name__ == '__main__':
import re
import sys
if len(sys.argv) <= 2:
sys.argv.append('pcre')
if sys.argv[2] == 're':
print 'RE'
compile = re.compile
elif sys.argv[2] == 'regex':
print 'REGEX'
import regex
compile = regex.compile
elif sys.argv[2] == 'cpcre':
print 'CPCRE'
import pcre
compile = pcre.compile
else:
print 'PCRE'
from timeit import default_timer
r = compile('([a-zA-Z][a-zA-Z0-9]*)://([^ /]+)(/[^ ]*)?|([^ @]+)@([^ @]+)')
while True:
start = default_timer()
for line in open(sys.argv[1], 'r'):
r.match(line.decode('utf8'))
print 'Finished round in ', default_timer() - start
@rodrickbrown
Copy link

Good work +1:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment