Skip to content

Instantly share code, notes, and snippets.

@yohanesgultom
Last active August 11, 2023 16:30
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save yohanesgultom/630a831eff1fbdcd84b3cfec6feabe02 to your computer and use it in GitHub Desktop.
Save yohanesgultom/630a831eff1fbdcd84b3cfec6feabe02 to your computer and use it in GitHub Desktop.
Random python scripts
Random collection of python scripts
"""
Argument parser template
"""
import argparse
# Build the command-line interface for the template.
parser = argparse.ArgumentParser(description='Your application description')
# simple positional argument (mandatory)
parser.add_argument('a', help='some description')
# positional argument cast to int
parser.add_argument('b', type=int, help='some description')
# option (optional); None when not given
parser.add_argument('-r', help='some description')
# flag: args.silent becomes True when -s/--silent is present
parser.add_argument('-s', '--silent', action='store_true', default=False, help='some description')
# parse arguments/options into the namespace object `args`
args = parser.parse_args()
# read the parsed values
print(args.a)
print(args.b)
print(args.r)
# An option that has a long name is stored under that name only, so the
# flag is available as `args.silent`; `args.s` does not exist.
print(args.silent)
"""
Automatic audio transcription job using AWS Transcribe service https://aws.amazon.com/transcribe/
@author yohanes.gultom@gmail.com
"""
import configparser, boto3, os, time, json
from pprint import pprint
bucket_name = 'yohanesgultom-transcribe-test'
file_path = '/home/yohanesgultom/Downloads/Pidato-Kenegaraan-Presiden-Joko-Widodo-2019-Part-1.mp3'
# source: Pidato Kenegaraan Presiden Joko Widodo (2:21-3:42) https://www.youtube.com/watch?v=yDdQ9pEfcnw&t=155s

# credentials and region are read from aws.conf located next to this script
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'aws.conf'))

# init AWS session
session = boto3.session.Session(
    aws_access_key_id=config['default']['aws_access_key_id'],
    aws_secret_access_key=config['default']['aws_secret_access_key'],
    region_name=config['default']['region']
)
s3 = session.client('s3')
transcribe = session.client('transcribe')

# create bucket to store transcribe input/output file if not exists
res = s3.list_buckets()
buckets = [b['Name'] for b in res['Buckets']]
if bucket_name not in buckets:
    print(f'Creating new bucket: {bucket_name}...')
    if session.region_name == 'us-east-1':
        # us-east-1 is the default location and rejects an explicit constraint
        res = s3.create_bucket(Bucket=bucket_name)
    else:
        res = s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': session.region_name}
        )

# upload audio input file if not exist
file_name = os.path.basename(file_path)
res = s3.list_objects(Bucket=bucket_name)
file_names = [c['Key'] for c in res.get('Contents', [])]
if file_name not in file_names:
    print(f'Uploading input file: {file_name}...')
    res = s3.upload_file(file_path, bucket_name, file_name)

# create new job if not exist; filter by name so an existing job is not
# missed just because it falls outside the first page of an unfiltered listing
job_name = file_name
res = transcribe.list_transcription_jobs(JobNameContains=job_name, MaxResults=100)
jobs = [j['TranscriptionJobName'] for j in res.get('TranscriptionJobSummaries', [])]
if job_name not in jobs:
    print(f'Starting transcribe job: {job_name}...')
    s3_file = f's3://{bucket_name}/{file_name}'
    res = transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        LanguageCode='id-ID',
        Media={'MediaFileUri': s3_file},
        OutputBucketName=bucket_name
    )

# wait until the job completes; query the exact job by name (a "contains"
# search limited to one result may return a different job) and stop on
# failure instead of polling forever
completed = False
while not completed:
    res = transcribe.get_transcription_job(TranscriptionJobName=job_name)
    job = res['TranscriptionJob']
    status = job['TranscriptionJobStatus']
    if status == 'FAILED':
        raise RuntimeError(f"Transcription job {job_name} failed: {job.get('FailureReason')}")
    completed = status == 'COMPLETED'
    if not completed:
        print(f'Waiting for job to complete...')
        time.sleep(5)
print(f'Job has completed')

# download transcription result (stored in the bucket as <job name>.json)
result_file = f'{file_name}.json'
if not os.path.isfile(result_file):
    res = s3.list_objects(Bucket=bucket_name)
    for c in res.get('Contents', []):
        content_name = c['Key']
        if content_name == result_file:
            print(f'Downloading transcription result...')
            s3.download_file(bucket_name, content_name, content_name)
            print(f'File downloaded {content_name}')

# print transcription result
if os.path.isfile(result_file):
    with open(result_file, 'r') as f:
        res_file = json.load(f)
    print(res_file['results']['transcripts'][0]['transcript'])
'''
Run mysqldump gzip and send result using SMTP
Reference: https://realpython.com/python-send-email
Config example:
{
"subject" : "Daily backup",
"body" : "This is a daily database backup",
"sender_email" : "sender@gmail.com",
"receiver_email" : "receiver@gmail.com",
"password" : "supersecretpassword",
"smtp_server" : "smtp.gmail.com",
"smtp_host" : 465,
"dbname" : "dbname",
"file_prefix": "dbname_backup"
}
@Author yohanes.gultom@gmail.com
'''
import email, smtplib, ssl
import datetime
import subprocess
import shlex
import json
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
CONFIG_FILE = 'backup_email.json'

# Load settings; see the module docstring for the expected keys.
with open(CONFIG_FILE, 'r') as f:
    config = json.load(f)
subject = config['subject']
body = config['body']
sender_email = config['sender_email']
receiver_email = config['receiver_email']
password = config['password']
smtp_server = config['smtp_server']
# NOTE: despite its name, the "smtp_host" config key is passed to SMTP_SSL as the port
smtp_host = config['smtp_host']
dbname = config['dbname']
file_prefix = config['file_prefix']

cmd1 = "mysqldump {}".format(dbname)
cmd2 = "gzip -9"
filename = "{}_{}.sql.gz".format(file_prefix, datetime.datetime.now().strftime('%Y%m%d%H%M'))

# Backup database: mysqldump | gzip -9 > filename
print('Backing up database..')
# gzip output is binary, so open the target in binary mode
with open(filename, 'wb') as f:
    ps1 = subprocess.Popen(shlex.split(cmd1), stdout=subprocess.PIPE)
    ps2 = subprocess.Popen(shlex.split(cmd2), stdin=ps1.stdout, stdout=f)
    # Close our copy of the pipe so mysqldump receives SIGPIPE if gzip exits early
    ps1.stdout.close()
    ps2.wait()
    ps1.wait()
# Abort when either stage failed, so a broken or empty dump is never mailed
if ps1.returncode != 0 or ps2.returncode != 0:
    print('Backup failed (mysqldump={}, gzip={})'.format(ps1.returncode, ps2.returncode))
    raise SystemExit(1)

# Create a multipart message and set headers
message = MIMEMultipart()
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = subject
message["Bcc"] = receiver_email  # Recommended for mass emails

# Add body to email
message.attach(MIMEText(body, "plain"))

# Read the compressed dump in binary mode
with open(filename, "rb") as attachment:
    # Add file as application/octet-stream
    # Email client can usually download this automatically as attachment
    part = MIMEBase("application", "octet-stream")
    part.set_payload(attachment.read())

# Encode file in ASCII characters to send by email
encoders.encode_base64(part)

# Add header as key/value pair to attachment part
part.add_header(
    "Content-Disposition",
    f"attachment; filename= {filename}",
)

# Add attachment to message and convert message to string
message.attach(part)
text = message.as_string()

# Log in to server using secure context and send email
print('Sending email..')
context = ssl.create_default_context()
with smtplib.SMTP_SSL(smtp_server, smtp_host, context=context) as server:
    server.login(sender_email, password)
    server.sendmail(sender_email, receiver_email, text)
print('Done.')
# Parse BibTex entries from input file and render them in IEEEtran.cls format
# http://www.michaelshell.org/tex/ieeetran/
# Usage: python bibtexconverter.py [bibtex file]
#
# BibTex example (input):
# @article{lecun2015deep,
# title={Deep learning},
# author={LeCun, Yann and Bengio, Yoshua and Hinton, Geoffrey},
# journal={Nature},
# volume={521},
# number={7553},
# pages={436--444},
# year={2015},
# publisher={Nature Publishing Group}
# }
#
# IEEETran example (output):
# \bibitem{lecun2015deep} Y.~LeCun and Y.~Bengio and G.~Hinton, \emph{Deep learning}.\hskip 1em plus 0.5em minus 0.4em\relax Nature, Nature Publishing Group, 2015.
import re
import sys
from pprint import pprint
def ieee(refs):
    """Print every reference in *refs* as an IEEEtran ``\\bibitem`` line.

    Each item is a dict as accepted by ``_ieee``. A blank line is printed
    before the list and after every entry.
    """
    # print() with a single argument behaves the same on Python 2 and 3
    print('\n')
    for ref in refs:
        print(_ieee(ref) + '\n')
def _ieee(dic):
    """Render one parsed BibTeX entry as an IEEEtran ``\\bibitem`` string.

    *dic* must contain the keys 'refcode', 'author', 'title' and 'year';
    the publisher part is assembled by ``_ieee_publisher``.
    """
    template = """\\bibitem{{{}}} {}, \\emph{{{}}}.\\hskip 1em plus 0.5em minus 0.4em\\relax {}, {}."""
    refcode = dic['refcode']
    authors = _ieee_author(dic['author'])
    title = dic['title']
    publisher = _ieee_publisher(dic)
    year = dic['year']
    return template.format(refcode, authors, title, publisher, year)
def _ieee_publisher(dic):
publisher = []
keys = ['journal', 'booktitle', 'publisher', 'organization']
for key in keys:
if key in dic:
publisher.append(dic[key])
return ', '.join(publisher)
def _ieee_author(text):
formatted = []
authors = text.split(' and ')
for a in authors:
names = a.split(', ')
if len(names) >= 2:
last, first = names[0], names[1]
formatted.append(first[0].upper() + '.~' + last)
else:
formatted.append(names[0])
return ' and '.join(formatted)
if __name__ == '__main__':
if len(sys.argv) < 2:
print 'Usage: python bibtexconverter.py [bibtex file]'
exit()
filename = sys.argv[1]
# collect BibTex entries from input file
# separated by blank line
entries = []
with open(filename) as f:
entry = []
for line in f:
line = line.strip()
if len(line) > 0:
# save line
entry.append(line)
elif len(entry) > 0:
# blank line
entries.append(entry)
entry = []
# last entry
if len(entry) > 0:
entries.append(entry)
# parse BibTex entries
references = []
for entry in entries:
dic = {}
dic['refcode'] = re.search(r'@(article|inproceedings|thesis){([\w\d]*),', entry[0], re.M | re.I).group(2)
for i in range(1, (len(entry) - 1)):
key, value = entry[i].split('=')
value = re.search(r'{([^{}]*)}', value, re.M | re.I).group(1)
dic[key] = value
references.append(dic)
# render entries in IEEEtran.cls format
# http://www.michaelshell.org/tex/ieeetran/
ieee(references)
"""
Cek status https://ivoting.iaitb.or.id
Dependency: pip install requests
Cara pakai: python cek_status_ivoting.py input.csv
Format file input (csv):
nama1,jurusan1,angkatan1
nama2,jurusan2,angkatan2
nama3,jurusan3,angkatan3
Hasil (csv):
nama1,jurusan1,angkatan1,status
nama2,jurusan2,angkatan2,status
nama3,jurusan3,angkatan3,status
@Author yohanes.gultom@gmail.com
"""
import csv
import requests
import sys
from urllib.parse import quote

# NOTE(review): this key is committed in source; re-fetch it via the browser if it has expired
api_key = 'bsgcyfgveyujeygfefc387r34ybr39brnr3r3'
headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36',
    'Origin': 'https://ivoting.iaitb.or.id/',
    'api-key': api_key,
}
url_template = 'https://ivoting.iaitb.or.id/api/open/alumnee/simple/all?name={}&page=1&perPage=10&studyprogram={}&generation={}'

if len(sys.argv) < 2:
    sys.exit('Cara pakai: python cek_status_ivoting.py input.csv')
input_file = sys.argv[1]

# Read all rows first: the same file is rewritten below with a status column.
print(f'Membaca input {input_file}...')
input_rows = []
with open(input_file, newline='') as f:
    for row in csv.reader(f):
        if not row:
            # tolerate blank lines
            continue
        nama = row[0]
        jurusan = row[1]
        angkatan = int(row[2])
        input_rows.append((nama, jurusan, angkatan))

print('Memutakhirkan status...')
# newline='' is what the csv module expects for files it reads or writes
with open(input_file, 'w', newline='') as f:
    writer = csv.writer(f)
    for nama, jurusan, angkatan in input_rows:
        try:
            # percent-encode values so spaces, '&' etc. cannot break the query string
            url = url_template.format(quote(nama, safe=''), quote(jurusan, safe=''), angkatan)
            res = requests.get(url, headers=headers, timeout=30)
            body = res.json()
            status = body['data'][0]['verificationStatus']
        except Exception as e:
            # best effort: record the error text as the status and keep going
            status = str(e)
        writer.writerow((nama, jurusan, angkatan, status))
        print(f'{nama}| {jurusan} | {angkatan} | {status}')
'''
Convert ENAMEX Named-Entity annotated file to Stanford NLP format (token-based)
@Author yohanes.gultom@gmail
ENAMEX example (2 sentences):
Sementara itu Pengamat Pasar Modal <ENAMEX TYPE="PERSON">Dandossi Matram</ENAMEX> mengatakan, sulit bagi sebuah <ENAMEX TYPE="ORGANIZATION">kantor akuntan publik</ENAMEX> (<ENAMEX TYPE="ORGANIZATION">KAP</ENAMEX>) untuk dapat menyelesaikan audit perusahaan sebesar <ENAMEX TYPE="ORGANIZATION">Telkom</ENAMEX> dalam waktu 3 bulan. 1
<ENAMEX TYPE="ORGANIZATION">Telkom</ENAMEX> akan melakukan RUPS pada 30 Juli 2004 yang selain melaporkan kinerja 2003 juga akan meminta persetujuan untuk pemecahan nilai nominal saham atau stock split 1:2. 2
'''
import sys
import re
# Token that ends with the opening of a tag: optional leading text + "<ENAMEX"
START_PATTERN = re.compile(r'^(.*?)<ENAMEX$', re.I)
# Whole single-token entity: TYPE="T">text</ENAMEX>trailing
END_SINGLE_PATTERN = re.compile(r'^TYPE="(.*?)">(.*?)</ENAMEX>(.*?)$', re.I)
# First token of a multi-token entity: TYPE="T">text
TYPE_PATTERN = re.compile(r'^TYPE="(.*?)">(.*?)$', re.I)
# Last token of a multi-token entity: text</ENAMEX>trailing
END_MULTI_PATTERN = re.compile(r'^(.*?)</ENAMEX>(.*?)$', re.I)
# End of sentence: last word, optional period, TAB, sentence number.
# The word group is lazy so the optional period is not captured with the word.
EOS_PATTERN = re.compile(r'^([^<>]*?)\.?\t(\d+)$', re.I)
NON_ENTITY_TYPE = 'O'
def check_and_process_eos(token):
    """Write *token* as a sentence ending if it matches EOS_PATTERN.

    On a match, writes the captured word, a '.' token (both tagged with the
    module-level ``cur_type``) and an empty separator line to the
    module-level ``out`` file, then returns True. Returns False, writing
    nothing, otherwise.
    """
    found = re.match(EOS_PATTERN, token)
    if found is None:
        return False
    out.writelines([
        found.group(1) + '\t' + cur_type + '\n',
        '.' + '\t' + cur_type + '\n',
        '\n',
    ])
    return True
infile = sys.argv[1]
outfile = sys.argv[2]
# entity type applied to plain tokens; changes while inside a multi-token entity
cur_type = NON_ENTITY_TYPE
# Read as text: the tokens are split and matched with str patterns, which
# fails on the bytes produced by binary mode under Python 3.
with open(infile, 'r') as f, open(outfile, 'w') as out:
    for line in f:
        for token in line.strip().split(' '):
            token = token.strip()
            if not token:
                continue
            # "...<ENAMEX": emit any text glued in front of the tag
            match = re.match(START_PATTERN, token)
            if match:
                if match.group(1):
                    out.write(match.group(1) + '\t' + NON_ENTITY_TYPE + '\n')
                continue
            # entity that opens and closes within this token
            match = re.match(END_SINGLE_PATTERN, token)
            if match:
                out.write(match.group(2) + '\t' + match.group(1) + '\n')
                cur_type = NON_ENTITY_TYPE
                rest = match.group(3)
                # nothing after the closing tag: do not emit an empty token
                if rest and not check_and_process_eos(rest):
                    out.write(rest + '\t' + cur_type + '\n')
                continue
            # first token of a multi-token entity
            match = re.match(TYPE_PATTERN, token)
            if match:
                cur_type = match.group(1)
                out.write(match.group(2) + '\t' + cur_type + '\n')
                continue
            # last token of a multi-token entity
            match = re.match(END_MULTI_PATTERN, token)
            if match:
                if match.group(1):
                    out.write(match.group(1) + '\t' + cur_type + '\n')
                cur_type = NON_ENTITY_TYPE
                rest = match.group(2)
                if rest and not check_and_process_eos(rest):
                    out.write(rest + '\t' + cur_type + '\n')
                continue
            # plain token: either a sentence end or an ordinary word
            if check_and_process_eos(token):
                continue
            out.write(token + '\t' + cur_type + '\n')
#!/usr/bin/env python3
"""
Simple example on compiling & deploying simple smartcontract, and calling its methods
Setup:
pip3 install web3==4.7.2 py-solc==3.2.0
python3 -m solc.install v0.4.24
export PATH="$PATH:$HOME/.py-solc/solc-v0.4.24/bin"
@author yohanes.gultom@gmail.com
"""
from web3 import Web3, HTTPProvider, middleware
from solc import compile_source
import random
def compile_contract(contract_source_file, contractName=None):
    """
    Read a Solidity source file, compile it and return (name, interface).

    When contractName is given, the interface stored under
    '<stdin>:' + contractName is returned together with that name.
    Otherwise the first key of the compiler output is used both as the
    returned name and to look up the interface.
    """
    with open(contract_source_file, "r") as source:
        compiled_sol = compile_source(source.read())
    if contractName:
        return contractName, compiled_sol['<stdin>:' + contractName]
    contractName = list(compiled_sol.keys())[0]
    return contractName, compiled_sol[contractName]
def deploy_contract(acct, contract_interface, contract_args=None):
    """
    Deploy a contract with a locally signed transaction and return its address.

    Builds the constructor transaction (passing contract_args when given),
    signs it with acct, sends it through the module-level ``w3`` and waits
    up to 120 seconds for the receipt.
    """
    factory = w3.eth.contract(abi=contract_interface['abi'], bytecode=contract_interface['bin'])
    if contract_args:
        constructed = factory.constructor(*contract_args)
    else:
        constructed = factory.constructor()
    tx_params = {
        'from': acct.address,
        'nonce': w3.eth.getTransactionCount(acct.address),
    }
    tx = constructed.buildTransaction(tx_params)
    print ("Signing and sending raw tx ...")
    signed = acct.signTransaction(tx)
    tx_hash = w3.eth.sendRawTransaction(signed.rawTransaction)
    print ("tx_hash = {} waiting for receipt ...".format(tx_hash.hex()))
    tx_receipt = w3.eth.waitForTransactionReceipt(tx_hash, timeout=120)
    contractAddress = tx_receipt["contractAddress"]
    print ("Receipt accepted. gasUsed={gasUsed} contractAddress={contractAddress}".format(**tx_receipt))
    return contractAddress
def exec_contract(acct, nonce, func):
    """
    Send a state-changing contract call and return its transaction hash (hex).

    func is a prepared contract function; the transaction is built with the
    given nonce, signed with acct and sent through the module-level ``w3``.
    The receipt is not awaited here.
    """
    tx_params = {'from': acct.address, 'nonce': nonce}
    signed = acct.signTransaction(func.buildTransaction(tx_params))
    return w3.eth.sendRawTransaction(signed.rawTransaction).hex()
if __name__ == '__main__':
    """
    // contract.sol:
    pragma solidity ^0.4.21;
    contract simplestorage {
        uint public storedData;
        event Updated(address by, uint _old, uint _new);
        function set(uint x) {
            uint old = storedData;
            storedData = x;
            emit Updated(msg.sender, old, x);
        }
        function get() constant returns (uint retVal) {
            return storedData;
        }
    }
    """
    import time

    # config
    RPC_ADDRESS = 'http://localhost:8545'
    CONTRACT_SOL = 'contract.sol'
    CONTRACT_NAME = 'simplestorage'
    PRIVATE_KEY="youraddressprivatekey"
    # upper bound (seconds) for waiting on the Updated event
    EVENT_TIMEOUT = 120

    # instantiate web3 object
    w3 = Web3(HTTPProvider(RPC_ADDRESS, request_kwargs={'timeout': 120}))
    # use additional middleware for PoA (eg. Rinkeby)
    # w3.middleware_stack.inject(middleware.geth_poa_middleware, layer=0)
    acct = w3.eth.account.privateKeyToAccount(PRIVATE_KEY)

    # compile contract to get abi
    print('Compiling contract..')
    contract_name, contract_interface = compile_contract(CONTRACT_SOL, CONTRACT_NAME)

    # deploy contract
    print('Deploying contract..')
    contract_address = deploy_contract(acct, contract_interface)

    # create contract object
    contract = w3.eth.contract(address=contract_address, abi=contract_interface['abi'])

    # call non-transactional method
    val = contract.functions.get().call()
    print('Invoke get()={}'.format(val))
    assert val == 0

    # call transactional method
    nonce = w3.eth.getTransactionCount(acct.address)
    # remember the block height so the event filter only scans newer blocks
    from_block_number = w3.eth.blockNumber
    new_val = random.randint(1, 100)
    contract_func = contract.functions.set(new_val)
    print('Invoke set()={}'.format(new_val))
    tx_hash = exec_contract(acct, nonce, contract_func)
    print('tx_hash={} waiting for receipt..'.format(tx_hash))
    tx_receipt = w3.eth.waitForTransactionReceipt(tx_hash, timeout=120)
    print("Receipt accepted. gasUsed={gasUsed} blockNumber={blockNumber}". format(**tx_receipt))

    # catch event: poll once per second and give up after EVENT_TIMEOUT
    # instead of hammering the node in an unbounded busy loop
    contract_filter = contract.events.Updated.createFilter(fromBlock=from_block_number)
    entries = None
    print('Waiting for event..')
    deadline = time.time() + EVENT_TIMEOUT
    while not entries:
        entries = contract_filter.get_all_entries()
        if not entries:
            if time.time() > deadline:
                raise RuntimeError('Timed out waiting for Updated event')
            time.sleep(1)

    # _new == new_val
    args = entries[0].args
    print(args)
    assert args._old == 0
    assert args._new == new_val
    assert args.by == acct.address

    # call non-transactional method
    val = contract.functions.get().call()
    print('Invoke get()={}'.format(val))
    assert val == new_val
import os
import sys
# get directory (of current file)
dir_path = os.path.dirname(os.path.realpath(__file__))
# get base filename (including extension) of current file
basename = os.path.basename(os.path.realpath(__file__))
# get relative path from arg
mypath = sys.argv[1]

# iterate dirs and files
for f in os.listdir(mypath):
    path = os.path.join(mypath, f)
    # print if file
    if os.path.isfile(path):
        print(os.path.join(dir_path, path))

# iterate and rename files: "<prefix>_<anything>.jpg" -> "<prefix lowercased>.jpg"
target_dir = mypath
for f in os.listdir(target_dir):
    stem, ext = os.path.splitext(f)
    if ext == '.jpg':
        new_name = stem.split('_')[0].lower() + ext
        if new_name == f:
            continue
        destination = os.path.join(target_dir, new_name)
        # os.rename can silently replace an existing file; when two sources
        # share a prefix keep the first one and report the clash
        if os.path.exists(destination):
            print('Skipping {}: {} already exists'.format(f, new_name))
            continue
        os.rename(os.path.join(target_dir, f), destination)
#Copyright 2017 John Frens
#
#Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
#The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Python 2.7 version
import string
# MTLD internal implementation
def mtld_calc(word_array, ttr_threshold):
    """Run one directional MTLD pass over *word_array*.

    Tokens are lowercased with ASCII punctuation removed. A running
    type-token ratio (TTR) is tracked; each time it drops to
    *ttr_threshold* or below a factor is counted and the window restarts.
    The leftover window contributes a partial factor
    (1 - TTR) / (1 - threshold).

    Returns len(word_array) / factors, or -1 when no factor (not even a
    partial one) was accumulated.
    """
    # Character filter that works on Python 2 and 3 alike
    # (str.translate(None, chars) exists only on Python 2).
    punctuation = set(string.punctuation)
    current_ttr = 1.0
    token_count = 0
    types = set()
    factors = 0.0
    for token in word_array:
        token = ''.join(ch for ch in token if ch not in punctuation).lower()
        token_count += 1
        types.add(token)
        current_ttr = float(len(types)) / token_count
        if current_ttr <= ttr_threshold:
            # full factor reached: count it and start a fresh window
            factors += 1
            token_count = 0
            types = set()
            current_ttr = 1.0
    # partial factor for the unfinished window
    excess = 1.0 - current_ttr
    excess_val = 1.0 - ttr_threshold
    factors += float(excess) / excess_val
    if factors != 0:
        return len(word_array) / factors
    return -1
# MTLD implementation
def mtld(word_array, ttr_threshold=0.72):
    """Return the mean of the forward and backward MTLD passes.

    Raises ValueError when *word_array* is a plain string or holds fewer
    than 50 tokens.
    """
    if isinstance(word_array, str):
        raise ValueError("Input should be a list of strings, rather than a string. Try using string.split()")
    if len(word_array) < 50:
        raise ValueError("Input word list should be at least 50 in length")
    forward = mtld_calc(word_array, ttr_threshold)
    backward = mtld_calc(word_array[::-1], ttr_threshold)
    return (forward + backward) / 2
# HD-D internals
# x! = x(x-1)(x-2)...(1)
def factorial(x):
    """Return x! = x(x-1)(x-2)...(1); any x <= 1 yields 1.

    Computed iteratively so that large arguments do not exceed the
    interpreter's recursion limit. Float arguments are accepted and give
    a float result.
    """
    result = 1
    while x > 1:
        result *= x
        x -= 1
    return result
# n choose r = n(n-1)(n-2)...(n-r+1)/(r!)
def combination(n, r):
    """Return n choose r = n(n-1)...(n-r+1) / r! as a float."""
    numerator = 1.0
    factor = n - r + 1.0
    upper = n + 1.0
    # multiply the r factors n-r+1, ..., n
    while factor < upper:
        numerator *= factor
        factor += 1.0
    return numerator / factorial(r)
# hypergeometric probability: the probability that an n-trial hypergeometric experiment results
# in exactly x successes, when the population consists of N items, k of which are classified as successes.
# (here, population = N, population_successes = k, sample = n, sample_successes = x)
# h(x; N, n, k) = [ kCx ] * [ N-kCn-x ] / [ NCn ]
def hypergeometric(population, population_successes, sample, sample_successes):
    """Return h(x; N, n, k) = [kCx] * [(N-k)C(n-x)] / [NCn].

    N = population, k = population_successes, n = sample,
    x = sample_successes.
    """
    success_ways = combination(population_successes, sample_successes)
    failure_ways = combination(population - population_successes, sample - sample_successes)
    total_ways = combination(population, sample)
    return (success_ways * failure_ways) / total_ways
# HD-D implementation
def hdd(word_array, sample_size=42.0):
    """Return the HD-D lexical diversity value of *word_array*.

    Tokens are lowercased with ASCII punctuation removed. For every
    distinct type the probability that it is absent (0 successes) is taken
    from ``hypergeometric`` and (1 - that probability) / sample_size is
    added to the result.

    Raises ValueError when *word_array* is a plain string or holds fewer
    than 50 tokens.
    """
    if isinstance(word_array, str):
        raise ValueError("Input should be a list of strings, rather than a string. Try using string.split()")
    if len(word_array) < 50:
        raise ValueError("Input word list should be at least 50 in length")

    # Character filter that works on Python 2 and 3 alike
    # (str.translate(None, chars) exists only on Python 2).
    punctuation = set(string.punctuation)

    # Create a dictionary of counts (as floats) for each type
    type_counts = {}
    for token in word_array:
        token = ''.join(ch for ch in token if ch not in punctuation).lower()
        type_counts[token] = type_counts.get(token, 0.0) + 1.0

    # Sum the contribution of each token - "If the sample size is 42, the mean contribution of any given
    # type is 1/42 multiplied by the percentage of combinations in which the type would be found." (McCarthy & Jarvis 2010)
    hdd_value = 0.0
    for count in type_counts.values():
        contribution = (1.0 - hypergeometric(len(word_array), sample_size, count, 0.0)) / sample_size
        hdd_value += contribution
    return hdd_value
"""
Finding fingerprint and calculating simple fuzzy similarity
@author yohanes.gultom@gmail.com
Prerequisites on Ubuntu:
* Python 2.7 and pip
* FFMPEG `sudo apt install ffmpeg`
* AcoustID fingerprinter `sudo apt install acoustid-fingerprinter`
* PyAcoustID `pip install pyacoustid`
* FuzzyWuzzy `pip install fuzzywuzzy[speedup]`
"""
import acoustid
import sys
import os
import chromaprint
import numpy as np
import matplotlib.pyplot as plt
from fuzzywuzzy import fuzz
DIR_DATABASE = 'music/full'
DIR_SAMPLES = 'music/partial'
def get_fingerprint(filepath):
    """
    Fingerprint an audio file.

    Returns (fingerprint, version, duration), where fingerprint and version
    are what chromaprint decodes from the encoded fingerprint that acoustid
    computes for the file.
    """
    duration, encoded = acoustid.fingerprint_file(filepath)
    decoded = chromaprint.decode_fingerprint(encoded)
    fingerprint, version = decoded
    return fingerprint, version, duration
def build_fingerprint_database(dirpath, file_ext='.mp3'):
    """
    Fingerprint every regular file in dirpath whose extension equals file_ext.

    Returns a dict mapping file name to fingerprint.
    """
    database = {}
    print('Processing {}..'.format(dirpath))
    for entry in os.listdir(dirpath):
        full_path = os.path.join(dirpath, entry)
        extension = os.path.splitext(entry)[1]
        if not os.path.isfile(full_path) or extension != file_ext:
            continue
        print('Getting fingerprint from database item: {}..'.format(entry))
        fingerprint, version, duration = get_fingerprint(full_path)
        database[entry] = fingerprint
    return database
def plot_fingerprints(db):
    """
    Visualize the fingerprints in db (name -> fingerprint).

    Draws one bitmap per entry, stacked in a single-column grid, and shows
    the figure.
    """
    fig = plt.figure()
    numrows = len(db)
    # dict.items() works on Python 2 and 3 (iteritems() is Python 2 only);
    # subplot positions are 1-based
    for plot_id, (name, fp) in enumerate(db.items(), 1):
        # single column grid
        a = fig.add_subplot(numrows, 1, plot_id)
        plt.imshow(get_fingerprint_bitmap(fp))
        a.set_title(name)
    plt.show()
def get_fingerprint_bitmap(fp):
    """
    Convert a list of integers into a boolean (32, len(fp)) bitmap.

    Each value is masked to 32 bits; column i holds the bits of fp[i] with
    the most significant bit in row 0.
    """
    rows = []
    for value in fp:
        # space-padded to width 32; padding compares unequal to '1'
        bits = '{:32b}'.format(value & 0xffffffff)
        rows.append([bit == '1' for bit in bits])
    return np.transpose(np.array(rows))
if __name__ == '__main__':
    # load database and samples
    database = build_fingerprint_database(DIR_DATABASE)
    samples = build_fingerprint_database(DIR_SAMPLES)
    print('\n')

    # find best match of each sample in database
    # (dict.items() works on Python 2 and 3; iteritems() is Python 2 only)
    for sample, sample_fp in samples.items():
        print('Similarity score of "{}":'.format(sample))
        best_match = None
        for name, fp in database.items():
            similarity = fuzz.ratio(sample_fp, fp)
            if best_match is None or best_match['score'] < similarity:
                best_match = {
                    'score': similarity,
                    'name': name
                }
            print('{} {}%'.format(name, similarity))
        if best_match is None:
            # empty database: nothing to compare against
            print('Best match: none (database is empty)\n')
        else:
            print('Best match: {name} ({score}%)\n'.format(**best_match))

    # plot database
    plot_fingerprints(database)
# Train a ProbabilisticProjectiveDependencyParser using CoNLL-U treebank from Universal Dependencies https://github.com/UniversalDependencies
# In this script we are using Indonesian treebank https://github.com/UniversalDependencies/UD_Indonesian
from pprint import pprint
from nltk.parse import (
DependencyGraph,
ProbabilisticProjectiveDependencyParser
)
import io

# open treebank file; io.open decodes UTF-8 on both Python 2 and 3
# (calling .decode() on the text returned by read() fails on Python 3)
with io.open('id-ud-train.conllu', 'r', encoding='utf-8') as f:
    # parse dependency graphs from file (entries are separated by a blank line)
    graphs = [DependencyGraph(entry, top_relation_label='root') for entry in f.read().split('\n\n') if entry]

# train ProbabilisticProjectiveDependencyParser
ppdp = ProbabilisticProjectiveDependencyParser()
print('Training Probabilistic Projective Dependency Parser...')
ppdp.train(graphs)

# try to parse a sentence
# and print tree ordered by probability (most probable first)
sent = ['Melingge', 'adalah', 'gampong', 'di', 'kecamatan', 'Pulo', 'Aceh', '.']
print('Parsing \'' + " ".join(sent) + '\'...')
print('Parse:')
for tree in ppdp.parse(sent):
    pprint(tree)
import sys
import time
import json
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from Proxy_List_Scrapper import Scrapper
class ProxyGenerator:
    """Hands out proxy addresses obtained from a Proxy_List_Scrapper ``Scrapper``."""
    def __init__(self, s: Scrapper):
        self.scrapper = s
        # fetch the first batch of proxies immediately
        self.data = self.scrapper.getProxies()
    def generate(self) -> str:
        """Remove and return the next proxy as 'ip:port', refetching while the list is empty."""
        while len(self.data.proxies) <= 0:
            print('> Reloading proxies..')
            self.data = self.scrapper.getProxies()
        p = self.data.proxies.pop(0)
        return f'{p.ip}:{p.port}'
# NOTE(review): this script submits repeated poll votes, each through a
# different proxy, which sidesteps the "IP has been used" check reported by
# the page. That looks like poll manipulation and probably violates the
# site's terms - confirm whether this belongs in the repository at all.
# config
chromedriver_path = '/mnt/data/Workspace/webdrivers/chromedriver_89.0.4389.23'
# JSON file remembering which proxies were already tried
ip_map_path = 'pollingsituajakali_ip_map.json'
target_url = 'https://pollingsituajakali.xyz/pollingxxxxxxxxxxx'
# value of the data-txt attribute of the button to click
target = 'HARIYONO'
# number of successful attempts wanted (overridable via argv[1])
n_repeat = 1
if __name__ == '__main__':
    # get n_repeat if provided as argument
    if len(sys.argv) > 1:
        n_repeat = int(sys.argv[1])
    # load ip map if exists
    ip_map = {}
    if os.path.isfile(ip_map_path):
        with open(ip_map_path) as f:
            ip_map = json.load(f)
    # proxy generator
    proxy_gen = ProxyGenerator(Scrapper(category='ALL', print_err_trace=False))
    # repeat n_repeat times (only successful attempts advance the counter)
    count = 0
    while count < n_repeat:
        print('Attempt #' + str(count+1))
        driver = None
        proxy = None
        try:
            # get proxy that has not been recorded in ip_map yet
            print('> Finding proxy..')
            while not proxy or proxy in ip_map:
                proxy = proxy_gen.generate()
                # time.sleep(1)
            print(f'> {proxy}')
            # setup selenium
            options = Options()
            options.add_argument('--headless')
            options.add_argument(f'--proxy-server={proxy}')
            driver = webdriver.Chrome(executable_path=chromedriver_path, options=options)
            driver.implicitly_wait(1)
            # click button
            driver.get(target_url)
            btn = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, f'button[data-txt={target}]'))
            )
            btn.click()
            # wait for result
            # complete_element_id = 'chart'
            # complete_element_id = 'btnRefresh'
            complete_element_id = 'spanCount'
            print(f'> Waiting for {complete_element_id}..')
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, complete_element_id))
            )
            # an element with class alert-danger is treated as a rejected attempt
            alerts = driver.find_elements_by_css_selector('.alert-danger')
            print(f'> alerts: {len(alerts)}')
            if len(alerts) <= 0:
                count += 1
                print('> Success')
            else:
                print('> Failed. IP has been used')
            # mark proxy and persist the map right away
            ip_map[proxy] = 1
            with open(ip_map_path, 'w') as f:
                json.dump(ip_map, f)
        except Exception as e:
            # any failure (timeout, dead proxy, ...) is reported and the attempt retried
            print(f'> Failed. {e}')
        finally:
            # always release the browser, even after an error
            if driver:
                driver.quit()
    print(f'Success: {count}')
import numpy as np
def itob(N):
    """Return the binary digits of N as a string (most significant first).

    Values of N that are 1 or smaller are returned as str(N) unchanged.
    """
    digits = []
    while N > 1:
        digits.append(str(N % 2))
        N = N // 2
    # the remaining value is the leading digit
    digits.append(str(N))
    return ''.join(reversed(digits))
def bingap(N):
    """Return the longest run of zero bits enclosed by one bits in N (0 if none)."""
    longest = 0
    run = None  # length of the current zero run; None until the first 1 bit is seen
    while N > 1:
        if N % 2 == 1:
            # a 1 bit closes the current run and opens a new one
            if run is not None and run > longest:
                longest = run
            run = 0
        elif run is not None:
            run += 1
        N //= 2
    # the leading 1 bit closes the last run
    if N == 1 and run is not None and run > longest:
        longest = run
    return longest
def reverse(A, i, j):
    """Reverse A[i..j] (both ends inclusive) in place."""
    left, right = i, j
    while left < right:
        A[left], A[right] = A[right], A[left]
        left += 1
        right -= 1
def rotate(A, K):
    """Rotate A to the right by K positions in place and return it.

    Uses three reversals: the whole list, then the first K elements, then
    the rest. An empty A is returned unchanged.
    """
    if not A:
        return A
    shift = K % len(A)
    if shift > 0:
        last = len(A) - 1
        reverse(A, 0, last)
        reverse(A, 0, shift - 1)
        reverse(A, shift, last)
    return A
def tape_diff_slow(A):
    """Print the smallest |sum(A[:p]) - sum(A[p:])| over all split points p.

    Prints 'min diff: None' when A has fewer than two elements.
    """
    diffs = [abs(sum(A[:p]) - sum(A[p:])) for p in range(1, len(A))]
    md = min(diffs) if diffs else None
    print('min diff: ' + str(md))
def tape_diff(A, n, s1, s2):
    """Return the smallest |sum1 - sum2| over all ways to split A[0..n].

    Every element A[n], A[n-1], ..., A[0] is added either to s1 or to s2;
    the minimum absolute difference of the two totals is returned. With
    n < 0 nothing is assigned and the result is abs(s1 - s2).

    Only the set of reachable differences is tracked, so the cost grows
    with the number of distinct differences instead of doubling with every
    element, and no recursion is involved.
    """
    diffs = {s1 - s2}
    for i in range(n, -1, -1):
        value = A[i]
        # putting `value` in group 1 raises the difference, group 2 lowers it
        diffs = {d + value for d in diffs} | {d - value for d in diffs}
    return min(abs(d) for d in diffs)
def tape_diff_fast(A):
    """Return tape_diff over all of A, starting from two empty groups."""
    last_index = len(A) - 1
    return tape_diff(A, last_index, 0, 0)
# Demo: a small random sample so the run finishes quickly.
# (A fixed example to try instead is [3, 1, 2, 4, 3].)
A = np.random.randint(-1000, 1000, 16)
print(A)
print(tape_diff_fast(A))
"""
Load node id and coordinates from pycgrc file generated by https://github.com/AndGem/OsmToRoadGraph to PostGIS database
Read list of coordinates and get nearest node id using index-based KNN https://postgis.net/workshops/postgis-intro/knn.html#index-based-knn
@Author yohanes.gultom@gmail.com
"""
import configparser
import psycopg2
import sys
import csv
from io import StringIO
config = configparser.ConfigParser()
config.read('config.ini')
"""
Example of config.ini:
[respondor]
host = localhost
user = postgres
passwd = postgres
db = respondor
"""
class get_db_connection:
    """Context manager that opens a psycopg2 connection and yields a cursor.

    *dbconf* is a mapping with the keys 'host', 'db', 'user' and 'passwd'.
    On a clean exit the transaction is committed; when the body raised it
    is rolled back. The cursor and connection are closed in both cases.
    """

    def __init__(self, dbconf: dict):
        self.host = dbconf['host']
        self.database = dbconf['db']
        self.user = dbconf['user']
        self.password = dbconf['passwd']

    def __enter__(self):
        self.conn = psycopg2.connect(
            host=self.host,
            database=self.database,
            user=self.user,
            password=self.password)
        self.cur = self.conn.cursor()
        return self.cur

    def __exit__(self, exc_type, value, traceback):
        try:
            if exc_type is None:
                self.conn.commit()
            else:
                # do not persist a half-finished transaction
                self.conn.rollback()
        finally:
            # release resources even if commit/rollback itself fails
            self.cur.close()
            self.conn.close()
def load_locations(table_name, file_path):
    """Recreate *table_name* and bulk-load node ids/points from *file_path*.

    The line at index 7 of the file is read as the number of nodes; node
    rows ("id lat lon") are read from index 9 on until that many have been
    collected. The table is dropped, recreated with a GIST index on
    ``coords`` and filled with COPY. Prints the number of nodes read.

    NOTE(review): points are written as POINT(lat lon), whereas WKT is
    normally x=lon, y=lat; get_nearest_node_ids builds its query point the
    same way - confirm the order is intended.
    """
    # collect the rows in memory in the tab-separated form COPY expects
    buffer = StringIO()
    total_nodes = None
    count_nodes = 0
    with open(file_path) as f:
        for line_no, line in enumerate(f):
            if line_no == 7:
                total_nodes = int(line)
                continue
            if line_no <= 8:
                continue
            # start reading nodes
            node_id, lat, lon = line.split()
            buffer.write('\t'.join((node_id, f'POINT({lat} {lon})')) + '\n')
            count_nodes += 1
            if count_nodes >= total_nodes:
                break
    buffer.seek(0)

    # create table and copy data
    with get_db_connection(config['respondor']) as cur:
        cur.execute(f'DROP TABLE IF EXISTS {table_name}')
        cur.execute(f"""
CREATE TABLE {table_name} (
id integer NOT NULL,
coords geometry,
CONSTRAINT {table_name}_pkey PRIMARY KEY (id)
)
"""
        )
        cur.execute(f'CREATE INDEX {table_name}_coords_index ON {table_name} USING GIST(coords)')
        cur.copy_from(buffer, table_name, columns=('id', 'coords'))
    print(count_nodes)
def get_nearest_node_ids(table_name, input_path):
    """Append the id of the nearest node in *table_name* to each row of a CSV.

    *input_path* holds rows of "name,lat,lon" (shorter rows are dropped).
    For every row the nearest node is looked up with the index-based KNN
    operator ``<->`` and the file is rewritten as "name,lat,lon,node_id".
    The node id is left empty when the table returns no row.

    All lookups are done before the file is reopened for writing, so a
    failing query does not leave the input truncated.
    """
    # read input
    locations = []
    with open(input_path, newline='') as f:
        for row in csv.reader(f):
            if len(row) >= 3:
                locations.append((row[0], row[1], row[2]))

    # find nearest location; the point comes from the CSV, so it is passed as
    # a bound parameter instead of being pasted into the SQL text
    # (table_name is an identifier supplied by the caller's code)
    sql = f"SELECT id FROM {table_name} ORDER BY coords <-> %s::geometry ASC LIMIT 1;"
    results = []
    with get_db_connection(config['respondor']) as cur:
        for name, lat, lon in locations:
            cur.execute(sql, (f'POINT({lat} {lon})',))
            res = cur.fetchone()
            node_id = res[0] if res else ''
            results.append((name, lat, lon, node_id))

    # rewrite input with the extra column
    with open(input_path, 'w', newline='') as f:
        csv.writer(f).writerows(results)
if __name__ == '__main__':
    # (table, pycgrc graph file) pairs to load into the database
    graph_files = (
        ('locations_jakarta', '/mnt/data/Downloads/jakarta.pycgr/jakarta.pycgrc'),
        ('locations_lombok', '/mnt/data/Downloads/lombok-island.pycgr/lombok-island.pycgrc'),
    )
    # (table, csv of locations) pairs to annotate with the nearest node id
    location_files = (
        ('locations_jakarta', '/mnt/data/Downloads/jakarta_locations.csv'),
        ('locations_lombok', '/mnt/data/Downloads/lombok_locations.csv'),
    )
    # load data to database
    for table, graph_path in graph_files:
        load_locations(table, graph_path)
    # find nearest node_id
    for table, csv_path in location_files:
        get_nearest_node_ids(table, csv_path)
'''
WARNING: this is Python 2.x version
Simple script to test sending email using SMTP server
'''
import smtplib
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
# smtp config
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587
SMTP_USER = 'user@gmail.com'
SMTP_PASS = 'password'
# email content
to = "yohanes.gultom@gmail.com"
subject = "Just a test mail"
body = "This is just a test message from a new server. Kindly ignore it and proceed with what you are doing. Thank you!"
if __name__ == '__main__':
    # build a plain-text message addressed from SMTP_USER to `to`
    msg = MIMEMultipart()
    msg['From'] = SMTP_USER
    msg['To'] = to
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
    try:
        # upgrade the connection to TLS before sending credentials
        server.starttls()
        server.login(SMTP_USER, SMTP_PASS)
        server.sendmail(SMTP_USER, to, msg.as_string())
        server.quit()
    finally:
        # release the socket even when one of the steps above raised;
        # after a successful quit() this is a no-op
        server.close()
"""
Send email Python 3
Reference: https://realpython.com/python-send-email/#sending-your-plain-text-email
"""
import smtplib
import ssl
import configparser
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Connection settings are read from config.ini ([smtp] and [buk] sections)
config = configparser.ConfigParser()
config.read('config.ini')
smtp_section = config['smtp']
smtp_server = smtp_section['host']
port = 587  # For starttls
sender_email = smtp_section['user']
password = smtp_section['password']
receiver_email = config['buk']['recipient']
# Create the plain-text and HTML version of your message
message = MIMEMultipart("alternative")
for header_name, header_value in (
    ("Subject", "multipart test"),
    ("From", sender_email),
    ("To", receiver_email),
):
    message[header_name] = header_value
text = """\
Hello world!
This is an HTML email test
"""
html = """\
<html>
<body>
<h1>Hello world!</h1>
<p>This is an <strong>HTML</strong> email test</p>
</body>
</html>
"""
part1 = MIMEText(text, "plain")
part2 = MIMEText(html, "html")
# Add HTML/plain-text parts to MIMEMultipart message
# The email client will try to render the last part first
for part in (part1, part2):
    message.attach(part)
context = ssl.create_default_context()
with smtplib.SMTP(smtp_server, port) as server:
    server.ehlo()  # Can be omitted
    server.starttls(context=context)
    server.ehlo()  # Can be omitted
    print('authenticating..')
    server.login(sender_email, password)
    print('sending email...')
    payload = message.as_string()
    server.sendmail(sender_email, receiver_email, payload)
    print('done')
'''
Split video by size or number of chunks
Original code: https://stackoverflow.com/a/28884437/1862500
@author yohanes.gultom@gmail.com
'''
import re
import math
from optparse import OptionParser
# Matches a "Duration: HH:MM:SS.xx," fragment; groups 1-3 capture the hours,
# minutes and seconds. Written as a raw string because "\d" and "\." are
# invalid escape sequences in a plain string literal.
length_regexp = r'Duration: (\d{2}):(\d{2}):(\d{2})\.\d+,'
re_length = re.compile(length_regexp)
from subprocess import check_call, PIPE, Popen
import shlex
def main():
    """
    Split the video named on the command line into chunks with ffmpeg.

    The options come from parse_options(). When split_count is given the
    chunk length is derived from it and split_size is ignored; otherwise the
    number of chunks is derived from split_size. Chunk n is written next to
    the input as "<name>-<n>.<ext>".

    Raises SystemExit on invalid options, when the video length cannot be
    determined, or when the video is shorter than one chunk.
    """
    opt = parse_options()
    filename = opt.filename
    split_size = opt.split_size
    split_count = opt.split_count
    if split_size and split_size <= 0:
        print("split_size can't be 0")
        raise SystemExit
    if split_count and split_count <= 1:
        print("split_count must be > 1")
        raise SystemExit
    # The "Duration" line is read from ffmpeg's stderr. communicate() drains
    # both pipes and waits for the process, so no external grep is needed and
    # no child process is left behind.
    probe = Popen(["ffmpeg", "-i", filename], stdout=PIPE, stderr=PIPE, universal_newlines=True)
    _, probe_output = probe.communicate()
    matches = re_length.search(probe_output)
    if matches:
        video_length = int(matches.group(1)) * 3600 + \
            int(matches.group(2)) * 60 + \
            int(matches.group(3))
        print("Video length in seconds: {}".format(video_length))
    else:
        print("Can't determine video length.")
        raise SystemExit
    if split_count:
        print("split_count is defined. Ignoring split_size, if defined")
        split_size = math.ceil(video_length / split_count)
    else:
        split_count = math.ceil(video_length / split_size)
        if split_count == 1:
            print("Video length is less than the target split length.")
            raise SystemExit
    # the output name parts do not change between chunks
    pth, ext = filename.rsplit(".", 1)
    for n in range(split_count):
        split_start = split_size * n
        # Pass the arguments as a list so file names containing spaces or
        # quotes reach ffmpeg unchanged.
        cmd = [
            "ffmpeg", "-i", filename,
            "-vcodec", "copy", "-strict", "-2",
            "-ss", str(split_start), "-t", str(split_size),
            "{}-{}.{}".format(pth, n, ext),
        ]
        print("About to run: {}".format(" ".join(cmd)))
        check_call(cmd, universal_newlines=True)
def parse_options():
    """
    Parse the command line and return the option values.

    A file name plus at least one of split size / split count is required;
    otherwise the help text is printed and SystemExit is raised.
    """
    parser = OptionParser()
    # (flags, destination attribute, value type, help text)
    option_specs = (
        (("-f", "--file"), "filename", "string",
         "file to split, for example sample.avi"),
        (("-s", "--split-size"), "split_size", "int",
         "split or chunk size in seconds, for example 10"),
        (("-c", "--split-count"), "split_count", "int",
         "number of even-sized chunks, for example 4"),
    )
    for flags, dest, value_type, help_text in option_specs:
        parser.add_option(*flags, dest=dest, help=help_text, type=value_type, action="store")
    options, _ = parser.parse_args()
    if not options.filename or not (options.split_size or options.split_count):
        parser.print_help()
        raise SystemExit
    return options
if __name__ == '__main__':
    # Report any failure as a one-line message instead of a traceback.
    try:
        main()
    except Exception as error:
        print(error)
'''
Film actors/actress recommendation based on co-occurrences
DVD Rental database https://www.postgresqltutorial.com/wp-content/uploads/2019/05/dvdrental.zip
@author yohanes.gultom@gmail.com
'''
import sqlalchemy as db
from pprint import pprint
engine = db.create_engine('postgresql://postgres:postgres@localhost/dvdrental')
# For the actor bound to :actor_id, count the films shared with every other
# actor and keep those who share more than two, most frequent first.
query = db.sql.text("""select actor.actor_id, actor.first_name, actor.last_name, x.cooccurrence from (
select film_actor2.actor_id, count(*) as cooccurrence
from film_actor film_actor1 join film_actor film_actor2 on film_actor1.film_id = film_actor2.film_id
where film_actor1.actor_id != film_actor2.actor_id
and film_actor1.actor_id = :actor_id
group by film_actor1.actor_id, film_actor2.actor_id
) x join actor on x.actor_id = actor.actor_id
where x.cooccurrence > 2
order by x.cooccurrence desc
""")
# get actors/actress that often acted together with given input_actor_id
input_actor_id = 107
# The connection is closed when the block exits. Bind values are passed as a
# dict because keyword-argument binding is not accepted by SQLAlchemy 2.0.
with engine.connect() as connection:
    result = connection.execute(query, {'actor_id': input_actor_id}).fetchall()
pprint(result)
'''
Convert Named-Entity tagged file (Open NLP format) to Stanford NLP format (token-based)
@Author yohanes.gultom@gmail
Tagged file example (2 sentences):
"Internal DPD Sulsel mudah-mudahan dalam waktu dekat ada keputusan. Sudah ada keputusan kita serahkan ke DPP dan Rabu ini kita akan rapat harian soal itu," kata <PERSON>Sudding</PERSON> kepada Tribunnews.com, <TIME>Senin (30/1/2012)</TIME>.
Menurut <PERSON>Sudding</PERSON>, DPP Hanura pada prinsipnya memberikan kesempatan dan ruang sama bagi pengurus DPD dan DPC Hanura Sulsel untuk menyampaikan aspirasinya.
"Dan diberikan kesempatan melakukan verfikasi akar msalah yang terjadi di DPD Hanura Sulsel," kata dia.
'''
import sys
import re
# <TYPE>text</TYPE> fully contained in one token, with optional text around it
SINGLE_PATTERN = re.compile(r'^([^<>]*)<(\w+)>([^<]*)</(\w+)>([^<>]*)$', re.I)
# token that opens an entity: optional text, <TYPE>, first entity word
START_PATTERN = re.compile(r'^([^<>]*)<(\w+)>([^<]*)$', re.I)
# token that closes an entity: last entity word, </TYPE>, optional text
END_PATTERN = re.compile(r'^([^<>]*)</(\w+)>([^<]*)$', re.I)
# token ending with a full stop, treated as end of sentence
EOS_PATTERN = re.compile(r'^([^<>]*)\.$', re.I)
NON_ENTITY_TYPE = 'O'
infile = sys.argv[1]
outfile = sys.argv[2]
# entity type of the tag currently open; NON_ENTITY_TYPE outside any tag
cur_type = NON_ENTITY_TYPE
# Text mode: the tokens are split on and concatenated with str values, which
# raises TypeError on Python 3 when the lines are read as bytes ('rb').
with open(infile, 'r') as f, open(outfile, 'w') as out:
    for line in f:
        for token in line.strip().split(' '):
            token = token.strip()
            if not token:
                continue
            match = SINGLE_PATTERN.match(token)
            if match:
                if match.group(1):
                    out.write(match.group(1) + '\t' + NON_ENTITY_TYPE + '\n')
                out.write(match.group(3) + '\t' + match.group(2) + '\n')
                if match.group(2) != match.group(4):
                    raise ValueError('Invalid tag pair: {} and {}'.format(match.group(2), match.group(4)))
                if match.group(5):
                    out.write(match.group(5) + '\t' + NON_ENTITY_TYPE + '\n')
                continue
            match = START_PATTERN.match(token)
            if match:
                if match.group(1):
                    out.write(match.group(1) + '\t' + NON_ENTITY_TYPE + '\n')
                # following tokens keep this type until the closing tag
                cur_type = match.group(2)
                out.write(match.group(3) + '\t' + cur_type + '\n')
                continue
            match = END_PATTERN.match(token)
            if match:
                out.write(match.group(1) + '\t' + cur_type + '\n')
                if match.group(2) != cur_type:
                    raise ValueError('Invalid tag pair: {} and {}'.format(cur_type, match.group(2)))
                cur_type = NON_ENTITY_TYPE
                if match.group(3):
                    out.write(match.group(3) + '\t' + NON_ENTITY_TYPE + '\n')
                continue
            match = EOS_PATTERN.match(token)
            if match:
                out.write(match.group(1) + '\t' + cur_type + '\n')
                out.write('.' + '\t' + cur_type + '\n')
                # blank line separates sentences in the output
                out.write('\n')
                continue
            out.write(token + '\t' + cur_type + '\n')
# VIP currency notification script
# Usage: python vip2.py <gmail_username> <gmail_password> <to_email>
# Author: yohanes.gultom@gmail.com
from bs4 import BeautifulSoup
from bs4.element import Tag
from re import sub
from decimal import Decimal
import urllib2
import backoff
import smtplib
import sys
# page the rates are scraped from
url = 'https://www.vip.co.id'
# rules to send email
# A rule matches when "<rate of `type` for `currency`> <op> <value>" holds.
rules = [
{'currency': 'SGD', 'op': '>=', 'type': 'buy', 'value': 9400}
]
# SMTP settings; username, password and recipient are taken from the
# first, second and third command-line arguments.
smtp_config = {
'username': sys.argv[1],
'password': sys.argv[2],
'server': 'smtp.gmail.com',
'port': 465,
'from': 'VIP Bot',
'to': sys.argv[3]
}
# Raw message (headers + HTML body). Placeholders: {0} sender, {1} recipient,
# {2} currency code, {3} buy rate, {4} sell rate, {5} source URL.
message_tpl = '''From: {0}\r\nTo: {1}\r\nSubject: {2} to IDR today\r\nMIME-Version: 1.0\r\nContent-Type: text/html\r\n\r\n
<h1>{2} to IDR</h1>
<ul>
<li>Buy: IDR {3}</li>
<li>Sell: IDR {4}</li>
</ul>
<p>Source: {5}</p>
'''
@backoff.on_exception(backoff.expo, urllib2.URLError, max_tries=3)
def fetch_content(url):
    """
    Open url and return the response object.

    The backoff decorator retries the call on urllib2.URLError, up to 3 tries.
    """
    response = urllib2.urlopen(url)
    return response
def parse_currency(s):
    """Return s as a Decimal after dropping every character that is not a digit or a dot."""
    digits_only = sub(r'[^\d.]', '', str(s))
    return Decimal(digits_only)
# retrieve and parse rates
print('Fetching content from {}..'.format(url))
rates = {}
response = fetch_content(url)
html = response.read()
soup = BeautifulSoup(html, 'html.parser')
rate_table = soup.select('#rate-table tr')
# the first row is skipped (presumably the table header)
for rate in rate_table[1:]:
    values = []
    for content in rate.contents:
        if isinstance(content, Tag):
            # NOTE(review): `in` on a bs4 Tag looks at its children, not its
            # attributes, so this branch may never run; has_attr('title') may
            # be what was intended - confirm against the page before changing.
            if 'title' in content:
                values.append(content['title'])
            else:
                values.append(content.contents[0])
    # values: currency code followed by two rates; the lower one is
    # treated as the buy rate and the higher one as the sell rate
    first = parse_currency(values[1])
    second = parse_currency(values[2])
    rates[str(values[0])] = {
        'buy': min(first, second),
        'sell': max(first, second)
    }
# check rules
print('Checking rules..')
# Comparison operators a rule may use. A lookup table replaces eval() on a
# formatted string, so a rule can never execute arbitrary code.
import operator
comparators = {
    '>=': operator.ge,
    '>': operator.gt,
    '<=': operator.le,
    '<': operator.lt,
    '==': operator.eq,
    '!=': operator.ne,
}
server_ssl = smtplib.SMTP_SSL(smtp_config['server'], smtp_config['port'])
try:
    server_ssl.ehlo()
    server_ssl.login(smtp_config['username'], smtp_config['password'])
    for rule in rules:
        if rule['currency'] in rates:
            rate = rates[rule['currency']]
            compare = comparators[rule['op']]
            if compare(rate[rule['type']], rule['value']):
                print('Found matching rule: {}'.format(rule))
                message = message_tpl.format(
                    smtp_config['from'],
                    smtp_config['to'],
                    rule['currency'],
                    rate['buy'],
                    rate['sell'],
                    url
                )
                print('Sending email..')
                server_ssl.sendmail(smtp_config['from'], smtp_config['to'], message)
finally:
    # release the connection even when login or sending failed
    server_ssl.close()
print('Done!')
# VIP currency notification script
# Require Python >= 3.5.2
# Usage: python vip3.py <gmail_username> <gmail_password> <to_email>
# Author: yohanes.gultom@gmail.com
from bs4 import BeautifulSoup
from bs4.element import Tag
from re import sub
from decimal import Decimal
from urllib.request import Request, urlopen
import urllib.error
import backoff
import smtplib
import sys
# page the rates are scraped from
url = 'https://www.vip.co.id'
# rules to send email
# A rule matches when "<rate of `type` for `currency`> <op> <value>" holds.
rules = [
{'currency': 'SGD', 'op': '>=', 'type': 'buy', 'value': 9400}
]
# SMTP settings; username, password and recipient are taken from the
# first, second and third command-line arguments.
smtp_config = {
'username': sys.argv[1],
'password': sys.argv[2],
'server': 'smtp.gmail.com',
'port': 465,
'from': 'VIP Bot',
'to': sys.argv[3]
}
# Raw message (headers + HTML body). Placeholders: {0} sender, {1} recipient,
# {2} currency code, {3} buy rate, {4} sell rate, {5} source URL.
message_tpl = '''From: {0}\r\nTo: {1}\r\nSubject: {2} to IDR today\r\nMIME-Version: 1.0\r\nContent-Type: text/html\r\n\r\n
<h1>{2} to IDR</h1>
<ul>
<li>Buy: IDR {3}</li>
<li>Sell: IDR {4}</li>
</ul>
<p>Source: {5}</p>
'''
@backoff.on_exception(backoff.expo, urllib.error.URLError, max_tries=3)
def fetch_content(url):
    """
    Download url and return the response body as bytes.

    The request carries a browser-like User-Agent header. The backoff
    decorator retries the call on urllib.error.URLError, up to 3 tries.
    """
    req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    # the context manager closes the HTTP response once the body is read
    with urlopen(req) as response:
        return response.read()
def parse_currency(s):
    """Convert s to a Decimal, keeping only its digits and dots."""
    text = str(s)
    cleaned = sub(r'[^\d.]', '', text)
    return Decimal(cleaned)
# retrieve and parse rates
print('Fetching content from {}..'.format(url))
rates = {}
html = fetch_content(url)
soup = BeautifulSoup(html, 'html.parser')
rate_table = soup.select('#rate-table tr')
# the first row is skipped (presumably the table header)
for rate in rate_table[1:]:
    values = []
    for content in rate.contents:
        if isinstance(content, Tag):
            # NOTE(review): `in` on a bs4 Tag looks at its children, not its
            # attributes, so this branch may never run; has_attr('title') may
            # be what was intended - confirm against the page before changing.
            if 'title' in content:
                values.append(content['title'])
            else:
                values.append(content.contents[0])
    # values: currency code followed by two rates; the lower one is
    # treated as the buy rate and the higher one as the sell rate
    first = parse_currency(values[1])
    second = parse_currency(values[2])
    rates[str(values[0])] = {
        'buy': min(first, second),
        'sell': max(first, second)
    }
# check rules
print('Checking rules..')
# Comparison operators a rule may use. A lookup table replaces eval() on a
# formatted string, so a rule can never execute arbitrary code.
import operator
comparators = {
    '>=': operator.ge,
    '>': operator.gt,
    '<=': operator.le,
    '<': operator.lt,
    '==': operator.eq,
    '!=': operator.ne,
}
server_ssl = smtplib.SMTP_SSL(smtp_config['server'], smtp_config['port'])
try:
    server_ssl.ehlo()
    server_ssl.login(smtp_config['username'], smtp_config['password'])
    for rule in rules:
        if rule['currency'] in rates:
            rate = rates[rule['currency']]
            compare = comparators[rule['op']]
            if compare(rate[rule['type']], rule['value']):
                print('Found matching rule: {}'.format(rule))
                message = message_tpl.format(
                    smtp_config['from'],
                    smtp_config['to'],
                    rule['currency'],
                    rate['buy'],
                    rate['sell'],
                    url
                )
                print('Sending email..')
                server_ssl.sendmail(smtp_config['from'], smtp_config['to'], message)
finally:
    # release the connection even when login or sending failed
    server_ssl.close()
print('Done!')
#!/usr/bin/python
"""
Simple Voting HTTP server with MySQL database
Setup in Ubuntu:
$ sudo apt-get install python-pip python-dev libmysqlclient-dev
$ pip install MySQL-python
"""
import MySQLdb
import cgi
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
# Server and database configuration
# port the HTTP server listens on
PORT_NUMBER = 8080
# MySQL connection settings: host, user, password and database name
DB_HOST = 'localhost'
DB_USER = 'root'
DB_PASS = 'root'
DB_NAME = 'vote'
class VoteHandler(BaseHTTPRequestHandler):
    """
    Request handler that serves the voting form and records submitted votes
    """

    def _start_html_response(self):
        # every reply is a 200 HTML page; errors are reported inside the page
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def do_GET(self):
        """
        Reply with the voting form
        """
        self._start_html_response()
        self.wfile.write(get_vote_form_html())

    def do_POST(self):
        """
        Record the submitted vote and reply with the form plus a status message
        """
        environ = {'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers['Content-Type']}
        form = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ=environ)
        self._start_html_response()
        try:
            updated = inc_vote(form.getvalue('candidate'), form.getvalue('state'))
            if updated == 1:
                page = get_vote_form_html('Thanks for your vote!', message_color='green')
            else:
                page = get_vote_form_html('Vote error. Invalid candidate and/or state', message_color='red')
        except Exception as e:
            print(e)
            page = get_vote_form_html('Server error. Please contact support', message_color='red')
        self.wfile.write(page)
def get_vote_form_html(message_html=None, message_color='green'):
    """
    Build the voting page: an optional colored status message followed by
    the form with the candidate and state radio groups
    """
    candidate_html = get_radio_group_html('candidate', get_distinct_vote('candidate'))
    state_html = get_radio_group_html('state', get_distinct_vote('state'))
    # the page is assembled from fragments and joined once at the end
    fragments = ["""
<html>
<head><title>Voting App</title></head>
<body>
"""]
    if message_html:
        fragments.append("""
<p style="color:{}">{}</p>
""".format(message_color, message_html))
    fragments.append("""
<form action="" method="POST">
<table>
<tr><td>Candidates:</td><td>{}</td></tr>
<tr><td>States:</td><td>{}</td></tr>
<tr><td><input type="submit" value="Submit"/></td></tr>
</table>
</form>
""".format(candidate_html, state_html))
    fragments.append("""
</body>
</html>
""")
    return ''.join(fragments)
def get_distinct_vote(col):
    """
    Return the sorted distinct values of one column of the vote table
    """
    if not db:
        raise Exception('Connection not opened')
    cursor = db.cursor()
    # col is formatted into the statement; the callers in this file pass
    # literal column names
    cursor.execute('SELECT DISTINCT {} FROM vote'.format(col))
    distinct_values = [record[0] for record in cursor.fetchall()]
    distinct_values.sort()
    return distinct_values
def inc_vote(candidate, state):
    """
    Increase vote for certain candidate and state by 1

    Returns the value of cursor.execute() for the UPDATE, which the request
    handler compares with 1 to decide whether the vote was recorded.
    Raises Exception when the connection is not opened or when the update
    fails (the transaction is rolled back first).
    """
    if not db:
        raise Exception('Connection not opened')
    try:
        cursor = db.cursor()
        # use parameterized query to prevent sql injection
        affected_rows = cursor.execute("UPDATE vote SET total_votes = total_votes + 1 WHERE candidate = %s AND state = %s", [candidate, state])
        db.commit()
        return affected_rows
    except Exception:
        db.rollback()
        raise Exception('Database update failed')
def get_radio_group_html(group_name, values):
    """
    Render one radio input per value, separated by spaces; the first one is
    pre-checked
    """
    template = '<input type="radio" name="{0}" value="{1}" {2}/> {1}'
    return ' '.join(
        template.format(group_name, val, '' if index else 'checked')
        for index, val in enumerate(values)
    )
if __name__ == '__main__':
    # Both start as None so the cleanup below knows what was actually opened.
    db = None
    server = None
    try:
        # connect to database
        db = MySQLdb.connect(DB_HOST, DB_USER, DB_PASS, DB_NAME)
        print('Connected to database {}@{}'.format(DB_NAME, DB_HOST))
        # start HTTP server
        server = HTTPServer(('', PORT_NUMBER), VoteHandler)
        print('Server is started and accessible on http://localhost:{}'.format(PORT_NUMBER))
        print('Press CTRL+C to shutdown..')
        server.serve_forever()
    except KeyboardInterrupt:
        print('Shutting down the web server')
    finally:
        # Runs on CTRL+C and on any other error, and skips whatever was
        # never created (e.g. CTRL+C during the database connect).
        # shutdown server
        if server is not None:
            server.socket.close()
        # close db connection
        if db is not None:
            db.close()
@lucasvazq
Copy link

lucasvazq commented Dec 13, 2021

I leave my grain of rice.

I was just bored and felt inspired by this: https://craft.js.org/docs/overview#extensible

My code makes no sense and has no purpose 😃

from __future__ import annotations

import copy
import ctypes
import uuid
import weakref
from typing import Any, Callable


# ==============================
# react.py
# ==============================


class Temp:
    # Empty scratch namespace; click handlers park the element they are
    # duplicating on it as a class attribute.
    pass


class JSXElement:
    """Base element: registers itself with useNode and keeps its own copy of the props."""

    def __init__(self, props: dict = None, parent: JSXElement = None):
        # key under which this element can later be looked up
        self.id = useNode(self)
        # shallow copy, so adding keys to one element's props does not show
        # up in the dict the caller passed in
        self.props = copy.copy(props) if props is not None else {}
        self.parent = parent


class ATag(JSXElement):
    """Anchor element carrying an optional click callback."""

    def __init__(self, props: dict = None, *, parent: JSXElement = None, click: Callable[[], Any] = None):
        super().__init__(props=props, parent=parent)
        # callable invoked by whoever "clicks" the anchor; takes no arguments
        self.click = click


# ==============================
# core.py
# ==============================


# Maps the key handed out by useNode to the registered element. Values are
# held weakly: the registry does not keep elements alive, and an entry
# disappears once its element is garbage collected. This replaces storing
# id(element) and casting it back through ctypes, which reads freed memory
# when the element no longer exists.
REFERENCES = weakref.WeakValueDictionary()


def useNode(element: JSXElement) -> str:
    """Register element and return the random UUID string that identifies it."""
    key = str(uuid.uuid4())
    REFERENCES[key] = element
    return key


def node(reference: str):
    """
    Return the element registered under reference.

    Raises KeyError when the key is unknown or the element no longer exists.
    """
    return REFERENCES[reference]


# ==============================
# MyCustomElement.py
# ==============================


class MyCustomElement(JSXElement):
    """
    A simple component that renders a single anchor tag.
    Clicking the anchor builds a duplicate of the whole component.
    """

    def construct(self) -> JSXElement:
        def duplicate():
            # Look this component up by id, park it on Temp, then build a
            # new component of the same class from its props.
            Temp.actual_container = node(self.id)
            source = Temp.actual_container
            return source.__class__(source.props).construct()

        return ATag(parent=self, click=duplicate)


# ==============================
# MyCustomElementTests.py
# ==============================


# The constructed anchor's parent carries the props that were passed in.
container1 = MyCustomElement({"text": 1}).construct()
assert container1.parent.props == {"text": 1}

# Clicking yields a second component with equal props.
container2 = container1.click()
assert container2.parent.props == {"text": 1}

# Each component owns its props: changing the first leaves the second as is.
container1.parent.props.update({"img": "https://..."})
assert container1.parent.props == {"text": 1, "img": "https://..."}
assert container2.parent.props == {"text": 1}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment