Skip to content

Instantly share code, notes, and snippets.

@cdecl
Last active June 21, 2017 10:23
Show Gist options
  • Save cdecl/d6cb5c43c4cdea9fd8317dc42338141a to your computer and use it in GitHub Desktop.
Save cdecl/d6cb5c43c4cdea9fd8317dc42338141a to your computer and use it in GitHub Desktop.
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import scala.io.Source
// 22 columns
case class Log22 (
date: String,
time: String,
s_sitename: String,
s_computername: String,
s_ip: String,
cs_method: String,
cs_uri_stem: String,
cs_uri_query: String,
s_port: String,
cs_username: String,
c_ip: String,
cs_version: String,
cs_User_Agent: String,
cs_Cookie : String,
cs_Referer : String,
cs_host: String,
sc_status: String,
sc_substatus: String,
sc_win32_status: String,
sc_bytes: String,
cs_bytes: String,
time_taken: String
)
val files = sc.wholeTextFiles("/home/cdecl/20170603/*/*/*.zip").toDF.select("_1").collect
files.foreach(fns =>
sc.binaryFiles(fns(0) match { case s: String => s})
.flatMap((file: (String, PortableDataStream)) => {
val zipStream = new ZipInputStream(file._2.open)
val entry = zipStream.getNextEntry
val iter = Source.fromInputStream(zipStream, "ISO-8859-1").getLines
iter.next
iter
})
.filter(_(0) != '#').map(_.split(" "))
.map(x => Log22(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(0), x(8), x(9), x(10), x(11), x(12), x(13), x(14), x(15), x(16), x(17), x(18), x(19), x(20), x(21)))
.toDF.write.mode("overwrite").parquet("/home/cdecl/20170603.parquet")
)
// %sh
// for f in `find . -name *.zip`; do unzip $f -d $(dirname "${f}") ; done
// for f in `find . -name *.log`; do gzip $f ; done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment