Skip to content

Instantly share code, notes, and snippets.

View geoHeil's full-sized avatar

geoHeil geoHeil

View GitHub Profile
version: "3"
services:
nifi:
image: apache/nifi:1.11.4
ports:
- 8080:8080 # Unsecured HTTP Web Port
environment:
- NIFI_WEB_HTTP_PORT=8080
- NIFI_CLUSTER_IS_NODE=true
- NIFI_CLUSTER_NODE_PROTOCOL_PORT=8082
@geoHeil
geoHeil / docker-compose.yml
Last active June 8, 2020 20:13
NiFi cannot connect to docker / fails to put record with timeout exception
version: "3"
services:
zookeeper: # the configuration manager
hostname: zookeeper
container_name: zookeeper
image: 'bitnami/zookeeper:3.6.1'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
nifi:
image: apache/nifi:1.11.4
@geoHeil
geoHeil / docker-compose.yml
Created June 8, 2020 16:42
nifi docker volume persistence
version: "3"
services:
zookeeper: # the configuration manager
hostname: zookeeper
container_name: zookeeper
image: 'bitnami/zookeeper:3.6.1'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
nifi:
image: apache/nifi:1.11.4
@geoHeil
geoHeil / docker-compose.yml
Created June 7, 2020 17:47
NiFi & Registry in docker with (not working) persistence
version: "3"
services:
nifi:
hostname: demo.nifi
image: apache/nifi:1.11.4
container_name: nifi
ports:
- 8080:8080
- 443:8443
links:
%pylab inline
import pandas as pd
import geopandas as gp
import seaborn as sns; sns.set()
import numpy as np
from h3 import h3
import monix.eval.Task
import monix.execution.Cancelable
import monix.execution.cancelables.CompositeCancelable
import scala.util.{Failure, Success, Try}
object Foo extends App {
val things = Range(1, 10)
val c = CompositeCancelable()
@geoHeil
geoHeil / foo.scala
Last active September 20, 2019 14:23
spark custom File Stream Source
package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
class StatefulFileStreamOptions(parameters: CaseInsensitiveMap[String])
extends FileStreamOptions(parameters) {
def this(parameters: Map[String, String]) =
this(CaseInsensitiveMap(parameters))
@geoHeil
geoHeil / foo.scala
Last active August 24, 2019 12:11
azure event hub captured avro file parsing in spark
// using from_json
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType
// Infer the JSON schema of the "Body" column by re-reading it as a JSON dataset.
// NOTE(review): assumes `spark` (SparkSession) and `df` (DataFrame with a "Body"
// column holding JSON payloads, e.g. Event Hub capture) exist in the session —
// neither is defined in this snippet; confirm against the surrounding context.
// This triggers a full extra pass over the data just to infer the schema.
val schema = spark.read.json(df.select("Body").as[String]).schema
// All columns except "Body", as Column expressions, to be kept unchanged.
val otherColumns = df.drop("Body").columns.map(col)
// Append the parsed struct: cast raw Body to string, parse it with the inferred
// schema, and expose it under a new "Body_parsed" column.
val combined = otherColumns :+ from_json(col("Body").cast(StringType), schema).alias("Body_parsed")
// Original columns plus the parsed payload struct.
val result = df.select(combined:_*)
@geoHeil
geoHeil / foo.scala
Created August 1, 2019 10:44
Validate sequence of numbers within allowed range / bounds
// Validate that a sequence of numbers stays within an allowed range.
val valuesRight = Seq(1, 2, 3, 4, 5, 6, 7)
val valuesWrong = Seq(1, 2, 5, 6, 7, 8, 9)
// Range(1, 8) = 1 to 7 (upper bound exclusive).
val allowedValues = Range(1, 8)

// Returns true when `input` holds at least one value outside `allowed`.
//
// Fix: the original used `!allowed.containsSlice(input)`, which tests whether
// `input` occurs as a CONTIGUOUS sub-sequence of `allowed`. That wrongly flags
// valid but non-contiguous inputs (e.g. Seq(1, 3) against 1..7) and also any
// input with repeated values. Per-element membership is the intended check.
def containsNotAllowedValues(allowed: Range, input: Seq[Int]): Boolean = {
  // Vacuously false for empty input: nothing disallowed is present.
  !input.forall(allowed.contains)
}

containsNotAllowedValues(allowedValues, valuesRight) // expected false as no wrong element contained
// result: false ==> correct
@geoHeil
geoHeil / assertion failed
Last active April 11, 2019 07:13
Assertion failed
19/04/11 09:11:13 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 11)
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.geosparksql.expressions.ST_GeomFromWKT.eval(Constructors.scala:123)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:100)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:90)