Skip to content

Instantly share code, notes, and snippets.

View adilkhash's full-sized avatar
🇰🇿

Adylzhan Khashtamov adilkhash

🇰🇿
View GitHub Profile
import time
from multiprocessing import Process, Value
from multiprocessing import Pool
def sum_numbers(numbers):
result = 0
for number in numbers:
result += number
return result
import typing as t
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
import requests
def parallelize(f: t.Callable = None, max_workers: int = 5):
def decorator(func):
@wraps(func)
@adilkhash
adilkhash / exception_groups.py
Created October 25, 2022 08:54
Python 3.11 Exception Groups
from urllib.request import urlopen
from urllib.error import HTTPError
from concurrent.futures import ThreadPoolExecutor, as_completed
class HttpError(Exception):
def __init__(self, url: str, code: int) -> None:
self.url = url
self.code = code
@adilkhash
adilkhash / external_dag_rest_api_trigger.py
Created October 31, 2021 16:31
Trigger external dag via Airflow REST API
import datetime as dt
from urllib.parse import urljoin
import requests
from airflow.models import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
import requests
from http import HTTPStatus
import pprint
class HttpError:
def __init__(self, status_code: int):
self.status_code = status_code
@adilkhash
adilkhash / airflow_rows_affected.py
Last active March 14, 2021 17:39
Apache Airflow Rows Affected Catcher
import logging
from logging import Handler, LogRecord
from airflow.operators.python import get_current_context
class RowsAffectedHandler(Handler):
def emit(self, record: LogRecord) -> None:
msg = self.format(record)
context = get_current_context()
@adilkhash
adilkhash / iin_validation.py
Created September 7, 2020 06:46
Валидация ИИН/БИН Республики Казахстан
import re
from operator import add, mul
from functools import reduce
from typing import List
def multiply(iin: str, weights: List[int]) -> int:
result = reduce(
add,
map(lambda i: mul(*i), zip(map(int, iin), weights))
import csv
import luigi
from luigi.format import UTF8
import requests
import pandas as pd
from bs4 import BeautifulSoup
class AggregateMovieRatingTask(luigi.Task):
@adilkhash
adilkhash / habra_pr0xy.py
Last active January 6, 2017 09:03
Ivelum test
# -*- coding: utf8 -*-
import re
import string
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import requests
from bs4 import BeautifulSoup
@adilkhash
adilkhash / vkmixin.py
Created May 11, 2011 06:17
Vkontakte Oauth 2.0 Mixin
#!/usr/bin/env python
#
# Vkontatke OAuth 2.0 wrapper
# Copyright 2011, Adil Khashtamov [adil.khashtamov@gmail.com]
# http://khashtamov.kz
#
#
import logging