Last active
August 11, 2023 16:30
-
-
Save yohanesgultom/630a831eff1fbdcd84b3cfec6feabe02 to your computer and use it in GitHub Desktop.
Random python scripts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Random collection of python scripts |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Argument parser template | |
""" | |
import argparse | |
# Build the command-line interface for the application.
parser = argparse.ArgumentParser(description='Your application description')
# simple positional argument (mandatory)
parser.add_argument('a', help='some description')
# positional argument cast to int
parser.add_argument('b', type=int, help='some description')
# option (optional); its value is None when the flag is omitted
parser.add_argument('-r', help='some description')
# boolean flag: args.silent is True only when -s/--silent is given
parser.add_argument('-s', '--silent', action='store_true', default=False, help='some description')
# parse arguments/options into the namespace object `args`
args = parser.parse_args()
# read the parsed values back
print(args.a)
print(args.b)
print(args.r)
# NOTE: when an option has a long name, argparse stores the value under that
# name only, so the flag is available as `args.silent`; `args.s` does not
# exist and would raise AttributeError.
print(args.silent)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Automatic audio transcription job using AWS Transcribe service https://aws.amazon.com/transcribe/ | |
@author yohanes.gultom@gmail.com | |
""" | |
import configparser, boto3, os, time, json | |
from pprint import pprint | |
bucket_name = 'yohanesgultom-transcribe-test'
file_path = '/home/yohanesgultom/Downloads/Pidato-Kenegaraan-Presiden-Joko-Widodo-2019-Part-1.mp3'
# source: Pidato Kenegaraan Presiden Joko Widodo (2:21-3:42) https://www.youtube.com/watch?v=yDdQ9pEfcnw&t=155s

# read credentials and region from aws.conf located next to this script
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'aws.conf'))

# init AWS session
session = boto3.session.Session(
    aws_access_key_id=config['default']['aws_access_key_id'],
    aws_secret_access_key=config['default']['aws_secret_access_key'],
    region_name=config['default']['region']
)
s3 = session.client('s3')
transcribe = session.client('transcribe')

# create bucket to store transcribe input/output file if not exists
res = s3.list_buckets()
buckets = [b['Name'] for b in res['Buckets']]
if bucket_name not in buckets:
    print(f'Creating new bucket: {bucket_name}...')
    if session.region_name == 'us-east-1':
        # us-east-1 is the default location; S3 rejects it when passed as an
        # explicit LocationConstraint, so the configuration must be omitted.
        res = s3.create_bucket(Bucket=bucket_name)
    else:
        res = s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': session.region_name}
        )

# upload audio input file if not exist
file_name = os.path.basename(file_path)
# Prefix narrows the listing to keys starting with the file name, so the check
# does not depend on the object being within the first page of a large bucket.
res = s3.list_objects(Bucket=bucket_name, Prefix=file_name)
contents = res.get('Contents', [])
file_names = [c['Key'] for c in contents]
if file_name not in file_names:
    print(f'Uploading input file: {file_name}...')
    res = s3.upload_file(file_path, bucket_name, file_name)

# create new job if not exist
job_name = file_name
# An unfiltered listing returns only one small page of jobs; filter by name and
# request the largest page so an existing job is reliably detected.
res = transcribe.list_transcription_jobs(JobNameContains=job_name, MaxResults=100)
jobs = [j['TranscriptionJobName'] for j in res['TranscriptionJobSummaries']]
if job_name not in jobs:
    print(f'Starting transcribe job: {job_name}...')
    s3_file = f's3://{bucket_name}/{file_name}'
    res = transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        LanguageCode='id-ID',
        Media={'MediaFileUri': s3_file},
        OutputBucketName=bucket_name
    )
# Poll until the job reaches a terminal state. `completed` stays False when the
# job fails, so the download step that follows is skipped.
completed = False
while not completed:
    res = transcribe.list_transcription_jobs(
        JobNameContains=job_name,
        MaxResults=1
    )
    summaries = res.get('TranscriptionJobSummaries', [])
    status = summaries[0]['TranscriptionJobStatus'] if summaries else None
    if status == 'COMPLETED':
        completed = True
        print(f'Job has completed')
    elif status == 'FAILED':
        # A failed job never becomes COMPLETED; stop instead of looping forever.
        reason = summaries[0].get('FailureReason', 'unknown reason')
        print(f'Job has failed: {reason}')
        break
    else:
        print(f'Waiting for job to complete...')
        time.sleep(5)
# Fetch the transcription JSON from the bucket unless a local copy exists.
result_file = f'{file_name}.json'
if completed and not os.path.isfile(result_file):
    listing = s3.list_objects(Bucket=bucket_name)
    for key in (obj['Key'] for obj in listing.get('Contents', [])):
        if key != result_file:
            continue
        print(f'Downloading transcription result...')
        # the object key doubles as the local file name
        s3.download_file(bucket_name, key, key)
        print(f'File downloaded {key}')

# Print the transcript text if the result file is available locally.
if os.path.isfile(result_file):
    with open(result_file, 'r') as result_handle:
        transcription = json.load(result_handle)
    print(transcription['results']['transcripts'][0]['transcript'])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Run mysqldump gzip and send result using SMTP | |
Reference: https://realpython.com/python-send-email | |
Config example: | |
{ | |
"subject" : "Daily backup", | |
"body" : "This is a daily database backup", | |
"sender_email" : "sender@gmail.com", | |
"receiver_email" : "receiver@gmail.com", | |
"password" : "supersecretpassword", | |
"smtp_server" : "smtp.gmail.com", | |
"smtp_host" : 465, | |
"dbname" : "dbname", | |
"file_prefix": "dbname_backup" | |
} | |
@Author yohanes.gultom@gmail.com | |
''' | |
import email, smtplib, ssl | |
import datetime | |
import subprocess | |
import shlex | |
import json | |
from email import encoders | |
from email.mime.base import MIMEBase | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
# Load settings (see the config example in the module docstring).
CONFIG_FILE = 'backup_email.json'
with open(CONFIG_FILE, 'r') as f:
    config = json.load(f)
subject = config['subject']
body = config['body']
sender_email = config['sender_email']
receiver_email = config['receiver_email']
password = config['password']
smtp_server = config['smtp_server']
smtp_host = config['smtp_host']  # NOTE: despite the name, this is the SMTP port
dbname = config['dbname']
file_prefix = config['file_prefix']

# Argument lists are built directly so a database name containing spaces or
# shell metacharacters is passed through as a single argument.
cmd1 = ['mysqldump', dbname]
cmd2 = ['gzip', '-9']
filename = "{}_{}.sql.gz".format(file_prefix, datetime.datetime.now().strftime('%Y%m%d%H%M'))

# Backup database: mysqldump | gzip -9 > filename
print('Backing up database..')
with open(filename, 'wb') as f:  # gzip output is binary
    ps1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
    ps2 = subprocess.Popen(cmd2, stdin=ps1.stdout, stdout=f)
    # Close the parent's copy of the pipe so mysqldump receives SIGPIPE if
    # gzip exits early, instead of blocking forever.
    ps1.stdout.close()
    ps2.wait()
    ps1.wait()
# Abort before emailing if either stage failed; a non-zero exit from mysqldump
# means the dump is missing or incomplete.
if ps1.returncode != 0 or ps2.returncode != 0:
    print('Backup failed (mysqldump={}, gzip={})'.format(ps1.returncode, ps2.returncode))
    raise SystemExit(1)
# Assemble the multipart message; headers are set in a fixed order.
message = MIMEMultipart()
for header_name, header_value in (
    ("From", sender_email),
    ("To", receiver_email),
    ("Subject", subject),
    ("Bcc", receiver_email),  # Recommended for mass emails
):
    message[header_name] = header_value

# Plain-text body
message.attach(MIMEText(body, "plain"))

# Read the compressed backup in binary mode
with open(filename, "rb") as attachment:
    payload = attachment.read()

# application/octet-stream lets mail clients offer the file as a download
part = MIMEBase("application", "octet-stream")
part.set_payload(payload)
# Base64-encode so the binary payload survives transport as ASCII
encoders.encode_base64(part)
part.add_header("Content-Disposition", f"attachment; filename= {filename}")

# Attach the file and serialise the whole message
message.attach(part)
text = message.as_string()

# Log in over an SSL connection using the default secure context and send
print('Sending email..')
context = ssl.create_default_context()
with smtplib.SMTP_SSL(smtp_server, smtp_host, context=context) as server:
    server.login(sender_email, password)
    server.sendmail(sender_email, receiver_email, text)
print('Done.')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Parse BibTex entries from input file and render them in IEEEtran.cls format | |
# http://www.michaelshell.org/tex/ieeetran/ | |
# Usage: python bibtexconverter.py [bibtex file] | |
# | |
# BibTex example (input): | |
# @article{lecun2015deep, | |
# title={Deep learning}, | |
# author={LeCun, Yann and Bengio, Yoshua and Hinton, Geoffrey}, | |
# journal={Nature}, | |
# volume={521}, | |
# number={7553}, | |
# pages={436--444}, | |
# year={2015}, | |
# publisher={Nature Publishing Group} | |
# } | |
# | |
# IEEETran example (output): | |
# \bibitem{lecun2015deep} Y.~LeCun and Y.~Bengio and G.~Hinton, \emph{Deep learning}.\hskip 1em plus 0.5em minus 0.4em\relax Nature, Nature Publishing Group, 2015. | |
import re | |
import sys | |
from pprint import pprint | |
def ieee(refs):
    """Print each reference as an IEEEtran ``\\bibitem`` line.

    Args:
        refs: iterable of dicts as expected by ``_ieee``.

    A blank separator is printed first, then one formatted entry per
    reference, each followed by an empty line.
    """
    # Single-argument print(...) behaves the same on Python 2 and Python 3,
    # unlike the print statement, which is a syntax error on Python 3.
    print('\n')
    for ref in refs:
        print(_ieee(ref) + '\n')
def _ieee(dic):
    """Render one parsed BibTex entry as an IEEEtran ``\\bibitem`` string.

    Reads the ``refcode``, ``author``, ``title`` and ``year`` keys of ``dic``
    (KeyError if one is missing) plus whatever venue keys
    ``_ieee_publisher`` finds.
    """
    template = "\\bibitem{{{}}} {}, \\emph{{{}}}.\\hskip 1em plus 0.5em minus 0.4em\\relax {}, {}."
    refcode = dic['refcode']
    authors = _ieee_author(dic['author'])
    title = dic['title']
    venue = _ieee_publisher(dic)
    year = dic['year']
    return template.format(refcode, authors, title, venue, year)
def _ieee_publisher(dic): | |
publisher = [] | |
keys = ['journal', 'booktitle', 'publisher', 'organization'] | |
for key in keys: | |
if key in dic: | |
publisher.append(dic[key]) | |
return ', '.join(publisher) | |
def _ieee_author(text): | |
formatted = [] | |
authors = text.split(' and ') | |
for a in authors: | |
names = a.split(', ') | |
if len(names) >= 2: | |
last, first = names[0], names[1] | |
formatted.append(first[0].upper() + '.~' + last) | |
else: | |
formatted.append(names[0]) | |
return ' and '.join(formatted) | |
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('Usage: python bibtexconverter.py [bibtex file]')
        sys.exit()
    filename = sys.argv[1]

    # collect BibTex entries from input file
    # separated by blank line
    entries = []
    with open(filename) as f:
        entry = []
        for line in f:
            line = line.strip()
            if line:
                # save line
                entry.append(line)
            elif entry:
                # blank line closes the current entry
                entries.append(entry)
                entry = []
        # last entry (file may not end with a blank line)
        if entry:
            entries.append(entry)

    # parse BibTex entries
    references = []
    for entry in entries:
        header = re.search(r'@(article|inproceedings|thesis){([\w\d]*),', entry[0], re.M | re.I)
        if header is None:
            # Unsupported entry type or malformed header: skip it rather than
            # crash on .group() of a failed match.
            sys.stderr.write('Skipping unrecognized entry: {}\n'.format(entry[0]))
            continue
        dic = {'refcode': header.group(2)}
        # first line is the header, last line is the closing brace
        for field in entry[1:-1]:
            if '=' not in field:
                continue
            # split once only: values (e.g. URLs) may themselves contain '='
            key, value = field.split('=', 1)
            braced = re.search(r'{([^{}]*)}', value, re.M | re.I)
            if braced:
                value = braced.group(1)
            else:
                # value not wrapped in braces (e.g. quoted or bare number)
                value = value.strip().rstrip(',').strip('"')
            dic[key.strip().lower()] = value
        references.append(dic)

    # render entries in IEEEtran.cls format
    # http://www.michaelshell.org/tex/ieeetran/
    ieee(references)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Cek status https://ivoting.iaitb.or.id | |
Dependency: pip install requests | |
Cara pakai: python cek_status_ivoting.py input.csv | |
Format file input (csv): | |
nama1,jurusan1,angkatan1 | |
nama2,jurusan2,angkatan2 | |
nama3,jurusan3,angkatan3 | |
Hasil (csv): | |
nama1,jurusan1,angkatan1,status | |
nama2,jurusan2,angkatan2,status | |
nama3,jurusan3,angkatan3,status | |
@Author yohanes.gultom@gmail.com | |
""" | |
import csv | |
import requests | |
import sys | |
# fetch a fresh key via the browser if this one has expired
api_key = 'bsgcyfgveyujeygfefc387r34ybr39brnr3r3'
headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36',
    'Origin': 'https://ivoting.iaitb.or.id/',
    'api-key': api_key,
}
url_template = 'https://ivoting.iaitb.or.id/api/open/alumnee/simple/all?name={}&page=1&perPage=10&studyprogram={}&generation={}'

# Read every (name, study program, class year) row before the file is rewritten.
input_file = sys.argv[1]
print(f'Membaca input {input_file}...')
input_rows = []
# newline='' is what the csv module requires for correct newline handling
with open(input_file, newline='') as f:
    for row in csv.reader(f):
        if not row:
            # tolerate blank lines instead of failing with IndexError
            continue
        input_rows.append((row[0], row[1], int(row[2])))

# Rewrite the same file in place, appending the looked-up status to each row.
print('Memutakhirkan status...')
with open(input_file, 'w', newline='') as f:
    writer = csv.writer(f)
    for nama, jurusan, angkatan in input_rows:
        try:
            # timeout keeps one stalled connection from hanging the whole run
            res = requests.get(
                url_template.format(nama, jurusan, angkatan),
                headers=headers,
                timeout=30,
            )
            body = res.json()
            status = body['data'][0]['verificationStatus']
        except Exception as e:
            # best effort by design: record the error text as the status so
            # the row is still written and the remaining rows are processed
            status = str(e)
        writer.writerow((nama, jurusan, angkatan, status))
        print(f'{nama}| {jurusan} | {angkatan} | {status}')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Convert ENAMEX Named-Entity annotated file to Stanford NLP format (token-based) | |
@Author yohanes.gultom@gmail | |
ENAMEX example (2 sentences): | |
Sementara itu Pengamat Pasar Modal <ENAMEX TYPE="PERSON">Dandossi Matram</ENAMEX> mengatakan, sulit bagi sebuah <ENAMEX TYPE="ORGANIZATION">kantor akuntan publik</ENAMEX> (<ENAMEX TYPE="ORGANIZATION">KAP</ENAMEX>) untuk dapat menyelesaikan audit perusahaan sebesar <ENAMEX TYPE="ORGANIZATION">Telkom</ENAMEX> dalam waktu 3 bulan. 1 | |
<ENAMEX TYPE="ORGANIZATION">Telkom</ENAMEX> akan melakukan RUPS pada 30 Juli 2004 yang selain melaporkan kinerja 2003 juga akan meminta persetujuan untuk pemecahan nilai nominal saham atau stock split 1:2. 2 | |
''' | |
import sys | |
import re | |
START_PATTERN = re.compile(r'^(.*?)<ENAMEX$', re.I) | |
END_SINGLE_PATTERN = re.compile(r'^TYPE="(.*?)">(.*?)</ENAMEX>(.*?)$', re.I) | |
TYPE_PATTERN = re.compile(r'^TYPE="(.*?)">(.*?)$', re.I) | |
END_MULTI_PATTERN = re.compile(r'^(.*?)</ENAMEX>(.*?)$', re.I) | |
EOS_PATTERN = re.compile(r'^([^<>]*)\.?\t(\d+)$', re.I) | |
NON_ENTITY_TYPE = 'O' | |
def check_and_process_eos(token):
    """Write an end-of-sentence token, if ``token`` is one.

    A sentence-final token matches ``EOS_PATTERN``: text, an optional period,
    a tab and the sentence number. On a match the word (if any), a '.' token
    and a blank separator line are written to the module-level ``out`` file,
    each tagged with the module-level ``cur_type``.

    Returns:
        True if ``token`` was an end-of-sentence token and was written,
        False otherwise (nothing is written).
    """
    match = EOS_PATTERN.match(token)
    if not match:
        return False
    word = match.group(1)
    # Group 1 is greedy and swallows the sentence period, so drop it here;
    # otherwise the period is emitted twice ('bulan.' followed by '.').
    if word.endswith('.'):
        word = word[:-1]
    if word:
        out.write(word + '\t' + cur_type + '\n')
    out.write('.' + '\t' + cur_type + '\n')
    out.write('\n')
    return True
# Usage: <script> <enamex input file> <stanford output file>
infile = sys.argv[1]
outfile = sys.argv[2]
# Entity type of the token being read; stays set between an opening
# TYPE="..."> fragment and its closing </ENAMEX> for multi-word entities.
cur_type = NON_ENTITY_TYPE
# Text mode: tokens must be str to be split on ' ' and matched by the str
# patterns (a file opened 'rb' yields bytes on Python 3 and fails).
with open(infile, 'r') as f, open(outfile, 'w') as out:
    for line in f:
        for token in line.strip().split(' '):
            token = token.strip()
            if not token:
                continue

            # '...<ENAMEX': tag opener, possibly glued to preceding text
            match = START_PATTERN.match(token)
            if match:
                if match.group(1):
                    out.write(match.group(1) + '\t' + NON_ENTITY_TYPE + '\n')
                continue

            # 'TYPE="X">word</ENAMEX>rest': complete single-word entity
            match = END_SINGLE_PATTERN.match(token)
            if match:
                out.write(match.group(2) + '\t' + match.group(1) + '\n')
                cur_type = NON_ENTITY_TYPE
                rest = match.group(3)
                # skip an empty remainder instead of writing a blank token
                if rest and not check_and_process_eos(rest):
                    out.write(rest + '\t' + cur_type + '\n')
                continue

            # 'TYPE="X">word': first word of a multi-word entity
            match = TYPE_PATTERN.match(token)
            if match:
                cur_type = match.group(1)
                out.write(match.group(2) + '\t' + cur_type + '\n')
                continue

            # 'word</ENAMEX>rest': last word of a multi-word entity
            match = END_MULTI_PATTERN.match(token)
            if match:
                out.write(match.group(1) + '\t' + cur_type + '\n')
                cur_type = NON_ENTITY_TYPE
                rest = match.group(2)
                if rest and not check_and_process_eos(rest):
                    out.write(rest + '\t' + cur_type + '\n')
                continue

            # sentence-final token, otherwise an ordinary token
            if check_and_process_eos(token):
                continue
            out.write(token + '\t' + cur_type + '\n')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Simple example on compiling & deploying simple smartcontract, and calling its methods | |
Setup: | |
pip3 install web3==4.7.2 py-solc==3.2.0 | |
python3 -m solc.install v0.4.24 | |
export PATH="$PATH:$HOME/.py-solc/solc-v0.4.24/bin" | |
@author yohanes.gultom@gmail.com | |
""" | |
from web3 import Web3, HTTPProvider, middleware | |
from solc import compile_source | |
import random | |
def compile_contract(contract_source_file, contractName=None):
    """
    Read a Solidity source file, compile it and return (name, interface).

    When contractName is given, the interface is looked up under the
    '<stdin>:' + contractName key and contractName is returned unchanged.
    Otherwise the first compiled contract is used and its key in the compiler
    output is returned as the name.
    """
    with open(contract_source_file, "r") as source:
        compiled = compile_source(source.read())
    if contractName:
        return contractName, compiled['<stdin>:' + contractName]
    first_key = list(compiled)[0]
    return first_key, compiled[first_key]
def deploy_contract(acct, contract_interface, contract_args=None):
    """
    Deploy a contract with a locally signed transaction and return its address.

    Builds the constructor transaction from the 'abi' and 'bin' entries of
    contract_interface (passing contract_args when given), signs it with
    acct, sends it through the module-level w3, and waits up to 120 seconds
    for the receipt.
    """
    factory = w3.eth.contract(abi=contract_interface['abi'], bytecode=contract_interface['bin'])
    if contract_args:
        constructed = factory.constructor(*contract_args)
    else:
        constructed = factory.constructor()
    tx_params = {
        'from': acct.address,
        'nonce': w3.eth.getTransactionCount(acct.address),
    }
    tx = constructed.buildTransaction(tx_params)
    print("Signing and sending raw tx ...")
    signed = acct.signTransaction(tx)
    tx_hash = w3.eth.sendRawTransaction(signed.rawTransaction)
    print("tx_hash = {} waiting for receipt ...".format(tx_hash.hex()))
    tx_receipt = w3.eth.waitForTransactionReceipt(tx_hash, timeout=120)
    print("Receipt accepted. gasUsed={gasUsed} contractAddress={contractAddress}".format(**tx_receipt))
    return tx_receipt["contractAddress"]
def exec_contract(acct, nonce, func):
    """
    Sign and send a transaction for the contract function func.

    The transaction is built with acct.address as sender and the given nonce,
    signed by acct and sent through the module-level w3. Returns the
    transaction hash as a hex string; does not wait for the receipt.
    """
    tx_params = {'from': acct.address, 'nonce': nonce}
    signed = acct.signTransaction(func.buildTransaction(tx_params))
    return w3.eth.sendRawTransaction(signed.rawTransaction).hex()
if __name__ == '__main__':
    """
    // contract.sol:
    pragma solidity ^0.4.21;
    contract simplestorage {
        uint public storedData;
        event Updated(address by, uint _old, uint _new);
        function set(uint x) {
            uint old = storedData;
            storedData = x;
            emit Updated(msg.sender, old, x);
        }
        function get() constant returns (uint retVal) {
            return storedData;
        }
    }
    """
    import time  # only needed for the event polling delay below

    # config
    RPC_ADDRESS = 'http://localhost:8545'
    CONTRACT_SOL = 'contract.sol'
    CONTRACT_NAME = 'simplestorage'
    PRIVATE_KEY="youraddressprivatekey"

    # instantiate web3 object
    w3 = Web3(HTTPProvider(RPC_ADDRESS, request_kwargs={'timeout': 120}))
    # use additional middleware for PoA (eg. Rinkeby)
    # w3.middleware_stack.inject(middleware.geth_poa_middleware, layer=0)
    acct = w3.eth.account.privateKeyToAccount(PRIVATE_KEY)

    # compile contract to get abi
    print('Compiling contract..')
    contract_name, contract_interface = compile_contract(CONTRACT_SOL, CONTRACT_NAME)

    # deploy contract
    print('Deploying contract..')
    contract_address = deploy_contract(acct, contract_interface)

    # create contract object
    contract = w3.eth.contract(address=contract_address, abi=contract_interface['abi'])

    # call non-transactional method; a fresh contract stores 0
    val = contract.functions.get().call()
    print('Invoke get()={}'.format(val))
    assert val == 0

    # call transactional method
    nonce = w3.eth.getTransactionCount(acct.address)
    # remember the current block so the event filter starts before the tx
    from_block_number = w3.eth.blockNumber
    new_val = random.randint(1, 100)
    contract_func = contract.functions.set(new_val)
    print('Invoke set()={}'.format(new_val))
    tx_hash = exec_contract(acct, nonce, contract_func)
    print('tx_hash={} waiting for receipt..'.format(tx_hash))
    tx_receipt = w3.eth.waitForTransactionReceipt(tx_hash, timeout=120)
    print("Receipt accepted. gasUsed={gasUsed} blockNumber={blockNumber}". format(**tx_receipt))

    # catch event
    contract_filter = contract.events.Updated.createFilter(fromBlock=from_block_number)
    print('Waiting for event..')
    entries = contract_filter.get_all_entries()
    while not entries:
        # pause between polls instead of hammering the RPC node in a busy loop
        time.sleep(1)
        entries = contract_filter.get_all_entries()

    # _new == new_val
    args = entries[0].args
    print(args)
    assert args._old == 0
    assert args._new == new_val
    assert args.by == acct.address

    # call non-transactional method; the stored value must now be new_val
    val = contract.functions.get().call()
    print('Invoke get()={}'.format(val))
    assert val == new_val
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
# get directory (of current file)
dir_path = os.path.dirname(os.path.realpath(__file__))
# get base filename of current file (os.path.basename keeps the extension)
basename = os.path.basename(os.path.realpath(__file__))
# get relative path from arg
mypath = sys.argv[1]

# iterate dirs and files
for f in os.listdir(mypath):
    path = os.path.join(mypath, f)
    # print if file
    if os.path.isfile(path):
        print(os.path.join(dir_path, path))

# iterate and rename .jpg files to the lower-cased part before the first '_'
target_dir = mypath
for f in os.listdir(target_dir):
    stem, ext = os.path.splitext(f)
    if ext != '.jpg':
        continue
    new_name = stem.split('_')[0].lower() + ext
    if new_name == f:
        continue
    destination = os.path.join(target_dir, new_name)
    if os.path.exists(destination):
        # Several files can map to the same short name; os.rename would
        # silently replace the existing file on POSIX, so keep both instead.
        print('Skipping {}: {} already exists'.format(f, new_name))
        continue
    os.rename(os.path.join(target_dir, f), destination)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#Copyright 2017 John Frens | |
# | |
#Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: | |
# | |
#The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. | |
# | |
#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
# Python 2.7 version | |
import string | |
# MTLD internal implementation | |
def mtld_calc(word_array, ttr_threshold):
    """One-directional MTLD pass over ``word_array``.

    Tokens are stripped of ASCII punctuation and lower-cased. A running
    type-token ratio (TTR) is tracked; each time it drops to
    ``ttr_threshold`` or below, one factor is counted and the running counts
    reset. The leftover segment adds a partial factor in proportion to how
    far its TTR has fallen towards the threshold.

    Returns:
        len(word_array) / factors, or -1 when no factor (not even a partial
        one) was accumulated, e.g. when every token is unique.
    """
    current_ttr = 1.0
    token_count = 0
    types = set()
    factors = 0.0
    for token in word_array:
        # Portable punctuation strip: str.translate(None, ...) exists only on
        # Python 2 byte strings and raises TypeError on Python 3.
        token = ''.join(ch for ch in token if ch not in string.punctuation).lower()
        token_count += 1
        types.add(token)
        current_ttr = float(len(types)) / token_count
        if current_ttr <= ttr_threshold:
            # full factor reached: count it and start a new segment
            factors += 1
            token_count = 0
            types = set()
            current_ttr = 1.0
    # partial factor for the unfinished trailing segment
    excess = 1.0 - current_ttr
    excess_val = 1.0 - ttr_threshold
    factors += float(excess) / excess_val
    if factors != 0:
        return len(word_array) / factors
    return -1
# MTLD implementation | |
def mtld(word_array, ttr_threshold=0.72):
    """Return MTLD for ``word_array``: the mean of a forward and a reversed pass.

    Raises:
        ValueError: if ``word_array`` is a single string or has fewer than
            50 items.
    """
    if isinstance(word_array, str):
        raise ValueError("Input should be a list of strings, rather than a string. Try using string.split()")
    if len(word_array) < 50:
        raise ValueError("Input word list should be at least 50 in length")
    forward = mtld_calc(word_array, ttr_threshold)
    backward = mtld_calc(word_array[::-1], ttr_threshold)
    return (forward + backward) / 2
# HD-D internals | |
# x! = x(x-1)(x-2)...(1) | |
def factorial(x):
    """Return x! = x(x-1)(x-2)...; 1 for any x <= 1.

    Iterative, so large arguments (e.g. the count of a very frequent word in
    a long text) do not exceed the interpreter's recursion limit. Float
    arguments are accepted and stepped down by 1 until <= 1.
    """
    terms = []
    while x > 1:
        terms.append(x)
        x -= 1
    result = 1
    # Multiply smallest term first, the order in which the recursive
    # definition x * factorial(x - 1) evaluates, so float results match it.
    for term in reversed(terms):
        result = term * result
    return result
# n choose r = n(n-1)(n-2)...(n-r+1)/(r!) | |
def combination(n, r):
    """Return n choose r as a float: n(n-1)...(n-r+1) / r!."""
    denominator = factorial(r)
    product = 1.0
    term = n - r + 1.0
    upper = n + 1.0
    # multiply the r consecutive terms n-r+1 .. n
    while term < upper:
        product *= term
        term += 1.0
    return product / denominator
# hypergeometric probability: the probability that an n-trial hypergeometric experiment results | |
# in exactly x successes, when the population consists of N items, k of which are classified as successes. | |
# (here, population = N, population_successes = k, sample = n, sample_successes = x) | |
# h(x; N, n, k) = [ kCx ] * [ N-kCn-x ] / [ NCn ] | |
def hypergeometric(population, population_successes, sample, sample_successes):
    """Hypergeometric probability h(x; N, n, k) = kCx * (N-k)C(n-x) / NCn.

    Here N = population, k = population_successes, n = sample and
    x = sample_successes.
    """
    success_ways = combination(population_successes, sample_successes)
    failure_ways = combination(population - population_successes, sample - sample_successes)
    total_ways = combination(population, sample)
    return (success_ways * failure_ways) / total_ways
# HD-D implementation | |
def hdd(word_array, sample_size=42.0):
    """Return the HD-D lexical diversity of ``word_array``.

    Tokens are stripped of ASCII punctuation and lower-cased, then each
    distinct type contributes (1 - P(type absent from a random sample of
    ``sample_size`` tokens)) / sample_size, per McCarthy & Jarvis (2010).

    Raises:
        ValueError: if ``word_array`` is a single string or has fewer than
            50 items.
    """
    if isinstance(word_array, str):
        raise ValueError("Input should be a list of strings, rather than a string. Try using string.split()")
    if len(word_array) < 50:
        raise ValueError("Input word list should be at least 50 in length")

    # Create a dictionary of counts for each type
    type_counts = {}
    for token in word_array:
        # Portable punctuation strip: str.translate(None, ...) exists only on
        # Python 2 byte strings and raises TypeError on Python 3.
        token = ''.join(ch for ch in token if ch not in string.punctuation).lower()
        type_counts[token] = type_counts.get(token, 0.0) + 1.0

    # Sum the contribution of each type - "If the sample size is 42, the mean contribution of any given
    # type is 1/42 multiplied by the percentage of combinations in which the type would be found." (McCarthy & Jarvis 2010)
    hdd_value = 0.0
    for count in type_counts.values():
        # probability of drawing zero occurrences of this type
        absent = hypergeometric(len(word_array), sample_size, count, 0.0)
        hdd_value += (1.0 - absent) / sample_size
    return hdd_value
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Finding fingerprint and calculating simple fuzzy similarity | |
@author yohanes.gultom@gmail.com | |
Prerequisites on Ubuntu: | |
* Python 2.7 and pip | |
* FFMPEG `sudo apt install ffmpeg` | |
* AcoustID fingerprinter `sudo apt install acoustid-fingerprinter` | |
* PyAcoustID `pip install pyacoustid` | |
* FuzzyWuzzy `pip install fuzzywuzzy[speedup]` | |
""" | |
import acoustid | |
import sys | |
import os | |
import chromaprint | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from fuzzywuzzy import fuzz | |
DIR_DATABASE = 'music/full' | |
DIR_SAMPLES = 'music/partial' | |
def get_fingerprint(filepath):
    """
    Fingerprint an audio file.

    Returns a (fingerprint, version, duration) tuple, where fingerprint and
    version come from decoding the encoded fingerprint that
    acoustid.fingerprint_file produced together with the duration.
    """
    duration, encoded = acoustid.fingerprint_file(filepath)
    decoded = chromaprint.decode_fingerprint(encoded)
    fingerprint, version = decoded
    return fingerprint, version, duration
def build_fingerprint_database(dirpath, file_ext='.mp3'):
    """
    Map each audio file name in dirpath to its fingerprint.

    Only regular files whose extension equals file_ext are processed;
    subdirectories are not visited.
    """
    print('Processing {}..'.format(dirpath))
    database = {}
    for entry in os.listdir(dirpath):
        full_path = os.path.join(dirpath, entry)
        extension = os.path.splitext(entry)[1]
        if not (os.path.isfile(full_path) and extension == file_ext):
            continue
        print('Getting fingerprint from database item: {}..'.format(entry))
        # version and duration are returned too but not kept
        fingerprint, _version, _duration = get_fingerprint(full_path)
        database[entry] = fingerprint
    return database
def plot_fingerprints(db):
    """
    Visualize fingerprints in database

    Draws one bitmap per entry of db (name -> fingerprint) in a
    single-column grid, titled with the entry name, then shows the figure.
    """
    fig = plt.figure()
    numrows = len(db)
    # dict.items() works on Python 2 and 3; iteritems() is Python-2-only.
    # Subplot indices are 1-based.
    for plot_id, (name, fp) in enumerate(db.items(), 1):
        # single column grid
        axes = fig.add_subplot(numrows, 1, plot_id)
        plt.imshow(get_fingerprint_bitmap(fp))
        axes.set_title(name)
    plt.show()
def get_fingerprint_bitmap(fp):
    """
    Convert a list of integers into a (32, len(fp)) boolean bitmap.

    Each value is masked to 32 bits and becomes one column, most significant
    bit first.
    """
    rows = []
    for value in fp:
        # width-32 binary text; padding is spaces, which compare unequal to '1'
        bits = '{:32b}'.format(value & 0xffffffff)
        rows.append([bit == '1' for bit in bits])
    return np.array(rows).T
if __name__ == '__main__':
    # load database and samples
    database = build_fingerprint_database(DIR_DATABASE)
    samples = build_fingerprint_database(DIR_SAMPLES)
    print('\n')

    # find best match of each sample in database
    # (dict.items() works on Python 2 and 3; iteritems() is Python-2-only)
    for sample, sample_fp in samples.items():
        print('Similarity score of "{}":'.format(sample))
        best_match = None
        for name, fp in database.items():
            similarity = fuzz.ratio(sample_fp, fp)
            if best_match is None or best_match['score'] < similarity:
                best_match = {
                    'score': similarity,
                    'name': name
                }
            print('{} {}%'.format(name, similarity))
        if best_match is None:
            # empty database: nothing to compare against
            print('Best match: none (database is empty)\n')
        else:
            print('Best match: {name} ({score}%)\n'.format(**best_match))

    # plot database
    plot_fingerprints(database)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Train a ProbabilisticProjectiveDependencyParser using CoNLL-U treebank from Universal Dependencies https://github.com/UniversalDependencies | |
# In this script we are using Indonesian treebank https://github.com/UniversalDependencies/UD_Indonesian | |
from pprint import pprint | |
from nltk.parse import ( | |
DependencyGraph, | |
ProbabilisticProjectiveDependencyParser | |
) | |
import io

# open treebank file; io.open decodes UTF-8 to text on both Python 2 and 3
# (calling .decode() on the result of read() fails on Python 3, where it is
# already str)
with io.open('id-ud-train.conllu', 'r', encoding='utf-8') as f:
    # parse dependency graphs from file: entries are separated by blank lines
    graphs = [
        DependencyGraph(entry, top_relation_label='root')
        for entry in f.read().split('\n\n')
        if entry.strip()
    ]

# train ProbabilisticProjectiveDependencyParser
ppdp = ProbabilisticProjectiveDependencyParser()
print('Training Probabilistic Projective Dependency Parser...')
ppdp.train(graphs)

# try to parse a sentence and print the resulting trees
# (the original author notes they come ordered by probability, most probable first)
sent = ['Melingge', 'adalah', 'gampong', 'di', 'kecamatan', 'Pulo', 'Aceh', '.']
print('Parsing \'' + " ".join(sent) + '\'...')
print('Parse:')
for tree in ppdp.parse(sent):
    pprint(tree)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import time | |
import json | |
import os | |
from selenium import webdriver | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.support.ui import WebDriverWait | |
from Proxy_List_Scrapper import Scrapper | |
class ProxyGenerator:
    """Hand out 'ip:port' strings from a scraper's proxy list, refilling when empty."""

    def __init__(self, s: Scrapper):
        self.scrapper = s
        # initial batch of proxies
        self.data = self.scrapper.getProxies()

    def generate(self):
        """Remove and return the next proxy as 'ip:port'.

        Keeps re-querying the scraper for as long as the current list is empty.
        """
        while not self.data.proxies:
            print('> Reloading proxies..')
            self.data = self.scrapper.getProxies()
        proxy = self.data.proxies.pop(0)
        return '{}:{}'.format(proxy.ip, proxy.port)
# config
# NOTE(review): executable_path=... and find_elements_by_css_selector() used
# below look like Selenium 3-era APIs — confirm against the installed version
chromedriver_path = '/mnt/data/Workspace/webdrivers/chromedriver_89.0.4389.23'
ip_map_path = 'pollingsituajakali_ip_map.json'  # JSON file remembering proxies already tried
target_url = 'https://pollingsituajakali.xyz/pollingxxxxxxxxxxx'
target = 'HARIYONO'  # matched against the button's data-txt attribute
n_repeat = 1  # number of successful runs wanted; argv[1] overrides it

if __name__ == '__main__':
    # get n_repeat if provided as argument
    if len(sys.argv) > 1:
        n_repeat = int(sys.argv[1])
    # load ip map if exists
    ip_map = {}
    if os.path.isfile(ip_map_path):
        with open(ip_map_path) as f:
            ip_map = json.load(f)
    # proxy generator
    proxy_gen = ProxyGenerator(Scrapper(category='ALL', print_err_trace=False))
    # loop until n_repeat attempts have succeeded (count only grows on success)
    count = 0
    while count < n_repeat:
        print('Attempt #' + str(count+1))
        driver = None
        proxy = None
        try:
            # get proxy: skip any address already recorded in ip_map
            print('> Finding proxy..')
            while not proxy or proxy in ip_map:
                proxy = proxy_gen.generate()
                # time.sleep(1)
            print(f'> {proxy}')
            # setup selenium (headless Chrome routed through the proxy)
            options = Options()
            options.add_argument('--headless')
            options.add_argument(f'--proxy-server={proxy}')
            driver = webdriver.Chrome(executable_path=chromedriver_path, options=options)
            driver.implicitly_wait(1)
            # click button
            driver.get(target_url)
            btn = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, f'button[data-txt={target}]'))
            )
            btn.click()
            # wait for result
            # complete_element_id = 'chart'
            # complete_element_id = 'btnRefresh'
            complete_element_id = 'spanCount'
            print(f'> Waiting for {complete_element_id}..')
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, complete_element_id))
            )
            # an element with class alert-danger is treated as a rejected attempt
            alerts = driver.find_elements_by_css_selector('.alert-danger')
            print(f'> alerts: {len(alerts)}')
            if len(alerts) <= 0:
                count += 1
                print('> Success')
            else:
                print('> Failed. IP has been used')
            # mark proxy as used and persist the map
            ip_map[proxy] = 1
            with open(ip_map_path, 'w') as f:
                json.dump(ip_map, f)
        except Exception as e:
            # any failure (proxy, browser, timeout) is reported and the loop retries
            print(f'> Failed. {e}')
        finally:
            # always release the browser, even after an error
            if driver:
                driver.quit()
    print(f'Success: {count}')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
def itob(N):
    """Return the binary digits of N as a string, most significant bit first.

    A value of N that is not greater than 1 is returned unchanged as str(N).
    """
    digits = []
    while N > 1:
        N, bit = divmod(N, 2)
        digits.append(str(bit))
    # the remaining value is the leading digit
    digits.append(str(N))
    return ''.join(reversed(digits))
def bingap(N):
    """Return the length of the longest run of 0 bits enclosed by 1 bits in N.

    Returns 0 when there is no such run.
    """
    longest = 0
    run = None  # None until the lowest 1 bit has been seen
    while N > 1:
        N, bit = divmod(N, 2)
        if bit == 1:
            # a 1 bit closes the current run of zeros
            if run is not None and run > longest:
                longest = run
            run = 0
        elif run is not None:
            run += 1
    # the loop stops before the top bit; it closes the last run
    if N == 1 and run is not None and run > longest:
        longest = run
    return longest
def reverse(A, i, j):
    """Reverse the elements of A between indices i and j (both inclusive), in place."""
    left, right = i, j
    while left < right:
        carried = A[right]
        A[right] = A[left]
        A[left] = carried
        left, right = left + 1, right - 1


def rotate(A, K):
    """Rotate A to the right by K positions in place and return A.

    Done with three reversals: the whole sequence, then its first K elements,
    then the remainder. A falsy A (for example an empty list) is returned as is.
    """
    if not A:
        return A
    last = len(A) - 1
    shift = K % len(A)
    if shift > 0:
        for start, stop in ((0, last), (0, shift - 1), (shift, last)):
            reverse(A, start, stop)
    return A
def tape_diff_slow(A):
    """Print the smallest |sum(A[:p]) - sum(A[p:])| over the split points p = 1 .. len(A)-1.

    Prints 'min diff: None' when A has fewer than two elements. Returns nothing.
    """
    diffs = [abs(sum(A[:p]) - sum(A[p:])) for p in range(1, len(A))]
    md = min(diffs) if diffs else None
    print('min diff: ' + str(md))
def tape_diff(A, n, s1, s2):
    """Return the smallest |s1 - s2| reachable by adding each of A[n], ..., A[0] to one of the two sums.

    Every element may go to either side, so this is the minimum difference
    over arbitrary two-way partitions (not only contiguous splits as in
    tape_diff_slow).

    The original recursion visited all 2**(n+1) assignments, which never
    finishes for the 100-element demo input. Here the distinct (s1, s2) pairs
    are tracked level by level instead. Elements are added in the same order
    as the recursion did, so the result is identical.

    Parameters:
        A: indexable sequence of numbers.
        n: index of the last element to distribute; a negative n means none.
        s1, s2: the two running sums to start from.
    """
    if n < 0:
        return abs(s1 - s2)
    states = {(s1, s2)}
    for index in range(n, -1, -1):
        value = A[index]
        grown = set()
        for left, right in states:
            grown.add((left + value, right))  # element joins the first sum
            grown.add((left, right + value))  # element joins the second sum
        states = grown
    return min(abs(left - right) for left, right in states)


def tape_diff_fast(A):
    """Return tape_diff over all of A, starting from two empty sums."""
    return tape_diff(A, len(A)-1, 0, 0)
# Demo input: the small sample list is immediately replaced by
# 100 random integers drawn from [-1000, 1000)
A = [3,1,2,4,3]
A = np.random.randint(-1000, 1000, 100)
print(A)
print(tape_diff_fast(A))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Load node id and coordinates from pycgrc file generated by https://github.com/AndGem/OsmToRoadGraph to PostGIS database | |
Read list of coordinates and get nearest node id using index-based KNN https://postgis.net/workshops/postgis-intro/knn.html#index-based-knn | |
@Author yohanes.gultom@gmail.com | |
""" | |
import configparser | |
import psycopg2 | |
import sys | |
import csv | |
from io import StringIO | |
# Database settings come from config.ini in the current working directory.
# ConfigParser.read() silently skips files it cannot open, so a missing file
# only shows up later as a KeyError on config['respondor'].
config = configparser.ConfigParser()
config.read('config.ini')
"""
Example of config.ini:
[respondor]
host = localhost
user = postgres
passwd = postgres
db = respondor
"""
class get_db_connection:
    """Context manager that opens a PostgreSQL connection and yields a cursor.

    When the with-body finishes normally the transaction is committed. When it
    raises, the transaction is rolled back (the original committed in both
    cases). The cursor and the connection are closed either way, and the
    exception from the with-body is not suppressed.

    dbconf must provide the keys 'host', 'db', 'user' and 'passwd'.
    """

    def __init__(self, dbconf: dict):
        self.host = dbconf['host']
        self.database = dbconf['db']
        self.user = dbconf['user']
        self.password = dbconf['passwd']

    def __enter__(self):
        self.conn = psycopg2.connect(
            host=self.host,
            database=self.database,
            user=self.user,
            password=self.password)
        self.cur = self.conn.cursor()
        return self.cur

    def __exit__(self, type, value, traceback):
        try:
            if type is None:
                self.conn.commit()
            else:
                # do not persist half-finished work from a failed with-body
                self.conn.rollback()
        finally:
            # release both resources even if commit/rollback itself failed
            try:
                self.cur.close()
            finally:
                self.conn.close()
def load_locations(table_name, file_path):
    """Recreate table_name and bulk-load node ids and coordinates from a pycgrc file.

    File layout as read here (0-based line numbers): line 7 holds the number
    of nodes, line 8 is skipped, and each following line is
    'node_id lat lon' until that many nodes have been read.

    Each node is stored as the geometry 'POINT(lat lon)', the same ordinate
    order that get_nearest_node_ids uses for its query point.

    Prints the number of nodes loaded.

    Parameters:
        table_name: table to drop and recreate. It is interpolated into the
            SQL text, so it must be a trusted identifier.
        file_path: path of the pycgrc file.
    """
    # read nodes into an in-memory buffer in COPY text format (tab-separated)
    s = StringIO()
    total_nodes = None
    count_nodes = 0
    with open(file_path) as f:
        for count, line in enumerate(f):
            if count == 7:
                total_nodes = int(line)
                if total_nodes <= 0:
                    # no nodes announced: do not parse the lines after the
                    # header as nodes
                    break
            elif count > 8:
                # start reading nodes
                node_id, lat, lon = line.split()
                s.write(f'{node_id}\tPOINT({lat} {lon})\n')
                count_nodes += 1
                if count_nodes >= total_nodes:
                    break
    s.seek(0)
    # create table and copy data
    with get_db_connection(config['respondor']) as cur:
        cur.execute(f'DROP TABLE IF EXISTS {table_name}')
        cur.execute(f"""
            CREATE TABLE {table_name} (
                id integer NOT NULL,
                coords geometry,
                CONSTRAINT {table_name}_pkey PRIMARY KEY (id)
            )
            """
        )
        cur.copy_from(s, table_name, columns=('id', 'coords'))
        # build the spatial index once, after the bulk load, instead of
        # maintaining it for every copied row
        cur.execute(f'CREATE INDEX {table_name}_coords_index ON {table_name} USING GIST(coords)')
    print(count_nodes)
def get_nearest_node_ids(table_name, input_path):
    """Append the id of the nearest node to every row of a CSV file, in place.

    Rows are read as (name, lat, lon, ...). Rows with fewer than three
    columns are dropped and columns beyond the third are discarded. The file
    is rewritten with rows (name, lat, lon, node_id).

    All lookups run before the file is reopened for writing, so a failed
    query leaves the input file untouched. The original truncated the file
    first.

    Parameters:
        table_name: table filled by load_locations. It is interpolated into
            the SQL text, so it must be a trusted identifier.
        input_path: CSV file to read and then overwrite.

    Raises:
        LookupError: if the table returns no node for a location.
    """
    # read input
    with open(input_path, newline='') as f:
        locations = [(row[0], row[1], row[2]) for row in csv.reader(f) if len(row) >= 3]

    # An identifier cannot be a bound parameter, but the point text can. It
    # comes from the CSV, so it is passed as a parameter instead of being
    # pasted into the statement.
    sql = f"SELECT id FROM {table_name} ORDER BY coords <-> %s::geometry ASC LIMIT 1;"
    resolved = []
    with get_db_connection(config['respondor']) as cur:
        for name, lat, lon in locations:
            cur.execute(sql, (f'POINT({lat} {lon})',))
            res = cur.fetchone()
            if res is None:
                raise LookupError(f'no node found in table {table_name}')
            resolved.append((name, lat, lon, res[0]))

    # rewrite input only after every lookup succeeded
    with open(input_path, 'w', newline='') as f:
        csv.writer(f).writerows(resolved)
if __name__ == '__main__':
    # (table, pycgrc graph file, CSV of locations to resolve)
    datasets = (
        ('locations_jakarta',
         '/mnt/data/Downloads/jakarta.pycgr/jakarta.pycgrc',
         '/mnt/data/Downloads/jakarta_locations.csv'),
        ('locations_lombok',
         '/mnt/data/Downloads/lombok-island.pycgr/lombok-island.pycgrc',
         '/mnt/data/Downloads/lombok_locations.csv'),
    )
    # first load every graph into the database
    for table, graph_path, _ in datasets:
        load_locations(table, graph_path)
    # then resolve the nearest node_id for every location file
    for table, _, locations_path in datasets:
        get_nearest_node_ids(table, locations_path)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
WARNING: this is Python 2.x version | |
Simple script to test sending email using SMTP server | |
''' | |
import smtplib | |
from email.MIMEMultipart import MIMEMultipart | |
from email.MIMEText import MIMEText | |
# smtp config
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587
SMTP_USER = 'user@gmail.com'
SMTP_PASS = 'password'

# email content
to = "yohanes.gultom@gmail.com"
subject = "Just a test mail"
body = "This is just a test message from a new server. Kindly ignore it and proceed with what you are doing. Thank you!"

if __name__ == '__main__':
    msg = MIMEMultipart()
    msg['From'] = SMTP_USER
    msg['To'] = to
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
    try:
        server.starttls()
        server.login(SMTP_USER, SMTP_PASS)
        server.sendmail(SMTP_USER, to, msg.as_string())
    finally:
        # always end the session, even when TLS, login or sending failed
        # (try/finally because this Python 2 script cannot use `with` on SMTP)
        try:
            server.quit()
        except smtplib.SMTPServerDisconnected:
            # connection already gone: just release the socket
            server.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Send email Python 3 | |
Reference: https://realpython.com/python-send-email/#sending-your-plain-text-email | |
""" | |
import smtplib | |
import ssl | |
import configparser | |
from email.mime.text import MIMEText | |
from email.mime.multipart import MIMEMultipart | |
# Server, credentials and recipient are read from config.ini
config = configparser.ConfigParser()
config.read('config.ini')
smtp_section = config['smtp']
smtp_server = smtp_section['host']
port = 587  # For starttls
sender_email = smtp_section['user']
password = smtp_section['password']
receiver_email = config['buk']['recipient']

# Plain-text and HTML bodies of the same message
text = """\
Hello world!
This is an HTML email test
"""
html = """\
<html>
<body>
<h1>Hello world!</h1>
<p>This is an <strong>HTML</strong> email test</p>
</body>
</html>
"""

# Build the multipart/alternative message
message = MIMEMultipart("alternative")
for header, value in (
    ("Subject", "multipart test"),
    ("From", sender_email),
    ("To", receiver_email),
):
    message[header] = value

# Attach plain text first and HTML last:
# the email client will try to render the last part first
for content, subtype in ((text, "plain"), (html, "html")):
    message.attach(MIMEText(content, subtype))

context = ssl.create_default_context()
with smtplib.SMTP(smtp_server, port) as server:
    server.ehlo()  # Can be omitted
    server.starttls(context=context)
    server.ehlo()  # Can be omitted
    print('authenticating..')
    server.login(sender_email, password)
    print('sending email...')
    server.sendmail(sender_email, receiver_email, message.as_string())
    print('done')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Split video by size or number of chunks | |
Original code: https://stackoverflow.com/a/28884437/1862500 | |
@author yohanes.gultom@gmail.com | |
''' | |
import re | |
import math | |
from optparse import OptionParser | |
# Matches the "Duration: HH:MM:SS.xx," field and captures hours, minutes and
# seconds. Written as a raw string so that \d and \. reach the regex engine
# as-is instead of relying on Python passing unknown string escapes through
# (newer Python versions warn about those).
length_regexp = r'Duration: (\d{2}):(\d{2}):(\d{2})\.\d+,'
re_length = re.compile(length_regexp)
from subprocess import check_call, PIPE, Popen | |
import shlex | |
def main():
    """Split the video named on the command line into consecutive chunks.

    The chunk length is taken from --split-size, or derived from
    --split-count (which takes precedence). Chunks are written next to the
    input as '<name>-<n><ext>' by running ffmpeg once per chunk.

    Raises:
        SystemExit: on invalid options, when the duration cannot be read, or
            when only one chunk would result.
        subprocess.CalledProcessError: when an ffmpeg split command fails.
    """
    import os  # local import: used only for splitting the file extension

    opt = parse_options()
    filename = opt.filename
    split_size = opt.split_size
    split_count = opt.split_count
    if split_size and split_size <= 0:
        print("split_size can't be 0")
        raise SystemExit
    if split_count and split_count <= 1:
        print("split_count must be > 1")
        raise SystemExit

    # ffmpeg writes the input description (including "Duration:") to stderr.
    # communicate() collects it and waits for the process, so no pipe or child
    # is left behind and no external grep is needed.
    probe = Popen(["ffmpeg", "-i", filename], stdout=PIPE, stderr=PIPE, universal_newlines=True)
    _, probe_stderr = probe.communicate()
    matches = re_length.search(probe_stderr)
    if matches:
        video_length = int(matches.group(1)) * 3600 + \
            int(matches.group(2)) * 60 + \
            int(matches.group(3))
        print("Video length in seconds: {}".format(video_length))
    else:
        print("Can't determine video length.")
        raise SystemExit

    if split_count:
        print("split_count is defined. Ignoring split_size, if defined")
        split_size = math.ceil(video_length / split_count)
    if not split_count:
        split_count = math.ceil(video_length / split_size)
    if split_count == 1:
        print("Video length is less than the target split length.")
        raise SystemExit

    # splitext copes with names that have no extension or a dotted directory
    pth, ext = os.path.splitext(filename)
    for n in range(split_count):
        split_start = split_size * n
        # argument list instead of a formatted string: file names containing
        # spaces or quotes are passed to ffmpeg unchanged
        cmd = [
            "ffmpeg", "-i", filename,
            "-vcodec", "copy", "-strict", "-2",
            "-ss", str(split_start), "-t", str(split_size),
            "{}-{}{}".format(pth, n, ext),
        ]
        print("About to run: {}".format(" ".join(shlex.quote(arg) for arg in cmd)))
        check_call(cmd, universal_newlines=True)
def parse_options():
    """Parse the command line and return the optparse options object.

    A file name plus at least one of split size / split count is required;
    otherwise the help text is printed and SystemExit is raised.
    """
    parser = OptionParser()
    # (short flag, long flag, dest, type, help)
    option_specs = (
        ("-f", "--file", "filename", "string",
         "file to split, for example sample.avi"),
        ("-s", "--split-size", "split_size", "int",
         "split or chunk size in seconds, for example 10"),
        ("-c", "--split-count", "split_count", "int",
         "number of even-sized chunks, for example 4"),
    )
    for short_flag, long_flag, dest, value_type, help_text in option_specs:
        parser.add_option(short_flag, long_flag,
                          dest=dest,
                          help=help_text,
                          type=value_type,
                          action="store")
    options, _ = parser.parse_args()
    if not options.filename or not (options.split_size or options.split_count):
        parser.print_help()
        raise SystemExit
    return options
if __name__ == '__main__':
    import sys  # local import: only needed for error reporting here

    try:
        main()
    except Exception as e:
        # Report the failure on stderr and exit non-zero so a calling shell
        # or script can detect it (the original printed to stdout and exited 0).
        print(e, file=sys.stderr)
        raise SystemExit(1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Film actors/actress recommendation based on co-occurrences | |
DVD Rental database https://www.postgresqltutorial.com/wp-content/uploads/2019/05/dvdrental.zip | |
@author yohanes.gultom@gmail.com | |
''' | |
import sqlalchemy as db | |
from pprint import pprint | |
engine = db.create_engine('postgresql://postgres:postgres@localhost/dvdrental')

# For one actor, list the other actors sharing more than two films with them,
# most frequent co-star first
query = db.sql.text("""select actor.actor_id, actor.first_name, actor.last_name, x.cooccurrence from (
select film_actor2.actor_id, count(*) as cooccurrence
from film_actor film_actor1 join film_actor film_actor2 on film_actor1.film_id = film_actor2.film_id
where film_actor1.actor_id != film_actor2.actor_id
and film_actor1.actor_id = :actor_id
group by film_actor1.actor_id, film_actor2.actor_id
) x join actor on x.actor_id = actor.actor_id
where x.cooccurrence > 2
order by x.cooccurrence desc
""")

# get actors/actress that often acted together with given input_actor_id
input_actor_id = 107
# The connection is closed when the block ends. Bind parameters are passed as
# a dict, which SQLAlchemy 1.x and 2.x both accept (the keyword form
# execute(query, actor_id=...) was removed in 2.0).
with engine.connect() as connection:
    result = connection.execute(query, {'actor_id': input_actor_id}).fetchall()
pprint(result)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Convert Named-Entity tagged file (Open NLP format) to Stanford NLP format (token-based) | |
@Author yohanes.gultom@gmail | |
Tagged file example (2 sentences): | |
"Internal DPD Sulsel mudah-mudahan dalam waktu dekat ada keputusan. Sudah ada keputusan kita serahkan ke DPP dan Rabu ini kita akan rapat harian soal itu," kata <PERSON>Sudding</PERSON> kepada Tribunnews.com, <TIME>Senin (30/1/2012)</TIME>. | |
Menurut <PERSON>Sudding</PERSON>, DPP Hanura pada prinsipnya memberikan kesempatan dan ruang sama bagi pengurus DPD dan DPC Hanura Sulsel untuk menyampaikan aspirasinya. | |
"Dan diberikan kesempatan melakukan verfikasi akar msalah yang terjadi di DPD Hanura Sulsel," kata dia. | |
''' | |
import sys | |
import re | |
# io.open decodes/encodes UTF-8 on both Python 2 and 3. The original opened
# the input as 'rb', which yields bytes on Python 3 and makes the str-based
# split and regex matching below fail.
import io

# token holding a complete <TYPE>text</TYPE> entity, with optional text around it
SINGLE_PATTERN = re.compile(r'^([^<>]*)<(\w+)>([^<]*)</(\w+)>([^<>]*)$', re.I)
# token that opens an entity: prefix<TYPE>text
START_PATTERN = re.compile(r'^([^<>]*)<(\w+)>([^<]*)$', re.I)
# token that closes an entity: text</TYPE>suffix
END_PATTERN = re.compile(r'^([^<>]*)</(\w+)>([^<]*)$', re.I)
# token ending a sentence: text followed by a final '.'
EOS_PATTERN = re.compile(r'^([^<>]*)\.$', re.I)
NON_ENTITY_TYPE = 'O'


def _emit(out, token, entity_type):
    """Write one 'token<TAB>type' line; write nothing for an empty token."""
    if token:
        out.write(u'{}\t{}\n'.format(token, entity_type))


infile = sys.argv[1]
outfile = sys.argv[2]
# entity type of the token currently being read; it persists across tokens
# between an opening and a closing tag
cur_type = NON_ENTITY_TYPE
with io.open(infile, 'r', encoding='utf-8') as f, io.open(outfile, 'w', encoding='utf-8') as out:
    for line in f:
        for token in line.strip().split(' '):
            token = token.strip()
            if not token:
                continue

            match = SINGLE_PATTERN.match(token)
            if match:
                _emit(out, match.group(1), NON_ENTITY_TYPE)
                _emit(out, match.group(3), match.group(2))
                if match.group(2) != match.group(4):
                    raise ValueError('Invalid tag pair: {} and {}'.format(match.group(2), match.group(4)))
                _emit(out, match.group(5), NON_ENTITY_TYPE)
                continue

            match = START_PATTERN.match(token)
            if match:
                _emit(out, match.group(1), NON_ENTITY_TYPE)
                cur_type = match.group(2)
                _emit(out, match.group(3), cur_type)
                continue

            match = END_PATTERN.match(token)
            if match:
                _emit(out, match.group(1), cur_type)
                if match.group(2) != cur_type:
                    raise ValueError('Invalid tag pair: {} and {}'.format(cur_type, match.group(2)))
                cur_type = NON_ENTITY_TYPE
                _emit(out, match.group(3), NON_ENTITY_TYPE)
                continue

            match = EOS_PATTERN.match(token)
            if match:
                # a lone '.' has an empty prefix; _emit skips it instead of
                # writing a line with an empty token
                _emit(out, match.group(1), cur_type)
                _emit(out, '.', cur_type)
                out.write(u'\n')  # blank line separates sentences
                continue

            _emit(out, token, cur_type)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# VIP currency notification script | |
# Usage: python vip2.py <gmail_username> <gmail_password> <to_email> | |
# Author: yohanes.gultom@gmail.com | |
from bs4 import BeautifulSoup | |
from bs4.element import Tag | |
from re import sub | |
from decimal import Decimal | |
import urllib2 | |
import backoff | |
import smtplib | |
import sys | |
url = 'https://www.vip.co.id'

# rules to send email
rules = [
    {'currency': 'SGD', 'op': '>=', 'type': 'buy', 'value': 9400}
]

# All three positional arguments are required; fail with the usage line
# instead of a bare IndexError when one is missing.
if len(sys.argv) < 4:
    sys.exit('Usage: python vip2.py <gmail_username> <gmail_password> <to_email>')

smtp_config = {
    'username': sys.argv[1],
    'password': sys.argv[2],
    'server': 'smtp.gmail.com',
    'port': 465,
    'from': 'VIP Bot',
    'to': sys.argv[3]
}

# Raw message: headers, blank line, then an HTML body.
# Placeholders: {0} from, {1} to, {2} currency, {3} buy, {4} sell, {5} source url
message_tpl = '''From: {0}\r\nTo: {1}\r\nSubject: {2} to IDR today\r\nMIME-Version: 1.0\r\nContent-Type: text/html\r\n\r\n
<h1>{2} to IDR</h1>
<ul>
<li>Buy: IDR {3}</li>
<li>Sell: IDR {4}</li>
</ul>
<p>Source: {5}</p>
'''
@backoff.on_exception(backoff.expo, urllib2.URLError, max_tries=3)
def fetch_content(url):
    """Open url with urllib2 and return the response object.

    The backoff decorator retries on urllib2.URLError with max_tries=3.
    """
    response = urllib2.urlopen(url)
    return response
def parse_currency(s):
    """Return str(s) as a Decimal after dropping every character except digits and '.'."""
    numeric_text = sub(r'[^\d.]', '', str(s))
    return Decimal(numeric_text)
import operator

# comparison operators a rule may use, looked up instead of eval()-ing text
RULE_OPERATORS = {
    '>': operator.gt,
    '>=': operator.ge,
    '<': operator.lt,
    '<=': operator.le,
    '==': operator.eq,
    '!=': operator.ne,
}

# retrieve and parse rates
print('Fetching content from {}..'.format(url))
rates = {}
response = fetch_content(url)
try:
    html = response.read()
finally:
    response.close()
soup = BeautifulSoup(html, 'html.parser')
rate_table = soup.select('#rate-table tr')
for rate in rate_table[1:]:
    values = []
    for content in rate.contents:
        if isinstance(content, Tag):
            # NOTE(review): `in` on a bs4 Tag tests its children, not its
            # attributes, so this branch is probably never taken;
            # content.has_attr('title') looks like what was intended.
            # Left unchanged: confirm against the live page first.
            if 'title' in content:
                values.append(content['title'])
            else:
                values.append(content.contents[0])
    if len(values) < 3:
        # not a currency / rate / rate row
        continue
    first = parse_currency(values[1])
    second = parse_currency(values[2])
    # the lower of the two figures is treated as 'buy', the higher as 'sell'
    rates[str(values[0])] = {
        'buy': min(first, second),
        'sell': max(first, second)
    }

# check rules
print('Checking rules..')
server_ssl = smtplib.SMTP_SSL(smtp_config['server'], smtp_config['port'])
try:
    server_ssl.ehlo()
    server_ssl.login(smtp_config['username'], smtp_config['password'])
    for rule in rules:
        if rule['currency'] in rates:
            rate = rates[rule['currency']]
            compare = RULE_OPERATORS[rule['op']]
            if compare(rate[rule['type']], rule['value']):
                print('Found matching rule: {}'.format(rule))
                message = message_tpl.format(
                    smtp_config['from'],
                    smtp_config['to'],
                    rule['currency'],
                    rate['buy'],
                    rate['sell'],
                    url
                )
                print('Sending email..')
                server_ssl.sendmail(smtp_config['from'], smtp_config['to'], message)
finally:
    # release the connection even when login or sending failed
    server_ssl.close()
print('Done!')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# VIP currency notification script | |
# Require Python >= 3.5.2 | |
# Usage: python vip3.py <gmail_username> <gmail_password> <to_email> | |
# Author: yohanes.gultom@gmail.com | |
from bs4 import BeautifulSoup | |
from bs4.element import Tag | |
from re import sub | |
from decimal import Decimal | |
from urllib.request import Request, urlopen | |
import urllib.error | |
import backoff | |
import smtplib | |
import sys | |
url = 'https://www.vip.co.id'

# rules to send email
rules = [
    {'currency': 'SGD', 'op': '>=', 'type': 'buy', 'value': 9400}
]

# All three positional arguments are required; fail with the usage line
# instead of a bare IndexError when one is missing.
if len(sys.argv) < 4:
    sys.exit('Usage: python vip3.py <gmail_username> <gmail_password> <to_email>')

smtp_config = {
    'username': sys.argv[1],
    'password': sys.argv[2],
    'server': 'smtp.gmail.com',
    'port': 465,
    'from': 'VIP Bot',
    'to': sys.argv[3]
}

# Raw message: headers, blank line, then an HTML body.
# Placeholders: {0} from, {1} to, {2} currency, {3} buy, {4} sell, {5} source url
message_tpl = '''From: {0}\r\nTo: {1}\r\nSubject: {2} to IDR today\r\nMIME-Version: 1.0\r\nContent-Type: text/html\r\n\r\n
<h1>{2} to IDR</h1>
<ul>
<li>Buy: IDR {3}</li>
<li>Sell: IDR {4}</li>
</ul>
<p>Source: {5}</p>
'''
@backoff.on_exception(backoff.expo, urllib.error.URLError, max_tries=3)
def fetch_content(url):
    """Download url and return the response body as bytes.

    The request carries a browser-like User-Agent header. The backoff
    decorator retries on urllib.error.URLError with max_tries=3.
    """
    req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    # the with-block closes the HTTP response once the body has been read
    with urlopen(req) as response:
        return response.read()
def parse_currency(s):
    """Return str(s) as a Decimal after dropping every character except digits and '.'."""
    numeric_text = sub(r'[^\d.]', '', str(s))
    return Decimal(numeric_text)
import operator

# comparison operators a rule may use, looked up instead of eval()-ing text
RULE_OPERATORS = {
    '>': operator.gt,
    '>=': operator.ge,
    '<': operator.lt,
    '<=': operator.le,
    '==': operator.eq,
    '!=': operator.ne,
}

# retrieve and parse rates
print('Fetching content from {}..'.format(url))
rates = {}
html = fetch_content(url)
soup = BeautifulSoup(html, 'html.parser')
rate_table = soup.select('#rate-table tr')
for rate in rate_table[1:]:
    values = []
    for content in rate.contents:
        if isinstance(content, Tag):
            # NOTE(review): `in` on a bs4 Tag tests its children, not its
            # attributes, so this branch is probably never taken;
            # content.has_attr('title') looks like what was intended.
            # Left unchanged: confirm against the live page first.
            if 'title' in content:
                values.append(content['title'])
            else:
                values.append(content.contents[0])
    if len(values) < 3:
        # not a currency / rate / rate row
        continue
    first = parse_currency(values[1])
    second = parse_currency(values[2])
    # the lower of the two figures is treated as 'buy', the higher as 'sell'
    rates[str(values[0])] = {
        'buy': min(first, second),
        'sell': max(first, second)
    }

# check rules
print('Checking rules..')
# the with-block ends the SMTP session even when login or sending fails
with smtplib.SMTP_SSL(smtp_config['server'], smtp_config['port']) as server_ssl:
    server_ssl.ehlo()
    server_ssl.login(smtp_config['username'], smtp_config['password'])
    for rule in rules:
        if rule['currency'] in rates:
            rate = rates[rule['currency']]
            compare = RULE_OPERATORS[rule['op']]
            if compare(rate[rule['type']], rule['value']):
                print('Found matching rule: {}'.format(rule))
                message = message_tpl.format(
                    smtp_config['from'],
                    smtp_config['to'],
                    rule['currency'],
                    rate['buy'],
                    rate['sell'],
                    url
                )
                print('Sending email..')
                server_ssl.sendmail(smtp_config['from'], smtp_config['to'], message)
print('Done!')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
Simple Voting HTTP server with MySQL database | |
Setup in Ubuntu: | |
$ sudo apt-get install python-pip python-dev libmysqlclient-dev | |
$ pip install MySQL-python | |
""" | |
import MySQLdb | |
import cgi | |
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer | |
# Server and database configuration
PORT_NUMBER = 8080
DB_HOST = 'localhost'
DB_USER = 'root'
DB_PASS = 'root'
DB_NAME = 'vote'
class VoteHandler(BaseHTTPRequestHandler):
    """
    Request handler that serves the voting form (GET) and records a vote (POST)
    """

    def _begin_html_response(self):
        # every reply is sent with status 200 and an HTML content type
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def do_GET(self):
        self._begin_html_response()
        self.wfile.write(get_vote_form_html())

    def do_POST(self):
        post_environ = {'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers['Content-Type']}
        form = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ=post_environ)
        self._begin_html_response()
        try:
            # exactly one updated row means the candidate/state pair exists
            accepted = inc_vote(form.getvalue('candidate'), form.getvalue('state')) == 1
            if accepted:
                html = get_vote_form_html('Thanks for your vote!', message_color='green')
            else:
                html = get_vote_form_html('Vote error. Invalid candidate and/or state', message_color='red')
        except Exception as e:
            print(e)
            html = get_vote_form_html('Server error. Please contact support', message_color='red')
        self.wfile.write(html)
def get_vote_form_html(message_html=None, message_color='green'):
    """
    Build the voting page: an optional coloured message followed by a form
    with one radio group for candidates and one for states
    """
    candidate_html = get_radio_group_html('candidate', get_distinct_vote('candidate'))
    state_html = get_radio_group_html('state', get_distinct_vote('state'))

    parts = ["""
<html>
<head><title>Voting App</title></head>
<body>
"""]
    if message_html:
        parts.append("""
<p style="color:{}">{}</p>
""".format(message_color, message_html))
    parts.append("""
<form action="" method="POST">
<table>
<tr><td>Candidates:</td><td>{}</td></tr>
<tr><td>States:</td><td>{}</td></tr>
<tr><td><input type="submit" value="Submit"/></td></tr>
</table>
</form>
""".format(candidate_html, state_html))
    parts.append("""
</body>
</html>
""")
    return ''.join(parts)
def get_distinct_vote(col):
    """
    Get the sorted distinct values of one column of the vote table

    col is interpolated into the SQL text (an identifier cannot be passed as
    a bound parameter), so only letters, digits and underscores are accepted.

    Raises ValueError for any other column name and Exception when the
    database connection is not opened.
    """
    if not db:
        raise Exception('Connection not opened')
    if not col.replace('_', '').isalnum():
        raise ValueError('Invalid column name: {}'.format(col))
    cursor = db.cursor()
    try:
        cursor.execute('SELECT DISTINCT {} FROM vote'.format(col))
        results = cursor.fetchall()
    finally:
        # release the cursor even when the query fails
        cursor.close()
    return sorted(row[0] for row in results)
def inc_vote(candidate, state):
    """
    Increase vote for certain candidate and state by 1

    Returns the number of rows updated. Raises Exception when the connection
    is not opened or when the update fails (after rolling back).
    """
    if not db:
        raise Exception('Connection not opened')
    cursor = None
    try:
        cursor = db.cursor()
        # use parameterized query to prevent sql injection
        affected_rows = cursor.execute("UPDATE vote SET total_votes = total_votes + 1 WHERE candidate = %s AND state = %s", [candidate, state])
        db.commit()
        return affected_rows
    except Exception as e:
        db.rollback()
        # callers only see the generic message, so log the actual cause here
        print(e)
        raise Exception('Database update failed')
    finally:
        if cursor is not None:
            cursor.close()
def get_radio_group_html(group_name, values):
    """
    Render one radio input per value, separated by spaces; the first is checked
    """
    template = '<input type="radio" name="{0}" value="{1}" {2}/> {1}'
    return ' '.join(
        template.format(group_name, val, '' if index else 'checked')
        for index, val in enumerate(values)
    )
if __name__ == '__main__':
    # Start as None so the cleanup below never touches an undefined name when
    # startup fails or is interrupted early.
    db = None
    server = None
    try:
        # connect to database
        db = MySQLdb.connect(DB_HOST, DB_USER, DB_PASS, DB_NAME)
        print('Connected to database {}@{}'.format(DB_NAME, DB_HOST))
        # start HTTP server
        server = HTTPServer(('', PORT_NUMBER), VoteHandler)
        print('Server is started and accessible on http://localhost:{}'.format(PORT_NUMBER))
        print('Press CTRL+C to shutdown..')
        server.serve_forever()
    except KeyboardInterrupt:
        print('Shutting down the web server')
    finally:
        # runs on CTRL+C and on any other error
        # shutdown server
        if server is not None:
            server.socket.close()
        # close db connection
        if db is not None:
            db.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I leave my grain of rice.
I just was bored and I feel inspired by this https://craft.js.org/docs/overview#extensible
My code has no sense and purpose 😃