Created
June 25, 2017 11:59
-
-
Save wheresalice/e4b0281222b5b9fa6642703daf7d06f2 to your computer and use it in GitHub Desktop.
R Kafka producer using the Confluent REST Proxy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(httr) | |
library(rjson) | |
# Base URL of the Kafka REST proxy. The proxy would ordinarily be on port 8082,
# but requests go through mitmproxy (port 8080) here for debugging.
proxy_uri <- "http://localhost:8080"
# Get a list of available topics.
#
# Arguments:
#   base_uri  Base URL of the REST proxy. Defaults to the script-level
#             `proxy_uri`, so existing zero-argument calls keep working.
#
# Returns the body of `GET <base_uri>/topics` parsed with fromJSON().
# Stops with an error (via stop_for_status) if the proxy answers with an
# HTTP error status.
get_topics <- function(base_uri = proxy_uri) {
  uri <- paste(base_uri, "topics", sep = "/")
  http_topics <- GET(uri)
  stop_for_status(http_topics)
  # An explicit encoding avoids httr guessing one (and emitting a message).
  fromJSON(content(http_topics, "text", encoding = "UTF-8"))
}
# Fetch the topic list once, then report how many there are and what they are.
topics <- get_topics()
topic_count <- length(topics)
print(sprintf("There are %d topics", topic_count))
print(topics)
# Produce one Avro message.
# The only non-obvious step: the schema must be run through toJSON on its own,
# so that it ends up in the request body as a JSON-encoded string.
user_schema <- list(
  type = "record",
  name = "User",
  fields = list(
    list(name = "name", type = "string")
  )
)

# A single record whose value matches the schema above.
user_record <- list(
  value = list(name = "testUser")
)

generatedbody <- list(
  value_schema = toJSON(user_schema),
  records = list(user_record)
)
# Actually send the request. The body is already a JSON string, which httr
# sends as-is, so no `encode` argument is needed.
response <- POST(
  url = paste(proxy_uri, "topics", "avrotest", sep = "/"),
  content_type("application/vnd.kafka.avro.v2+json"),
  accept("application/vnd.kafka.v2+json"),
  body = toJSON(generatedbody)
)
stop_for_status(response)

# Read the reply once, with an explicit encoding so httr does not have to guess.
response_text <- content(response, "text", encoding = "UTF-8")
response_text

# A 2xx status does not mean every record was accepted: each entry of
# `offsets` may carry its own `error`. `[["error"]]` is used instead of
# `$error` because `$` partial-matches on lists and could pick up `error_code`.
record_errors <- Filter(
  Negate(is.null),
  lapply(fromJSON(response_text)$offsets, function(offset) offset[["error"]])
)
if (length(record_errors) == 0) {
  print("OK")
} else {
  warning(
    "Kafka REST proxy reported record error(s): ",
    paste(unlist(record_errors), collapse = "; "),
    call. = FALSE
  )
}
# Publish a data.frame to Kafka.
# `cars` is a built-in data.frame to test with.
df <- head(cars, n = 2) # limit to just two records to make testing easier

# typeof() returns R storage-type names, and only "double" is also a valid
# Avro primitive type name. Translate the common ones so the schema stays
# valid for integer, character and logical columns too.
avro_types <- c(
  double = "double",
  integer = "int",
  character = "string",
  logical = "boolean"
)

# Avro type of every column, keyed by column name.
field_types <- lapply(df, function(column) {
  r_type <- typeof(column)
  if (!r_type %in% names(avro_types)) {
    stop("No Avro type mapping for R type '", r_type, "'", call. = FALSE)
  }
  avro_types[[r_type]]
})

# Build the Avro field list, one list(name = ..., type = ...) per column.
fields <- lapply(colnames(df), function(f) {
  list(name = f, type = field_types[[f]])
})

value_schema <- list(
  type = "record",
  name = "Car", # @TODO turn this into a function and accept a name for the record
  fields = fields
)
# For each row of df, generate and send an HTTP request to the REST API.

# The schema is the same for every row, so serialise it once.
schema_json <- toJSON(value_schema)

# seq_len() yields an empty sequence for a zero-row data.frame, whereas
# 1:nrow(df) would iterate over c(1, 0).
for (car in seq_len(nrow(df))) {
  dfbody <- list(
    value_schema = schema_json,
    # as.list() on a one-row data.frame gives a named list with one scalar per
    # column, so the record follows whatever columns df has.
    records = list(list(value = as.list(df[car, , drop = FALSE])))
  )
  request_json <- toJSON(dfbody)
  print(request_json)

  # The body is already a JSON string, which httr sends as-is, so no `encode`
  # argument is needed.
  dfresponse <- POST(
    url = paste(proxy_uri, "topics", "avrocars", sep = "/"),
    content_type("application/vnd.kafka.avro.v2+json"),
    accept("application/vnd.kafka.v2+json"),
    body = request_json
  )
  stop_for_status(dfresponse)

  # A 2xx status does not mean every record was accepted: each entry of
  # `offsets` may carry its own `error`. `[["error"]]` avoids the partial
  # matching that `$error` does on lists (it could pick up `error_code`).
  dfresponse_text <- content(dfresponse, "text", encoding = "UTF-8")
  record_errors <- Filter(
    Negate(is.null),
    lapply(fromJSON(dfresponse_text)$offsets, function(offset) offset[["error"]])
  )
  if (length(record_errors) == 0) {
    print("OK")
  } else {
    warning(
      "Kafka REST proxy reported record error(s) for row ", car, ": ",
      paste(unlist(record_errors), collapse = "; "),
      call. = FALSE
    )
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment