Algorithmic Trading
With Machine & Deep Learning
Workshop by Dr. Yves J. Hilpisch | The Python Quants GmbH
London, 12. October 2017
I hereby claim:
To claim this, I am signing this object:
/** Example payload type used to demonstrate (de)serialization.
 *
 * @param name         name carried by the payload
 * @param version      version carried by the payload, kept as a string
 * @param someProperty an integer property carried by the payload
 */
case class MyDemoType(
  name: String,
  version: String,
  someProperty: Int
)
// Kafka source with String keys and String values; broker and topic come from `config`.
KafkaIO.read[String, String]
  .withBootstrapServers(config.broker)
  .withTopic(config.kafkaTopic)
  // Keys and values are both decoded with StringDeserializer.
  .withKeyDeserializer(classOf[StringDeserializer])
  .withValueDeserializer(classOf[StringDeserializer])
  // NOTE(review): epoch millisecond 1 presumably means "read everything still
  // retained in the topic" -- confirm against the KafkaIO documentation.
  .withStartReadTime(Instant.ofEpochMilli(1))
  .withoutMetadata
/** Returns an object of type T from a base64 encoded string.
 *
 * Override this function with your deserialization code. In this example, we
 * assume that Kafka contains plain old Java objects, encoded as base64
 * strings.
 *
 * @tparam T The type of the object to be recovered
 * @param s String with a base64 encoded serialized object
 */
def string2Object[T](s: String): T = { |
/** Returns a string representation of an object.
 *
 * Override this function with your serialization code. In this example, we
 * serialize the object and transform that into a base64 encoded string,
 * which will later be written to Kafka.
 *
 * @tparam T The type of the object to be serialized. The type must be Serializable
 * @param obj The object to be serialized
 */
def object2String[T](obj: T): String = { |
// Kafka source with String keys and String values; broker and topic come from `config`.
KafkaIO.read[String, String]
  .withBootstrapServers(config.broker)
  .withTopic(config.kafkaTopic)
  // Keys and values are both decoded with StringDeserializer.
  .withKeyDeserializer(classOf[StringDeserializer])
  .withValueDeserializer(classOf[StringDeserializer])
  // NOTE(review): epoch millisecond 1 presumably means "read everything still
  // retained in the topic" -- confirm against the KafkaIO documentation.
  .withStartReadTime(Instant.ofEpochMilli(1))
  .withoutMetadata
"""Business rules applied to each group in each window.""""" | |
import apache_beam as beam | |
from datetime import datetime | |
from dateutil import parser as dateparser | |
class BusinessRulesDoFn(beam.DoFn): | |
"""This DoFn applies some business rules to a group of messages in a window. |