Skip to content

Instantly share code, notes, and snippets.

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.Timeout;
public class FailsafeDemo {
public static void main(String[] args) throws Exception {
Supplier<CompletableFuture<Long>> taskSupplier = () -> {
-- Presto/Trino: per-pop element-wise metric aggregation.
-- NOTE(review): the scraped snippet had the outer SELECT pasted before the WITH
-- clause and the CTE's closing paren missing; reassembled here into valid order.
-- Inner query: explode each metric array with its 1-based position (idx), then
-- sum the values per (pop, idx) so arrays from the same pop are added element-wise.
-- Outer query: min(m, 30) is the Presto/Trino n-smallest aggregate — it returns
-- up to the 30 smallest summed metric values per pop, as an array.
WITH aggregated_metrics AS (
    SELECT arbitrary(pop) AS pop, arbitrary(idx) AS idx, SUM(m) AS m
    FROM (
        VALUES
            (1, ARRAY[2, 5, 7]),
            (2, ARRAY[45, 8, 7]),
            (1, ARRAY[100, 0, 3])
    ) AS log (pop, metric)
    CROSS JOIN UNNEST(metric) WITH ORDINALITY AS t (m, idx)
    GROUP BY pop, idx
)
SELECT pop, MIN(m, 30) AS metrics
FROM aggregated_metrics
GROUP BY pop
@darkjh
darkjh / gist:d232d4379e803283045d62af37c8b3d0
Created June 16, 2021 15:06
Init horovod on EMR master
# Init Horovod on an EMR master node: set up a TF2 conda env, install the
# Python-side Spark/data dependencies, then build Horovod from a patched fork.
# NOTE(review): command order matters (env activation first, build tools before
# the source build) — do not reorder.

# Work inside the pre-created "tf2" conda environment.
conda activate tf2

# Python dependencies: Spark bindings, env packaging, S3 filesystem, Parquet loader.
pip install pyspark==2.4.3
pip install cluster-pack
pip install s3fs
pip install petastorm==0.11.1

# C++ toolchain needed to compile Horovod's native extensions from source.
conda install gxx_linux-64
conda install cmake

# Native HDFS client library — presumably required for HDFS access from
# Horovod/Petastorm on EMR; TODO confirm.
sudo yum install hadoop-libhdfs

# Build & install a patched Horovod fork (branch horovod-spark-executable-arg):
# flags enable TensorFlow + Gloo and disable PyTorch, MXNet and MPI; HOROVOD_CMAKE
# points the build at the cmake installed in this conda env above.
HOROVOD_CMAKE=/mnt/conda_envs/tf2/bin/cmake HOROVOD_WITH_TENSORFLOW=1 HOROVOD_WITHOUT_PYTORCH=1 HOROVOD_WITHOUT_MXNET=1 HOROVOD_WITH_GLOO=1 HOROVOD_WITHOUT_MPI=1 pip install git+https://github.com/darkjh/horovod@horovod-spark-executable-arg
21/05/24 21:35:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/home/darkjh/miniconda3/envs/tf-test/lib/python3.7/site-packages/horovod/spark/common/store.py:299: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
self._fs = pa.LocalFileSystem()
/home/darkjh/miniconda3/envs/tf-test/lib/python3.7/site-packages/horovod/spark/common/store.py:299: FutureWarning: pyarrow.filesystem.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
self._fs = pa.LocalFileSystem()
--2021-05-24 21:35:59-- https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2
Loaded CA certificate '/etc/ssl/certs/ca-certificates.crt'
import java.io.OutputStreamWriter
import java.io.{PipedInputStream, PipedOutputStream, ByteArrayOutputStream, PrintStream}
import java.io.{BufferedReader, InputStreamReader}
import org.apache.commons.io.output.TeeOutputStream
import scala.{App => SApp}
object TestOut extends SApp {
@darkjh
darkjh / outToIn.scala
Created September 4, 2016 23:33
OutputStream to InputStream
import java.io.OutputStreamWriter
import java.io.{PipedInputStream, PipedOutputStream, ByteArrayOutputStream, PrintStream}
import java.io.{BufferedReader, InputStreamReader}
import org.apache.commons.io.output.TeeOutputStream
import scala.{App => SApp}
object TestOut extends SApp {
val orig = System.out
import tv.teads.lib.conf._
import tv.teads.lib.job.{Job, JobValidation}
trait CoursierJob[T <: JobParameters] extends Job[T] with JobValidation {
/**
* Example cli:
* coursier launch `JOB` -- `SPARK_OPTS` -- `JOB_ARGS`
*
* where
import java.util.concurrent.{TimeUnit, Executors}
import java.util.{Collection, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException
import scala.collection.JavaConversions._
import java.util.concurrent.Executors
import java.util.{Properties, Collection}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConversions._
class Consumer(threadId: String, topic: String, configs: Properties) extends Runnable {
// gracefully terminates the consumer thread