Skip to content

Instantly share code, notes, and snippets.

@afrad
Created May 2, 2016 18:33
Show Gist options
  • Save afrad/0cb20afa8c76b24a768d6d0acdac6c1d to your computer and use it in GitHub Desktop.
Save afrad/0cb20afa8c76b24a768d6d0acdac6c1d to your computer and use it in GitHub Desktop.
Read and search Common Crawl WARC files directly from S3 using Spark
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SerializableWritable, SparkConf, SparkContext}
import org.warcbase.io.WarcRecordWritable
import org.warcbase.mapreduce.WacWarcInputFormat
import org.warcbase.spark.archive.io.{ArchiveRecord, WarcRecord}
import org.warcbase.spark.rdd.RecordRDD._;
/**
 * Reads Common Crawl WARC files directly from S3 and searches their content.
 *
 * Each WARC "response" record in the input file is wrapped as a warcbase
 * ArchiveRecord; valid pages whose content matches `pattern` have their
 * URLs printed to stdout.
 *
 * Created by Adel
 */
object WracBaseTest {
  // Regex applied to each page's content; replace the placeholder before running.
  // FIX: the original ended with four quotes (`""""`); Scala's greedy
  // triple-quote parsing folded the extra quote into the pattern as a
  // literal `"`, corrupting the regex.
  val pattern = """<YOUR PATTERN>""".r

  // Hadoop configuration keys for the (legacy) s3n filesystem connector.
  val AWS_ACCESS_KEY_ID = "fs.s3n.awsAccessKeyId"
  val AWS_SECRET_ACCESS_KEY = "fs.s3n.awsSecretAccessKey"
  val DEFAULT_FS = "fs.defaultFS"

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    try {
      val conf = new Configuration()
      conf.set(AWS_ACCESS_KEY_ID, "<YOUR KEYID>")
      conf.set(AWS_SECRET_ACCESS_KEY, "<YOUR ACCESS KEY>")
      // All relative paths below resolve against this public S3 bucket.
      conf.set(DEFAULT_FS, "s3n://aws-publicdatasets")

      // Read the WARC file with the warcbase input format, keep only
      // "response" records (skipping warcinfo/request/metadata records),
      // and wrap them as ArchiveRecords so the RecordRDD enrichments
      // (keepValidPages, getUrl, getContentString, ...) apply.
      val records = sc.newAPIHadoopFile(
          "/common-crawl/crawl-data/CC-MAIN-2016-07/segments/1454702039825.90/warc/CC-MAIN-20160205195359-00348-ip-10-236-182-209.ec2.internal.warc.gz",
          classOf[WacWarcInputFormat], classOf[LongWritable], classOf[WarcRecordWritable], conf)
        .filter(r => r._2.getRecord.getHeader.getHeaderValue("WARC-Type").equals("response"))
        .map(r => new WarcRecord(new SerializableWritable(r._2)))
        .asInstanceOf[RDD[ArchiveRecord]]

      // Print the URL of every valid page whose content matches the pattern.
      records
        .keepValidPages()
        .filter(r => (pattern findFirstIn r.getContentString).isDefined)
        .map(r => r.getUrl)
        .foreach(println)
    } finally {
      // FIX: release the local Spark context even if the job fails
      // (the original never stopped it).
      sc.stop()
    }
  }
}
@dportabella
Copy link

cool, thanks for sharing this.
How can I specify where the S3 storage is?
(e.g., in the Amazon us-east-1 region, or in a local s3n mirror)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment