./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties hdfs-nasty.properties
... start up logging ...
[2016-07-18 16:24:57,814] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.StackOverflowError
at org.apache.kafka.connect.data.SchemaBuilder.struct(SchemaBuilder.java:308)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1101)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1015)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1092)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1015)
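The loop in this stack trace comes from a self-referencing Avro schema on the nasty topic: the Connect schema model has no way to refer back to an enclosing type, so AvroData.toConnectSchema keeps descending into the nested record until the stack overflows. A rough sketch of such a shape, reconstructed from the Lua transforms further down rather than from the actual nasty.avsc:

import org.apache.avro.Schema

// Reconstruction for illustration only: each element holds a value and an array
// of child elements of the same record type, so the schema refers to itself.
val recursiveSchema: Schema = new Schema.Parser().parse(
  """{
    |  "type": "record", "name": "element", "namespace": "example.nasty",
    |  "fields": [
    |    {"name": "value", "type": "string"},
    |    {"name": "children", "type": {"type": "array", "items": "element"}}
    |  ]
    |}""".stripMargin)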
./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic nasty --from-beginning
{
  "type": "record",
  "name": "example.nice",
  "fields": [{
    "name": "elements",
    "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "element",
        "fields": [
          { "name": "value", "type": "string" },
          { "name": "id", "type": "int" },
          { "name": "parent", "type": "int" }
        ]
      }
    }
  }]
}
-- remove.lua: drop the recursive children field and tag each top-level
-- element with its index as id and parent = 0
return pb.mapValues(
  function(value)
    for i, element in ipairs(value.elements) do
      element.children = nil
      element.id = i
      element.parent = 0
    end
    return value
  end
)
./plumber.sh -i nasty -d avro \
-o nice -s avro=nice.avsc \
-l remove.lua \
-p nasty-to-nice.properties
./bin/kafka-avro-console-consumer \
--zookeeper localhost:2181 --topic nice --from-beginning
return pb.mapValues(
  function(value)
    -- depth-first walk: every element gets an id and a pointer to its parent's id
    function flatten (acc, element, parentId)
      local id = #acc + 1
      acc[id] = { value = element.value, id = id, parent = parentId }
      for i, child in ipairs(element.children) do
        flatten(acc, child, id)
      end
    end
    local acc = {}
    for i, element in ipairs(value.elements) do
      flatten(acc, element, 0)
    end
    return { elements = acc }
  end
)
-- flatten.test.lua: sample nested input for the flatten transform
local someInputs = {
  pb.value{
    elements = {
      {
        value = "a",
        children = {
          { value = "a-a", children = {} },
          { value = "a-b", children = {} }
        }
      },
./plumber.sh -i nasty -d avro \
-o nice -s avro=nice.avsc \
-l flatten.lua -t flatten.test.lua \
-p nasty-to-nice.properties
2016-07-18 16:16:09,862 INFO [main] [com.eneco.energy.kafka.streams.plumber.StreamingOperations] [<init>:136] we output key-values of type KeyValueType(com.eneco.energy.kafka.streams.plumber.VoidType$@12468a38,AvroType(Some({"type":"record","name":"nice","namespace":"example","fields":[{"name":"elements","type":{"type":"array","items":{"type":"record","name":"element","namespace":"example.nice","fields":[{"name":"value","type":"string"},{"name":"id","type":"int"},{"name":"parent","type":"int"}]}}}]})))
2016-07-18 16:16:09,889 INFO [main] [com.eneco.energy.kafka.streams.plumber.Plumber$] [exec:37] OK, expectations provided in `flatten.test.lua` were met
* @param getConnection a function that returns an open Connection.
* The RDD takes care of closing the connection.
* @param sql the text of the query.
* The query must contain two ? placeholders for parameters used to partition the results.
* @param lowerBound the minimum value of the first placeholder
* @param upperBound the maximum value of the second placeholder
* The lower and upper bounds are inclusive.
* @param numPartitions the number of partitions.
* Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
* the query would be executed twice, once with (1, 10) and once with (11, 20)
* @param mapRow a function from a ResultSet to a single row of the desired result type(s).
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
*/
class JdbcRDD[T: ClassTag](
  sc: SparkContext,
  getConnection: () => Connection,
  sql: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
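A minimal usage sketch, assuming a hypothetical books(id, title) table and an in-memory H2 connection (neither appears in the snippet above): the two ? placeholders receive each partition's slice of [lowerBound, upperBound], so with lowerBound = 1, upperBound = 20 and numPartitions = 2 the query runs once with (1, 10) and once with (11, 20).

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

// Illustrative only: table name, JDBC URL and row mapper are assumptions.
def booksRdd(sc: SparkContext): JdbcRDD[(Int, String)] =
  new JdbcRDD(
    sc,
    () => DriverManager.getConnection("jdbc:h2:mem:testdb"),
    "SELECT id, title FROM books WHERE ? <= id AND id <= ?",
    lowerBound = 1, upperBound = 20, numPartitions = 2,
    mapRow = (rs: ResultSet) => (rs.getInt("id"), rs.getString("title")))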