Skip to content

Instantly share code, notes, and snippets.

View adilkhash's full-sized avatar
🇰🇿

Adylzhan Khashtamov adilkhash

🇰🇿
View GitHub Profile
@adilkhash
adilkhash / vkmixin.py
Created May 11, 2011 06:17
Vkontakte OAuth 2.0 Mixin
#!/usr/bin/env python
#
# Vkontakte OAuth 2.0 wrapper
# Copyright 2011, Adil Khashtamov [adil.khashtamov@gmail.com]
# http://khashtamov.kz
#
#
import logging
@adilkhash
adilkhash / habra_pr0xy.py
Last active January 6, 2017 09:03
Ivelum test
# -*- coding: utf8 -*-
import re
import string
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import requests
from bs4 import BeautifulSoup
import csv
import luigi
from luigi.format import UTF8
import requests
import pandas as pd
from bs4 import BeautifulSoup
class AggregateMovieRatingTask(luigi.Task):
@adilkhash
adilkhash / iin_validation.py
Created September 7, 2020 06:46
Валидация ИИН/БИН Республики Казахстан
import re
from operator import add, mul
from functools import reduce
from typing import List
def multiply(iin: str, weights: List[int]) -> int:
result = reduce(
add,
map(lambda i: mul(*i), zip(map(int, iin), weights))
@adilkhash
adilkhash / airflow_rows_affected.py
Last active March 14, 2021 17:39
Apache Airflow Rows Affected Catcher
import logging
from logging import Handler, LogRecord
from airflow.operators.python import get_current_context
class RowsAffectedHandler(Handler):
def emit(self, record: LogRecord) -> None:
msg = self.format(record)
context = get_current_context()
import requests
from http import HTTPStatus
import pprint
class HttpError:
    """Plain value object that records a status code.

    Note that this class does not derive from ``Exception``, so instances
    cannot be raised; they can only be passed around or returned as values.
    """

    def __init__(self, status_code: int):
        """Store ``status_code`` on the instance.

        Args:
            status_code: Numeric status code to remember (presumably an
                HTTP status, given the class name -- confirm with callers).
        """
        self.status_code = status_code
@adilkhash
adilkhash / external_dag_rest_api_trigger.py
Created October 31, 2021 16:31
Trigger external dag via Airflow REST API
import datetime as dt
from urllib.parse import urljoin
import requests
from airflow.models import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
@adilkhash
adilkhash / exception_groups.py
Created October 25, 2022 08:54
Python 3.11 Exception Groups
from urllib.request import urlopen
from urllib.error import HTTPError
from concurrent.futures import ThreadPoolExecutor, as_completed
class HttpError(Exception):
    """Exception that carries the URL and the code of a failed request."""

    def __init__(self, url: str, code: int) -> None:
        """Remember which URL failed and with which code.

        Args:
            url: The URL associated with the failure.
            code: Numeric code of the failure (presumably the HTTP status
                code -- confirm with the raising site).
        """
        self.url = url
        self.code = code
import typing as t
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
import requests
def parallelize(f: t.Callable = None, max_workers: int = 5):
def decorator(func):
@wraps(func)
import time
from multiprocessing import Process, Value
from multiprocessing import Pool
def sum_numbers(numbers):
    """Return the sum of all items in ``numbers``.

    The items are added one by one, in iteration order, onto an initial
    value of ``0``; an empty iterable therefore yields ``0``.

    Args:
        numbers: Any iterable of values that support ``+`` with an int
            accumulator.

    Returns:
        The accumulated total.
    """
    # The explicit loop is kept on purpose: it preserves the exact
    # left-to-right accumulation order of the original code.
    result = 0
    for number in numbers:
        result += number
    return result