Skip to content

Instantly share code, notes, and snippets.

View abhirockzz's full-sized avatar
👋
fmt.Println(hello, world!)

Abhishek Gupta abhirockzz

👋
fmt.Println(hello, world!)
View GitHub Profile
private Metrics getLocalMetrics() {
HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
String source = thisInstance.host() + ":" + thisInstance.port();
Metrics localMetrics = new Metrics();
ReadOnlyKeyValueStore<String, Double> averageStore = ks
.store(storeName,
QueryableStoreTypes.<String, Double>keyValueStore());
private Metrics getLocalMetrics(String machine) {
LOGGER.log(Level.INFO, "Getting Metrics for machine {0}", machine);
HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
String source = thisInstance.host() + ":" + thisInstance.port();
Metrics localMetrics = new Metrics();
ReadOnlyKeyValueStore<String, Double> averageStore = ks
@GET
@Path("{machine}")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response getMachineMetric(@PathParam("machine") String machine) {
LOGGER.log(Level.INFO, "Fetching metrics for machine {0}", machine);
KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
Metrics metrics = null;
static class MetricsCountAndSumSerde implements Serde<MetricsCountAndSum> {
/**
 * No-op: the body is empty, so neither argument is read.
 *
 * @param configs not used by this implementation
 * @param isKey   not used by this implementation
 */
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
/**
 * No-op: the body is empty, so nothing is released or flushed here.
 */
@Override
public void close() {
}
// Derives a per-key average (sum / count) from countAndSumAggregate and materializes it
// in the store named by STATE_STORE_NAME, using a String key serde and a Double value serde.
// The result of mapValues is not assigned to anything in this statement.
countAndSumAggregate.mapValues(new ValueMapperWithKey<String, MetricsCountAndSum, Double>() {
@Override
public Double apply(String key, MetricsCountAndSum countAndSum) {
// The (double) cast keeps the division in floating point whatever the getter types are.
// NOTE(review): a count of 0 would produce NaN or Infinity rather than an exception;
// presumably the upstream aggregator always increments the count first -- confirm.
Double average = countAndSum.getSum() / (double) countAndSum.getCount();
// Writes the current average for this key to stdout on every invocation.
System.out.println("Average so far for machine " + key + " is " + average);
return average;
}
}, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME).withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double()));
KTable<String, MetricsCountAndSum> countAndSumAggregate = metricsStream.groupByKey()
.aggregate(new Initializer<MetricsCountAndSum>() {
@Override
public MetricsCountAndSum apply() {
return new MetricsCountAndSum(0L, 0L);
}
}, new Aggregator<String, String, MetricsCountAndSum>() {
@Override
public MetricsCountAndSum apply(String key, String value, MetricsCountAndSum current) {
Long newCount = current.getCount() + 1;
public class Broadcaster implements Runnable {
private final KafkaStreams globalHashtagCountStream;
/**
 * Creates a broadcaster that keeps a reference to the given streams instance.
 *
 * @param globalHashtagCountStream stored as-is in the field of the same name;
 *                                 no null check is performed
 */
public Broadcaster(KafkaStreams globalHashtagCountStream) {
this.globalHashtagCountStream = globalHashtagCountStream;
}
@Override
public void run() {
public static void broadcast(String stat) {
for(Session client : CLIENTS) {
client.getAsyncRemote().sendText(stat, new SendHandler() {
@Override
public void onResult(SendResult sr) {
if(sr.isOK()) {
System.out.println("send stat "+ stat + " to client " + client.getId());
}
}
});
@abhirockzz
abhirockzz / Dockerfile_default
Created December 10, 2018 04:09
Default Dockerfile for Java Fn functions
# Stage 1: build the function with Maven inside the Fn Java build image.
FROM fnproject/fn-java-fdk-build:jdk9-1.0.75 AS build-stage
WORKDIR /function
# Blank out the proxy properties and set Maven's local repository path.
# key="value" form replaces the legacy whitespace-separated ENV syntax; the value is unchanged.
ENV MAVEN_OPTS="-Dhttp.proxyHost= -Dhttp.proxyPort= -Dhttps.proxyHost= -Dhttps.proxyPort= -Dhttp.nonProxyHosts= -Dmaven.repo.local=/usr/share/maven/ref/repository"
# Copy only pom.xml first so the dependency step below gets its own layer,
# separate from the source copy further down.
COPY pom.xml /function/pom.xml
# No sources have been copied yet, so this run mainly resolves dependencies and copies the
# runtime-scoped jars into target/; --fail-never keeps the step from aborting the build.
RUN ["mvn", "package", "dependency:copy-dependencies", "-DincludeScope=runtime", "-DskipTests=true", "-Dmdep.prependGroupId=true", "-DoutputDirectory=target", "--fail-never"]
# COPY rather than ADD: these are plain local paths, so ADD's URL/archive handling is not needed.
COPY src /function/src
RUN ["mvn", "package"]
# Stage 2: runtime image that receives only the jars produced in the build stage.
FROM fnproject/fn-java-fdk:jdk9-1.0.75
WORKDIR /function
COPY --from=build-stage /function/target/*.jar /function/app/
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {