Skip to content

Instantly share code, notes, and snippets.

@wezhang
wezhang / get.java
Created August 16, 2018 02:56
[Get java command line arguments]
// Reads the "sun.java.command" system property; per this gist's title it is used to recover the
// Java command line arguments.
// NOTE(review): "sun.*" properties look JVM-vendor specific, so the value may be null on other
// runtimes — confirm before relying on it.
System.getProperty("sun.java.command")
@wezhang
wezhang / request_retry.java
Created February 5, 2018 08:16
[Retry with observable]
// Polls the YARN UI page of the given app attempt until
// isSparkJobYarnAppAttemptNotJustLaunched accepts a loaded result, and emits only accepted results.
return Observable.just(appAttempt)
// Resolve the app-attempt page path, then the attempt id, against the connect URI.
.map(attempt -> getConnectUri()
.resolve("/yarnui/hn/cluster/appattempt/")
.resolve(attempt.getAppAttemptId())
.toString())
// Load the page for that URL.
.flatMap(this::loadPageByBrowserObservable)
// On error, resubscribe upstream at most getRetriesMax() times.
.retry(getRetriesMax())
// After each successful completion, poll again once getDelaySeconds() seconds have passed.
.repeatWhen(ob -> ob.delay(getDelaySeconds(), TimeUnit.SECONDS))
// Stop the polling loop as soon as a result satisfies the predicate...
.takeUntil(this::isSparkJobYarnAppAttemptNotJustLaunched)
// ...and drop the earlier results that did not satisfy it.
.filter(this::isSparkJobYarnAppAttemptNotJustLaunched)
@wezhang
wezhang / DAGScheduler_customized.scala
Created December 4, 2017 05:24
[Get Spark Job submitted JobConf by reflection]
/**
 * Logs the output directory of a submitted job when it can be discovered, then
 * delegates to the parent implementation.
 *
 * The output directory is found by reflection: the job closure `func` is expected
 * to carry a captured field named `writerContainer$1`, whose superclass declares a
 * `job` field holding a Hadoop `Job` whose configuration is a `JobConf`.
 *
 * This method runs for every submitted job, not only for jobs that write files, so
 * the reflective lookup is best-effort: if the closure does not have the expected
 * shape, or no output path is configured, the failure is logged and the job is
 * still submitted.
 *
 * All parameters are passed through unchanged to `super.handleJobSubmitted`.
 */
override private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties): Unit = {
  try {
    // Captured variable of the write closure; the name is compiler-generated.
    val containerField = func.getClass.getDeclaredField("writerContainer$1")
    containerField.setAccessible(true)
    val writerContainer = containerField.get(func)

    // The `job` field is looked up on the superclass of the container's runtime class.
    val jobField = writerContainer.getClass.getSuperclass.getDeclaredField("job")
    jobField.setAccessible(true)
    val jobConf = jobField.get(writerContainer).asInstanceOf[Job].getConfiguration.asInstanceOf[JobConf]

    // getOutputPath may yield null when no output directory is configured.
    Option(FileOutputFormat.getOutputPath(jobConf)).foreach { outputPath =>
      logInfo("Output dir: " + outputPath.toString)
    }
  } catch {
    // Reflection failures (missing field, unexpected type, null value) must not
    // prevent the job from being submitted.
    case scala.util.control.NonFatal(e) =>
      logInfo("Output dir could not be determined for job " + jobId + ": " + e)
  }

  super.handleJobSubmitted(jobId, finalRDD, func, partitions, callSite, listener, properties)
}
@wezhang
wezhang / fix_version_dep_build.gradle
Created November 21, 2017 03:47
[Force dependency version in gradle]
// Pin both Azure client libraries to 1.1.1. `force = true` asks Gradle to use exactly this
// version even when another dependency requests a different one.
compile ('com.microsoft.azure:azure-client-runtime:1.1.1') { force = true }
compile ('com.microsoft.azure:azure-client-authentication:1.1.1') { force = true }
@wezhang
wezhang / groovy.pom.xml
Created November 14, 2017 03:15
[Groovy maven plugin]
<plugin>
<groupId>org.codehaus.gmavenplus</groupId>
<artifactId>gmavenplus-plugin</artifactId>
<executions>
<execution>
<phase>generate-test-resources</phase>
<goals>
<goal>execute</goal>
</goals>
</execution>
@wezhang
wezhang / invokeMain.java
Created September 30, 2017 08:38
[invoke java app main by reflection]
// Load the job class by name and invoke its main(String[]) method through reflection.
final Class<?> jobClass = Class.forName(jobClassName);
System.out.println("Run Spark Job: " + jobClass.getName());
final Method jobMain = jobClass.getMethod("main", String[].class);
// Pass every element of args except the first on to the job. The String[] is wrapped in an
// Object[] so that invoke() receives it as main's single parameter rather than spreading its
// elements as separate arguments.
final Object[] jobArgs = new Object[]{ args.subList(1, args.size()).toArray(new String[0]) };
// A null receiver is used because main is expected to be static.
jobMain.invoke(null, jobArgs);
@wezhang
wezhang / xstream_broken_sample.java
Created September 29, 2017 10:18
[xstream and jdom usage]
//@XStreamAlias("sparkJobLocalRunConfiguration")
@Tag("spark-local-run-configurable-model")
public class SparkLocalRunConfigurableModel implements CommonJavaRunConfigurationParameters {
@XStreamAlias("isParallelExecution")
private boolean isParallelExecution;
@XStreamAlias("isPassParentEnvs")
private boolean isPassParentEnvs;
@XStreamOmitField
@NotNull
private Project project;
@wezhang
wezhang / get_classpath.scala
Last active September 19, 2017 03:01
[Get current classpath in runtime]
// Current runtime classpath as an Array[java.net.URL].
// When the system class loader is a URLClassLoader its URLs are returned directly. Otherwise
// (the system class loader is no longer a URLClassLoader on JDK 9+) the entries of the
// `java.class.path` system property are converted to URLs instead of failing with a
// ClassCastException.
(ClassLoader.getSystemClassLoader match {
  case urlLoader: java.net.URLClassLoader => urlLoader.getURLs
  case _ =>
    System.getProperty("java.class.path", "")
      .split(java.io.File.pathSeparator)
      .filter(_.nonEmpty)
      .map(entry => new java.io.File(entry).toURI.toURL)
})
@wezhang
wezhang / add_invocation.scala
Created September 15, 2017 03:35
[Add invocation in mocked method] #jmockit
@Mock
def globStatus(invocation: Invocation, pathPattern: Path, filter: PathFilter): Array[FileStatus] = {
val globberClassConstructor = Class.forName("org.apache.hadoop.fs.Globber").getDeclaredConstructor(classOf[FileSystem], classOf[Path], classOf[PathFilter])
globberClassConstructor.setAccessible(true)
val globber = globberClassConstructor.newInstance(invocation.getInvokedInstance, pathPattern, filter)
Deencapsulation.invoke(globber.asInstanceOf[Object], "glob")
val globberClassConstructor = Class.forName("org.apache.hadoop.fs.Globber").getDeclaredConstructor(classOf[FileSystem], classOf[Path], classOf[PathFilter])
globberClassConstructor.setAccessible(true)
val globber = globberClassConstructor.newInstance(this, pathPattern, filter)