Ping Zhang (pingzh)

import sys
import asyncio
import time
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, Executor, ProcessPoolExecutor

URL = "https://example.com"  # hypothetical target; the gist is truncated here

def get_links():
    # Minimal assumed body based on the imports: fetch URL, return absolute links.
    resp = httpx.get(URL)
    soup = BeautifulSoup(resp.text, "html.parser")
    return [urljoin(URL, a["href"]) for a in soup.find_all("a", href=True)]
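
The executor imports suggest a sequential-vs-threaded comparison; a minimal harness sketch, assuming the get_links body above (the timing code below is not part of the gist):

def fetch_all_threaded(urls, max_workers=8):
    # Fetch every URL on a thread pool and collect HTTP status codes.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda u: httpx.get(u).status_code, urls))

if __name__ == "__main__":
    t0 = time.monotonic()
    statuses = fetch_all_threaded(get_links())
    print(f"fetched {len(statuses)} links in {time.monotonic() - t0:.2f}s")
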
from typing import Optional
from pathlib import Path
from configparser import ConfigParser  # type: ignore

DEFAULT_CONF_PATH = Path(__file__).parent / "default_conf.cfg"
CONFIG_FILE_NAME = "config_name.cfg"

class DefaultableConfigParser(ConfigParser):
    # Truncated in the gist; minimal assumed body: pre-load the bundled defaults
    # so reading a user config afterwards only overrides keys.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.read(DEFAULT_CONF_PATH)
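
A usage sketch, assuming standard ConfigParser semantics (the "core" section and "executor" key are hypothetical):

parser = DefaultableConfigParser()
parser.read(CONFIG_FILE_NAME)  # a missing user config is silently skipped
value = parser.get("core", "executor", fallback=None)  # hypothetical section/key
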
[2023-07-27T22:57:23.296000 UTC] {spark_submit.py:521} INFO - 23/07/27 22:57:23 INFO InsertIntoHiveTable: [AIRBNB] Overriding hive.exec.scratchdir to hdfs://ha-nn-uri/tmp/svc_di_data_infra/hive_staging
[2023-07-27T22:57:23.298000 UTC] {spark_submit.py:521} INFO - 23/07/27 22:57:23 INFO FileUtils: Creating directory if it doesn't exist: hdfs://ha-nn-uri/tmp/svc_di_data_infra/hive_staging_hive_2023-07-27_22-57-23_296_8878419864542981198-1
[2023-07-27T22:57:23.388000 UTC] {spark_submit.py:521} INFO - Traceback (most recent call last):
[2023-07-27T22:57:23.388000 UTC] {spark_submit.py:521} INFO - File "/usr/local/lib/python3.7/dist-packages/sqlmesh/engines/spark/app.py", line 98, in <module>
[2023-07-27T22:57:23.388000 UTC] {spark_submit.py:521} INFO - main(args.dialect, args.command_type, args.ddl_concurrent_tasks, args.payload_path)
[2023-07-27T22:57:23.388000 UTC] {spark_submit.py:521} INFO - File "/usr/local/lib/python3.7/dist-packages/sqlmesh/engines/spark/app.py", line 70, in main
[2023-07-27T22:57:23.38800
Traceback (most recent call first):
  <built-in method __enter__ of _thread.lock object at remote 0x7f8e021156c0>
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/event/attr.py", line 284, in exec_once
    with self._exec_once_mutex:
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 649, in __connect
    ).exec_once(self.connection, self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
    self.__connect(first_connect_check=True)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
class SchedulerFramework:
    """
    General Rules:
    1. SchedulerFramework does not assume a particular scheduler implementation.
    2. SchedulerFramework does not parse DAG files.
    """

    def __init__(self, executor, scheduler_cls):
        self.executor = executor
        self.scheduler = scheduler_cls(executor=self.executor, num_runs=-1)
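
A wiring sketch for the constructor contract above, with a stand-in scheduler class (NoopScheduler is hypothetical, not from the gist):

class NoopScheduler:
    # Any class accepting (executor=..., num_runs=...) satisfies the contract.
    def __init__(self, executor, num_runs):
        self.executor = executor
        self.num_runs = num_runs

framework = SchedulerFramework(executor=object(), scheduler_cls=NoopScheduler)
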
commit 8d545982a8483aec18134c198c5adf874e5e8f4a
Date: Mon Jul 18 15:37:32 2022 -0700

    Error: Specified key was too long; max key length is 767 bytes for MySQL 5.7

    see: https://dev.mysql.com/doc/refman/5.7/en/innodb-limits.html

    MySQL 5.7's utf8 charset is an alias for utf8mb3 (up to 3 bytes per
    character), so the max length for an indexed varchar column is
    767 / 3 = 255 characters.
def protect(*protected):
    """Returns a metaclass that protects all attributes given as strings."""
    class Protect(type):
        has_base = False

        def __new__(meta, name, bases, attrs):
            if meta.has_base:
                for attribute in attrs:
                    if attribute in protected:
                        raise AttributeError('Overriding of attribute "%s" not allowed.' % attribute)
            meta.has_base = True
            return super().__new__(meta, name, bases, attrs)
    return Protect
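
A usage sketch (Base and Child are illustrative names, not from the gist):

class Base(metaclass=protect("connect")):
    def connect(self):
        print("connecting")

# Raises AttributeError at class-creation time: "connect" is protected.
class Child(Base):
    def connect(self):
        print("child connecting")
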
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id='00_ping_test_2',
    # Truncated in the gist; the remaining arguments and tasks are a minimal
    # assumed completion so the snippet parses and runs.
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
) as dag:
    DummyOperator(task_id='start') >> BashOperator(task_id='echo', bash_command='echo done')
CREATE TABLE `task_instance_audit` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `changedate` timestamp NULL DEFAULT NULL,
  `dag_id` varchar(250) COLLATE utf8_unicode_ci DEFAULT NULL,
  `task_id` varchar(250) COLLATE utf8_unicode_ci DEFAULT NULL,
  `run_id` varchar(250) COLLATE utf8_unicode_ci DEFAULT NULL,
  `try_number` int(11) DEFAULT NULL,
  `state` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL,
  -- truncated in the gist; closing the statement minimally
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
pingzh / 0_repeated_mapping.py
test dag for 2.3.2rc1
import csv
import io
import os
import json
from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="0_repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:
    # Truncated in the gist; minimal assumed dynamic-task-mapping body below,
    # matching the "repeated mapping" test description for 2.3.2rc1.
    @task
    def add_one(x):
        return x + 1

    add_one.expand(x=[1, 2, 3])