Skip to content

Instantly share code, notes, and snippets.

@mrakitin
Last active April 3, 2023 15:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrakitin/0aca1499d96b54f3ffbbcbcb085e9a2d to your computer and use it in GitHub Desktop.
Save mrakitin/0aca1499d96b54f3ffbbcbcb085e9a2d to your computer and use it in GitHub Desktop.
import datetime
import pprint
import uuid
import cv2
import matplotlib.pyplot as plt
from bloptools.gp.utils import get_beam_stats
from bluesky_kafka import RemoteDispatcher
from bluesky_kafka.consume import BasicConsumer
from nslsii.kafka_utils import _read_bluesky_kafka_config_file
# plt.ion()
def print_kafka_messages(beamline_acronym):
# fig, ax = plt.subplots(1, 1)
print(f"Listening for Kafka messages for {beamline_acronym}")
# from databroker import Broker
# db = Broker.named(beamline_acronym)
def print_message(msg, *args, **kwargs):
if "optimization" in args[1]:
print(
f"{datetime.datetime.now().isoformat()}: {type(msg) = } {msg = }\n{args = }\n{kwargs = }"
)
video_stream_url = "http://10.68.57.34/mjpg/video.mjpg"
cap = cv2.VideoCapture(video_stream_url)
img = cap.read()[1].sum(axis=-1)
stats = get_beam_stats(img)
x_min, x_max, y_min, y_max, parability = stats
print(f"{msg = }")
print(f"{stats = }")
print(f"{img.shape = }")
# ax.cla()
# ax.imshow(img)
# fig.canvas.draw_idle()
return True
# this consumer should not be in a group with other consumers
# so generate a unique consumer group id for it
unique_group_id = f"echo-{beamline_acronym}-{str(uuid.uuid4())[:8]}"
kafka_config = _read_bluesky_kafka_config_file(
config_file_path="/etc/bluesky/kafka.yml"
)
# import _thread
# def consumer_func():
consumer = BasicConsumer(
topics=[f"{beamline_acronym}.test"],
bootstrap_servers=kafka_config["bootstrap_servers"],
group_id=unique_group_id,
consumer_config=kafka_config["runengine_producer_config"],
process_message=print_message,
)
consumer.start_polling()
# _thread.start_new_thread(consumer_func, ())
# consumer_func()
# plt.show()
if __name__ == "__main__":
import sys
print_kafka_messages(sys.argv[1])
import uuid
import numpy as np
from bluesky_kafka.produce import BasicProducer
from nslsii.kafka_utils import _read_bluesky_kafka_config_file
if __name__ == "__main__":
kafka_config = _read_bluesky_kafka_config_file("/etc/bluesky/kafka.yml")
basic_producer = BasicProducer(
topic="tes.test",
bootstrap_servers=kafka_config["bootstrap_servers"],
key=str(uuid.uuid4()),
producer_config=kafka_config["runengine_producer_config"],
)
basic_producer.produce("test")
data = np.array([1, 2, 3])
basic_producer.produce(data)
def f(*args, **kwargs):
print(f"{args = }\n{kwargs = }")
if "obj" in kwargs:
kwargs["obj"] = kwargs["obj"].name
msg = {
"optimization": True,
"time": ttime.time(),
"motor_args": args,
"motor_kwargs": kwargs,
}
print(f"{msg = }")
basic_producer.produce(msg)
return msg
kbv.ush.subscribe(f)
kbv.dsh.subscribe(f)
kbh.ush.subscribe(f)
kbh.dsh.subscribe(f)
import datetime
import pprint
import uuid
import nslsii
from bluesky_kafka import RemoteDispatcher
def print_kafka_messages(beamline_acronym):
def print_message(name, doc):
print(
f"{datetime.datetime.now().isoformat()} document: {name}\n"
f"contents: {pprint.pformat(doc)}\n"
)
kafka_config = nslsii._read_bluesky_kafka_config_file(
config_file_path="/etc/bluesky/kafka.yml"
)
# this consumer should not be in a group with other consumers
# so generate a unique consumer group id for it
unique_group_id = f"echo-{beamline_acronym}-{str(uuid.uuid4())[:8]}"
kafka_dispatcher = RemoteDispatcher(
topics=[f"{beamline_acronym}.bluesky.runengine.documents"],
bootstrap_servers=",".join(kafka_config["bootstrap_servers"]),
group_id=unique_group_id,
consumer_config=kafka_config["runengine_producer_config"],
)
kafka_dispatcher.subscribe(print_message)
kafka_dispatcher.start()
import datetime
import pprint
import uuid
from bluesky_kafka import RemoteDispatcher
from nslsii.kafka_utils import _read_bluesky_kafka_config_file
def print_kafka_messages(beamline_acronym):
print(f"Listening for Kafka messages for {beamline_acronym}")
from databroker import Broker
db = Broker.named(beamline_acronym)
def print_message(name, doc):
if name == "stop":
print(
f"{datetime.datetime.now().isoformat()} document: {name}\n"
f"contents: {pprint.pformat(doc)}\n"
)
print(db[doc["run_start"]].table())
kafka_config = _read_bluesky_kafka_config_file(
config_file_path="/etc/bluesky/kafka.yml"
)
# this consumer should not be in a group with other consumers
# so generate a unique consumer group id for it
unique_group_id = f"echo-{beamline_acronym}-{str(uuid.uuid4())[:8]}"
kafka_dispatcher = RemoteDispatcher(
topics=[f"{beamline_acronym}.bluesky.runengine.documents"],
bootstrap_servers=",".join(kafka_config["bootstrap_servers"]),
group_id=unique_group_id,
consumer_config=kafka_config["runengine_producer_config"],
)
kafka_dispatcher.subscribe(print_message)
kafka_dispatcher.start()
if __name__ == "__main__":
import sys
print_kafka_messages(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment