Understanding GC in lakeFS
// Databricks notebook source
// MAGIC %md
// MAGIC ### Understanding Garbage Collection in lakeFS
// MAGIC
// MAGIC This notebook will allow you to investigate the results of a GC dry run.
// MAGIC
// MAGIC Run the cells of this notebook one by one.
// MAGIC
// MAGIC **In the next cell, fill in the repository name.**
// COMMAND ----------
val GC_REPO_NAME = "gctest"
// COMMAND ----------
// MAGIC %md
// MAGIC #### Configuring this notebook
// MAGIC
// MAGIC Make sure the cluster running this notebook is configured with your lakeFS credentials using the `spark.hadoop.lakefs.api.*` configurations.
// MAGIC
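// MAGIC For example, the cluster's Spark configuration would typically include entries like these (the values below are placeholders for your own endpoint and credentials):
// MAGIC
// MAGIC ```
// MAGIC spark.hadoop.lakefs.api.url https://<your-lakefs-host>/api/v1
// MAGIC spark.hadoop.lakefs.api.access_key <lakefs-access-key-id>
// MAGIC spark.hadoop.lakefs.api.secret_key <lakefs-secret-access-key>
// MAGIC ```
// MAGIC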
// MAGIC **If the next cell completes without errors, you're good to go.**
// COMMAND ----------
val apiURL = sc.hadoopConfiguration.get("lakefs.api.url")
val apiAccessKey = sc.hadoopConfiguration.get("lakefs.api.access_key")
val apiSecretKey = sc.hadoopConfiguration.get("lakefs.api.secret_key")
import io.lakefs.clients.api
import io.lakefs.clients.api.{ConfigApi, RetentionApi, ObjectsApi, RepositoriesApi, CommitsApi, RefsApi}
import io.lakefs.clients.api.model._
val client = new api.ApiClient
client.setUsername(apiAccessKey)
client.setPassword(apiSecretKey)
client.setBasePath(apiURL)
client.setReadTimeout(30000)
client.setConnectTimeout(10000)
val commitsApi = new CommitsApi(client)
val objectsApi = new ObjectsApi(client)
val repositoriesApi = new RepositoriesApi(client)
val refsApi = new RefsApi(client)
val configApi = new ConfigApi(client)
// Verify that the repository exists and that the credentials are valid
repositoriesApi.getRepository(GC_REPO_NAME)
val blockstoreType = configApi.getStorageConfig.getBlockstoreType
// COMMAND ----------
// MAGIC %md
// MAGIC #### Performing the dry run
// MAGIC 1. Run GC with the `spark.hadoop.lakefs.debug.gc.no_delete` configuration set to `"true"` (see the example command below).
// MAGIC 2. Take note of the GC run ID (a UUIDv4 string) and fill it in the next cell.
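// MAGIC
// MAGIC For example, a dry-run submission might look like this (a sketch only; the exact client jar, packages and extra storage configuration depend on your lakeFS Spark client version and blockstore, so check the lakeFS GC documentation):
// MAGIC
// MAGIC ```
// MAGIC spark-submit --class io.treeverse.clients.GarbageCollector \
// MAGIC   --packages org.apache.hadoop:hadoop-aws:3.2.1 \
// MAGIC   --conf spark.hadoop.lakefs.api.url=https://<your-lakefs-host>/api/v1 \
// MAGIC   --conf spark.hadoop.lakefs.api.access_key=<lakefs-access-key-id> \
// MAGIC   --conf spark.hadoop.lakefs.api.secret_key=<lakefs-secret-access-key> \
// MAGIC   --conf spark.hadoop.lakefs.debug.gc.no_delete=true \
// MAGIC   <path-to-lakefs-spark-client-jar> \
// MAGIC   <repository-name> <region>
// MAGIC ```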
// COMMAND ----------
val RUN_ID="7824a532-60a1-4155-800c-e6f29a808e62"
// COMMAND ----------
// Prepare some storage paths for reading and writing GC metadata:
import java.time.format.DateTimeFormatter
import org.apache.hadoop.fs._;
import scala.collection.mutable.ListBuffer
import spark.implicits._
import io.treeverse.clients.ApiClient
import java.net.URI
val r = repositoriesApi.getRepository(GC_REPO_NAME)
val storageNamespace = ApiClient.translateURI(URI.create(r.getStorageNamespace), blockstoreType).toString
val GC_COMMITS_PATH = s"${storageNamespace}/_lakefs/retention/gc/commits/run_id=${RUN_ID}/"
val GC_ADDRESSES_PATH = s"${storageNamespace}/_lakefs/retention/gc/addresses/run_id=${RUN_ID}/"
val GC_PARQUET_METADATA_PREFIX = s"${storageNamespace}/_lakefs/parquet_metadata/"
val commits = spark.read.option("header", value = true).option("inferSchema", value = true).csv(GC_COMMITS_PATH)
// COMMAND ----------
// MAGIC %md
// MAGIC #### Exploring expired commits
// MAGIC
// MAGIC Expired commits are determined according to garbage collection rules.
// MAGIC When all commits pointing to an object are expired, the object can be deleted from the storage.
// MAGIC
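// MAGIC For reference, GC rules are defined per repository and look roughly like this (an illustrative policy, not necessarily the one configured on your repository):
// MAGIC
// MAGIC ```json
// MAGIC {
// MAGIC   "default_retention_days": 21,
// MAGIC   "branches": [
// MAGIC     {"branch_id": "main", "retention_days": 28},
// MAGIC     {"branch_id": "dev", "retention_days": 7}
// MAGIC   ]
// MAGIC }
// MAGIC ```
// MAGIC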
// MAGIC Let's see how many commits are expired, and find some examples.
// COMMAND ----------
// prepare configurations and clients
import io.treeverse.clients.ConfigMapper
import spark.implicits._
import scala.collection.JavaConverters._
val hcValues = sc.broadcast(sc.hadoopConfiguration.iterator.asScala
  .filter(c => c.getKey.startsWith("fs.") || c.getKey.startsWith("lakefs."))
  .map(entry => (entry.getKey, entry.getValue))
  .toArray)
val configMapper = new ConfigMapper(hcValues)
val apiConf = new io.treeverse.clients.APIConfigurations(apiURL, apiAccessKey, apiSecretKey, "1", "10")
val getter = new io.treeverse.clients.LakeFSRangeGetter(apiConf, configMapper)
val helper = new io.treeverse.clients.LakeFSRangeHelper(getter)
val gc = new io.treeverse.clients.GarbageCollector(getter, configMapper)
import scala.collection.mutable.HashMap
var commitMap = new HashMap[String, Commit]
// COMMAND ----------
// get some information about our commits from lakeFS. This may take a few minutes.
import io.lakefs.clients.api.model.Commit
import scala.collection.JavaConverters._
import spark.implicits._
import org.apache.spark.sql.functions._
import io.treeverse.clients.RequestRetryWrapper
val retryWrapper = new RequestRetryWrapper(30000)
// Walk the commit log starting from each commit in the GC report, caching every commit we see.
// We stop paginating as soon as we reach a commit we have already visited, since its ancestors are already known.
for (c <- commits.collect()) {
  var wasHereAlready = commitMap.contains(c(0).asInstanceOf[String])
  var offset = ""
  var hasMore = true
  while (!wasHereAlready && hasMore) {
    print(".")
    val cl = retryWrapper.wrapWithRetry(() => {
      refsApi.logCommits(GC_REPO_NAME, c(0).asInstanceOf[String], offset, 1000, null, null)
    })
    for (r <- cl.getResults.asScala) {
      if (commitMap.contains(r.getId)) {
        wasHereAlready = true
      }
      commitMap.put(r.getId, r)
    }
    offset = cl.getPagination.getNextOffset
    hasMore = cl.getPagination.getHasMore
  }
}
// save our commits as a dataframe
var commitsDF = commitMap.values.map(c=>(c.getId, c.getCreationDate, c.getParents.asScala)).toSeq.toDF("commit_id", "creation_date", "parents")
commitsDF.write.mode("overwrite").parquet(s"${GC_PARQUET_METADATA_PREFIX}commits")
commitsDF = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}commits").withColumn("creation_date", from_unixtime(col("creation_date"))).join(commits, "commit_id")
// COMMAND ----------
// How many of our commits are expired?
display(commitsDF.groupBy("expired").count)
// COMMAND ----------
// Some examples of expired commits:
display(commitsDF.filter(commitsDF("expired") === true))
// COMMAND ----------
// Some examples of non-expired commits:
display(commitsDF.filter(commitsDF("expired") === false))
// COMMAND ----------
// MAGIC %md
// MAGIC #### Exploring specific deletions
// MAGIC
// MAGIC The following cells will allow you to dive in and understand why specific objects were marked for deletion.
// COMMAND ----------
// Collect metadata about the GC run and about your repository.
// This is the heavy stuff. It can take 15 minutes or more, depending on the size of your cluster.
// Set NUM_PARTITIONS to roughly 4 * (number of cores in your cluster); 192 cores are assumed below, so adjust for your cluster.
var NUM_PARTITIONS = 4 * 192
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.functions._
var addresses = spark.read.parquet(GC_ADDRESSES_PATH)
val commitIDs = commits.select("commit_id").as[String].repartition(NUM_PARTITIONS)
// get a dataset of all commit-range pairs:
helper.getRangeIDsWithCommitID(commitIDs, GC_REPO_NAME).toDF("commit_id", "range_id").write.mode("overwrite").parquet(s"${GC_PARQUET_METADATA_PREFIX}commit_ranges")
val commitRanges = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}commit_ranges")
// get all distinct range IDs
commitRanges.select("range_id").distinct.write.mode("overwrite").parquet(s"${GC_PARQUET_METADATA_PREFIX}range_ids")
val rangeIDs = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}range_ids").select("range_id").as[String].repartition(NUM_PARTITIONS)
// get a dataset of all (range_id, address, logical key) tuples:
val rangeAddresses = helper.getRangeEntriesWithRangeID(rangeIDs, GC_REPO_NAME).toDF("range_id", "address", "key")
rangeAddresses.write.partitionBy("range_id").mode("overwrite").option("partitionOverwriteMode", "dynamic").parquet(s"${GC_PARQUET_METADATA_PREFIX}addresses")
val allRanges = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}addresses/")
// COMMAND ----------
// Write a summary of this GC run
addresses
  .join(allRanges.as("ar"), "address")
  .join(commitRanges.as("cr"), "range_id")
  .join(commitsDF.as("c"), "commit_id")
  .select("ar.address", "ar.key", "cr.commit_id", "c.expired", "c.creation_date")
  .write.mode("overwrite").parquet(s"${GC_PARQUET_METADATA_PREFIX}summary/run_id=${RUN_ID}")
val summaryDF = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}summary/run_id=${RUN_ID}")
// COMMAND ----------
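// Sanity check (a sketch, not part of the original flow): per the GC rule described earlier, an object is a
// deletion candidate only when *every* commit containing it is expired. min over the boolean "expired" column
// is false if any containing commit is still live, so the addresses below should line up with `addresses`.
display(
  summaryDF
    .groupBy("address")
    .agg(min("expired").as("all_containing_commits_expired"))
    .filter(col("all_containing_commits_expired") === true)
    .limit(10)
)
// COMMAND ----------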
// How many objects were marked for deletion?
addresses.count
// COMMAND ----------
// These are some objects that were marked for deletion from your storage.
// They are physical addresses in the underlying storage, so they will not mean much on their own.
import org.apache.spark.sql.functions.rand
display(addresses.orderBy(rand()).limit(10))
// COMMAND ----------
// choose an address from the previous cell and paste it here:
val ADDRESS_TO_RESEARCH = "b563daccbbff464ea430b551dbf14431"
// We will see the commits containing this object. These are not necessarily commits where it was changed, but commits where the object was present.
// The commits are sorted in descending creation order, so the ones at the top are likely the last ones in which the object appeared.
// Therefore, the children of these commits are good candidates for the commits in which the object was deleted.
// The "children" column contains links to the lakeFS UI, where you can check whether the object was indeed deleted in that commit.
display(
  summaryDF.as("f")
    .filter(col("address") === ADDRESS_TO_RESEARCH)
    .join(commitsDF.as("cd"), array_contains(col("cd.parents"), col("f.commit_id")))
    .select(col("f.*"), concat(lit(apiURL.replace("api/v1", s"repositories/${GC_REPO_NAME}/commits/")), col("cd.commit_id"), lit("?prefix="), col("key")).as("children"))
    .groupBy("address", "key", "f.commit_id")
    .agg(min("expired").as("expired"), max("f.creation_date").as("creation_date"), collect_set("children").as("children"))
    .orderBy(desc("creation_date")).limit(1000)
    .select("f.key", "f.commit_id", "creation_date", "expired", "children")
)