Skip to content

Instantly share code, notes, and snippets.

Avatar

Anna Geller anna-anisienia

View GitHub Profile
View timestream_1min_schedule.py
import time
import requests
import logging
import boto3
from botocore.config import Config
def send_price_data_to_timestream(write_client):
base_url = "https://min-api.cryptocompare.com/data"
symbols = "BTC,ETH,REP,DASH"
View timestream_while_loop.py
import time
import requests
import logging
import boto3
from botocore.config import Config
def send_price_data_to_timestream(write_client):
base_url = "https://min-api.cryptocompare.com/data"
symbols = "BTC,ETH,REP,DASH"
View kinesis_data_streams.py
import requests
import boto3
import uuid
import time
import json
client = boto3.client('kinesis', region_name='eu-central-1')
partition_key = str(uuid.uuid4())
View kinesis_data_streams.py
import requests
import boto3
import uuid
import time
import json
client = boto3.client('kinesis', region_name='eu-central-1')
partition_key = str(uuid.uuid4())
View sns.py
import boto3
import uuid
import json
import random
import logging
from datetime import datetime
logging.basicConfig(format="[%(levelname)s] [%(name)s] [%(asctime)s]: %(message)s",
level="INFO")
logger = logging.getLogger(__name__)
View sqs.py
import boto3
import uuid
import json
import random
import logging
from datetime import datetime
logging.basicConfig(format="[%(levelname)s] [%(name)s] [%(asctime)s]: %(message)s",
level="INFO")
logger = logging.getLogger(__name__)
View dq_alerts.py
import json
import logging
import requests
import pandas as pd
import awswrangler as wr
from typing import List, Any
logger = logging.getLogger()
logger.setLevel(logging.INFO)
View local_slack_dq_alerts.py
import json
import logging
import requests
import pandas as pd
import awswrangler as wr
from typing import List, Any
class DataQualityAlert:
def __init__(self, slack_webhook_url: str, database: str = "ecommerce"):
View dq_alerts.py
import json
import logging
import requests
import pandas as pd
import awswrangler as wr
from typing import List, Any
class DataQualityAlert:
def __init__(self, slack_webhook_url: str, database: str = "ecommerce"):
View aws_config_query.sql
SELECT
resourceId,
resourceType,
awsRegion,
resourceCreationTime,
tags,
configuration.state.value
WHERE
resourceType NOT IN ('AWS::EC2::SecurityGroup',
'AWS::EC2::Subnet', 'AWS::EC2::VPC',