Skip to content

Instantly share code, notes, and snippets.

View danthelion's full-sized avatar
🔫
Wait, it's all data?

Daniel Palma danthelion

🔫
Wait, it's all data?
View GitHub Profile
@danthelion
danthelion / wildcard-ssl-certificate.md
Created August 28, 2018 14:30 — forked from talyguryn/wildcard-ssl-certificate.md
How to get a wildcard SSL certificate and set up Nginx.

Request a new certificate

Get certbot

Go to any directory and clone the repository with the sources.

cd ~
git clone https://github.com/certbot/certbot
def main(data, context, dataset='test_dataset_dev', table='beer'):
print('Event ID: {}'.format(context.event_id))
print('Event type: {}'.format(context.event_type))
print('Importing required modules.')
from google.cloud import bigquery
print('This is the data: {}'.format(data))
input_bucket_name = data['bucket']
@danthelion
danthelion / findmy_item_stream.py
Created June 19, 2022 10:31
SingerIO stream for the Find My Item Cache
class ItemStream(FindMyStream):
name = "Item"
primary_keys = ["name", "timestamp"]
schema = th.PropertiesList(
th.Property("name", th.StringType),
th.Property("location", th.ObjectType(
th.Property("longitude", th.NumberType()),
th.Property("latitude", th.NumberType()),
th.Property("altitude", th.NumberType()),
th.Property("timeStamp", th.NumberType()),
@danthelion
danthelion / meltano.yml
Created June 19, 2022 10:34
Meltano extractor and loader
extractors:
- name: tap-findmy
namespace: tap_findmy
pip_url: -e /Dev/findmyairtag/tap-findmy
capabilities:
- state
- catalog
- discover
settings:
- name: item_name
@danthelion
danthelion / stg_find_my_item.sql
Created June 19, 2022 10:46
Staging model for Find My Items
{{
config(
materialized='table'
)
}}
with raw as (
select
name as name,
timestamp as ts,
@danthelion
danthelion / item_location_1d.sql
Created June 19, 2022 10:47
Find My Item reporting model
{{
config(
materialized='table'
)
}}
select
latitude,
longitude,
@danthelion
danthelion / eventstreams.py
Created June 27, 2022 06:52
Load Wikimedia change events into Kafka
import json
from kafka import KafkaProducer
from sseclient import SSEClient as EventSource
def produce_events_from_url(url: str, topic: str) -> None:
for event in EventSource(url):
if event.event == "message":
try:
WITH jsonified_source AS (
SELECT
(data ->> 'title') :: string as title,
(data ->> '$schema') :: string as schema,
(data ->> 'type') :: string as type,
(data ->> 'bot') :: boolean as bot,
(data ->> 'comment') :: string as comment,
(data ->> 'id') :: integer as id,
(data ->> 'length') :: jsonb as length,
(data ->> 'log_action') :: string as log_action,
-- Per-server count of change events from stg_event_changes that fall inside a
-- trailing time window, built as a materialized view (see config below).
-- NOTE(review): assumes `timestamp` is epoch seconds and that
-- mz_logical_timestamp() is epoch milliseconds (hence the `* 1000`) - TODO confirm.
{{ config(materialized='materializedview') }}
select
server_name,
count(id)
from {{ ref('stg_event_changes') }}
-- Keep an event only while the current logical time lies in
-- [timestamp * 1000, timestamp * 1000 + 3600000): a trailing window of
-- 3,600,000 logical-time units (one hour, if those units are milliseconds).
WHERE mz_logical_timestamp() >= timestamp * 1000
AND mz_logical_timestamp() < timestamp * 1000 + 3600000
group by server_name
-- `count` presumably refers to the default output name of the unaliased
-- count(id) column above.
-- NOTE(review): whether ORDER BY is preserved in a materialized view definition
-- is engine-specific - verify; ordering may need to be applied at query time.
order by count desc
def new_messages(interval="1m"):
    """Report whether the ``changes_by_server_<interval>`` relation has rows.

    Args:
        interval: Suffix of the relation to query (e.g. ``"1m"``). It is
            interpolated into the table name, not bound as a parameter.

    Returns:
        ``True`` if ``count(*)`` on the relation is non-zero, otherwise
        ``None``. ``None`` (rather than ``False``) is kept for backward
        compatibility; the visible caller only tests truthiness.
    """
    # NOTE(review): ``interval`` is pasted into the SQL as an identifier, so it
    # cannot be passed as a bound parameter. If it can ever come from a request
    # (e.g. an endpoint path), validate it against a whitelist before use.
    # ``scalar()`` returns the first column of the first row and closes the
    # result, so the connection is released right away instead of when the
    # result object is eventually garbage-collected.
    row_count = engine.execute(
        f"SELECT count(*) FROM changes_by_server_{interval}"
    ).scalar()
    return None if row_count == 0 else True
async def event_generator(interval: str):
if new_messages(interval=interval):
print(f"New messages in {interval}")
connection = engine.raw_connection()
with connection.cursor() as cur: