@amitsaha
Last active Mar 13, 2022
Simple implementation of the tail command in Python
'''
Basic tail command implementation
Usage:
tail.py filename numlines
'''
import sys
import linecache
if len(sys.argv) != 3:
    print 'Usage: tail.py <file> <nlines>'
    sys.exit(1)
# filename and number of lines requested
fname, nlines = sys.argv[1:]
nlines = int(nlines)
# count the total number of lines
tot_lines = len(open(fname).readlines())
# use line cache module to read the lines
for i in range(tot_lines - nlines + 1, tot_lines+1):
    print linecache.getline(sys.argv[1],i),
""" This is a more efficient version, since it does not read the entire
file
"""
import sys
import os
bufsize = 8192
lines = int(sys.argv[1])
fname = sys.argv[2]
fsize = os.stat(fname).st_size
iter = 0
with open(sys.argv[2]) as f:
    if bufsize > fsize:
        bufsize = fsize-1
    data = []
    while True:
        iter +=1
        f.seek(fsize-bufsize*iter)
        data.extend(f.readlines())
        if len(data) >= lines or f.tell() == 0:
            print(''.join(data[-lines:]))
            break

Kentzo commented Feb 23, 2016

And yet another way of doing this: tailhead and pytailer.

mikewen commented Feb 29, 2016

use deque:
from collections import deque
print deque(open(filename), nLines)
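
A minimal Python 3 sketch of the deque idea above. The function name tail_deque is hypothetical, not from the thread. A deque with maxlen keeps only the most recent n items while iterating over the file, so memory stays bounded by n lines, although the file is still read from start to end.

```python
from collections import deque


def tail_deque(filename, n):
    """Return the last n lines of filename as a list of strings."""
    if n <= 0:
        return []
    with open(filename) as f:
        # A bounded deque discards older lines as newer ones arrive.
        return list(deque(f, maxlen=n))
```

Printing the result with print(''.join(tail_deque(path, 10)), end='') avoids the deque repr that a bare print of the deque would show.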

rodmur commented Apr 7, 2017

For what it's worth, I adapted some code from here, basically just using seek() and read() one character at a time to read backwards and count newlines; that way you don't need a buffer.

#!/usr/bin/python3

import os,sys

def tail_file(filename, nlines):
    with open(filename) as qfile:
        qfile.seek(0, os.SEEK_END)
        endf = position = qfile.tell()
        linecnt = 0
        while position >= 0:
            qfile.seek(position)
            next_char = qfile.read(1)
            if next_char == "\n" and position != endf-1:
                linecnt += 1

            if linecnt == nlines:
                break
            position -= 1

        if position < 0:
            qfile.seek(0)

        print(qfile.read(),end='')


if __name__ == '__main__':
    filename = sys.argv[1]
    nlines = int(sys.argv[2])
    tail_file(filename, nlines)

Sairamakrishna-Bhalla commented Dec 27, 2017

I tried the second implementation with a ~1.7 GB file and it works like magic, whereas the first one fails.

GREAT

RufusVS commented Mar 29, 2018

I'm an oldie, and the thought of reading a sequential file, one character at a time, backwards, from the end, just makes me shudder.

RufusVS commented Mar 29, 2018

I've read more of this thread, and it appears to me that there are problems with all the implementations. The very first one ends up reading the file twice, to no purpose. It reads the file to count the lines; you could just print from that buffer instead of throwing it away and using "linecache", whatever that is. You could simply use:


file_lines = open(fname).readlines()
# count the total number of lines
tot_lines = len(file_lines)

# lines from readlines() already end with '\n', so join with the empty string
print ''.join(file_lines[-(tot_lines if tot_lines < nlines else nlines):]),

RufusVS commented Mar 29, 2018

Also, the one with "bufsize" moves back in the file in "bufsize" increments, but each jump back actually extends the data with everything from that point to the end of the file! To see the problem, use a line-numbered file and set a small bufsize.
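
One possible way to avoid re-reading the rest of the file on every jump is sketched below. This is not the gist author's code: the name tail_blocks and its parameters are assumptions made for illustration. It opens the file in binary mode, reads exactly one block per step while moving backwards, and stops once it has seen more than n newlines or reached the start of the file. An empty file simply yields an empty result.

```python
import os


def tail_blocks(filename, n, bufsize=8192):
    """Return the last n lines of filename as bytes, reading backwards in blocks."""
    if n <= 0:
        return b""
    with open(filename, "rb") as f:
        f.seek(0, os.SEEK_END)
        pos = f.tell()
        blocks = []
        newlines = 0
        # Stop once more than n newlines are seen (the extra one marks the
        # start of the first wanted line) or the start of the file is reached.
        while pos > 0 and newlines <= n:
            step = min(bufsize, pos)
            pos -= step
            f.seek(pos)
            block = f.read(step)  # read only this block, not the rest of the file
            newlines += block.count(b"\n")
            blocks.append(block)
    # Blocks were collected from the end backwards, so restore file order.
    data = b"".join(reversed(blocks))
    return b"".join(data.splitlines(keepends=True)[-n:])
```

Trying it on a line-numbered file with a small bufsize, as suggested above, is a quick way to check that no line is duplicated or cut.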

ghost commented May 30, 2019

Quick question for all: does anyone know whether the solution is for Python 2.7 or Python 3? Please let me know when you read this post. Thank you.

abraker95 commented Jul 30, 2019

The second implementation breaks on empty files, causing an infinite loop. Solved by putting if fsize < 1: return '' before opening the file.

yeukhon commented Sep 5, 2019

Implementation of tail -n k. This uses offset and doesn't read the whole line. Imagine the line is 10GB large...

import os

def tail(filename, n):
    stat = os.stat(filename)
    if stat.st_size == 0 or n == 0:
        yield ''
        return

    page_size = 5
    offsets = []
    count = _n = n if n >= 0 else -n

    last_byte_read = last_nl_byte = starting_offset = stat.st_size - 1

    with open(filename, 'r') as f:
        while count > 0:
            starting_byte = last_byte_read - page_size
            if last_byte_read == 0:
                offsets.append(0)
                break
            elif starting_byte < 0:
                f.seek(0)
                text = f.read(last_byte_read)
            else:
                f.seek(starting_byte)
                text = f.read(page_size)

            for i in range(-1, -1*len(text)-1, -1):
                last_byte_read -= 1
                if text[i] == '\n':
                    last_nl_byte = last_byte_read
                    starting_offset = last_nl_byte + 1
                    offsets.append(starting_offset)
            count -= 1

    offsets = offsets[len(offsets)-_n:]
    offsets.reverse()

    with open(filename, 'r') as f:
        for i, offset in enumerate(offsets):
            f.seek(offset)

            if i == len(offsets) - 1:
                yield f.read()
            else:
                bytes_to_read = offsets[i+1] - offset
                yield f.read(bytes_to_read)

filename = '/tmp/test.txt'
for x in tail(filename, 10):
    print(x.strip())

melvilgit commented Apr 11, 2020

@amit, there is a small flaw here. If seeking to fsize-bufsize*iter lands inside the first of the N lines to be printed, the first item in data[-lines:] can start anywhere between the beginning and the end of that line.
Say the line is "John Johny yes pappa"; data[-lines:][0] can be "ny yes pappa", since f.seek(fsize-bufsize*iter) can land anywhere.
Changing if len(data) >= lines or f.tell() == 0: to if len(data) > lines or f.tell() == 0: should fix the issue.
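
The partial-line effect described above can be reproduced with a small, self-contained sketch. The temporary file and the 13-byte offset are chosen purely for illustration. Seeking to an arbitrary byte offset and calling readlines() returns whatever remains of the line the offset fell into.

```python
import os
import tempfile

# Write a small file whose last line is the example from the comment.
with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
    tmp.write(b"first line\nJohn Johny yes pappa\n")
    path = tmp.name

with open(path, "rb") as f:
    size = f.seek(0, os.SEEK_END)
    # Seek 13 bytes back from the end: this lands inside the last line.
    f.seek(size - 13)
    chunk_lines = f.readlines()

os.unlink(path)
print(chunk_lines)  # [b'ny yes pappa\n']
```

Collecting one line more than requested, as the suggested change to the condition does, is intended to push any such fragment outside the data[-lines:] slice.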

blakev commented May 15, 2020

Python 3.8 asyncio version,

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# >>
#   Blake VandeMerwe, LiveViewTech
# <<

import os
import io
import asyncio
import time
from functools import partial
from typing import AsyncIterator

LINE_BUFFER = 1

async def tail(
    filename: str,
    last_lines: int = 10,
    non_exist_max_secs: float = 30.0,
    fp_poll_secs: float = 0.125
) -> AsyncIterator[str]:
    """Continuously tail a file pointer yielding one line at a time."""

    async def wait_exists() -> bool:
        """Wait for a file to exist, the return statement reflects
        whether or not the file existed when the timeout limits were reached."""
        bail_at: float = time.monotonic() + non_exist_max_secs
        while not os.path.exists(filename):
            if time.monotonic() >= bail_at:
                return False
            await asyncio.sleep(fp_poll_secs)
        return True

    async def check_rotate(_fp) -> io.TextIOBase:
        """Determine if the file rotated in place; same name different inode."""
        nonlocal fino
        if os.stat(filename).st_ino != fino:
            new_fp = open(filename, 'r')
            _fp.close()
            new_fp.seek(0, os.SEEK_SET)
            fino = os.fstat(new_fp.fileno()).st_ino
            return new_fp
        return _fp

    # ~~
    if not await wait_exists():
        return

    buff = io.StringIO()
    stat = os.stat(filename)

    fino: int = stat.st_ino
    size: int = stat.st_size
    blocksize: int = os.statvfs(filename).f_bsize

    fp = open(filename, 'r', LINE_BUFFER)

    if last_lines > 0:
        if stat.st_size <= blocksize:
            # if the file fits in one filesystem block, read all the lines
            for line in fp.readlines()[-last_lines::]:
                yield line.rstrip()
        else:
            # if the file is larger than one block, seek one block from the end
            #  and return all the lines except the (potential) half-line
            # first element and the null-terminated extra line at the end.
            fp.seek(os.stat(fp.fileno()).st_size - blocksize)
            for line in fp.readlines()[1:-1][-last_lines::]:
                yield line.rstrip()

    # seek to the end of the file for tailing
    #  given the above operations we should already be there.
    fp.seek(0, os.SEEK_END)

    try:
        while True:
            # wait for the file to exist -- generously
            if not os.path.exists(filename):
                if not await wait_exists():
                    return

            fp = await check_rotate(fp)
            n_stat = os.fstat(fp.fileno())
            n_size = n_stat.st_size

            # if the file is the same size, churn
            #  .. this could be error-prone on small files that
            # rotate VERY fast, but that's an edge case for
            #  tailing a persistent log file.
            if n_size == size:
                await asyncio.sleep(fp_poll_secs)
                continue

            # if the file shrank, seek to the beginning
            if n_size < size:
                fp.seek(0, os.SEEK_SET)

            size = n_size
            for chunk in iter(partial(fp.read, blocksize), ''):
                buff.write(chunk)

            buff.seek(0, os.SEEK_SET)

            for line in buff.readlines():
                yield line.rstrip()

            # reset our string buffer; truncate() does not move the
            # stream position, so rewind it as well
            buff.truncate(0)
            buff.seek(0, os.SEEK_SET)

    except IOError:
        buff.close()
        fp.close()


if __name__ == '__main__':

    async def main():
        async for line in tail(r'/etc/foldingathome/log.txt'):
            print(line)

    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        pass
    loop.stop()
    loop.close()

therumbler commented May 22, 2020

This is great, but you don't need the # -*- coding: utf-8 -*- line in Python 3

Python 3.8 asyncio version,

#! /usr/bin/env python
# -*- coding: utf-8 -*-
...

blakev commented May 22, 2020

@therumbler TIL, thanks! I've been using the same "new file" template for years. Time to update!

blasti commented Mar 13, 2022
