Skip to content

Instantly share code, notes, and snippets.

@mmlac
Last active February 2, 2019 01:27
Show Gist options
  • Save mmlac/136b95ae0edcd914027ec280a8c882b4 to your computer and use it in GitHub Desktop.
Save mmlac/136b95ae0edcd914027ec280a8c882b4 to your computer and use it in GitHub Desktop.
/*
* MIT LICENSE
*
* Copyright 2019 Markus Lachinger
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
/* This is just a simple wrapper for the kubernetes-java library, written in Scala, to
* deal with Watches.
*
* I would not recommend using this unless you have already committed to the Java library, as
* skuber makes this watch much more elegant using Akka Streams. Anyway, I figured this might be
* useful for someone.
*
* The Future will eventually end up on the OkHttp thread, so shouldn't be blocked when returned.
*/
import io.kubernetes.client.ApiClient
import io.kubernetes.client.custom.Quantity
import io.kubernetes.client.models._
import io.kubernetes.client.util.{Config, Watch}
import io.kubernetes.client.apis.{CoreV1Api => JavaCoreV1Api}
// And some more, I used Finagle for the Futurepool, etc
/**
 * Generic Watch "waiting" implementation for the Kubernetes Java Client.
 *
 * Reading from the watch iterator blocks, so `apiClient.getHttpClient.getReadTimeout` is expected
 * to be a reasonably low number (i.e. 30s). Every time reading fails (which includes the read
 * timeout) the watch is closed and a new round is started, and at the start of every round the
 * interrupt flag of the current thread is checked so that an interrupted task stops watching.
 *
 * Each round opens a watch, then makes one plain (non-watch) call and validates its result, so
 * that a success / failure that happened between two watches is not missed. Only if that result
 * is still pending is the watch itself consumed. The watch opened for a round is always closed
 * when the round ends, whatever the outcome.
 *
 * @param apiClient      client
 * @param call           Function producing a new Call to execute. The parameter is whether this
 *                       is a 'watch' Call (true) or a "regular" Call (false).
 * @param validate       Validation function for watch progress.
 *                       None means everything is still pending / waiting,
 *                       Some(Left(exception)) means what we wanted to do failed, i.e. no PV
 *                       available, no auto provisioning, etc.
 *                       Some(Right(object)) means what we were watching for was successful.
 * @param watchTypeToken Type Token for GSON to decode the Watch response
 * @param callTypeToken  Type Token for GSON to decode the 'normal' call.
 *                       Both tokens need to be supplied by the caller, as type erasure does not
 *                       allow creating them correctly from `T` in here.
 * @tparam T Type of the return object of the Watch
 * @return Future of the successful object. The Future fails with the exception returned by
 *         `validate` on failure, with an InterruptedException if the thread was interrupted, or
 *         with whatever opening the watch / executing the plain call throws.
 */
def watcher[T](apiClient: ApiClient,
               call: Boolean => Call,
               validate: T => Option[Either[Exception, T]],
               watchTypeToken: TypeToken[Watch.Response[T]],
               callTypeToken: TypeToken[T]
              ): Future[T] =
  Concurrency.kubernetesFuturePool({ // generic interruptable Finagle Futurepool
    // Closing is best effort: a failing close must never replace the actual outcome.
    def closeQuietly(watch: Watch[T]): Unit =
      try { watch.close() } catch { case NonFatal(_) => } // ignore Watch close exception

    // Runs a single watch round.
    // Some(obj) = success, None = nothing decided (restart the watch), throws on failure.
    def watchOnce(): Option[T] = {
      if (Thread.currentThread().isInterrupted) throw new InterruptedException("Thread was interrupted")
      val watch = Watch.createWatch[T](
        apiClient,
        call(true),
        watchTypeToken.getType())
      try {
        // Make blocking call to make sure we didn't miss a success / fail state between Watch restarts
        validate(apiClient.execute[T](call(false), callTypeToken.getType).getData) match {
          case Some(Left(ex)) => throw ex
          case Some(Right(successfulObject)) => Some(successfulObject)
          case None =>
            // Lazily validate each event once and skip everything that is still pending.
            val decisions = watch.iterator().asScala
              .map(event => validate(event.`object`))
              .collect { case Some(decision) => decision }
            // Any non-fatal problem while reading (read timeout, stream ended, ...) means
            // "nothing decided yet"; fatal ones such as InterruptedException still propagate.
            Try(decisions.next()) match {
              case Failure(_) => None
              case Success(Left(ex)) => throw ex
              case Success(Right(successfulObject)) => Some(successfulObject)
            }
        }
      } finally {
        // Runs on every exit path, so no round leaves its HTTP connection open.
        closeQuietly(watch)
      }
    }

    // Restart until a round is decisive. The round is complete (and its watch closed) before
    // the next one starts, so restarting neither grows the stack nor keeps old watches open.
    @scala.annotation.tailrec
    def watchOut: T = watchOnce() match {
      case Some(result) => result
      case None => watchOut
    }

    watchOut
  })
}
// Example for creating a PVC
def watchPVCBinding(apiClient: ApiClient,
                    name: String,
                    namespace: String
                   ) : Future[V1PersistentVolumeClaim] =
{
  val coreApi = new JavaCoreV1Api(apiClient)

  // Builds the PVC list request for `namespace`, narrowed by field selector to the claim called
  // `name`. The Boolean decides whether the request is issued as a watch.
  // NOTE(review): the non-watch variant is a *list* request while the plain-call type token
  // below describes a single claim - confirm the regular call decodes as intended.
  val claimCall: Boolean => Call = asWatch =>
    coreApi.listNamespacedPersistentVolumeClaimCall(
      namespace, "false", null, s"metadata.name=$name", true,
      null, 1, null, null, asWatch,
      null, null)

  // Success once the claim reports phase "Bound"; a missing status or any other phase counts
  // as still pending. This validator never reports a failure (it never yields a Left).
  def boundOrPending(claim: V1PersistentVolumeClaim): Option[Either[Exception, V1PersistentVolumeClaim]] =
    Option(claim.getStatus)
      .filter(_.getPhase == "Bound")
      .map(_ => Right(claim))

  watcher[V1PersistentVolumeClaim](
    apiClient,
    claimCall,
    boundOrPending,
    new TypeToken[Watch.Response[V1PersistentVolumeClaim]](){},
    new TypeToken[V1PersistentVolumeClaim](){})
}
/**
 * Creates a PVC and waits until it is bound.
 *
 * The watch is started before the create call is sent and is given up (raised) once `deadline`
 * has passed.
 *
 * @param apiClient      client used for the create call
 * @param namespace      namespace to create the claim in
 * @param claimName      name of the claim
 * @param accessMode     access mode, forwarded to [[createPVCAsync]]
 * @param size           requested storage size, forwarded to [[createPVCAsync]]
 * @param storageClass   storage class, forwarded to [[createPVCAsync]]
 * @param deadline       point in time after which waiting for the binding is given up
 * @param watchApiClient client used for the watch; `watcher` expects it to have a reasonably low
 *                       read timeout
 * @return Future of the results of the create call and of the watch, in that order; fails as
 *         soon as either of the two fails
 */
def createPVC(
    apiClient: ApiClient,
    namespace: String,
    claimName: String,
    accessMode: String, // change to access mode object
    size: String,
    storageClass: String,
    deadline: Instant = Instant.now.plusSeconds(30L),
    watchApiClient: ApiClient = volumeWatchClient
): Future[Seq[V1PersistentVolumeClaim]] = {
  val waitFor = Duration(Instant.now.until(deadline, ChronoUnit.SECONDS), TimeUnit.SECONDS)
  val watch =
    // Watch on the dedicated watch client, not on the client used for the create call.
    watchPVCBinding(watchApiClient, claimName, namespace)
      .raiseWithin(waitFor)
  // here is how you define how long to maximally wait before giving up and failing
  // this will also set the futurepool thread used for waiting to interrupted and therefore
  // not renewing the watch as soon as it times out. Even if the watch would produce a result
  // after the raise, it wouldn't be set as the Future was already fulfilled
  val create = createPVCAsync(apiClient, namespace, claimName, accessMode, size, storageClass)
  Future.collect(Seq(create, watch))
  // simple way of immediately exiting if the create call didn't succeed
  // won't raise the watch thread early
}
/**
 * Sends the asynchronous create request for a PersistentVolumeClaim.
 *
 * @param apiClient    client used for the request
 * @param namespace    namespace to create the claim in
 * @param claimName    name of the claim
 * @param accessMode   the single access mode requested for the claim (e.g. "ReadWriteOnce")
 * @param size         requested storage, in the string form accepted by `Quantity`
 * @param storageClass name of the storage class to request
 * @return the promise handed to the request callback, which is expected to complete it
 */
def createPVCAsync(apiClient: ApiClient,
                   namespace: String,
                   claimName: String,
                   accessMode: String, // change to access mode object
                   size: String,
                   storageClass: String
                  ): Promise[V1PersistentVolumeClaim] = {
  val meta = new V1ObjectMeta()
    .namespace(namespace)
    .name(claimName)
  val spec = new V1PersistentVolumeClaimSpec()
    // Use what the caller asked for instead of fixed values.
    .accessModes(List(accessMode).asJava)
    .volumeMode("Filesystem")
    .resources(new V1ResourceRequirements().requests(Map("storage" -> new Quantity(size)).asJava))
    .storageClassName(storageClass)
  val promise = Promise[V1PersistentVolumeClaim]()
  new JavaCoreV1Api(apiClient)
    .createNamespacedPersistentVolumeClaimAsync(
      namespace,
      new V1PersistentVolumeClaim().metadata(meta).spec(spec),
      "false",
      FACB[V1PersistentVolumeClaim](promise)) // Just a callback wrapper to set the promise
  promise
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment