Skip to content

Instantly share code, notes, and snippets.

@siyomai
Last active September 25, 2018 09:58
Show Gist options
  • Save siyomai/7c20416f17d94bd96b31ff96a502075a to your computer and use it in GitHub Desktop.
Save siyomai/7c20416f17d94bd96b31ff96a502075a to your computer and use it in GitHub Desktop.
GenServersProducerConsumer
defmodule QueueGenserver do
use GenServer
@moduledoc """
Documentation for ProducerConsumerGenserver.
"""
def start_link(producers \\ 120, consumers \\ 120, size \\ 10) do
GenServer.start_link(__MODULE__, {[], size})
for id <- 1..producers, do: ProducerGenserver.start_link(id, self())
for id <- 1..consumers, do: ConsumerGenserver.start_link(id, self())
end
def produce(pid, job, producer_id, size) do
GenServer.cast(pid, {:produce, job, producer_id, size})
end
def consume(pid, jobs, consumer_id}) do
GenServer.call(pid, {:consume, consumer_id, jobs, size})
end
#callbacks
def handle_cast({:consume, consumer_id, jobs, size}, _from, state) do
consume_job(jobs, consumer_id, size)
end
def handle_cast({:produce, job, producer_id, size}, state) do
process_job({:job, job, producer_id})
end
def handle_info(:work, state) do
IO.inspect state
end
defp consume_job(jobs, consumers, size) when jobs == [] do
{:nojobs, "there are no jobs available"}
end
defp consume_job([job | jobs], consumers, size) do
process_job(job, jobs, consumers, size)
end
defp process_job({:job, job, producer_id}, jobs, consumers, size) when length(jobs) >= size and consumers == [] do
IO.puts "Q #{stats(jobs, consumers)}: Full. Discarding job #{job} from PRODUCER #{producer_id}"
loop(jobs, consumers, size)
end
defp process_job({:job, job, producer_id} = new_job, jobs, consumers, size) when consumers == [] do
jobs = [new_job | jobs]
IO.puts "Q #{stats(jobs, consumers)}: Queueing job #{job} from PRODUCER #{producer_id}"
loop(jobs, consumers, size)
end
defp process_job({:job, job, producer_id} = new_job, jobs, consumers, size) do
[{:consumer, consumer_id, consumer_pid} | consumers] = consumers
IO.puts "Q #{stats(jobs, consumers)}: Sending job #{job} from PRODUCER #{producer_id} to CONSUMER #{consumer_id}"
send(consumer_pid, new_job)
loop(jobs, consumers, size)
end
defp loop(jobs, consumers, size) do
Process.send_after(self(), :work, 2 * 60 * 60 * 1000)
end
# def loop({id, q}) do
# :random.seed(System.os_time)
# job = :random.uniform(10) * 100
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Generating job #{job}"
# Process.sleep(job)
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Sending job #{job} to the queue"
# send(q, {:producer, job, id})
# GenServer.cast(Producer, {:add, unit})
# loop(id, q)
# end
end
defmodule ConsumerGenserver do
use GenServer
@moduledoc """
Documentation for ProducerConsumerGenserver.
"""
def start_link(id, q) do
GenServer.start_link(__MODULE__, {id, q})
QueueGenserver.consume(q, {:consume, id})
end
#callbacks
def handle_call(:consume, _from, state) do
IO.inspect state
end
# def loop({id, q}) do
# :random.seed(System.os_time)
# job = :random.uniform(10) * 100
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Generating job #{job}"
# Process.sleep(job)
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Sending job #{job} to the queue"
# send(q, {:producer, job, id})
# GenServer.cast(Producer, {:add, unit})
# loop(id, q)
# end
end
defmodule ProducerGenserver do
use GenServer
@moduledoc """
Documentation for ProducerConsumerGenserver.
"""
def start_link(producer_id, q) do
GenServer.start_link(__MODULE__, {producer_id, q}, name: Producer)
end
# callbacks
def handle_cast({:produce, job}, state) do
IO.inspect state
IO.inspect job
end
# def loop({id, q}) do
# :random.seed(System.os_time)
# job = :random.uniform(10) * 100
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Generating job #{job}"
# Process.sleep(job)
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Sending job #{job} to the queue"
# send(q, {:producer, job, id})
# GenServer.cast(Producer, {:add, unit})
# loop(id, q)
# end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment