Gists by Nizar S. (natewave)
// gist: Akka Streams GraphTest (truncated preview)
package natewave

import akka.Done
import akka.actor.ActorSystem
import akka.stream._
import scala.concurrent.Future
import scala.util.{Failure, Success}

object GraphTest extends App {
  // ... (the graph definition is truncated in the preview)
}
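The preview ends before any graph is built. A minimal sketch of the shape these imports suggest (a stream that materializes a Future[Done] whose completion is matched with Success/Failure); the source, sink, and system name are assumptions, and Akka 2.6+ is assumed so the implicit ActorSystem provides the materializer:

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.util.{Failure, Success}

object GraphTestSketch extends App {
  implicit val system: ActorSystem = ActorSystem("GraphTest") // name is illustrative

  // A trivial stream standing in for the gist's actual graph.
  val done: Future[Done] = Source(1 to 5).runWith(Sink.foreach(println))

  import system.dispatcher
  done.onComplete {
    case Success(Done) => system.terminate()
    case Failure(ex)   => system.log.error(ex, "stream failed"); system.terminate()
  }
}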
// gist: generic upsert built from Anorm-style named parameters (truncated preview)
def upsert[Pk, A](dbContext: DbContext[Pk, A], x: A): Query[Unit, Connection] =
  queryBuilder.write { implicit connection => implicit ec =>
    val params = dbContext.params(x)
    val keys   = params.map(_.name)
    // "(col1,col2,...)" for the column list, "({col1},{col2},...)" for the
    // placeholders Anorm will bind by name.
    val cols         = keys.mkString("(", ",", ")")
    val placeholders = keys.mkString("({", "},{", "})")
    // `keys` is derived from `params`, so the lookup always succeeds.
    val values = keys.flatMap { key =>
      params.find(_.name == key).map(DbContext.anormNamedParameter)
    }
    // ... (the SQL statement and its execution are truncated in the preview)
  }
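For a concrete sense of the strings this builds, here is a self-contained check of the cols/placeholders formatting; the table and column names are made up:

object UpsertFormatDemo extends App {
  val keys = Seq("id", "name", "email") // hypothetical column names

  val cols         = keys.mkString("(", ",", ")")     // (id,name,email)
  val placeholders = keys.mkString("({", "},{", "})") // ({id},{name},{email})

  // Shape of the statement the truncated part presumably assembles:
  println(s"INSERT INTO users $cols VALUES $placeholders")
}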
// gist: reading committed consumer-group offsets from Kafka (truncated preview)
def getCommittedOffsetsForConsumer(consumerConfig: KafkaConsumerConfig, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
  withCustomStringConsumer { consumer =>
    consumer.assign(topicPartitions.toList.asJava)
    topicPartitions.map { tp =>
      // `committed` returns null when the group has no committed offset for `tp`
      val offsetAndMetadata = consumer.committed(tp)
      val startOffset: Long = KafkaCluster.getLogBeginningOffset(consumerConfig.topic).values.head
      Option(offsetAndMetadata) match {
        case None =>
          // log: s"TopicPartition $tp doesn't have committed offsets"
          tp -> startOffset // fall back to the log beginning (reconstructed; the preview truncates here)
        case Some(om) => tp -> om.offset()
      }
    }.toMap
  }
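The gist leans on a withCustomStringConsumer helper it never shows. A plausible sketch, assuming it just builds a short-lived KafkaConsumer[String, String] and closes it afterwards; the property values would come from consumerConfig in the real code:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

def withCustomStringConsumer[T](f: KafkaConsumer[String, String] => T): T = {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // illustrative; presumably consumerConfig.host/port
  props.put("group.id", "offset-inspector")        // hypothetical group id
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)
  try f(consumer) finally consumer.close()
}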
// gist: Raft node roles as phantom types (truncated preview)
sealed trait NodeState
trait Follower  extends NodeState
trait Candidate extends NodeState
trait Leader    extends NodeState

// Each NotX type means "any role except X".
final class NotLeader    extends NodeState with Follower  with Candidate
final class NotFollower  extends NodeState with Candidate with Leader
final class NotCandidate extends NodeState with Follower  with Leader

// `State` is a phantom type: it never holds a value, it only constrains
// which operations on a Node compile.
class Node[State <: NodeState] private () {
  // ... (transition methods are truncated in the preview)
}
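The preview stops at the class header, but the payoff of the phantom State parameter is that transitions can demand compile-time evidence about the current role. A self-contained sketch of the pattern (the RaftNode name and its methods are illustrative, not the gist's API):

object PhantomTransitions {
  final class RaftNode[State <: NodeState] private () {
    // The implicit <:< evidence turns an illegal transition into a compile error.
    def winElection(implicit ev: State <:< Candidate): RaftNode[NotFollower] = new RaftNode
    def stepDown(implicit ev: State <:< Leader): RaftNode[NotLeader] = new RaftNode
  }
  object RaftNode {
    def initial: RaftNode[NotLeader] = new RaftNode // a fresh node is not the leader
  }

  val elected: RaftNode[NotFollower] = RaftNode.initial.winElection // ok: NotLeader is a Candidate
  // RaftNode.initial.stepDown // does not compile: NotLeader is not a Leader
}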
// gist: run futures sequentially per key, in parallel across keys (truncated preview)
import scala.concurrent.{ ExecutionContext, Future }

object GroupedAsync {
  def sequencePar[K, V, R](fs: ((K, V)) => Future[(K, R)])(input: Seq[(K, V)])(implicit ec: ExecutionContext): Future[Seq[(K, R)]] = {
    val grouped: Map[K, Seq[(K, V)]] = input.groupBy(_._1)
    // Strict `map` (the gist's lazy `mapValues` would re-run the fold on each
    // access): chain each key's futures so its elements run one after another.
    val futurePerKeyMap: Map[K, Future[Seq[(K, R)]]] = grouped.map { case (k, elements) =>
      k -> elements.foldLeft(Future.successful(Seq.empty[(K, R)])) { (previousFuture, next) =>
        previousFuture.flatMap(previous => fs(next).map(previous :+ _)) // reconstructed past the truncation point
      }
    }
    // Distinct keys hold independent chains, so they make progress in parallel.
    Future.sequence(futurePerKeyMap.values.toSeq).map(_.flatten)
  }
}
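A quick usage sketch with a hypothetical per-element task; results for a given key keep their input order while different keys proceed concurrently:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object GroupedAsyncDemo extends App {
  def process(kv: (String, Int)): Future[(String, String)] =
    Future(kv._1 -> s"processed ${kv._2}") // stand-in for real async work

  val input = Seq("a" -> 1, "b" -> 2, "a" -> 3)
  val out   = Await.result(GroupedAsync.sequencePar(process)(input), 5.seconds)
  println(out) // the two "a" results appear in input order; "b" ran alongside them
}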
// gist: Spark listener that persists Kafka offset ranges as jobs finish (truncated preview)
class KafkaListener(saveProgress: Array[OffsetRange] => Boolean) extends SparkListener with Logging {
  private val runningJobs = collection.mutable.Set.empty[SparkListenerJobStart]
  private val kafkaRDDs   = collection.mutable.Map.empty[Int, Array[OffsetRange]]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    runningJobs += jobStart
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // `foreach` rather than the gist's `map`: this is purely a side effect.
    runningJobs.find(_.jobId == jobEnd.jobId).foreach { job =>
      // ... (offset lookup and the saveProgress call are truncated in the preview)
    }
  }
}
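The listener only fires once it is registered with the SparkContext. A sketch, with a made-up saveProgress that just prints what it would persist:

import org.apache.spark.SparkContext

def register(sc: SparkContext): Unit =
  sc.addSparkListener(new KafkaListener(ranges => {
    ranges.foreach(r => println(s"saving ${r.topic}/${r.partition} up to offset ${r.untilOffset}"))
    true // report that the offsets were persisted
  }))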
// gist: wiring Avro validation rules into a typed Kafka DStream
val sRule  = implicitly[RuleLike[AvroValue, String]]
val drRule = implicitly[RuleLike[AvroValue, DefectResult]]

// Keys validate as String, values as DefectResult.
def kTransform(g: GenericRecord) = RecordTransformations.convert(g, sRule)
def vTransform(g: GenericRecord) = RecordTransformations.convert(g, drRule)

val defectResults: DStream[(String, DefectResult)] =
  KafkaCommons.read[String, DefectResult](consumerConfig, ssc)(kTransform, vTransform)
// gist: KafkaCommons.read, the overload taking explicit key/value rules (truncated preview)
def read[A, B](consumerConfig: ConsumerConfig, streamingContext: StreamingContext, maxRate: Int = DEFAULT_MAX_RATE)
              (keyRule: GenericRecord => VA[A], valueRule: GenericRecord => VA[B]): DStream[(A, B)] = {
  val sparkConf = streamingContext.sparkContext.getConf
  val appName   = sparkConf.get("spark.app.name")
  // One offsets coordinator per consumer group, over a channel to the configured broker.
  val offsetsCoordinator = OffsetsCoordinator.get(
    channel  = OffsetsCoordinator.newChannel(consumerConfig.host, consumerConfig.port),
    clientId = consumerConfig.clientId,
    groupId  = consumerConfig.groupId)
  // ... (stream construction is truncated in the preview)
}
// gist: a kafka.serializer.Decoder that validates Avro payloads into VA[T] (truncated preview)
class KafkaCustomDecoder[T](implicit keyRule: RuleLike[AvroValue, T]) extends kafka.serializer.Decoder[VA[T]] with Logging {
  import java.util.Properties
  import kafka.message.Message

  def fromAvro(value: GenericRecord) =
    Avro.fromAvro[T](Avro.wrap(value))

  def fromBytes(bytes: Array[Byte]): VA[T] = {
    val props: Properties = new Properties()
    props.put("schema.registry.url", "http://localhost:8081") // registry URL is hard-coded here
    // ... (schema-registry decoding is truncated in the preview)
  }
}
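Decoders of this shape are what Spark's Kafka 0.8 direct API takes as type parameters. A sketch of that wiring, assuming an ssc in scope plus an illustrative broker and topic; note Spark instantiates decoders reflectively, so getting the implicit rule into the constructor depends on the surrounding setup:

import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // illustrative broker
val topics      = Set("defects")                                  // hypothetical topic

val stream = KafkaUtils.createDirectStream[
  VA[String], VA[DefectResult],
  KafkaCustomDecoder[String], KafkaCustomDecoder[DefectResult]
](ssc, kafkaParams, topics)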
// gist: KafkaCommons.read, the overload resolving rules implicitly and keeping the
// validation results in the stream (truncated preview)
def read[A, B](consumerConfig: ConsumerConfig, streamingContext: StreamingContext, maxRate: Int = DEFAULT_MAX_RATE)
              (implicit keyRule: RuleLike[AvroValue, A], valueRule: RuleLike[AvroValue, B]): DStream[(VA[A], VA[B])] = {
  val sparkConf = streamingContext.sparkContext.getConf
  val appName   = sparkConf.get("spark.app.name")
  val offsetsCoordinator = OffsetsCoordinator.get(
    channel  = OffsetsCoordinator.newChannel(consumerConfig.host, consumerConfig.port),
    clientId = consumerConfig.clientId,
    groupId  = consumerConfig.groupId)
  // ... (stream construction is truncated in the preview)
}
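Contrast with the explicit-rules overload used for defectResults above: this one returns the raw validation results, so the caller decides what to do with records that fail. A sketch of the call, assuming the same consumerConfig and ssc:

// Rules are resolved implicitly; failures stay visible as VA values.
val raw: DStream[(VA[String], VA[DefectResult])] =
  KafkaCommons.read[String, DefectResult](consumerConfig, ssc)

Keeping VA in the element type is the safer default: collapsing straight to DStream[(A, B)] forces the library to silently drop or dead-letter anything that fails validation.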