Handle state:
- GenServer: persistent stateful server; common for embedding state in request-response.
- DRYer macros: https://github.com/sasa1977/exactor
- GenEvent: event handler, has state too, seems similar to GenServer
- GenFsm: FSM, simpler than GenServer (which can also store state in its State variable).
- plain_fsm: OTP like gen_fsm but with selective receive (reduces FSM complexity).
- Task: one-off action, no communication; good for consuming tasks from a queue but not for periodic tasks (as of 1.0: can't receive system messages yet)
- Agent: accessible (shared) mutable state; do expensive operations outside or they'll block.
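As a minimal sketch of the three non-FSM options above (the `Counter` module is hypothetical, just standard Agent/Task/GenServer API):

```elixir
# Agent: accessible shared mutable state; keep expensive work outside
# the Agent or it blocks other callers.
{:ok, agent} = Agent.start_link(fn -> 0 end)
Agent.update(agent, &(&1 + 1))
count = Agent.get(agent, & &1)   # do expensive work on `count` here, not inside get/2

# Task: one action, no communication; result collected once via await.
doubled = Task.async(fn -> count * 2 end) |> Task.await()

# GenServer: persistent stateful server, request-response.
defmodule Counter do
  use GenServer
  def init(n), do: {:ok, n}
  def handle_call(:get, _from, n), do: {:reply, n, n}
  def handle_cast(:inc, n), do: {:noreply, n + 1}
end

{:ok, server} = GenServer.start_link(Counter, 0)
GenServer.cast(server, :inc)
GenServer.call(server, :get)   # => 1 (call is serialized after the cast)
```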
Pooling:
- Poolboy or a finite number of actors?
- concurrent example: https://gist.github.com/henrik/ceede9c4d9bf3fcb4dd5
- ^ Pooler 'protects members (gen_servers/gen_fsms) from being used concurrently'; I imagine Tasks may not need this?
- Or.. is there an 'unconstrained' Poolboy? Just high overflow even if the default number is low?
- This pooling assumes I don't care about their differences in state; isn't 'anonymous worker' the domain of Tasks?
- Can keep static references to specific workers in [app].ex's children[] as well.
^ what of mailboxes as queue?
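On the "high overflow even if the default number is low" idea: Poolboy's child spec takes `:size` (resident workers) plus `:max_overflow` (temporary burst workers), so a near-unconstrained pool would look roughly like this (worker module name hypothetical; config sketch, not run against a real poolboy dep):

```elixir
# :size is the resident pool; :max_overflow allows extra temporary
# workers under load. MyApp.Scraper is a hypothetical worker module.
pool_opts = [
  name: {:local, :scraper_pool},
  worker_module: MyApp.Scraper,
  size: 2,            # low default number of workers
  max_overflow: 50    # effectively 'unconstrained' burst capacity
]

children = [
  :poolboy.child_spec(:scraper_pool, pool_opts, [])
]

# Checkout hands a member to one caller at a time, which is the
# 'protects members from being used concurrently' behavior:
# :poolboy.transaction(:scraper_pool, fn pid -> GenServer.call(pid, :work) end)
```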
Flow diagram (GenEvent?):
- constant?
- constant
- variable:
- private?
- temp?
- variable
- semi-persistent:
- Task
- temp?
- public:
- no processing?
- need persistence?
- (parts) processed in order?
- Queue
- random access:
- k/v store
- (parts) processed in order?
- no persistence:
- Agent
- need persistence?
- processing (+ persistent?):
- state simple?
- want selective receive?
- plain_fsm
- simple receive:
- GenFsm
- want selective receive?
- complex state:
- GenServer
- state simple?
- no processing?
- private?
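The "random access / no persistence" leaf of the diagram (Agent as k/v store) can be sketched as:

```elixir
# Agent as an in-memory k/v store: random access, no persistence,
# no processing beyond get/put.
{:ok, store} = Agent.start_link(fn -> %{} end)
Agent.update(store, &Map.put(&1, "example.com", :throttled))
Agent.get(store, &Map.get(&1, "example.com"))   # => :throttled
```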
Use cases:
- holding queue channel: Agent (or GenServer but doesn't need to do processing)
- if messages big, queue connection small: Agent
- pass channel copy to workers on creation
- mem worth asking only once (right?)
- on reboot possibly need new AMQP/socket channels though
- pass channel copy to workers on creation
- if messages small, queue connection big: GenServer
- if messages big, queue connection small: Agent
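A sketch of the "pass channel copy to workers on creation" route, with the channel value as a stand-in for a real AMQP channel struct:

```elixir
# The Agent holds the (small) channel handle; each worker asks once at
# creation and keeps its own copy, so the Agent isn't on the hot path.
channel = %{chan: :fake_amqp_channel}   # stand-in for a real channel
{:ok, holder} = Agent.start_link(fn -> channel end)

start_worker = fn work ->
  chan = Agent.get(holder, & &1)   # asked only once, at worker creation
  Task.async(fn -> {chan, work} end)
end

Task.await(start_worker.(:scrape))   # => {%{chan: :fake_amqp_channel}, :scrape}
```

On reboot the holder would need to be re-seeded with a fresh channel, matching the note above.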
- managing websockets:
- ^ client events handled by channel, question is just about accessing socket after async processing
- idiomatic approach: instead use sync processing and rely on many threads?
- would work normally but not for scraping
- send events through the channel's send -> handle_info?
- Agent storing socket[s?] for retrieval?
- GenServer storing socket to send events?
- Task consuming queue events?
- one task per socket? seems elegant.
- also easier to kill canceled tasks?
- holding all sockets? no, memory overhead.
- retrieve sockets from Agent each time? meh, CPU overhead.
- one task per socket? seems elegant.
- so Task vs. GenServer... try GenServer due to the ability to persist state across restarts?
- ^ if messages small, queue connection big: GenServer
- if messages big, queue connection small: Agent storing all sockets; fetcher:
- Task? -- I don't think so, this isn't so much about active consuming, in the sense you subscribe to the AMQP topic and need to handle some different communication from the AMQP server.
- GenServer
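The "GenServer storing socket to send events" option could look like this sketch; `push/2` is a stand-in for whatever actually writes to the socket (e.g. a channel push or :gen_tcp.send):

```elixir
defmodule SocketHolder do
  use GenServer
  # Holds one socket per server; queue events arrive as plain messages
  # (handle_info) after async processing and get pushed out.
  def start_link(socket), do: GenServer.start_link(__MODULE__, socket)
  def init(socket), do: {:ok, socket}

  def handle_info({:queue_event, payload}, socket) do
    push(socket, payload)
    {:noreply, socket}
  end

  # Stand-in: here the 'socket' is just a pid we message.
  defp push(socket, payload), do: send(socket, {:pushed, payload})
end

# Usage: the current process plays the role of the socket.
{:ok, holder} = SocketHolder.start_link(self())
send(holder, {:queue_event, "hello"})

payload =
  receive do
    {:pushed, p} -> p
  after
    1000 -> :timeout
  end
```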
- scraping/processing: Tasks (or GenServers but don't need to ask stuff) consuming queue
- check if metadata can make round-trip so as to give response ^ uh-oh, Kafka doesn't do message-level custom metadata, try AMQP instead?
- later: reconstruct request-reply to correlate response
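Reconstructing request-reply generally means attaching a correlation id to the outgoing message and keeping a map of pending ids; a library-agnostic sketch (the AMQP equivalent would carry the id in message properties):

```elixir
# Keep a map of pending correlation ids so a reply can be routed back
# to its requester; here the 'consumer' just echoes the id back.
corr_id = :erlang.unique_integer([:positive]) |> Integer.to_string()
pending = %{corr_id => self()}          # corr_id => who is waiting

# ...the consumer processes the request and replies with the same id:
waiter = Map.fetch!(pending, corr_id)
send(waiter, {:response, corr_id, :done})
pending = Map.delete(pending, corr_id)
0 = map_size(pending)                   # nothing left outstanding

result =
  receive do
    {:response, ^corr_id, r} -> r
  end
```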
- port scraper (fetchir)
- integrate front/back
RabbitMQ drop-in: FIFO queues, persistent + by domain
- GenServers - shouldn't survive reboot
- Agents (can block) - shouldn't survive reboot
- (D)ETS:
- select: -> Selection; supposedly faster??
- match: returns bindings -> [Match]
- match_object: returns objects -> Objects
- all /3 (table = domain, pattern = '$1', n = 1); use continuation with /1 until end
- select/match {x, Continuation} | '$end_of_table' | {error, Reason}
- safe_fixtable(Name, Fix) -> ok: delays resizing until released.
- table type ordered_set: 30~40% faster than set (no DETS); using key faster too.
- pattern: wildcard '_', pattern variables $0/$1/... to match results.
- [{ #person{idno='', name='', age='$1'}, [], ['$1']}]
- {'_',dog,'$1'}
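The match/match_object/select distinction above, tried from Elixir (`:ets` works the same; `:"$1"` atoms are the pattern variables, and a select spec is `{Head, Guards, Result}`):

```elixir
# match returns variable bindings; match_object returns whole objects;
# select takes a full match spec.
t = :ets.new(:pets, [:set, :public])
:ets.insert(t, {1, :dog, 4})
:ets.insert(t, {2, :cat, 2})

[[1]]          = :ets.match(t, {:"$1", :dog, :_})        # bindings only
[{1, :dog, 4}] = :ets.match_object(t, {:_, :dog, :_})    # full objects
[4]            = :ets.select(t, [{{:_, :_, :"$1"}, [{:>, :"$1", 3}], [:"$1"]}])

# select/3 with a limit pages: {Matches, Continuation} | :"$end_of_table";
# feed the continuation back to :ets.select/1 until the end.
{[_], _cont} = :ets.select(t, [{{:"$1", :_, :_}, [], [:"$1"]}], 1)
```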
- Mnesia: RAM/disk (ETS/DETS) combination with sharding, transactions, and distribution.
- http://www.erlang.org/doc/man/mnesia.html
- ETS works on tuples so no field names; Mnesia uses records.
- has wrappers of (D)ETS's select/3 and match_object
- http://erlang.org/pipermail/erlang-questions/2008-February/032742.html
- ^ use mnesia:select(Tab, MatchSpec, NObjs, Lock) (not a popping transaction?) or QLC (includes delete but much more costly than limited select()):
```
C = qlc:cursor(qlc:q([X || X <- qlc:append(QH1, QH2)],
                     {unique,true})),
R = qlc:next_answers(C, 5),
ok = qlc:delete_cursor(C),
R.
```
- maybe check how RabbitMQ uses mnesia for this?
- uses native Erlang OTP queue module, though I'll need to tag on persistence...
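For reference, the OTP queue module is a functional FIFO (so persistence would indeed have to be bolted on separately):

```elixir
# :queue is immutable; every in/out returns a new queue value.
q = :queue.new()
q = :queue.in(:a, q)
q = :queue.in(:b, q)

{{:value, :a}, q} = :queue.out(q)   # FIFO: first in, first out
{{:value, :b}, q} = :queue.out(q)
{:empty, _}       = :queue.out(q)
```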
Rate limiting:
But I don't think I understand concurrency in Elixir well enough yet... e.g. I'm not sure whether there are already actor types for which concurrency is handled automatically or something. For stateless parts (everything outside GenServers and the like) that seems like a logical step in all this Erlang stuff.
Besides that, there are other things I still know little about. Take rate limiting: I'm not even sure whether a plain sleep() hammers you hard on CPU cycles (or prevents an actor like a GenServer from handling the messages in its mailbox). Maybe not the most standard use case, though something focused on concurrency like this seems less badly suited to it than... other stuff.
I had a throttler (GenServer) that added credits on a schedule (send_interval). Kept it central to guarantee it was global, rather than different threads (across multiple nodes?) each minting their own credits, though perhaps that could be done differently too.
Existing rate-limit libs mostly seemed to answer a boolean "yes, go ahead" vs. "no, not yet". Which raised questions about preventing all CPU cycles getting allocated to "are we there yet?" polling, or to blocking sleeps.
If you only create one consumer, scheduling inside the process does also work (at least no "are we there yet?"), but allocating work (URLs) across workers (scrapers) as late as possible was actually desirable, given potential differences in priority, allocation over a variable number of workers, ack timeouts within that RabbitMQ paradigm, and so on. So there are quite a few considerations involved...
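A sketch of that credit-scheme throttler (module and option names hypothetical): credits refill on send_interval, callers block in call/3, and the server itself never sleeps, so its mailbox keeps draining:

```elixir
defmodule Throttler do
  use GenServer
  # Credits refill on a timer; when none are left, callers are parked
  # in a queue and answered later via GenServer.reply/2 -- no blocking
  # sleep inside the server, no "are we there yet?" polling.

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)
  def acquire(pid), do: GenServer.call(pid, :acquire, :infinity)

  def init(opts) do
    :timer.send_interval(Keyword.get(opts, :interval_ms, 100), :credit)
    {:ok,
     %{credits: Keyword.get(opts, :credits, 1),
       max: Keyword.get(opts, :max, 5),
       waiting: :queue.new()}}
  end

  # Credit available: grant immediately.
  def handle_call(:acquire, _from, %{credits: c} = s) when c > 0,
    do: {:reply, :ok, %{s | credits: c - 1}}

  # No credit: park the caller; it gets a deferred reply on the next tick.
  def handle_call(:acquire, from, s),
    do: {:noreply, %{s | waiting: :queue.in(from, s.waiting)}}

  def handle_info(:credit, s) do
    case :queue.out(s.waiting) do
      {{:value, from}, rest} ->
        GenServer.reply(from, :ok)
        {:noreply, %{s | waiting: rest}}

      {:empty, _} ->
        {:noreply, %{s | credits: min(s.credits + 1, s.max)}}
    end
  end
end

{:ok, t} = Throttler.start_link(credits: 1, interval_ms: 50)
:ok = Throttler.acquire(t)   # immediate (one credit available)
:ok = Throttler.acquire(t)   # parked ~50ms until the next credit
```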
Tests (concurrency project): sleeping processes don't block each other (external); but for a sleeping GenServer, incoming messages are blocked (internal)?
concurrent:
```
spawn(fn -> IO.puts(CounterAgent.sleep_client(c)) end)

def sleep_client(pid) do
  n = Agent.get(pid, &(&1)); :timer.sleep(1000); n
end
```
sequential:
```
spawn(fn -> IO.puts(CounterAgent.sleep_server(c)) end)

def sleep_server(pid) do
  Agent.get(pid, fn n -> :timer.sleep(1000); n end)
end
```
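To make the snippets above actually runnable, a minimal CounterAgent (hypothetical module; sleeps shortened to 100ms):

```elixir
defmodule CounterAgent do
  def start_link, do: Agent.start_link(fn -> 0 end)

  # Client sleeps: the Agent answers instantly, only the caller blocks,
  # so concurrent callers overlap.
  def sleep_client(pid) do
    n = Agent.get(pid, & &1)
    :timer.sleep(100)
    n
  end

  # Server sleeps: the sleep runs inside the Agent, blocking its
  # mailbox, so concurrent callers are serialized.
  def sleep_server(pid) do
    Agent.get(pid, fn n -> :timer.sleep(100); n end)
  end
end

{:ok, c} = CounterAgent.start_link()

# Two client-side sleeps run concurrently (~100ms wall-clock total):
t1 = Task.async(fn -> CounterAgent.sleep_client(c) end)
t2 = Task.async(fn -> CounterAgent.sleep_client(c) end)
[0, 0] = Enum.map([t1, t2], &Task.await/1)
```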
so sleeping seems fine. so I'll use sequential blocking sleeps within the Throttler, with one Throttler instance per domain (per node). so far I only had one instance. names/ids seem to solve this. what's the difference?
- names: substitute for process ID: ```
{:ok, pid} = GenServer.start_link(Api.Throttler, nil, name: :foo)
GenServer.call(pid, {:get, "a"})
GenServer.call(:foo, {:get, "a"})
Process.whereis(:foo) # :: pid | port | nil
GenServer.call({MyStack, :"node@10.0.3.179"}, :pop)
- ids: defined at the top level as an option in `worker()`: ```
# :id - a name used to identify the child specification internally by the supervisor; defaults to the given module name
worker(Api.AmqpSub, [args], id: :responder)
if the throttler sleeps ('server' block, sequential) I'd need a separate throttler per domain. alternative, current route: make the client sleep; but then I can no longer have multiple clients per throttler.
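The "separate throttler per domain" variant could use the name registration shown above, deriving one registered name per domain (sketch; Agents stand in for the throttler, and the `to_name` helper is hypothetical -- note String.to_atom/1 leaks atoms if the set of domains is unbounded):

```elixir
# One named process per domain, so each domain throttles independently.
to_name = fn domain -> String.to_atom("throttler_" <> domain) end

for domain <- ["example.com", "example.org"] do
  {:ok, _} = Agent.start_link(fn -> 0 end, name: to_name.(domain))
end

pid1 = Process.whereis(:"throttler_example.com")
pid2 = Process.whereis(:"throttler_example.org")
true = is_pid(pid1) and pid1 != pid2   # distinct process per domain
```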