Skip to content

Instantly share code, notes, and snippets.

View sergio11's full-sized avatar
🏠
Working from home

Sergio Sánchez Sánchez sergio11

🏠
Working from home
View GitHub Profile
@sergio11
sergio11 / create_sink_table.py
Created December 2, 2023 11:59
Create Sink Table
from environment import KAFKA_BOOTSTRAP_SERVERS, KAFKA_OUTPUT_TOPIC
# Function to create the sink table in the Flink table environment for processed video frames.
# @Author: Sergio Sánchez Sánchez
def create_sink_table(t_env):
"""
Create the sink table in the Flink table environment.
Parameters
----------
@sergio11
sergio11 / frame_capture_loop.py
Created December 2, 2023 11:48
Frame Capture Loop
# Function for the frame capture loop
def frame_capture_loop(mac_address, camera_url):
"""
Frame capture loop.
Args:
mac_address (str): MAC address of the device.
camera_url (str): URL of the camera feed.
"""
logging.info("Opening video source: %s", camera_url)
@sergio11
sergio11 / authenticate.py
Created December 2, 2023 11:46
CHAP Authentication Protocol
# Function to retrieve a challenge for authentication
def get_challenge(mac_address):
"""
Retrieve a challenge for authentication.
Args:
mac_address (str): MAC address of the device.
Returns:
str: Authentication challenge.
@sergio11
sergio11 / send_frame_over_mqtt.py
Created December 2, 2023 11:43
Send frame over MQTT
# Function to send a frame over MQTT
def send_frame_over_mqtt(timestamp, mac_address):
# Construct the full file path for the captured frame
frame_path = os.path.join(FRAMES_OUTPUT_DIRECTORY, f"frame_{timestamp}.jpg")
try:
# Ensure the file has the ".jpg" extension
frame_path = os.path.splitext(frame_path)[0] + ".jpg"
# Check if the frame file exists
@sergio11
sergio11 / initialize_fog_node.py
Created December 2, 2023 11:38
Initialize Fog Node
def initialize_fog_node(mac_address):
stop_frame_capture()
global camera_id
if not os.path.exists(FRAMES_OUTPUT_DIRECTORY):
os.makedirs(FRAMES_OUTPUT_DIRECTORY)
provisioning_data = perform_provisioning(mac_address)
@sergio11
sergio11 / index_to_elasticsearch_operator.py
Created November 6, 2023 20:05
index_to_elasticsearch_operator.py
from elasticsearch import Elasticsearch
from airflow.utils.decorators import apply_defaults
from operators.base_custom_operator import BaseCustomOperator
from bson import ObjectId
from datetime import datetime
class IndexToElasticsearchOperator(BaseCustomOperator):
@apply_defaults
def __init__(
@sergio11
sergio11 / generate_song_cover_operator.py
Created November 6, 2023 20:00
generate_song_cover_operator.py
from airflow.utils.decorators import apply_defaults
from operators.base_custom_operator import BaseCustomOperator
from bson import ObjectId
from diffusers import StableDiffusionPipeline
import torch
from datetime import datetime
import tempfile
class GenerateSongCoverOperator(BaseCustomOperator):
"""
@sergio11
sergio11 / generate_song_operator.py
Created November 6, 2023 19:56
generate_song_operator.py
from airflow.utils.decorators import apply_defaults
from pydub import AudioSegment
from operators.base_custom_operator import BaseCustomOperator
from bson import ObjectId
import tempfile
from datetime import datetime
class GenerateSongOperator(BaseCustomOperator):
"""
@sergio11
sergio11 / generate_voice_operator.py
Created November 6, 2023 19:52
generate_voice_operator.py
from airflow.utils.decorators import apply_defaults
from operators.base_custom_operator import BaseCustomOperator
from bson import ObjectId
import importlib
import scipy
import tempfile
from datetime import datetime
class GenerateVoiceOperator(BaseCustomOperator):
@sergio11
sergio11 / generate_melody_operator.py
Created November 6, 2023 19:50
generate_melody_operator.py
from airflow.utils.decorators import apply_defaults
from operators.base_custom_operator import BaseCustomOperator
from bson import ObjectId
import importlib
import scipy
import tempfile
from datetime import datetime
class GenerateMelodyOperator(BaseCustomOperator):