@simonlei
Created February 28, 2015 05:30
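
A spark-shell session reproducing an elasticsearch-hadoop problem: saving an RDD of case classes with EsSpark.saveToEs and es.mapping.id set aborts the job, because the connector's field extractor cannot pull the id field out of a case class instance.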
scala> import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.EsSpark
scala> case class Trip(id:Integer, departure: String, arrival: String)
defined class Trip
scala>
scala> val upcomingTrip = Trip( 1, "OTP", "SFO")
upcomingTrip: Trip = Trip(1,OTP,SFO)
scala> val lastWeekTrip = Trip( 2, "MUC", "OTP")
lastWeekTrip: Trip = Trip(2,MUC,OTP)
scala>
scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[1] at makeRDD at <console>:21
scala> EsSpark.saveToEs(rdd, "spark/docs2", Map( "es.mapping.id"->"id"))
15/02/28 13:29:40 INFO SparkContext: Starting job: runJob at EsSpark.scala:51
15/02/28 13:29:40 INFO DAGScheduler: Got job 1 (runJob at EsSpark.scala:51) with 4 output partitions (allowLocal=false)
15/02/28 13:29:40 INFO DAGScheduler: Final stage: Stage 1(runJob at EsSpark.scala:51)
15/02/28 13:29:40 INFO DAGScheduler: Parents of final stage: List()
15/02/28 13:29:40 INFO DAGScheduler: Missing parents: List()
15/02/28 13:29:40 INFO DAGScheduler: Submitting Stage 1 (ParallelCollectionRDD[1] at makeRDD at <console>:21), which has no missing parents
15/02/28 13:29:40 INFO MemoryStore: ensureFreeSpace(2104) called with curMem=3743, maxMem=278302556
15/02/28 13:29:40 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.1 KB, free 265.4 MB)
15/02/28 13:29:40 INFO MemoryStore: ensureFreeSpace(1655) called with curMem=5847, maxMem=278302556
15/02/28 13:29:40 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1655.0 B, free 265.4 MB)
15/02/28 13:29:40 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54038 (size: 1655.0 B, free: 265.4 MB)
15/02/28 13:29:40 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/02/28 13:29:40 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/02/28 13:29:40 INFO DAGScheduler: Submitting 4 missing tasks from Stage 1 (ParallelCollectionRDD[1] at makeRDD at <console>:21)
15/02/28 13:29:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
15/02/28 13:29:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4, localhost, PROCESS_LOCAL, 1360 bytes)
15/02/28 13:29:40 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5, localhost, PROCESS_LOCAL, 1835 bytes)
15/02/28 13:29:40 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6, localhost, PROCESS_LOCAL, 1360 bytes)
15/02/28 13:29:40 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7, localhost, PROCESS_LOCAL, 1835 bytes)
15/02/28 13:29:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 4)
15/02/28 13:29:40 INFO Executor: Running task 1.0 in stage 1.0 (TID 5)
15/02/28 13:29:40 INFO Executor: Running task 2.0 in stage 1.0 (TID 6)
15/02/28 13:29:40 INFO Executor: Running task 3.0 in stage 1.0 (TID 7)
15/02/28 13:29:42 INFO Executor: Finished task 2.0 in stage 1.0 (TID 6). 611 bytes result sent to driver
15/02/28 13:29:42 INFO Executor: Finished task 0.0 in stage 1.0 (TID 4). 611 bytes result sent to driver
15/02/28 13:29:42 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 6) in 2678 ms on localhost (1/4)
15/02/28 13:29:42 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 4) in 2683 ms on localhost (2/4)
15/02/28 13:29:43 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 7)
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [ScalaMapFieldExtractor for field [id]] cannot extract value from entity [class java.lang.String] | instance [Trip(2,MUC,OTP)]
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:97)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:148)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:47)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:51)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:51)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/28 13:29:43 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 5)
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [ScalaMapFieldExtractor for field [id]] cannot extract value from entity [class java.lang.String] | instance [Trip(1,OTP,SFO)]
	... (same 13-frame stack trace as the first failure above)
15/02/28 13:29:43 WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 7, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [ScalaMapFieldExtractor for field [id]] cannot extract value from entity [class java.lang.String] | instance [Trip(2,MUC,OTP)]
	... (same 13-frame stack trace as the first failure above)
15/02/28 13:29:43 ERROR TaskSetManager: Task 3 in stage 1.0 failed 1 times; aborting job
15/02/28 13:29:43 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/02/28 13:29:43 INFO TaskSchedulerImpl: Cancelling stage 1
15/02/28 13:29:43 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 5, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [ScalaMapFieldExtractor for field [id]] cannot extract value from entity [class java.lang.String] | instance [Trip(1,OTP,SFO)]
	... (same 13-frame stack trace as the first failure above)
15/02/28 13:29:43 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/02/28 13:29:43 INFO DAGScheduler: Job 1 failed: runJob at EsSpark.scala:51, took 2.700610 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 7, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [ScalaMapFieldExtractor for field [id]] cannot extract value from entity [class java.lang.String] | instance [Trip(2,MUC,OTP)]
	... (same 13-frame stack trace as the first failure above)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
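
For anyone hitting the same failure: the key line is the exception message, which says that ScalaMapFieldExtractor (the component that resolves the field named by es.mapping.id) cannot extract a value from the entity and falls back to treating it as a java.lang.String; the instance is only echoed via toString (Trip(2,MUC,OTP)). In this 2.1.x-era connector the extractor apparently understands Maps but not case classes, so Trip.id is never found. Two workarounds, sketched below against the same session (rdd is the RDD[Trip] defined above) and hedged on the connector version, are to save the documents as Maps, or to pass the id explicitly through a pair RDD:

import org.elasticsearch.spark.rdd.EsSpark

// Workaround 1: convert each case class to a Map, which
// ScalaMapFieldExtractor does know how to read, so "id" resolves.
val tripMaps = rdd.map(t =>
  Map("id" -> t.id, "departure" -> t.departure, "arrival" -> t.arrival))
EsSpark.saveToEs(tripMaps, "spark/docs2", Map("es.mapping.id" -> "id"))

// Workaround 2: supply the document id as the key of a pair RDD via
// saveToEsWithMeta; no es.mapping.id is needed. Assumes a connector
// build that ships EsSpark.saveToEsWithMeta (the 2.1.x line does).
val tripsWithId = rdd.map(t => (t.id, t))
EsSpark.saveToEsWithMeta(tripsWithId, "spark/docs2")

Both sketches only change how the id reaches the connector; the documents indexed into spark/docs2 are the same. Later elasticsearch-hadoop releases reportedly extract es.mapping.id from case classes directly, so upgrading the connector may also resolve this.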