Last active
July 30, 2021 21:28
-
-
Save marco-schmidt/78f86aa595306535fd51ea30ce823c5c to your computer and use it in GitHub Desktop.
Process textual log files with Java 8's stream API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
*.gz
*.log
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2021-11-30T13:44:59.334Z	44
2021-11-30T13:44:59.400Z	58
2021-11-30T13:44:59.583Z	12
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2021-11-30T13:45:00.003Z	29
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.BufferedWriter; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.OutputStreamWriter; | |
import java.io.Writer; | |
import java.nio.charset.StandardCharsets; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import java.nio.file.Paths; | |
import java.time.Instant; | |
import java.time.LocalDate; | |
import java.time.ZoneId; | |
import java.time.ZonedDateTime; | |
import java.time.format.DateTimeFormatter; | |
import java.time.temporal.ChronoUnit; | |
import java.time.temporal.TemporalUnit; | |
import java.util.Random; | |
import java.util.zip.GZIPOutputStream; | |
/**
 * Generate log files to be used with {@link LogProcessor}.
 *
 * <p>
 * One file is written per day, starting with the current UTC day. Files are named {@code yyyy-MM-dd.log}, or
 * {@code yyyy-MM-dd.log.gz} if {@code gzip} is set. Each line holds an ISO-8601 timestamp and a response time in
 * milliseconds, separated by a tab character.
 * </p>
 * <p>
 * With Java 11+ run like this:
 * </p>
 * <pre>
 * java LogGenerator.java
 * </pre>
 * <p>
 * Edit configuration variables to have more or bigger files.
 * </p>
 *
 * @author Marco Schmidt
 */
public class LogGenerator
{
  private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
  private static final long MILLIS_PER_DAY = 24L * 60L * 60L * 1000L;
  private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ISO_INSTANT;

  // configuration options
  private static boolean gzip = false;
  private static long intervalMillis = 1000L; // must be from [1 ; 86400000]
  private static long linesPerInterval = 50L; // must be > 0
  private static long minResponseMillis = 100L; // must be > 0
  private static int numDays = 10; // must be > 0
  private static String outputDirectory = "."; // must be a writable directory
  private static Random random = new Random(1L); // fixed seed for identical random values
  private static int responseIntervalMillis = 30000; // must be >= 0

  /**
   * Compute one random response time.
   *
   * @param rnd source of random values
   * @param minMillis smallest possible response time
   * @param rangeMillis number of distinct values starting at {@code minMillis}
   * @return a value from {@code [minMillis, minMillis + rangeMillis)}, or {@code minMillis} itself if
   *         {@code rangeMillis <= 0}
   */
  static long nextResponseMillis(Random rnd, long minMillis, int rangeMillis)
  {
    // Random.nextInt(bound) rejects a bound of 0, which the documented ">= 0" constraint allows
    return rangeMillis > 0 ? minMillis + rnd.nextInt(rangeMillis) : minMillis;
  }

  /**
   * Generate and write all lines for one time interval.
   *
   * <p>
   * Writes {@code linesPerInterval} lines. Each timestamp is chosen randomly from
   * {@code [initialTimestamp, initialTimestamp + intervalMillis)}, so lines within an interval are not sorted.
   * </p>
   *
   * @param out destination of the lines
   * @param initialTimestamp start of the interval
   * @throws IOException if writing fails
   */
  static void writeInterval(Writer out, Instant initialTimestamp) throws IOException
  {
    final long initialMillis = initialTimestamp.toEpochMilli();
    for (long lineIndex = 0; lineIndex < linesPerInterval; lineIndex++)
    {
      // draw the timestamp offset before the response time to keep the seeded random sequence unchanged
      final Instant timestamp = Instant.ofEpochMilli(initialMillis + random.nextInt((int) intervalMillis));
      final long responseTime = nextResponseMillis(random, minResponseMillis, responseIntervalMillis);
      out.write(TIMESTAMP_FORMATTER.format(timestamp) + "\t" + responseTime + "\n");
    }
  }

  /**
   * Create a single log file for a given day in a given directory and print its absolute path.
   *
   * <p>
   * Errors are reported on standard error and do not abort the caller.
   * </p>
   *
   * @param dir directory to write to
   * @param date start of the day to generate lines for
   */
  static void createLogFile(Path dir, ZonedDateTime date)
  {
    // assemble file name
    final String fileName = DATE_FORMATTER.format(date) + ".log" + (gzip ? ".gz" : "");

    // create file object for the file to be opened and print its full path
    final Path outputPath = dir.resolve(fileName);
    final File outputFile = outputPath.toFile();
    System.out.println(outputFile.getAbsolutePath());

    // open file, generate lines for all intervals and write those lines to the file
    Instant timestamp = date.toInstant();
    // fileOut is a resource of its own so it gets closed even if the GZIPOutputStream constructor
    // (which writes the gzip header) fails; closing it again after 'out' has closed it is harmless
    try (FileOutputStream fileOut = new FileOutputStream(outputFile);
        Writer out = new BufferedWriter(
            new OutputStreamWriter(gzip ? new GZIPOutputStream(fileOut) : fileOut, StandardCharsets.UTF_8)))
    {
      // integer division: if intervalMillis does not divide a day, the remainder at its end gets no lines
      long numIntervals = MILLIS_PER_DAY / intervalMillis;
      while (numIntervals-- > 0)
      {
        writeInterval(out, timestamp);
        timestamp = timestamp.plus(intervalMillis, ChronoUnit.MILLIS);
      }
    }
    catch (IOException ioe)
    {
      // best effort: report the failure and let the caller continue with the next day
      System.err.println("Could not write '" + outputFile.getAbsolutePath() + "'.");
      ioe.printStackTrace();
    }
  }

  /**
   * Write {@code numDays} log files into {@code outputDirectory}, one per day, starting with today (UTC).
   *
   * @param args ignored
   */
  public static void main(String[] args)
  {
    // make sure there's a valid output directory
    final Path dir = Paths.get(outputDirectory);
    if (!Files.isDirectory(dir))
    {
      System.err.println("Not a valid directory: '" + outputDirectory + "'.");
      return;
    }
    if (intervalMillis < 1 || intervalMillis > MILLIS_PER_DAY)
    {
      System.err.println("intervalMillis must be from [1 ; " + MILLIS_PER_DAY + "]: " + intervalMillis);
      return;
    }

    // start with today as first day, time section reset to 00:00:00.000
    ZonedDateTime date = ZonedDateTime.now(ZoneId.of("UTC")).truncatedTo(ChronoUnit.DAYS);

    // generate one file per day
    for (int day = 0; day < numDays; day++)
    {
      createLogFile(dir, date);
      date = date.plusDays(1);
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.*; | |
import java.nio.file.*; | |
import java.time.Instant; | |
import java.time.format.DateTimeFormatter; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
/**
 * Using Java streams to process log lines.
 *
 * <ul>
 * <li>Split lines by tab character; blank lines are skipped.</li>
 * <li>Assume first element of line is timestamp and second is an integer response time.</li>
 * <li>Parse elements into LogRecord objects. Create a normalized timestamp by rounding the timestamp down to the start
 * of its group interval (default: 60000 ms, i.e. one minute).</li>
 * <li>Group by normalized timestamp.</li>
 * <li>For each normalized point in time, collect response time values, sort them and extract values for each percentile
 * defined (default: 50 and 90) using the nearest-rank method.</li>
 * <li>Put result into TimestampPercentiles objects.</li>
 * <li>Print out those objects in chronological order: timestamp, number of records, then one value per percentile,
 * everything separated by tab.</li>
 * </ul>
 *
 * Options:
 * <ul>
 * <li>{@code -} read from standard input (file name arguments are then ignored)</li>
 * <li>{@code -g MILLIS} group interval in milliseconds, must be &gt; 0 (default: 60000)</li>
 * <li>{@code -p LIST} comma-separated percentiles from 0.0 to 100.0 (default: 50,90)</li>
 * <li>{@code -c MILLIS} print percentile values &gt;= MILLIS in red using ANSI escape codes</li>
 * </ul>
 *
 * With Java 11+ run like this for hard-coded input lines (output columns are separated by tabs):
 *
 * <pre>
 * $ java LogProcessor.java
 * 2021-11-30T13:44:00Z    3    44    58
 * 2021-11-30T13:45:00Z    1    29    29
 * </pre>
 *
 * Read from standard input with a single dash as argument:
 * <pre>
 * $ cat *.log | java LogProcessor.java -
 * </pre>
 *
 * Read from files specified as arguments (generate some with {@link LogGenerator}; gzipped files are not supported):
 * <pre>
 * $ java LogProcessor.java *.log
 * </pre>
 *
 * Released under the Apache License 2.0.
 *
 * @author Marco Schmidt
 * @see <a href="https://gist.github.com/marco-schmidt/78f86aa595306535fd51ea30ce823c5c">Gist with source code</a>
 */
public class LogProcessor
{
  // configuration options

  // timestamp conversion
  static DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
  static DateTimeFormatter parser = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX][X]");

  /**
   * timestamps from this interval in milliseconds will be grouped together; must be &gt; 0;
   * program parameter to override like: -g 120000
   */
  static long groupIntervalMillis = 60000L;

  /**
   * the percentiles to be computed, each value must be from 0.0 to 100.0;
   * output of this program will be a timestamp and a record count followed by as many values as defined here;
   * program parameter to override: -p 50.0,75.0,90.0,99,100
   */
  static List<Double> percentiles = Arrays.asList(50.0, 90.0);

  /** whether percentile values of at least {@link #minColorized} are printed in red (option -c) */
  static boolean colorize = false;

  /** threshold in milliseconds for red output, only used if {@link #colorize} is set */
  static long minColorized;

  /**
   * Percentile values computed for one group of log records.
   */
  static class TimestampPercentiles
  {
    /** number of log records in the group */
    int elements;
    /** start of the group interval */
    Instant timestamp;
    /** start of the group interval in milliseconds since the epoch */
    long normalizedTimestampMillis;
    /** one response time per configured percentile, in configuration order */
    List<Long> percentiles;

    /**
     * Format as timestamp, number of elements and percentile values, separated by tab. If
     * {@link LogProcessor#colorize} is set, values of at least {@link LogProcessor#minColorized} are wrapped in ANSI
     * red color codes.
     */
    @Override
    public String toString()
    {
      final StringBuilder sb = new StringBuilder();
      sb.append(formatter.format(timestamp));
      sb.append('\t');
      sb.append(elements);
      for (final Long p : percentiles)
      {
        sb.append('\t');
        final boolean addColor = colorize && p >= minColorized;
        if (addColor)
        {
          sb.append("\u001B[31m");
        }
        sb.append(p);
        if (addColor)
        {
          sb.append("\u001B[0m");
        }
      }
      return sb.toString();
    }
  }

  /**
   * One parsed log line.
   */
  static class LogRecord
  {
    /** timestamp as found in the log line */
    Instant timestamp;
    /** timestamp rounded down to the start of its group interval, in milliseconds since the epoch */
    long normalizedTimestampMillis;
    /** response time in milliseconds */
    long responseMills;

    public long getNormalizedTimestampMillis()
    {
      return normalizedTimestampMillis;
    }
  }

  /**
   * Parse a timestamp with {@link #parser}; an offset such as {@code Z} is required.
   *
   * @param s timestamp text
   * @return parsed instant
   * @throws java.time.format.DateTimeParseException if {@code s} cannot be parsed
   */
  public static Instant toInstant(String s)
  {
    return Instant.from(parser.parse(s));
  }

  /**
   * Round a timestamp down to the start of its group interval.
   *
   * @param i timestamp
   * @return start of the interval containing {@code i}, in milliseconds since the epoch
   */
  public static long normalize(Instant i)
  {
    // floorDiv also rounds down for timestamps before 1970, where '/' would truncate toward zero
    return Math.floorDiv(i.toEpochMilli(), groupIntervalMillis) * groupIntervalMillis;
  }

  /**
   * Parse one log line of the form {@code timestamp<TAB>responseMillis}; further fields are ignored.
   *
   * @param s log line
   * @return record holding timestamp, normalized timestamp and response time
   * @throws IllegalArgumentException if the line has fewer than two fields or the response time is not an integer
   * @throws java.time.format.DateTimeParseException if the timestamp cannot be parsed
   */
  public static LogRecord parse(String s)
  {
    final String[] a = s.split("\t");
    if (a.length < 2)
    {
      throw new IllegalArgumentException("Expected timestamp and response time separated by a tab: '" + s + "'");
    }
    final LogRecord r = new LogRecord();
    r.timestamp = toInstant(a[0].trim());
    r.normalizedTimestampMillis = normalize(r.timestamp);
    r.responseMills = Long.parseLong(a[1].trim());
    return r;
  }

  /**
   * Compute the configured {@link #percentiles} for one group of records.
   *
   * @param list records of one group; all are assumed to share the same normalized timestamp
   * @return result object; for an empty list its timestamp and percentiles stay null
   */
  public static TimestampPercentiles computePercentiles(List<LogProcessor.LogRecord> list)
  {
    final TimestampPercentiles result = new TimestampPercentiles();
    if (list.isEmpty())
    {
      return result;
    }
    result.normalizedTimestampMillis = list.get(0).normalizedTimestampMillis;
    result.timestamp = Instant.ofEpochMilli(result.normalizedTimestampMillis);

    // copy response time values from list of log records to array
    final int numElems = list.size();
    result.elements = numElems;
    final long[] a = new long[numElems];
    int index = 0;
    for (final LogRecord r : list)
    {
      a[index++] = r.responseMills;
    }

    // sort array for easy retrieval of percentile values
    // TODO: instead of sorting the array in O(n log n) there are faster dedicated
    // algorithms like https://en.wikipedia.org/wiki/Quickselect
    Arrays.sort(a);

    // copy percentile value to result object
    result.percentiles = new ArrayList<>(percentiles.size());
    for (final double perc : percentiles)
    {
      result.percentiles.add(a[nearestRankIndex(perc, numElems)]);
    }
    return result;
  }

  /**
   * Zero-based array index of the nearest-rank percentile in a sorted array.
   *
   * @param perc percentile from 0.0 to 100.0
   * @param numElems array length, must be &gt; 0
   * @return index from {@code [0, numElems - 1]}
   */
  private static int nearestRankIndex(double perc, int numElems)
  {
    // multiply before dividing: perc / 100.0 * n can overshoot an integer
    // (7.0 / 100.0 * 100 == 7.000000000000001) and make ceil pick the next rank
    final int rank = (int) Math.ceil(perc * numElems / 100.0);
    // rank 0 (percentile 0) maps to the minimum; the upper clamp guards against values above 100
    return Math.min(Math.max(rank, 1), numElems) - 1;
  }

  /**
   * Parse a comma-separated list of percentiles.
   *
   * @throws NumberFormatException if an item is not a number
   * @throws IllegalArgumentException if an item is not from 0.0 to 100.0
   */
  private static List<Double> parsePercentiles(String s)
  {
    final List<Double> result = new ArrayList<>();
    for (final String item : s.split(","))
    {
      final double p = Double.parseDouble(item.trim());
      if (!(p >= 0.0 && p <= 100.0))
      {
        throw new IllegalArgumentException("Percentile must be from 0.0 to 100.0: '" + item.trim() + "'");
      }
      result.add(p);
    }
    return result;
  }

  /**
   * Return the value following an option, failing with a clear message if it is missing.
   */
  private static String optionValue(Iterator<String> iter, String option)
  {
    if (!iter.hasNext())
    {
      throw new IllegalArgumentException("Missing value for option " + option);
    }
    return iter.next();
  }

  /**
   * Open a file as a stream of lines (UTF-8). Unreadable files are reported and skipped.
   */
  private static Stream<String> readLines(String fileName)
  {
    try
    {
      // flatMap closes each returned stream once its content is consumed, releasing the file handle
      return Files.lines(Paths.get(fileName));
    }
    catch (IOException e)
    {
      // best effort: report the file and continue with the remaining ones
      System.err.println("Cannot read '" + fileName + "': " + e);
      return Stream.empty();
    }
  }

  /**
   * Apply options to the configuration fields and create the input line stream: standard input if {@code -} was
   * given, else the content of all file name arguments, else a few hard-coded sample lines.
   */
  private static Stream<String> parseArguments(String[] args) throws Exception
  {
    boolean stdInput = false;
    final List<String> fileNames = new ArrayList<>();
    final Iterator<String> iter = Arrays.asList(args).iterator();
    while (iter.hasNext())
    {
      final String s = iter.next();
      if ("-".equals(s))
      {
        stdInput = true;
      }
      else if ("-c".equals(s))
      {
        minColorized = Long.parseLong(optionValue(iter, s));
        colorize = true;
      }
      else if ("-g".equals(s))
      {
        final long interval = Long.parseLong(optionValue(iter, s));
        if (interval <= 0)
        {
          throw new IllegalArgumentException("Group interval must be > 0 ms: " + interval);
        }
        groupIntervalMillis = interval;
      }
      else if ("-p".equals(s))
      {
        percentiles = parsePercentiles(optionValue(iter, s));
      }
      else
      {
        fileNames.add(s);
      }
    }

    if (stdInput)
    {
      // 1) read from standard input, decoded as UTF-8 like the files read by Files.lines
      return new BufferedReader(new InputStreamReader(System.in, java.nio.charset.StandardCharsets.UTF_8)).lines();
    }
    if (!fileNames.isEmpty())
    {
      // 2) all arguments are file names, read all of their content
      return fileNames.stream().flatMap(LogProcessor::readLines);
    }
    // 3) a few hard-coded lines
    final String[] sampleLines =
    {
      "2021-11-30T13:44:59.334Z\t44", "2021-11-30T13:44:59.400Z\t58", "2021-11-30T13:44:59.583Z\t12",
      "2021-11-30T13:45:00.003Z\t29",
    };
    return Arrays.stream(sampleLines);
  }

  public static void main(String[] args) throws Exception
  {
    final long startMillis = System.currentTimeMillis();
    try (Stream<String> stream = parseArguments(args))
    {
      stream
        // blank lines carry no record
        .filter(s -> !s.trim().isEmpty())
        .map(LogProcessor::parse)
        // a TreeMap keeps the groups sorted by normalized timestamp
        .collect(Collectors.groupingBy(LogRecord::getNormalizedTimestampMillis, java.util.TreeMap::new,
            Collectors.toList()))
        .values()
        .stream()
        .parallel()
        // compute percentiles for each list of LogRecord objects
        .map(LogProcessor::computePercentiles)
        // unlike forEach, forEachOrdered keeps the chronological order on a parallel stream
        .forEachOrdered(System.out::println);
    }
    // print measured time to standard error to not mix it up with the percentile output
    System.err.println("Time (ms): " + (System.currentTimeMillis() - startMillis));
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment