Skip to content

Instantly share code, notes, and snippets.

Tinkering

A Deeper Understanding deeperunderstanding

Tinkering
Block or report user

Report or block deeperunderstanding

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
View wordcount.scala
/** Word-count job: the mapper emits a count of 1 per word, the reducer sums the counts per word. */
object WordCount extends MapReduce[String, String, Int, Int] {

  /**
   * Splits `line` into tokens made of word characters and apostrophes and
   * emits one `KeyValue(token, 1)` per occurrence, in order of appearance.
   */
  override def mapper(line: String): Seq[KeyValue[String, Int]] = {
    val tokens = """[\w']+""".r.findAllIn(line)
    (for (token <- tokens) yield KeyValue(token, 1)).toVector
  }

  /** Collapses all counts collected for `key` into a single total. */
  override def reducer(key: String, values: Seq[Int]): KeyValue[String, Int] = {
    val total = values.sum
    KeyValue(key, total)
  }
}
View CbTickRepository.kt
@Component
class TickRepository(
@Value("\${db.influx.url}") val dburl : String,
@Value("\${db.influx.user}") val user : String,
@Value("\${db.influx.pw}") val pw : String,
@Value("\${db.influx.dbname}") val dbname : String
) {
private final val logger = LoggerFactory.getLogger(javaClass)
val influxDB = lazy { InfluxDBFactory.connect(dburl, user, pw).setDatabase(dbname) }
View CbTickRecordingService.kt
@Component
class TickRecordingService
@Autowired constructor(val socket: CoinbaseWebsocket,
val tickRepository: TickRepository) {
private val logger = LoggerFactory.getLogger(javaClass)
fun recordProducts(products: Array<String>): Flux<List<Message>> {
logger.info("Start Recording Tick Stream for Products: $products")
View CbWebsocket.kt
@Component
class CoinbaseWebsocket @Autowired constructor(
@Value("\${cbpro.websocket.baseurl}") val websocketUrl: String
) {
private final val mapper = ObjectMapper()
private final val logger = LoggerFactory.getLogger(javaClass)
private final val client = ReactorNettyWebSocketClient()
init { client.maxFramePayloadLength = Int.MAX_VALUE }
View tickbase-docker-compose.yml
version: '3'
services:
#-----------------
#--- Influx DB ---
#-----------------
influx:
image: influxdb:alpine
ports:
- "8086:8086"
environment:
View FinalTry.kt
sealed class Try<T> {
companion object {
operator fun <T> invoke(func: () -> T): Try<T> =
try {
Success(func())
} catch (error: Exception) {
Failure(error)
}
View ExampleWithRecover.kt
/**
 * Builds a [Pet] from one row of already-split column values.
 *
 * Column 0 is the name, column 1 the age and column 2 the pet type.
 * Every column access happens inside a [Try], so a row with too few columns
 * is reported as a `Failure` instead of escaping as an
 * `IndexOutOfBoundsException` from a function that promises a `Try<Pet>`.
 *
 * @param values the column values of a single row
 * @return a `Try` holding the pet, or the first failure encountered
 */
fun toPet(values: List<String>): Try<Pet> {
    // Reading the column inside Try turns a missing name into a Failure.
    val nameTry = Try { values[0] }
    // Any problem with the age column (missing or not an Int) is replaced by one uniform error.
    val ageTry = Try { values[1].toInt() }.recoverWith { Failure<Int>(Exception("Cannot extract age!")) }
    // A missing type column stays a Failure; only a failed lookup is recovered to PetType.Invalid.
    val typeTry = Try { values[2] }.flatMap { rawType -> PetType.lookup(rawType).recover { PetType.Invalid } }
    return nameTry.flatMap { name ->
        ageTry.flatMap { age -> typeTry.map { type -> Pet(name, age, type) } }
    }
}
enum class PetType(val type: String) {
View TrySequenceExtension.kt
/**
 * Scope object that enables destructuring a [Try] with `val (x) = someTry`.
 */
object TrySequence {
    /**
     * Unwraps this [Try]: yields the value of a `Success` and throws the
     * stored error of a `Failure`.
     */
    operator fun <T> Try<T>.component1(): T {
        return when (val outcome = this) {
            is Success -> outcome.value
            is Failure -> throw outcome.error
        }
    }
}
/**
 * Runs [func] with [TrySequence] as its receiver and wraps the outcome in a [Try].
 *
 * Inside [func], a `Try` can be destructured via `TrySequence`'s `component1`,
 * which throws on a `Failure`; the surrounding `Try { }` is what receives that throw.
 *
 * @param func the block to evaluate in the `TrySequence` scope
 * @return the result of `Try { }` applied to the block
 */
fun <T> Try.Companion.sequential(func: TrySequence.() -> T): Try<T> =
    Try { TrySequence.func() }
View ExamplePartition.kt
fun main() {
val lines = Try {
File("./my-pets.csv").readLines().map { it.split(',') }
}
val pets : Try<ResultSet<Pet>> = lines.map {
it.map(::toPet).asResultSet().map { pet -> pet.copy(name = pet.name.toUpperCase()) }
}
View SequentialToPet.kt
/**
 * Builds a [Pet] from one row of already-split column values using
 * `Try.sequential`.
 *
 * Column 0 is taken as the name unchanged; columns 1 and 2 are trimmed before
 * being parsed as the age and looked up as the pet type. Each destructuring
 * (`val (x) = ...`) unwraps a `Try` within the sequential block.
 *
 * @param values the column values of a single row
 * @return the result of the sequential block wrapped in a `Try`
 */
fun toPet(values: List<String>): Try<Pet> = Try.sequential {
    val petName = values[0]
    val (petAge) = Try { values[1].trim().toInt() }
    val (petType) = PetType.lookup(values[2].trim())
    Pet(petName, petAge, petType)
}
You can’t perform that action at this time.