Created
December 16, 2018 21:43
-
-
Save Gribouillis/58613cc7dbc5612f2a7a8411727da2f1 to your computer and use it in GitHub Desktop.
An attempt at a DSL for extracting parts of several files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
import io | |
import re | |
import sys | |
# Version string of this script.
__version__ = '0.0.1'
def noop(arg):
    """Accept one argument and discard it.

    Serves as a stand-in for a ``write`` callable when lines must be
    consumed without being emitted.
    """
    return None
class FilePointer:
    """Line iterator with push-back support and a running line counter.

    Wraps any iterable of lines (an open file, an ``io.StringIO``, a list...).

    Attributes:
        fifo: lines that were pushed back and will be served before the
            underlying iterator is consulted again.
        fp: iterator over the wrapped source.
        lineno: number of the line the next ``next()`` call will return,
            counted from the ``lineno`` value given at construction.
    """

    def __init__(self, fp, lineno=1):
        self.fifo = deque()
        # Keep the wrapped object itself: iter() of a plain sequence yields
        # an iterator without close(), so close() may need the original.
        self._source = fp
        self.fp = iter(fp)
        self.lineno = lineno

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next line, serving pushed-back lines first.

        Raises:
            StopIteration: when no pushed-back line remains and the source
                is exhausted; ``lineno`` is left unchanged in that case.
        """
        if self.fifo:
            line = self.fifo.popleft()
        else:
            line = next(self.fp)
        self.lineno += 1
        return line

    def pushback(self, line):
        """Un-read *line*: it becomes the next line returned.

        Several pushed-back lines come out most-recent-first.
        """
        self.fifo.appendleft(line)
        self.lineno -= 1

    def close(self):
        """Close the wrapped source if it can be closed.

        The iterator's own ``close`` is preferred; otherwise the wrapped
        object's is used. Sources with no ``close`` at all (lists, tuples)
        are silently accepted instead of raising ``AttributeError``.
        """
        closer = getattr(self.fp, 'close', None)
        if closer is None:
            closer = getattr(self._source, 'close', None)
        if closer is not None:
            closer()
class Processor:
    """Copy or skip runs of lines from several sources into one output.

    Sources are registered on demand by ``switch()`` and remembered, each
    wrapped in a ``FilePointer`` so that its read position survives
    switching away and back. The key ``'-'`` designates standard input and
    is the initial source. ``take()`` writes the selected lines to the
    output; ``drop()`` consumes them silently.

    Use as a context manager: on exit every registered source except
    standard input is closed.
    """

    # Accepted selector keyword -> exact type its value must have.
    _SELECTORS = {'count': int, 'lineno': int, 'string': str, 'regex': str}

    def __init__(self, ofile):
        """Create a processor writing taken lines with ``ofile.write``."""
        # stdin is wrapped like every other source: the lineno/string/regex
        # selectors need .lineno and .pushback, which a raw stream lacks.
        self.files = {'-': FilePointer(sys.stdin)}
        self.src = '-'
        self.output = ofile

    def switch(self, filekey):
        """Make *filekey* the current source, registering it if new.

        Args:
            filekey: a path (``str``), opened for reading on first use, or
                an object with a ``read`` attribute, used directly.

        Raises:
            ValueError: if a new *filekey* is neither of the above.
        """
        if filekey not in self.files:
            if isinstance(filekey, str):
                fp = open(filekey, 'r')
                self.files[filekey] = FilePointer(fp)
            elif hasattr(filekey, 'read'):
                self.files[filekey] = FilePointer(filekey)
            else:
                raise ValueError('Unexpected file key', filekey)
        self.src = filekey

    @property
    def _fp(self):
        """The ``FilePointer`` of the current source."""
        return self.files[self.src]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Close everything registered except stdin; exceptions raised in the
        # with-body are not suppressed (implicit None return).
        for k, f in self.files.items():
            if k != '-':
                f.close()

    def _until(self, matches, send):
        """Pass lines to *send* until one satisfies *matches*.

        The matching line is pushed back so that it stays unread.
        """
        fp = self._fp
        while True:
            # StopIteration propagates if the source ends without a match.
            line = next(fp)
            if matches(line):
                fp.pushback(line)
                return
            send(line)

    def _action(self, name, kwargs):
        """Consume lines from the current source as selected by *kwargs*.

        Args:
            name: ``'take'`` to write consumed lines to the output; any
                other value discards them.
            kwargs: exactly one selector:
                ``count`` (int) -- consume that many lines;
                ``lineno`` (int) -- consume up to, not including, that line;
                ``string`` (str) -- consume up to, not including, the first
                line containing the substring;
                ``regex`` (str) -- same, for the first line where the
                pattern is found by ``search``.

        Raises:
            ValueError: if *kwargs* is not exactly one known selector with
                a value of the exact expected type, or if ``lineno`` is
                already behind the current position.
            StopIteration: if the source is exhausted first.
        """
        if len(kwargs) != 1:
            raise ValueError('Invalid kwargs for', name, ':', kwargs)
        (kw, val), = kwargs.items()
        # Exact type match on purpose: e.g. a bool is not accepted as count.
        if self._SELECTORS.get(kw) is not type(val):
            raise ValueError('Invalid kwargs for', name, ':', kwargs)

        send = self.output.write if name == 'take' else noop
        fp = self._fp
        if kw == 'count':
            for _ in range(val):
                send(next(fp))
        elif kw == 'lineno':
            if fp.lineno > val:
                raise ValueError('Already a previous line')
            while fp.lineno < val:
                send(next(fp))
        elif kw == 'string':
            self._until(lambda line: val in line, send)
        else:
            # Compile once, before reading: an invalid pattern then fails
            # without having consumed a line.
            self._until(re.compile(val).search, send)

    def take(self, **kwargs):
        """Write the selected lines to the output (see ``_action``)."""
        self._action('take', kwargs)

    def drop(self, **kwargs):
        """Skip the selected lines without output (see ``_action``)."""
        self._action('drop', kwargs)
def main():
    """Demo: weave excerpts of TEXT_A and TEXT_B together on stdout."""
    print("<<<<<<<<<<<<<<<<<RESTART>>>>>>>>>>>>>>>>>>>>")
    latin = io.StringIO(TEXT_A)
    poem = io.StringIO(TEXT_B)
    with Processor(sys.stdout) as proc:
        # First source: keep 4 lines, skip 3, keep 2.
        proc.switch(latin)
        proc.take(count=4)
        proc.drop(count=3)
        proc.take(count=2)

        # Second source: keep everything up to and including the first
        # line matching "each".
        proc.switch(poem)
        proc.take(regex=r"each")
        proc.take(count=1)

        # Back to the first source, resuming where it was left:
        # skip up to line 14, then keep lines 14 to 16.
        proc.switch(latin)
        proc.drop(lineno=14)
        proc.take(lineno=17)

        # Second source again: skip to the line starting with "Till",
        # then keep 3 lines.
        proc.switch(poem)
        proc.drop(regex=r"^Till")
        proc.take(count=3)

        # First source: skip to the line containing "et.", then keep up to
        # and including the first line matching the "offi...." pattern.
        proc.switch(latin)
        proc.drop(string="et.")
        proc.take(regex=r"offi[^\s]{4}")
        proc.take(count=1)
# Sample input A for main(): pseudo-Latin filler text.
# NOTE(review): the blank lines that presumably separated the paragraphs are
# not present here. The lineno=/string= steps in main() depend on the exact
# line layout; with this layout the final drop(string="et.") appears to run
# past the end of the text -- confirm against the original data.
TEXT_A = """\
Et sed fuga ullam et nostrum. Quas enim hic harum.
Vel eum est ab ea mollitia. A provident deserunt
necessitatibus nisi totam omnis exercitationem iure.
Recusandae sed enim eius laborum. Alias fugit corrupti
minus quo repellendus iure maxime.
Et totam adipisci incidunt cum deserunt totam. Quia
laudantium non laboriosam accusamus in ut autem laudantium.
Eos exea quibusdam dolorem omnis eaque ex. Voluptate
in inventore architecto cumque.
Neque nostrum nemo inventore. Odit nihil voluptatem
necessitatibus ut consequatur hic sint. Provident
ea fugiat ex libero beatae praesentium. Ea sapiente
perferendis maiores quisquam veniam quibusdam nam.
Voluptatem distinctio ut id reiciendis qui enim
aut. Qui tempora id ut dolor aut voluptatibus et. Vel
dolores dolores eos officiis repudiandae. Similique
fugit totam aut numquam necessitatibus officiis ducimus
alias.
Dicta deleniti veritatis et sapiente nobis et blanditiis iste.
In officiis ea nulla nemo saepe sequi nostrum suscipit. Sit nobis
rerum consequatur non alias quidem.
"""
# Sample input B for main(): verse text, one verse line per text line.
# NOTE(review): any blank lines between stanzas are not present here --
# confirm against the original data if exact layout matters.
TEXT_B = """\
The stream is shrunk -- the pool is dry,
And we be comrades, thou and I;
With fevered jowl and dusty flank
Each jostling each along the bank;
And, by one drouthy fear made still,
Forgoing thought of quest or kill.
Now 'neath his dam the fawn may see,
The lean Pack-wolf as cowed as he,
And the tall buck, unflinching, note
The fangs that tore his father's throat.
The pools are shrunk -- the streams are dry,
And we be playmates, thou and I,
Till yonder cloud -- Good Hunting! -- loose
The rain that breaks our Water Truce.
How Fear Came.
What of the hunting, hunter bold?
Brother, the watch was long and cold.
What of the quarry ye went to kill?
Brother, he crops in the jungle still.
Where is the power that made your pride?
Brother, it ebbs from my flank and side.
Where is the haste that ye hurry by?
Brother, I go to my lair to die!
"Tiger-Tiger!"
"""
# Run the demo only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment