Language Snippets #MySnippet
A collection of language snippets, organized by topic
#Check if a program exists
command -v foo >/dev/null 2>&1 || { echo >&2 "I require foo but it's not installed. Aborting."; exit 1; }
#Block Comment in bash
: '
This is a test comment
Author foo bar
Released under GNU
'
#Unicode escape in bash
<var_name>=$'\xe2\x98\xa2 '
# if-else
if [ "foo" = "foo" ]; then
echo expression evaluated as true
else
echo expression evaluated as false
fi
# Grep only Numeric values:
echo "99%" |grep -o '[0-9]*'
# Bash Command output to a variable
OUTPUT="$(ls -1)"
echo "${OUTPUT}"
# SED w variable and line replace
sed -i.bak 's/.*TEXT_TO_BE_REPLACED.*/"'"$VARIABLE"'"/' VagrantFile >/dev/null 2>&1
# While loop
i="0"
while [ $i -lt 4 ]
do
i=$[$i+1]
done
# Redirect Command output
command 2> /dev/null
#Detect OS
platform='unknown'
unamestr=$(uname)
if [[ "$unamestr" == 'Linux' ]]; then
    platform='linux'
elif [[ "$unamestr" == 'FreeBSD' ]]; then
    platform='freebsd'
fi
# Execute a Gist in bash
#http://max.disposia.org/notes/bash-exec-gist.htm
bash -c "$(curl -fsSL $raw_gist_path)" $arg0 $arg1
# Append multiple lines to a file
cat <<EOT >> greetings.txt
line 1
line 2
EOT
# Change Password using script
echo "USERNAME:NEWPASSWORD" | chpasswd
# Generate SSH key using script
echo -e "\n\n\n" | ssh-keygen -t rsa
# SSH Key using script
sudo apt-get install sshpass
sshpass -p "PASSWORD" ssh-copy-id -o StrictHostKeyChecking=no USERNAME@IP
# Process spawn and wait
/my/process &
/another/process &
wait
echo "All processes done!"
# Alert in bash
echo -e "\a"
# Create offline Installation
sudo apt-get download PACKAGE_NAME && apt-cache depends -i PACKAGE_NAME | awk '/Depends:/ {print $2}' | xargs apt-get download
# Arrays in Bash
ARRAY=(one two three)
echo ${ARRAY[*]} # Print all
echo ${ARRAY[2]} # Print 3rd element
# For loop
for VARIABLE in 1 2 3 4 5 .. N
do
    command1
    command2
    commandN
done
# Iterating through each directory
for f in *; do
    if [ -d "${f}" ]; then
        # Will not run if no directories are available
        echo "$f"
    fi
done
# Iterating through an array with index
for i in "${!foo[@]}"; do
printf "%s\t%s\n" "$i" "${foo[$i]}"
done
# Single line condition checker
if ps aux | grep some_proces[s] > /tmp/test.txt; then echo 1; else echo 0; fi
# Array Declaration
array=(1 2 3)
# Print 1st Element (bash arrays are zero-indexed)
echo ${array[0]}
# Print all elements
echo ${array[@]}
# Get length of an array
echo ${#VAR_NAME[@]}
#Split Name
IN="bla@some.com;john@home.com"
arrIN=(${IN//;/ })
# Silent execution
silent() {
    "$@" > /dev/null 2>&1
}
silent echo "Hello World"
#Switch Case
case "$C" in
"1")
do_this()
;;
"2" | "3")
do_what_you_are_supposed_to_do()
;;
*)
do_nothing()
;;
esac
# Adding multiple lines to a file
cat <<EOT >> FILENAME
export KUBE_VERSION=1.2.0
export FLANNEL_VERSION=0.5.0
export ETCD_VERSION=2.2.0
export nodes="vagrant@10.0.168.10 vagrant@10.0.168.11 vagrant@10.0.168.11"
export roles="ai i i"
export NUM_NODES=${NUM_NODES:-3}
export SERVICE_CLUSTER_IP_RANGE=192.168.3.0/24
export FLANNEL_NET=172.16.0.0/16
EOT
# Create a sample from files in a folder
mkdir -p sample
for f in *.csv; do head -n 101 "$f" >> "sample/$f"; done
# Kill app based on port number
PID=$(lsof -i:8121 -t)
echo "Stopping previous build if any . . ." && kill -9 ${PID}
# Set Locale
export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8
# Convert multiline bash output to an array
var=($(./inner.sh))
# Grep IP Address using regex
echo "$str_containing_ip_addrs" | grep -oE "\b([0-9]{1,3}\.){3}[0-9]{1,3}\b"
# Wait for previous process while executing another command
launch backgroundprocess &
PROC_ID=$!
while kill -0 "$PROC_ID" >/dev/null 2>&1; do
    echo "PROCESS IS RUNNING"
    sleep 1
done
echo "PROCESS TERMINATED"
exit 0
# To log every command before execution
set -o xtrace # to revert to normal - set +o xtrace
# or bash -x myscript.sh
#list process on port
lsof -i tcp:<port number>
#Select random lines from a file
shuf -n N input > output
# Generate a random number between 1-10
$(( ( RANDOM % 10 ) + 1 ))
# SSH Based Port forwarding
ssh -N -f -L localhost:3000:localhost:3000 username@ip
gitconfig:
[alias]
apply-gitignore = !git ls-files -ci --exclude-standard -z | xargs -0 git rm --cached

Databricks Snippets

Load a Python library dynamically (cluster restart not required)

# !/databricks/python3/bin/python -m pip uninstall <package> -y
# !/databricks/python3/bin/python -m pip install -e /dbfs/<path>/
# dbutils.library.restartPython()

Load a JAR after cluster start

ADD jar dbfs:/<path to jar>.jar

Mount a bucket

aws_bucket_name = "bucket_name"
mount_name = "research"
dbutils.fs.mount(f"s3a://{aws_bucket_name}", f"/mnt/{mount_name}")
display(dbutils.fs.ls(f"/mnt/{mount_name}/"))
# Save Image
sudo docker save [image_name:tag] | gzip > [image_name:tag].tar.gz
# Load Image (docker load handles gzip-compressed archives)
sudo docker load -i [image_name:tag].tar.gz
# Delete all containers including its volumes use
docker rm -vf $(docker ps -a -q)
docker rm -vf $(docker-compose ps -q)
# Delete all the images
docker rmi -f $(docker images -a -q)
docker rmi -f $(docker-compose images -q)
# Delete images and containers from docker compose
DOCKER_CONTAINERS=$(docker-compose ps -q)
DOCKER_IMAGES=$(docker-compose images -q)
docker rm -vf $DOCKER_CONTAINERS && docker rmi -f $DOCKER_IMAGES
# Reset to a commit
git reset --hard <Commit SHA ID>
# Rebase to a commit
git rebase -i <Commit SHA ID>
# Global username and email
git config --global user.name "username"
git config --global user.email "email"
// Document Ready - jQuery
$(document).ready(function() {});
// Read JSON
$.getJSON( "ajax/test.json", function( data ) {
    var items = [];
    $.each( data, function( key, val ) {
        items.push( "<li id='" + key + "'>" + val + "</li>" );
    });
});
//Synchronous Looping in JS
/**
 * Process an array of data synchronously.
 *
 * @param data An array of data.
 * @param processData A function that processes an item of data.
 *        Signature: function(item, i, callback), where {@code item} is the i'th item,
 *        {@code i} is the loop index value and {@code callback} is the
 *        parameterless function to call on completion of processing an item.
 */
function doSynchronousLoop(data, processData, done) {
    if (data.length > 0) {
        var loop = function(data, i, processData, done) {
            processData(data[i], i, function() {
                if (++i < data.length) {
                    loop(data, i, processData, done);
                } else {
                    done();
                }
            });
        };
        loop(data, 0, processData, done);
    } else {
        done();
    }
}
// Set Interval Time
var timeOut = setInterval(myFunction, 2000);
// Replace \n with <br />
str = str.replace(/(?:\r\n|\r|\n)/g, '<br />');
# Notebook Snippets
# Render environment
import gym
from IPython import display
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
env = gym.make('Breakout-v0')
env.reset()
img = plt.imshow(env.render(mode='rgb_array')) # only call this once
plt.xticks([]),plt.yticks([])
for _ in range(100):
    img.set_data(env.render(mode='rgb_array'))  # just update the data
    display.display(plt.gcf())
    display.clear_output(wait=True)
    action = env.action_space.sample()
    env.step(action)
% Required Packages
% Package Declarations
\usepackage{arxiv}
\usepackage[utf8]{inputenc} % allow utf-8 input
\usepackage[T1]{fontenc} % use 8-bit T1 fonts
\usepackage{hyperref} % hyperlinks
\usepackage{url} % simple URL typesetting
\usepackage{booktabs} % professional-quality tables
\usepackage{amsfonts} % blackboard math symbols
\usepackage{nicefrac} % compact symbols for 1/2, etc.
\usepackage{microtype} % microtypography
\usepackage{lipsum} % Lorem Ipsum fill text
\usepackage{multicol} % Support for Multi columns for tables
\usepackage{multirow} % Support for Multi rows for tables
\usepackage{mathtools} % Advanced mathtools
\usepackage{caption} % Advanced caption configuration
\usepackage{amsmath} % Math package for equations
\usepackage{titlesec} % Title section
\usepackage{graphicx} % For adding labels to parts of the equation
\usepackage{stackrel} % For adding labels to parts of the equation
\usepackage[ruled,vlined]{algorithm2e} % For Algorithms
\usepackage{algorithm}
\usepackage{algpseudocode}
% Theme Configurations
% Set section depth to 4
% \setcounter{secnumdepth}{4}
% \titleformat{\paragraph}
% {\normalfont\normalsize\bfseries}{\theparagraph}{1em}{}
% \titlespacing*{\paragraph}
% {0pt}{3.25ex plus 1ex minus .2ex}{1.5ex plus .2ex}
% Add padding to text below table
\captionsetup[table]{skip=10pt}
% Configure hat tex
\let\oldhat\hat
\renewcommand{\hat}[1]{\oldhat{\mathbf{#1}}}
% Image
\begin{figure}[H]
\centering
\includegraphics[scale=0.2]{assets/sdk.png}
\captionof{figure}{Comparison between Tensorflow and Keras}
\label{fig:workflow}
\end{figure}
% Two column [Image | Text]
\noindent\begin{minipage}{.45\textwidth}
\centering
\includegraphics[scale=0.09]{SimpleFFN.png}
\captionof{figure}{Feed Forward Network}
\label{fig:SimpleFFN.png}
\end{minipage}
\begin{minipage}{.45\textwidth}
\begin{equation}
\label{eq:ffn_math_representation}
\begin{aligned}
Dense_{1} &= \sigma(Input \cdot \hat{W}_{1} + \hat{b}_{1}) &\\
Dense_{2} &= \sigma(Dense_{1} \cdot \hat{W}_{2} + \hat{b}_{2}) &\\
Dense_{3} &= \sigma(Dense_{2} \cdot \hat{W}_{3} + \hat{b}_{3})
\end{aligned}
\end{equation}
\end{minipage}
% Add Paragraph break
\par%
\bigskip
# Fix unmet dependencies error
sudo apt-get -o Dpkg::Options::="--force-overwrite" install --fix-broken
# Show Full File Path in Finder
defaults write com.apple.finder _FXShowPosixPathInTitle -bool YES; killall Finder
# Make Hidden Apps “Hidden” in Dock
defaults write com.apple.Dock showhidden -bool TRUE; killall Dock
# Eliminate the Dock Reveal Delay
defaults write com.apple.dock autohide-time-modifier -float 0.12;killall Dock
# Change Mac software update frequency in days
defaults write com.apple.SoftwareUpdate ScheduleFrequency -int 3
# Disable different language options, and enable key repeat on hold
defaults write -g ApplePressAndHoldEnabled -bool FALSE
# Login Window Text
sudo defaults write /Library/Preferences/com.apple.loginwindow LoginwindowText "ENTER HERE"
# Disable Dashboard
defaults write com.apple.dashboard mcx-disabled -boolean TRUE; killall Dock
# Create a password-protected zip archive
zip -er archive_name.zip folder_name
# Fix Xcode command line tools issue
# Ex: xcrun: error: invalid active developer path (/Library/Developer/CommandLineTools), missing xcrun at: /Library/Developer/CommandLineTools/usr/bin/xcrun
sudo xcode-select --install
# Change screenshot type to png/pdf/tiff/jpg
defaults write com.apple.screencapture type png
# Encrypt or Decrypt files on mac using openssl
# Encrypt
openssl enc -aes-256-cbc -e -in <Input File> -out <Output File> # Enter password and remember it!, when asked
# Decrypt
openssl enc -aes-256-cbc -d -in <Input File> -out <Output File> # Enter previously given password
# Built-in caffeinate (keep the system awake)
# Forever on
caffeinate
# Timeout
caffeinate -u -t <seconds>
# Get rid of the "allow incoming connections" prompt for Python.app (PyCharm)
sudo codesign -f -s - /Library/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/
# Disable Gatekeeper for installing apps from unidentified devs
sudo spctl --master-disable
# Install Tree View for listing file structure, Linux Core Utils
brew install tree coreutils
# Rename Fields
# false for upsert
# true for multiple documents
db.<collection_name>.update({"<original_key.h1.h2>": {$exists: true}}, {$rename:{"<original_key.h1.h2>":"<new_name>"}}, false, true);
# Merge two collections
db.c1.find().forEach(function(item) {
    db.c2.insert(item);
    db.c1.remove(item);
});
# Get Unique keys in a collection
mr = db.runCommand({
    "mapreduce": "my_collection",
    "map": function() {
        for (var key in this) { emit(key, null); }
    },
    "reduce": function(key, stuff) { return null; },
    "out": "my_collection" + "_keys"
})
db[mr.result].distinct("_id")
// ["foo", "bar", "baz", "_id", ...]
# Get all the diagonals from a matrix
import numpy as np
a = np.array([[1, 2, 3], [4, 5, 6]])  # any 2-D array
diags = [a[::-1,:].diagonal(i) for i in range(-a.shape[0]+1,a.shape[1])]
diags.extend(a.diagonal(i) for i in range(a.shape[1]-1,-a.shape[0],-1))
"""
GroupBy Primer: http://wesmckinney.com/blog/groupby-fu-improvements-in-grouping-and-aggregating-data-in-pandas/
"""
# Change column names to String
df.columns =df.columns.map(lambda x: str(float(x)))
# Convert a column to String
df['ColumnID'] = df['ColumnID'].astype(str)
# Check for missing values (NaN)
df.isnull().sum().sum()
# Get the file name without extension:
import os
print(os.path.splitext("path_to_file")[0])
# Convert a string to \x hex escapes:
"".join(["\\x%02x" % ord(i) for i in my_code])
#Get File Directory from file path:
"/".join(file_name.split("/")[0:-1])
# Get File Name from file path
file_name.split("/")[-1].split(".")[0]
#Python Logging:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Python Argument Parsing
import argparse
parser = argparse.ArgumentParser(description='PyFlow: Debugger')
parser.add_argument('-f', '--file', help='Input File for debugging', required=True)
parser.add_argument('-e', help="Run Flask", default=False, action="store_true")
args = vars(parser.parse_args())
# Check if file exists:
os.path.isfile(fname)
# Sorting - Descending order
a= [1,5,3,6,8,3,4,5,7]
a.sort(reverse=True)
# Check common items between two lists
from collections import Counter
A = [2,4,4,5,6,7,8,9]
B = [1,3,4,5,4,3,5,6]
print(len(list((Counter(A) & Counter(B)).elements())))
# Modify each element in a list
list_mod = [x + 1 for x in list_mod]
# Open webpage in default browser
import webbrowser
webbrowser.open(url)
# Function Overloading - Single Dispatch on methods (requires Python 3.8+ for singledispatchmethod;
# plain @singledispatch on a method dispatches on `self`, so the registered overloads would never fire)
from functools import singledispatchmethod
class abc(object):
    @singledispatchmethod
    def __init__(self, arg, verbose=False):
        if verbose:
            print("Let me just say,", end=" ")
        print(arg)
    @__init__.register(int)
    def _(self, arg, verbose=False):
        if verbose:
            print("Strength in numbers, eh?", end=" ")
        print(arg)
    @__init__.register(list)
    def _(self, arg, verbose=False):
        if verbose:
            print("Enumerate this:")
        for i, elem in enumerate(arg):
            print(i, elem)
abc('abc', verbose=True)
# Adding two sets
a = {0,1,1,1,2,3}
b = {1,2,3,4}
print(a | b)
# Jinja 2 Template Render
from jinja2 import Template
template = Template('Hello {{ name }}!')
print(template.render(name='John Doe'))
# Removing None values from a dictionary
filtered = {k: v for k, v in original.items() if v is not None}
original.clear()
original.update(filtered)
# Accessing Dictionary
d = {
    'k1': 'v1',
    'k2': 'v2'
}
# Accessing keys
for k in d:
    print(k)
# Accessing values
for v in d.values():
    print(v)
# Accessing key with values
for k, v in d.items():
    print(k, v)
# Preserve JSON ordering in json.load
import json
import collections
with open(filename) as f:
    data = json.load(f, object_pairs_hook=collections.OrderedDict)
# Execute a file in python
exec(open('./filename.py').read())
# Concatenate list
import itertools
lists = [['hello'], ['world', 'foo', 'bar']]
combined = list(itertools.chain.from_iterable(lists)) # combined = [item for sublist in lists for item in sublist]
# Padding function
a += [''] * (N - len(a))
# Singleton Design Pattern
class Singleton(type):
    _instances = {}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
class Logger(metaclass=Singleton):
    pass
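# Minimal usage sketch (not part of the original gist): every call returns the same instance.
log_a = Logger()
log_b = Logger()
assert log_a is log_b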
# Ordered Dictionary
from collections import OrderedDict
d = OrderedDict([('b',2), ('a', 1)])
# Unit Test
import unittest
from primes import is_prime
class PrimesTestCase(unittest.TestCase):
    """Tests for `primes.py`."""
    def test_is_five_prime(self):
        """Is five successfully determined to be prime?"""
        self.assertTrue(is_prime(5))
if __name__ == '__main__':
    unittest.main()
# DocTest
def square(x):
    """Return the square of x.
    >>> square(2)
    4
    >>> square(-2)
    4
    """
    return x * x
if __name__ == '__main__':
    import doctest
    doctest.testmod()
# Number of dimensions of a nested list (via numpy)
import numpy as np
a = np.array([[1, 2, 3], [1, 2, 3]])
len(a.shape)  # equivalently, a.ndim
# ZIP
sample = [(2, 9), (2, 9), (8, 9), (10, 9), (23, 26), (1, 9), (43, 44)]
first, snd = zip(*sample)
print(first, snd)
# (2, 2, 8, 10, 23, 1, 43) (9, 9, 9, 9, 26, 9, 44)
# Two Lists from list comprehension
rr, tt = zip(*[(i*10, i*12) for i in range(4)])
# Generate Combinations
import itertools
a = [[1,2,3],[4,5,6],[7,8,9,10]]
list(itertools.product(*a))
# Python Curses - Static Placement with Dynamic display
import time
import curses
stdscr = curses.initscr()
stdscr.addstr(0, 0, "Hello")
stdscr.refresh()
time.sleep(1)
stdscr.addstr(0, 0, "World! (with curses)")
stdscr.refresh()
time.sleep(1)
curses.endwin()  # restore the terminal when done
# Python blink string
import time
def blink(char):
    print(char, end='\r')
    time.sleep(0.5)
    print(' ' * 50, end='\r')
    time.sleep(0.5)
# Store shell command output to a variable
import subprocess
output = subprocess.check_output("cat /etc/services", shell=True)
# Simple Threading
from threading import Thread
class MyThread(Thread):
    def __init__(self):
        ''' Constructor. '''
        Thread.__init__(self)
    def run(self):
        pass
thread = MyThread()
thread.start()
thread.join()
# Convert Bytes to string in python3
str(bytes_string,'utf-8')
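# Equivalent alternative using the bytes API: bytes_string.decode('utf-8')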
# Python check if directory exists
import os
print(os.path.isdir("/home/el"))
print(os.path.exists("/home/el/myfile.txt"))
# Custom Thread with stop
# http://stackoverflow.com/questions/16262132/how-terminate-python-thread-without-checking-flag-continuously
import subprocess
import threading
import time
class My_Thread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.process = None
    def run(self):
        print("Starting " + self.name)
        cmd = ["bash", "process.sh"]
        self.process = p = subprocess.Popen(cmd,
                                            stdout=subprocess.PIPE,
                                            stderr=subprocess.STDOUT)
        for line in iter(p.stdout.readline, b''):
            print("-- " + line.rstrip().decode())
        print("Exiting " + self.name)
    def stop(self):
        print("Trying to stop thread")
        if self.process is not None:
            self.process.terminate()
            self.process = None
thr = My_Thread()
thr.start()
time.sleep(30)
thr.stop()
thr.join()
# Global Package variables
#a.py
print(foo)
#b.py
import builtins
builtins.foo = 1
import a
# Redirect STDOUT (print statements) to a file
import sys
sys.stdout = open(r'stdout.txt', 'w')
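# A minimal restore sketch (not in the original gist): keep a handle to the real stream
# before reassigning so terminal output can be restored afterwards.
import sys
original_stdout = sys.stdout
sys.stdout = open('stdout.txt', 'w')
print('goes to the file')
sys.stdout.close()
sys.stdout = original_stdout
print('back on the terminal')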
# Thread Pool Executor in Python
import concurrent.futures
from time import sleep
job_list = [3, 2, 1, 4, 5, 20]
job_result = []
def run(id, sl):
    sleep(sl)
    print('Job ', id, ' completed')
    return id**2
# ThreadPoolExecutor to match the heading; swap in ProcessPoolExecutor (guarded by __main__) for CPU-bound work
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    for job_id, job in enumerate(job_list):
        job_result.append(executor.submit(run, job_id, job))
print([i.result() for i in job_result])
# Dict Comprehension
d = {n: n**2 for n in range(5)}
# Append Dict to another Dictionary
from collections import OrderedDict
x = OrderedDict([(1, 'a'), (2, 'b')])
y = OrderedDict([(3, 'a'), (4, 'b')])
x = OrderedDict({**x, **y})  # **x/**y as keyword arguments would fail with non-string keys
# Get biggest number
float('inf')
# Get smallest number
float('-inf')
# Simple Python Multi Processing
from multiprocessing import Process, Pipe
def fn(arg . . .):
pass
p = Process(target=fn, args=(arg ...))
p.start()
p.join()
#Find Slope
slope = lambda a, b: (float(a[1]) - float(b[1])) / (float(a[0]) - float(b[0]))
# Create One-hot vectors:
a = np.array([1, 0, 3])
b = np.zeros((3, 4))
b[np.arange(3), a] = 1
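# Resulting b for a = [1, 0, 3]:
# [[0. 1. 0. 0.]
#  [1. 0. 0. 0.]
#  [0. 0. 0. 1.]]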
# setInterval Function
import threading
def set_interval(func, sec):
    def func_wrapper():
        set_interval(func, sec)
        func()
    t = threading.Timer(sec, func_wrapper)
    t.start()
    return t
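# Usage sketch (hypothetical tick function, not in the original gist). Each interval
# re-arms a fresh Timer, so cancel() on the returned handle only stops the next pending tick.
def tick():
    print("tick")
timer = set_interval(tick, 1.0)
# ... later
timer.cancel()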
# Patch a function in python
import types
class Foo(object):
    def call_patched_bar(self, objfn_args: dict):
        self.bar(**objfn_args)
def bar(target, x):
    print("x=", x)
    print("called from", target)
def patch_me(target):
    target.bar = types.MethodType(bar, target)
# Patch original class
patch_me(Foo)
Foo().call_patched_bar(objfn_args={'x': 6})
# Patch object of a class
f = Foo()
patch_me(f)
f.call_patched_bar(objfn_args={'x': 6})
# Timing an expression
import timeit
class Foo(object):
    @staticmethod
    def bar():
        pass
f = Foo()
# Which is a good design?
# a.
Foo.bar()
# b.
f.bar()
exp1 = timeit.repeat('Foo.bar()', 'from __main__ import Foo', repeat=500, number=100000)
exp2 = timeit.repeat('f.bar()', 'from __main__ import f', repeat=500, number=100000)
print('min execution time for a single instance:')
print(min(exp1))
print(min(exp2))
print('max execution time for a single instance:')
print(max(exp1))
print(max(exp2))
print('total execution time:')
print(sum(exp1))
print(sum(exp2))
# With block example
class Foo(object):
    def __init__(self):
        print('Init called')
    def sample_method(self):
        print('Sample method called')
    def __enter__(self):
        print('Enter called')
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Exit called')
    def __del__(self):
        print('Del called')
print('Basic Object creation:\n')
f = Foo()
del f
print('\n\nWith Block object creation:\n')
with Foo() as f:
    pass
# Custom Iterator
class Bar(object):
    def __init__(self):
        self.idx = 0
        self.data = range(4)
    def __iter__(self):
        return self
    def __next__(self):
        self.idx += 1
        try:
            return self.data[self.idx - 1]
        except IndexError:
            self.idx = 0
            raise StopIteration  # Done iterating.
    next = __next__  # python2.x compatibility.
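# Usage sketch (not in the original gist): the iterator resets itself after StopIteration.
for value in Bar():
    print(value)  # prints 0, 1, 2, 3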
# Typehint a lambda function
from typing import Callable
is_even: Callable[[int], bool] = lambda x: (x % 2 == 0)
# Merge List of List
x = [[1,2,3], [4,5,6]]
merged_list = sum(x, [])
# Function Overloading using Lambda Fn
def overload(*functions):
    # Dispatch on the number of positional arguments; the index equals len(args),
    # which includes `self` when the result is attached to a class.
    return lambda *args, **kwargs: functions[len(args)](*args, **kwargs)
function_to_override = overload(
    None,  # index 0: never reached when used as a method (self is always passed)
    None,  # index 1: called with self only
    lambda self, param1: function_to_override_one_param(param1),
    lambda self, param1, param2: function_to_override_two_params(param1, param2)
)
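# Hypothetical usage sketch (illustrative names, not from the original gist): attach the
# dispatcher as a class attribute so calls are routed by positional-argument count, self included.
class Greeter:
    def _hello(self, name):
        print("Hello", name)
    def _hello_lang(self, name, lang):
        print("Hello", name, "in", lang)
    greet = overload(None, None, _hello, _hello_lang)
Greeter().greet("Ada")        # 2 positional args (self, name)       -> _hello
Greeter().greet("Ada", "en")  # 3 positional args (self, name, lang) -> _hello_lang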
"""
Async Function Setup
"""
import threading
class AsyncCall(object):
def __init__(self, fn, callback=None):
self.callable = fn
self.callback = callback
self.result = None
def __call__(self, *args, **kwargs):
self.Thread = threading.Thread(target=self.run, name=self.callable.__name__, args=args, kwargs=kwargs)
self.Thread.start()
return self
def wait(self, timeout=None):
self.Thread.join(timeout)
if self.Thread.isAlive():
raise TimeoutError()
else:
return self.result
def run(self, *args, **kwargs):
self.result = self.callable(*args, **kwargs)
if self.callback:
self.callback(self.result)
class AsyncMethod(object):
def __init__(self, fn, callback=None):
self.callable = fn
self.callback = callback
def __call__(self, *args, **kwargs):
return AsyncCall(self.callable, self.callback)(*args, **kwargs)
def async(f=None, callback=None):
"""
| **@author:** Prathyush SP
|
| Custom Exception Decorator.
:param f: Function
:param callback: Callaback Function
"""
if f is None:
return partial(async, f=f, callback=callback)
@wraps(f)
def wrapped(*args, **kwargs):
return AsyncMethod(f, callback)(*args, **kwargs)
return wrapped
@async
def fnc():
for i in range(10):
print(i)
time.sleep(2)
print('abcd')
fnc()
print('def')
""""
Async Function Setup End
""""
# Enum of Enum's
import enum
class Foo(enum.Enum):
    var_a = enum.auto()
    class Bar(enum.Enum):
        var_b = enum.auto()
    def __getattr__(self, item):
        if item != '_value_':
            return getattr(self.value, item)
        raise AttributeError
# Raise only a single exception (suppress exception chaining)
try:
    1/0
except Exception as e:
    raise ValueError('Value Error') from None
# Module Path - Place this in base __init__.py
MODULE_PATH = os.path.dirname(os.path.abspath(__file__))
# Recurring Default Dictionary in Python
class RecurringDefaultDict(dict):
    """Implementation of perl's autovivification feature."""
    def __getitem__(self, item):
        try:
            return dict.__getitem__(self, item)
        except KeyError:
            value = self[item] = type(self)()
            return value
x = RecurringDefaultDict()
x[1][2][3][4] = 25
# Check if a negative number exists in a python list
any(n < 0 for n in any_list)
# Disable import for a function
from contextlib import contextmanager
@contextmanager
def custom_metric():
    import tensorflow
    t = tensorflow.metrics
    delattr(tensorflow, "metrics")
    yield None
    tensorflow.metrics = t
with custom_metric():
    import tensorflow as tf
    try:
        print(tf.metrics)
    except AttributeError:
        print("Cannot import metrics inside custom metric")
import tensorflow as tf
print(tf.metrics)
# Test if a function is printing the required log
import logging
from testfixtures import LogCapture
logger = logging.getLogger('')
with LogCapture() as logs:
    # my awesome code
    logger.error('My code logged an error')
assert 'My code logged an error' in str(logs)
# Simplest form of batching
def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]
for x in batch(list(range(0, 10)), 3):
    print(x)
# Find all the imported modules
from modulefinder import ModuleFinder
finder = ModuleFinder()
finder.run_script("./main.py")
for name, mod in finder.modules.items():
    print(name)
# Thread Safe Writer
from queue import Queue, Empty
from threading import Thread
class SafeWriter:
    def __init__(self, *args):
        self.filewriter = open(*args)
        self.queue = Queue()
        self.finished = False
        Thread(name="SafeWriter", target=self.internal_writer).start()
    def write(self, data):
        self.queue.put(data)
    def internal_writer(self):
        while not self.finished:
            try:
                data = self.queue.get(True, 1)
            except Empty:
                continue
            self.filewriter.write(data)
            self.queue.task_done()
    def close(self):
        self.queue.join()
        self.finished = True
        self.filewriter.close()
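# Minimal usage sketch (hypothetical file path, not in the original gist): write() is safe
# to call from any thread; close() flushes the queue before shutting down.
writer = SafeWriter("/tmp/safe_writer_demo.log", "w")
writer.write("hello from the main thread\n")
writer.close()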
# Timeout Whileloop
import time
timeout = 10 # [seconds]
timeout_start = time.time()
while time.time() < timeout_start + timeout:
    pass
# Get device configs
from tensorflow.python.client import device_lib
def get_available_devices(cpu: bool = True, gpu: bool = True):
    local_device_protos = device_lib.list_local_devices()
    devices = []
    if cpu:
        devices = [x.name for x in local_device_protos if x.device_type == 'CPU']
    if gpu:
        devices += [x.name for x in local_device_protos if x.device_type == 'GPU']
    return devices
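# Usage sketch (not in the original gist): list CPU devices only; GPU entries appear only when CUDA is available.
print(get_available_devices(gpu=False))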
# Check CUDA Installation and GPU Availability
print(tf.config.list_physical_devices('GPU'))
# Fetch row indices
x = tf.random.normal([3,2])
x = tf.convert_to_tensor(x)
indices = tf.convert_to_tensor([0,1,0])
one_hot_indices = tf.expand_dims(indices, 1)
range = tf.expand_dims(tf.range(tf.shape(indices)[0]), 1)
ind = tf.concat([range, one_hot_indices], 1)
tf.gather_nd(x, ind)
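# Equivalent one-liner (assumption, based on TF 2.x batch_dims semantics): result[i] = x[i, indices[i]]
tf.gather(x, indices, batch_dims=1)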