Skip to content

Instantly share code, notes, and snippets.

View abhirockzz's full-sized avatar
👋
fmt.Println(hello, world!)

Abhishek Gupta abhirockzz

👋
fmt.Println(hello, world!)
View GitHub Profile
static class MetricsCountAndSumSerde implements Serde<MetricsCountAndSum> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public void close() {
}
countAndSumAggregate.mapValues(new ValueMapperWithKey<String, MetricsCountAndSum, Double>() {
@Override
public Double apply(String key, MetricsCountAndSum countAndSum) {
Double average = countAndSum.getSum() / (double) countAndSum.getCount();
System.out.println("Average so far for machine " + key + " is " + average);
return average;
}
}, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME).withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double()));
KTable<String, MetricsCountAndSum> countAndSumAggregate = metricsStream.groupByKey()
.aggregate(new Initializer<MetricsCountAndSum>() {
@Override
public MetricsCountAndSum apply() {
return new MetricsCountAndSum(0L, 0L);
}
}, new Aggregator<String, String, MetricsCountAndSum>() {
@Override
public MetricsCountAndSum apply(String key, String value, MetricsCountAndSum current) {
Long newCount = current.getCount() + 1;
public class Broadcaster implements Runnable {
private final KafkaStreams globalHashtagCountStream;
public Broadcaster(KafkaStreams globalHashtagCountStream) {
this.globalHashtagCountStream = globalHashtagCountStream;
}
@Override
public void run() {
public static void broadcast(String stat) {
for(Session client : CLIENTS) {
client.getAsyncRemote().sendText(stat, new SendHandler() {
@Override
public void onResult(SendResult sr) {
if(sr.isOK()) {
System.out.println("send stat "+ stat + " to client " + client.getId());
}
}
});
@abhirockzz
abhirockzz / Dockerfile_default
Created December 10, 2018 04:09
Default Dockerfile for Java Fn functions
FROM fnproject/fn-java-fdk-build:jdk9-1.0.75 as build-stage
WORKDIR /function
ENV MAVEN_OPTS -Dhttp.proxyHost= -Dhttp.proxyPort= -Dhttps.proxyHost= -Dhttps.proxyPort= -Dhttp.nonProxyHosts= -Dmaven.repo.local=/usr/share/maven/ref/repository
ADD pom.xml /function/pom.xml
RUN ["mvn", "package", "dependency:copy-dependencies", "-DincludeScope=runtime", "-DskipTests=true", "-Dmdep.prependGroupId=true", "-DoutputDirectory=target", "--fail-never"]
ADD src /function/src
RUN ["mvn", "package"]
FROM fnproject/fn-java-fdk:jdk9-1.0.75
WORKDIR /function
COPY --from=build-stage /function/target/*.jar /function/app/
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
kafkaBroker := "localhost:9092"
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
func main() {
kafkaBroker := "localhost:9092"
p, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})
topic := "bar"
partition := kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}
msg := &kafka.Message{TopicPartition: partition, Key: []byte("hello"), Value: []byte("world")}
p.Produce(msg, nil)
fmt.Println("done...")
}