Skip to content

Instantly share code, notes, and snippets.

@amitsaha
Last active Mar 13, 2022
Embed
What would you like to do?
Simple implementation of the tail command in Python
'''
Basic tail command implementation
Usage:
tail.py filename numlines
'''
import sys
import linecache
if len(sys.argv) !=3:
print 'Usage: tail.py <file> <nlines>'
sys.exit(1)
# filename and number of lines requested
fname, nlines = sys.argv[1:]
nlines = int(nlines)
# count the total number of lines
tot_lines = len(open(fname).readlines())
# use line cache module to read the lines
for i in range(tot_lines - nlines + 1, tot_lines+1):
print linecache.getline(sys.argv[1],i),
""" This is a more efficient version, since it does not read the entire
file
"""
import sys
import os
bufsize = 8192
lines = int(sys.argv[1])
fname = sys.argv[2]
fsize = os.stat(fname).st_size
iter = 0
with open(sys.argv[2]) as f:
if bufsize > fsize:
bufsize = fsize-1
data = []
while True:
iter +=1
f.seek(fsize-bufsize*iter)
data.extend(f.readlines())
if len(data) >= lines or f.tell() == 0:
print(''.join(data[-lines:]))
break
@blakev
Copy link

blakev commented May 15, 2020

Python 3.8 asyncio version,

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# >>
#   Blake VandeMerwe, LiveViewTech
# <<

import os
import io
import asyncio
from functools import partial
from typing import AsyncIterator

LINE_BUFFER = 1

async def tail(
    filename: str,
    last_lines: int = 10,
    non_exist_max_secs: float = 30.0,
    fp_poll_secs: float = 0.125
) -> AsyncIterator[str]:
    """Continuously tail a file pointer yielding one line at a time."""

    async def wait_exists() -> bool:
        """Wait for a file to exist, the return statement reflects
        whether or not the file existed when the timeout limits were reached."""
        bail_at: float = time.monotonic() + non_exist_max_secs
        while not os.path.exists(filename):
            if time.monotonic() >= bail_at:
                return False
            await asyncio.sleep(fp_poll_secs)
        return True

    async def check_rotate(_fp) -> io.TextIOBase:
        """Determine if the file rotated in place; same name different inode."""
        nonlocal fino
        if os.stat(filename).st_ino != fino:
            new_fp = open(filename, 'r')
            _fp.close()
            new_fp.seek(0, os.SEEK_SET)
            fino = os.fstat(new_fp.fileno()).st_ino
            return new_fp
        return _fp

    # ~~
    if not await wait_exists():
        return

    buff = io.StringIO()
    stat = os.stat(filename)

    fino: int = stat.st_ino
    size: int = stat.st_size
    blocksize: int = os.statvfs(filename).f_bsize

    fp = open(filename, 'r', LINE_BUFFER)

    if last_lines > 0:
        if stat.st_size <= blocksize:
            # if the file is smaller than 8kb, read all the lines
            for line in fp.readlines()[-last_lines::]:
                yield line.rstrip()
        else:
            # if the file is larger than 8kb, seek 8kb from the end
            #  and return all the lines except the (potential) half-line
            # first element and the null-terminated extra line at the end.
            fp.seek(os.stat(fp.fileno()).st_size - blocksize)
            for line in fp.readlines()[1:-1][-last_lines::]:
                yield line.rstrip()

    # seek to the end of the file for tailing
    #  given the above operations we should already be there.
    fp.seek(0, os.SEEK_END)

    try:
        while True:
            # wait for the file to exist -- generously
            if not os.path.exists(filename):
                if not await wait_exists():
                    return

            fp = await check_rotate(fp)
            n_stat = os.fstat(fp.fileno())
            n_size = n_stat.st_size

            # if the file is the same size, churn
            #  .. this could be error-prone on small files that
            # rotate VERY fast, but that's an edge case for
            #  tailing a persistent log file.
            if n_size == size:
                await asyncio.sleep(fp_poll_secs)
                continue

            # if the file shrank, seek to the beginning
            if n_size < size:
                fp.seek(0, os.SEEK_SET)

            size = n_size
            for chunk in iter(partial(fp.read, blocksize), ''):
                buff.write(chunk)

            buff.seek(0, os.SEEK_SET)

            for line in buff.readlines():
                yield line.rstrip()

            # resize our string buffer
            buff.truncate(0)

    except IOError:
        buff.close()
        fp.close()


if __name__ == '__main__':

    async def main():
        async for line in tail(r'/etc/foldingathome/log.txt'):
            print(line)

    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        pass
    loop.stop()
    loop.close()

@therumbler
Copy link

therumbler commented May 22, 2020

This is great, but you don't need the # -*- coding: utf-8 -*- line in Python 3

Python 3.8 asyncio version,

#! /usr/bin/env python
# -*- coding: utf-8 -*-
...

@blakev
Copy link

blakev commented May 22, 2020

@therumbler TIL, thanks! I've been using the same "new file" template for years. Time to update!

@blasti
Copy link

blasti commented Mar 13, 2022

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment