@geoHeil
geoHeil / Foo.scala
Last active July 23, 2020 08:44
failing
# setup in bash
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.11.1/flink-connector-kafka_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.11/1.11.1/flink-connector-kafka-base_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.11.1/flink-avro-confluent-registry-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.1/flink-avro-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/force-shading/1.11.1/force-shading-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.11.1/jackson-core-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-d
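The jars above wire Flink's Kafka connector to Confluent-registry Avro. A minimal sketch of the consumer they enable (the broker address, topic name, registry URL, and consumer group are assumptions; `Tweet` is the generated SpecificRecord class from the gist below):

```scala
import java.util.Properties

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object TweetConsumer extends App {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address
  props.setProperty("group.id", "tweet-consumer")          // assumed consumer group

  // deserialize records against the Confluent schema registry (URL is an assumption)
  val schema = ConfluentRegistryAvroDeserializationSchema
    .forSpecific(classOf[Tweet], "http://localhost:8081")

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env
    .addSource(new FlinkKafkaConsumer[Tweet]("tweets", schema, props))
    .print()
  env.execute("tweet-consumer")
}
```

This needs a running Kafka broker and schema registry, so it is a wiring sketch rather than a standalone program.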
@geoHeil
geoHeil / Tweet.scala
Created June 29, 2020 12:16
generated Avro SpecificRecord case class
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.github.geoheil.streamingreference
import scala.annotation.switch
final case class Tweet(var tweet_id: Option[String], var text: Option[String], var source: Option[String], var geo: Option[String], var place: Option[String], var lang: Option[String], var created_at: Option[String], var timestamp_ms: Option[String], var coordinates: Option[String], var user_id: Option[Long], var user_name: Option[String], var screen_name: Option[String], var user_created_at: Option[String], var followers_count: Option[Long], var friends_count: Option[Long], var user_lang: Option[String], var user_location: Option[String], var hashtags: Option[Seq[String]]) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
version: "3"
services:
  nifi:
    image: apache/nifi:1.11.4
    ports:
      - 8080:8080 # Unsecured HTTP Web Port
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_CLUSTER_IS_NODE=true
      - NIFI_CLUSTER_NODE_PROTOCOL_PORT=8082
@geoHeil
geoHeil / docker-compose.yml
Last active June 8, 2020 20:13
NiFi cannot connect to docker / fails to put record with timeout exception
version: "3"
services:
  zookeeper: # the configuration manager
    hostname: zookeeper
    container_name: zookeeper
    image: 'bitnami/zookeeper:3.6.1'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  nifi:
    image: apache/nifi:1.11.4
@geoHeil
geoHeil / docker-compose.yml
Created June 7, 2020 17:47
NiFi & Registry in docker with (not working) persistence
version: "3"
services:
  nifi:
    hostname: demo.nifi
    image: apache/nifi:1.11.4
    container_name: nifi
    ports:
      - 8080:8080
      - 443:8443
    links:
import monix.eval.Task
import monix.execution.Cancelable
import monix.execution.cancelables.CompositeCancelable
import scala.util.{Failure, Success, Try}
object Foo extends App {
  val things = Range(1, 10)
  val c = CompositeCancelable()
@geoHeil
geoHeil / foo.scala
Last active September 20, 2019 14:23
spark custom File Stream Source
package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
class StatefulFileStreamOptions(parameters: CaseInsensitiveMap[String])
    extends FileStreamOptions(parameters) {

  def this(parameters: Map[String, String]) =
    this(CaseInsensitiveMap(parameters))
@geoHeil
geoHeil / foo.scala
Last active August 24, 2019 12:11
azure event hub captured avro file parsing in spark
// using from_json
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType
val schema = spark.read.json(df.select("Body").as[String]).schema
val otherColumns = df.drop("Body").columns.map(col)
val combined = otherColumns :+ from_json(col("Body").cast(StringType), schema).alias("Body_parsed")
val result = df.select(combined:_*)
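The same two-pass idea (infer the schema from the JSON payloads, then parse them with `from_json`) can be tried against a local SparkSession. A self-contained sketch; the column names and sample JSON are assumptions standing in for the captured event-hub frame:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

object FromJsonDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("from_json-demo")
    .getOrCreate()
  import spark.implicits._

  // stand-in for the captured frame: an id column plus a JSON string Body
  val df = Seq(
    (1L, """{"a": 1, "b": "x"}"""),
    (2L, """{"a": 2, "b": "y"}""")
  ).toDF("id", "Body")

  // pass 1: infer the schema from the JSON payloads
  val schema = spark.read.json(df.select("Body").as[String]).schema
  // pass 2: parse Body into a struct column using that schema
  val result = df.withColumn("Body_parsed", from_json(col("Body"), schema))
  result.printSchema()
  spark.stop()
}
```

In the event-hub case the `Body` column is binary, hence the `cast(StringType)` in the snippet above; the sample data here is already a string, so the cast is omitted.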
@geoHeil
geoHeil / foo.scala
Created August 1, 2019 10:44
Validate sequence of numbers within allowed range / bounds
val valuesRight = Seq(1, 2, 3, 4, 5, 6, 7)
val valuesWrong = Seq(1, 2, 5, 6, 7, 8, 9)
val allowedValues = Range(1, 8)
def containsNotAllowedValues(allowed: Range, input: Seq[Int]): Boolean = {
  // forall checks that every element lies in the allowed range;
  // containsSlice would only match input appearing as one contiguous run of allowed
  !input.forall(allowed.contains)
}
containsNotAllowedValues(allowedValues, valuesRight) // expected false as no wrong element contained
// result: false ==> correct
@geoHeil
geoHeil / assertion failed
Last active April 11, 2019 07:13
Assertion failed
19/04/11 09:11:13 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 11)
java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.sql.geosparksql.expressions.ST_GeomFromWKT.eval(Constructors.scala:123)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:100)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:90)