@radekg
Last active May 6, 2022 21:22
Blog: I hacked infinite retention into my open source Kafka: https://gruchalski.com/posts/2021-04-02-kafka-infinite-retention/
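The patch below hooks Kafka's log segment deletion path: when the KAFKA_AWS_S3_UPLOADS environment variable is set, each segment file is uploaded to S3 before its on-disk files are removed, under an object key built from the broker hostname, the topic-partition directory name, and the segment file name. Upload failures are logged but do not block the deletion.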
diff --git a/build.gradle b/build.gradle
index 4143d8145..ad065bd43 100644
--- a/build.gradle
+++ b/build.gradle
@@ -761,6 +761,7 @@ project(':core') {
compile libs.jacksonJDK8Datatypes
compile libs.joptSimple
compile libs.metrics
+ compile libs.s3sdk
compile libs.scalaCollectionCompat
compile libs.scalaJava8Compat
compile libs.scalaLibrary
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 37882ffa5..7a69f24dd 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -16,6 +16,9 @@
*/
package kafka.log
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+import com.amazonaws.services.s3.model.PutObjectRequest
+
import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import java.nio.file.attribute.FileTime
@@ -31,6 +34,7 @@ import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampA
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import java.net.InetAddress
import scala.jdk.CollectionConverters._
import scala.math._
@@ -619,6 +623,28 @@ class LogSegment private[log] (val log: FileRecords,
}
}
+    if (sys.env.contains("KAFKA_AWS_S3_UPLOADS")) {
+      val start = System.currentTimeMillis()
+      val parentDirname = log.file().getParent().split(File.separator).toList.last
+      val objectKey = InetAddress.getLocalHost().getHostName() + "/" + parentDirname + "/" + log.file().getName()
+      try {
+        info(s"Uploading segment ${objectKey} with size ${log.sizeInBytes()} prior to delete...")
+        val awsClient = AmazonS3ClientBuilder.standard()
+          .withPathStyleAccessEnabled(true)
+          .withRegion(sys.env.getOrElse("KAFKA_AWS_S3_REGION", "eu-central-1")).build()
+        val putResult = awsClient.putObject(new PutObjectRequest(
+          sys.env.getOrElse("KAFKA_AWS_S3_BUCKET", "undefined-kafka-s3-bucket"),
+          objectKey,
+          log.file()
+        ))
+        val took = System.currentTimeMillis() - start
+        info(s"Uploaded segment prior to delete, S3 ETag: ${putResult.getETag()}, took ${took}ms")
+      }
+      catch {
+        case e: Exception => error("Failed uploading segment to S3", e)
+      }
+    }
+
CoreUtils.tryAll(Seq(
() => delete(log.deleteIfExists _, "log", log.file, logIfMissing = true),
() => delete(lazyOffsetIndex.deleteIfExists _, "offset index", lazyOffsetIndex.file, logIfMissing = true),
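For context, the hook above builds a fresh S3 client for every deleted segment and runs the upload synchronously on the deletion path. A minimal standalone sketch of the same logic with a single shared client might look like this; SegmentUploader, uploadBeforeDelete, and the println logging are illustrative and not part of the patch:

import java.io.File
import java.net.InetAddress

import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.amazonaws.services.s3.model.PutObjectRequest

object SegmentUploader {

  // Shared, lazily-built client; the patch instead constructs a new
  // client for every deleted segment.
  private lazy val s3: AmazonS3 = AmazonS3ClientBuilder.standard()
    .withPathStyleAccessEnabled(true)
    .withRegion(sys.env.getOrElse("KAFKA_AWS_S3_REGION", "eu-central-1"))
    .build()

  // Same key layout as the patch: <hostname>/<partition dir>/<segment file>.
  def uploadBeforeDelete(segmentFile: File): Unit =
    if (sys.env.contains("KAFKA_AWS_S3_UPLOADS")) {
      val partitionDir = segmentFile.getParentFile.getName
      val objectKey = InetAddress.getLocalHost.getHostName + "/" + partitionDir + "/" + segmentFile.getName
      try {
        val result = s3.putObject(new PutObjectRequest(
          sys.env.getOrElse("KAFKA_AWS_S3_BUCKET", "undefined-kafka-s3-bucket"),
          objectKey,
          segmentFile))
        println(s"Uploaded ${objectKey}, S3 ETag: ${result.getETag}")
      } catch {
        // As in the patch, a failed upload is logged and deletion proceeds.
        case e: Exception => Console.err.println(s"Failed uploading ${objectKey} to S3: ${e}")
      }
    }
}

Reusing one client avoids paying client-construction cost on every segment deletion; since the region and bucket come from the environment, nothing is lost by building it once.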
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index b472c215e..7cadf5dbc 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -105,6 +105,7 @@ versions += [
powermock: "2.0.9",
reflections: "0.9.12",
rocksDB: "5.18.4",
+ s3sdk: "1.11.988",
scalaCollectionCompat: "2.3.0",
scalafmt: "1.5.1",
scalaJava8Compat : "0.9.1",
@@ -186,6 +187,7 @@ libs += [
powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock",
reflections: "org.reflections:reflections:$versions.reflections",
rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
+ s3sdk: "com.amazonaws:aws-java-sdk-s3:$versions.s3sdk",
scalaCollectionCompat: "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat",
scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
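To enable the hook on a broker built with this patch, set KAFKA_AWS_S3_UPLOADS (any value) together with KAFKA_AWS_S3_BUCKET, and optionally KAFKA_AWS_S3_REGION (default eu-central-1). Because the client is built without explicit credentials, the AWS SDK resolves them through its default provider chain (environment variables, shared credentials file, or instance profile). The full write-up is in the blog post linked above.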