Skip to content

Instantly share code, notes, and snippets.

View deeperunderstanding's full-sized avatar
Tinkering

A Deeper Understanding deeperunderstanding

Tinkering
View GitHub Profile
-- Running queries on PostgreSQL releases before 9.2 (procpid / current_query columns).
-- Rows whose current_query is exactly '<IDLE>' are left out, as is this monitoring
-- query itself (matched by its own text). Most recently started queries come first.
SELECT
    procpid,
    age(clock_timestamp(), query_start),
    usename,
    current_query
FROM pg_stat_activity
WHERE current_query <> '<IDLE>'
  AND current_query NOT ILIKE '%pg_stat_activity%'
ORDER BY query_start DESC;
-- Show running queries (PostgreSQL 9.2 and later: pid / query columns).
-- From 9.2 on, idleness is reported in the separate "state" column, while "query"
-- keeps the text of the session's most recent statement. Comparing query with
-- '<IDLE>' therefore never excludes anything, so idle sessions are filtered on state.
-- The monitoring query itself is still excluded by matching its own text.
SELECT pid, age(clock_timestamp(), query_start), usename, query
FROM pg_stat_activity
WHERE state <> 'idle' AND query NOT ILIKE '%pg_stat_activity%'
/** Word-count job: emits a (word, 1) pair per word of a line and sums the counts per word. */
object WordCount extends MapReduce[String, String, Int, Int] {

  /** Splits `line` into runs of word characters/apostrophes and pairs each with a count of 1. */
  override def mapper(line: String): Seq[KeyValue[String, Int]] = {
    val wordPattern = """[\w']+""".r
    val emitted = for (word <- wordPattern.findAllIn(line)) yield KeyValue(word, 1)
    emitted.toVector
  }

  /** Adds up all counts collected for `key` and returns them as a single pair. */
  override def reducer(key: String, values: Seq[Int]): KeyValue[String, Int] = {
    val total = values.sum
    KeyValue(key, total)
  }
}
/**
 * `@Component`-annotated repository that holds the InfluxDB connection settings
 * and a lazily created InfluxDB client.
 *
 * All four constructor values are injected through `@Value` from the
 * `db.influx.*` configuration properties.
 */
@Component
class TickRepository(
// Connection URL, from the `db.influx.url` property.
@Value("\${db.influx.url}") val dburl : String,
// User name, from the `db.influx.user` property.
@Value("\${db.influx.user}") val user : String,
// Password, from the `db.influx.pw` property.
@Value("\${db.influx.pw}") val pw : String,
// Database selected on the client, from the `db.influx.dbname` property.
@Value("\${db.influx.dbname}") val dbname : String
) {
private final val logger = LoggerFactory.getLogger(javaClass)
// NOTE(review): assigned with `=` rather than delegated with `by`, so this property
// holds the Lazy wrapper itself; the connect/setDatabase call only runs when
// `.value` is first read. Confirm callers expect a Lazy here before switching to `by lazy`.
val influxDB = lazy { InfluxDBFactory.connect(dburl, user, pw).setDatabase(dbname) }
/**
 * `@Component`-annotated service wired with the websocket client and the tick repository.
 *
 * @property socket websocket client injected through the constructor.
 * @property tickRepository repository injected through the constructor.
 */
@Component
class TickRecordingService
@Autowired constructor(val socket: CoinbaseWebsocket,
                       val tickRepository: TickRepository) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Starts recording for the given products; the start is logged first.
     *
     * @param products product identifiers to record.
     * @return a [Flux] of message batches.
     */
    fun recordProducts(products: Array<String>): Flux<List<Message>> {
        // Interpolating an Array directly prints its JVM identity string
        // (e.g. "[Ljava.lang.String;@1b2c3d"), so render the elements explicitly.
        logger.info("Start Recording Tick Stream for Products: ${products.contentToString()}")
/**
 * `@Component`-annotated websocket client wrapper.
 *
 * @property websocketUrl endpoint URL injected from the `cbpro.websocket.baseurl` property.
 */
@Component
class CoinbaseWebsocket @Autowired constructor(
@Value("\${cbpro.websocket.baseurl}") val websocketUrl: String
) {
// JSON mapper created with default settings.
private final val mapper = ObjectMapper()
private final val logger = LoggerFactory.getLogger(javaClass)
// Reactor Netty based websocket client.
private final val client = ReactorNettyWebSocketClient()
// Raises the client's frame payload limit to Int.MAX_VALUE.
// NOTE(review): presumably so that large feed messages are not rejected — confirm.
init { client.maxFramePayloadLength = Int.MAX_VALUE }
# Docker Compose definition (file format version 3).
version: '3'
services:
#-----------------
#--- Influx DB ---
#-----------------
influx:
# NOTE(review): `alpine` is a floating tag; consider pinning a specific InfluxDB version.
image: influxdb:alpine
ports:
# Publishes container port 8086 on host port 8086.
- "8086:8086"
environment:
/**
 * Outcome of a computation that either produced a value of type [T] or failed
 * with an exception.
 */
sealed class Try<T> {
companion object {
/**
 * Runs [func] and wraps its result in [Success]; an [Exception] thrown by
 * [func] is captured in a [Failure] instead of propagating.
 *
 * Only [Exception] is caught, so other throwables (for example [Error]
 * subclasses) still propagate to the caller.
 */
operator fun <T> invoke(func: () -> T): Try<T> =
try {
Success(func())
} catch (error: Exception) {
Failure(error)
}
/**
 * Builds a [Pet] from one split CSV row laid out as `name, age, type`.
 *
 * @param values the row's columns; at least three are required, extra columns are ignored.
 * @return a [Try] holding the pet, or a [Failure] when the row has fewer than three
 *   columns or the age column is not an integer. A failed type lookup is recovered
 *   to [PetType.Invalid] rather than failing the row.
 */
fun toPet(values: List<String>): Try<Pet> {
    // The positional reads below would otherwise throw IndexOutOfBoundsException on a
    // short row (e.g. a blank line), escaping the Try this function promises to return.
    if (values.size < 3) {
        return Failure<Pet>(Exception("Expected at least 3 columns (name, age, type) but got ${values.size}"))
    }
    val name = values[0]
    // Replace the low-level parse error with a message that names the offending field.
    val ageTry = Try { values[1].toInt() }.recoverWith { Failure<Int>(Exception("Cannot extract age!")) }
    val typeTry = PetType.lookup(values[2]).recover { PetType.Invalid }
    return ageTry.flatMap { age -> typeTry.map { type -> Pet(name, age, type) } }
}
/** Kind of pet; each constant carries its textual [type]. */
enum class PetType(val type: String) {
/**
 * Scope object that enables destructuring a [Try] (`val (x) = someTry`).
 */
object TrySequence {
    /**
     * Unwraps the receiver: yields the value of a [Success] and rethrows the
     * error stored in a [Failure].
     */
    operator fun <T> Try<T>.component1(): T {
        val outcome = this
        return when (outcome) {
            is Failure -> throw outcome.error
            is Success -> outcome.value
        }
    }
}
/**
 * Evaluates [func] with [TrySequence] as its receiver and wraps the outcome in a [Try],
 * so an exception thrown while unwrapping inside [func] ends up as a failed [Try].
 */
fun <T> Try.Companion.sequential(func: TrySequence.() -> T): Try<T> =
    Try { TrySequence.func() }
/** Reads `./my-pets.csv`, converts each row into a pet and upper-cases the pet names. */
fun main() {
// Reading and splitting happen inside Try, so an exception thrown while reading
// the file is captured as a Failure instead of crashing main.
val lines = Try {
File("./my-pets.csv").readLines().map { it.split(',') }
}
// Convert every row with toPet, gather the per-row results via asResultSet(),
// then upper-case the name of each pet.
// NOTE(review): toUpperCase() follows the JVM default locale and is deprecated in
// newer Kotlin versions in favour of uppercase() — confirm the target Kotlin version.
val pets : Try<ResultSet<Pet>> = lines.map {
it.map(::toPet).asResultSet().map { pet -> pet.copy(name = pet.name.toUpperCase()) }
}