This gist contains the error output of the MySQLSparkES job, together with a sample of the source CSV data and the MySQLtoEs Java driver.
id,title,description,merchantId,price,keywords,brandId,categoryId
41082391,Sagem PFA721 PFA721 TONERKIT,,2045,0.000,,55480,251007
37644208,Logitech 980-000431 Z506,5.1 - Puissance évaluée de RMS : 75 W - PC,2045,0.000,,44837,574228
37644207,Philips SPN5087B/19 SPN5087B 8 prises pour Home Cinéma Protection contre les surtensions,"Réduisez votre consommation d'énergie.
Ce parasurtenseur protège vos équipements Home Cinéma. Sa fonction économie d'énergie est simple
d'utilisation. La télécommande fournie permet d'éteindre et d'allumer les 2 zones de prises (1 et 2) de
vos appareils regroupés par usage. Ainsi la consommation d'énergie des appareils en veille est éliminée.
Protection absolue
- 3 200 joules
- Protection des connexions téléphoniques/réseau/DSL
- Protection des connexions coaxiales
- Garantie de 300 000 € sur l'équipement
- Coupure de l'alimentation
Confort intégré
- Bloc 8 prises
- Cordon d'alimentation 3 m
- Dispositif de rangement du cordon intégré
Simplicité
- Prises de sécurité pour enfant automatiques et intégrées
- Voyants de protection intégrés, tranquillité assurée",2045,0.000,,22326,251099
37644206,Logitech 910-001837 M125,USB - Quantité de boutons : 3 - Optique,2045,0.000,,44837,250736
37644196,Cygnett Glam book-style case f/ iPad,,2045,0.000,,70885,251034
37644197,JVC HAFX35SE HA-FX35-SE,intraaural - Avec fil,2045,0.000,,139,251052
37644198,Magimix 11 504 Le Toaster 2,,2045,0.000,,70163,251187
37644199,One For All URC 7557 Digital 5,,2045,0.000,,800,251028
37644200,GEAR4 PG519 PG519,,2045,0.000,,1808,251034
37644201,Terratec 10678 Cinergy T-Stick Dual RC,DVB-T - Contrôle à distance du portable : - USB,2045,0.000,,10298,250737
37644202,Olympus N2287321 WS-650S,,2045,0.000,,20366,251032
37644203,Just-Racks JRC1201 JRC1201,,2045,0.000,,18136,251158
37644204,Samsung SE-S084D/TSBS SE-S084D,Vitesse d'écriture CD : 24 x - Vitesse de réécriture CD : 24 x - Vitesse de lecture CD : 24 x,2045,0.000,,68367,250743
37644205,Verbatim 1TB Store 'n' Go,"Le disque dur portable Verbatim Store n Go est une solution de stockage hautes performances qui utilise une interface USB 3.0 haute vitesse. La norme USB 3.0 permet d'atteindre des débits multipliés par 10 par rapport à l'USB 2.0 (sur la base de mesure de la vitesse de bus USB), vous offrant un taux de transfert de données ultra-rapide.
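Note that the `description` field can span several physical lines inside double quotes (see the Philips record above), so a naive line-by-line reader will misparse this file. A minimal sketch of a quote-aware record joiner, assuming plain RFC 4180-style quoting (the class name and helper are illustrative, not from the original job):

```java
import java.util.ArrayList;
import java.util.List;

// Joins physical lines into logical CSV records by tracking whether the
// cursor is inside a quoted field. Sketch only: assumes double-quote
// quoting where an embedded quote is written as "" (which toggles the
// in-quotes flag twice, a net no-op).
public class CsvRecordJoiner {
    public static List<String> joinRecords(List<String> physicalLines) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (String line : physicalLines) {
            if (current.length() > 0) {
                current.append('\n'); // newline embedded in a quoted field
            }
            current.append(line);
            for (int i = 0; i < line.length(); i++) {
                if (line.charAt(i) == '"') {
                    inQuotes = !inQuotes;
                }
            }
            if (!inQuotes) { // record is complete only outside quotes
                records.add(current.toString());
                current.setLength(0);
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("id,title");
        lines.add("1,\"multi");
        lines.add("line title\"");
        lines.add("2,plain");
        System.out.println(joinRecords(lines).size()); // 3 logical records
    }
}
```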
15/09/28 09:47:06 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/09/28 09:47:06 INFO MySQLtoEs: Loading DataFrame
+--------+--------------------+--------------------+----------+-----+--------+-------+----------+
|      id|               title|         description|merchantId|price|keywords|brandId|categoryId|
+--------+--------------------+--------------------+----------+-----+--------+-------+----------+
|41082391|Sagem PFA721 PFA7...|                    |      2045|0.000|        |  55480|    251007|
|37644208|Logitech 980-0004...|5.1 - Puissance é...|      2045|0.000|        |  44837|    574228|
|37644207|Philips SPN5087B/...|Réduisez votre co...|      2045|0.000|        |  22326|    251099|
|37644206|Logitech 910-0018...|USB - Quantité de...|      2045|0.000|        |  44837|    250736|
|37644196|Cygnett Glam book...|                    |      2045|0.000|        |  70885|    251034|
|37644197|JVC HAFX35SE HA-F...|intraaural - Avec...|      2045|0.000|        |    139|    251052|
|37644198|Magimix 11 504 Le...|                    |      2045|0.000|        |  70163|    251187|
|37644199|One For All URC 7...|                    |      2045|0.000|        |    800|    251028|
|37644200|   GEAR4 PG519 PG519|                    |      2045|0.000|        |   1808|    251034|
|37644201|Terratec 10678 Ci...|DVB-T - Contrôle ...|      2045|0.000|        |  10298|    250737|
|37644202|Olympus N2287321 ...|                    |      2045|0.000|        |  20366|    251032|
|37644203|Just-Racks JRC120...|                    |      2045|0.000|        |  18136|    251158|
|37644204|Samsung SE-S084D/...|Vitesse d'écritur...|      2045|0.000|        |  68367|    250743|
|37644205|Verbatim 1TB Stor...|Le disque dur por...|      2045|0.000|        |    727|    251045|
|37644195|Terratec 10544 Ci...|DVB-S2 - Interne ...|      2045|0.000|        |  10298|    250737|
|37644194|Kyocera Toner-Kit...|                    |      2045|0.000|        |    655|    251007|
|37644193|Samsung EA-CBHD10...|             CBHD10D|      2045|0.000|        |  68367|    251177|
|37644192|Smeg SCP108-8 SCP...|Total oven(s) int...|      2045|0.000|        |   2326|    251222|
|37644191|Apple iPod shuffl...|Un lien immédiat....|      2045|0.000|        |  42228|    251022|
|37644190|Asus 90-C1CPK5-L0...|2560 x 1600 pixel...|      2045|0.000|        |  53864|    251041|
+--------+--------------------+--------------------+----------+-----+--------+-------+----------+
only showing top 20 rows
15/09/28 09:47:25 INFO MySQLtoEs: df.count : 6834294
[Stage 6:====================================================> (9 + 1) / 10]can't handle type 0.000
can't handle type [41082391,Sagem PFA721 PFA721 TONERKIT,,2045,0.000,,55480,251007]
15/09/28 09:47:48 ERROR Executor: Exception in task 9.0 in stage 6.0 (TID 40)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: Cannot handle type [class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] within type [class scala.Tuple2], instance [[41082391,Sagem PFA721 PFA721 TONERKIT,,2045,0.000,,55480,251007]] within instance [([41082391,Sagem PFA721 PFA721 TONERKIT,,2045,0.000,,55480,251007],StructType(StructField(id,LongType,false), StructField(title,StringType,false), StructField(description,StringType,false), StructField(merchantId,IntegerType,false), StructField(price,DecimalType(15,3),false), StructField(keywords,StringType,false), StructField(brandId,LongType,false), StructField(categoryId,IntegerType,false)))] using writer [org.elasticsearch.spark.sql.DataFrameValueWriter@5ae0bcde]
	at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:63)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:148)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:49)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:57)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:57)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
15/09/28 09:47:48 WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
15/09/28 09:47:48 ERROR TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@59e5ddf
15/09/28 09:47:48 WARN TaskSetManager: Lost task 9.0 in stage 6.0 (TID 40, localhost): UnknownReason
15/09/28 09:47:48 ERROR TaskSetManager: Task 9 in stage 6.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 6.0 failed 1 times, most recent failure: Lost task 9.0 in stage 6.0 (TID 40, localhost): UnknownReason
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
	at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:57)
	at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:42)
	at org.elasticsearch.spark.sql.EsSparkSQL.saveToEs(EsSparkSQL.scala)
	at fr.igraal.writer.MySQLtoEs.main(MySQLtoEs.java:57)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/09/28 09:47:48 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/09/28 09:47:48 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.EsSparkSQL;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class MySQLtoEs implements Serializable {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.WARN);
        Logger LOGGER = Logger.getLogger(MySQLtoEs.class);

        // MySQL connection settings
        String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
        String MYSQL_USERNAME = "root";
        String MYSQL_PWD = "root";
        String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/products_fr?user="
                + MYSQL_USERNAME + "&password=" + MYSQL_PWD;

        // Spark context configured for the Elasticsearch connector
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("MySQLtoEs")
                        .set("es.index.auto.create", "true")
                        .set("es.nodes", "127.0.0.1:9200")
                        .set("es.mapping.id", "id")
                        .set("spark.serializer", KryoSerializer.class.getName()));
        SQLContext sqlContext = new SQLContext(sc);

        // JDBC data source options
        Map<String, String> options = new HashMap<>();
        options.put("driver", MYSQL_DRIVER);
        options.put("url", MYSQL_CONNECTION_URL);
        options.put("dbtable", "OFFERS");
        options.put("partitionColumn", "id");
        options.put("lowerBound", "10001");
        options.put("upperBound", "499999");
        options.put("numPartitions", "10");

        // Load MySQL query result as DataFrame
        LOGGER.info("Loading DataFrame");
        DataFrame jdbcDF = sqlContext.load("jdbc", options);
        DataFrame df = jdbcDF.select("id", "title", "description",
                "merchantId", "price", "keywords", "brandId", "categoryId");
        df.show();
        LOGGER.info("df.count : " + df.count());
        EsSparkSQL.saveToEs(df, "mysql/offers");
    }
}
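The "can't handle type 0.000" message in the log points at the `price` column: the schema in the exception shows it as `DecimalType(15,3)`, which this version of the connector's `DataFrameValueWriter` evidently fails to serialize. One workaround to try (a hedged sketch, not verified here) is to cast the decimal column to a plainer type before handing the DataFrame to `saveToEs`:

```java
// Hypothetical workaround: cast the DecimalType price column to DOUBLE
// (or STRING) so the Elasticsearch writer sees a type it handles.
DataFrame fixed = jdbcDF.selectExpr("id", "title", "description",
        "merchantId", "CAST(price AS DOUBLE) AS price",
        "keywords", "brandId", "categoryId");
EsSparkSQL.saveToEs(fixed, "mysql/offers");
```

Upgrading elasticsearch-hadoop may also help, since later releases extended Spark SQL type support; treat both options as suggestions to check against your exact Spark and connector versions.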