@ijunaid8989
Forked from Manzanit0/pooled_crawler.ex
Created July 3, 2020 13:22
Web crawler which uses Floki and HTTPoison – makes 5 requests at a time
# Dependencies:
# {:httpoison, "~> 1.5"},
# {:floki, "~> 0.21.0"}
# {:benchee, "~> 1.0"} (Only for benchmarking – not in the script)
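# One way to run this as a standalone script (an assumption, not part of the
# original gist) is to pull the dependencies in with Mix.install/1, available
# since Elixir 1.12, at the top of the file:
#
#   Mix.install([
#     {:httpoison, "~> 1.5"},
#     {:floki, "~> 0.21.0"}
#   ])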
defmodule CrawlQueue do
  use Agent

  # FIFO frontier of URLs to crawl, backed by an Erlang :queue held in an Agent.
  def start_link(urls) do
    queue = :queue.from_list(urls)
    Agent.start_link(fn -> queue end, name: __MODULE__)
  end

  # Returns the next URL, or :empty once the queue has been drained.
  # Note: the get-then-update is not atomic, which is fine here because only
  # the single crawl loop ever calls pop/0.
  def pop do
    queue = Agent.get(__MODULE__, & &1)
    {value, queue} = pop_value(queue)
    Agent.update(__MODULE__, fn _ -> queue end)
    value
  end

  def push(url) do
    Agent.update(__MODULE__, &(:queue.in(url, &1)))
  end

  defp pop_value(queue) do
    case :queue.out(queue) do
      {{:value, value}, queue} -> {value, queue}
      {:empty, queue} -> {:empty, queue}
    end
  end
end
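# A rough sketch of the queue's semantics, assuming the module above is loaded
# (e.g. in iex): pop/0 hands back URLs in FIFO order and :empty when drained.
# The URLs below are purely illustrative.
#
#   iex> CrawlQueue.start_link(["https://a.example", "https://b.example"])
#   iex> CrawlQueue.push("https://c.example")
#   iex> CrawlQueue.pop()
#   "https://a.example"
#   iex> CrawlQueue.pop()
#   "https://b.example"
#   iex> CrawlQueue.pop()
#   "https://c.example"
#   iex> CrawlQueue.pop()
#   :empty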
defmodule Crawler do
  alias CrawlQueue, as: Queue

  defstruct [:pool_size, :results, :current, :pending_sieve, :sieved]

  # Starts an empty queue and kicks off the crawl from the seed URL.
  def init(seed, pool_size) do
    Queue.start_link([])
    crawl(%__MODULE__{pool_size: pool_size, current: [seed], results: []})
  end
  # Nothing left to crawl: return everything collected so far.
  defp crawl(%{current: [], results: results}), do: results

  defp crawl(struct) do
    if length(struct.results) >= 500 do
      # Enum.each(struct.results, &IO.puts/1)
      struct.results
    else
      struct
      |> scan_async()    # scans current urls and adds the hrefs found to pending_sieve
      |> sieve()         # drops urls that have already been scanned
      |> push_to_queue() # pushes the sieved urls onto the queue and records them as results
      |> take()          # takes up to pool_size urls from the queue into current
      |> crawl()         # restart the crawl loop
    end
  end
  defp sieve(%{pending_sieve: pending, results: results} = struct) do
    sieved =
      pending
      # Enum.uniq/1 drops duplicate links found within the same batch.
      |> Enum.uniq()
      |> filter_already_scanned_urls(results)

    %{struct | sieved: sieved, pending_sieve: []}
  end

  # Keeps only urls that have not been crawled (or queued) before.
  defp filter_already_scanned_urls(urls, scanned) do
    Enum.reject(urls, &Enum.member?(scanned, &1))
  end
  # Queues every sieved url and records it as a result so it is never visited twice.
  defp push_to_queue(%{sieved: sieved, results: results} = struct) do
    Enum.each(sieved, &Queue.push/1)
    %{struct | sieved: [], results: results ++ sieved}
  end

  # Refills `current` from the queue, one url at a time, until it holds
  # pool_size urls or the queue is empty.
  defp take(%{pool_size: pool_size, current: urls} = struct) do
    if length(urls) < pool_size do
      case Queue.pop() do
        url when is_binary(url) -> take(%{struct | current: [url | urls]})
        _empty -> struct
      end
    else
      struct
    end
  end
  defp scan_async(%{current: []} = struct), do: struct

  # Scans the current batch of urls concurrently, one Task per url.
  defp scan_async(%{current: urls} = struct) when is_list(urls) do
    urls
    |> Enum.map(fn url -> Task.async(fn -> scan(url) end) end)
    |> Enum.map(fn t -> Task.await(t, 15_000) end)
    |> List.flatten()
    |> mark_as_pending_sieve(struct)
    |> clear_current()
  end
  # Fetches a url and returns every href found in its anchor tags.
  defp scan(url) do
    try do
      url
      |> HTTPoison.get!([], timeout: 15_000, recv_timeout: 15_000)
      |> Map.get(:body)
      |> Floki.find("* a")
      |> Floki.attribute("href")
    rescue
      # Any error while fetching or parsing means no results are retrieved
      # from that website; there are a lot of awkward websites out there.
      CaseClauseError -> []
      HTTPoison.Error -> []
      ArgumentError -> []
    end
  end
  defp mark_as_pending_sieve(urls, struct), do: %__MODULE__{struct | pending_sieve: urls}

  defp clear_current(struct), do: %__MODULE__{struct | current: []}
end
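# A minimal usage sketch, assuming the script is loaded (e.g. `iex pooled_crawler.ex`)
# and the seed URL is reachable. The seed and pool size below are illustrative;
# init/2 returns the list of collected hrefs once ~500 results are gathered or
# the queue runs dry.
#
#   results = Crawler.init("https://elixir-lang.org", 5)
#   IO.puts("Collected #{length(results)} links")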