@bvaradar
Last active April 29, 2019 22:42
Spark Stage Retry Reproducing Patch
diff --git a/docker/compose/docker-compose_hadoop284_hive233_spark231.yml b/docker/compose/docker-compose_hadoop284_hive233_spark231.yml
index bbb9f10e..015c9e2b 100644
--- a/docker/compose/docker-compose_hadoop284_hive233_spark231.yml
+++ b/docker/compose/docker-compose_hadoop284_hive233_spark231.yml
@@ -145,6 +145,45 @@ services:
- "8081:8081"
environment:
- "SPARK_MASTER=spark://sparkmaster:7077"
+ - "SPARK_WORKER_WEBUI_PORT=8081"
+ links:
+ - "hivemetastore"
+ - "hiveserver"
+ - "hive-metastore-postgresql"
+ - "namenode"
+
+ spark-worker-2:
+ image: apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkworker_2.3.1:latest
+ hostname: spark-worker-2
+ container_name: spark-worker-2
+ env_file:
+ - ./hadoop.env
+ depends_on:
+ - sparkmaster
+ ports:
+ - "8082:8082"
+ environment:
+ - "SPARK_MASTER=spark://sparkmaster:7077"
+ - "SPARK_WORKER_WEBUI_PORT=8082"
+ links:
+ - "hivemetastore"
+ - "hiveserver"
+ - "hive-metastore-postgresql"
+ - "namenode"
+
+ spark-worker-3:
+ image: apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkworker_2.3.1:latest
+ hostname: spark-worker-3
+ container_name: spark-worker-3
+ env_file:
+ - ./hadoop.env
+ depends_on:
+ - sparkmaster
+ ports:
+ - "8083:8083"
+ environment:
+ - "SPARK_MASTER=spark://sparkmaster:7077"
+ - "SPARK_WORKER_WEBUI_PORT=8083"
links:
- "hivemetastore"
- "hiveserver"
diff --git a/docker/demo/config/spark-defaults.conf b/docker/demo/config/spark-defaults.conf
index e496b46c..f348ed67 100644
--- a/docker/demo/config/spark-defaults.conf
+++ b/docker/demo/config/spark-defaults.conf
@@ -19,7 +19,7 @@
# This is useful for setting default environmental settings.
# Example:
-spark.master local[3]
+spark.master spark://sparkmaster:7077
spark.eventLog.dir hdfs://namenode:8020/tmp/spark-events
spark.serializer org.apache.spark.serializer.KryoSerializer
#spark.executor.memory 4g
diff --git a/docker/demo/data/batch_1.json b/docker/demo/data/batch_1.json
index 3e7b149a..f9c895b3 100644
--- a/docker/demo/data/batch_1.json
+++ b/docker/demo/data/batch_1.json
@@ -3480,3 +3480,5 @@
{"volume": 10294, "symbol": "CRM", "ts": "2018-08-31 10:27:00", "month": "08", "high": 153.14, "low": 153.0607, "key": "CRM_2018-08-31 10", "year": 2018, "date": "2018/08/31", "close": 153.0607, "open": 153.14, "day": "31"}
{"volume": 6031, "symbol": "CRM", "ts": "2018-08-31 10:28:00", "month": "08", "high": 153.0, "low": 152.88, "key": "CRM_2018-08-31 10", "year": 2018, "date": "2018/08/31", "close": 152.88, "open": 153.0, "day": "31"}
{"volume": 6057, "symbol": "CRM", "ts": "2018-08-31 10:29:00", "month": "08", "high": 152.88, "low": 152.87, "key": "CRM_2018-08-31 10", "year": 2018, "date": "2018/08/31", "close": 152.87, "open": 152.88, "day": "31"}
+{"volume": 483951, "symbol": "MSFT", "ts": "2018-07-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55, "key": "MSFT_2018-07-31 09", "year": 2018, "date": "2018/07/31", "close": 111.72, "open": 111.55, "day": "31"}
+{"volume": 1533226, "symbol": "AAPL", "ts": "2018-07-31 09:30:00", "month": "08", "high": 227.3101, "low": 226.23, "key": "AAPL_2018-07-31 09", "year": 2018, "date": "2018/07/31", "close": 227.3101, "open": 226.53, "day": "31"}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 40572f2c..c8f5b7d2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -416,6 +416,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords,
String commitTime, HoodieTable<T> hoodieTable, final boolean isUpsert) {
+ logger.info("VB - Local code running !!");
// Cache the tagged records, so we don't end up computing both
// TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index 49129eaf..6b99b36f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -25,6 +25,7 @@ import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieInsertException;
import com.uber.hoodie.io.storage.HoodieStorageWriter;
import com.uber.hoodie.io.storage.HoodieStorageWriterFactory;
@@ -50,15 +51,16 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
private long recordsDeleted = 0;
private Iterator<HoodieRecord<T>> recordIterator;
private boolean useWriterSchema = false;
+ private final String partitionPath;
+ private boolean createdRetryFile = false;
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId) {
super(config, commitTime, fileId, hoodieTable);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
-
this.path = makeNewPath(partitionPath);
-
+ this.partitionPath = partitionPath;
try {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
@@ -88,6 +90,38 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
return storageWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath());
}
+ public void reproduceRetryFailure(String partitionPath) throws Exception {
+ Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
+ Path path = new Path(FSUtils.getPartitionPath(markerRootPath, partitionPath), ".retry");
+ if (partitionPath.equals("2017/08/31")) {
+ if (!fs.exists(path)) {
+ createdRetryFile = true;
+ fs.create(path, false);
+ logger.warn("Sleeping for 1 min. Will write after that");
+ Thread.sleep(60 * 1000);
+ } else if (createdRetryFile) {
+ // Same task which created retry file. Let it succeed
+ logger.warn("Attempt#" + TaskContext.get().attemptNumber() + " Allowing through");
+ } else {
+ // Duplicate run
+ logger.warn("Sleeping for 1.5 mins. Will error out after that");
+ Thread.sleep(90 * 1000);
+ throw new IOException("Dummy Error");
+ }
+ } else {
+ if (!fs.exists(path)) {
+ createdRetryFile = true;
+ fs.create(path, false);
+ logger.warn("Sleeping for 10 secs. Will exit after that to force stage retry");
+ Thread.sleep(10 * 1000);
+ logger.warn("Exiting !!");
+ System.exit(-1);
+ } else {
+ logger.warn("Attempt#" + TaskContext.get().attemptNumber() + " Passing through");
+ }
+ }
+ }
+
/**
* Perform the actual writing of the given record into the backing file.
*/
@@ -116,6 +150,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
writeStatus.markFailure(record, t, recordMetadata);
logger.error("Error writing record " + record, t);
}
+ try {
+ reproduceRetryFailure(partitionPath);
+ } catch (Exception e) {
+ throw new HoodieException(e.getMessage(), e);
+ }
}
/**
bvaradar commented Apr 29, 2019

Using the above patch, I was able to reproduce Spark stage retries in which duplicate copies of tasks are re-executed.

Steps:

  1. Apply the above patch
  2. mvn clean package -DskipTests
  3. Run the docker demo (Steps 1-2) from http://hudi.incubator.apache.org/docker_demo.html
  4. While running Step 2, monitor the marker directory paths, e.g. http://localhost:50070/explorer.html#/user/hive/warehouse/stock_ticks_mor/.hoodie/.temp//2018/0[7,8]/31/ (see the command sketch after this list)
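
For convenience, here is a rough command sketch of steps 2-4. The container name (adhoc-1), the setup script path, and the HDFS layout are assumed from the docker demo page linked above, so treat this as a sketch rather than the exact demo commands; the commit-time component of the marker path is left out, just as in the URL above.

# Build Hudi with the patch applied (step 2)
mvn clean package -DskipTests

# Bring up the docker demo cluster, then run demo Steps 1-2 as described on
# the docker demo page (publish batch_1.json to Kafka, run the deltastreamer)
cd docker && ./setup_demo.sh

# While demo Step 2 is running, watch the marker directories; the duplicate
# task leaves a second marker file in each partition
docker exec -it adhoc-1 hadoop fs -ls -R /user/hive/warehouse/stock_ticks_mor/.hoodie/.temp/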

You will notice 2 marker files per partition; one of them comes from the duplicate task. After deltastreamer completes, you should see that the marker folder has been deleted and only one parquet file exists per partition: the duplicate data file is cleaned up during finalize.
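
A quick way to confirm this final state (again assuming the demo's stock_ticks_mor layout and the adhoc-1 container):

# The marker folder should be gone once the commit is finalized
docker exec -it adhoc-1 hadoop fs -ls /user/hive/warehouse/stock_ticks_mor/.hoodie/.temp/

# Each partition should now contain exactly one parquet data file; the duplicate
# written by the retried task was removed during finalize
docker exec -it adhoc-1 hadoop fs -ls /user/hive/warehouse/stock_ticks_mor/2018/07/31 /user/hive/warehouse/stock_ticks_mor/2018/08/31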

In the logs, you will also notice:

2019-04-29 22:07:37 WARN TaskSetManager:66 - Lost task 1.2 in stage 21.0 (TID 28, 172.18.0.11, executor 0): FetchFailed(null, shuffleId=6, mapId=-1, reduceId=1, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)
......
2019-04-29 22:07:37 INFO TaskSetManager:54 - Task 1.2 in stage 21.0 (TID 28) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
