@eliasah
Last active September 28, 2015 15:04
This file is the error output of the MySQLSparkES (MySQL to Spark to Elasticsearch) job.

Sample of the source CSV data (the last row is cut off by an unclosed quoted field):
id,title,description,merchantId,price,keywords,brandId,categoryId
41082391,Sagem PFA721 PFA721 TONERKIT,,2045,0.000,,55480,251007
37644208,Logitech 980-000431 Z506,5.1 - Puissance évaluée de RMS : 75 W - PC,2045,0.000,,44837,574228
37644207,Philips SPN5087B/19 SPN5087B 8 prises pour Home Cinéma Protection contre les surtensions,"Réduisez votre consommation d'énergie.
Ce parasurtenseur protège vos équipements Home Cinéma. Sa fonction économie d'énergie est simple
d'utilisation. La télécommande fournie permet d'éteindre et d'allumer les 2 zones de prises (1 et 2) de
vos appareils regroupés par usage. Ainsi la consommation d'énergie des appareils en veille est éliminée.
Protection absolue
- 3 200 joules
- Protection des connexions téléphoniques/réseau/DSL
- Protection des connexions coaxiales
- Garantie de 300 000 € sur l'équipement
- Coupure de l'alimentation
Confort intégré
- Bloc 8 prises
- Cordon d'alimentation 3 m
- Dispositif de rangement du cordon intégré
Simplicité
- Prises de sécurité pour enfant automatiques et intégrées
- Voyants de protection intégrés, tranquillité assurée",2045,0.000,,22326,251099
37644206,Logitech 910-001837 M125,USB - Quantité de boutons : 3 - Optique,2045,0.000,,44837,250736
37644196,Cygnett Glam book-style case f/ iPad,,2045,0.000,,70885,251034
37644197,JVC HAFX35SE HA-FX35-SE,intraaural - Avec fil,2045,0.000,,139,251052
37644198,Magimix 11 504 Le Toaster 2,,2045,0.000,,70163,251187
37644199,One For All URC 7557 Digital 5,,2045,0.000,,800,251028
37644200,GEAR4 PG519 PG519,,2045,0.000,,1808,251034
37644201,Terratec 10678 Cinergy T-Stick Dual RC,DVB-T - Contrôle à distance du portable : - USB,2045,0.000,,10298,250737
37644202,Olympus N2287321 WS-650S,,2045,0.000,,20366,251032
37644203,Just-Racks JRC1201 JRC1201,,2045,0.000,,18136,251158
37644204,Samsung SE-S084D/TSBS SE-S084D,Vitesse d'écriture CD : 24 x - Vitesse de réécriture CD : 24 x - Vitesse de lecture CD : 24 x,2045,0.000,,68367,250743
37644205,Verbatim 1TB Store 'n' Go,"Le disque dur portable Verbatim Store n Go est une solution de stockage hautes performances qui utilise une interface USB 3.0 haute vitesse. La norme USB 3.0 permet d'atteindre des débits multipliés par 10 par rapport à l'USB 2.0 (sur la base de mesure de la vitesse de bus USB), vous offrant un taux de transfert de données ultra-rapide.

Job log output:

15/09/28 09:47:06 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/09/28 09:47:06 INFO MySQLtoEs: Loading DataFrame
+--------+--------------------+--------------------+----------+-----+--------+-------+----------+
| id| title| description|merchantId|price|keywords|brandId|categoryId|
+--------+--------------------+--------------------+----------+-----+--------+-------+----------+
|41082391|Sagem PFA721 PFA7...| | 2045|0.000| | 55480| 251007|
|37644208|Logitech 980-0004...|5.1 - Puissance é...| 2045|0.000| | 44837| 574228|
|37644207|Philips SPN5087B/...|Réduisez votre co...| 2045|0.000| | 22326| 251099|
|37644206|Logitech 910-0018...|USB - Quantité de...| 2045|0.000| | 44837| 250736|
|37644196|Cygnett Glam book...| | 2045|0.000| | 70885| 251034|
|37644197|JVC HAFX35SE HA-F...|intraaural - Avec...| 2045|0.000| | 139| 251052|
|37644198|Magimix 11 504 Le...| | 2045|0.000| | 70163| 251187|
|37644199|One For All URC 7...| | 2045|0.000| | 800| 251028|
|37644200| GEAR4 PG519 PG519| | 2045|0.000| | 1808| 251034|
|37644201|Terratec 10678 Ci...|DVB-T - Contrôle ...| 2045|0.000| | 10298| 250737|
|37644202|Olympus N2287321 ...| | 2045|0.000| | 20366| 251032|
|37644203|Just-Racks JRC120...| | 2045|0.000| | 18136| 251158|
|37644204|Samsung SE-S084D/...|Vitesse d'écritur...| 2045|0.000| | 68367| 250743|
|37644205|Verbatim 1TB Stor...|Le disque dur por...| 2045|0.000| | 727| 251045|
|37644195|Terratec 10544 Ci...|DVB-S2 - Interne ...| 2045|0.000| | 10298| 250737|
|37644194|Kyocera Toner-Kit...| | 2045|0.000| | 655| 251007|
|37644193|Samsung EA-CBHD10...| CBHD10D| 2045|0.000| | 68367| 251177|
|37644192|Smeg SCP108-8 SCP...|Total oven(s) int...| 2045|0.000| | 2326| 251222|
|37644191|Apple iPod shuffl...|Un lien immédiat....| 2045|0.000| | 42228| 251022|
|37644190|Asus 90-C1CPK5-L0...|2560 x 1600 pixel...| 2045|0.000| | 53864| 251041|
+--------+--------------------+--------------------+----------+-----+--------+-------+----------+
only showing top 20 rows
15/09/28 09:47:25 INFO MySQLtoEs: df.count : 6834294
[Stage 6:====================================================> (9 + 1) / 10]
can't handle type 0.000
can't handle type [41082391,Sagem PFA721 PFA721 TONERKIT,,2045,0.000,,55480,251007]
15/09/28 09:47:48 ERROR Executor: Exception in task 9.0 in stage 6.0 (TID 40)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: Cannot handle type [class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] within type [class scala.Tuple2], instance [[41082391,Sagem PFA721 PFA721 TONERKIT,,2045,0.000,,55480,251007]] within instance [([41082391,Sagem PFA721 PFA721 TONERKIT,,2045,0.000,,55480,251007],StructType(StructField(id,LongType,false), StructField(title,StringType,false), StructField(description,StringType,false), StructField(merchantId,IntegerType,false), StructField(price,DecimalType(15,3),false), StructField(keywords,StringType,false), StructField(brandId,LongType,false), StructField(categoryId,IntegerType,false)))] using writer [org.elasticsearch.spark.sql.DataFrameValueWriter@5ae0bcde]
at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:63)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:148)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:49)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:57)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:57)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/09/28 09:47:48 WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/09/28 09:47:48 ERROR TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@59e5ddf
15/09/28 09:47:48 WARN TaskSetManager: Lost task 9.0 in stage 6.0 (TID 40, localhost): UnknownReason
15/09/28 09:47:48 ERROR TaskSetManager: Task 9 in stage 6.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 6.0 failed 1 times, most recent failure: Lost task 9.0 in stage 6.0 (TID 40, localhost): UnknownReason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:57)
at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:42)
at org.elasticsearch.spark.sql.EsSparkSQL.saveToEs(EsSparkSQL.scala)
at fr.igraal.writer.MySQLtoEs.main(MySQLtoEs.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/09/28 09:47:48 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/09/28 09:47:48 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

Job source (MySQLtoEs.java):

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.EsSparkSQL;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class MySQLtoEs implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.WARN);
        Logger LOGGER = org.apache.log4j.Logger.getLogger(MySQLtoEs.class);

        String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
        String MYSQL_USERNAME = "root";
        String MYSQL_PWD = "root";
        String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/products_fr?user="
                + MYSQL_USERNAME + "&password=" + MYSQL_PWD;

        // Spark context configured for Elasticsearch output
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("MySQLtoEs")
                        .set("es.index.auto.create", "true")
                        .set("es.nodes", "127.0.0.1:9200")
                        .set("es.mapping.id", "id")
                        .set("spark.serializer", KryoSerializer.class.getName()));
        SQLContext sqlContext = new SQLContext(sc);

        // JDBC data source options (partitioned read on the id column)
        Map<String, String> options = new HashMap<>();
        options.put("driver", MYSQL_DRIVER);
        options.put("url", MYSQL_CONNECTION_URL);
        options.put("dbtable", "OFFERS");
        options.put("partitionColumn", "id");
        options.put("lowerBound", "10001");
        options.put("upperBound", "499999");
        options.put("numPartitions", "10");

        // Load MySQL query result as DataFrame
        LOGGER.info("Loading DataFrame");
        DataFrame jdbcDF = sqlContext.load("jdbc", options);
        DataFrame df = jdbcDF.select("id", "title", "description",
                "merchantId", "price", "keywords", "brandId", "categoryId");
        df.show();
        LOGGER.info("df.count : " + df.count());

        // Index the DataFrame into Elasticsearch (index/type: mysql/offers)
        EsSparkSQL.saveToEs(df, "mysql/offers");
    }
}
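
The schema dumped in the exception above lists price as DecimalType(15,3), and the "can't handle type 0.000" messages point at exactly that value, which suggests the elasticsearch-hadoop version in use cannot serialize DecimalType columns. The lines below are a minimal workaround sketch, not a confirmed fix: they cast price to double before indexing, and dfForEs is a hypothetical name introduced here. They would replace the final saveToEs call in main above.

        // Possible workaround sketch (assumption: DecimalType is the type the
        // writer rejects): cast the DecimalType "price" column to double so
        // DataFrameValueWriter only sees types it supports, then index as before.
        DataFrame dfForEs = df.withColumn("price", df.col("price").cast("double"));
        EsSparkSQL.saveToEs(dfForEs, "mysql/offers");

Casting to a string with cast("string") would also get past the writer, at the cost of losing the numeric mapping in Elasticsearch; in either case only the DataFrame handed to saveToEs changes.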