
@samklr
samklr / binlog_rds.md
Last active December 20, 2018 14:47
Enabling RDS MySQL for Change Data Capture

Do this before you add any data to your instance.

For RDS: create a Parameter Group with the following parameters changed:
- Set binlog_format = ROW
- Set binlog_row_image = FULL
- Set binlog_rows_query_log_events = ON (1)
- (Optional) Raise max_allowed_packet (up to its maximum) to accommodate large row events

Once the instance is created with the above parameter group, you'll also need to change the binlog retention time: by default RDS purges binary logs as soon as possible, which leaves CDC consumers nothing to read.
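One way to do that, as a sketch (not part of the gist): call the mysql.rds_set_configuration stored procedure, here over JDBC from Scala. The endpoint, credentials, and the 24-hour value are placeholders, and any MySQL client can run the same CALL.

import java.sql.DriverManager

// Keep binary logs for 24 hours so downstream CDC consumers have time to read them.
val conn = DriverManager.getConnection(
  "jdbc:mysql://my-instance.abc123.eu-west-1.rds.amazonaws.com:3306", "admin", "secret")
try {
  conn.prepareCall("CALL mysql.rds_set_configuration('binlog retention hours', 24)").execute()
} finally {
  conn.close()
}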

@jeqo
jeqo / KafkaStreamsTopologyGraphvizPrinter.java
Last active July 11, 2018 00:21
Generating Graphviz from Kafka Streams
import org.apache.kafka.streams.TopologyDescription;

import java.io.StringWriter;
import java.util.stream.Stream;

/**
 * Renders a Kafka Streams TopologyDescription as a Graphviz (DOT) graph.
 */
public class KafkaStreamsTopologyGraphvizPrinter {
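The preview stops at the class declaration. The underlying idea fits in a few lines of Scala (my sketch, not jeqo's printer); it assumes a Topology obtained from StreamsBuilder.build() and Scala 2.13 for the collection converters:

import org.apache.kafka.streams.Topology
import scala.jdk.CollectionConverters._

// Walk the described topology and emit one DOT edge per processor connection.
def toDot(topology: Topology): String = {
  val sb = new StringBuilder("digraph topology {\n")
  for {
    subtopology <- topology.describe().subtopologies().asScala
    node        <- subtopology.nodes().asScala
    successor   <- node.successors().asScala
  } sb.append(s"""  "${node.name}" -> "${successor.name}";\n""")
  sb.append("}").toString()
}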
@samklr
samklr / OffsetMngHbase.scala
Last active March 31, 2020 16:02
Offset Management on HBase
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.spark.streaming.kafka010.OffsetRange // kafka010 assumed; the 0-8 integration lives under streaming.kafka

/* Save the offsets of each batch into HBase, one row per (topic, group, batch time). */
def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.addResource("src/main/resources/hbase-site.xml")
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
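The preview cuts off at the row key. A possible continuation, purely as a sketch (it assumes a column family named "offsets" and imports of org.apache.hadoop.hbase.client.Put and org.apache.hadoop.hbase.util.Bytes; the original gist may store things differently):

  // One Put per batch, one column per partition holding the last offset read.
  val put = new Put(Bytes.toBytes(rowKey))
  for (offsetRange <- offsetRanges) {
    put.addColumn(Bytes.toBytes("offsets"),
                  Bytes.toBytes(offsetRange.partition.toString),
                  Bytes.toBytes(offsetRange.untilOffset.toString))
  }
  table.put(put)
  conn.close()
}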
@jkpl
jkpl / article.md
Last active October 17, 2023 13:45
Error handling pitfalls in Scala

Error handling pitfalls in Scala

There are multiple strategies for error handling in Scala.

Errors can be represented as [exceptions][], which is a common way of dealing with errors in languages such as Java. However, exceptions are invisible to the type system, which can make them challenging to deal with. It's easy to leave out the necessary error handling, which can result in unfortunate runtime errors.
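To make the pitfall concrete (my example, not the article's), compare a signature that hides its failure mode behind an exception with one that surfaces it in the type, forcing callers to handle it:

// The exception is invisible in the signature; forgetting to catch it still compiles.
def parsePort(s: String): Int =
  s.toInt // throws NumberFormatException at runtime on bad input

// The failure is part of the return type, so the compiler insists it is handled. (Scala 2.13+ for toIntOption.)
def parsePortSafe(s: String): Either[String, Int] =
  s.toIntOption.toRight(s"not a valid port: $s")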

@elakito
elakito / sc_subtest.go
Created December 13, 2016 16:32
Publisher/subscriber samples for sarama_cluster to test partition assignment
package main

import (
	"flag"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/Shopify/sarama"
@jkpl
jkpl / article.org
Last active November 9, 2022 18:46
Enforcing invariants in Scala datatypes

Enforcing invariants in Scala datatypes

Scala provides many tools to help us build programs with fewer runtime errors. Instead of relying on nulls, the recommended practice is to use the Option type. Instead of throwing exceptions, the Try and Either types are used for representing potential error scenarios. What these features have in common is that they capture runtime scenarios in the type system, lifting their handling to the compilation phase: your program doesn't compile until you've explicitly handled nulls, exceptions, and the other cases in your code.

In his “Strategic Scala Style” blog post series,
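The preview ends mid-sentence here. As a concrete illustration of the kind of invariant enforcement the article is about (my sketch, not jkpl's code, and assuming Scala 2.13, where the generated apply and copy respect a private constructor), a smart constructor can make invalid values unrepresentable:

// A percentage that is guaranteed to be within 0..100 once constructed.
final case class Percentage private (value: Int)

object Percentage {
  // The only way to obtain a Percentage; the invariant is checked exactly once.
  def from(value: Int): Either[String, Percentage] =
    if (value >= 0 && value <= 100) Right(new Percentage(value))
    else Left(s"percentage out of range: $value")
}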

/* Terraform setup to evaluate Kafka performance on various AWS instance types and EBS sizes */

provider "aws" {
  region = "eu-west-1"
}

variable "ssh_key_name" {
  default = "ben@ici"
}
@adamw
adamw / windowing.scala
Created August 5, 2016 13:30
Windowing data in Akka
package com.softwaremill.akka
import java.time._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.collection.mutable
import scala.concurrent.Await
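The preview shows only the imports. For a flavour of the problem (my sketch, not adamw's code), Akka Streams can already group a stream into simple size/time windows with groupedWithin, reusing the imports above:

import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("windowing")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Emit one summary per window of at most 1000 elements or 1 second, whichever comes first.
Source(1 to 10000)
  .groupedWithin(1000, 1.second)
  .map(window => s"window of ${window.size} elements, sum = ${window.sum}")
  .runForeach(println)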
@longcao
longcao / SparkCopyPostgres.scala
Last active December 26, 2023 14:47
COPY Spark DataFrame rows to PostgreSQL (via JDBC)
import java.io.InputStream
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
val jdbcUrl = s"jdbc:postgresql://..." // db credentials elided
val connectionProperties = {
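The preview stops inside connectionProperties. Re-sketched from the visible imports (my reconstruction; the COPY statement, the naive CSV conversion, and the helper name are assumptions, not necessarily longcao's exact code), the approach is to turn each partition into CSV and push it through PostgreSQL's CopyManager:

import java.io.ByteArrayInputStream
import java.sql.DriverManager
import java.util.Properties

// Stream each DataFrame partition into PostgreSQL with COPY ... FROM STDIN.
def copyToPostgres(df: DataFrame, table: String, url: String, props: Properties): Unit =
  df.foreachPartition { rows: Iterator[Row] =>
    val conn = DriverManager.getConnection(url, props).unwrap(classOf[BaseConnection])
    try {
      val csv = rows.map(_.toSeq.mkString(",")).mkString("\n") // naive CSV: no quoting or escaping
      val in: InputStream = new ByteArrayInputStream(csv.getBytes("UTF-8"))
      new CopyManager(conn).copyIn(s"COPY $table FROM STDIN WITH (FORMAT csv)", in)
    } finally {
      conn.close()
    }
  }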
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import sqlContext.implicits._ // assumes a SQLContext named sqlContext is in scope (e.g. in a notebook or the Spark shell)

// Aggregate returning the "value" column of the row where the "minCol" column is smallest.
class MinBy(valueType: DataType, minType: DataType) extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", valueType) :: StructField("minCol", minType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("value", valueType) :: StructField("minCol", minType) :: Nil)
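The preview ends after the two schemas. A sketch of how the remaining UserDefinedAggregateFunction members could be filled in (my completion, not the original gist's; it assumes minCol values implement Comparable and skips null handling for the value column):

  def dataType: DataType = valueType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = null // best value seen so far
    buffer(1) = null // smallest minCol seen so far
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(1) &&
        (buffer.isNullAt(1) || lt(input.get(1), buffer.get(1)))) {
      buffer(0) = input.get(0)
      buffer(1) = input.get(1)
    }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    if (!buffer2.isNullAt(1) &&
        (buffer1.isNullAt(1) || lt(buffer2.get(1), buffer1.get(1)))) {
      buffer1(0) = buffer2.get(0)
      buffer1(1) = buffer2.get(1)
    }

  def evaluate(buffer: Row): Any = buffer.get(0)

  // Naive comparison; assumes minType values are mutually Comparable (numeric, string, timestamp, ...).
  private def lt(a: Any, b: Any): Boolean =
    a.asInstanceOf[Comparable[Any]].compareTo(b) < 0
}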