Skip to content

Instantly share code, notes, and snippets.

@loganlinn
Created November 7, 2020 01:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save loganlinn/0a434e780d29a4b13198c39d0afff67a to your computer and use it in GitHub Desktop.
Save loganlinn/0a434e780d29a4b13198c39d0afff67a to your computer and use it in GitHub Desktop.
diff --git a/olap_api/olap/clickhouse/BUILD b/olap_api/olap/clickhouse/BUILD
new file mode 100644
index 0000000..19c4c71
--- /dev/null
+++ b/olap_api/olap/clickhouse/BUILD
@@ -0,0 +1,14 @@
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library")
+
+# Kotlin library for the ClickHouse package: compiles every .kt file found
+# recursively under this directory.
+kt_jvm_library(
+    name = "clickhouse",
+    srcs = glob(["**/*.kt"]),
+    visibility = ["//:__subpackages__"],
+    deps = [
+        "@maven//:io_github_microutils_kotlin_logging",
+        "@maven//:io_micronaut_picocli_micronaut_picocli",
+        "@maven//:joda_time_joda_time",
+        "@maven//:org_slf4j_slf4j_api",
+        "@maven//:ru_yandex_clickhouse_clickhouse_jdbc",
+    ],
+)
\ No newline at end of file
diff --git a/olap_api/olap/clickhouse/tools/schemer.kt b/olap_api/olap/clickhouse/tools/schemer.kt
new file mode 100644
index 0000000..ae4b39a
--- /dev/null
+++ b/olap_api/olap/clickhouse/tools/schemer.kt
@@ -0,0 +1,2 @@
+package olap.clickhouse.tools
+
diff --git a/olap_api/olap/core/BUILD b/olap_api/olap/core/BUILD
new file mode 100644
index 0000000..dae87ca
--- /dev/null
+++ b/olap_api/olap/core/BUILD
@@ -0,0 +1,7 @@
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library")
+
+# Kotlin library built from every .kt file found recursively under this
+# package; it declares no dependencies.
+kt_jvm_library(
+ name = "core",
+ srcs = glob(["**/*.kt"]),
+ visibility = ["//:__subpackages__"],
+)
diff --git a/olap_api/olap/core/dataset.kt b/olap_api/olap/core/dataset.kt
new file mode 100644
index 0000000..60f395c
--- /dev/null
+++ b/olap_api/olap/core/dataset.kt
@@ -0,0 +1,2 @@
+package olap.core
+
diff --git a/olap_api/olap/lib/serdes/avro/BUILD b/olap_api/olap/lib/serdes/avro/BUILD
new file mode 100644
index 0000000..85e62b9
--- /dev/null
+++ b/olap_api/olap/lib/serdes/avro/BUILD
@@ -0,0 +1,12 @@
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library")
+
+# Avro serde helpers. Only the .kt files directly in this directory are
+# compiled (the glob is not recursive).
+kt_jvm_library(
+    name = "avro",
+    srcs = glob(["*.kt"]),
+    visibility = ["//:__subpackages__"],
+    deps = [
+        "@maven//:io_confluent_kafka_schema_serializer",
+        "@maven//:io_confluent_kafka_streams_avro_serde",
+        "@maven//:org_apache_avro_avro",
+    ],
+)
\ No newline at end of file
diff --git a/olap_api/olap/lib/serdes/avro/utils.kt b/olap_api/olap/lib/serdes/avro/utils.kt
new file mode 100644
index 0000000..0186ab3
--- /dev/null
+++ b/olap_api/olap/lib/serdes/avro/utils.kt
@@ -0,0 +1,8 @@
+package olap.lib.serdes.avro
+
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
+import org.apache.avro.specific.SpecificRecord
+
+/**
+ * Creates a [SpecificAvroSerde] for [T] and configures it before returning it.
+ *
+ * @param config settings handed unchanged to the serde's `configure` call.
+ * @param isForKeys forwarded to `configure`; tells the serde whether it handles record keys.
+ * @return the newly created, already configured serde.
+ */
+fun <T : SpecificRecord> specificAvroSerde(config: Map<String, Any?>, isForKeys: Boolean): SpecificAvroSerde<T> {
+    val serde = SpecificAvroSerde<T>()
+    serde.configure(config, isForKeys)
+    return serde
+}
+
diff --git a/olap_api/olap/lib/units/BUILD b/olap_api/olap/lib/units/BUILD
new file mode 100644
index 0000000..fe10fe1
--- /dev/null
+++ b/olap_api/olap/lib/units/BUILD
@@ -0,0 +1,7 @@
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library")
+
+# Kotlin library with the unit helpers: every .kt file found recursively under
+# this package; it declares no dependencies.
+kt_jvm_library(
+ name = "units",
+ srcs = glob(["**/*.kt"]),
+ visibility = ["//:__subpackages__"],
+)
diff --git a/olap_api/olap/lib/units/information.kt b/olap_api/olap/lib/units/information.kt
new file mode 100644
index 0000000..be33426
--- /dev/null
+++ b/olap_api/olap/lib/units/information.kt
@@ -0,0 +1,39 @@
+package olap.lib.units
+
+// Each unit is defined in terms of the next smaller one. The scaling uses
+// Math.multiplyExact so that a quantity too large for a Long raises
+// ArithmeticException instead of silently wrapping to a wrong (possibly
+// negative) value.
+
+/** This number of bits as a [BitValue]. */
+val Long.bits: BitValue get() = BitValue(this)
+
+/** This number of bytes (8 bits each). Throws [ArithmeticException] on Long overflow. */
+val Long.bytes: BitValue get() = Math.multiplyExact(this, 8L).bits
+
+/** This number of kibibytes (1024 bytes each). Throws [ArithmeticException] on Long overflow. */
+val Long.kibibytes: BitValue get() = Math.multiplyExact(this, 1024L).bytes
+
+/** This number of mebibytes (1024 kibibytes each). Throws [ArithmeticException] on Long overflow. */
+val Long.mebibytes: BitValue get() = Math.multiplyExact(this, 1024L).kibibytes
+
+/** This number of gibibytes (1024 mebibytes each). Throws [ArithmeticException] on Long overflow. */
+val Long.gibibytes: BitValue get() = Math.multiplyExact(this, 1024L).mebibytes
+
+/** This number of tebibytes (1024 gibibytes each). Throws [ArithmeticException] on Long overflow. */
+val Long.tebibytes: BitValue get() = Math.multiplyExact(this, 1024L).gibibytes
+
+/** This number of pebibytes (1024 tebibytes each). Throws [ArithmeticException] on Long overflow. */
+val Long.pebibytes: BitValue get() = Math.multiplyExact(this, 1024L).tebibytes
+
+// Int variants widen to Long and reuse the definitions above.
+
+/** This number of bits as a [BitValue]. */
+val Int.bits: BitValue get() = toLong().bits
+
+/** This number of bytes as a [BitValue]. */
+val Int.bytes: BitValue get() = toLong().bytes
+
+/** This number of kibibytes as a [BitValue]. */
+val Int.kibibytes: BitValue get() = toLong().kibibytes
+
+/** This number of mebibytes as a [BitValue]. */
+val Int.mebibytes: BitValue get() = toLong().mebibytes
+
+/** This number of gibibytes as a [BitValue]. */
+val Int.gibibytes: BitValue get() = toLong().gibibytes
+
+/** This number of tebibytes as a [BitValue]. */
+val Int.tebibytes: BitValue get() = toLong().tebibytes
+
+/** This number of pebibytes as a [BitValue]. */
+val Int.pebibytes: BitValue get() = toLong().pebibytes
+
+private const val BITS_IN_BYTE = 8
+private const val BINARY_UNIT_STEP = 1024
+
+/**
+ * An amount of information, stored as a whole number of bits.
+ *
+ * All `to*` properties are computed once at construction. Each one is derived
+ * from the next smaller unit with integer division, so any remainder that does
+ * not fill a whole larger unit is dropped.
+ */
+data class BitValue internal constructor(internal val bits: Long) {
+    /** The amount in bits. */
+    val toBits: Long = bits
+
+    /** The amount in whole bytes. */
+    val toBytes: Long = toBits / BITS_IN_BYTE
+
+    /** The amount in whole kibibytes. */
+    val toKibibytes: Long = toBytes / BINARY_UNIT_STEP
+
+    /** The amount in whole mebibytes. */
+    val toMebibytes: Long = toKibibytes / BINARY_UNIT_STEP
+
+    /** The amount in whole gibibytes. */
+    val toGibibytes: Long = toMebibytes / BINARY_UNIT_STEP
+
+    /** The amount in whole tebibytes. */
+    val toTebibytes: Long = toGibibytes / BINARY_UNIT_STEP
+
+    /** The amount in whole pebibytes. */
+    val toPebibytes: Long = toTebibytes / BINARY_UNIT_STEP
+}
diff --git a/olap_api/olap/lib/units/time.kt b/olap_api/olap/lib/units/time.kt
new file mode 100644
index 0000000..582febe
--- /dev/null
+++ b/olap_api/olap/lib/units/time.kt
@@ -0,0 +1,39 @@
+package olap.lib.units
+
+// Each unit is defined in terms of the next smaller one. The scaling uses
+// Math.multiplyExact so that a duration too large for a Long nanosecond count
+// raises ArithmeticException instead of silently wrapping to a wrong value.
+
+/** This number of nanoseconds as a [TimeValue]. */
+val Long.nanoseconds: TimeValue get() = TimeValue(this)
+
+/** This number of microseconds (1000 ns each). Throws [ArithmeticException] on Long overflow. */
+val Long.microseconds: TimeValue get() = Math.multiplyExact(this, 1000L).nanoseconds
+
+/** This number of milliseconds (1000 us each). Throws [ArithmeticException] on Long overflow. */
+val Long.milliseconds: TimeValue get() = Math.multiplyExact(this, 1000L).microseconds
+
+/** This number of seconds (1000 ms each). Throws [ArithmeticException] on Long overflow. */
+val Long.seconds: TimeValue get() = Math.multiplyExact(this, 1000L).milliseconds
+
+/** This number of minutes (60 s each). Throws [ArithmeticException] on Long overflow. */
+val Long.minutes: TimeValue get() = Math.multiplyExact(this, 60L).seconds
+
+/** This number of hours (60 min each). Throws [ArithmeticException] on Long overflow. */
+val Long.hours: TimeValue get() = Math.multiplyExact(this, 60L).minutes
+
+/** This number of days (24 h each). Throws [ArithmeticException] on Long overflow. */
+val Long.days: TimeValue get() = Math.multiplyExact(this, 24L).hours
+
+// Int variants widen to Long and reuse the definitions above.
+
+/** This number of nanoseconds as a [TimeValue]. */
+val Int.nanoseconds: TimeValue get() = toLong().nanoseconds
+
+/** This number of microseconds as a [TimeValue]. */
+val Int.microseconds: TimeValue get() = toLong().microseconds
+
+/** This number of milliseconds as a [TimeValue]. */
+val Int.milliseconds: TimeValue get() = toLong().milliseconds
+
+/** This number of seconds as a [TimeValue]. */
+val Int.seconds: TimeValue get() = toLong().seconds
+
+/** This number of minutes as a [TimeValue]. */
+val Int.minutes: TimeValue get() = toLong().minutes
+
+/** This number of hours as a [TimeValue]. */
+val Int.hours: TimeValue get() = toLong().hours
+
+/** This number of days as a [TimeValue]. */
+val Int.days: TimeValue get() = toLong().days
+
+private const val DECIMAL_TIME_STEP = 1000
+private const val SIXTY = 60
+private const val HOURS_IN_DAY = 24
+
+/**
+ * A span of time, stored as a whole number of nanoseconds.
+ *
+ * All `to*` properties are computed once at construction. Each one is derived
+ * from the next smaller unit with integer division, so any remainder that does
+ * not fill a whole larger unit is dropped.
+ */
+data class TimeValue internal constructor(internal val ns: Long) {
+    /** The span in nanoseconds. */
+    val toNanoseconds: Long = ns
+
+    /** The span in whole microseconds. */
+    val toMicroseconds: Long = toNanoseconds / DECIMAL_TIME_STEP
+
+    /** The span in whole milliseconds. */
+    val toMilliseconds: Long = toMicroseconds / DECIMAL_TIME_STEP
+
+    /** The span in whole seconds. */
+    val toSeconds: Long = toMilliseconds / DECIMAL_TIME_STEP
+
+    /** The span in whole minutes. */
+    val toMinutes: Long = toSeconds / SIXTY
+
+    /** The span in whole hours. */
+    val toHours: Long = toMinutes / SIXTY
+
+    /** The span in whole days. */
+    val toDays: Long = toHours / HOURS_IN_DAY
+}
\ No newline at end of file
diff --git a/olap_api/olap/stream/conversions_sink/BUILD b/olap_api/olap/stream/conversions_sink/BUILD
new file mode 100644
index 0000000..8a6cf75
--- /dev/null
+++ b/olap_api/olap/stream/conversions_sink/BUILD
@@ -0,0 +1,32 @@
+load("@io_bazel_rules_kotlin//kotlin:kotlin.bzl", "kt_jvm_library")
+load("@rules_java//java:defs.bzl", "java_binary")
+
+# Targets in this package that do not set their own visibility are visible to
+# the root package and all of its subpackages.
+package(default_visibility = ["//:__subpackages__"])
+
+# Library holding the conversions sink sources (every .kt file found
+# recursively under this package). The runnable target below depends on it.
+kt_jvm_library(
+    name = "conversions_sink_lib",
+    srcs = glob(["**/*.kt"]),
+    visibility = ["//:__subpackages__"],
+    deps = [
+        "//olap/lib/units",
+        "//olap/lib/serdes/avro",
+        "//olap/datasets/optimizely/v1:enriched_conversion_java_proto",
+        "@maven//:com_optimizely_avro_schemas_avro_schemas_export",
+        "@maven//:io_confluent_kafka_schema_registry_client",
+        "@maven//:io_confluent_kafka_schema_serializer",
+        "@maven//:io_confluent_kafka_streams_avro_serde",
+        "@maven//:io_github_microutils_kotlin_logging",
+        "@maven//:io_micronaut_picocli_micronaut_picocli",
+        "@maven//:org_apache_avro_avro",
+        "@maven//:org_apache_kafka_kafka_streams",
+        "@maven//:org_slf4j_slf4j_api",
+        "@maven//:ru_yandex_clickhouse_clickhouse_jdbc",
+    ],
+)
+
+# Runnable entry point for the sink.
+# main_class must be the fully qualified name of the class that declares
+# `main`: ConversionsSink.kt declares package `olap.stream.conversions_sink`,
+# so the previous `olap.stream.clickhouse_sink` prefix pointed at a class that
+# does not exist and the binary could not start.
+java_binary(
+    name = "conversions_sink",
+    main_class = "olap.stream.conversions_sink.ConversionsSink",
+    visibility = ["//visibility:public"],
+    runtime_deps = [":conversions_sink_lib"],
+)
diff --git a/olap_api/olap/stream/conversions_sink/ConversionsSink.kt b/olap_api/olap/stream/conversions_sink/ConversionsSink.kt
new file mode 100644
index 0000000..bcd03a0
--- /dev/null
+++ b/olap_api/olap/stream/conversions_sink/ConversionsSink.kt
@@ -0,0 +1,177 @@
+package olap.stream.conversions_sink
+
+import com.optimizely.export.enrich.model.EnrichedEventAvro
+import com.optimizely.export.enrich.model.LayerStateAvro
+import com.optimizely.export.enrich.model.SegmentAvro
+import io.confluent.kafka.serializers.subject.RecordNameStrategy
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
+import mu.KotlinLogging
+import olap.datasets.conversions.v1.EnrichedConversion
+import olap.lib.serdes.avro.specificAvroSerde
+import olap.lib.units.kibibytes
+import olap.lib.units.mebibytes
+import olap.lib.units.seconds
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.KafkaStreams
+import org.apache.kafka.streams.StreamsBuilder
+import org.apache.kafka.streams.StreamsConfig
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.kstream.Consumed
+import org.apache.kafka.streams.kstream.KStream
+import org.apache.kafka.streams.kstream.ValueMapper
+import picocli.CommandLine
+import picocli.CommandLine.Command
+import picocli.CommandLine.Parameters
+import java.nio.file.Path
+import java.util.*
+import java.util.concurrent.Callable
+import java.util.function.UnaryOperator
+import kotlin.system.exitProcess
+
+/** Logger for this file, created through kotlin-logging. */
+private val logger = KotlinLogging.logger {}
+
+/** Command name given to picocli; also the first part of [APPLICATION_ID]. */
+const val APPLICATION_NAME = "clickhouse-sink"
+/** Version string given to picocli's `version` attribute; also the last part of [APPLICATION_ID]. */
+const val APPLICATION_VERSION = "1"
+/** Value stored under the Kafka Streams `application.id` setting: `<name>-<version>`. */
+const val APPLICATION_ID = "$APPLICATION_NAME-$APPLICATION_VERSION"
+
+/**
+ * Baseline Kafka Streams, consumer, producer and schema-registry settings.
+ *
+ * These are put into the properties first; anything in the user's
+ * .properties file is loaded afterwards and overrides them.
+ *
+ * Kafka validates value types strictly: a setting declared as `INT` rejects a
+ * `Long` with a `ConfigException`. The units helpers (`toBytes`,
+ * `toMilliseconds`) return `Long`, so every int-typed setting below is
+ * narrowed with `toInt()`. `buffer.memory` is declared as `LONG` and is left
+ * unchanged. All narrowed values are far below `Int.MAX_VALUE`.
+ */
+private val CONFIG_DEFAULTS = mapOf(
+    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String()::class.java,
+    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to SpecificAvroSerde::class.java,
+    "schema.registry.url" to "http://schema-registry",
+    "key.subject.name.strategy" to RecordNameStrategy::class.java,
+    "value.subject.name.strategy" to RecordNameStrategy::class.java,
+    "auto.register.schemas" to false,
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+    ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to 2.mebibytes.toBytes.toInt(),
+    ConsumerConfig.FETCH_MAX_BYTES_CONFIG to 100.mebibytes.toBytes.toInt(),
+    // NOTE(review): max.poll.records is a record count, yet the value is derived
+    // from a duration (10 s in ms = 10000). The number is kept as it was; confirm
+    // whether a poll interval/timeout setting was actually intended here.
+    ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 10.seconds.toMilliseconds.toInt(),
+    ConsumerConfig.RECEIVE_BUFFER_CONFIG to 8.mebibytes.toBytes.toInt(),
+    ConsumerConfig.SEND_BUFFER_CONFIG to 8.mebibytes.toBytes.toInt(),
+    ProducerConfig.BATCH_SIZE_CONFIG to 512.kibibytes.toBytes.toInt(),
+    ProducerConfig.MAX_REQUEST_SIZE_CONFIG to 6.mebibytes.toBytes.toInt(),
+    ProducerConfig.BUFFER_MEMORY_CONFIG to 128.mebibytes.toBytes
+)
+
+
+/**
+ * Command-line entry point of the conversions sink.
+ *
+ * [call] assembles the configuration (built-in defaults, then the
+ * .properties file, then command-line overrides), builds a Kafka Streams
+ * topology that reads enriched conversion events and maps them with
+ * [EnrichedConversionMapper], starts it, and then blocks until the JVM is
+ * asked to shut down.
+ */
+@Command(
+    name = APPLICATION_NAME,
+    mixinStandardHelpOptions = true,
+    version = [APPLICATION_VERSION]
+)
+class ConversionsSink : Callable<Int> {
+    private val props: Properties = Properties()
+
+    @Parameters(index = "0", description = ["path to configuration .properties file."])
+    lateinit var configPath: Path
+
+    // The flag was originally misspelled "--boostrap-servers". The correct
+    // spelling is now the primary name; the old one stays as an alias so that
+    // existing invocations keep working.
+    @CommandLine.Option(
+        names = ["--bootstrap-servers", "--boostrap-servers"],
+        description = ["The server(s) to connect to."]
+    )
+    var bootstrapServers: String? = null
+
+    /**
+     * Runs the sink until the JVM shuts down.
+     *
+     * @return `0` once the streams instance has been closed by the shutdown hook.
+     */
+    override fun call(): Int {
+        props.putAll(CONFIG_DEFAULTS)
+        // `use` closes the file even if parsing fails.
+        configPath.toFile().inputStream().use { props.load(it) }
+        props[StreamsConfig.APPLICATION_ID_CONFIG] = APPLICATION_ID
+        // Command-line value wins over the file. (Previously the key was only
+        // read, never assigned, so the option had no effect.)
+        bootstrapServers?.let { props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = it }
+
+        val config = ConversionsSinkConfig(props)
+        logger.info { "config $config" }
+
+        // Properties.stringPropertyNames() skips entries whose value is not a
+        // String, which silently dropped the Class- and Boolean-valued defaults
+        // (subject name strategies, auto.register.schemas). Copy every entry.
+        val serdeConfig: Map<String, Any?> =
+            props.entries.associate { (key, value) -> key.toString() to value }
+
+        val builder = StreamsBuilder()
+        val e3Stream: KStream<String, EnrichedEventAvro> = builder.stream(
+            config.enrichedConversionsTopic,
+            Consumed.with(
+                Serdes.String(),
+                specificAvroSerde<EnrichedEventAvro>(serdeConfig, false)
+            )
+        )
+        // The mapped stream is not written anywhere yet.
+        // NOTE(review): presumably it should be produced to config.olapEventsTopic - confirm.
+        e3Stream.mapValues(EnrichedConversionMapper())
+        val topology: Topology = builder.build()
+
+        val streams = KafkaStreams(topology, props)
+        val shutdownLatch = java.util.concurrent.CountDownLatch(1)
+
+        // Register the hook before starting so a signal arriving during startup
+        // still closes the streams instance.
+        Runtime.getRuntime().addShutdownHook(Thread {
+            streams.close()
+            shutdownLatch.countDown()
+        })
+
+        streams.start()
+
+        // start() does not block. Returning here would let main() call
+        // exitProcess() straight away and stop the application, so wait until
+        // the shutdown hook has run.
+        shutdownLatch.await()
+
+        return 0
+    }
+
+    companion object {
+        /** Builds the picocli command line wrapper around a fresh [ConversionsSink]. */
+        fun cli(): CommandLine = CommandLine(ConversionsSink())
+
+        /** Logs the [EnrichedConversion] field descriptors, runs the command, and exits with its status. */
+        @JvmStatic
+        fun main(args: Array<String>) {
+            for (field in EnrichedConversion.getDescriptor().fields) {
+                logger.info("field descriptor=$field options = ${field.options}")
+            }
+
+            exitProcess(cli().execute(*args))
+        }
+    }
+}
+
+/**
+ * Typed accessors for the sink's own settings, parsed from [originals]
+ * against [CONFIG_DEF].
+ *
+ * NOTE(review): both topic settings are defined with a `null` default, so a
+ * missing key may surface as a null behind the non-null `String` getters -
+ * confirm whether they are meant to be required.
+ */
+internal class ConversionsSinkConfig(originals: Map<Any, Any>) :
+    AbstractConfig(CONFIG_DEF, originals) {
+
+    /** Value of the `topic.enriched_conversions` setting. */
+    val enrichedConversionsTopic: String
+        get() = getString(TOPIC_ENRICHED_CONVERSIONS_CONFIG)
+
+    /** Value of the `topic.olap_events` setting. */
+    val olapEventsTopic: String
+        get() = getString(TOPIC_OLAP_EVENT_CONFIG)
+
+    companion object {
+        const val TOPIC_ENRICHED_CONVERSIONS_CONFIG = "topic.enriched_conversions"
+        const val TOPIC_OLAP_EVENT_CONFIG = "topic.olap_events"
+
+        /** Definition of the two topic settings: non-empty strings of high importance. */
+        val CONFIG_DEF: ConfigDef = ConfigDef().apply {
+            define(
+                TOPIC_ENRICHED_CONVERSIONS_CONFIG,
+                ConfigDef.Type.STRING,
+                null,
+                ConfigDef.NonEmptyString(),
+                ConfigDef.Importance.HIGH,
+                "name of topic to consume E3 conversion events from"
+            )
+            define(
+                TOPIC_OLAP_EVENT_CONFIG,
+                ConfigDef.Type.STRING,
+                null,
+                ConfigDef.NonEmptyString(),
+                ConfigDef.Importance.HIGH,
+                "name of topic to produce OLAP events to"
+            )
+        }
+    }
+}
+
+/**
+ * Value mapper from [EnrichedEventAvro] to [EnrichedConversion].
+ *
+ * The three extractor functions default to the corresponding getters on
+ * [EnrichedEventAvro] and can be replaced through the constructor.
+ *
+ * The mapping itself is not written yet: [apply] creates a builder, runs the
+ * three extractors, and then always throws [NotImplementedError] via `TODO()`.
+ */
+internal class EnrichedConversionMapper(
+    val experimentsMapper: (EnrichedEventAvro) -> List<LayerStateAvro> = EnrichedEventAvro::getExperiments,
+    val attributesMapper: (EnrichedEventAvro) -> List<SegmentAvro> = EnrichedEventAvro::getAttributes,
+    val tagsMapper: (EnrichedEventAvro) -> Map<CharSequence, CharSequence> = EnrichedEventAvro::getTags
+) : ValueMapper<EnrichedEventAvro, EnrichedConversion> {
+
+    /**
+     * Currently never returns: always ends in `TODO()`.
+     *
+     * @throws NotImplementedError on every call.
+     */
+    override fun apply(value: EnrichedEventAvro): EnrichedConversion {
+        // Inputs gathered for the future mapping; none of them is used yet.
+        val conversion = EnrichedConversion.newBuilder()
+        val layerStates = experimentsMapper(value)
+        val segments = attributesMapper(value)
+        val eventTags = tagsMapper(value)
+
+        // Copying the values above into the builder and returning its result
+        // is still to be implemented.
+        TODO()
+    }
+}
+
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment