Skip to content

Instantly share code, notes, and snippets.

@rjurney rjurney/avro.scala
Created Jun 2, 2014

Embed
What would you like to do?
Results when loading a directory full of Avros
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import java.io.BufferedInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.FileInputFormat
import java.io.File
import java.net.URI
import org.apache.spark.SerializableWritable
// spark-shell -usejavacp -classpath "*.jar"
val sparkConf = new SparkConf()
sparkConf.setMaster("spark://hivecluster2:7077")
sparkConf.setAppName("Test Spark App")
sparkConf.setJars(Array("avro-1.7.6.jar", "avro-mapred-1.7.6.jar"))
val sc = new SparkContext(sparkConf)
val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/*.avro"//part-m-000{15,16}.avro"
// Setup the path for the job vai a Hadoop JobConf
val jobConf= new JobConf(sc.hadoopConfiguration)
jobConf.setJobName("Test Scala Job")
FileInputFormat.setInputPaths(jobConf, input)
val rdd = sc.hadoopRDD(
jobConf,
classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
classOf[org.apache.hadoop.io.NullWritable],
1)
val f1 = rdd.first
val a = f1._1.datum
a.get("rawLog")
[hivedata@hivecluster2 ~]$ spark-shell -usejavacp -classpath "*.jar"
14/06/01 18:31:00 INFO spark.HttpServer: Starting HTTP Server
14/06/01 18:31:01 INFO server.Server: jetty-7.6.8.v20121106
14/06/01 18:31:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:59290
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 0.9.0
/_/
Using Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_31)
Type in expressions to have them evaluated.
Type :help for more information.
14/06/01 18:31:15 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/06/01 18:31:15 INFO Remoting: Starting remoting
14/06/01 18:31:16 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@hivecluster2:57718]
14/06/01 18:31:16 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@hivecluster2:57718]
14/06/01 18:31:16 INFO spark.SparkEnv: Registering BlockManagerMaster
14/06/01 18:31:16 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140601183116-f959
14/06/01 18:31:16 INFO storage.MemoryStore: MemoryStore started with capacity 294.4 MB.
14/06/01 18:31:16 INFO network.ConnectionManager: Bound socket to port 38297 with id = ConnectionManagerId(hivecluster2,38297)
14/06/01 18:31:16 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/06/01 18:31:16 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hivecluster2:38297 with 294.4 MB RAM
14/06/01 18:31:16 INFO storage.BlockManagerMaster: Registered BlockManager
14/06/01 18:31:16 INFO spark.HttpServer: Starting HTTP Server
14/06/01 18:31:16 INFO server.Server: jetty-7.6.8.v20121106
14/06/01 18:31:16 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48043
14/06/01 18:31:16 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.10.30.211:48043
14/06/01 18:31:16 INFO spark.SparkEnv: Registering MapOutputTracker
14/06/01 18:31:16 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-97dbc436-395e-4a97-ba7d-c2c3085a4e01
14/06/01 18:31:16 INFO spark.HttpServer: Starting HTTP Server
14/06/01 18:31:16 INFO server.Server: jetty-7.6.8.v20121106
14/06/01 18:31:16 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:37801
14/06/01 18:31:17 INFO server.Server: jetty-7.6.8.v20121106
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/06/01 18:31:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/06/01 18:31:17 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/06/01 18:31:17 INFO ui.SparkUI: Started Spark Web UI at http://hivecluster2:4040
14/06/01 18:31:17 INFO client.AppClient$ClientActor: Connecting to master spark://hivecluster2:7077...
14/06/01 18:31:19 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140601183119-0030
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor added: app-20140601183119-0030/0 on worker-20140527153222-hivecluster6.labs.lan-7078 (hivecluster6.labs.lan:7078) with 16 cores
14/06/01 18:31:19 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140601183119-0030/0 on hostPort hivecluster6.labs.lan:7078 with 16 cores, 512.0 MB RAM
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor added: app-20140601183119-0030/1 on worker-20140527153210-hivecluster5.labs.lan-7078 (hivecluster5.labs.lan:7078) with 8 cores
14/06/01 18:31:19 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140601183119-0030/1 on hostPort hivecluster5.labs.lan:7078 with 8 cores, 512.0 MB RAM
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor added: app-20140601183119-0030/2 on worker-20140527153112-hivecluster3-7078 (hivecluster3:7078) with 8 cores
14/06/01 18:31:19 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140601183119-0030/2 on hostPort hivecluster3:7078 with 8 cores, 512.0 MB RAM
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor added: app-20140601183119-0030/3 on worker-20140527153012-hivecluster1.labs.lan-7078 (hivecluster1.labs.lan:7078) with 8 cores
14/06/01 18:31:19 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140601183119-0030/3 on hostPort hivecluster1.labs.lan:7078 with 8 cores, 512.0 MB RAM
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor added: app-20140601183119-0030/4 on worker-20140527153130-hivecluster4-7078 (hivecluster4:7078) with 8 cores
14/06/01 18:31:19 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140601183119-0030/4 on hostPort hivecluster4:7078 with 8 cores, 512.0 MB RAM
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor updated: app-20140601183119-0030/3 is now RUNNING
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor updated: app-20140601183119-0030/1 is now RUNNING
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor updated: app-20140601183119-0030/0 is now RUNNING
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor updated: app-20140601183119-0030/4 is now RUNNING
14/06/01 18:31:19 INFO client.AppClient$ClientActor: Executor updated: app-20140601183119-0030/2 is now RUNNING
Created spark context..
Spark context available as sc.
scala> 14/06/01 18:31:20 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hivecluster6.labs.lan:57180/user/Executor#-2120763055] with ID 0
14/06/01 18:31:21 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hivecluster6.labs.lan:60578 with 294.9 MB RAM
14/06/01 18:31:21 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hivecluster1.labs.lan:60864/user/Executor#-1149856704] with ID 3
14/06/01 18:31:21 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hivecluster3:59778/user/Executor#-1715115200] with ID 2
14/06/01 18:31:21 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hivecluster1.labs.lan:53067 with 294.4 MB RAM
14/06/01 18:31:21 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hivecluster3:38564 with 294.4 MB RAM
14/06/01 18:31:22 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hivecluster5.labs.lan:49255/user/Executor#1996710801] with ID 1
14/06/01 18:31:23 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hivecluster4:46867/user/Executor#48197222] with ID 4
14/06/01 18:31:23 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hivecluster4:57869 with 294.4 MB RAM
14/06/01 18:31:24 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hivecluster5.labs.lan:45652 with 294.4 MB RAM
scala>
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._
scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
scala>
scala> import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecord
scala> import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.AvroKey
scala> import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapred.AvroInputFormat
scala> import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.mapreduce.AvroKeyInputFormat
scala> import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.NullWritable
scala> import org.apache.commons.lang.StringEscapeUtils.escapeCsv
import org.apache.commons.lang.StringEscapeUtils.escapeCsv
scala>
scala> import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileSystem
scala> import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.Path
scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration
scala> import java.io.BufferedInputStream
import java.io.BufferedInputStream
scala> import org.apache.avro.file.DataFileStream
import org.apache.avro.file.DataFileStream
scala> import org.apache.avro.io.DatumReader
import org.apache.avro.io.DatumReader
scala> import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileReader
scala> import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileReader
scala> import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
scala> import org.apache.avro.mapred.FsInput
import org.apache.avro.mapred.FsInput
scala> import org.apache.avro.Schema
import org.apache.avro.Schema
scala> import org.apache.avro.Schema.Parser
import org.apache.avro.Schema.Parser
scala> import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.JobConf
scala> import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.FileInputFormat
scala> import java.io.File
import java.io.File
scala> import java.net.URI
import java.net.URI
scala>
scala> import org.apache.spark.SerializableWritable
import org.apache.spark.SerializableWritable
scala>
scala> val sparkConf = new SparkConf()
sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7773bb48
scala> sparkConf.setMaster("spark://hivecluster2:7077")
res0: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7773bb48
scala> sparkConf.setAppName("Test Spark App")
res1: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7773bb48
scala> sparkConf.setJars(Array("avro-1.7.6.jar", "avro-mapred-1.7.6.jar"))
res2: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7773bb48
scala> sparkConf.
asInstanceOf clone contains get getAkkaConf getAll getBoolean getDouble getExecutorEnv getInt
getLong getOption isInstanceOf remove set setAll setAppName setExecutorEnv setIfMissing setJars
setMaster setSparkHome toDebugString toString
scala> sparkConf.
asInstanceOf clone contains get getAkkaConf getAll getBoolean getDouble getExecutorEnv getInt
getLong getOption isInstanceOf remove set setAll setAppName setExecutorEnv setIfMissing setJars
setMaster setSparkHome toDebugString toString
scala> sparkConf.toString
res3: String = org.apache.spark.SparkConf@7773bb48
scala> val sc = new SparkContext(sparkConf)
14/06/01 18:32:29 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/06/01 18:32:29 INFO Remoting: Starting remoting
14/06/01 18:32:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@hivecluster2:51543]
14/06/01 18:32:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@hivecluster2:51543]
14/06/01 18:32:29 INFO spark.SparkEnv: Registering BlockManagerMaster
14/06/01 18:32:29 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140601183229-b72c
14/06/01 18:32:29 INFO storage.MemoryStore: MemoryStore started with capacity 294.4 MB.
14/06/01 18:32:29 INFO network.ConnectionManager: Bound socket to port 45136 with id = ConnectionManagerId(hivecluster2,45136)
14/06/01 18:32:29 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/06/01 18:32:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hivecluster2:45136 with 294.4 MB RAM
14/06/01 18:32:29 INFO storage.BlockManagerMaster: Registered BlockManager
14/06/01 18:32:29 INFO spark.SparkEnv: Registering MapOutputTracker
14/06/01 18:32:29 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c5dc3d04-1015-4f31-9bb3-412cf7bb2d0b
14/06/01 18:32:29 INFO spark.HttpServer: Starting HTTP Server
14/06/01 18:32:29 INFO server.Server: jetty-7.6.8.v20121106
14/06/01 18:32:29 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:54526
14/06/01 18:32:29 INFO server.Server: jetty-7.6.8.v20121106
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/06/01 18:32:29 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:286)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply$mcV$sp(JettyUtils.scala:118)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:118)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:118)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.ui.JettyUtils$.connect$1(JettyUtils.scala:118)
at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:129)
at org.apache.spark.ui.SparkUI.bind(SparkUI.scala:57)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:159)
at $line48.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $line48.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $line48.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $line48.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
at $line48.$read$$iwC$$iwC$$iwC.<init>(<console>:52)
at $line48.$read$$iwC$$iwC.<init>(<console>:54)
at $line48.$read$$iwC.<init>(<console>:56)
at $line48.$read.<init>(<console>:58)
at $line48.$read$.<init>(<console>:62)
at $line48.$read$.<clinit>(<console>)
at $line48.$eval$.<init>(<console>:7)
at $line48.$eval$.<clinit>(<console>)
at $line48.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:788)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:833)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:745)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:593)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:600)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:603)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:926)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:876)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:968)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
14/06/01 18:32:29 WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.server.Server@3ef039cc: java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:286)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply$mcV$sp(JettyUtils.scala:118)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:118)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:118)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.ui.JettyUtils$.connect$1(JettyUtils.scala:118)
at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:129)
at org.apache.spark.ui.SparkUI.bind(SparkUI.scala:57)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:159)
at $line48.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $line48.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $line48.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $line48.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
at $line48.$read$$iwC$$iwC$$iwC.<init>(<console>:52)
at $line48.$read$$iwC$$iwC.<init>(<console>:54)
at $line48.$read$$iwC.<init>(<console>:56)
at $line48.$read.<init>(<console>:58)
at $line48.$read$.<init>(<console>:62)
at $line48.$read$.<clinit>(<console>)
at $line48.$eval$.<init>(<console>:7)
at $line48.$eval$.<clinit>(<console>)
at $line48.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:788)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:833)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:745)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:593)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:600)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:603)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:926)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:876)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:968)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/static,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/metrics/json,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/executors,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/environment,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/stages,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/stages/pool,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/stages/stage,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/storage,null}
14/06/01 18:32:29 INFO handler.ContextHandler: stopped o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/06/01 18:32:29 INFO ui.JettyUtils: Failed to create UI at port, 4040. Trying again.
14/06/01 18:32:29 INFO ui.JettyUtils: Error was: Failure(java.net.BindException: Address already in use)
14/06/01 18:32:29 INFO server.Server: jetty-7.6.8.v20121106
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/06/01 18:32:29 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/06/01 18:32:29 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4041
14/06/01 18:32:29 INFO ui.SparkUI: Started Spark Web UI at http://hivecluster2:4041
14/06/01 18:32:29 INFO spark.SparkContext: Added JAR avro-1.7.6.jar at http://10.10.30.211:54526/jars/avro-1.7.6.jar with timestamp 1401672749956
14/06/01 18:32:29 INFO spark.SparkContext: Added JAR avro-mapred-1.7.6.jar at http://10.10.30.211:54526/jars/avro-mapred-1.7.6.jar with timestamp 1401672749966
14/06/01 18:32:29 INFO client.AppClient$ClientActor: Connecting to master spark://hivecluster2:7077...
14/06/01 18:32:30 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140601183230-0031
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@57083eaa
scala>
scala>
scala> val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/*.avro"
input: String = hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/*.avro
scala> val jobConf= new JobConf(sc.hadoopConfiguration)
jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
scala> jobConf.setJobName("Test Scala Job")
scala> FileInputFormat.setInputPaths(jobConf, input)
scala> val rdd = sc.hadoopRDD(
| //confBroadcast.value.value,
| jobConf,
| classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
| classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
| classOf[org.apache.hadoop.io.NullWritable],
| 1)
14/06/01 18:33:19 INFO storage.MemoryStore: ensureFreeSpace(171879) called with curMem=0, maxMem=308713881
14/06/01 18:33:19 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 167.9 KB, free 294.2 MB)
rdd: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroWrapper[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] = HadoopRDD[0] at hadoopRDD at <console>:45
scala> rdd.first
14/06/01 18:33:23 INFO mapred.FileInputFormat: Total input paths to process : 17
14/06/01 18:33:23 INFO spark.SparkContext: Starting job: first at <console>:48
14/06/01 18:33:23 INFO scheduler.DAGScheduler: Got job 0 (first at <console>:48) with 1 output partitions (allowLocal=true)
14/06/01 18:33:23 INFO scheduler.DAGScheduler: Final stage: Stage 0 (first at <console>:48)
14/06/01 18:33:23 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/06/01 18:33:23 INFO scheduler.DAGScheduler: Missing parents: List()
14/06/01 18:33:23 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/06/01 18:33:23 INFO rdd.HadoopRDD: Input split: hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00000.avro:0+3864
14/06/01 18:33:24 INFO spark.SparkContext: Job finished: first at <console>:48, took 0.654505543 s
14/06/01 18:33:24 INFO spark.SparkContext: Starting job: first at <console>:48
14/06/01 18:33:24 INFO scheduler.DAGScheduler: Got job 1 (first at <console>:48) with 16 output partitions (allowLocal=true)
14/06/01 18:33:24 INFO scheduler.DAGScheduler: Final stage: Stage 1 (first at <console>:48)
14/06/01 18:33:24 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/06/01 18:33:24 INFO scheduler.DAGScheduler: Missing parents: List()
14/06/01 18:33:24 INFO scheduler.DAGScheduler: Submitting Stage 1 (HadoopRDD[0] at hadoopRDD at <console>:45), which has no missing parents
14/06/01 18:33:24 INFO scheduler.DAGScheduler: Submitting 16 missing tasks from Stage 1 (HadoopRDD[0] at hadoopRDD at <console>:45)
14/06/01 18:33:24 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 16 tasks
14/06/01 18:33:40 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:33:55 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:34:10 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:34:25 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:34:40 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:34:55 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:35:10 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:35:25 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/06/01 18:35:55 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.