Skip to content

Instantly share code, notes, and snippets.

@rmoff
Last active September 17, 2021 09:09
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save rmoff/5ee715f6c2bdf7ee878de3ac1d58c0ec to your computer and use it in GitHub Desktop.
Save rmoff/5ee715f6c2bdf7ee878de3ac1d58c0ec to your computer and use it in GitHub Desktop.
Data Processing and Enrichment in Spark Streaming with Python and Kafka

This Notebook is the source code for the published blog article here.

@rmoff / January 13, 2017

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Data Processing and Enrichment in Spark Streaming with Python and Kafka"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In my [previous blog post](https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/) I introduced Spark Streaming and how it can be used to process 'unbounded' datasets. The example I did was a very basic one - simple counts of inbound tweets and grouping by user. All very good for understanding the framework and not getting bogged down in detail, but ultimately not so useful. \n",
"\n",
"We're going to stay with Twitter as our data source in this post, but we're going to consider a real-world requirement for processing Twitter data with low-latency. Spark Streaming will again be our processing engine, with future posts looking at other possibilities in this area. \n",
"\n",
"Twitter has come a long way from its [early days](https://twitter.com/jack/status/20) as a SMS-driven \"microblogging\" site. Nowadays it's used by millions of people to [discuss technology](https://twitter.com/markrittman/status/819567341977608192), share [food](https://twitter.com/orcldoug/status/722416658598342656) [tips](https://twitter.com/rmoff/status/674146293455953920), and, of course, [track the progress of tea-making](https://twitter.com/markrittman/status/785763443185942529). But it's also used for more nefarious purposes, including spam, and sharing of links to pirated material. The requirement we had for this proof of concept was to filter tweets for suspected copyright-infringing links in order that further action could be taken. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The environment I'm using is the same as [before](https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/) - Spark 2.0.2 running on [Docker](https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook) with Jupyter Notebooks to develop the code (and this article!). You can download the full notebook [here](https://gist.github.com/rmoff/5ee715f6c2bdf7ee878de3ac1d58c0ec)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The inbound tweets are coming from an Apache Kafka topic. Any matched tweets will be sent to another Kafka topic. The match criteria are: \n",
"\n",
"* Not a retweet\n",
"* Contains at least one URL\n",
"* URL(s) are not on a whitelist (for example, we're not interested in links to spotify.com, or back to twitter.com)\n",
"* From a predefined list of artists, albums, and tracks, the twitter text much contains terms matching at least two.\n",
" * Match terms will take into account common misspellings (Little Mix -> Litle Mix), hashtags (Little Mix -> #LittleMix), etc\n",
"\n",
"We'll also use a separate Kafka topic for audit/debug purposes to inspect any non-matched tweets. \n",
"\n",
"As well as matching the tweet against the above conditions, we will enrich the tweet message body to store the identified artist/album/track to support subsequent downstream processing.\n",
"\n",
"The final part of the requirement is to keep track of the number of inbound tweets, the number of matched vs unmatched, and for those matched, which artists they were for. These counts need to be per batch and over a window of time too."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Getting Started - Prototyping the Processing Code"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before we get into the meat of the streaming code, let's take a step back and look at what we're wanting the code to achieve. From the [previous examples](https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/) we know we can connect to a Kafka topic, pull in tweets, parse them for given fields, and do windowed counts. So far, so easy (or at least, already figured out!). Let's take a look at nub of the requirement here - the text matching. \n",
"\n",
"If we peruse the [BBC Radio 1 Charts](http://www.bbc.co.uk/radio1/chart/albums) we can see the popular albums and artists of the moment (_Grant me a little nostalgia here; in my day people 'pirated' music from the Radio 1 chart show onto C90 cassettes, trying to get it without the DJ talking over the start and end. Nowadays it's done on a somewhat more technologically advanced basis_). Currently it's \"Little Mix\" with the album \"Glory Days\". A quick Wikipedia or Amazon search gives us the track listing too: \n",
"\n",
" 1. Shout Out to My Ex\n",
" 2. Touch\n",
" 3. F.U.\n",
" 4. Oops - Little Mix feat. Charlie Puth\n",
" 5. You Gotta Not\n",
" 6. Down & Dirty\n",
" 7. Power\n",
" 8. Your Love\n",
" 9. Nobody Like You\n",
" 10. No More Sad Songs\n",
" 11. Private Show\n",
" 12. Nothing Else Matters\n",
" 13. Beep Beep\n",
" 14. Freak\n",
" 15. Touch\n",
" \n",
"A quick twitter search for the first track title gives us this tweet - I have no idea if it's legit or not, but it serves as an example for our matching code requirements: \n",
"\n",
"<blockquote class=\"twitter-tweet\" data-cards=\"hidden\" data-lang=\"en\"><p lang=\"en\" dir=\"ltr\">DOWNLOAD MP3: Little Mix – Shout Out To My Ex (CDQ) Track <a href=\"https://t.co/C30c4Fel4u\">https://t.co/C30c4Fel4u</a> <a href=\"https://t.co/wJjyG4cdjE\">pic.twitter.com/wJjyG4cdjE</a></p>&mdash; Ngvibes Media (@ngvibes_com) <a href=\"https://twitter.com/ngvibes_com/status/794104195922083840\">November 3, 2016</a></blockquote>\n",
"<script async src=\"//platform.twitter.com/widgets.js\" charset=\"utf-8\"></script>\n",
"\n",
"Using the [Twitter developer API](https://dev.twitter.com/rest/reference/get/statuses/show/id) I can retrieve the JSON for this tweet directly. I'm using the excellent [Paw](https://paw.cloud/) tool to do this. \n",
"\n",
"![](images/ssc02.png)\n",
"\n",
"From this we can get the `text` element: \n",
"\n",
" \"text\": \"DOWNLOAD MP3: Little Mix \\u2013 Shout Out To My Ex (CDQ) Track https:\\/\\/t.co\\/C30c4Fel4u https:\\/\\/t.co\\/wJjyG4cdjE\",\n",
" \n",
"The obvious approach would be to have a list of match terms, something like: \n",
"\n",
" match_text=(\"Little Mix\",\"Glory Days\",\"Shout Out to My Ex\",\"Touch\",\"F.U.\")\n",
"\n",
"But - we need to make sure we've matched _two of the three types of metadata (artist/album/track)_, so we need to know which it is that we've matched in the text. We *also* need to handle variations in text for a given match (such as misspellings etc).\n",
"\n",
"What I came up with was this:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"filters=[]\n",
"filters.append({\"tag\":\"album\",\"value\": \"Glory Days\",\"match\":[\"Glory Days\",\"GloryDays\"]})\n",
"filters.append({\"tag\":\"artist\",\"value\": \"Little Mix\",\"match\":[\"Little Mix\",\"LittleMix\",\"Litel Mixx\"]})\n",
"filters.append({\"tag\":\"track\",\"value\": \"Shout Out To My Ex\",\"match\":[\"Shout Out To My Ex\",\"Shout Out To My X\",\"ShoutOutToMyEx\",\"ShoutOutToMyX\"]})\n",
"filters.append({\"tag\":\"track\",\"value\": \"F.U.\",\"match\":[\"F.U\",\"F.U.\",\"FU\",\"F U\"]})\n",
"filters.append({\"tag\":\"track\",\"value\": \"Touch\",\"match\":[\"Touch\"]})\n",
"filters.append({\"tag\":\"track\",\"value\": \"Oops\",\"match\":[\"Oops\"]})\n",
"\n",
"def test_matching(test_string):\n",
" print 'Input: %s' % test_string\n",
" for f in filters:\n",
" for a in f['match']:\n",
" if a.lower() in test_string.lower():\n",
" print '\\tTag: %s / Value: %s\\n\\t\\t(Match string %s)' % (f['tag'],f['value'],a)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We could then take the test string from above and test it: "
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Input: DOWNLOAD MP3: Little Mix \\u2013 Shout Out To My Ex (CDQ) Track https:\\/\\/t.co\\/C30c4Fel4u https:\\/\\/t.co\\/wJjyG4cdjE\n",
"\tTag: artist / Value: Little Mix\n",
"\t\t(Match string Little Mix)\n",
"\tTag: track / Value: Shout Out To My Ex\n",
"\t\t(Match string Shout Out To My Ex)\n"
]
}
],
"source": [
"test_matching('DOWNLOAD MP3: Little Mix \\u2013 Shout Out To My Ex (CDQ) Track https:\\/\\/t.co\\/C30c4Fel4u https:\\/\\/t.co\\/wJjyG4cdjE')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"as well as making sure that variations in naming were also correctly picked up and tagged:"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Input: DOWNLOAD MP3: Litel Mixx #GloryDayshttps:\\/\\/t.co\\/wJjyG4cdjE\n",
"\tTag: album / Value: Glory Days\n",
"\t\t(Match string GloryDays)\n",
"\tTag: artist / Value: Little Mix\n",
"\t\t(Match string Litel Mixx)\n",
"-------------------------------------------\n",
"Time: 2017-01-13 12:13:00\n",
"-------------------------------------------\n",
"Batch/Inbound: 20\n",
"Batch/Matched: 0\n",
"Batch/Non-Matched: 20\n",
"Batch/Errored: 0\n",
"\n"
]
}
],
"source": [
"test_matching('DOWNLOAD MP3: Litel Mixx #GloryDayshttps:\\/\\/t.co\\/wJjyG4cdjE')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Additional Processing"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With the text matching figured out, we also needed to address the other requirements:\n",
"* Not a retweet\n",
"* Contains at least one URL\n",
" * URL(s) are not on a whitelist (for example, we're not interested in links to spotify.com, or back to twitter.com)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Not a Retweet"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the old days retweets were simply reposting the same tweet with a `RT` prefix; now it's done as part of the twitter model and in twitter clients displays the original tweet. In the background though, the JSON is different from an orginal tweet (i.e. not a retweet)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Original** tweet: \n",
"```json\n",
"{\n",
" \"created_at\": \"Thu Jan 12 00:36:22 +0000 2017\",\n",
" \"id\": 819342218611728384,\n",
" \"id_str\": \"819342218611728384\",\n",
" \"text\": \"Because we all know how \\\"careful\\\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate\",\n",
" \n",
"[...]\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Retweet**:\n",
"\n",
"```json\n",
"{\n",
" \"created_at\": \"Thu Jan 12 14:40:44 +0000 2017\",\n",
" \"id\": 819554713083461632,\n",
" \"id_str\": \"819554713083461632\",\n",
" \"text\": \"RT @GeorgeTakei: Because we all know how \\\"careful\\\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate\",\n",
"\n",
"[...]\n",
"\n",
" \"retweeted_status\": {\n",
" \"created_at\": \"Thu Jan 12 00:36:22 +0000 2017\",\n",
" \"id\": 819342218611728384,\n",
" \"id_str\": \"819342218611728384\",\n",
" \"text\": \"Because we all know how \\\"careful\\\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate\",\n",
"\n",
"[...]\n",
"\n",
"``` \n",
"\n",
"So retweets have an additional set of elements in the JSON body, under the `retweeted_status` element. We can pick this out using the [`get`](https://docs.python.org/2/library/stdtypes.html#dict.get) method as seen in this code snippet, where `tweet` is the parsed JSON of the tweet:\n",
"\n",
"```python\n",
"if tweet.get('retweeted_status'):\n",
" print 'Tweet is a retweet'\n",
"else:\n",
" print 'Tweet is original'\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Contains a URL, and URL is not on Whitelist"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Twitter are very good to us in the JSON they supply for each tweet. Every possible attribute of the tweet is encoded as elements in the JSON, meaning that we don't have to do any nasty parsing of the tweet text itself. To find out if there are URLs in the tweet, we just check the `entities.urls` element, and iterate through the array if present. \n",
"\n",
"```python\n",
"if not tweet.get('entities'):\n",
" print 'no entities element'\n",
"else:\n",
" if not tweet.get('entities').get('urls'):\n",
" print 'no entities.urls element'\n",
"```\n",
"\n",
"The URL itself is again provided to us by Twitter as the `expanded_url` within the `urls` array, and using the `urlsplit` library [as I did previously](https://www.rittmanmead.com/blog/2016/12/etl-offload-with-spark-and-amazon-emr-part-2-code-development-with-notebooks-and-docker/) we can extract the domain: \n",
"\n",
"```python\n",
"for url in tweet['entities']['urls']:\n",
"\n",
" expanded_url = url['expanded_url']\n",
" domain = urlsplit(expanded_url).netloc\n",
"```\n",
"\n",
"With the domain extracted, we can then compare it to a predefined whitelist so that we don't pick up tweets that are just linking back to sites such as Spotify, iTunes, etc. Here I'm using the Python [`set`](https://docs.python.org/2/library/stdtypes.html#set) type and `issubset` method to compare the list of domain(s) that I've extracted from the tweet into the `url_info` list, against the whitelist:\n",
"\n",
"```python\n",
"try:\n",
" if set(url_info['domain']).issubset(domain_whitelist):\n",
" print 'All domains whitelisted'\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## The Stream Processing Bit"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With me so far? We've looked at the requirements for what our stream processing needs to do, and worked out the prototype code that will do this. Now we can jump into the actual streaming code. You can see the actual [notebook here](https://gist.github.com/rmoff/5ee715f6c2bdf7ee878de3ac1d58c0ec) if you want to try this yourself."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Job control variables"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"batchIntervalSec=30\n",
"windowIntervalSec=21600 # Six hours\n",
"app_name = 'spark_twitter_enrich_and_count_rm_01e'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make Kafka available to Jupyter"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import os\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'\n",
"os.environ['PYSPARK_PYTHON'] = '/opt/conda/envs/python2/bin/python'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Import dependencies\n",
"\n",
"As well as Spark libraries, we're also bringing in the `KafkaProducer` library which will enable us to send messages _to_ Kafka. This is in the `kafka-python` package. You can install this standalone on your system, or inline as done below."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Necessary to make Kafka library available to pyspark\n",
"os.system(\"pip install kafka-python\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Spark\n",
"from pyspark import SparkContext\n",
"# Spark Streaming\n",
"from pyspark.streaming import StreamingContext\n",
"from pyspark.streaming.kafka import KafkaUtils\n",
"# Kafka\n",
"from kafka import SimpleProducer, KafkaClient\n",
"from kafka import KafkaProducer\n",
"\n",
"# json parsing\n",
"import json\n",
"# url deconstruction\n",
"from urlparse import urlsplit\n",
"# regex domain handling\n",
"import re"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define values to match"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"filters=[]\n",
"filters.append({\"tag\":\"album\",\"value\": \"Glory Days\",\"match\":[\"Glory Days\",\"GloryDays\"]})\n",
"filters.append({\"tag\":\"artist\",\"value\": \"Little Mix\",\"match\":[\"Little Mix\",\"LittleMix\",\"Litel Mixx\"]})\n",
"filters.append({\"tag\":\"track\",\"value\": \"Shout Out To My Ex\",\"match\":[\"Shout Out To My Ex\",\"Shout Out To My X\",\"ShoutOutToMyEx\",\"ShoutOutToMyX\"]})\n",
"filters.append({\"tag\":\"track\",\"value\": \"F.U.\",\"match\":[\"F.U\",\"F.U.\",\"FU\",\"F U\"]})\n",
"filters.append({\"tag\":\"track\",\"value\": \"Touch\",\"match\":[\"Touch\"]})\n",
"filters.append({\"tag\":\"track\",\"value\": \"Oops\",\"match\":[\"Oops\"]})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define whitelisted domains"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"domain_whitelist=[]\n",
"domain_whitelist.append(\"itun.es\")\n",
"domain_whitelist.append(\"wikipedia.org\")\n",
"domain_whitelist.append(\"twitter.com\")\n",
"domain_whitelist.append(\"instagram.com\")\n",
"domain_whitelist.append(\"medium.com\")\n",
"domain_whitelist.append(\"spotify.com\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Function: Unshorten shortened URLs (`bit.ly` etc)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Source: http://stackoverflow.com/a/4201180/350613\n",
"import httplib\n",
"import urlparse\n",
"\n",
"def unshorten_url(url):\n",
" parsed = urlparse.urlparse(url)\n",
" h = httplib.HTTPConnection(parsed.netloc)\n",
" h.request('HEAD', parsed.path)\n",
" response = h.getresponse()\n",
" if response.status/100 == 3 and response.getheader('Location'):\n",
" return response.getheader('Location')\n",
" else:\n",
" return url"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Function: Send messages to Kafka\n",
"\n",
"Following the [guide here](http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd). \n",
"\n",
"To inspect the Kafka topics as messages are sent use: \n",
"\n",
"```bash\n",
"kafka-console-consumer --zookeeper cdh57-01-node-01.moffatt.me:2181 --topic twitter_matched2\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd\n",
"\n",
"def send_to_kafka_matched(partition):\n",
" from kafka import SimpleProducer, KafkaClient\n",
" from kafka import KafkaProducer\n",
"\n",
" kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')\n",
" for record in partition:\n",
" kafka_prod.send('twitter_matched2', str(json.dumps(record)))\n",
" \n",
"def send_to_kafka_notmatched(partition):\n",
" from kafka import SimpleProducer, KafkaClient\n",
" from kafka import KafkaProducer\n",
"\n",
" kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')\n",
" for record in partition:\n",
" kafka_prod.send('twitter_notmatched2', str(record))\n",
"\n",
"def send_to_kafka_err(partition):\n",
" from kafka import SimpleProducer, KafkaClient\n",
" from kafka import KafkaProducer\n",
"\n",
" kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')\n",
" for record in partition:\n",
" kafka_prod.send('twitter_err2', str(record))\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Function: Process each tweet\n",
"\n",
"This is the main processing code. It implements all of the logic described in the requirements above. If a processing condition is not met, the function returns a negative code and description of the condition that was not met. Errors are also caught and returned. "
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def process_tweet(tweet):\n",
" # Check that there's a URLs in the tweet before going any further\n",
" if tweet.get('retweeted_status'):\n",
" return (-1,'retweet - ignored',tweet)\n",
"\n",
" if not tweet.get('entities'):\n",
" return (-2,'no entities element',tweet)\n",
"\n",
" if not tweet.get('entities').get('urls'):\n",
" return (-3,'no entities.urls element',tweet)\n",
"\n",
" # Collect all the domains linked to in the tweet\n",
" url_info={}\n",
" url_info['domain']=[]\n",
" url_info['primary_domain']=[]\n",
" url_info['full_url']=[]\n",
" try:\n",
" for url in tweet['entities']['urls']:\n",
" try:\n",
" expanded_url = url['expanded_url']\n",
" except Exception, err:\n",
" return (-104,err,tweet)\n",
" \n",
" # Try to resolve the URL (assumes it's shortened - bit.ly etc)\n",
" try:\n",
" expanded_url = unshorten_url(expanded_url)\n",
" except Exception, err:\n",
" return (-108,err,tweet)\n",
" \n",
" # Determine the domain\n",
" try:\n",
" domain = urlsplit(expanded_url).netloc\n",
" except Exception, err:\n",
" return (-107,err,tweet)\n",
" try:\n",
" # Extract the 'primary' domain, e.g. www36.foobar.com -> foobar.com\n",
" #\n",
" # This logic is dodgy for UK domains (foo.co.uk, foo.org.uk, etc) \n",
" # since it truncates to the last two parts of the domain only (co.uk)\n",
" #\n",
" re_result = re.search('(\\w+\\.\\w+)$',domain)\n",
" if re_result:\n",
" primary_domain = re_result.group(0)\n",
" else:\n",
" primary_domain = domain\n",
" except Exception, err:\n",
" return (-105,err,tweet)\n",
"\n",
" try:\n",
" url_info['domain'].append(domain)\n",
" url_info['primary_domain'].append(primary_domain)\n",
" url_info['full_url'].append(expanded_url)\n",
" except Exception, err:\n",
" return (-106,err,tweet)\n",
"\n",
" \n",
" # Check domains against the whitelist\n",
" # If every domain found is in the whitelist, we can ignore them\n",
" try:\n",
" if set(url_info['primary_domain']).issubset(domain_whitelist):\n",
" return (-8,'All domains whitelisted',tweet)\n",
" except Exception, err:\n",
" return (-103,err,tweet)\n",
"\n",
" # Check domains against the blacklist\n",
" # Unless a domain is found, we ignore it\n",
" #Only use this if you have first defined the blacklist!\n",
" #if not set(domain_blacklist).intersection(url_info['primary_domain']):\n",
" # return (-9,'No blacklisted domains found',tweet)\n",
" \n",
"\n",
" except Exception, err:\n",
" return (-102,err,tweet)\n",
" \n",
" # Parse the tweet text against list of trigger terms\n",
" # --------------------\n",
" # This is rather messy iterative code that maybe can be optimised\n",
" #\n",
" # Because match terms are not just words, it's not enough to break\n",
" # up the tweet text into words and match against the filter list.\n",
" # Instead we have to take each filter term and see if it exists\n",
" # within the tweet text as a whole\n",
" #\n",
" # Using a set instead of list so that duplicates aren't added\n",
" #\n",
" matched=set()\n",
" try:\n",
" for f in filters:\n",
" for a in f['match']:\n",
" tweet_text = tweet['text']\n",
" match_text = a.decode('utf-8')\n",
" if match_text in tweet_text:\n",
" matched.add((f['tag'],f['value']))\n",
" except Exception, err:\n",
" return (-101,err,tweet)\n",
" \n",
" #-----\n",
" # Add the discovered metadata into the tweet object that this function will return\n",
" try:\n",
" tweet['enriched']={}\n",
" tweet['enriched']['media_details']={}\n",
" tweet['enriched']['url_details']=url_info\n",
" tweet['enriched']['match_count']=len(matched)\n",
" for match in matched:\n",
" tweet['enriched']['media_details'][match[0]]=match[1]\n",
"\n",
" except Exception, err:\n",
" return (-100,err,tweet)\n",
" \n",
" return (len(matched),tweet)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Function: Streaming context definition\n",
"\n",
"This is the function that defines the streaming context. It needs to be a function because we're using windowing and so the streaming context needs to be configured to checkpoint. \n",
"\n",
"As well as processing inbound tweets, it performs counts of: \n",
"* Inbound\n",
"* Outbound, by type (match/no match/error)\n",
"* For matched tweets, top domains and artists\n",
"\n",
"The code is commented inline to explain how it works."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def createContext():\n",
" sc = SparkContext(appName=\"spark_twitter_enrich_and_count_01\")\n",
" sc.setLogLevel(\"WARN\")\n",
" ssc = StreamingContext(sc, batchIntervalSec)\n",
" \n",
" # Define Kafka Consumer and Producer\n",
" kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', app_name, {'twitter':1})\n",
" \n",
" ## Get the JSON tweets payload\n",
" ## >>TODO<< This is very brittle - if the Kafka message retrieved is not valid JSON the whole thing falls over\n",
" tweets_dstream = kafkaStream.map(lambda v: json.loads(v[1]))\n",
"\n",
" ## -- Inbound Tweet counts\n",
" inbound_batch_cnt = tweets_dstream.count()\n",
" inbound_window_cnt = tweets_dstream.countByWindow(windowIntervalSec,batchIntervalSec)\n",
"\n",
" ## -- Process\n",
" ## Match tweet to trigger criteria\n",
" processed_tweets = tweets_dstream.\\\n",
" map(lambda tweet:process_tweet(tweet))\n",
"\n",
" ## Send the matched data to Kafka topic\n",
" ## Only treat it as a match if we hit at least two of the three possible matches (artist/track/album)\n",
" ## \n",
" ## _The first element of the returned object is a count of the number of matches, or a negative \n",
" ## to indicate an error or no URL content in the tweet._\n",
" ## \n",
" ## _We only want to send the actual JSON as the output, so use a `map` to get just this element_\n",
"\n",
" matched_tweets = processed_tweets.\\\n",
" filter(lambda processed_tweet:processed_tweet[0]>1).\\\n",
" map(lambda processed_tweet:processed_tweet[1])\n",
"\n",
" matched_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_matched))\n",
" matched_batch_cnt = matched_tweets.count()\n",
" matched_window_cnt = matched_tweets.countByWindow(windowIntervalSec,batchIntervalSec)\n",
"\n",
" ## Set up counts for matched metadata\n",
" ##-- Artists\n",
" matched_artists = matched_tweets.map(lambda tweet:(tweet['enriched']['media_details']['artist']))\n",
" \n",
" matched_artists_batch_cnt = matched_artists.countByValue()\\\n",
" .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\\\n",
" .map(lambda x:\"Batch/Artist: %s\\tCount: %s\" % (x[0],x[1]))\n",
"\n",
" matched_artists_window_cnt = matched_artists.countByValueAndWindow(windowIntervalSec,batchIntervalSec)\\\n",
" .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\\\n",
" .map(lambda x:\"Window/Artist: %s\\tCount: %s\" % (x[0],x[1]))\n",
" \n",
" ##-- Domains\n",
" ## Since url_details.primary_domain is an array, need to flatMap here\n",
" matched_domains = matched_tweets.flatMap(lambda tweet:(tweet['enriched']['url_details']['primary_domain']))\n",
"\n",
" matched_domains_batch_cnt = matched_domains.countByValue()\\\n",
" .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\\\n",
" .map(lambda x:\"Batch/Domain: %s\\tCount: %s\" % (x[0],x[1]))\n",
" matched_domains_window_cnt = matched_domains.countByValueAndWindow(windowIntervalSec,batchIntervalSec)\\\n",
" .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\\\n",
" .map(lambda x:\"Window/Domain: %s\\tCount: %s\" % (x[0],x[1]))\n",
" \n",
" ## Display non-matches for inspection\n",
" ## \n",
" ## Codes less than zero but greater than -100 indicate a non-match (e.g. whitelist hit), but not an error\n",
"\n",
" nonmatched_tweets = processed_tweets.\\\n",
" filter(lambda processed_tweet:(-99<=processed_tweet[0]<=1))\n",
" \n",
" nonmatched_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_notmatched))\n",
"\n",
" nonmatched_batch_cnt = nonmatched_tweets.count()\n",
" nonmatched_window_cnt = nonmatched_tweets.countByWindow(windowIntervalSec,batchIntervalSec)\n",
"\n",
" ## Print any erroring tweets\n",
" ## \n",
" ## Codes less than -100 indicate an error (try...except caught)\n",
"\n",
" errored_tweets = processed_tweets.\\\n",
" filter(lambda processed_tweet:(processed_tweet[0]<=-100))\n",
" errored_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_err))\n",
"\n",
" errored_batch_cnt = errored_tweets.count()\n",
" errored_window_cnt = errored_tweets.countByWindow(windowIntervalSec,batchIntervalSec)\n",
" \n",
" ## Print counts\n",
" inbound_batch_cnt.map(lambda x:('Batch/Inbound: %s' % x))\\\n",
" .union(matched_batch_cnt.map(lambda x:('Batch/Matched: %s' % x))\\\n",
" .union(nonmatched_batch_cnt.map(lambda x:('Batch/Non-Matched: %s' % x))\\\n",
" .union(errored_batch_cnt.map(lambda x:('Batch/Errored: %s' % x)))))\\\n",
" .pprint()\n",
"\n",
" inbound_window_cnt.map(lambda x:('Window/Inbound: %s' % x))\\\n",
" .union(matched_window_cnt.map(lambda x:('Window/Matched: %s' % x))\\\n",
" .union(nonmatched_window_cnt.map(lambda x:('Window/Non-Matched: %s' % x))\\\n",
" .union(errored_window_cnt.map(lambda x:('Window/Errored: %s' % x)))))\\\n",
" .pprint()\n",
" \n",
" matched_artists_batch_cnt.pprint()\n",
" matched_artists_window_cnt.pprint()\n",
" matched_domains_batch_cnt.pprint()\n",
" matched_domains_window_cnt.pprint()\n",
"\n",
" return ssc"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Start the streaming context"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.streaming import StreamingContext\n",
"\n",
"ssc = StreamingContext.getOrCreate('/tmp/%s' % app_name,lambda: createContext())\n",
"ssc.start()\n",
"ssc.awaitTermination()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Stream Output"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Counters"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"From the stdout of the job we can see the simple counts of inbound and output splits, both per batch and accumulating window: "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" -------------------------------------------\n",
" Time: 2017-01-13 11:50:30\n",
" -------------------------------------------\n",
" Batch/Inbound: 9\n",
" Batch/Matched: 0\n",
" Batch/Non-Matched: 9\n",
" Batch/Errored: 0\n",
"\n",
" -------------------------------------------\n",
" Time: 2017-01-13 11:50:30\n",
" -------------------------------------------\n",
" Window/Inbound: 9\n",
" Window/Non-Matched: 9\n",
"\n",
" -------------------------------------------\n",
" Time: 2017-01-13 11:51:00\n",
" -------------------------------------------\n",
" Batch/Inbound: 21\n",
" Batch/Matched: 0\n",
" Batch/Non-Matched: 21\n",
" Batch/Errored: 0\n",
"\n",
" -------------------------------------------\n",
" Time: 2017-01-13 11:51:00\n",
" -------------------------------------------\n",
" Window/Inbound: 30\n",
" Window/Non-Matched: 30\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The details of identified artists within tweets is also tracked, per batch and accumulated over the window period (6 hours, in this example)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" -------------------------------------------\n",
" Time: 2017-01-12 12:45:30\n",
" -------------------------------------------\n",
" Batch/Artist: Major Lazer Count: 4\n",
" Batch/Artist: David Bowie Count: 1\n",
"\n",
" -------------------------------------------\n",
" Time: 2017-01-12 12:45:30\n",
" -------------------------------------------\n",
" Window/Artist: Major Lazer Count: 1320\n",
" Window/Artist: Drake Count: 379\n",
" Window/Artist: Taylor Swift Count: 160\n",
" Window/Artist: Metallica Count: 94\n",
" Window/Artist: David Bowie Count: 84\n",
" Window/Artist: Lady Gaga Count: 37\n",
" Window/Artist: Pink Floyd Count: 11\n",
" Window/Artist: Kate Bush Count: 10\n",
" Window/Artist: Justice Count: 9\n",
" Window/Artist: The Weeknd Count: 8"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Kafka Output"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Matched"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The matched Kafka topic holds a stream of tweets in JSON format, with the discovered metadata (artist/album/track) added. I'm using the Kafka console consumer to view the contents, parsed through [`jq`](https://stedolan.github.io/jq/) to show just the tweet text and metadata that has been added.\n",
"\n",
"\n",
"```bash\n",
"kafka-console-consumer --zookeeper cdh57-01-node-01.moffatt.me:2181 \\\n",
"--topic twitter_matched1 \\\n",
"--from-beginning|jq -r \"[.text,.enriched.url_details.primary_domain[0],.enriched.media_details.artist,.enriched.media_details.album,.enriched.media_details.track,.enriched.match_count] \"\n",
"```\n",
"\n",
" [\n",
" \"Million Reasons by Lady Gaga - this is horrendous sorry @ladygaga https://t.co/rEtePIy3OT\",\n",
" \"youtube.com\",\n",
" \"https://www.youtube.com/watch?v=NvMoctjjdhA&feature=youtu.be\",\n",
" \"Lady Gaga\",\n",
" null,\n",
" \"Million Reasons\",\n",
" 2\n",
" ]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Non-Matched"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"On our Kafka topics outbound we can see the non-matched messages. Probably you'd disable this stream once the processing logic was finalised, but it's useful to be able to audit and validate the reasons for non-matches. Here a retweet is ignored, and we can see it's a retweet from the RT prefix of the `text` field. The -1 is the return code from the `process_tweets` function denoting a non-match: \n",
"\n",
" (-1, 'retweet - ignored', {u'contributors': None, u'truncated': False, u'text': u'RT @ChartLittleMix: Little Mix adicionou datas para a Summer Shout Out 2017 no Reino Unido https://t.co/G4H6hPwkFm', u'@timestamp': u'2017-01-13T11:45:41.000Z', u'is_quote_status': False, u'in_reply_to_status_id': None, u'id': 819873048132288513, u'favorite_count': 0, u'source':"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this article we've built on the foundations of the initial [exploration of Spark Streaming on Python](https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/), expanding it out to address a real-world processing requirement. Processing unbounded streams of data this way is not as complex as you may think, particularly for the benefits that it can yield in reducing the latencies between an event occuring and taking action from it. \n",
"\n",
"We've not touched on some of the more complex areas, such as scaling this up to multiple Spark nodes, partitioned Kafka topics, and so on - that's another [kafka] topic (sorry...) for another day. The code itself is also rudimentary - before moving it into Production there'd be some serious refactoring and optimisation review to be performed on it. \n",
"\n",
"You can find the notebook for this article [here](https://gist.github.com/rmoff/5ee715f6c2bdf7ee878de3ac1d58c0ec), and the [previous article](https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/)'s [here](https://gist.github.com/rmoff/fb033086b285655ffe7f9ff0582dedbf). \n",
"\n",
"If you'd like more information on how Rittman Mead can help your business get the most of out its data, please do [get in touch](https://www.rittmanmead.com/contact/)!"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment