View users.py
import sys | |
import time | |
import os | |
import json | |
import random | |
from dateutil import parser | |
from confluent_kafka import Producer | |
import names # https://pypi.org/project/names/ | |
import random |
View build_instr_bin.sh
#!/usr/bin/env bash | |
go test . -tags testbincover -coverpkg=./... -c -o instr_bin -ldflags="-X github.com/confluentinc/bincover/examples/echo-arg.isTest=true" |
View main.go
package main | |
import ( | |
"fmt" | |
"os" | |
"strconv" | |
"github.com/confluentinc/bincover" | |
) |
View main_test.go
package main | |
import ( | |
"fmt" | |
"log" | |
"os" | |
"os/exec" | |
"regexp" | |
"testing" |
View docker-compose.yml
--- | |
version: '2' | |
services: | |
zookeeper: | |
image: confluentinc/cp-zookeeper:5.3.1 | |
hostname: zookeeper | |
container_name: zookeeper | |
ports: | |
- "2181:2181" |
View Streaming_Machine_Learning_with_Kafka_and_TensorFlow.py
import numpy as np | |
import tensorflow as tf | |
import tensorflow_io.kafka as kafka_io | |
# 1. Consume streaming data with Kafka and TensorFlow I/O | |
def func_x(x): | |
# Decode image to (28, 28) | |
x = tf.io.decode_raw(x, out_type=tf.uint8) | |
x = tf.reshape(x, [28, 28]) | |
# Convert to float32 for tf.keras |
View increased-partitions.sql
CREATE STREAM products ...; | |
CREATE STREAM products_repartitioned | |
WITH (PARTITIONS=30) AS | |
SELECT * FROM products | |
EMIT CHANGES; |
View aggregation.java
// Continuously aggregating a KStream into a KTable. | |
KStream<String, String> locationUpdatesStream = ...; | |
KTable<String, Long> locationsPerUser | |
= locationUpdatesStream | |
.groupBy((k, v) -> v.username) | |
.count(); |
View aggregation.sql
-- Continuously aggregating a stream into a table with a ksqlDB push query. | |
CREATE STREAM locationUpdatesStream ...; | |
CREATE TABLE locationsPerUser AS | |
SELECT username, COUNT(*) | |
FROM locationUpdatesStream | |
GROUP BY username | |
EMIT CHANGES; |
View topic-as-table.java
// Create KTable from Kafka topic. | |
KTable<String, String> table = builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String())); |
NewerOlder