Skip to content

Instantly share code, notes, and snippets.

@marco-schmidt
Last active July 30, 2021 21:28
Show Gist options
  • Save marco-schmidt/78f86aa595306535fd51ea30ce823c5c to your computer and use it in GitHub Desktop.
Save marco-schmidt/78f86aa595306535fd51ea30ce823c5c to your computer and use it in GitHub Desktop.
Process textual log files with Java 8's stream API
2021-11-30T13:44:59.334Z 44
2021-11-30T13:44:59.400Z 58
2021-11-30T13:44:59.583Z 12
2021-11-30T13:45:00.003Z 29
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Random;
import java.util.zip.GZIPOutputStream;
/**
 * Generate log files to be used with {@link LogProcessor}.
 *
 * <p>
 * One file is written per UTC day, starting with today, named {@code yyyy-MM-dd.log} (or {@code yyyy-MM-dd.log.gz}
 * when {@code gzip} is enabled). Each line holds an ISO-8601 timestamp, a tab character and a response time in
 * milliseconds, e.g. {@code 2021-11-30T13:44:59.334Z<TAB>44}. Timestamps are random within each interval and are
 * therefore not sorted inside an interval.
 * </p>
 * <p>
 * With Java 11+ run like this:
 * </p>
 * <pre>
 * java LogGenerator.java
 * </pre>
 * <p>
 * Edit configuration variables to have more or bigger files. Random values come from a fixed seed, so only the
 * dates (file names and timestamps) differ between runs on different days.
 * </p>
 *
 * @author Marco Schmidt
 */
public class LogGenerator
{
  private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
  private static final long MILLIS_PER_DAY = 24L * 60L * 60L * 1000L;
  private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ISO_INSTANT;

  // configuration options
  private static boolean gzip = false;
  private static long intervalMillis = 1000L; // must be from [1 ; 86400000]
  private static long linesPerInterval = 50L; // must be > 0
  private static long minResponseMillis = 100L; // must be > 0
  private static int numDays = 10; // must be > 0
  private static String outputDirectory = "."; // must be a writable directory
  private static Random random = new Random(1L); // fixed seed for identical random values
  private static int responseIntervalMillis = 30000; // must be >= 0; 0 means every response takes minResponseMillis

  /**
   * Generate and write all lines for one time interval.
   *
   * @param out receives {@code linesPerInterval} lines of the form {@code <timestamp>\t<responseMillis>\n}
   * @param initialTimestamp start of the interval; every generated timestamp lies in
   *        [initialTimestamp ; initialTimestamp + intervalMillis)
   * @throws IOException if writing to {@code out} fails
   */
  static void writeInterval(Writer out, Instant initialTimestamp) throws IOException
  {
    final long initialMillis = initialTimestamp.toEpochMilli();
    for (long line = 0; line < linesPerInterval; line++)
    {
      // draw the timestamp offset first and the response time second, keeping the seeded sequence stable
      final Instant timestamp = Instant.ofEpochMilli(initialMillis + random.nextInt((int) intervalMillis));
      final long responseTime = minResponseMillis + randomResponseOffset();
      out.write(TIMESTAMP_FORMATTER.format(timestamp) + "\t" + responseTime + "\n");
    }
  }

  /**
   * Return a random response time offset from [0 ; responseIntervalMillis), or 0 if responseIntervalMillis is 0.
   * Random.nextInt(0) throws, so the documented lower bound of 0 needs this special case.
   */
  private static int randomResponseOffset()
  {
    return responseIntervalMillis > 0 ? random.nextInt(responseIntervalMillis) : 0;
  }

  /**
   * Create a single log file for a given day into a given directory.
   *
   * <p>
   * Failures are reported to standard error and do not abort the caller, so remaining days can still be written.
   * </p>
   *
   * @param dir directory to write into
   * @param date start of the day to generate (expected to be midnight)
   */
  static void createLogFile(Path dir, ZonedDateTime date)
  {
    // assemble file name and print the full path of the file about to be written
    final String fileName = DATE_FORMATTER.format(date) + ".log" + (gzip ? ".gz" : "");
    final File outputFile = dir.resolve(fileName).toFile();
    System.out.println(outputFile.getAbsolutePath());
    // the raw file stream is a resource of its own so it is closed even if the GZIP header cannot be written
    try (FileOutputStream fileOut = new FileOutputStream(outputFile);
        Writer out = new BufferedWriter(new OutputStreamWriter(
            gzip ? new GZIPOutputStream(fileOut) : fileOut, StandardCharsets.UTF_8)))
    {
      Instant timestamp = date.toInstant();
      final long numIntervals = MILLIS_PER_DAY / intervalMillis;
      for (long interval = 0; interval < numIntervals; interval++)
      {
        writeInterval(out, timestamp);
        timestamp = timestamp.plus(intervalMillis, ChronoUnit.MILLIS);
      }
    }
    catch (IOException ioe)
    {
      // best effort: report the failure and let the caller continue with the next day
      System.err.println("Could not write '" + outputFile.getAbsolutePath() + "'.");
      ioe.printStackTrace();
    }
  }

  public static void main(String[] args)
  {
    // make sure there's a valid output directory
    final Path dir = Paths.get(outputDirectory);
    if (!Files.isDirectory(dir))
    {
      System.err.println("Not a valid directory: '" + outputDirectory + "'.");
      return;
    }
    // reject configurations that would divide by zero or make Random.nextInt fail
    if (intervalMillis < 1 || intervalMillis > MILLIS_PER_DAY || responseIntervalMillis < 0)
    {
      System.err.println("Invalid configuration: intervalMillis must be from [1 ; " + MILLIS_PER_DAY
          + "] and responseIntervalMillis must be >= 0.");
      return;
    }
    // start with today (UTC, time reset to 00:00:00.000) as first day for which to generate log files
    ZonedDateTime date = ZonedDateTime.now(ZoneId.of("UTC")).truncatedTo(ChronoUnit.DAYS);
    // generate one file per day; a local counter leaves the numDays configuration untouched
    for (int day = 0; day < numDays; day++)
    {
      createLogFile(dir, date);
      date = date.plusDays(1);
    }
  }
}
import java.io.*;
import java.nio.file.*;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Using Java streams to process log lines.
 *
 * <ul>
 * <li>Split lines by tab character; blank lines are skipped.</li>
 * <li>Assume first element of line is timestamp and second is an integer response time.</li>
 * <li>Parse elements into LogRecord objects. Create a normalized timestamp by rounding the timestamp down to a
 * multiple of the group interval (default: 60000 ms, i.e. down to the full minute).</li>
 * <li>Group by normalized timestamp.</li>
 * <li>For each normalized point in time, collect response time values, sort them and extract values for each percentile
 * defined (default: 50 and 90) using the nearest-rank method.</li>
 * <li>Put result into TimestampPercentiles objects.</li>
 * <li>Print out those objects in ascending timestamp order: timestamp, number of records, then the percentiles,
 * everything separated by tab.</li>
 * </ul>
 *
 * With Java 11+ run like this for hard-coded input lines (columns are tab-separated):
 *
 * <pre>
 * $ java LogProcessor.java
 * 2021-11-30T13:44:00Z 3 44 58
 * 2021-11-30T13:45:00Z 1 29 29
 * </pre>
 *
 * Read from standard input (UTF-8) with a single dash as argument:
 * <pre>
 * $ cat *.log | java LogProcessor.java -
 * </pre>
 *
 * Read from files specified as arguments (generate some with {@link LogGenerator}; gzip-compressed files are not
 * supported):
 * <pre>
 * $ java LogProcessor.java *.log
 * </pre>
 *
 * Options: {@code -g <millis>} group interval, {@code -p <p1,p2,...>} percentiles,
 * {@code -c <millis>} print percentile values of at least this many milliseconds in red.
 *
 * Released under the Apache License 2.0.
 *
 * @author Marco Schmidt
 * @see <a href="https://gist.github.com/marco-schmidt/78f86aa595306535fd51ea30ce823c5c">Original gist</a>
 */
public class LogProcessor
{
  // configuration options

  // timestamp conversion: ISO output; input with optional .SSS fraction (an offset like Z is needed for an Instant)
  static DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
  static DateTimeFormatter parser = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX][X]");

  /**
   * timestamps from this interval in milliseconds will be grouped together; must be &gt; 0
   * program parameter to override like: -g 120000
   */
  static long groupIntervalMillis = 60000L;

  /**
   * the percentiles to be computed, each value must be from 0.0 to 100.0;
   * output of this program will be a timestamp and a record count followed by as many values as defined here;
   * program parameter to override: -p 50.0,75.0,90.0,99,100
   */
  static List<Double> percentiles = Arrays.asList(new Double[]
  {
      50.0, 90.0
  });

  /** When true, percentile values of at least {@link #minColorized} are wrapped in ANSI red; enabled by -c. */
  static boolean colorize = false;
  /** Threshold for red output, set by -c. */
  static long minColorized;

  /** Result for one group: its normalized timestamp, number of records and the configured percentile values. */
  static class TimestampPercentiles
  {
    int elements;
    Instant timestamp;
    long normalizedTimestampMillis;
    List<Long> percentiles;

    /** Timestamp, element count and percentile values, separated by tab characters. */
    @Override
    public String toString()
    {
      final StringBuilder sb = new StringBuilder();
      sb.append(formatter.format(timestamp));
      sb.append('\t');
      sb.append(elements);
      for (final Long p : percentiles)
      {
        sb.append('\t');
        final boolean addColor = colorize && p >= minColorized;
        if (addColor)
        {
          sb.append("\u001B[31m");
        }
        sb.append(p);
        if (addColor)
        {
          sb.append("\u001B[0m");
        }
      }
      return sb.toString();
    }
  }

  /** One parsed input line. */
  static class LogRecord
  {
    Instant timestamp;
    long normalizedTimestampMillis;
    long responseMills;

    public long getNormalizedTimestampMillis()
    {
      return normalizedTimestampMillis;
    }
  }

  /** Parse a timestamp like {@code 2021-11-30T13:44:59.334Z} with {@link #parser}. */
  public static Instant toInstant(String s)
  {
    final Instant result = Instant.from(parser.parse(s));
    return result;
  }

  /**
   * Round an instant down to a multiple of {@link #groupIntervalMillis} (epoch-based).
   * floorDiv keeps instants before 1970 in the correct, earlier bucket.
   */
  public static long normalize(Instant i)
  {
    return Math.floorDiv(i.toEpochMilli(), groupIntervalMillis) * groupIntervalMillis;
  }

  /**
   * Parse one line of the form {@code <timestamp>\t<responseMillis>}.
   *
   * @throws IllegalArgumentException if the line has fewer than two tab-separated fields
   *         (unparsable values raise the usual DateTimeParseException / NumberFormatException)
   */
  public static LogRecord parse(String s)
  {
    final String[] a = s.split("\t");
    if (a.length < 2)
    {
      throw new IllegalArgumentException("Expected '<timestamp><TAB><response time>' but got: '" + s + "'");
    }
    final LogRecord r = new LogRecord();
    r.timestamp = toInstant(a[0]);
    r.normalizedTimestampMillis = normalize(r.timestamp);
    r.responseMills = Long.parseLong(a[1]);
    return r;
  }

  /**
   * Compute the configured {@link #percentiles} of the response times of one group.
   *
   * @param list records sharing one normalized timestamp (as produced by grouping)
   * @return result object; for an empty list its timestamp and percentiles stay null
   */
  public static TimestampPercentiles computePercentiles(List<LogProcessor.LogRecord> list)
  {
    final TimestampPercentiles result = new TimestampPercentiles();
    if (!list.isEmpty())
    {
      result.normalizedTimestampMillis = list.get(0).normalizedTimestampMillis;
      result.timestamp = Instant.ofEpochMilli(result.normalizedTimestampMillis);
      // copy response time values from list of log records to array
      final int numElems = list.size();
      result.elements = numElems;
      final long[] a = new long[numElems];
      int index = 0;
      for (final LogRecord r : list)
      {
        a[index++] = r.responseMills;
      }
      // sort array for easy retrieval of percentile values
      // TODO: instead of sorting the array in O(n log n) there are faster dedicated
      // algorithms like https://en.wikipedia.org/wiki/Quickselect
      Arrays.sort(a);
      // copy percentile value to result object
      result.percentiles = new ArrayList<Long>(percentiles.size());
      for (final double perc : percentiles)
      {
        result.percentiles.add(a[nearestRank(perc, numElems) - 1]);
      }
    }
    return result;
  }

  /**
   * Nearest-rank method: 1-based rank ceil(perc / 100 * n), clamped to [1 ; n] so that 0.0 maps to the smallest
   * value. Multiplying before dividing avoids round-up errors such as 7.0 / 100.0 * 100 = 7.000000000000001.
   */
  private static int nearestRank(double perc, int n)
  {
    final int r = (int) Math.ceil(perc * n / 100.0);
    return Math.max(1, Math.min(n, r));
  }

  /** Parse a comma-separated list of percentiles, each from 0.0 to 100.0. */
  private static List<Double> parsePercentiles(String s)
  {
    final List<Double> result = new ArrayList<>();
    for (final String item : s.split(","))
    {
      final double value = Double.parseDouble(item.trim());
      if (!(value >= 0.0 && value <= 100.0))
      {
        throw new IllegalArgumentException("Percentile must be from 0.0 to 100.0 but is " + item.trim());
      }
      result.add(value);
    }
    return result;
  }

  /** Return the argument following {@code option}, failing with a clear message if there is none. */
  private static String nextValue(Iterator<String> iter, String option)
  {
    if (!iter.hasNext())
    {
      throw new IllegalArgumentException("Missing value for option " + option);
    }
    return iter.next();
  }

  /** Lines of one file; an unreadable file is reported and treated as empty so other files are still processed. */
  private static Stream<String> linesOf(String fileName)
  {
    try
    {
      return Files.lines(Paths.get(fileName));
    }
    catch (IOException e)
    {
      System.err.println("Could not read '" + fileName + "': " + e);
      return Stream.empty();
    }
  }

  /**
   * Apply options to the static configuration and return the input lines: standard input if "-" was given
   * (file names are then ignored), else the named files, else a few hard-coded sample lines.
   */
  private static Stream<String> parseArguments(String[] args) throws Exception
  {
    boolean stdInput = false;
    final List<String> fileNames = new ArrayList<>();
    final Iterator<String> iter = Arrays.asList(args).iterator();
    while (iter.hasNext())
    {
      final String s = iter.next();
      if ("-".equals(s))
      {
        stdInput = true;
      }
      else
      if ("-c".equals(s))
      {
        minColorized = Long.parseLong(nextValue(iter, s));
        colorize = true;
      }
      else
      if ("-g".equals(s))
      {
        groupIntervalMillis = Long.parseLong(nextValue(iter, s));
        if (groupIntervalMillis <= 0)
        {
          throw new IllegalArgumentException("Group interval must be > 0 but is " + groupIntervalMillis);
        }
      }
      else
      if ("-p".equals(s))
      {
        percentiles = parsePercentiles(nextValue(iter, s));
      }
      else
      {
        fileNames.add(s);
      }
    }
    if (stdInput)
    {
      // 1) read from standard input, decoded as UTF-8 like Files.lines does for files
      return new BufferedReader(new InputStreamReader(System.in, java.nio.charset.StandardCharsets.UTF_8)).lines();
    }
    if (!fileNames.isEmpty())
    {
      // 2) all arguments are file names, read all of their content (flatMap closes each file stream)
      return fileNames.stream().flatMap(LogProcessor::linesOf);
    }
    // 3) a few hard-coded lines
    final String[] lines =
    {
        "2021-11-30T13:44:59.334Z\t44", "2021-11-30T13:44:59.400Z\t58", "2021-11-30T13:44:59.583Z\t12",
        "2021-11-30T13:45:00.003Z\t29",
    };
    return Arrays.stream(lines);
  }

  public static void main(String[] args) throws Exception
  {
    final long startMillis = System.currentTimeMillis();
    try (Stream<String> lines = parseArguments(args))
    {
      lines
          .filter(s -> !s.trim().isEmpty())
          .map(LogProcessor::parse)
          .collect(Collectors.groupingBy(LogRecord::getNormalizedTimestampMillis))
          .values()
          .parallelStream()
          // compute percentiles for each list of LogRecord objects
          .map(LogProcessor::computePercentiles)
          // HashMap groups have no order; sort so the time series is printed chronologically
          .sorted((x, y) -> Long.compare(x.normalizedTimestampMillis, y.normalizedTimestampMillis))
          .forEachOrdered(System.out::println);
    }
    // print measured time to standard error to not mix it up with the percentile output
    System.err.println("Time (ms): " + (System.currentTimeMillis() - startMillis));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment