Skip to content

Instantly share code, notes, and snippets.

@rmoff
Last active January 15, 2018 16:47
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rmoff/6cb9e28245cde010e6f694348286046c to your computer and use it in GitHub Desktop.
Save rmoff/6cb9e28245cde010e6f694348286046c to your computer and use it in GitHub Desktop.
Ingest Twitter data and flatten some of the nested JSON objects and arrays
# @rmoff
# July 21, 2016
# Event source: the twitter input plugin.
input {
  twitter {
    # Keywords to track in the Twitter stream.
    keywords => ["never","gonna","give","you","up"]

    # Keep the complete tweet object; the filter section reads nested
    # fields such as [user], [entities], [place] and [coordinates].
    full_tweet => true

    # add your auth data
    consumer_key => ""
    consumer_secret => ""
    oauth_token => ""
    oauth_token_secret => ""
  }
}
# Flatten selected nested objects and arrays of the tweet into top-level
# fields, so consumers do not have to navigate the nested structure.
filter {
  # Copy selected attributes of the nested [user] object to top-level fields.
  mutate {
    add_field => {
      "user_name" => "%{[user][name]}"
      "user_screen_name" => "%{[user][screen_name]}"
      "user_followers_count" => "%{[user][followers_count]}"
      "user_friends_count" => "%{[user][friends_count]}"
      "user_listed_count" => "%{[user][listed_count]}"
      "user_favourites_count" => "%{[user][favourites_count]}"
      "user_statuses_count" => "%{[user][statuses_count]}"
      "user_created_at" => "%{[user][created_at]}"
    }
  }

  # Only present on retweets: screen name of the original author.
  if [retweeted_status] {
    mutate {
      add_field => { "retweeted_screen_name" => "%{[retweeted_status][user][screen_name]}" }
    }
  }

  # Only present when the tweet is tagged with a place.
  if [place] {
    mutate {
      add_field => {
        "place_name" => "%{[place][name]}"
        "place_country" => "%{[place][country]}"
      }
    }
  }

  # Flatten the [entities] arrays and the coordinate pair.
  # NOTE: the Ruby code is wrapped in single quotes, so it must not contain
  # any single-quote characters itself.
  ruby {
    code => '
      # Field accessors: use the get/set Event API when the event provides it,
      # otherwise fall back to the older hash-style event["field"] access.
      read = lambda do |ref|
        if event.respond_to?(:get)
          event.get(ref)
        else
          event[ref]
        end
      end
      write = lambda do |ref, value|
        if event.respond_to?(:set)
          event.set(ref, value)
        else
          event[ref] = value
        end
      end

      # Each row: source array, key to pluck from every element,
      # target field for the plucked array, target field for the
      # comma-joined string of the same values.
      [
        ["[entities][hashtags]",      "text",        "hashtags_array",                  "hashtags_list"],
        ["[entities][urls]",          "display_url", "urls_array",                      "urls_list"],
        ["[entities][user_mentions]", "screen_name", "user_mentions_screen_name_array", "user_mentions_screen_name_list"]
      ].each do |source, key, array_field, list_field|
        items = read.call(source)
        next if items.nil?
        values = items.collect { |m| m[key] }
        write.call(array_field, values)
        write.call(list_field, values.join(","))
      end

      # The nested pair is read as [longitude, latitude]. Skip it when either
      # the outer object or the inner array is missing.
      point = read.call("[coordinates][coordinates]")
      unless point.nil?
        write.call("longitude", point[0])
        write.call("latitude", point[1])
      end
    '
  }
}
# Publish the flattened events to Kafka and echo them to the console.
output {
  kafka {
    # Serialize the whole event as JSON. The codec is set explicitly because
    # the plugin default differs between versions, and a plain-text codec
    # would not carry the flattened fields.
    codec => json
    topic_id => "twitter_flattened"
    bootstrap_servers => "localhost:9092"
  }

  # Pretty-print every event to stdout for debugging.
  stdout { codec => rubydebug }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment