Skip to content

Instantly share code, notes, and snippets.

@zacky1972
Last active September 7, 2022 14:04
Show Gist options
  • Save zacky1972/62c4bf532db7a72260bd90a528c56bcf to your computer and use it in GitHub Desktop.
Save zacky1972/62c4bf532db7a72260bd90a528c56bcf to your computer and use it in GitHub Desktop.
Distributed Image Processing by Node

DistributedImageProcessing

Distributed Image Processing by Node.

Installation and Usage

  1. mix new distributed_image_processing
  2. cd distributed_image_processing
  3. edit the following files:
  • mix.exs
  • config/config.exs
  • lib/distributed_image_processing.ex
  4. mix deps.get
  5. At the worker node, iex --name worker_node --cookie cookie -S mix
  6. At the main node, elixir --name main_node --cookie cookie -S mix run -e 'DistributedImageProcessing.distribute([:worker_node], ["ZACKY-3000.jpg"])'
import Config

# Make EXLA the backend Nx uses by default.
config(:nx, default_backend: EXLA.Backend)
defmodule DistributedImageProcessing do
  @moduledoc """
  Distributed image processing by Node.

  The calling (main) node reads each image, sends it to a worker node as a
  `{file, type, shape, binary}` tuple, and writes to disk the processed image
  that the worker sends back.
  """

  @doc """
  Processes `images` on the given `workers` and writes the results to disk.

  ## Parameters

    * `workers` - a non-empty list of node names. Images are assigned to the
      workers in round-robin order.
    * `images` - a list of image file paths; each file is read on the calling
      node with `Evision.imread!/1`.

  ## Returns

  A list with one `Evision.imwrite!/2` result per image. The destination file
  names are the ones produced by `process_image/1`.

  ## Raises

    * `ArgumentError` when `workers` is empty (there is nothing to cycle over).
    * `RuntimeError` when `Node.connect/1` reports that a worker is unreachable.
  """
  def distribute([], _images) do
    raise ArgumentError, "workers must be a non-empty list of node names"
  end

  def distribute(workers, images) do
    Enum.each(workers, fn worker ->
      # Node.connect/1 returns false when the connection attempt fails. Fail
      # early instead of spawning onto a node that cannot be reached.
      if Node.connect(worker) == false do
        raise "could not connect to worker node #{inspect(worker)}"
      end
    end)

    parent = self()

    # Every message of this run carries the same unique reference, so the
    # receive clauses below never pick up unrelated messages from the mailbox.
    ref = make_ref()

    workers
    # Repeat the worker list endlessly; Stream.zip/2 stops when images run out.
    |> Stream.cycle()
    |> Stream.zip(images)
    |> Flow.from_enumerable()
    |> Flow.map(fn {worker, image} ->
      IO.puts("enter spawn_link")

      pid =
        Node.spawn_link(worker, fn ->
          # The worker waits for exactly one image from the main node.
          receive do
            {:img, ^ref, payload} ->
              {dst_file, img} = process_image(payload)

              # A tensor is sent as its type, shape and raw binary.
              send(parent, {ref, dst_file, Nx.type(img), Nx.shape(img), Nx.to_binary(img)})
              IO.puts("respond")
          end
        end)

      {pid, image}
    end)
    |> Flow.map(fn {pid, src_file} ->
      IO.puts("enter reader")
      img = src_file |> Evision.imread!() |> Evision.Nx.to_nx!()

      # A tensor is sent as its type, shape and raw binary.
      send(pid, {:img, ref, {src_file, Nx.type(img), Nx.shape(img), Nx.to_binary(img)}})
    end)
    # Run the flow to completion; one element is produced per dispatched image.
    |> Enum.to_list()
    |> Enum.map(fn _sent ->
      IO.puts("enter receiver")

      receive do
        {^ref, dst_file, type, shape, binary} ->
          # Rebuild the image from its binary, type and shape before writing it.
          img =
            binary
            |> Nx.from_binary(type)
            |> Nx.reshape(shape)
            |> Evision.Nx.to_mat!()

          Evision.imwrite!(dst_file, img)
      end
    end)
  end

  @doc """
  Applies a binary threshold to an image received as `{src_file, type, shape, binary}`.

  The image is rebuilt from `binary`, `type` and `shape`, thresholded with
  `Evision.threshold!/4` (threshold `127`, maximum `255`, `THRESH_BINARY`) and
  converted back into an Nx tensor.

  Returns `{dst_file, dst_img}`. `dst_file` is the base name of `src_file`
  (directory removed) with `_d` inserted before the extension, e.g.
  `"photos/a.jpg"` becomes `"a_d.jpg"`.
  """
  def process_image({src_file, type, shape, binary}) do
    IO.puts("enter processor")

    # File name conversion: "<basename>_d<ext>".
    src_file_ext = Path.extname(src_file)
    src_file_basename = Path.basename(src_file, src_file_ext)
    dst_file = "#{src_file_basename}_d#{src_file_ext}"

    dst_img =
      binary
      # Rebuild the image from its binary, type and shape.
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Nx.to_mat!()
      |> Evision.threshold!(127, 255, Evision.cv_THRESH_BINARY())
      # threshold!/4 returns a tuple; its second element is used as the image.
      |> elem(1)
      |> Evision.Nx.to_nx!()

    {dst_file, dst_img}
  end
end
# Tell Evision to use its precompiled library and where to cache it.
# Remove the EVISION_PREFER_PRECOMPILED entry on a platform for which Evision
# does not provide a precompiled library.
System.put_env(%{
  "EVISION_PREFER_PRECOMPILED" => "true",
  "EVISION_PRECOMPILED_CACHE_DIR" => "#{System.user_home!()}/.cache"
})
defmodule DistributedImageProcessing.MixProject do
  use Mix.Project

  @app :distributed_image_processing
  @version "0.1.0"
  @elixir_requirement "~> 1.13"

  # Project definition read by Mix.
  def project do
    [
      app: @app,
      version: @version,
      elixir: @elixir_requirement,
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # OTP application settings; see "mix help compile.app".
  def application do
    [extra_applications: [:logger]]
  end

  # Dependencies of the project; see "mix help deps".
  # Evision is fetched from GitHub at the v0.1.2 tag.
  defp deps do
    evision = {:evision, "~> 0.1.2", github: "cocoa-xu/evision", tag: "v0.1.2"}

    [
      {:nx, "~> 0.3"},
      {:exla, "~> 0.3"},
      evision,
      {:flow, "~> 1.2"}
    ]
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment