Last active
September 21, 2023 21:25
-
-
Save gbzarelli/29337b6a79323d2ad8e1aa494e70d38d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private final AmazonS3 amazonS3; | |
private final KafkaTemplate<String, String> kafkaTemplate; | |
private final RedisTemplate<String, Integer> redisTemplate; | |
public static final String DEFAULT_KEY = "default-key"; | |
public static final String KAFKA_TOPIC = "kafka-topic"; | |
public static final String CSV_DELIMITER = ";"; | |
public void readFile(String file) { | |
final var s3Object = amazonS3.getObject("/file-reader-bucket", file); | |
final var objectContent = s3Object.getObjectContent(); | |
final var hasKey = redisTemplate.hasKey(file); | |
final var keyVal = Boolean.TRUE.equals(hasKey) ? redisTemplate.opsForValue().get(file) : 0; | |
final var stopWatch = new StopWatch(); | |
stopWatch.start(); | |
final var count = new AtomicInteger(keyVal); | |
try (final var reader = new BufferedReader(new InputStreamReader(objectContent))) { | |
reader.lines().skip(keyVal).forEach(line -> { | |
final var key = getKeyFromLine(line, 4); | |
kafkaTemplate.send(KAFKA_TOPIC, key, line); | |
count.incrementAndGet(); | |
}); | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
} finally { | |
redisTemplate.opsForValue().set(file, count.get()); | |
stopWatch.stop(); | |
} | |
System.out.println("Elapsed time for file " + file + " : " + stopWatch.getTotalTimeSeconds()); | |
System.out.println("--------------------"); | |
} | |
public String getKeyFromLine(final String line, final int index) { | |
final var tokenizer = new StringTokenizer(line, CSV_DELIMITER); | |
int counter = 0; | |
while (tokenizer.hasMoreTokens()) { | |
final var token = tokenizer.nextToken(); | |
if (counter == index) { | |
return token; | |
} | |
counter++; | |
} | |
return DEFAULT_KEY; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment