Skip to content

Instantly share code, notes, and snippets.

@VigneshChennai
Last active September 24, 2018 09:27
Show Gist options
  • Save VigneshChennai/05acd7ad00b2d6de7899a01185ec96f5 to your computer and use it in GitHub Desktop.
Save VigneshChennai/05acd7ad00b2d6de7899a01185ec96f5 to your computer and use it in GitHub Desktop.
Python script to tail (like the POSIX tail command) a CSV file whose records may span multiple lines
#!/usr/bin/env python
# tailcsv.py is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# tailcsv.py is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with tailcsv.py. If not, see <http://www.gnu.org/licenses/>.
import argparse
import sys
class LastNItem:
    """Fixed-capacity ring buffer that remembers the most recent ``n`` items.

    Items are stored in a pre-allocated list of length ``n``; ``i`` is the
    index of the slot written most recently (``-1`` while nothing has been
    added).  Once the buffer is full, each new item overwrites the oldest.

    A capacity of zero or less is accepted and yields a buffer that keeps
    nothing: ``add`` is a no-op and ``getall`` produces no items.
    """

    # Marks slots that have never been written, so that ``None`` (or any
    # other value) can be stored as a real item.  The spelling is kept as-is
    # because the attribute name is part of the class's existing surface.
    sentinal = object()

    def __init__(self, n):
        """Create a buffer that retains at most ``n`` items."""
        self.n = n
        # For n <= 0 list repetition produces an empty list.
        self.items = [self.sentinal] * n
        self.i = -1

    def add(self, item):
        """Store ``item``, evicting the oldest item if the buffer is full."""
        if self.n <= 0:
            # No slot to write to; without this guard the modulo below would
            # divide by zero (n == 0) or index an empty list (n < 0).
            return
        self.i = (self.i + 1) % self.n
        self.items[self.i] = item

    def getall(self):
        """Yield the retained items from oldest to newest."""
        if self.i == -1:
            return
        # Start just after the newest slot (which holds the oldest item once
        # the buffer has wrapped) and walk one full lap around the ring.
        for offset in range(1, self.n + 1):
            item = self.items[(self.i + offset) % self.n]
            if item is not self.sentinal:
                yield item
def _iter_records(lines, quote):
    """Yield complete CSV records from an iterable of binary lines.

    ``lines`` must yield ``bytes`` chunks that contain at most one newline,
    as their final byte (which is what iterating a binary file object
    produces).  ``quote`` is the single-byte quote character as ``bytes``.

    A newline ends a record only when it falls outside a quoted field, so a
    record may be made of several physical lines.  Data left over at end of
    input without a terminating newline is yielded as a final record.
    """
    pending = []
    inside_quote = False
    for line in lines:
        pending.append(line)
        # Every quote byte toggles the quoted state, so only the parity of
        # the count matters.  The newline, if any, is the last byte of the
        # line, hence every quote on this line precedes it.
        if line.count(quote) % 2:
            inside_quote = not inside_quote
        if line.endswith(b"\n") and not inside_quote:
            yield b"".join(pending)
            pending = []
    if pending:
        yield b"".join(pending)


def main():
    """Command-line entry point: print the last N CSV records from stdin.

    Reads binary data from standard input, splits it into records (a record
    may span several lines when a quoted field contains newlines) and writes
    the last ``--lines`` records to standard output unchanged.  With
    ``--include-headers`` the first record is written immediately and is not
    counted among the last N.

    Invalid ``--lines`` or ``--quotechar`` values are reported through
    ``parser.error``, which prints a usage message and exits.
    """
    parser = argparse.ArgumentParser(
        description='Tail a csv file which '
                    'has records which span multiple lines')
    parser.add_argument('-n', '--lines', type=int,
                        help='number of lines to show', required=True)
    parser.add_argument('-q', '--quotechar',
                        help='quote character used for field', default="\"")
    parser.add_argument('-ih', '--include-headers', action="store_true",
                        default=False, help='include headers')
    args = parser.parse_args()

    if args.lines < 0:
        parser.error('--lines must not be negative')
    # Input is processed as raw bytes, so the quote character has to map to
    # exactly one byte value; a newline could never terminate a record.
    if (len(args.quotechar) != 1 or args.quotechar == "\n"
            or ord(args.quotechar) > 255):
        parser.error('--quotechar must be a single one-byte character '
                     'other than newline')
    quote = bytes([ord(args.quotechar)])

    out = sys.stdout.buffer
    last_n_items = LastNItem(args.lines)
    header_pending = args.include_headers
    # Iterate the stream instead of readlines() so that only the retained
    # records are held in memory, not the whole input.
    for record in _iter_records(sys.stdin.buffer, quote):
        if header_pending:
            header_pending = False
            out.write(record)
        elif args.lines > 0:
            # With -n 0 nothing is retained, so the buffer is never touched.
            last_n_items.add(record)

    for record in last_n_items.getall():
        out.write(record)
# Run the command-line interface only when executed as a script, not when
# this module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment