-
-
Save qunwang6/03242ae15c15f2bdcd6436a76d24ce04 to your computer and use it in GitHub Desktop.
Language Snippets #MySnippet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This gist reveals language snippets |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#Check if a program exists | |
command -v foo >/dev/null 2>&1 || { echo >&2 "I require foo but it's not installed. Aborting."; exit 1; } | |
#Block Comment in bash | |
: ' | |
This is a test comment | |
Author foo bar | |
Released under GNU | |
' | |
#Unicode escape in bash | |
<var_name>=$'\xe2\x98\xa2 ' | |
# if-else | |
if [ "foo" = "foo" ]; then | |
echo expression evaluated as true | |
else | |
echo expression evaluated as false | |
fi | |
# Grep only Numeric values: | |
echo "99%" |grep -o '[0-9]*' | |
# Bash Command output to a variable | |
OUTPUT="$(ls -1)" | |
echo "${OUTPUT}" | |
# SED w variable and line replace | |
sed -i.bak 's/.*TEXT_TO_BE_REPLACED.*/"'$VARIABLE'"/' VagrantFile >/dev/null 2>&1 | |
# While loop
# Counts i from 0 up to 4 using POSIX arithmetic expansion; the old
# $[ ... ] form is deprecated in Bash.
i=0
while [ "$i" -lt 4 ]
do
    i=$((i + 1))
done
# Redirect Command output | |
command 2> /dev/null | |
#Detect OS | |
platform='unknown' | |
unamestr=`uname` | |
if [[ "$unamestr" == 'Linux' ]]; then | |
platform='linux' | |
elif [[ "$unamestr" == 'FreeBSD' ]]; then | |
platform='freebsd' | |
fi | |
# Execute a Gist in bash | |
#http://max.disposia.org/notes/bash-exec-gist.htm | |
bash -c "$(curl -fsSL $raw_gist_path)" $arg0 $arg1 | |
# Append multiple lines to a file | |
cat <<EOT >> greetings.txt | |
line 1 | |
line 2 | |
EOT | |
# Change Password using script | |
echo "USERNAME:NEWPASSWORD" | chpasswd | |
# Generate SSH key using script | |
echo -e "\n\n\n" | ssh-keygen -t rsa | |
# SSH Key using script | |
sudo apt-get install sshpass | |
sshpass -p "PASSWORD" ssh-copy-id -o StrictHostKeyChecking=no USERNAME@IP | |
# Process spawn and wait | |
/my/process & | |
/another/process & | |
wait | |
echo "All processes done!" | |
# Alert in bash | |
echo -e "\a" | |
# Create offline Installation | |
sudo apt-get download PACKAGE_NAME && apt-cache depends -i PACKAGE_NAME | awk '/Depends:/ {print $2}' | xargs apt-get download | |
# Arrays in Bash | |
ARRAY=(one two three) | |
echo ${ARRAY[*]} # Print all | |
echo ${ARRAY[2]} # Print 3rd element | |
# For loop | |
for VARIABLE in 1 2 3 4 5 .. N | |
do | |
command1 | |
command2 | |
commandN | |
done | |
# Iterating through each directory | |
for f in *; do | |
if [ -d ${f} ]; then | |
# Will not run if no directories are available | |
echo $f | |
fi | |
done | |
# Iterating through an array with index | |
for i in "${!foo[@]}"; do | |
printf "%s\t%s\n" "$i" "${foo[$i]}" | |
done | |
# Single line condition checker | |
if ps aux | grep some_proces[s] > /tmp/test.txt; then echo 1; else echo 0; fi | |
# Array Declaration
array=(1 2 3)
# Print 1st Element (Bash arrays are zero-indexed, so the first slot is 0)
echo "${array[0]}"
# Print all elements
echo "${array[@]}"
# Get length of an array (replace VAR_NAME with the array's name)
echo ${#VAR_NAME[@]}
#Split Name | |
IN="bla@some.com;john@home.com" | |
arrIN=(${IN//;/ }) | |
# Silent execution | |
silent() { | |
"$@" > /dev/null 2>&1 | |
} | |
silent echo "Hello World" | |
#Switch Case
# Dispatch on the value of $C. Commands are invoked by bare name; writing
# `name()` inside a case arm is a syntax error in Bash because that is
# function-definition syntax.
case "$C" in
    "1")
        do_this
        ;;
    "2" | "3")
        do_what_you_are_supposed_to_do
        ;;
    *)
        do_nothing
        ;;
esac
# Adding multiple lines to a file | |
cat <<EOT >> FILENAME | |
export KUBE_VERSION=1.2.0 | |
export FLANNEL_VERSION=0.5.0 | |
export ETCD_VERSION=2.2.0 | |
export nodes="vagrant@10.0.168.10 vagrant@10.0.168.11 vagrant@10.0.168.11" | |
export roles="ai i i" | |
export NUM_NODES=${NUM_NODES:-3} | |
export SERVICE_CLUSTER_IP_RANGE=192.168.3.0/24 | |
export FLANNEL_NET=172.16.0.0/16 | |
EOT | |
# Grep select only numbers | |
echo "99%" |grep -o '[0-9]*' | |
# Create a sample from files in a folder | |
mkdir -p sample | |
for f in *.csv; do head -101 $f >> sample/$f; done | |
# Kill app based on port number
# lsof -t prints only the PIDs (one per line) of processes using port 8121.
PID="$(lsof -i:8121 -t)"
echo "Stopping previous build if any . . ."
# Only call kill when something was found; `kill -9` with no PID is an error.
if [ -n "$PID" ]; then
    # Left unquoted on purpose so several PIDs are passed as separate arguments.
    kill -9 $PID
fi
# Set Locale | |
export LC_ALL=en_US.UTF-8 | |
export LANG=en_US.UTF-8 | |
# Convert multiline bash output to an array | |
var=($(./inner.sh)) | |
# Grep IP Address using regex | |
echo "$str_containing_ip_addrs" | grep -oE "\b([0-9]{1,3}\.){3}[0-9]{1,3}\b" | |
# Wait for previous process while executing another command | |
launch backgroundprocess & | |
PROC_ID=$! | |
while kill -0 "$PROC_ID" >/dev/null 2>&1; do | |
echo "PROCESS IS RUNNING" | |
done | |
echo "PROCESS TERMINATED" | |
exit 0 | |
# To log every command before execution | |
set -o xtrace # to revert to normal - set +o xtrace | |
# or bash -x myscript.sh | |
#list process on port | |
lsof -i tcp:<port number> | |
#Select random lines from a file | |
shuf -n N input > output | |
# Generate a random number between 1-10
# A bare $(( ... )) would try to run the resulting number as a command,
# so print it (or assign it: n=$(( ( RANDOM % 10 ) + 1 ))).
echo $(( ( RANDOM % 10 ) + 1 ))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
gitconfig: | |
[alias] | |
apply-gitignore = !git ls-files -ci --exclude-standard -z | xargs -0 git rm --cached |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Save Image
# `docker save` writes a plain tar stream; pipe it through gzip so the
# .tar.gz file name matches the file's actual contents.
sudo docker save [image_name:tag] | gzip > [image_name:tag].tar.gz
# Load Image
sudo docker load -i [image_name:tag].tar.gz
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Reset to a commit | |
git reset --hard <Commit SHA ID> | |
# Rebase to a commit | |
git rebase -i <Commit SHA ID> | |
# Global username and email
# --global writes to ~/.gitconfig; without it only the current repository
# is configured.
git config --global user.name "username"
git config --global user.email "email"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//Document Ready - Jquery: | |
$(document).ready(function() {}); | |
//Read JSON | |
$.getJSON( "ajax/test.json", function( data ) { | |
var items = []; | |
$.each( data, function( key, val ) { | |
items.push( "<li id='" + key + "'>" + val + "</li>" ); | |
}); | |
}); | |
//Synchronous Looping in JS | |
/**
 * Walk an array one item at a time, starting each item only after the
 * previous one has signalled completion.
 *
 * @param data An array of data.
 * @param processData Function invoked as processData(item, i, callback), where
 *     {@code item} is the i'th element, {@code i} is its index and
 *     {@code callback} is the parameterless function the processor calls
 *     once it has finished with that item.
 * @param done Parameterless function called after the last item completes,
 *     or immediately when {@code data} has no elements.
 */
function doSynchronousLoop(data, processData, done) {
    if (!(data.length > 0)) {
        done();
        return;
    }

    // Each step owns its index; the completion callback advances it and
    // either schedules the next step or reports that the walk is over.
    function step(index) {
        processData(data[index], index, function() {
            index += 1;
            if (index < data.length) {
                step(index);
            } else {
                done();
            }
        });
    }

    step(0);
}
// Set Interval Time | |
var timeOut = setInterval(myFunction, 2000); | |
// Replace newlines (\r\n, \r or \n) with <br />
str = str.replace(/(?:\r\n|\r|\n)/g, '<br />'); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Show Full File Path in Finder | |
defaults write com.apple.finder _FXShowPosixPathInTitle -bool YES; killall Finder | |
# Make Hidden Apps “Hidden” in Dock | |
defaults write com.apple.Dock showhidden -bool TRUE; killall Dock | |
# Eliminate the Dock Reveal Delay | |
defaults write com.apple.dock autohide-time-modifier -float 0.12;killall Dock | |
# Change Mac software update frequency in days | |
defaults write com.apple.SoftwareUpdate ScheduleFrequency -int 3 | |
# Disable different language options, and enable key repeat on hold | |
defaults write -g ApplePressAndHoldEnabled -bool FALSE | |
# Login Window Text | |
sudo defaults write /Library/Preferences/com.apple.loginwindow LoginwindowText "ENTER HERE" | |
# Disable Dashboard | |
defaults write com.apple.dashboard mcx-disabled -boolean TRUE; killall Dock | |
# Enable password based compression
# -e prompts for a password, -r recurses into the folder. zip needs the
# archive name first, followed by what to add to it.
zip -er archive_name.zip folder_name
# Fix issue with x-code | |
# Ex: xcrun: error: invalid active developer path (/Library/Developer/CommandLineTools), missing xcrun at: /Library/Developer/CommandLineTools/usr/bin/xcrun | |
sudo xcode-select --install | |
# Change screenshot type to png/pdf/tiff/jpg | |
sudo defaults write com.apple.screencapture type png | |
# Encrypt or Decrypt files on mac using openssl | |
# Encrypt | |
openssl enc -aes-256-cbc -e -in <Input File> -out <Output File> # Enter password and remember it!, when asked | |
# Decrypt | |
openssl enc -aes-256-cbc -d -in <Input File> -out <Output File> # Enter previously given password | |
# Builtin Caffeine (Configurable System On) | |
# Forever on | |
caffeinate | |
# Timeout | |
caffeinate -u -t <seconds> | |
# Get rid of allow incoming connections for Python.app for Pycharm | |
sudo codesign -f -s - /Library/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/ | |
# Disable Gatekeeper for installing apps from unidentified devs | |
sudo spctl --master-disable | |
# Install Tree View for listing file structure, Linux Core Utils | |
brew install tree coreutils |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Rename Fields | |
# false for upsert | |
# true for multiple documents | |
db.<collection_name>.update({"<original_key.h1.h2>": {$exists: true}}, {$rename:{"<original_key.h1.h2>":"<new_name>"}}, false, true); | |
# Merge two collections | |
db.c1.find().forEach(function(item) { | |
db.c2.insert(item); | |
db.c1.remove(item); | |
}); | |
# Get Unique keys in a collection | |
mr = db.runCommand({ | |
"mapreduce" : "my_collection", | |
"map" : function() { | |
for (var key in this) { emit(key, null); } | |
}, | |
"reduce" : function(key, stuff) { return null; }, | |
"out": "my_collection" + "_keys" | |
}) | |
db[mr.result].distinct("_id") | |
["foo", "bar", "baz", "_id", ...] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Get all the diagonals from a matrix
import numpy as np

# Sample 2-D input; replace with your own matrix. (np.array() with no
# argument raises TypeError.)
a = np.arange(6).reshape(2, 3)
# Diagonals of the vertically flipped matrix, i.e. the anti-diagonals of a.
diags = [a[::-1, :].diagonal(i) for i in range(-a.shape[0] + 1, a.shape[1])]
# Diagonals of a itself, from the top-right offset down to the bottom-left.
diags.extend(a.diagonal(i) for i in range(a.shape[1] - 1, -a.shape[0], -1))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
GroupBy Primer: http://wesmckinney.com/blog/groupby-fu-improvements-in-grouping-and-aggregating-data-in-pandas/ | |
""" | |
# Change column names to String | |
df.columns =df.columns.map(lambda x: str(float(x))) | |
# Convert a column to String | |
df['ColumnID'] = df['ColumnID'].astype(str) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Get the file name without extension:
import os

# splitext returns (root, ext); index 0 is the path minus its last extension.
# print is a function in Python 3.
print(os.path.splitext("path_to_file")[0])
#Convert String to unicode: | |
"".join(["\\x%02x" % ord(i) for i in my_code]) | |
#Get File Directory from file path: | |
"/".join(file_name.split("/")[0:-1]) | |
# Get File Name from file path | |
file_name.split("/")[-1].split(".")[0] | |
#Python Logging: | |
import logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Python Argument Parsing | |
import argparse | |
parser = argparse.ArgumentParser(description='PyFlow: Debugger') | |
parser.add_argument('-f', '--file', help='Input File for debugging', required=True) | |
parser.add_argument('-e', help="Run Flask", default=False, action="store_true") | |
args = vars(parser.parse_args()) | |
# Check if file exists: | |
os.path.isfile(fname) | |
# Sorting - Descending order | |
a= [1,5,3,6,8,3,4,5,7] | |
a.sort(reverse=True) | |
# Check common items between two lists | |
from collections import Counter | |
A = [2,4,4,5,6,7,8,9] | |
B = [1,3,4,5,4,3,5,6] | |
print(len(list((Counter(A) & Counter(B)).elements()))) | |
# Modify each element in a list | |
list_mod = [x.__add__(1) for x in list_mod] | |
# Open webpage in default browser | |
import webbrowser | |
webbrowser.open(url) | |
# Function Overloading - Single Dispatch
# singledispatchmethod (Python 3.8+) dispatches on the first argument AFTER
# self. Plain singledispatch would dispatch on self, so the int/list
# overloads would never be selected.
from functools import singledispatchmethod


class abc(object):
    """Demo class whose constructor is overloaded on the type of ``arg``."""

    @singledispatchmethod
    def __init__(self, arg, verbose=False):
        """Fallback overload: print ``arg`` as-is."""
        if verbose:
            print("Let me just say,", end=" ")
        print(arg)

    @__init__.register(int)
    def _(self, arg, verbose=False):
        """Overload selected when ``arg`` is an int."""
        if verbose:
            print("Strength in numbers, eh?", end=" ")
        print(arg)

    @__init__.register(list)
    def _(self, arg, verbose=False):
        """Overload selected when ``arg`` is a list: print index/element pairs."""
        if verbose:
            print("Enumerate this:")
        for i, elem in enumerate(arg):
            print(i, elem)


abc('abc', verbose=True)
# Adding two sets | |
a = {0,1,1,1,2,3} | |
b = {1,2,3,4} | |
print(a | b) | |
# Jinja 2 Template Render | |
from jinja2 import Template | |
template = Template('Hello {{ name }}!') | |
print(template.render(name='John Doe')) | |
# Removing None values from a dictionary | |
filtered = {k: v for k, v in original.items() if v is not None} | |
original.clear() | |
original.update(filtered) | |
# Accessing Dictionary | |
d = { | |
'k1': 'v1', | |
'k2': 'v2' | |
} | |
# Accessing keys | |
for k in d: | |
print(k) | |
# Accessing values | |
for v in d.values(): | |
print(v) | |
# Accessing key with values | |
for k, v in d.items(): | |
print(k, v) | |
# Preserve JSON key ordering in json.load
import collections
import json

# object_pairs_hook receives each JSON object's (key, value) pairs in document
# order. The class is collections.OrderedDict (not "OrderDict").
# NOTE(review): json.load expects an open file object; if `filename` is a path
# string, open it first (or use json.loads on already-read text).
json.load(filename, object_pairs_hook=collections.OrderedDict)
# Execute a file in python | |
exec(open('./filename.py').read()) | |
# Concatenate list | |
import itertools | |
lists = [['hello'], ['world', 'foo', 'bar']] | |
combined = list(itertools.chain.from_iterable(lists)) # combined = [item for sublist in lists for item in sublist] | |
# Padding function | |
a += [''] * (N - len(a)) | |
# Singleton Design Pattern
class Singleton(type):
    """Metaclass that builds each class once and then reuses that instance."""

    # Maps every class created with this metaclass to its sole instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, constructing it on first use."""
        try:
            return cls._instances[cls]
        except KeyError:
            instance = super().__call__(*args, **kwargs)
            cls._instances[cls] = instance
            return instance


class Logger(metaclass=Singleton):
    pass
# Ordered Dictionary | |
from collections import OrderedDict | |
d = OrderedDict([('b',2), ('a', 1)]) | |
# Unit Test | |
import unittest | |
from primes import is_prime | |
class PrimesTestCase(unittest.TestCase): | |
"""Tests for `primes.py`.""" | |
def test_is_five_prime(self): | |
"""Is five successfully determined to be prime?""" | |
self.assertTrue(is_prime(5)) | |
if __name__ == '__main__': | |
unittest.main() | |
# DocTest | |
def square(x): | |
"""Return the square of x. | |
>>> square(2) | |
4 | |
>>> square(-2) | |
4 | |
""" | |
return x * x | |
if __name__ == '__main__': | |
import doctest | |
doctest.testmod() | |
# Shape of a Listception | |
import numpy as np | |
a = np.array([[1,2,3],[1,2,3]]) | |
len(a.shape) | |
# ZIP
# zip(*pairs) transposes a list of pairs into two tuples.
sample = [(2, 9), (2, 9), (8, 9), (10, 9), (23, 26), (1, 9), (43, 44)]
first, snd = zip(*sample)
print(first, snd)
# Output: (2, 2, 8, 10, 23, 1, 43) (9, 9, 9, 9, 26, 9, 44)
# Two Lists from list comprehension
# range replaces Python 2's xrange.
rr, tt = zip(*[(i * 10, i * 12) for i in range(4)])
# Generate Combinations | |
import itertools | |
a = [[1,2,3],[4,5,6],[7,8,9,10]] | |
list(itertools.product(*a)) | |
# Python Curses - Static Placement with Dynamic display | |
import time | |
import curses | |
stdscr = curses.initscr() | |
stdscr.addstr(0, 0, "Hello") | |
stdscr.refresh() | |
time.sleep(1) | |
stdscr.addstr(0, 0, "World! (with curses)") | |
stdscr.refresh() | |
# Python blink string | |
def blink(char): | |
print(char, end = '\r') | |
time.sleep(0.5) | |
print(' ' * 50, end = '\r') | |
time.sleep(0.5) | |
# Store shell command output to a variable | |
import subprocess | |
output = subprocess.check_output("cat /etc/services", shell=True) | |
# Simple Threading | |
from threading import Thread | |
class MyThread(Thread): | |
def __init__(self): | |
''' Constructor. ''' | |
Thread.__init__(self) | |
def run(self): | |
pass | |
thread = MyThread() | |
thread.start() | |
thread.join() | |
# Convert Bytes to string in python3 | |
str(bytes_string,'utf-8') | |
# Python check if directory exists | |
import os | |
print(os.path.isdir("/home/el")) | |
print(os.path.exists("/home/el/myfile.txt")) | |
# Custom Thread with stop
# http://stackoverflow.com/questions/16262132/how-terminate-python-thread-without-checking-flag-continuously
import subprocess
import threading


class My_Thread(threading.Thread):
    """Thread that runs ``bash process.sh`` and can be stopped from outside.

    ``stop()`` terminates the child process; that closes its output pipe,
    which ends the read loop in ``run()`` without any polling flag.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        # Handle to the running child; None while nothing is running.
        self.process = None

    def run(self):
        """Start the child process and echo its output line by line."""
        print("Starting " + self.name)
        cmd = ["bash", 'process.sh']
        self.process = p = subprocess.Popen(cmd,
                                            stdout=subprocess.PIPE,
                                            stderr=subprocess.STDOUT)
        # The pipe yields bytes; decode before joining with the str prefix.
        for line in iter(p.stdout.readline, b''):
            print("-- " + line.decode(errors="replace").rstrip())
        # Release the pipe and reap the child so it does not linger as a zombie.
        p.stdout.close()
        p.wait()
        print("Exiting " + self.name)

    def stop(self):
        """Terminate the child process if one is running."""
        print("Trying to stop thread ")
        if self.process is not None:
            self.process.terminate()
            self.process = None
thr = My_Thread() | |
thr.start() | |
time.sleep(30) | |
thr.stop() | |
thr.join() | |
# Global Package variables | |
#a.py | |
print(foo) | |
#b.py | |
# Python 3 renamed the __builtin__ module to builtins. Names set on it are
# visible from every module without an import.
import builtins
builtins.foo = 1
import a | |
# Redirect STDOUT (print statements) to a file | |
import sys | |
sys.stdout = open(r'stdout.txt', 'w') | |
# Process Pool Executor in Python
# (for a thread pool, swap in concurrent.futures.ThreadPoolExecutor)
import concurrent.futures
from time import sleep

job_list = [3, 2, 1, 4, 5, 20]
job_result = []


def run(id, sl):
    """Sleep for ``sl`` seconds, report completion, and return ``id`` squared."""
    sleep(sl)
    print('Job ', id, ' completed')
    return id**2


# Worker processes may re-import this module (spawn start method), so pool
# creation must sit behind the __main__ guard or it recurses and fails.
if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        for job_id, job in enumerate(job_list):
            job_result.append(executor.submit(run, job_id, job))
    # Leaving the with-block waits for every job; result() now returns at once.
    print([i.result() for i in job_result])
# Dict Comprehension | |
d = {n: n**2 for n in range(5)} | |
# Append Dict to another Dictionary
x = OrderedDict([(1, 'a'), (2, 'b')])
y = OrderedDict([(3, 'a'), (4, 'b')])
# OrderedDict(**x, **y) only works for string keys (keyword arguments must be
# strings). Building from the item pairs works for any hashable key and keeps
# x's entries before y's.
x = OrderedDict([*x.items(), *y.items()])
# Get biggest number | |
float('inf') | |
# Get smallest number | |
float('-inf') | |
# Simple Python Multi Processing | |
from multiprocessing import Process, Pipe | |
def fn(arg . . .): | |
pass | |
p = Process(target=fn, args=(arg ...)) | |
p.start() | |
p.join() | |
#Find Slope | |
slope = lambda a, b: (float(a[1]) - float(b[1])) / (float(a[0]) - float(b[0])) | |
# Create One-hot vectors: | |
a = np.array([1, 0, 3]) | |
b = np.zeros((3, 4)) | |
b[np.arange(3), a] = 1 | |
# setInterval Function | |
import threading | |
def set_interval(func, sec):
    """Call ``func`` every ``sec`` seconds using chained ``threading.Timer``s.

    Returns the first timer only; each firing arms a fresh timer that this
    return value does not reference.
    """

    def _tick():
        # Re-arm before running func so the next timer is already scheduled.
        set_interval(func, sec)
        func()

    timer = threading.Timer(sec, _tick)
    timer.start()
    return timer
# Patch a function in python | |
import types | |
class Foo(object): | |
def call_patched_bar(self, objfn_args:dict): | |
self.bar(**objfn_args) | |
pass | |
def bar(target,x): | |
print("x=",x) | |
print("called from", target) | |
def patch_me(target): | |
target.bar = types.MethodType(bar,target) | |
# Patch original class | |
patch_me(Foo) | |
Foo().call_patched_bar(objfn_args={'x':6}) | |
# Patch object of a class | |
f = Foo() | |
patch_me(f) | |
f.call_patched_bar(objfn_args={'x':6}) | |
# Timing an expression | |
import timeit | |
class Foo(object): | |
@staticmethod | |
def bar(): | |
pass | |
f = Foo() | |
# Which is a good design? | |
# a. | |
Foo.bar() | |
# b. | |
f.bar() | |
exp1 = timeit.repeat('Foo.bar()', 'from __main__ import Foo', repeat=500, number=100000) | |
exp2 = timeit.repeat('f.bar()', 'from __main__ import f', repeat=500, number=100000) | |
print('min execution time for a single instance:') | |
print(min(exp1)) | |
print(min(exp2)) | |
print('max execution time for a single instance:') | |
print(max(exp1)) | |
print(max(exp2)) | |
print('total execution time:') | |
print(sum(exp1)) | |
print(sum(exp2)) | |
# With block example | |
class Foo(object): | |
def __init__(self): | |
print('Init called') | |
def sample_method(self): | |
print('Sample metod called') | |
def __enter__(self): | |
print('Enter called') | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
print('Exit called') | |
def __del__(self): | |
print('Del called') | |
print('Basic Object creation:\n') | |
f = Foo() | |
del f | |
print('\n\nWith Block object creation:\n') | |
with Foo() as f: | |
pass | |
# Custom Iterator
class Bar(object):
    """Iterator over ``range(4)`` that rewinds itself once exhausted."""

    def __init__(self):
        self.idx = 0
        self.data = range(4)

    def __iter__(self):
        # The object is its own iterator.
        return self

    def __next__(self):
        """Return the next element, or rewind and signal the end."""
        position = self.idx
        if position >= len(self.data):
            # Reset so the same object can be iterated again.
            self.idx = 0
            raise StopIteration  # Done iterating.
        self.idx = position + 1
        return self.data[position]

    next = __next__  # python2.x compatibility.
# Typehint a lambda function | |
from typing import Callable | |
is_even: Callable[[int], bool] = lambda x: (x % 2 == 0) | |
# Merge List of List | |
x = [[1,2,3], [4,5,6]] | |
merged_list = sum(x, []) | |
# Function Overloading using Lambda Fn | |
def overload(*functions):
    """Build a dispatcher that picks an implementation by positional-arg count.

    ``functions[k]`` is the implementation used when the dispatcher is called
    with exactly ``k`` positional arguments; keyword arguments are forwarded
    but do not take part in the selection.
    """

    def dispatcher(*args, **kwargs):
        chosen = functions[len(args)]
        return chosen(*args, **kwargs)

    return dispatcher
function_to_override = overload( | |
None, # For self argument | |
lambda self, param1: function_to_override_one_param(param1), | |
lambda self, param1, param2: function_to_override_one_param(param1, param2) | |
) | |
"""
Async Function Setup
"""
import threading
import time
from functools import partial, wraps


class AsyncCall(object):
    """Run a callable on its own thread and keep hold of its result."""

    def __init__(self, fn, callback=None):
        self.callable = fn
        self.callback = callback
        self.result = None

    def __call__(self, *args, **kwargs):
        """Start the worker thread with the given arguments and return self."""
        self.Thread = threading.Thread(
            target=self.run,
            # Not every callable has __name__ (e.g. functools.partial);
            # name=None lets Thread choose its default name.
            name=getattr(self.callable, '__name__', None),
            args=args,
            kwargs=kwargs,
        )
        self.Thread.start()
        return self

    def wait(self, timeout=None):
        """Join the worker thread and return the callable's result.

        :param timeout: Seconds to wait; None waits indefinitely.
        :raises TimeoutError: if the thread is still running after ``timeout``.
        """
        self.Thread.join(timeout)
        # Thread.isAlive() was removed in Python 3.9; is_alive() replaces it.
        if self.Thread.is_alive():
            raise TimeoutError()
        return self.result

    def run(self, *args, **kwargs):
        """Thread body: call the wrapped callable, then the callback if set."""
        self.result = self.callable(*args, **kwargs)
        if self.callback:
            self.callback(self.result)


class AsyncMethod(object):
    """Factory that launches a fresh AsyncCall on every invocation."""

    def __init__(self, fn, callback=None):
        self.callable = fn
        self.callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.callable, self.callback)(*args, **kwargs)


# Named run_async because `async` has been a reserved keyword since Python 3.7
# and can no longer be used as a function name.
def run_async(f=None, callback=None):
    """
    | **@author:** Prathyush SP
    |
    | Decorator that runs the decorated function on a background thread.
    | Calling the decorated function returns an AsyncCall; use ``.wait()``
    | on it to obtain the result.
    :param f: Function
    :param callback: Callback function, called with the function's result
    """
    if f is None:
        # Used as @run_async(callback=...): return a decorator awaiting f.
        # f must not be pre-bound here, or the later positional f would clash.
        return partial(run_async, callback=callback)

    @wraps(f)
    def wrapped(*args, **kwargs):
        return AsyncMethod(f, callback)(*args, **kwargs)

    return wrapped


# Demo; guarded so importing this module does not start a 20-second thread.
if __name__ == '__main__':
    @run_async
    def fnc():
        for i in range(10):
            print(i)
            time.sleep(2)

    print('abcd')
    fnc()
    print('def')
"""
Async Function Setup End
"""
# Enum of Enum's | |
import enum | |
class Foo(enum.Enum): | |
var_a = enum.auto() | |
class Bar(enum.Enum): | |
var_b = enum.auto() | |
def __getattr__(self, item): | |
if item != '_value_': | |
return getattr(self.value, item) | |
raise AttributeError | |
# Raise only a single exception, suppressing the chained original (raise ... from None)
try: | |
1/0 | |
except Exception as e: | |
raise ValueError('Value Error') from None | |
# Module Path - Place this in base __init__.py | |
MODULE_PATH = os.path.dirname(os.path.abspath(__file__)) | |
# Recurring Default Dictionary in Python
class RecurringDefaultDict(dict):
    """Dict that creates nested dicts on demand (Perl-style autovivification)."""

    def __getitem__(self, item):
        """Return ``self[item]``, first storing a new empty child if missing."""
        if item not in self:
            # type(self) keeps subclasses nesting instances of themselves.
            self[item] = type(self)()
        return super().__getitem__(item)


x = RecurringDefaultDict()
x[1][2][3][4] = 25
# Check if -ve no in python list | |
any(n < 0 for n in any_list) | |
# Disable import for a function
from contextlib import contextmanager


@contextmanager
def custom_metric():
    """Temporarily remove the ``metrics`` attribute from the tensorflow module.

    Inside the ``with`` block, ``tensorflow.metrics`` raises AttributeError.
    The attribute is put back on exit, including when the block raises.
    """
    import tensorflow
    t = tensorflow.metrics
    delattr(tensorflow, "metrics")
    try:
        yield None
    finally:
        # Without try/finally an exception in the with-body would leave
        # tensorflow.metrics deleted for the rest of the process.
        tensorflow.metrics = t
with custom_metric(): | |
import tensorflow as tf | |
try: | |
print(tf.metrics) | |
except AttributeError: | |
print("Cannot import metrics inside custom metric") | |
import tensorflow as tf | |
print(tf.metrics) | |
# Test if a function is printing the required log | |
import logging | |
from testfixtures import LogCapture | |
logger = logging.getLogger('') | |
with LogCapture() as logs: | |
# my awesome code | |
logger.error('My code logged an error') | |
assert 'My code logged an error' in str(logs) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Get device configs | |
from tensorflow.python.client import device_lib | |
def get_available_devices(cpu: bool = True, gpu: bool = True):
    """Return the names of local devices, CPU entries first, then GPU entries.

    :param cpu: Include devices whose ``device_type`` is ``'CPU'``.
    :param gpu: Include devices whose ``device_type`` is ``'GPU'``.
    :return: List of device names; empty when both flags are false.
    """
    found = device_lib.list_local_devices()
    # Requested device types, in the order they should appear in the result.
    wanted = []
    if cpu:
        wanted.append('CPU')
    if gpu:
        wanted.append('GPU')
    return [dev.name for kind in wanted for dev in found if dev.device_type == kind]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment