|
# Get the file name without extension: |
|
import os |
|
print os.path.splitext("path_to_file")[0] |
|
|
|
#Convert String to unicode: |
|
"".join(["\\x%02x" % ord(i) for i in my_code]) |
|
|
|
#Get File Directory from file path: |
|
"/".join(file_name.split("/")[0:-1]) |
|
|
|
# Get File Name from file path |
|
file_name.split("/")[-1].split(".")[0] |
|
|
|
#Python Logging: |
|
import logging |
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
# Python Argument Parsing |
|
import argparse |
|
parser = argparse.ArgumentParser(description='PyFlow: Debugger') |
|
parser.add_argument('-f', '--file', help='Input File for debugging', required=True) |
|
parser.add_argument('-e', help="Run Flask", default=False, action="store_true") |
|
args = vars(parser.parse_args()) |
|
|
|
# Check if file exists: |
|
os.path.isfile(fname) |
|
|
|
# Sorting - Descending order |
|
a= [1,5,3,6,8,3,4,5,7] |
|
a.sort(reverse=True) |
|
|
|
# Check common items between two lists |
|
from collections import Counter |
|
A = [2,4,4,5,6,7,8,9] |
|
B = [1,3,4,5,4,3,5,6] |
|
print(len(list((Counter(A) & Counter(B)).elements()))) |
|
|
|
# Modify each element in a list |
|
list_mod = [x.__add__(1) for x in list_mod] |
|
|
|
# Open webpage in default browser |
|
import webbrowser |
|
webbrowser.open(url) |
|
|
|
# Function Overloading - Single Dispatch |
|
from functools import singledispatch |
|
|
|
class abc(object): |
|
|
|
@singledispatch |
|
def __init__(self, arg, verbose=False): |
|
if verbose: |
|
print("Let me just say,", end=" ") |
|
print(arg) |
|
pass |
|
|
|
@__init__.register(int) |
|
def _(self, arg, verbose=False): |
|
if verbose: |
|
print("Strength in numbers, eh?", end=" ") |
|
print(arg) |
|
|
|
@__init__.register(list) |
|
def _(self, arg, verbose=False): |
|
if verbose: |
|
print("Enumerate this:") |
|
for i, elem in enumerate(arg): |
|
print(i, elem) |
|
|
|
|
|
abc('abc', verbose=True) |
|
|
|
# Adding two sets |
|
a = {0,1,1,1,2,3} |
|
b = {1,2,3,4} |
|
|
|
print(a | b) |
|
|
|
# Jinja 2 Template Render |
|
from jinja2 import Template |
|
template = Template('Hello {{ name }}!') |
|
print(template.render(name='John Doe')) |
|
|
|
# Removing None values from a dictionary |
|
filtered = {k: v for k, v in original.items() if v is not None} |
|
original.clear() |
|
original.update(filtered) |
|
|
|
# Accessing Dictionary |
|
d = { |
|
'k1': 'v1', |
|
'k2': 'v2' |
|
} |
|
|
|
# Accessing keys |
|
for k in d: |
|
print(k) |
|
# Accessing values |
|
for v in d.values(): |
|
print(v) |
|
|
|
# Accessing key with values |
|
for k, v in d.items(): |
|
print(k, v) |
|
|
|
# Preserver json ordering in json loads |
|
import json |
|
json.load(filename, object_pairs_hook=collections.OrderDict) |
|
|
|
# Execute a file in python |
|
exec(open('./filename.py').read()) |
|
|
|
# Concatenate list |
|
import itertools |
|
|
|
lists = [['hello'], ['world', 'foo', 'bar']] |
|
combined = list(itertools.chain.from_iterable(lists)) # combined = [item for sublist in lists for item in sublist] |
|
|
|
# Padding function |
|
a += [''] * (N - len(a)) |
|
|
|
# Singleton Design Patter |
|
class Singleton(type): |
|
_instances = {} |
|
def __call__(cls, *args, **kwargs): |
|
if cls not in cls._instances: |
|
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) |
|
return cls._instances[cls] |
|
|
|
class Logger(metaclass=Singleton): |
|
pass |
|
|
|
# Ordered Dictionary |
|
from collections import OrderedDict |
|
d = OrderedDict([('b',2), ('a', 1)]) |
|
|
|
# Unit Test |
|
import unittest |
|
from primes import is_prime |
|
|
|
class PrimesTestCase(unittest.TestCase): |
|
"""Tests for `primes.py`.""" |
|
|
|
def test_is_five_prime(self): |
|
"""Is five successfully determined to be prime?""" |
|
self.assertTrue(is_prime(5)) |
|
|
|
if __name__ == '__main__': |
|
unittest.main() |
|
|
|
# DocTest |
|
def square(x): |
|
"""Return the square of x. |
|
|
|
>>> square(2) |
|
4 |
|
>>> square(-2) |
|
4 |
|
""" |
|
|
|
return x * x |
|
|
|
if __name__ == '__main__': |
|
import doctest |
|
doctest.testmod() |
|
|
|
# Shape of a Listception |
|
import numpy as np |
|
a = np.array([[1,2,3],[1,2,3]]) |
|
len(a.shape) |
|
|
|
# ZIP |
|
sample = [(2, 9), (2, 9), (8, 9), (10, 9), (23, 26), (1, 9), (43, 44)] |
|
first,snd = zip(*sample) |
|
print first,snd |
|
(2, 2, 8, 10, 23, 1, 43) (9, 9, 9, 9, 26, 9, 44) |
|
|
|
# Two Lists from list comprehension |
|
rr,tt = zip(*[(i*10, i*12) for i in xrange(4)]) |
|
|
|
# Generate Combinations |
|
import itertools |
|
a = [[1,2,3],[4,5,6],[7,8,9,10]] |
|
list(itertools.product(*a)) |
|
|
|
# Python Curses - Static Placement with Dynamic display |
|
import time |
|
import curses |
|
stdscr = curses.initscr() |
|
stdscr.addstr(0, 0, "Hello") |
|
stdscr.refresh() |
|
time.sleep(1) |
|
stdscr.addstr(0, 0, "World! (with curses)") |
|
stdscr.refresh() |
|
|
|
# Python blink string |
|
def blink(char): |
|
print(char, end = '\r') |
|
time.sleep(0.5) |
|
print(' ' * 50, end = '\r') |
|
time.sleep(0.5) |
|
|
|
# Store shell command output to a variable |
|
import subprocess |
|
output = subprocess.check_output("cat /etc/services", shell=True) |
|
|
|
# Simple Threading |
|
from threading import Thread |
|
class MyThread(Thread): |
|
def __init__(self): |
|
''' Constructor. ''' |
|
Thread.__init__(self) |
|
|
|
def run(self): |
|
pass |
|
thread = MyThread() |
|
thread.start() |
|
thread.join() |
|
|
|
# Convert Bytes to string in python3 |
|
str(bytes_string,'utf-8') |
|
|
|
# Python check if directory exists |
|
import os |
|
print(os.path.isdir("/home/el")) |
|
print(os.path.exists("/home/el/myfile.txt")) |
|
|
|
# Custom Thread with stop |
|
# http://stackoverflow.com/questions/16262132/how-terminate-python-thread-without-checking-flag-continuously |
|
import threading |
|
class My_Thread(threading.Thread): |
|
|
|
def __init__(self): |
|
threading.Thread.__init__(self) |
|
self.process = None |
|
|
|
def run(self): |
|
print "Starting " + self.name |
|
cmd = [ "bash", 'process.sh'] |
|
self.process = p = subprocess.Popen(cmd, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT) |
|
for line in iter(p.stdout.readline, b''): |
|
print ("-- " + line.rstrip()) |
|
print "Exiting " + self.name |
|
|
|
def stop(self): |
|
print "Trying to stop thread " |
|
if self.process is not None: |
|
self.process.terminate() |
|
self.process = None |
|
|
|
thr = My_Thread() |
|
thr.start() |
|
time.sleep(30) |
|
thr.stop() |
|
thr.join() |
|
|
|
# Global Package variables |
|
#a.py |
|
print(foo) |
|
|
|
#b.py |
|
import __builtin__ |
|
__builtin__.foo = 1 |
|
import a |
|
|
|
# Redirect STDOUT (print statements) to a file |
|
import sys |
|
sys.stdout = open(r'stdout.txt', 'w') |
|
|
|
# Thread Pool Executor in Python |
|
import concurrent.futures |
|
from time import sleep |
|
|
|
job_list = [3, 2, 1, 4, 5, 20] |
|
job_result = [] |
|
|
|
def run(id, sl): |
|
sleep(sl) |
|
print('Job ', id, ' completed') |
|
return id**2 |
|
|
|
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor: |
|
for job_id, job in enumerate(job_list): |
|
job_result.append(executor.submit(run, job_id, job)) |
|
print([i.result() for i in job_result]) |
|
|
|
# Dict Comprehension |
|
d = {n: n**2 for n in range(5)} |
|
|
|
# Append Dict to another Dictionary |
|
x = OrderedDict([(1, 'a'), (2, 'b')]) |
|
y = OrderedDict([(3, 'a'), (4, 'b')]) |
|
x = OrderedDict(**x, **y) |
|
|
|
# Get biggest number |
|
float('inf') |
|
|
|
# Get smallest number |
|
float('-inf') |
|
|
|
# Simple Python Multi Processing |
|
from multiprocessing import Process, Pipe |
|
def fn(arg . . .): |
|
pass |
|
|
|
p = Process(target=fn, args=(arg ...)) |
|
p.start() |
|
p.join() |
|
|
|
#Find Slope |
|
slope = lambda a, b: (float(a[1]) - float(b[1])) / (float(a[0]) - float(b[0])) |
|
|
|
# Create One-hot vectors: |
|
a = np.array([1, 0, 3]) |
|
b = np.zeros((3, 4)) |
|
b[np.arange(3), a] = 1 |
|
|
|
# setInterval Function |
|
import threading |
|
|
|
def set_interval(func, sec): |
|
def func_wrapper(): |
|
set_interval(func, sec) |
|
func() |
|
t = threading.Timer(sec, func_wrapper) |
|
t.start() |
|
return t |
|
|
|
|
|
# Patch a function in python |
|
import types |
|
|
|
class Foo(object): |
|
|
|
def call_patched_bar(self, objfn_args:dict): |
|
self.bar(**objfn_args) |
|
pass |
|
|
|
def bar(target,x): |
|
print("x=",x) |
|
print("called from", target) |
|
|
|
def patch_me(target): |
|
target.bar = types.MethodType(bar,target) |
|
|
|
|
|
# Patch original class |
|
patch_me(Foo) |
|
Foo().call_patched_bar(objfn_args={'x':6}) |
|
|
|
# Patch object of a class |
|
f = Foo() |
|
patch_me(f) |
|
f.call_patched_bar(objfn_args={'x':6}) |
|
|
|
# Timing an expression |
|
import timeit |
|
|
|
|
|
class Foo(object): |
|
|
|
@staticmethod |
|
def bar(): |
|
pass |
|
|
|
|
|
f = Foo() |
|
|
|
# Which is a good design? |
|
|
|
# a. |
|
Foo.bar() |
|
|
|
# b. |
|
f.bar() |
|
|
|
exp1 = timeit.repeat('Foo.bar()', 'from __main__ import Foo', repeat=500, number=100000) |
|
exp2 = timeit.repeat('f.bar()', 'from __main__ import f', repeat=500, number=100000) |
|
print('min execution time for a single instance:') |
|
print(min(exp1)) |
|
print(min(exp2)) |
|
|
|
print('max execution time for a single instance:') |
|
print(max(exp1)) |
|
print(max(exp2)) |
|
|
|
print('total execution time:') |
|
print(sum(exp1)) |
|
print(sum(exp2)) |
|
|
|
|
|
# With block example |
|
|
|
class Foo(object): |
|
def __init__(self): |
|
print('Init called') |
|
|
|
def sample_method(self): |
|
print('Sample metod called') |
|
|
|
def __enter__(self): |
|
print('Enter called') |
|
return self |
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
print('Exit called') |
|
|
|
def __del__(self): |
|
print('Del called') |
|
|
|
|
|
print('Basic Object creation:\n') |
|
f = Foo() |
|
del f |
|
|
|
print('\n\nWith Block object creation:\n') |
|
with Foo() as f: |
|
pass |
|
|
|
|
|
# Custom Iterator |
|
class Bar(object): |
|
def __init__(self): |
|
self.idx = 0 |
|
self.data = range(4) |
|
def __iter__(self): |
|
return self |
|
def __next__(self): |
|
self.idx += 1 |
|
try: |
|
return self.data[self.idx-1] |
|
except IndexError: |
|
self.idx = 0 |
|
raise StopIteration # Done iterating. |
|
next = __next__ # python2.x compatibility. |
|
|
|
# Typehint a lambda function |
|
from typing import Callable |
|
is_even: Callable[[int], bool] = lambda x: (x % 2 == 0) |
|
|
|
# Merge List of List |
|
x = [[1,2,3], [4,5,6]] |
|
merged_list = sum(x, []) |
|
|
|
# Function Overloading using Lambda Fn |
|
def overload(*functions): |
|
return lambda *args, **kwargs: functions[len(args)](*args, **kwargs) |
|
|
|
function_to_override = overload( |
|
None, # For self argument |
|
lambda self, param1: function_to_override_one_param(param1), |
|
lambda self, param1, param2: function_to_override_one_param(param1, param2) |
|
) |
|
|
|
|
|
""" |
|
Async Function Setup |
|
""" |
|
|
|
import threading |
|
|
|
class AsyncCall(object): |
|
def __init__(self, fn, callback=None): |
|
self.callable = fn |
|
self.callback = callback |
|
self.result = None |
|
|
|
def __call__(self, *args, **kwargs): |
|
self.Thread = threading.Thread(target=self.run, name=self.callable.__name__, args=args, kwargs=kwargs) |
|
self.Thread.start() |
|
return self |
|
|
|
def wait(self, timeout=None): |
|
self.Thread.join(timeout) |
|
if self.Thread.isAlive(): |
|
raise TimeoutError() |
|
else: |
|
return self.result |
|
|
|
def run(self, *args, **kwargs): |
|
self.result = self.callable(*args, **kwargs) |
|
if self.callback: |
|
self.callback(self.result) |
|
|
|
|
|
class AsyncMethod(object): |
|
def __init__(self, fn, callback=None): |
|
self.callable = fn |
|
self.callback = callback |
|
|
|
def __call__(self, *args, **kwargs): |
|
return AsyncCall(self.callable, self.callback)(*args, **kwargs) |
|
|
|
|
|
def async(f=None, callback=None): |
|
""" |
|
| **@author:** Prathyush SP |
|
| |
|
| Custom Exception Decorator. |
|
:param f: Function |
|
:param callback: Callaback Function |
|
""" |
|
if f is None: |
|
return partial(async, f=f, callback=callback) |
|
|
|
@wraps(f) |
|
def wrapped(*args, **kwargs): |
|
return AsyncMethod(f, callback)(*args, **kwargs) |
|
|
|
return wrapped |
|
|
|
@async |
|
def fnc(): |
|
for i in range(10): |
|
print(i) |
|
time.sleep(2) |
|
|
|
|
|
print('abcd') |
|
fnc() |
|
print('def') |
|
|
|
"""" |
|
Async Function Setup End |
|
"""" |
|
|
|
|
|
# Enum of Enum's |
|
import enum |
|
class Foo(enum.Enum): |
|
var_a = enum.auto() |
|
|
|
class Bar(enum.Enum): |
|
var_b = enum.auto() |
|
|
|
def __getattr__(self, item): |
|
if item != '_value_': |
|
return getattr(self.value, item) |
|
raise AttributeError |
|
|
|
# Raise only singe exception from multiple hierarchy |
|
try: |
|
1/0 |
|
except Exception as e: |
|
raise ValueError('Value Error') from None |
|
|
|
|
|
# Module Path - Place this in base __init__.py |
|
MODULE_PATH = os.path.dirname(os.path.abspath(__file__)) |
|
|
|
# Recurring Default Dictionary in Python |
|
class RecurringDefaultDict(dict): |
|
"""Implementation of perl's autovivification feature.""" |
|
def __getitem__(self, item): |
|
try: |
|
return dict.__getitem__(self, item) |
|
except KeyError: |
|
value = self[item] = type(self)() |
|
return value |
|
|
|
x = RecurringDefaultDict() |
|
x[1][2][3][4]= 25 |
|
|
|
# Check if -ve no in python list |
|
any(n < 0 for n in any_list) |
|
|
|
# Disable import for a function |
|
from contextlib import contextmanager |
|
|
|
@contextmanager |
|
def custom_metric(): |
|
import tensorflow |
|
t = tensorflow.metrics |
|
delattr(tensorflow, "metrics") |
|
yield None |
|
tensorflow.metrics = t |
|
|
|
|
|
with custom_metric(): |
|
import tensorflow as tf |
|
try: |
|
print(tf.metrics) |
|
except AttributeError: |
|
print("Cannot import metrics inside custom metric") |
|
|
|
import tensorflow as tf |
|
print(tf.metrics) |
|
|
|
# Test if a function is printing the required log |
|
import logging |
|
from testfixtures import LogCapture |
|
logger = logging.getLogger('') |
|
with LogCapture() as logs: |
|
# my awesome code |
|
logger.error('My code logged an error') |
|
assert 'My code logged an error' in str(logs) |
|
|
|
# Simplest form of batching |
|
def batch(iterable, n=1): |
|
l = len(iterable) |
|
for ndx in range(0, l, n): |
|
yield iterable[ndx:min(ndx + n, l)] |
|
|
|
for x in batch(list(range(0, 10)), 3): |
|
print(x) |
|
|
|
# Find all the imported modules |
|
from modulefinder import ModuleFinder |
|
finder = ModuleFinder() |
|
finder.run_script("./main.py") |
|
for name, mod in finder.modules.items(): |
|
print(name) |
|
|
|
|
|
# Thread Safe Writer |
|
class SafeWriter: |
|
def __init__(self, *args): |
|
self.filewriter = open(*args) |
|
self.queue = Queue() |
|
self.finished = False |
|
Thread(name="SafeWriter", target=self.internal_writer).start() |
|
|
|
def write(self, data): |
|
self.queue.put(data) |
|
|
|
def internal_writer(self): |
|
while not self.finished: |
|
try: |
|
data = self.queue.get(True, 1) |
|
except Empty: |
|
continue |
|
self.filewriter.write(data) |
|
self.queue.task_done() |
|
|
|
def close(self): |
|
self.queue.join() |
|
self.finished = True |
|
self.filewriter.close() |
|
|
|
|
|
# Timeout Whileloop |
|
import time |
|
timeout = 10 # [seconds] |
|
timeout_start = time.time() |
|
while time.time() < timeout_start + timeout: |
|
pass |