@IgorBerman
IgorBerman / StructuredStreaming.java
Created December 18, 2019 14:21
structured streaming example
package igorprivate;
import org.slf4j.LoggerFactory;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.date_format;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.sum;
import static org.apache.spark.sql.functions.to_json;
Assuming you've installed Spark with brew install apache-spark@1.6,
Homebrew places it in /usr/local/opt/apache-spark@1.6.
To use that Spark locally from IntelliJ, define the following for your script:
1. Environment variables:
PYSPARK_PYTHON=/Users/igorberm/.runtimes/Python34/bin/python3
PYTHONPATH=/usr/local/opt/apache-spark@1.6/libexec/python/lib/pyspark.zip:/usr/local/opt/apache-spark@1.6/libexec/python/lib/py4j-0.9-src.zip
PYTHONUNBUFFERED=1
SPARK_HOME=/usr/local/opt/apache-spark@1.6/libexec
IgorBerman / visualvmEc2Jmx.txt
Last active April 9, 2017 07:32
visualvm ec2 jmx
1. Set up your process's Java system properties:
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1098 -Dcom.sun.management.jmxremote.rmi.port=1098 -Djava.rmi.server.hostname=127.0.0.1
2. SSH to your EC2 server with (a) local port forwarding of 1098 (the JMX port) and (b) a SOCKS proxy on port 10003:
ssh -L 1098:127.0.0.1:1098 -D 10003 my.ec2.instance.com
3. Open VisualVM with its SOCKS proxy set to the same port:
jvisualvm -J-DsocksProxyHost=localhost -J-DsocksProxyPort=10003
4. In VisualVM, create a local JMX connection to localhost:1098.
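Once the tunnel from step 2 is up, the same endpoint can also be read programmatically. The following is a minimal sketch using only the JDK's javax.management.remote API. It assumes the target JVM was started with the properties from step 1, so the RMI registry and RMI server both listen on 1098, and that the built-in agent's standard /jmxrmi path is used. The class and method names are illustrative, not part of the gist:

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

class JmxHeapProbe {
    /** RMI-based service URL, as served by the built-in com.sun.management.jmxremote agent. */
    static JMXServiceURL urlFor(String host, int port) throws IOException {
        return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    /** Connects to the remote JVM (e.g. through the ssh -L tunnel) and returns its used heap in bytes. */
    static long usedHeapBytes(String host, int port) throws IOException {
        try (JMXConnector connector = JMXConnectorFactory.connect(urlFor(host, port))) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // Typed proxy for the remote java.lang:type=Memory MBean
            MemoryMXBean memory = ManagementFactory.newPlatformMXBeanProxy(
                    connection, ManagementFactory.MEMORY_MXBEAN_NAME, MemoryMXBean.class);
            return memory.getHeapMemoryUsage().getUsed();
        } // closing the connector releases the remote connection
    }

    public static void main(String[] args) throws IOException {
        // Assumes localhost:1098 is forwarded to the EC2 process as in step 2
        System.out.println("remote used heap: " + usedHeapBytes("localhost", 1098) + " bytes");
    }
}
```

Because step 1 pins jmxremote.rmi.port to the same port as the registry and sets java.rmi.server.hostname to 127.0.0.1, a single forwarded port should be enough for this client.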
IgorBerman / SubstreamsWithOrder.java
Created April 7, 2017 15:30
Ordered substreams with a dedicated dispatcher, instead of a ConsistentHashingPool with ask + timeout and blocking processing
//without backpressure:
ActorRef myActor = system.actorOf(
new akka.routing.ConsistentHashingPool(maxParallelism)
.withHashMapper(hashMapper)
.props(MyActor.props(blLogic).withDispatcher("my-dispatcher"))
, "my-ordered-processor-pool");
final Timeout timeout = Timeout.apply(1000L, TimeUnit.MILLISECONDS);
Flow.of(Event.class)
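The per-key ordering that the hash-routed pool provides can be sketched without Akka. This plain-Java version is a swapped-in technique, not the gist's Akka code: it hashes each key to one of maxParallelism single-threaded executors, so events with the same key are handled in submission order while different keys proceed in parallel. Like the ConsistentHashingPool variant above, its queues are unbounded, so it has no backpressure. The class name is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: same key -> same single-threaded lane -> per-key order preserved. */
class OrderedKeyDispatcher<K, E> implements AutoCloseable {
    private final List<ExecutorService> lanes = new ArrayList<>();
    private final Consumer<E> handler;

    OrderedKeyDispatcher(int maxParallelism, Consumer<E> handler) {
        for (int i = 0; i < maxParallelism; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
        this.handler = handler;
    }

    /** Routes the event to the lane chosen by its key's hash (unbounded queue, no backpressure). */
    void dispatch(K key, E event) {
        int lane = Math.floorMod(key.hashCode(), lanes.size());
        lanes.get(lane).execute(() -> handler.accept(event));
    }

    /** Stops accepting work and waits for already queued events to finish. */
    @Override
    public void close() {
        lanes.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(1, TimeUnit.MINUTES);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```

Adding backpressure to this sketch would mean bounding each lane's queue (for example with a ThreadPoolExecutor over an ArrayBlockingQueue), which is roughly what an Akka Streams substream gives you for free.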
IgorBerman / gist:27d400705e2299c0cf8aa4f52e1fd6aa
Created March 3, 2017 15:25 — forked from debasishg/gist:8172796
A collection of links for streaming algorithms and data structures
  1. General Background and Overview
IgorBerman / StreamWithHttpPoolExample.java
Last active February 27, 2018 07:57
akka-http akka-streams java8
package com.example;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import scala.runtime.BoxedUnit;
import scala.util.Try;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
IgorBerman / ExceptionHandlingExample.java
Created January 26, 2017 10:22
Akka Streams in Java with a kill switch, a shutdown hook, and error handling
package com.example;
import java.util.concurrent.CompletionStage;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
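The kill switch, shutdown hook, and error handling trio can be illustrated without Akka. The plain-Java analog below is not the Akka KillSwitch API: a worker thread drains a queue until shutdown() or abort(cause) is called, a handler exception or an abort fails the done() future, and installShutdownHook() flips the switch when the JVM exits. All names are hypothetical:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical plain-Java analog of a killable stream with error handling. */
class KillableWorker<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final CompletableFuture<Void> done = new CompletableFuture<>();
    private volatile boolean running = true;
    private final Thread thread;

    private KillableWorker(Consumer<T> handler) {
        thread = new Thread(() -> {
            try {
                while (running) {
                    T item = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (item != null) handler.accept(item);
                }
                done.complete(null); // normal completion after shutdown()
            } catch (Throwable t) {
                done.completeExceptionally(t); // error handling: surface failures via the future
            }
        }, "killable-worker");
    }

    /** Creates the worker and starts its thread (kept out of the constructor). */
    static <T> KillableWorker<T> start(Consumer<T> handler) {
        KillableWorker<T> worker = new KillableWorker<>(handler);
        worker.thread.start();
        return worker;
    }

    void offer(T item) { queue.add(item); }

    /** Analogous to KillSwitch.shutdown(): stop and complete normally. */
    void shutdown() { running = false; }

    /** Analogous to KillSwitch.abort(cause): stop and fail the completion. */
    void abort(Throwable cause) {
        running = false;
        done.completeExceptionally(cause);
    }

    CompletableFuture<Void> done() { return done; }

    /** On JVM shutdown (e.g. Ctrl-C), flip the switch and wait briefly for the worker. */
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            shutdown();
            try {
                thread.join(5_000);
            } catch (InterruptedException ignored) {
                // the JVM is exiting anyway
            }
        }));
    }
}
```

Being a sketch, items still queued when the switch flips are simply dropped; a real stream would define whether in-flight elements drain first.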
IgorBerman / find-ip-with-knife-search.sh
Created November 21, 2016 09:03
Find IP addresses with knife search
knife search "recipes:xxx*" -a ipaddress | grep ipaddress | awk '{print $2}'
IgorBerman / AvroSinkWriter.java
Created April 27, 2016 16:22
Avro writer for the Flink RollingSink
package org.apache.flink.streaming.connectors.fs.avro;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
IgorBerman / multithreaded-bulk-loader
Created February 10, 2015 13:36
Cassandra multithreaded bulk loader
package com.myproject.cassandraloading;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
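The basic shape of a multithreaded bulk load, fanning input directories out to a fixed-size thread pool and collecting one result per directory, can be sketched with java.util.concurrent alone. This is an illustrative sketch, not the gist's loader: loadOne is a hypothetical callback standing in for whatever streams one directory of data into Cassandra, and the first failed load (in submission order) is rethrown as an ExecutionException:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch: run loadOne for every input on at most `threads` concurrent workers. */
class ParallelBulkLoader {
    static <R> List<R> loadAll(List<File> inputs, int threads, Function<File, R> loadOne)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (File input : inputs) {
                futures.add(pool.submit(() -> loadOne.apply(input))); // one task per input
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get()); // blocks; rethrows a loader failure as ExecutionException
            }
            return results; // results keep the order of `inputs`
        } finally {
            pool.shutdownNow(); // also cancels remaining tasks if a load failed
        }
    }
}
```

Sizing `threads` from Runtime.getRuntime().availableProcessors() is a common starting point; for I/O-bound streaming to a cluster a higher value may be worth measuring.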