
@dapangmao
Last active August 29, 2015 14:10
Spark practice (3): clean and sort Social Security numbers

Sample.txt

Requirements:
1. separate the valid SSNs from the invalid ones
2. count the number of valid SSNs
402-94-7709 
283-90-3049 
124-01-2425 
1231232
088-57-9593 
905-60-3585 
44-82-8341
257581087
327-84-0220
402-94-7709

Thoughts

SSN-indexed data is common in many file systems. The trick to speeding up the sort on Spark is to build a numerical key for each record and use the sortByKey operator.
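For a dashed SSN `AAA-GG-SSSS`, the numerical key is `AAA*1000000 + GG*10000 + SSSS`, which is just the nine digits read as one integer. A minimal sketch of the key function (the name `ssn_key` is my own, not part of the solutions below):

```python
def ssn_key(s):
    """Turn a dashed SSN into the integer formed by its nine digits."""
    area, group, serial = s.split('-')
    return 1000000 * int(area) + 10000 * int(group) + int(serial)

print(ssn_key('402-94-7709'))  # -> 402947709
```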

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable = {}
valid_cnt = 0
with open('sample.txt') as infile, open('sample_bad.txt', 'w') as outfile:
    for l in infile:
        l = l.strip()
        nums = l.split('-')
        key = -1
        if l.isdigit() and len(l) == 9:
            key = int(l)
        elif [len(n) for n in nums] == [3, 2, 4] and all(n.isdigit() for n in nums):
            key = 1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2])
        if key == -1:
            outfile.write(l + '\n')
        elif key not in htable:
            # keep only the first occurrence of each SSN
            htable[key] = l
            valid_cnt += 1

with open('sample_sorted.txt', 'w') as outfile:
    for x in sorted(htable):
        outfile.write(htable[x] + '\n')

print(valid_cnt)
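The length-based checks above could also be written as a single regular expression. This sketch with the standard `re` module accepts exactly the same two layouts as valid (it is my own variant, not part of the original solutions):

```python
import re

# Either nine bare digits or the dashed AAA-GG-SSSS layout
SSN_RE = re.compile(r'^(?:\d{9}|\d{3}-\d{2}-\d{4})$')

def is_valid(line):
    return bool(SSN_RE.match(line.strip()))

print(is_valid('402-94-7709'))  # True
print(is_valid('44-82-8341'))   # False: the area part has only two digits
```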

Cluster solution

#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
valid_cnt = sc.accumulator(0)

def is_validSSN(l):
    l = l.strip()
    nums = l.split('-')
    cdn1 = l.isdigit() and len(l) == 9
    cdn2 = ([len(n) for n in nums] == [3, 2, 4]
            and all(n.isdigit() for n in nums))
    return cdn1 or cdn2

def set_key(l):
    global valid_cnt
    valid_cnt += 1
    l = l.strip()
    if len(l) == 9:
        return (int(l), l)
    nums = l.split('-')
    return (1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2]), l)

rdd = sc.textFile('sample.txt')
rdd1 = rdd.filter(lambda x: not is_validSSN(x))

# cache() keeps rdd2 from being recomputed by saveAsTextFile below,
# which would run set_key again and double the accumulator
rdd2 = rdd.filter(is_validSSN).distinct() \
    .map(set_key) \
    .sortByKey().map(lambda x: x[1]).cache()

for x in rdd1.collect():
    print('Invalid SSN\t' + x)

for x in rdd2.collect():
    print('Valid SSN\t' + x)

print('\nNumber of valid SSN is {}'.format(valid_cnt.value))

# Save the RDDs to the file system
rdd1.saveAsTextFile('sample_bad')
rdd2.saveAsTextFile('sample_sorted')
sc.stop()
@grisaitis

very nice. i wonder if there's a way to avoid using a global variable here...
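One way to avoid the global, if the duplicate-free count is all that is needed, would be to ask the RDD itself: `rdd2.count()` on the cached RDD replaces the accumulator entirely. The idea, sketched here in plain Python rather than Spark:

```python
lines = ['402-94-7709', '283-90-3049', '402-94-7709']
# Deduplicate, then count -- no mutable counter needed.
# In the Spark job this corresponds to rdd2.count() after distinct().
print(len(set(lines)))  # -> 2
```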
