This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.utils.decorators import apply_defaults | |
from pydub import AudioSegment | |
from operators.base_custom_operator import BaseCustomOperator | |
from bson import ObjectId | |
import tempfile | |
from datetime import datetime | |
class GenerateSongOperator(BaseCustomOperator): | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.utils.decorators import apply_defaults | |
from operators.base_custom_operator import BaseCustomOperator | |
from bson import ObjectId | |
from diffusers import StableDiffusionPipeline | |
import torch | |
from datetime import datetime | |
import tempfile | |
class GenerateSongCoverOperator(BaseCustomOperator): | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from elasticsearch import Elasticsearch | |
from airflow.utils.decorators import apply_defaults | |
from operators.base_custom_operator import BaseCustomOperator | |
from bson import ObjectId | |
from datetime import datetime | |
class IndexToElasticsearchOperator(BaseCustomOperator): | |
@apply_defaults | |
def __init__( |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def initialize_fog_node(mac_address): | |
stop_frame_capture() | |
global camera_id | |
if not os.path.exists(FRAMES_OUTPUT_DIRECTORY): | |
os.makedirs(FRAMES_OUTPUT_DIRECTORY) | |
provisioning_data = perform_provisioning(mac_address) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Function to send a frame over MQTT | |
def send_frame_over_mqtt(timestamp, mac_address): | |
# Construct the full file path for the captured frame | |
frame_path = os.path.join(FRAMES_OUTPUT_DIRECTORY, f"frame_{timestamp}.jpg") | |
try: | |
# Ensure the file has the ".jpg" extension | |
frame_path = os.path.splitext(frame_path)[0] + ".jpg" | |
# Check if the frame file exists |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Function to retrieve a challenge for authentication | |
def get_challenge(mac_address): | |
""" | |
Retrieve a challenge for authentication. | |
Args: | |
mac_address (str): MAC address of the device. | |
Returns: | |
str: Authentication challenge. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Function for the frame capture loop | |
def frame_capture_loop(mac_address, camera_url): | |
""" | |
Frame capture loop. | |
Args: | |
mac_address (str): MAC address of the device. | |
camera_url (str): URL of the camera feed. | |
""" | |
logging.info("Opening video source: %s", camera_url) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from environment import KAFKA_BOOTSTRAP_SERVERS, KAFKA_OUTPUT_TOPIC | |
# Function to create the sink table in the Flink table environment for processed video frames. | |
# @Author: Sergio Sánchez Sánchez | |
def create_sink_table(t_env): | |
""" | |
Create the sink table in the Flink table environment. | |
Parameters | |
---------- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from environment import KAFKA_BOOTSTRAP_SERVERS, KAFKA_INPUT_TOPIC, KAFKA_GROUP_ID, KAFKA_OUTPUT_TOPIC | |
from kafka_connectivity_check import kafka_connectivity_check | |
from get_flink_environment import get_flink_environment | |
from create_source_table import create_source_table | |
from create_sink_table import create_sink_table | |
from pyflink.table.expressions import col, call | |
from pyflink.table import DataTypes, Row | |
from pyflink.table.udf import udtf, TableFunction | |
from logger import logger | |
from pyflink.table import Row |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from common.helpers import generate_response | |
from common.requires_authentication_decorator import requires_authentication | |
from flask import Flask, request | |
import hashlib | |
import os | |
import jwt | |
from pymongo import MongoClient, DESCENDING |