Skip to content

Instantly share code, notes, and snippets.

@goodbyegangster
goodbyegangster / getNegaPosi.py
Last active October 27, 2021 13:55
negative-positive analyzer script for shikiho-data
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import csv, glob, MeCab
result_file = "XXXXXXXX"
dic_file = "XXXXXXXX"
shikiho_file = "XXXXXXXX"
headmsg = ["code","comment","value"]
@goodbyegangster
goodbyegangster / getShikiho.py
Last active October 27, 2021 13:54
web scraping script to get shikiho-data from SBI-shoken
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os, csv, threading, re
from time import sleep
from selenium import webdriver
from pyquery import PyQuery
from logging import getLogger, FileHandler, Formatter, DEBUG
logger = getLogger(__name__)
@goodbyegangster
goodbyegangster / get_gcs_buckets_info.py
Created October 27, 2021 14:00
Get a list of bucket information from GCS
"""
Script that retrieves a list of bucket information from Google Cloud Storage.
- Python 3.9.6
- google-cloud-storage==1.42.3
"""
import csv
from google.cloud import storage
def run():
@goodbyegangster
goodbyegangster / fastapi_first_step.py
Last active November 4, 2021 17:28
FastAPI Tutorial
"""
Basic FastAPI syntax.
- Python 3.9.7
- FastAPI 0.70.0
- uvicorn 0.15.0
"""
import uvicorn
from typing import Optional
from fastapi import FastAPI, Query, HTTPException
from pydantic import Field, BaseModel
@goodbyegangster
goodbyegangster / sample_dag.py
Last active June 22, 2022 02:39
Apache Airflow Sample DAG file
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.decorators import task
# define default arguments for operators.
default_args = {
@goodbyegangster
goodbyegangster / sample_branching_dag.py
Last active June 26, 2022 02:29
Apache Airflow Sample BranchPythonOperator DAG file
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.decorators import task
from airflow.decorators.branch_python import branch_task
@task(task_id='beginning')
def beginning() -> int:
@goodbyegangster
goodbyegangster / sample_sco_dag.py
Last active June 26, 2022 02:15
Apache Airflow Sample ShortCircuitOperator DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import ShortCircuitOperator
from airflow.operators.empty import EmptyOperator
def is_even_number() -> bool:
import random
number = random.randint(1, 10)
@goodbyegangster
goodbyegangster / trigger_run_dag.py
Created June 26, 2022 06:12
A Sample TriggerDagRunOperator DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
dag_id='trigger_run_dag',
description='A Sample TriggerDagRunOperator DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2022, 6, 1),
@goodbyegangster
goodbyegangster / trigger_target_dag.py
Created June 26, 2022 06:13
A Sample Trigger Target DAG
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
@task(task_id='pring_message')
def print_message(dag_run=None) -> None:
    """Print the ``"message"`` entry of ``dag_run.conf``.

    Args:
        dag_run: Object whose ``conf`` attribute is subscriptable and holds a
            ``"message"`` key. Defaults to ``None``; presumably supplied by
            the Airflow task context at run time -- confirm against the DAG
            that triggers this one.
    """
    # NOTE(review): the task_id spelling 'pring_message' is kept as-is because
    # it is a runtime identifier; it looks like a typo for 'print_message'.
    message = dag_run.conf["message"]
    print(f'received message: {message}.')
@goodbyegangster
goodbyegangster / sample_external_task_sensor.py
Created June 27, 2022 02:16
A Sample ExternalTaskSensor DAG
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import (
ExternalTaskMarker,
ExternalTaskSensor,
)
# Fixed-offset timezone nine hours ahead of UTC, labelled 'JST'.
JST = timezone(timedelta(hours=9), name='JST')