This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from environment import KAFKA_BOOTSTRAP_SERVERS, KAFKA_OUTPUT_TOPIC | |
# Function to create the sink table in the Flink table environment for processed video frames. | |
# @Author: Sergio Sánchez Sánchez | |
def create_sink_table(t_env): | |
""" | |
Create the sink table in the Flink table environment. | |
Parameters | |
---------- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Function for the frame capture loop | |
def frame_capture_loop(mac_address, camera_url): | |
""" | |
Frame capture loop. | |
Args: | |
mac_address (str): MAC address of the device. | |
camera_url (str): URL of the camera feed. | |
""" | |
logging.info("Opening video source: %s", camera_url) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Function to retrieve a challenge for authentication | |
def get_challenge(mac_address): | |
""" | |
Retrieve a challenge for authentication. | |
Args: | |
mac_address (str): MAC address of the device. | |
Returns: | |
str: Authentication challenge. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Function to send a frame over MQTT | |
def send_frame_over_mqtt(timestamp, mac_address): | |
# Construct the full file path for the captured frame | |
frame_path = os.path.join(FRAMES_OUTPUT_DIRECTORY, f"frame_{timestamp}.jpg") | |
try: | |
# Ensure the file has the ".jpg" extension | |
frame_path = os.path.splitext(frame_path)[0] + ".jpg" | |
# Check if the frame file exists |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def initialize_fog_node(mac_address): | |
stop_frame_capture() | |
global camera_id | |
if not os.path.exists(FRAMES_OUTPUT_DIRECTORY): | |
os.makedirs(FRAMES_OUTPUT_DIRECTORY) | |
provisioning_data = perform_provisioning(mac_address) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from elasticsearch import Elasticsearch | |
from airflow.utils.decorators import apply_defaults | |
from operators.base_custom_operator import BaseCustomOperator | |
from bson import ObjectId | |
from datetime import datetime | |
class IndexToElasticsearchOperator(BaseCustomOperator): | |
@apply_defaults | |
def __init__( |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.utils.decorators import apply_defaults | |
from operators.base_custom_operator import BaseCustomOperator | |
from bson import ObjectId | |
from diffusers import StableDiffusionPipeline | |
import torch | |
from datetime import datetime | |
import tempfile | |
class GenerateSongCoverOperator(BaseCustomOperator): | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.utils.decorators import apply_defaults | |
from pydub import AudioSegment | |
from operators.base_custom_operator import BaseCustomOperator | |
from bson import ObjectId | |
import tempfile | |
from datetime import datetime | |
class GenerateSongOperator(BaseCustomOperator): | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.utils.decorators import apply_defaults | |
from operators.base_custom_operator import BaseCustomOperator | |
from bson import ObjectId | |
import importlib | |
import scipy | |
import tempfile | |
from datetime import datetime | |
class GenerateVoiceOperator(BaseCustomOperator): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.utils.decorators import apply_defaults | |
from operators.base_custom_operator import BaseCustomOperator | |
from bson import ObjectId | |
import importlib | |
import scipy | |
import tempfile | |
from datetime import datetime | |
class GenerateMelodyOperator(BaseCustomOperator): |