Skip to content

Instantly share code, notes, and snippets.

@sravantit25
Last active July 8, 2024 09:28
Show Gist options
  • Save sravantit25/0e9e620db313c62c1aa9444d3b359cec to your computer and use it in GitHub Desktop.
"""
This module consumes data streams from specified endpoints, combines them,
while ensuring non-negative integer values for the combined data, along
with a scaling factor. It uses threading and concurrent
futures for handling multiple data streams.
"""
import concurrent.futures
from queue import Queue
import threading
import requests
def consume_stream(url, token, data_queue, timeout=10):
    """
    Consume a line-delimited data stream from the given URL and put each
    parsed float value into a queue.

    Args:
        url (str): The URL of the data stream to consume.
        token (str): The bearer token for authentication.
        data_queue (Queue): The queue to put the parsed data points into.
        timeout (int): Timeout in seconds for the requests.get call
            (applies to connecting and to gaps between received chunks).
    """
    headers = {"Authorization": f"Bearer {token}"}
    try:
        # `with` guarantees the streaming connection is released even if
        # iteration raises — a bare streaming response would otherwise leak.
        with requests.get(url, headers=headers, stream=True, timeout=timeout) as response:
            if response.status_code == 200:
                # Only announce success once the status code confirms it.
                print(f"Connected to {url}")
                for line in response.iter_lines():
                    # filter out keep-alive new lines
                    if line:
                        data = line.decode('utf-8')
                        try:
                            data_queue.put(float(data))
                        except ValueError:
                            # A malformed line must not kill the worker
                            # thread; skip it and keep streaming.
                            print(f"Skipping non-numeric line: {data!r}")
            else:
                print(f"Failed to connect, status code: {response.status_code}")
                print(response.text)
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
def combine_streams(queue1, queue2, scaling_factor=7):
    """
    Combine data points from two queues into a single stream, ensuring
    non-negative integer combined values and applying a scaling factor.

    Loops until a ``None`` sentinel is read from either queue. The
    producers only ever enqueue floats, so the sentinel is a
    backward-compatible way to request a clean shutdown.

    Args:
        queue1 (Queue): The queue for the first data stream.
        queue2 (Queue): The queue for the second data stream.
        scaling_factor (int): Multiplier applied to the summed pair
            before rounding and clamping. Defaults to 7.
    """
    while True:
        data1 = queue1.get()
        data2 = queue2.get()
        if data1 is None or data2 is None:
            break  # sentinel: stop combining
        # Scale the sum, round to an integer, and clamp at zero so the
        # combined value is always a non-negative int.
        combined_value = max(round((data1 + data2) * scaling_factor), 0)
        combined_data = {
            'stream1_value': data1,
            'stream2_value': data2,
            'combined_value': combined_value
        }
        print(f"{combined_data}")
if __name__ == "__main__":
    BASE_URL = 'http://datagen.pythonanywhere.com'
    ENDPOINT1 = '/normal'
    ENDPOINT2 = '/anomalies/clustered'
    # NOTE(review): placeholder — supply a real bearer token before running.
    TOKEN = 'your_token_here'

    stream_queue1 = Queue()
    stream_queue2 = Queue()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Producers: each blocks forever on its HTTP stream.
        executor.submit(consume_stream, BASE_URL + ENDPOINT1, TOKEN, stream_queue1)
        executor.submit(consume_stream, BASE_URL + ENDPOINT2, TOKEN, stream_queue2)

        # The consumer must start INSIDE the executor context: on exit the
        # `with` block waits for the submitted futures, and the streams
        # never finish, so code placed after the block would never run.
        combine_thread = threading.Thread(
            target=combine_streams,
            args=(stream_queue1, stream_queue2),
            daemon=True
        )
        combine_thread.start()
        combine_thread.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment