Skip to content

Instantly share code, notes, and snippets.

@ryw
Last active May 6, 2016 16:20
Show Gist options
  • Save ryw/3b7d925d9eb6debb2e90b2f9385e27a3 to your computer and use it in GitHub Desktop.
Save ryw/3b7d925d9eb6debb2e90b2f9385e27a3 to your computer and use it in GitHub Desktop.
import { SparkActivity, singleS3StreamOutput } from 'aries-data';
import AWS from 'aws-sdk';
import moment from 'moment';
export default class AcmeDailyAnalysis extends SparkActivity {
static props = {
name: require('../package.json').name,
version: require('../package.json').version,
};
@singleS3StreamOutput()
async onTask(activityTask, config) {
// Create a dataframe from s3 files.
const dataframe = this.sqlContext.read().jsonSync(this.s3Path());
// Spark query
const ctx = df.select("context.userAgent")
.groupBy('userAgent')
.count()
.sort('count')
// Run the job and return the results.
return ctx.collectSync();
 }
// Return a s3 path for yesterdays data.
s3Path() {
const yesterday = moment.utc().subtract(1, 'day').format('YYYY/MM/DD');
return `s3n://acme-clickstream/data/${yesterday}/*`;
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.