Skip to content

Instantly share code, notes, and snippets.

@smartkiwi
Created August 28, 2014 15:18
Show Gist options
  • Save smartkiwi/3558260348421aa395bc to your computer and use it in GitHub Desktop.
Save smartkiwi/3558260348421aa395bc to your computer and use it in GitHub Desktop.
User request histogram.ipynb
{
"metadata": {
"name": "User request histogram-s3-1file"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": "import pprint\nprint sc",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "<pyspark.context.SparkContext object at 0x3578f90>\n"
}
],
"prompt_number": 35
},
{
"cell_type": "code",
"collapsed": false,
"input": "#Create first RDD - loading text file from S3\nraw_bids = sc.textFile(\"s3n:/.../2014/08/01/14/1406904600_bid030.smq.mgnt.cc_bids.log.gz\")\nprint raw_bids.first()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "1406904600\t-\t66.177.194.172\tUS\t561\tUS-GA\t31520\tAmerica/New_York\t4b4dd8f74d1406904600\t20B072A011ECE2EC\ts\t\t\toverstock.com\t160x600\t1\t15-00\t550000\t2006\t8916\t15933\t\t\t\t10035,9956,10034,9331,8958,8644,10108,4943,9350,4942,8956,7515,10133,9886,5019,9564,4945,7512,8738,4944,8995,9724,9322,7815,9374,10168,10132,7232,7823,5907,5067,5896,5906,8333,7025,10107,8646,7089,7819,8645,9657,9989,2740,10296,8152,9746,9656,9408,8400,9795,7155,6550,9519,9839,9000,10042,5598,9386,2730,9955,6587,9403,9377,5905,6021,4941,10001,10171,8924,8923,9405,8918,8916,8865,9610,7241,9892,8957,7524,8724,8295,10435\t\tMozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)\thttp://www.overstock.com/Home-Garden/Dining-Sets/18551/subcat.html?TID=TN:Furn:DinSets\t1\t750\t1000\t1000\ten\tiab_realestate,iab_business,iab_home,iab_style\t\t55\t25\t25\t4500\t15\t35\t35000\t1000\t1000\t1000\t1000\t1000\t1000\t1000\tnone\t0.421806\t\t\t0.000136999350443\t\t\t\t0\t\t\t\n"
}
],
"prompt_number": 36
},
{
"cell_type": "code",
"collapsed": false,
"input": "def parse_line_lite(line):\n \"\"\"parses TSV string to dict\"\"\"\n line = line.rstrip('\\n')\n f = line.split(\"\\t\")\n d = {}\n d['log_timestamp'] = f[0]\n d['user_cookie'] = f[1]\n if len(f)>=6:\n d['geo_region_code'] = f[5]\n else:\n d['geo_region_code'] = \"\"\n return d",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 37
},
{
"cell_type": "code",
"collapsed": false,
"input": "#Parse each text line into a dict (the result is an RDD of dicts)\nparsed_bids = raw_bids.map(parse_line_lite)\n#Cache the result for later use\nparsed_bids.cache()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "pyout",
"prompt_number": 38,
"text": "PythonRDD[103] at RDD at PythonRDD.scala:37"
}
],
"prompt_number": 38
},
{
"cell_type": "code",
"collapsed": false,
"input": "print parsed_bids.first()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "{'log_timestamp': u'1406904600', 'user_cookie': u'-', 'geo_region_code': u'US-GA'}\n"
}
],
"prompt_number": 39
},
{
"cell_type": "code",
"collapsed": false,
"input": "#Build (K, V) pairs out of the dicts, keyed by user cookie\nkeyd_parsed_bids = parsed_bids.map(lambda bid: (bid[\"user_cookie\"], bid))\nprint keyd_parsed_bids.first()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "(u'-', {'log_timestamp': u'1406904600', 'user_cookie': u'-', 'geo_region_code': u'US-GA'})\n"
}
],
"prompt_number": 40
},
{
"cell_type": "code",
"collapsed": false,
"input": "#Group [(K, V)] -> [(K, [V, V, ...])]\ngrouped_bids = keyd_parsed_bids.groupByKey()\n#print grouped_bids.toDebugString()\nprint grouped_bids.take(2)",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "[(u'', <pyspark.resultiterable.ResultIterable object at 0x38de250>), (u'dda135adc77e4eaa899cff8bf1963de3', <pyspark.resultiterable.ResultIterable object at 0x38de310>)]\n"
}
],
"prompt_number": 45
},
{
"cell_type": "code",
"collapsed": false,
"input": "# (K, [V, V, ...]) -> (K, len([V, V, ...]))\ncount_by_user = grouped_bids.map(lambda x: (x[0], len(x[1])))\ncount_by_user.cache()\nprint count_by_user.take(2)",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "[(u'', 76), (u'dda135adc77e4eaa899cff8bf1963de3', 2)]\n"
}
],
"prompt_number": 42
},
{
"cell_type": "code",
"collapsed": false,
"input": "print count_by_user.count()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "46569\n"
}
],
"prompt_number": 46
},
{
"cell_type": "code",
"collapsed": false,
"input": "#Swap elements in pairs: (K, V) -> (V, K)\nraw_histogram = count_by_user.map(lambda x: (x[1], x[0]))\nprint raw_histogram.sample(1, 1, 1).take(10)",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "[(76, u''), (1, u'8e3e1d510753489b962e353332d5a7a3'), (1, u'bdf82b072a5c4c96aa8f378da51f2ca5'), (1, u'd219344c126049cd9a0c5f5ef76aa6e3'), (1, u'96b81ed3bacb43e2a764c7a6177c3eb5'), (1, u'bddeb23ba17f4b0d888c194e455dee7b'), (3, u'8fbf7a3432e34b93bb5bbaa519067003'), (3, u'8fbf7a3432e34b93bb5bbaa519067003'), (1, u'6b33d8ba8e4b44f595da6f54b87ad0fe'), (1, u'6b33d8ba8e4b44f595da6f54b87ad0fe')]\n"
}
],
"prompt_number": 47
},
{
"cell_type": "code",
"collapsed": false,
"input": "histogram = raw_histogram.groupByKey().map(lambda x: (x[0], len(x[1])))\nprint histogram.first()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "(24, 1)\n"
}
],
"prompt_number": 48
},
{
"cell_type": "code",
"collapsed": false,
"input": "#print histogram.toDebugString()",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 15
},
{
"cell_type": "code",
"collapsed": false,
"input": "print histogram.count()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "31\n"
}
],
"prompt_number": 49
},
{
"cell_type": "code",
"collapsed": false,
"input": "for k, v in histogram.sortByKey().take(20):\n print \"cookies with %d bids: %d\" % (k, v)",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "cookies with 1 bids: 37925\ncookies with 2 bids: 5459\ncookies with 3 bids: 1657\ncookies with 4 bids: 670\ncookies with 5 bids: 346\ncookies with 6 bids: 169\ncookies with 7 bids: 87\ncookies with 8 bids: 69\ncookies with 9 bids: 47\ncookies with 10 bids: 29\ncookies with 11 bids: 23\ncookies with 12 bids: 15\ncookies with 13 bids: 14\ncookies with 14 bids: 10\ncookies with 15 bids: 3\ncookies with 16 bids: 10\ncookies with 17 bids: 11\ncookies with 18 bids: 4\ncookies with 19 bids: 2\ncookies with 20 bids: 2\n"
}
],
"prompt_number": 50
},
{
"cell_type": "code",
"collapsed": false,
"input": "#Spark SQL\nfrom pyspark.sql import SQLContext\nsqlContext = SQLContext(sc)",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 51
},
{
"cell_type": "code",
"collapsed": false,
"input": "schemaPeople = sqlContext.inferSchema(parsed_bids)\nschemaPeople.registerAsTable(\"bids\")",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 52
},
{
"cell_type": "code",
"collapsed": false,
"input": "min_max =sqlContext.sql(\"SELECT min(log_timestamp) as ts_min, max(log_timestamp) as ts_max FROM bids\")\n#print min_max.toDebugString()\nprint min_max.collect()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "[{u'ts_min': u'1406904600', u'ts_max': u'1406905199'}]\n"
}
],
"prompt_number": 53
},
{
"cell_type": "code",
"collapsed": false,
"input": "result = sqlContext.sql(\"\"\"SELECT count(*) as records_c \nFROM bids where log_timestamp>1406904600 and log_timestamp<1406904630\"\"\")\n#print result.toDebugString()\nprint result.collect()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "[{u'records_c': 3478}]\n"
}
],
"prompt_number": 54
},
{
"cell_type": "code",
"collapsed": false,
"input": "result = sqlContext.sql(\"\"\"SELECT geo_region_code, count(*) as records_c \nFROM bids \nGROUP BY geo_region_code ORDER BY records_c DESC\"\"\")\n#print result.toDebugString()\npprint.pprint(result.take(10))",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "[{u'geo_region_code': u'US-NY', u'records_c': 7358},\n {u'geo_region_code': u'US-FL', u'records_c': 4892},\n {u'geo_region_code': u'US-IL', u'records_c': 4568},\n {u'geo_region_code': u'US-PA', u'records_c': 4435},\n {u'geo_region_code': u'US-NJ', u'records_c': 3765},\n {u'geo_region_code': u'US-OH', u'records_c': 3444},\n {u'geo_region_code': u'US-MA', u'records_c': 3201},\n {u'geo_region_code': u'US-GA', u'records_c': 2875},\n {u'geo_region_code': u'US-TX', u'records_c': 2824},\n {u'geo_region_code': u'US-VA', u'records_c': 2785}]\n"
}
],
"prompt_number": 55
},
{
"cell_type": "code",
"collapsed": true,
"input": "#Lazy\nparsed_bids = raw_bids.map(parse_line_lite)\nkeyd_parsed_bids = parsed_bids.map(lambda bid: (bid[\"user_cookie\"], bid))\ngrouped_bids = keyd_parsed_bids.groupByKey()\nprint grouped_bids.toDebugString()",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "PythonRDD[77] at RDD at PythonRDD.scala:37 (12 partitions)\n MappedRDD[76] at values at NativeMethodAccessorImpl.java:-2 (12 partitions)\n ShuffledRDD[75] at partitionBy at NativeMethodAccessorImpl.java:-2 (12 partitions)\n PairwiseRDD[74] at RDD at PythonRDD.scala:252 (1 partitions)\n PythonRDD[73] at RDD at PythonRDD.scala:37 (1 partitions)\n MappedRDD[4] at textFile at NativeMethodAccessorImpl.java:-2 (1 partitions)\n HadoopRDD[3] at textFile at NativeMethodAccessorImpl.java:-2 (1 partitions)\n"
}
],
"prompt_number": 23
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment