Skip to content

Instantly share code, notes, and snippets.

@wheresalice
Created June 25, 2017 11:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wheresalice/e4b0281222b5b9fa6642703daf7d06f2 to your computer and use it in GitHub Desktop.
Save wheresalice/e4b0281222b5b9fa6642703daf7d06f2 to your computer and use it in GitHub Desktop.
R Kafka producer using the Confluent REST Proxy
library(httr)
library(rjson)
# Base URL of the Confluent REST Proxy. The proxy ordinarily listens on port
# 8082; the default here is 8080 because requests were routed through
# mitmproxy for debugging. Set the KAFKA_REST_PROXY environment variable to
# target a different proxy without editing the script (an unset or empty
# variable falls back to the default).
proxy_uri <- Sys.getenv("KAFKA_REST_PROXY")
if (!nzchar(proxy_uri)) {
  proxy_uri <- "http://localhost:8080"
}
# Get the list of topics available on the Kafka REST proxy.
#
# @param base_uri Base URL of the REST proxy. Defaults to the script-level
#   `proxy_uri`, so existing `get_topics()` calls behave as before.
# @return The JSON body of GET <base_uri>/topics decoded by rjson::fromJSON
#   (presumably a character vector of topic names - confirm against the
#   proxy version in use).
# Stops with an HTTP error (via httr::stop_for_status) on a non-2xx status.
get_topics <- function(base_uri = proxy_uri) {
  uri <- paste(base_uri, "topics", sep = "/")
  http_topics <- GET(uri)
  stop_for_status(http_topics)
  # An explicit encoding avoids httr's "No encoding supplied" message.
  fromJSON(content(http_topics, "text", encoding = "UTF-8"))
}
# Fetch the topic list once, then report its size followed by its contents.
topics <- get_topics()
print(sprintf("There are %d topics", length(topics)))
print(topics)
# Produce one Avro message to the "avrotest" topic.
# The non-obvious part: the REST proxy expects `value_schema` to be a JSON
# *string* holding the schema, so the schema is serialized with toJSON() here
# and then embedded (as a string) in the request body, which is serialized
# again when the request is sent.
generatedbody <- list(
  value_schema = toJSON(list(
    type = "record",
    name = "User",
    fields = list(
      list(
        name = "name",
        type = "string"
      )
    )
  )),
  records = list(
    list(
      value = list(
        name = "testUser"
      )
    )
  )
)
# Actually send the request.
response <- POST(
  url = paste(proxy_uri, "topics", "avrotest", sep = "/"),
  content_type("application/vnd.kafka.avro.v2+json"),
  accept("application/vnd.kafka.v2+json"),
  body = toJSON(generatedbody),
  encode = "json"
)
stop_for_status(response)
# Read the body once, with an explicit encoding, and show it for debugging.
response_text <- content(response, "text", encoding = "UTF-8")
print(response_text)
# A 2xx status can still carry per-record failures: each entry of `offsets`
# has an `error` field, which is null (NULL after parsing) when the record
# was accepted.
offsets <- fromJSON(response_text)$offsets
errors <- unlist(lapply(offsets, function(o) o$error))
if (length(offsets) == 0) {
  warning("Kafka REST proxy returned no offsets", call. = FALSE)
} else if (length(errors) == 0) {
  print("OK")
} else {
  warning(
    "Kafka REST proxy rejected the record: ",
    paste(errors, collapse = "; "),
    call. = FALSE
  )
}
# Publish a data.frame to Kafka, one Avro record (and one HTTP request) per row.
# `cars` is a built-in data.frame that is convenient to test with.
df <- head(cars, n = 2) # limit to just two records to make testing easier
# Factors would otherwise report storage type "integer" while their values are
# serialized as labels; send them as plain strings instead.
df[] <- lapply(df, function(col) if (is.factor(col)) as.character(col) else col)

# R storage types (from typeof()) -> Avro primitive type names. R integers are
# 32-bit, so they map to Avro "int". Unmapped types stop the script rather
# than producing an invalid schema.
# NOTE(review): classed columns (e.g. Date, POSIXct) are mapped by storage
# type only; verify their JSON encoding before relying on this. NA values are
# not supported because the generated fields are not nullable.
avro_types <- c(
  integer = "int",
  double = "double",
  character = "string",
  logical = "boolean"
)
field_types <- vapply(df, typeof, character(1))
unsupported <- setdiff(unique(field_types), names(avro_types))
if (length(unsupported) > 0) {
  stop(
    "Unsupported column type(s) for Avro: ",
    paste(unsupported, collapse = ", "),
    call. = FALSE
  )
}
# Array of fields like [{name = "<column>", type = "<avro type>"}], in column
# order; unname() keeps it a JSON array rather than an object.
fields <- unname(Map(
  function(f, t) list(name = f, type = avro_types[[t]]),
  colnames(df),
  field_types
))
value_schema <- list(
  type = "record",
  name = "Car", # @TODO turn this into a function and accept a name for the record
  fields = fields
)
# The schema is the same for every row, so serialize it once.
value_schema_json <- toJSON(value_schema)

# For each car, generate and send an HTTP request to the REST api.
for (car in seq_len(nrow(df))) {
  dfbody <- list(
    value_schema = value_schema_json,
    # as.list() on a one-row data.frame gives a named list of every column.
    records = list(list(value = as.list(df[car, , drop = FALSE])))
  )
  request_json <- toJSON(dfbody)
  print(request_json)
  dfresponse <- POST(
    url = paste(proxy_uri, "topics", "avrocars", sep = "/"),
    content_type("application/vnd.kafka.avro.v2+json"),
    accept("application/vnd.kafka.v2+json"),
    body = request_json,
    encode = "json"
  )
  stop_for_status(dfresponse)
  # Each offset entry's `error` is null (NULL after parsing) on success.
  offsets <- fromJSON(content(dfresponse, "text", encoding = "UTF-8"))$offsets
  errors <- unlist(lapply(offsets, function(o) o$error))
  if (length(offsets) == 0) {
    warning("No offsets returned for record ", car, call. = FALSE)
  } else if (length(errors) == 0) {
    print("OK")
  } else {
    warning(
      "Kafka REST proxy rejected record ", car, ": ",
      paste(errors, collapse = "; "),
      call. = FALSE
    )
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment