Skip to content

Instantly share code, notes, and snippets.

@gbzarelli
Last active September 21, 2023 21:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gbzarelli/29337b6a79323d2ad8e1aa494e70d38d to your computer and use it in GitHub Desktop.
Save gbzarelli/29337b6a79323d2ad8e1aa494e70d38d to your computer and use it in GitHub Desktop.
/** S3 client that readFile uses to fetch the object to be read. */
private final AmazonS3 amazonS3;
/** Kafka producer that readFile uses to publish each line to {@link #KAFKA_TOPIC}. */
private final KafkaTemplate<String, String> kafkaTemplate;
/** Redis access used by readFile to load and store a per-file line counter (keyed by file name). */
private final RedisTemplate<String, Integer> redisTemplate;
/** Key returned by getKeyFromLine when the requested field is not available in the line. */
public static final String DEFAULT_KEY = "default-key";
/** Topic name passed to kafkaTemplate.send for every line. */
public static final String KAFKA_TOPIC = "kafka-topic";
/** Delimiter that separates the fields of a line; used by getKeyFromLine. */
public static final String CSV_DELIMITER = ";";
/**
 * Reads the S3 object named {@code file} line by line and publishes every line to
 * {@link #KAFKA_TOPIC}, using the field at index 4 of the line as the message key.
 *
 * <p>The number of lines handled so far is stored in Redis under the key {@code file}.
 * On entry that many lines are skipped, and on exit (success or failure) the updated
 * count is written back, so a later call for the same file continues where this one
 * stopped.
 *
 * <p>Failures while reading or sending are reported on standard error and do not
 * propagate; failures of the initial Redis lookup or of the S3 request do propagate.
 *
 * @param file object key inside the bucket; also used as the Redis key of the counter
 */
public void readFile(String file) {
    // One GET instead of hasKey()+get(): a missing key simply yields null, so there is
    // no window in which the key can vanish between the two calls and cause an NPE on
    // unboxing. Done before the S3 object is opened so a Redis failure cannot leak it.
    final Integer storedCount = redisTemplate.opsForValue().get(file);
    final int keyVal = storedCount != null ? storedCount : 0;
    final var count = new AtomicInteger(keyVal);
    final var stopWatch = new StopWatch();

    final var s3Object = amazonS3.getObject("/file-reader-bucket", file);
    stopWatch.start();
    // Closing the reader closes the underlying S3 content stream.
    // NOTE(review): the charset is pinned to UTF-8 instead of the platform default;
    // confirm the stored files are UTF-8 encoded.
    try (final var reader = new BufferedReader(new InputStreamReader(
            s3Object.getObjectContent(), java.nio.charset.StandardCharsets.UTF_8))) {
        reader.lines().skip(keyVal).forEach(line -> {
            final var key = getKeyFromLine(line, 4);
            kafkaTemplate.send(KAFKA_TOPIC, key, line);
            count.incrementAndGet();
        });
    } catch (final Exception e) {
        // Best effort: report and fall through so the progress below is still saved.
        System.err.println("Failed to process file " + file + " after " + count.get() + " lines");
        e.printStackTrace();
    } finally {
        // Persist progress even on failure so the next run can resume.
        redisTemplate.opsForValue().set(file, count.get());
        stopWatch.stop();
    }
    System.out.println("Elapsed time for file " + file + " : " + stopWatch.getTotalTimeSeconds());
    System.out.println("--------------------");
}
/**
 * Extracts the field at position {@code index} (zero-based) from a delimited line.
 *
 * <p>Fields are located by position, so an empty cell (two consecutive delimiters, or a
 * leading delimiter) still occupies its column. The previous {@code StringTokenizer}
 * based version dropped empty cells, which shifted every following column and could
 * return the wrong field.
 *
 * @param line  the line to split on {@link #CSV_DELIMITER}
 * @param index zero-based position of the wanted field
 * @return the field at {@code index}, or {@link #DEFAULT_KEY} when {@code index} is
 *         negative, the line has too few fields, or the field is empty
 */
public String getKeyFromLine(final String line, final int index) {
    if (index < 0) {
        return DEFAULT_KEY;
    }
    // Walk past `index` delimiters to find where the wanted field begins.
    int fieldStart = 0;
    for (int skipped = 0; skipped < index; skipped++) {
        final int delimiterAt = line.indexOf(CSV_DELIMITER, fieldStart);
        if (delimiterAt < 0) {
            return DEFAULT_KEY; // the line has fewer than index + 1 fields
        }
        fieldStart = delimiterAt + CSV_DELIMITER.length();
    }
    final int fieldEnd = line.indexOf(CSV_DELIMITER, fieldStart);
    final String field = fieldEnd < 0
            ? line.substring(fieldStart)
            : line.substring(fieldStart, fieldEnd);
    // An empty cell carries no usable key, so fall back to the default.
    return field.isEmpty() ? DEFAULT_KEY : field;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment