-- Correlation between bytes and packets, grouped by flow action
SELECT corr(bytes, packets), action FROM flows GROUP BY action

-- Top 10 destination ports by number of rejected flows
SELECT destinationport,
       COUNT(*) AS total
FROM flows
WHERE action = 'REJECT'
GROUP BY destinationport
ORDER BY total DESC LIMIT 10

-- Top 10 source/destination address pairs by total bytes transferred
SELECT sourceaddress,
       destinationaddress,
       SUM(bytes) AS totalBytes
FROM flows
GROUP BY sourceaddress, destinationaddress
ORDER BY totalBytes DESC LIMIT 10

-- Top 10 destination addresses by total bytes received
SELECT destinationaddress,
       SUM(bytes) AS totalBytes
FROM flows
GROUP BY destinationaddress
ORDER BY totalBytes DESC LIMIT 10
CREATE EXTERNAL TABLE `flows`(
`version` int,
`account` string,
`interfaceid` string,
`sourceaddress` string,
`destinationaddress` string,
`sourceport` int,
`destinationport` int,
`protocol` int,
`packets` int,
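The DDL above is cut off in the original. Judging by the queries at the top, the table must also expose at least `bytes` and `action` columns; the sketch below only shows how such a Parquet-backed definition typically ends, and the column types, storage clause, and S3 location are assumptions rather than part of the original.

  -- sketch only: column types, storage clause, and location are assumptions
  `bytes` bigint,
  `action` string)
STORED AS PARQUET
LOCATION 's3://example-bucket/flow-logs/'  -- placeholder path, not from the original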
// Build the Avro schema for flow log records programmatically
// (SchemaBuilder and Schema are org.apache.avro classes)
SchemaBuilder.RecordBuilder<Schema> builder = SchemaBuilder
        .record("flow_logs").namespace("com.opsgenie");
Schema schema = builder.fields()
        .name("version").type().intType().intDefault(0)
        .name("account").type().stringType().stringDefault("")
        .name("interfaceId").type().stringType().stringDefault("")
        .name("sourceAddress").type().stringType().stringDefault("")
        // continues for remaining 10 fields..
        .endRecord();
Configuration conf = new Configuration();
Path p = new Path("data.parquet"); // This is not java.nio.file.Path but org.apache.hadoop.fs.Path
ParquetWriter<FlowLog> writer = AvroParquetWriter.<FlowLog>builder(p)
        .withSchema(ReflectData.AllowNull.get().getSchema(FlowLog.class))
        .withDataModel(ReflectData.get())
        .withConf(conf)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(Mode.OVERWRITE) // org.apache.parquet.hadoop.ParquetFileWriter.Mode
        .build();
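A minimal usage sketch for the writer built above: FlowLog and its setters are assumptions (any POJO matching the reflected schema works), and real code would handle the IOException that write() and close() declare.

// Sketch only: FlowLog and its setters are hypothetical; writer is the instance built above
FlowLog log = new FlowLog();
log.setSourceAddress("10.0.0.1"); // hypothetical field matching the schema
log.setBytes(1024L);              // hypothetical field matching the schema
writer.write(log);                // each call appends one record as one Parquet row
writer.close();                   // writes the Parquet footer; the file is unreadable without it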
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-common</artifactId>
  <version>1.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.9.0</version>
</dependency>
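The writer snippet also uses Hadoop's Configuration and org.apache.hadoop.fs.Path classes, which these Parquet artifacts do not bundle; a hadoop-client dependency along the following lines is typically needed as well (the version shown is an assumption):

<!-- assumption: any Hadoop 2.x client that provides Configuration and org.apache.hadoop.fs.Path -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.3</version>
</dependency>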
hyperkit ubuntu boot output
hyperkit -A -m 512M -s 0:0,hostbridge \
-s 31,lpc \
-l com1,stdio \
-s 1:0,ahci-hd,file://$(pwd)/$QCOW2,format=qcow \
-s 5,ahci-cd,$(pwd)/seed.img \
-f kexec,$KERNEL,$INITRD,$CMDLINE
mirage_block_open: block_config = file:///Users/mustafa/Downloads/xenial-server-cloudimg-amd64-disk1.qcow2 and qcow_config = None and stats_config = None
hyperkit: [INFO] Resized file to 32768 clusters (4194304 sectors)
hyperkit: [INFO] image has 0 free sectors and 32764 used sectors
mirage_block_open: block_config = file:///Users/mustafa/Downloads/xenial-server-cloudimg-amd64-disk1.qcow2 and qcow_config = None and stats_config = None returning 0
// A data stream; in reality this would come from Kafka, Kinesis, or another streaming source
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// Format the streaming data into a row format
DataStream<Tuple3<String, Double, Time>> dataset = text
.map(mapFunction)
.assignTimestampsAndWatermarks(extractor);
// Register the stream so it can be referred to as 'sensors' in SQL queries
tableEnv.registerDataStream("sensors", dataset, "room, temperature, creationDate, rowtime.rowtime");
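With the stream registered as a table, continuous SQL can be run against it. The query below is an illustrative sketch only: the windowed average and the one-minute interval are assumptions, and depending on the Flink version the entry point is sqlQuery() (newer releases) or sql() (older ones).

// Illustrative sketch: average temperature per room over 1-minute tumbling event-time windows
Table avgTemps = tableEnv.sqlQuery(
    "SELECT room, " +
    "       TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS windowEnd, " +
    "       AVG(temperature) AS avgTemp " +
    "FROM sensors " +
    "GROUP BY room, TUMBLE(rowtime, INTERVAL '1' MINUTE)");
// The resulting Table can be converted back to a DataStream, e.g. with toAppendStream.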