Skip to content

Instantly share code, notes, and snippets.

@jschoch
Last active August 29, 2015 13:57
Show Gist options
  • Save jschoch/9376038 to your computer and use it in GitHub Desktop.
Save jschoch/9376038 to your computer and use it in GitHub Desktop.
defmodule Bulk do
require Lager
@place 15
# Entry point for the bulk export: wipes ./import, makes sure the :utl ETS
# table and the import directory exist, then streams `filename` (fields split
# on `sep`) through collect/1. Returns whatever collect/1 returns.
def bwrite(filename,sep) do
  Lager.info "destroying directory ./import"
  clean()
  make_utl()
  setup_import_dir()
  Lager.info "opening file #{filename}"
  rows = make_stream(filename,sep)
  Lager.info "collecting"
  collect(rows)
end
# Ensures the named ETS table :utl exists.
#
# When the table is missing it is created as a named :set table and the value
# of :ets.new/2 is returned. When it already exists its info is logged and an
# error is logged if the table is owned by a different process than the
# caller.
def make_utl do
  case :ets.info(:utl) do
    :undefined ->
      Lager.info "creating :utl table"
      :ets.new(:utl,[:set,:named_table])
    info ->
      Lager.info "ETS exists self: #{inspect self()}, #{inspect info}"
      # :ets.info/1 yields {item, value} tuples keyed by atoms, so the owner
      # must be looked up with :owner (a string key can never match).
      owner = Keyword.get(info, :owner)
      if owner != self() do
        Lager.error "something bad happened with our pid"
      end
  end
end
# One accepted input row: `id` is the value validate/1 returned for the row
# (the field at index @place) and `accounts` is the row's full list of fields.
defrecord PayCollect,id: nil,accounts: []
# Folds every row of `streams` through process_line/4 and pushes whatever is
# still buffered once the input is exhausted.
#
# The accumulator is {last_id, buffered_records, rows_seen, payer_count};
# it starts with last_id = 0, which process_line/4 treats as "no row accepted
# yet". Returns the result of the final push/2.
def collect(streams) do
  initial = {0, [], 0, 0}

  {_last, remaining, _seen, final_payer_count} =
    Enum.reduce(streams, initial, fn(row, {prev_id, buffered, seen, per_payer}) ->
      seen_now = seen + 1
      # Progress/memory report every 10000 rows.
      print_if("total: ", 10000, seen_now)
      fields = Enum.to_list(row)
      {next_id, next_buffered, next_per_payer} =
        process_line(fields, prev_id, buffered, per_payer)
      {next_id, next_buffered, seen_now, next_per_payer}
    end)

  push(remaining, final_payer_count)
end
# Every time `count` is a multiple of `n`, logs the count (labelled with `s`)
# and the VM memory figures, then forces a garbage collection of the calling
# process. Returns nil when `count` is not a multiple of `n`.
def print_if(s,n,count) do
  at_interval = rem(count, n) == 0

  if at_interval do
    Lager.info "Count: #{s} #{count}"
    Lager.info "memory #{inspect :erlang.memory}"
    :erlang.garbage_collect()
  end
end
# Folds one input row into the running state and returns the new
# {last, stuff, payer_count} tuple.
#
#   list        - the row's fields
#   last        - id of the previously accepted row, or 0 before the first one
#   stuff       - PayCollect records buffered for the current id
#   payer_count - rows seen since the buffer was last reset
#
# Rows rejected by validate/1 leave `last` and `stuff` untouched. A row whose
# id differs from `last` flushes the buffer with push/2 and starts a new one.
#
# Every clause returns its result explicitly: bindings made inside a `case`
# clause are not visible after the `case`, so rebinding `last`/`stuff` in the
# clauses and reading them afterwards would hand back the unchanged inputs.
def process_line(list,last,stuff,payer_count) do
  # Counted before validation, so rejected rows are included in the count.
  payer_count = payer_count + 1

  case validate(list) do
    false ->
      {last, stuff, payer_count}

    an when last == 0 ->
      # First accepted row: start the first buffer.
      {an, [PayCollect.new(id: an, accounts: list)], payer_count}

    an when an != last ->
      # TODO fix this: the message promises to skip rows but nothing is
      # skipped; the oversized buffer is pushed like any other.
      if payer_count > 2000 do
        # The count belongs to the payer being flushed (`last`), not to the
        # payer of the incoming row.
        Lager.error "payer #{last} count over 2000, skipping the rest. "
      end
      push(stuff, payer_count)
      {an, [PayCollect.new(id: an, accounts: list)], 1}

    an ->
      # Same id as the previous row: prepend to the current buffer.
      {an, [PayCollect.new(id: an, accounts: list) | stuff], payer_count}
  end
end
# Checks one row of fields and returns the field at index @place when the row
# is acceptable, or false (after logging) when it is not.
#
# A row is acceptable when it has as many fields as the header (stored under
# :head_count in the :utl ETS table) and the field at @place contains twelve
# consecutive digits.
def validate(list) do
  an = Enum.at(list,@place)
  [{:head_count,head_count}] = :ets.lookup(:utl,:head_count)

  # `an` is nil when the row has @place or fewer fields (for instance when the
  # file was split on the wrong separator); guard so the row is reported as
  # invalid instead of crashing inside Regex.match?/2.
  valid =
    Enum.count(list) == head_count and
      is_binary(an) and
      Regex.match?(%r/\d\d\d\d\d\d\d\d\d\d\d\d/,an)

  if valid do
    an
  else
    Lager.error "#{an} failed validation. list: #{inspect list}"
    false
  end
end
# Serialises the buffered records in `stuff` with :erlang.term_to_binary/1 and
# writes them to import/<Mix.env>/<id of the first record>.
#
# Returns :ok, or the {:error, reason} tuple from File.write/2 after logging
# it. An empty buffer (no accepted rows) is a no-op that returns :ok; there
# is no record to name the file after.
def push([], _payer_count) do
  Lager.info "nothing to push"
  :ok
end
def push([first | _] = stuff, _payer_count) do
  #Lager.info "pushing #{first.id} #{_payer_count}"
  env = Mix.env
  setup_dir(env)
  file = "import/#{env}/#{first.id}"

  case File.write(file,:erlang.term_to_binary(stuff)) do
    :ok ->
      :ok
    {:error, reason} = error ->
      # Surface the failure instead of dropping it; callers still receive the
      # same value File.write/2 produced.
      Lager.error "failed to write #{file}: #{inspect reason}"
      error
  end
end
# Makes sure the directory import/<an> exists. Returns true when it is
# already there, otherwise :ok after creating it; a failed File.mkdir/1
# raises a match error.
def setup_dir(an) do
  target = "import/#{an}"

  if File.dir?(target) do
    true
  else
    #Lager.info inspect {"create dir" ,res}
    :ok = File.mkdir(target)
  end
end
# Makes sure the top-level "import" directory exists. Returns true when it is
# already present; otherwise attempts to create it and logs the outcome of
# File.mkdir/1 (success or failure) without raising.
def setup_import_dir do
  if File.dir?("import") do
    true
  else
    outcome = File.mkdir("import")
    Lager.info(inspect({"create dir", outcome}))
  end
end
# Opens `filename` as a line stream, records the header in the :utl ETS table
# and returns a lazy stream of the remaining lines, each split on `sep`.
#
# The first line is read eagerly: its fields are stored under :head and their
# number under :head_count. Only "\n" is stripped from each line before
# splitting.
def make_stream(filename,sep) do
  lines = File.stream!(filename,[:read, :utf8],:line)
  to_fields = fn(line) -> line |> String.replace("\n","") |> String.split(sep) end

  header = to_fields.(Enum.at(lines,0))
  :ets.insert(:utl,{:head_count,Enum.count(header)})
  :ets.insert(:utl,{:head,header})

  # Skip the header line; everything after it is data.
  lines |> Stream.drop(1) |> Stream.map(to_fields)
end
# Reads every file under import/<Mix.env>, decodes it with
# :erlang.binary_to_term/1 and hands the file name plus the decoded term to
# parse_zip_and_put/2.
# NOTE(review): presumably these are only the files push/2 wrote; confirm
# nothing untrusted can land in that directory before decoding it.
def bread do
  dir = "import/#{Mix.env}"

  Enum.each(File.ls!(dir), fn(name) ->
    #Lager.info inspect name
    payload = File.read!("#{dir}/#{name}")
    parse_zip_and_put(name, :erlang.binary_to_term(payload))
  end)
end
# Pairs each record's fields with the header names stored under :head in the
# :utl ETS table, drops pairs whose value is "0" unless the header name
# contains TOTAL, converts the remaining pairs with parse_item/1 and sends
# {payer, parsed_records} to Import.send_to_db/2.
def parse_zip_and_put(payer,stuff) do
  wanted = fn({k,v}) -> v != "0" || Regex.match?(%r/TOTAL/,k) end

  parsed = Enum.map(stuff, fn(entry) ->
    fields = entry.accounts
    [{:head,head}] = :ets.lookup(:utl,:head)

    head
    |> Enum.zip(fields)
    |> Enum.filter(wanted)
    |> Enum.map(&parse_item/1)
  end)

  #consolidate
  Import.send_to_db({payer,parsed},:crap)
end
# Converts one {header, value} pair. Values that are exactly twelve digits or
# that contain a digits-three characters-digits pattern separated by dashes
# are kept as strings. Any other value that Float.parse/1 consumes completely
# is replaced by the float; everything else is returned unchanged, with an
# error logged when only part of the value parsed as a number.
def parse_item({k,item}) do
  keep_as_text =
    Regex.match?(%r/^\d\d\d\d\d\d\d\d\d\d\d\d$/,item) || Regex.match?(%r/\d+-...-\d+/,item)

  if keep_as_text do
    #Lager.info "match on #{k}"
    {k,item}
  else
    case Float.parse(item) do
      {number, ""} ->
        #Lager.info "parsed #{k}:#{number}"
        {k,number}
      :error ->
        {k,item}
      _partial ->
        Lager.error "float parse produced spectacular and unexpected results #{inspect item}"
        {k,item}
    end
  end
end
# Removes the ./import directory and everything below it.
#
# Uses File.rm_rf/1 rather than shelling out to `rm -rf`, so it needs no
# shell and is not tied to a Unix `rm` binary. A missing directory is not an
# error. Returns the File.rm_rf/1 result ({:ok, removed_paths} or
# {:error, reason, path}); the caller in this module ignores it.
def clean do
  File.rm_rf("import")
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment