Skip to content

Instantly share code, notes, and snippets.

@ConnorRigby
Last active October 22, 2020 22:09
Show Gist options
  • Save ConnorRigby/0cbd7a37c8d1af3339e319d683eef3f3 to your computer and use it in GitHub Desktop.
Save ConnorRigby/0cbd7a37c8d1af3339e319d683eef3f3 to your computer and use it in GitHub Desktop.
# import the opencv library
import cv2
import erlang, os, struct
# define a video capture object
vid = cv2.VideoCapture(0)
AtomOk = erlang.OtpErlangAtom(bytes("ok", "utf-8"))
AtomErr = erlang.OtpErlangAtom(bytes("error", "utf-8"))
AtomClose = erlang.OtpErlangAtom(bytes("close", "utf-8"))
AtomGetFrame = erlang.OtpErlangAtom(bytes("get_frame", "utf-8"))
def send(term, stream):
"""Write an Erlang term to an output stream."""
payload = erlang.term_to_binary(term)
header = struct.pack('!I', len(payload))
stream.write(header)
stream.write(payload)
stream.flush()
def recv(stream):
"""Read an Erlang term from an input stream."""
header = stream.read(4)
if len(header) != 4:
return None # EOF
(length,) = struct.unpack('!I', header)
payload = stream.read(length)
if len(payload) != length:
return None
term = erlang.binary_to_term(payload)
return term
def recv_loop(stream):
"""Yield Erlang terms from an input stream."""
message = recv(stream)
while message:
yield message
message = recv(stream)
if __name__ == '__main__':
input, output = os.fdopen(3, 'rb'), os.fdopen(4, 'wb')
for message in recv_loop(input):
operation = message[0]
ref = message[1]
if operation == AtomGetFrame:
ret, frame = vid.read()
_, jpeg = cv2.imencode('.JPEG', frame)
send([ref, (AtomOk, jpeg.tobytes())], output)
elif operation == AtomClose:
# After the loop release the cap object
vid.release()
# Destroy all the windows
cv2.destroyAllWindows()
send([ref, AtomOk], output)
else:
print("unknown message")
send([ref, (AtomErr, 'unknwon message')], output)
defmodule Test do
@moduledoc """
usage:
iex> {:ok, pid} = Test.start_link []
{:ok, #PID<0.119.0>}
{:ok, jpeg} = Test.get_frame(pid)
{:ok,
<<255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 255,
219, 0, 67, 0, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 2, 2, 2, 2, 2, 4, 3, 2, 2, 2, 2,
5, 4, ...>>}
"""
use GenServer
def get_frame(pid), do: GenServer.call(pid, :get_frame)
def start_link(args) do
GenServer.start_link(__MODULE__, args)
end
@impl GenServer
def init(_args) do
port = open_port()
{:ok, %{port: port, caller: nil}}
end
@impl GenServer
def handle_call(:get_frame, from, state) do
true = :erlang.port_command(state.port, :erlang.term_to_binary([:get_frame, make_ref(), []]))
{:noreply, %{state | caller: from}}
end
@impl GenServer
def handle_info({port, {:data, response}}, %{port: port} = state) do
[_ref, response] = :erlang.binary_to_term(response)
case response do
{:ok, jpeg} when is_list(jpeg) ->
GenServer.reply(state.caller, {:ok, :erlang.list_to_binary(jpeg)})
error ->
GenServer.reply(state.caller, error)
end
{:noreply, %{state | caller: nil}}
end
defp open_port do
Port.open(
{:spawn_executable, "/usr/bin/env"},
[:binary, {:packet, 4}, {:args, ["python", "-u", "test.py"]}, :nouse_stdio]
)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment