Skip to content

Instantly share code, notes, and snippets.

@speeddragon
speeddragon / bin_split.exs
Created September 13, 2023 18:03
Split binary file by odd/even bytes in Elixir
# Splits the bytes of "original.bin" into two binaries by alternating
# position: bytes at even offsets accumulate in f1, bytes at odd offsets in f2.
# NOTE(review): gist preview is truncated — the closing `end)` of the
# Enum.reduce and any subsequent File.write calls are not visible here.
{:ok, binary} = File.read("original.bin")
{f1, f2} = binary
  |> :binary.bin_to_list()
  # Fold over every byte, threading the pair of output binaries.
  |> Enum.reduce({"", ""}, fn hex, {f1, f2} ->
    # f1 leads: when it is one byte longer than f2 this byte goes to f2;
    # when the sizes are equal the byte goes to f1.
    if byte_size(f1) != byte_size(f2) do
      {f1, f2 <> <<hex>>}
    else
      {f1 <> <<hex>>, f2}
    end
@speeddragon
speeddragon / AugmentedMessageDeserializer.scala
Created January 25, 2020 23:11
AugmentedMessageDeserializer
// Kafka deserializer that decodes a raw record into an Avro GenericRecord via
// retrieveGenericRecord, pattern-matching on the Try result.
// NOTE(review): gist preview is truncated — the rest of the Failure branch
// (presumably logging key/value via LazyLogging) and the closing braces are
// not visible here.
class AugmentedMessageDeserializer extends MessageKafkaDeserializer[GenericRecord] with LazyLogging {
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): GenericRecord = {
    val genericRecord: Try[GenericRecord] = retrieveGenericRecord(record)
    genericRecord match {
      case Success(value) => value
      case Failure(exception) =>
        // Decode key and value as UTF-8 for diagnostics on the failed record.
        val key = new String(record.key(), UTF_8)
        val value = new String(record.value(), UTF_8)
@speeddragon
speeddragon / streaming_file_sink.scala
Created January 10, 2020 02:51
StreamingFileSink
// Wires a Flink job: Kafka source (custom Avro deserializer) -> StreamingFileSink.
// NOTE(review): gist preview is truncated — the env.execute(...) call that
// actually launches the job is not visible here.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = new FlinkKafkaConsumer(settings.kafkaTopic(), new AugmentedMessageDeserializer, kafkaProperties)
// Writer builds a StreamingFileSink targeting the configured S3 path.
val writer = Writer(settings.s3Path(), GenericRecordSchema.schema.toString()).build()
env
  // Checkpointing is required for StreamingFileSink to commit finished files.
  .enableCheckpointing(checkpointInterval)
  .addSource(source)
  .addSink(writer)
@speeddragon
speeddragon / streaming_file_sink_row_encode.scala
Created January 10, 2020 02:47
StreamingFileSink with Custom Rolling Policy
// Factory for a row-format StreamingFileSink that writes GenericRecords using
// a ParquetEncoder, with a custom rolling policy and bucket assigner.
// NOTE(review): gist preview is truncated — the terminal .build() call and
// the closing braces of build()/Writer are not visible here.
case class Writer(pathString: String, schema: String) {
  def build(): StreamingFileSink[GenericRecord] = {
    val path = new Path(pathString)
    val encoder = new ParquetEncoder();
    StreamingFileSink
      .forRowFormat(path, encoder)
      // 60000 presumably means a 60 s rolling interval in milliseconds —
      // confirm against MyRollingPolicy's constructor.
      .withRollingPolicy(new MyRollingPolicy(60000))
      .withBucketAssigner(new MessageBucketAssigner)
@speeddragon
speeddragon / custom_file_sink.scala
Last active January 16, 2020 13:30
Custom File Sink for Parquet( List[GenericRecord] )
// Flink job skeleton using event time: a Kafka source whose watermarks are
// derived from the record's "timestamp" field parsed as an ISO-8601 Instant.
// NOTE(review): gist preview is truncated mid-expression — the end of the
// Instant chain and the rest of the job are cut off; the bare `...` below is
// the gist author's own elision, not valid Scala.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
val source = new FlinkKafkaConsumer(settings.kafkaTopic(), new AugmentedMessageDeserializer, kafkaProperties)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[GenericRecord] {
    // Extracts a monotonically increasing event timestamp per record.
    def extractAscendingTimestamp(element: GenericRecord): Long =
      Instant
        .parse(element.get("timestamp").asInstanceOf[String])
@speeddragon
speeddragon / facebook.js
Created December 10, 2019 23:14
Facebook Image Crawler for Puppeteer
/**
 * Facebook Image Crawler
 *
 * Uses Puppeteer plus Node's https module (presumably for direct image
 * downloads — confirm against the truncated functions below).
 */
const puppeteer = require("puppeteer");
var https = require("https");

/**
 * Get the biggest image possible from an FBID.
 * NOTE(review): gist preview is truncated — the function this JSDoc
 * documents is not visible here.
 * @param {string} fbid
@speeddragon
speeddragon / server.py
Created August 29, 2019 11:07
Python Server with CORS enabled
#!/usr/bin/env python3
# Minimal static-file HTTP server that adds a permissive CORS header
# (Access-Control-Allow-Origin: *) to every response.
# NOTE(review): gist preview is truncated — the `if __name__` body
# (presumably calling the imported `test(...)` helper with
# CORSRequestHandler) is not visible here. Indentation reconstructed;
# the scrape stripped it.
from http.server import HTTPServer, SimpleHTTPRequestHandler, test
import sys


class CORSRequestHandler (SimpleHTTPRequestHandler):
    def end_headers (self):
        # Inject the CORS header before the parent finalizes the header block.
        self.send_header('Access-Control-Allow-Origin', '*')
        SimpleHTTPRequestHandler.end_headers(self)


if __name__ == '__main__':
# CircleCI reusable command fragment: builds Elixir source with a
# parameterized cache key.
# NOTE(review): gist preview is truncated — the restore_cache keys and the
# remaining steps are not visible. Nesting reconstructed per CircleCI's
# command schema; the scrape stripped the original indentation.
description: Build Elixir source code
parameters:
  # Bump this string to invalidate the dependency cache.
  cache-version:
    default: v1
    description: String key to store cache in
    type: string
steps:
  - checkout
  - restore_cache:
      keys:
-- Two variants of the same query: the 20 nearest restaurants within 400 m of
-- the point (lon 112.15769, lat 25.28552), ordered by spherical distance.
-- NOTE(review): PostGIS renamed ST_Distance_Sphere to ST_DistanceSphere in
-- 2.2; the underscored spelling only works on older PostGIS (or MySQL) —
-- confirm the target database version.

-- Variant 1: radius filter via ST_DWithin on a geography, which can use a
-- spatial index on r0.location.
SELECT id, ST_Distance_Sphere(ST_MakePoint(112.15769, 25.28552), r0."location") AS distance
FROM "restaurant" AS r0
WHERE ST_DWithin(ST_GeographyFromText('POINT(112.15769 25.28552)'), r0.location, 400)
ORDER BY distance
LIMIT 20 OFFSET 0;

-- Variant 2: radius filter by comparing the computed sphere distance
-- directly; this evaluates the distance for every row and cannot use a
-- spatial index.
SELECT id, ST_Distance_Sphere(ST_MakePoint(112.15769, 25.28552), r0."location") AS distance
FROM "restaurant" AS r0
WHERE ST_Distance_Sphere(ST_MakePoint(112.15769, 25.28552), r0.location) < 400
ORDER BY distance
LIMIT 20 OFFSET 0;