Skip to content

Instantly share code, notes, and snippets.

SELECT i.country,
total_invoice,
total_customer
FROM
(SELECT billing_country AS country,
COUNT(invoice_id) AS total_invoice
FROM invoice
WHERE EXTRACT(YEAR FROM CAST(invoice_date AS date)) =
(SELECT EXTRACT(YEAR FROM CAST(invoice_date AS date)) AS YEAR
FROM invoice
select current_timestamp as test_date_time, 'test_01' as test_name, count(*) = 0 as test_result from (
select * from public_test.dm_settlement_report_expected as ex
full join public_test.dm_settlement_report_actual as ac
on ex.id = ac.id
and ex.restaurant_id = ac.restaurant_id
and ex.settlement_year = ac.settlement_year
and ex.settlement_month = ac.settlement_month
and ex.orders_count = ac.orders_count
and ex.orders_total_sum = ac.orders_total_sum
and ex.orders_bonus_payment_sum = ac.orders_bonus_payment_sum
#ключ: tl0Bn1k6Aw
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
spark = SparkSession.builder \
-- Remove every row from the activity and daily-sales tables (no WHERE clause: full wipe).
DELETE FROM mart.f_activity;
DELETE FROM mart.f_daily_sales;
-- d_calendar
-- Remove every row from mart.d_calendar as well; it is cleared after the two tables above.
DELETE FROM mart.d_calendar;
with all_dates as (
select distinct to_date(date_time::TEXT,'YYYY-MM-DD') as date_time from stage.user_activity_log
union
select distinct to_date(date_time::TEXT,'YYYY-MM-DD') from stage.user_order_log
-- Recreate dwh.load_dates_craftsman_report_datamart from scratch:
-- any existing table (together with its rows) is dropped first.
DROP TABLE IF EXISTS dwh.load_dates_craftsman_report_datamart;

CREATE TABLE IF NOT EXISTS dwh.load_dates_craftsman_report_datamart (
    -- identity column: the value is always generated by the database
    id        BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY,
    -- mandatory value, stored with DATE (day) precision
    load_dttm DATE   NOT NULL,
    CONSTRAINT load_dates_craftsman_report_datamart_pk PRIMARY KEY (id)
);
/* Create the "Addresses" dimension */
DROP TABLE IF EXISTS model_lesson.d_address;

CREATE TABLE model_lesson.d_address (
    address_id   BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- address identifier (database-generated primary key)
    address_json JSON   NOT NULL, -- address in JSON format
    city         TEXT   NOT NULL, -- city
    street       TEXT   NOT NULL, -- street
    house        TEXT   NOT NULL, -- house number
    post_index   TEXT             -- postal code (optional: the only nullable column)
);
# # auxiliary for test
# from airflow.operators.python import PythonOperator
# business_dt = None
# dag = None
# # /auxiliary for test
# import requests
# from airflow.hooks.base import BaseHook
# def create_files_request(conn_name,business_dt):
# conn = BaseHook.get_connection(conn_name)
-- update the report metrics for craftsmen that already exist in the report
UPDATE dwh.craftsman_report_datamart SET
craftsman_name = updates.craftsman_name,
craftsman_address = updates.craftsman_address,
craftsman_birthday = updates.craftsman_birthday,
craftsman_email = updates.craftsman_email,
craftsman_money = updates.craftsman_money,
platform_money = updates.platform_money,
count_order = updates.count_order,
avg_price_order = updates.avg_price_order,
-- Rebuild the stage schema from scratch.
-- CASCADE also drops every object contained in the schema, so all of its
-- existing tables and their data are lost before the empty schema is created.
DROP SCHEMA IF EXISTS stage CASCADE;
CREATE SCHEMA stage;
--clear
-- DROP TABLE IF EXISTS stage.user_activity_log;
-- DROP TABLE IF EXISTS stage.user_order_log;
-- DROP TABLE IF EXISTS stage.customer_research;
--user activity log