Last active
May 6, 2022 21:22
-
-
Save radekg/7a01085e377d206497ace539354fb939 to your computer and use it in GitHub Desktop.
Blog: I hacked infinite retention into my open source Kafka: https://gruchalski.com/posts/2021-04-02-kafka-infinite-retention/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/build.gradle b/build.gradle | |
index 4143d8145..ad065bd43 100644 | |
--- a/build.gradle | |
+++ b/build.gradle | |
@@ -761,6 +761,7 @@ project(':core') { | |
compile libs.jacksonJDK8Datatypes | |
compile libs.joptSimple | |
compile libs.metrics | |
+ compile libs.s3sdk | |
compile libs.scalaCollectionCompat | |
compile libs.scalaJava8Compat | |
compile libs.scalaLibrary | |
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala | |
index 37882ffa5..7a69f24dd 100755 | |
--- a/core/src/main/scala/kafka/log/LogSegment.scala | |
+++ b/core/src/main/scala/kafka/log/LogSegment.scala | |
@@ -16,6 +16,9 @@ | |
*/ | |
package kafka.log | |
+import com.amazonaws.services.s3.AmazonS3ClientBuilder | |
+import com.amazonaws.services.s3.model.PutObjectRequest | |
+ | |
import java.io.{File, IOException} | |
import java.nio.file.{Files, NoSuchFileException} | |
import java.nio.file.attribute.FileTime | |
@@ -31,6 +34,7 @@ import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampA | |
import org.apache.kafka.common.record._ | |
import org.apache.kafka.common.utils.{BufferSupplier, Time} | |
+import java.net.InetAddress | |
import scala.jdk.CollectionConverters._ | |
import scala.math._ | |
@@ -619,6 +623,28 @@ class LogSegment private[log] (val log: FileRecords, | |
} | |
} | |
+ if (sys.env.contains("KAFKA_AWS_S3_UPLOADS")) { | |
+ val start = System.currentTimeMillis() | |
+ val parentDirname = log.file().getParent().split(File.separator).toList.last | |
+ val objectKey = InetAddress.getLocalHost().getHostName() + "/" + parentDirname + "/" + log.file().getName() | |
+ try { | |
+ info(s"Uploading segment ${objectKey} with size ${log.sizeInBytes()} prior to delete...") | |
+ val awsClient = AmazonS3ClientBuilder.standard() | |
+ .withPathStyleAccessEnabled(true) | |
+ .withRegion(sys.env.getOrElse("KAFKA_AWS_S3_REGION", "eu-central-1")).build() | |
+ val putResult = awsClient.putObject(new PutObjectRequest( | |
+ sys.env.getOrElse("KAFKA_AWS_S3_BUCKET", "undefined-kafka-s3-bucket"), | |
+ objectKey, | |
+ log.file() | |
+ )) | |
+ val took = System.currentTimeMillis() - start | |
+ info(s"Uploaded segment prior to delete, S3 ETag: ${putResult.getETag()}, took ${took}ms") | |
+ } | |
+ catch { | |
+ case e: Exception => error("Failed uploading segment to S3", e) | |
+ } | |
+ } | |
+ | |
CoreUtils.tryAll(Seq( | |
() => delete(log.deleteIfExists _, "log", log.file, logIfMissing = true), | |
() => delete(lazyOffsetIndex.deleteIfExists _, "offset index", lazyOffsetIndex.file, logIfMissing = true), | |
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle | |
index b472c215e..7cadf5dbc 100644 | |
--- a/gradle/dependencies.gradle | |
+++ b/gradle/dependencies.gradle | |
@@ -105,6 +105,7 @@ versions += [ | |
powermock: "2.0.9", | |
reflections: "0.9.12", | |
rocksDB: "5.18.4", | |
+ s3sdk: "1.11.988", | |
scalaCollectionCompat: "2.3.0", | |
scalafmt: "1.5.1", | |
scalaJava8Compat : "0.9.1", | |
@@ -186,6 +187,7 @@ libs += [ | |
powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock", | |
reflections: "org.reflections:reflections:$versions.reflections", | |
rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB", | |
+ s3sdk: "com.amazonaws:aws-java-sdk-s3:$versions.s3sdk", | |
scalaCollectionCompat: "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat", | |
scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat", | |
scalaLibrary: "org.scala-lang:scala-library:$versions.scala", |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment