Skip to content

Instantly share code, notes, and snippets.

View sergio11's full-sized avatar
🏠
Working from home

Sergio Sánchez Sánchez sergio11

🏠
Working from home
View GitHub Profile
@sergio11
sergio11 / generate_song_operator.py
Created November 6, 2023 19:56
generate_song_operator.py
from airflow.utils.decorators import apply_defaults
from pydub import AudioSegment
from operators.base_custom_operator import BaseCustomOperator
from bson import ObjectId
import tempfile
from datetime import datetime
class GenerateSongOperator(BaseCustomOperator):
"""
@sergio11
sergio11 / generate_song_cover_operator.py
Created November 6, 2023 20:00
generate_song_cover_operator.py
from airflow.utils.decorators import apply_defaults
from operators.base_custom_operator import BaseCustomOperator
from bson import ObjectId
from diffusers import StableDiffusionPipeline
import torch
from datetime import datetime
import tempfile
class GenerateSongCoverOperator(BaseCustomOperator):
"""
@sergio11
sergio11 / index_to_elasticsearch_operator.py
Created November 6, 2023 20:05
index_to_elasticsearch_operator.py
from elasticsearch import Elasticsearch
from airflow.utils.decorators import apply_defaults
from operators.base_custom_operator import BaseCustomOperator
from bson import ObjectId
from datetime import datetime
class IndexToElasticsearchOperator(BaseCustomOperator):
@apply_defaults
def __init__(
@sergio11
sergio11 / initialize_fog_node.py
Created December 2, 2023 11:38
Initialize Fog Node
def initialize_fog_node(mac_address):
stop_frame_capture()
global camera_id
if not os.path.exists(FRAMES_OUTPUT_DIRECTORY):
os.makedirs(FRAMES_OUTPUT_DIRECTORY)
provisioning_data = perform_provisioning(mac_address)
@sergio11
sergio11 / send_frame_over_mqtt.py
Created December 2, 2023 11:43
Send frame over MQTT
# Function to send a frame over MQTT
def send_frame_over_mqtt(timestamp, mac_address):
# Construct the full file path for the captured frame
frame_path = os.path.join(FRAMES_OUTPUT_DIRECTORY, f"frame_{timestamp}.jpg")
try:
# Ensure the file has the ".jpg" extension
frame_path = os.path.splitext(frame_path)[0] + ".jpg"
# Check if the frame file exists
@sergio11
sergio11 / authenticate.py
Created December 2, 2023 11:46
CHAP Authentication Protocol
# Function to retrieve a challenge for authentication
def get_challenge(mac_address):
"""
Retrieve a challenge for authentication.
Args:
mac_address (str): MAC address of the device.
Returns:
str: Authentication challenge.
@sergio11
sergio11 / frame_capture_loop.py
Created December 2, 2023 11:48
Frame Capture Loop
# Function for the frame capture loop
def frame_capture_loop(mac_address, camera_url):
"""
Frame capture loop.
Args:
mac_address (str): MAC address of the device.
camera_url (str): URL of the camera feed.
"""
logging.info("Opening video source: %s", camera_url)
@sergio11
sergio11 / create_sink_table.py
Created December 2, 2023 11:59
Create Sink Table
from environment import KAFKA_BOOTSTRAP_SERVERS, KAFKA_OUTPUT_TOPIC
# Function to create the sink table in the Flink table environment for processed video frames.
# @Author: Sergio Sánchez Sánchez
def create_sink_table(t_env):
"""
Create the sink table in the Flink table environment.
Parameters
----------
@sergio11
sergio11 / video_frame_processor.py
Created December 2, 2023 12:02
Video Frame Processor
from environment import KAFKA_BOOTSTRAP_SERVERS, KAFKA_INPUT_TOPIC, KAFKA_GROUP_ID, KAFKA_OUTPUT_TOPIC
from kafka_connectivity_check import kafka_connectivity_check
from get_flink_environment import get_flink_environment
from create_source_table import create_source_table
from create_sink_table import create_sink_table
from pyflink.table.expressions import col, call
from pyflink.table import DataTypes, Row
from pyflink.table.udf import udtf, TableFunction
from logger import logger
from pyflink.table import Row
@sergio11
sergio11 / flask_services_for_user_authentication.py
Created December 2, 2023 12:05
Flask Services for User Authentication
import logging
from common.helpers import generate_response
from common.requires_authentication_decorator import requires_authentication
from flask import Flask, request
import hashlib
import os
import jwt
from pymongo import MongoClient, DESCENDING