Skip to content

Instantly share code, notes, and snippets.

@amitsaha
Last active Mar 13, 2022
Embed
What would you like to do?
Simple implementation of the tail command in Python
'''
Basic tail command implementation
Usage:
tail.py filename numlines
'''
import sys
import linecache
if len(sys.argv) !=3:
print 'Usage: tail.py <file> <nlines>'
sys.exit(1)
# filename and number of lines requested
fname, nlines = sys.argv[1:]
nlines = int(nlines)
# count the total number of lines
tot_lines = len(open(fname).readlines())
# use line cache module to read the lines
for i in range(tot_lines - nlines + 1, tot_lines+1):
print linecache.getline(sys.argv[1],i),
""" This is a more efficient version, since it does not read the entire
file
"""
import sys
import os
bufsize = 8192
lines = int(sys.argv[1])
fname = sys.argv[2]
fsize = os.stat(fname).st_size
iter = 0
with open(sys.argv[2]) as f:
if bufsize > fsize:
bufsize = fsize-1
data = []
while True:
iter +=1
f.seek(fsize-bufsize*iter)
data.extend(f.readlines())
if len(data) >= lines or f.tell() == 0:
print(''.join(data[-lines:]))
break
Copy link

ghost commented May 30, 2019

Quick quest for all: Does anyone know whether the solution is for Version 2.7 or for Version 3 ? Let me know when you read this post please. Thank you -

@abraker95
Copy link

abraker95 commented Jul 30, 2019

Second implementation breaks for empty files, causing infinite looping. Solved by putting if fsize < 1: return '' before opening file

@yeukhon
Copy link

yeukhon commented Sep 5, 2019

Implementation of tail -n k. This uses offset and doesn't read the whole line. Imagine the line is 10GB large...

def tail(filename, n):
    stat = os.stat(filename)
    if stat.st_size == 0 or n == 0:
        yield ''
        return

    page_size = 5
    offsets = []
    count = _n = n if n >= 0 else -n

    last_byte_read = last_nl_byte = starting_offset = stat.st_size - 1

    with open(filename, 'r') as f:
        while count > 0:
            starting_byte = last_byte_read - page_size
            if last_byte_read == 0:
                offsets.append(0)
                break
            elif starting_byte < 0:
                f.seek(0)
                text = f.read(last_byte_read)
            else:
                f.seek(starting_byte)
                text = f.read(page_size)

            for i in range(-1, -1*len(text)-1, -1):
                last_byte_read -= 1
                if text[i] == '\n':
                    last_nl_byte = last_byte_read
                    starting_offset = last_nl_byte + 1
                    offsets.append(starting_offset)
            count -= 1

    offsets = offsets[len(offsets)-_n:]
    offsets.reverse()

    with open(filename, 'r') as f:
        for i, offset in enumerate(offsets):
            f.seek(offset)

            if i == len(offsets) - 1:
                yield f.read()
            else:
                bytes_to_read = offsets[i+1] - offset
                yield f.read(bytes_to_read)

filename = '/tmp/test.txt'
for x in tail(filename, 10):
    print(x.strip())

@melvilgit
Copy link

melvilgit commented Apr 11, 2020

@amit, There is a small flaw here. If bufsizeiter returns pointer on the first line in the N lines to be printed, then there is a possibility that first item in data[-lines:] can start from anywhere between first and last line of that line ?
Say if line is " John Johny yes pappa" , data[-lines:][0] can be "ny yes pappa" since f.seek(fsize-bufsize
iter) can return anywhere ?
Changing if len(data) >= lines or f.tell() == 0: to if len(data) > lines or f.tell() == 0: Should fix the issue

@blakev
Copy link

blakev commented May 15, 2020

Python 3.8 asyncio version,

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# >>
#   Blake VandeMerwe, LiveViewTech
# <<

import os
import io
import asyncio
from functools import partial
from typing import AsyncIterator

LINE_BUFFER = 1

async def tail(
    filename: str,
    last_lines: int = 10,
    non_exist_max_secs: float = 30.0,
    fp_poll_secs: float = 0.125
) -> AsyncIterator[str]:
    """Continuously tail a file pointer yielding one line at a time."""

    async def wait_exists() -> bool:
        """Wait for a file to exist, the return statement reflects
        whether or not the file existed when the timeout limits were reached."""
        bail_at: float = time.monotonic() + non_exist_max_secs
        while not os.path.exists(filename):
            if time.monotonic() >= bail_at:
                return False
            await asyncio.sleep(fp_poll_secs)
        return True

    async def check_rotate(_fp) -> io.TextIOBase:
        """Determine if the file rotated in place; same name different inode."""
        nonlocal fino
        if os.stat(filename).st_ino != fino:
            new_fp = open(filename, 'r')
            _fp.close()
            new_fp.seek(0, os.SEEK_SET)
            fino = os.fstat(new_fp.fileno()).st_ino
            return new_fp
        return _fp

    # ~~
    if not await wait_exists():
        return

    buff = io.StringIO()
    stat = os.stat(filename)

    fino: int = stat.st_ino
    size: int = stat.st_size
    blocksize: int = os.statvfs(filename).f_bsize

    fp = open(filename, 'r', LINE_BUFFER)

    if last_lines > 0:
        if stat.st_size <= blocksize:
            # if the file is smaller than 8kb, read all the lines
            for line in fp.readlines()[-last_lines::]:
                yield line.rstrip()
        else:
            # if the file is larger than 8kb, seek 8kb from the end
            #  and return all the lines except the (potential) half-line
            # first element and the null-terminated extra line at the end.
            fp.seek(os.stat(fp.fileno()).st_size - blocksize)
            for line in fp.readlines()[1:-1][-last_lines::]:
                yield line.rstrip()

    # seek to the end of the file for tailing
    #  given the above operations we should already be there.
    fp.seek(0, os.SEEK_END)

    try:
        while True:
            # wait for the file to exist -- generously
            if not os.path.exists(filename):
                if not await wait_exists():
                    return

            fp = await check_rotate(fp)
            n_stat = os.fstat(fp.fileno())
            n_size = n_stat.st_size

            # if the file is the same size, churn
            #  .. this could be error-prone on small files that
            # rotate VERY fast, but that's an edge case for
            #  tailing a persistent log file.
            if n_size == size:
                await asyncio.sleep(fp_poll_secs)
                continue

            # if the file shrank, seek to the beginning
            if n_size < size:
                fp.seek(0, os.SEEK_SET)

            size = n_size
            for chunk in iter(partial(fp.read, blocksize), ''):
                buff.write(chunk)

            buff.seek(0, os.SEEK_SET)

            for line in buff.readlines():
                yield line.rstrip()

            # resize our string buffer
            buff.truncate(0)

    except IOError:
        buff.close()
        fp.close()


if __name__ == '__main__':

    async def main():
        async for line in tail(r'/etc/foldingathome/log.txt'):
            print(line)

    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        pass
    loop.stop()
    loop.close()

@therumbler
Copy link

therumbler commented May 22, 2020

This is great, but you don't need the # -*- coding: utf-8 -*- line in Python 3

Python 3.8 asyncio version,

#! /usr/bin/env python
# -*- coding: utf-8 -*-
...

@blakev
Copy link

blakev commented May 22, 2020

@therumbler TIL, thanks! I've been using the same "new file" template for years. Time to update!

@blasti
Copy link

blasti commented Mar 13, 2022

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment