# List topics, inspect one, then shrink its retention so old segments get purged.
bin/kafka-topics.sh --zookeeper localhost:2181 --list
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
# ... wait a minute for the broker's log-cleaner to delete the old segments ...
# List every distinct package origin (PPA name) of installed packages.
# "%200O" is required to print a full name of the "origin", i.e. PPA name;
# the width 200 prevents aptitude from truncating long origin strings.
aptitude search '?narrow(?installed, ~Oppa)' -F "%200O" | sort -u
#!/usr/bin/env bash
# Report the on-disk size (in GB, broker 0 / first log dir) of every Kafka
# topic on the cluster, sorted ascending by size.
set -euo pipefail

# Print "<topic> = <size-in-GB>" for a single topic (name in $1).
topic-size() {
  kafka-log-dirs --command-config /opt/kafka/ssl/client.txt \
      --bootstrap-server server:9093 --topic-list "$1" --describe \
    | tail -n1 \
    | jq '.brokers[0].logDirs[0].partitions | map(.size/1000000000) | add' \
    | xargs echo "$1" =
}

# List every topic on the cluster, one name per line.
list-topics() {
  kafka-topics --command-config /opt/kafka/ssl/client.txt \
      --bootstrap-server server:9093 --list
}

export -f topic-size  # needed by the bash -c subshell spawned from xargs

TEMP_FILE=$(mktemp)
trap 'rm -f -- "$TEMP_FILE"' EXIT  # clean up even if a step fails

# Pass the topic name as a positional argument ($1) instead of splicing {}
# into the command string — topic names containing quotes would otherwise be
# interpreted by the inner shell (command injection).
list-topics | xargs -I{} bash -c 'topic-size "$1"' _ {} > "$TEMP_FILE"
sort -g -k3 "$TEMP_FILE"
Homebrew build logs for cuetools on Debian GNU/Linux 9.9 (stretch)
Build date: 2019-06-25 17:45:47
import org.apache.spark.sql.avro.SchemaConverters

// Convert the DataFrame's Catalyst schema to the equivalent Avro schema.
// Append .toString to the result if you need the JSON representation.
SchemaConverters.toAvroType(df.schema)
scala> println("This answer actually got some points on SO https://stackoverflow.com/a/53981675/918211")
This answer actually got some points on SO https://stackoverflow.com/a/53981675/918211
scala> println(spark.version)
2.4.0
scala> val sq = spark.readStream.format("rate").load
sq: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
scala> :type sq
#!/usr/bin/env bash
# RUN THIS ON EACH CASSANDRA NODE!
# All settings are overridable via the environment, e.g.
#   DEBUG=false KEYSPACE_NAME=users backup_restore_cassandra.sh
DEBUG=${DEBUG:-true}            # change to false (or pass DEBUG=false) in prod
CQLSH=${CQLSH:-cqlsh}           # override to pass host/auth parameters if needed
KEYSPACE_NAME=${KEYSPACE_NAME:-profile}
TABLE_NAME=${TABLE_NAME:-device}
# Default snapshot tag: <table>_<timestamp>, e.g. device_20190625_174547.
SNAPSHOT_TAG=${SNAPSHOT_TAG:-${TABLE_NAME}_$(date +%Y%m%d_%H%M%S)}
# Glob over every data volume holding this keyspace — change appropriately!
KEYSPACE_DIRS="/dcos/volume*/${KEYSPACE_NAME}"
#!/usr/bin/env python
# pip install kafka-python
from kafka import SimpleConsumer, KafkaClient

# Comma-separated bootstrap broker list. NOTE: the original had an
# unterminated string literal here (missing closing quote on the second
# broker), which was a SyntaxError.
servers = "broker-01:9092,broker-02:9092"
topic_name = "test.topic1"
# Partition id (as a string) -> last known committed offset for that partition.
offsets = {"17":553593369,"8":553142567,"11":562669633,"20":561215743,"2":2661087706,"5":2663616824,"14":561171342,"13":567403099,"4":2653875446,"16":554258518,"7":545144724,"1":2692486549,"10":557397175,"19":534819310,"18":548724039,"9":559537595,"3":2720217023,"12":548273786,"15":547916993,"6":2693124039,"0":2687886815}
group_id = "issue_finder"
"""{"acp_prod.devices": {""" + df.select($"partition", $"offset").groupBy($"partition").agg(max($"offset")).as[(Int, Long)].collect.map{case (p, o) => s""""$p": $o"""}.mkString(",") + "}}" |
# List topics, inspect one, then shrink its retention so old segments get purged.
bin/kafka-topics.sh --zookeeper localhost:2181 --list
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
# ... wait a minute for the broker's log-cleaner to delete the old segments ...