Traceback (most recent call first): [44/1997]
<built-in method __enter__ of _thread.lock object at remote 0x7f8e021156c0>
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/event/attr.py", line 284, in exec_once
with self._exec_once_mutex:
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 649, in __connect
).exec_once(self.connection, self)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
self.__connect(first_connect_check=True)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
from typing import Optional | |
from pathlib import Path | |
from configparser import ConfigParser # type: ignore | |
DEFAULT_CONF_PATH = Path(__file__).parent / "default_conf.cfg" | |
CONFIG_FILE_NAME = "config_name.cfg" | |
class DefaultableConfigParser(ConfigParser): |
[2023-07-27T22:57:23.296000 UTC] {spark_submit.py:521} INFO - 23/07/27 22:57:23 INFO InsertIntoHiveTable: [AIRBNB] Overriding hive.exec.scratchdir to hdfs://ha-nn-uri/tmp/svc_di_data_infra/hive_staging | |
[2023-07-27T22:57:23.298000 UTC] {spark_submit.py:521} INFO - 23/07/27 22:57:23 INFO FileUtils: Creating directory if it doesn't exist: hdfs://ha-nn-uri/tmp/svc_di_data_infra/hive_staging_hive_2023-07-27_22-57-23_296_8878419864542981198-1 | |
[2023-07-27T22:57:23.388000 UTC] {spark_submit.py:521} INFO - Traceback (most recent call last): | |
[2023-07-27T22:57:23.388000 UTC] {spark_submit.py:521} INFO - File "/usr/local/lib/python3.7/dist-packages/sqlmesh/engines/spark/app.py", line 98, in <module> | |
[2023-07-27T22:57:23.388000 UTC] {spark_submit.py:521} INFO - main(args.dialect, args.command_type, args.ddl_concurrent_tasks, args.payload_path) | |
[2023-07-27T22:57:23.388000 UTC] {spark_submit.py:521} INFO - File "/usr/local/lib/python3.7/dist-packages/sqlmesh/engines/spark/app.py", line 70, in main | |
[2023-07-27T22:57:23.38800 |
class SchedulerFramework: | |
""" | |
General Rules: | |
1. SchedulerFramework does not assume scheduler implementation | |
2. SchedulerFramework does not parse dag files | |
""" | |
def __init__(self, executor, scheduler_cls): | |
self.executor = executor | |
self.scheduler = scheduler_cls(executor=self.executor, num_runs=-1) |
commit 8d545982a8483aec18134c198c5adf874e5e8f4a | |
Date: Mon Jul 18 15:37:32 2022 -0700 | |
Error: Specified key was too long; max key length is 767 bytes for mysql | |
5.7 | |
see: https://dev.mysql.com/doc/refman/5.7/en/innodb-limits.html | |
mysql 5.7 uses utf8mb3 charset (which is utf8), thus the max length for |
def protect(*protected): | |
"""Returns a metaclass that protects all attributes given as strings""" | |
class Protect(type): | |
has_base = False | |
def __new__(meta, name, bases, attrs): | |
if meta.has_base: | |
for attribute in attrs: | |
if attribute in protected: | |
raise AttributeError('Overriding of attribute "%s" not allowed.'%attribute) | |
meta.has_base = True |
from datetime import datetime, timedelta | |
from airflow import DAG | |
from airflow.operators.bash import BashOperator | |
from airflow.operators.dummy import DummyOperator | |
with DAG( | |
dag_id='00_ping_test_2', |
CREATE TABLE `task_instance_audit` ( | |
`id` bigint(20) NOT NULL AUTO_INCREMENT, | |
`changedate` TIMESTAMP DEFAULT NULL, | |
`dag_id` varchar(250) COLLATE utf8_unicode_ci DEFAULT NULL, | |
`task_id` varchar(250) COLLATE utf8_unicode_ci DEFAULT NULL, | |
`run_id` varchar(250) COLLATE utf8_unicode_ci DEFAULT NULL, | |
`try_number` int(11) DEFAULT NULL, | |
`state` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL, |
import csv | |
import io | |
import os | |
import json | |
from datetime import datetime | |
from airflow import DAG | |
from airflow.decorators import task | |
with DAG(dag_id="0_repeated_mapping", start_date=datetime(2022, 3, 4)) as dag: |
RDBMS-based job queues have been criticized recently for being unable to handle heavy loads. And they deserve it, to some extent, because the queries used to safely lock a job have been pretty hairy. SELECT FOR UPDATE followed by an UPDATE works fine at first, but then you add more workers, and each is trying to SELECT FOR UPDATE the same row (and maybe throwing NOWAIT in there, then catching the errors and retrying), and things slow down.
On top of that, they have to actually update the row to mark it as locked, so the rest of your workers are sitting there waiting while one of them propagates its lock to disk (and the disks of however many servers you're replicating to). QueueClassic got some mileage out of the novel idea of randomly picking a row near the front of the queue to lock, but I still can't seem to get more than an extra few hundred jobs per second out of it under heavy load.
So, many developers have started going straight t