Skip to content

Instantly share code, notes, and snippets.

@johnib
Created October 17, 2017 17:36
Show Gist options
  • Save johnib/1f03dfd0e41784a9f420a0f4fd43cb1f to your computer and use it in GitHub Desktop.
Save johnib/1f03dfd0e41784a9f420a0f4fd43cb1f to your computer and use it in GitHub Desktop.
using Microsoft.SCOPE.Types;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using ScopeRuntime;
/// <summary>
/// SCOPE processor that expands each aggregated input row into individual rows:
/// one output row per aggregated item, with the Availability column rewritten to
/// 1 (item within SLA) or 0 (item out of SLA).
/// </summary>
public class IngestionLatencyDataProcessor : Processor
{
    /// <summary>
    /// Index of the Availability column. On input it is read as a double and used as
    /// the fraction of items within SLA; on output it is overwritten with 1 or 0.
    /// </summary>
    private const int AvailabilityColumnIndex = 4;

    /// <summary>
    /// Index of the column holding the number of items the aggregated row stands for
    /// (read as a long).
    /// </summary>
    private const int ItemCountColumnIndex = 5;

    /// <summary>
    /// This method defines the output schema, which is a clone of the input schema.
    /// </summary>
    /// <param name="requestedColumns">list of column names passed from the PRODUCE keyword (not used)</param>
    /// <param name="args">processor arguments (not used)</param>
    /// <param name="input">the input's schema</param>
    /// <returns>a clone of <paramref name="input"/></returns>
    public override Schema Produces(string[] requestedColumns, string[] args, Schema input)
    {
        return input.Clone();
    }

    /// <summary>
    /// Emits, for every input row, exactly <c>itemCount</c> output rows that carry the same
    /// values as the input row except for the Availability column, which is set to 1 for
    /// the within-SLA share of the items and to 0 for the remainder.
    /// </summary>
    /// <param name="input">the rows to expand</param>
    /// <param name="outputRow">the row instance that is filled in and yielded for every output row</param>
    /// <param name="args">processor arguments (not used)</param>
    /// <returns>the expanded rows; the same <paramref name="outputRow"/> instance is yielded each time</returns>
    public override IEnumerable<Row> Process(RowSet input, Row outputRow, string[] args)
    {
        foreach (Row row in input.Rows)
        {
            // this count describes the weight of the aggregated data
            long itemCount = row[ItemCountColumnIndex].Long;
            double aggregatedAvailability = row[AvailabilityColumnIndex].Double;

            // A non-positive weight or an undefined availability cannot be split into groups.
            if (itemCount <= 0 || double.IsNaN(aggregatedAvailability))
            {
                continue;
            }

            // These two describe the number of rows to be output for each of the groups
            // (within / out of SLA). Only one side is rounded; the other is the remainder,
            // so the two always add up to itemCount. Rounding both sides independently
            // loses or invents rows on midpoints (e.g. 1 item at 0.5 rounds to 0 + 0).
            long itemsWithinSla = CountItemsWithinSla(itemCount, aggregatedAvailability);
            long itemsOutofSla = itemCount - itemsWithinSla;

            // the output row has the same values as the input row, except for the Availability column
            row.CopyTo(outputRow);

            for (long i = 0; i < itemsWithinSla; i++)
            {
                outputRow[AvailabilityColumnIndex].Set(1);
                yield return outputRow;
            }

            for (long i = 0; i < itemsOutofSla; i++)
            {
                outputRow[AvailabilityColumnIndex].Set(0);
                yield return outputRow;
            }
        }
    }

    /// <summary>
    /// Computes how many of <paramref name="itemCount"/> items fall within SLA for the given
    /// availability fraction. The result is always in the range [0, itemCount].
    /// </summary>
    /// <param name="itemCount">total number of items; expected to be positive</param>
    /// <param name="availability">fraction of items within SLA; values outside [0, 1] are clamped</param>
    /// <returns>the rounded number of items within SLA</returns>
    private static long CountItemsWithinSla(long itemCount, double availability)
    {
        // Clamp first so that slightly out-of-range or infinite inputs cannot overflow the cast.
        double clampedAvailability = Math.Min(1.0, Math.Max(0.0, availability));

        // Math.Round uses its default midpoint handling (round half to even).
        long rounded = (long)Math.Round(itemCount * clampedAvailability);

        return Math.Min(itemCount, Math.Max(0L, rounded));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment