Skip to content

Instantly share code, notes, and snippets.

@seblin
Last active December 17, 2018 12:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seblin/be07bb481258b115192180c05dcc4100 to your computer and use it in GitHub Desktop.
Save seblin/be07bb481258b115192180c05dcc4100 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2013-2018 Sebastian Linke
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__author__ = 'Sebastian Linke'
__version__ = '0.4-dev'
__license__ = 'MIT'
import fnmatch
import locale
import os
import re
from collections.abc import Mapping
from functools import partial, singledispatch
from io import IOBase
from itertools import chain, zip_longest
from shutil import get_terminal_size
from sys import version_info
if version_info[:2] < (3, 6):
raise RuntimeError('Need Python version >= 3.6')
DEFAULT_CONFIG = {
'spacing': 2,
'spacer': None,
'width': None,
'column_widths': None,
'num_columns': None,
'vertical': True,
'align': 'left',
'wrap': False,
'min_shrink_width': 5,
'sort': False,
'locale': None,
'pattern': None,
'unique': False,
'index': None,
'output_stream': None}
class ConfigHelper(dict):
spec_templates = {
'left': {'default': '{{:<{0}.{0}}}', 'end': '{{:<.{0}}}'},
'center': {'default': '{{:^{0}.{0}}}', 'end': '{{:^.{0}}}'},
'right': {'default': '{{:>{0}.{0}}}', 'end': '{{:>.{0}}}'}}
def __init__(self, config=None, **config_args):
dict.__init__(self, config or DEFAULT_CONFIG)
if config_args:
self._check_keywords(config_args)
self.update(config_args)
def _check_keywords(self, keywords):
for kw in keywords:
if kw not in DEFAULT_CONFIG:
raise TypeError(f'Unexpected keyword: {kw!r}')
def copy(self):
return type(self)(dict.copy(self))
def get_spec(self, width, style='default'):
align = self['align']
if align not in self.spec_templates:
raise ValueError(f'Unexpected alignment: {align!r}')
return self.spec_templates[align][style].format(width)
def get_column_widths(self, items):
widths = self['column_widths']
if not widths:
return list(map(len, items))
return widths[:len(items)]
def get_max_width(self):
return self['width'] or get_terminal_size().columns
def get_spacer(self):
return self['spacer'] or self['spacing'] * ' '
class StringSeries:
def __init__(self, items, config=None):
if not items:
raise ValueError('Need at least one item')
self.items = items
self.config = ConfigHelper(config)
def __iter__(self):
return iter(self.items)
def __str__(self):
try:
return self.get_formatted()
except NotImplementedError:
return str(self.items)
def _raise_not_implemented(self, text):
name = type(self).__name__
raise NotImplementedError(
f'{type(self).__name__} does not implement {text}')
def get_format_string(self):
self._raise_not_implemented('get_format_string()')
def get_formatted(self):
return self.get_format_string().format(*self.items)
def get_width(self):
self._raise_not_implemented('get_width()')
@classmethod
def convert_from(cls, objects, config=None):
items = list(map(str, objects))
return cls(items, config)
class Column(StringSeries):
def get_format_string(self):
spec = self.config.get_spec(self.get_width(), 'end')
return '\n'.join(spec for _ in self.items)
def get_width(self):
return max(map(len, self.items))
class Row(StringSeries):
def get_format_string(self):
widths = self.config.get_column_widths(self.items)
specs = list(map(self.config.get_spec, widths[:-1]))
specs.append(self.config.get_spec(widths[-1], 'end'))
return self.config.get_spacer().join(specs)
def get_width(self):
widths = self.config.get_column_widths(self.items)
last_width = min(len(self.items[-1]), widths[-1])
total_spacing = (len(widths) - 1) * len(self.config.get_spacer())
return sum(widths[:-1]) + last_width + total_spacing
class ItemWrapper:
def __init__(self, min_width):
self.min_width = min_width
def _shrink(self, width, offset):
if width <= self.min_width:
return (width, offset)
shrinked_width = max(width - offset, self.min_width)
shrinked_offset = offset - (width - shrinked_width)
return (shrinked_width, shrinked_offset)
def get_shrinked_widths(self, widths, offset):
shrinked_widths = []
while offset > 0 and widths:
width, offset = self._shrink(widths.pop(), offset)
shrinked_widths.append(width)
return widths + shrinked_widths[::-1]
def wrap_items(self, items, widths):
while any(items):
yield [item[:width] for item, width in zip(items, widths)]
items = [item[width:] for item, width in zip(items, widths)]
class StringColumnizer(StringSeries):
def _wrap(self, rows):
wrapper = ItemWrapper(self.config['min_shrink_width'])
widths = rows[0].config['column_widths']
offset = max(r.get_width() for r in rows) - self.config.get_max_width()
shrinked_widths = wrapper.get_shrinked_widths(widths, offset)
config = ConfigHelper(self.config, column_widths=shrinked_widths)
wrap = partial(wrapper.wrap_items, widths=shrinked_widths)
return [Row(items, config) for row in rows
for items in wrap(row.items)]
def _make_columns(self, ncols, strict=True):
col_size, remaining = divmod(len(self.items), ncols)
if remaining:
col_size += 1
col_items = [
self.items[i * col_size : (i + 1) * col_size] for i in range(ncols)
] if self.config['vertical'] else [
self.items[i::ncols] for i in range(ncols)]
if not col_items[-1]:
if strict:
raise ValueError(f'Items do not fit in {ncols} columns')
return self._make_columns(ncols - 1, strict=False)
config = self.config.copy()
return [Column(items, config) for items in col_items]
def _make_rows(self, ncols, strict=True):
columns = self._make_columns(ncols, strict)
config = self.config.copy()
if not config['column_widths']:
config['column_widths'] = [col.get_width() for col in columns]
rows = [Row(list(filter(None, row_items)), config)
for row_items in zip_longest(*columns)]
return self._wrap(rows) if self.config['wrap'] else rows
def _guess_rows(self):
max_width = self.config.get_max_width()
ncols = min(len(self.items), max_width)
while ncols > 0:
rows = self._make_rows(ncols, strict=False)
if all(row.get_width() <= max_width for row in rows):
return rows
ncols = len(rows[0].config['column_widths']) - 1
def make_rows(self):
num_columns = self.config['num_columns']
column_widths = self.config['column_widths']
if num_columns and column_widths:
raise ValueError('num_columns and column_widths '
'must not defined concurrently')
ncols = num_columns or len(column_widths or [])
return self._make_rows(ncols) if ncols else self._guess_rows()
def get_lines(self, rows):
formatter = rows[0].get_format_string().format
for row in rows:
try:
yield formatter(*row.items)
except IndexError:
# Number of items changed, update the formatter
formatter = row.get_format_string().format
yield formatter(*row.items)
def _join_tty_lines(self, lines):
line_width = self.config.get_max_width()
return ''.join(
line if len(line) == line_width else line + '\n' for line in lines)
def get_formatted(self):
lines = self.get_lines(self.make_rows())
stream = self.config['output_stream']
if stream and not isatty(stream):
return '\n'.join(lines)
return self._join_tty_lines(lines)
@classmethod
def from_mapping(cls, mapping, config=None):
items = chain(mapping.keys(), mapping.values())
config['num_columns'] = 2
config['wrap'] = True
return cls.convert_from(items, config)
def isatty(stream):
if not hasattr(stream, 'fileno'):
return False
return os.isatty(stream.fileno())
@singledispatch
def use_index(source, index):
try:
return (line.split()[index] for line in source)
except IndexError:
raise IndexError('Column index out of range') from None
@use_index.register(Mapping)
def use_index_mapping(source, index):
raise NotImplementedError('Using index not supported for mappings')
@singledispatch
def make_unique(items):
seen = set()
for item in items:
if item not in seen:
seen.add(item)
yield item
@make_unique.register(Mapping)
def make_unique_mapping(items):
raise NotImplementedError('Mapping items are already unique')
@singledispatch
def match(items, pattern):
pattern = re.compile(fnmatch.translate(pattern), re.IGNORECASE)
return (item for item in items if pattern.match(item))
@match.register(Mapping)
def match_mapping(items, pattern):
return {key: items[key] for key in match(items.keys(), pattern)}
@singledispatch
def sort(items, loc=None):
old_locale = locale.getlocale(locale.LC_COLLATE)
locale.setlocale(locale.LC_COLLATE, loc or '')
try:
return sorted(items, key=locale.strxfrm)
except TypeError:
# Fallback for non-string items
return sorted(items)
finally:
locale.setlocale(locale.LC_COLLATE, old_locale)
@sort.register(Mapping)
def sort_mapping(items, loc=None):
# Python >= 3.6 keeps insertion order
return {key: items[key] for key in sort(items.keys(), loc)}
def apply_config(items, config):
if config['index'] is not None:
items = use_index(items, config['index'])
if config['unique']:
items = make_unique(items)
if config['pattern']:
items = match(items, config['pattern'])
if config['sort']:
items = sort(items, config['locale'])
return items
@singledispatch
def get_columnizer(items, config=None):
return StringColumnizer.convert_from(items, config)
@get_columnizer.register(Mapping)
def get_columnizer_mapping(items, config=None):
return StringColumnizer.from_mapping(items, config)
@singledispatch
def print_columnized(items, **config_args):
config = ConfigHelper(**config_args)
items = apply_config(items, config)
columnizer = get_columnizer(items, config)
print(columnizer, end='', file=config['output_stream'])
@print_columnized.register(IOBase)
def print_columnized_stream(source, **config_args):
items = (line.rstrip() for line in source)
return print_columnized(items, **config_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment