# Gist by @hamdifourati, created March 14, 2018
# (gist 17064e99f99e8fc1bcc6b39bbd4f7563):
# "Use Dataflow to count top imported packages in Java"
#!/usr/bin/env python
"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import apache_beam as beam
import argparse
def startsWith(line, term):
    """Yield *line* unchanged when it begins with *term*; otherwise yield nothing.

    Being a generator, it acts as a filter when used with ``beam.FlatMap``:
    matching lines pass through and every other line is dropped.
    """
    if not line.startswith(term):
        return
    yield line
def splitPackageName(packageName):
    """Return every dotted prefix of *packageName*, shortest first, then the full name.

    e.g. given com.example.appname.library.widgetname
    returns com
            com.example
            com.example.appname
            ...
            com.example.appname.library.widgetname

    A name that begins with '.' comes back alone, unsplit.
    """
    if packageName.startswith('.'):
        # A dot in position 0 ends the scan before any prefix is collected.
        return [packageName]
    prefixes = [
        packageName[:pos]
        for pos, char in enumerate(packageName)
        if char == '.'
    ]
    prefixes.append(packageName)
    return prefixes
def getPackages(line, keyword):
    """Return the dotted prefixes of the name declared after *keyword* on *line*.

    For ``import com.example.Foo;`` with keyword ``import`` this returns
    ``['com', 'com.example', 'com.example.Foo']`` (built by splitPackageName).

    Returns an empty list when *keyword* does not occur on the line, when it
    is only the start of a longer identifier (``importer.run();``), when no
    ';' follows it, or when the declared name is empty. A Java ``static``
    modifier (``import static a.b.C.m;``) is skipped so static imports are
    counted under their real package instead of a ``static a`` prefix.
    """
    pos = line.find(keyword)
    if pos < 0:
        # Without this guard find()'s -1 would be added to len(keyword) and
        # the slice below would start at an arbitrary offset.
        return []
    start = pos + len(keyword)
    # Require a word boundary after an alphanumeric keyword so identifiers that
    # merely begin with it are not mistaken for declarations.
    following = line[start:start + 1]
    if keyword[-1:].isalnum() and (following.isalnum() or following == '_'):
        return []
    end = line.find(';', start)
    if end < 0:
        return []
    packageName = line[start:end].strip()
    words = packageName.split(None, 1)
    if len(words) == 2 and words[0] == 'static':
        packageName = words[1]
    if not packageName:
        return []
    return splitPackageName(packageName)
def packageUse(line, keyword):
    """Emit a ``(prefix, 1)`` pair for each package prefix found on *line*.

    Each pair is one vote for that prefix, ready to be summed per key.
    Being a generator, it plugs directly into ``beam.FlatMap``.
    """
    prefixes = getPackages(line, keyword)
    for prefix in prefixes:
        yield (prefix, 1)
def by_value(kv1, kv2):
    """Less-than comparator for ``(key, value)`` pairs, ordering by value.

    Returns True when kv1's value is smaller than kv2's. Keys are ignored,
    and each argument must unpack into exactly two items.
    """
    (_, left), (_, right) = kv1, kv2
    return left < right
# Google Cloud project and Cloud Storage bucket used by run().
PROJECT = 'test-hamdi'
BUCKET = 'hamdi-coursera-labs'
def run(argv=None):
    """Launch a Dataflow job that finds the most frequently imported Java packages.

    The job reads every Java source matched by the input glob and keeps the
    lines that start with ``import``. It counts every dotted package prefix
    on those lines, then writes the ``top_n`` prefixes with the highest
    counts to ``output_prefix``.

    Args:
        argv: Command-line style arguments; ``None`` (the default) means
            ``sys.argv[1:]``. Recognised flags are ``--project``, ``--bucket``,
            ``--input``, ``--output_prefix`` and ``--top_n``. With no flags
            the job is exactly the original one. Any other flag is forwarded
            to Beam as a pipeline option and placed after the built-in
            defaults, so e.g. ``--runner=DirectRunner`` overrides them.

    Returns:
        The PipelineResult from ``Pipeline.run()``.
    """
    parser = argparse.ArgumentParser(
        description='Find the most commonly imported Java packages.')
    parser.add_argument('--project', default=PROJECT,
                        help='Google Cloud project to run the job in.')
    parser.add_argument('--bucket', default=BUCKET,
                        help='Cloud Storage bucket for input, output and staging.')
    parser.add_argument('--input', default=None,
                        help='Glob of Java sources '
                             '(default: gs://BUCKET/top-import/*.java).')
    parser.add_argument('--output_prefix', default=None,
                        help='Output path prefix '
                             '(default: gs://BUCKET/top-import/output).')
    parser.add_argument('--top_n', type=int, default=5,
                        help='How many packages to report.')
    known_args, beam_args = parser.parse_known_args(argv)

    bucket = known_args.bucket
    pipeline_args = [
        '--project={0}'.format(known_args.project),
        '--job_name=topimport',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(bucket),
        '--temp_location=gs://{0}/staging/'.format(bucket),
        '--runner=DataflowRunner',
    ] + beam_args

    input_glob = known_args.input or 'gs://{0}/top-import/*.java'.format(bucket)
    output_prefix = (known_args.output_prefix
                     or 'gs://{0}/top-import/output'.format(bucket))
    keyword = 'import'
    top_n = known_args.top_n

    p = beam.Pipeline(argv=pipeline_args)
    # find most used packages
    (p
     | 'GetJava' >> beam.io.ReadFromText(input_glob)
     # Extra FlatMap args are passed after the element; no closures needed.
     | 'GetImports' >> beam.FlatMap(startsWith, keyword)
     | 'PackageUse' >> beam.FlatMap(packageUse, keyword)
     | 'TotalUse' >> beam.CombinePerKey(sum)
     # Rank by count through key=. In newer Beam releases the second
     # positional parameter of Top.Of is a one-argument key function, so the
     # two-argument comparator by_value cannot be passed there.
     | 'Top_{0}'.format(top_n) >> beam.transforms.combiners.Top.Of(
         top_n, key=lambda kv: kv[1])
     | 'write' >> beam.io.WriteToText(output_prefix)
    )
    return p.run()
if __name__ == '__main__':
    # Launch the job only when this file is executed directly, not on import.
    run()
# (end of gist)