Skip to content

Instantly share code, notes, and snippets.

@chetanmeh
Created October 1, 2018 14:13
Show Gist options
  • Save chetanmeh/8f72ce65224fac24c9224adc5cacddad to your computer and use it in GitHub Desktop.
Save chetanmeh/8f72ce65224fac24c9224adc5cacddad to your computer and use it in GitHub Desktop.
Delete actions concurrently
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.http
import java.io.File
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import scala.concurrent.duration.DurationInt
import akka.stream.ActorMaterializer
import spray.json.DefaultJsonProtocol._
import spray.json._
import akka.stream.scaladsl.{FileIO, Framing, Sink}
import akka.testkit.TestKit
import akka.util.ByteString
import com.google.common.base.Stopwatch
import common.StreamLogging
import org.junit.runner.RunWith
import org.scalatest.{FlatSpecLike, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import whisk.http.PoolingRestClient.mkRequest
import scala.concurrent.{Await, ExecutionContext}
/**
 * Maintenance job, packaged as a test, that bulk-deletes entities listed in a file.
 *
 * Each non-blank line of [[file]] is expected to look like `name,entityType`. For every
 * record an HTTP `DELETE` is sent to `/api/v1/namespaces/_/<entityType>s/<name>` on the
 * host configured in [[client]], authenticated with HTTP basic credentials built from
 * [[username]] and [[password]]. Up to 10 deletions are in flight at the same time.
 */
@RunWith(classOf[JUnitRunner])
class CleanNamespaceTests
    extends TestKit(ActorSystem("PoolingRestClientTests"))
    with FlatSpecLike
    with Matchers
    with ScalaFutures
    with StreamLogging
    with org.scalatest.BeforeAndAfterAll {

  implicit val ec: ExecutionContext = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Input file holding one `name,entityType` record per line. */
  val file = new File("xxx")

  /** HTTP client that every delete request is sent through. */
  val client = new PoolingRestClient("https", "runtime.adobe.io", 443, 1000)

  /** Basic-auth credentials used for every request. */
  val username = "xxx"
  val password = "xxx"

  /**
   * A single entity to delete.
   *
   * @param name       entity name, used as the last path segment of the request
   * @param entityType singular type as read from the input file
   */
  case class Entity(name: String, entityType: String) {

    /** Collection segment of the request path: the entity type with an "s" appended. */
    def entityTypeName: String = s"${entityType}s"
  }

  /** Headers attached to every request: basic authorization plus `Accept: application/json`. */
  protected val baseHeaders: List[HttpHeader] =
    List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))

  /** Terminates the actor system created for this suite so its threads do not outlive the run. */
  override def afterAll(): Unit = {
    try TestKit.shutdownActorSystem(system)
    finally super.afterAll()
  }

  it should "cleanup all listed names" in {
    val w = Stopwatch.createStarted()
    val f = FileIO
      .fromPath(file.toPath)
      // allowTruncation = true: accept a final record that is not terminated by "\n";
      // with the default (false) such a file fails the stream at end of input.
      .via(Framing.delimiter(ByteString("\n"), 100000, allowTruncation = true))
      .map(_.utf8String)
      // Blank and malformed lines yield None and are dropped instead of failing the stream.
      .mapConcat(line => parseEntity(line).toList)
      .mapAsyncUnordered(10) { e =>
        delete(e).map((e, _))
      }
      // Fold the number of processed records through the stream instead of mutating a var.
      .runFold(0) { (processed, result) =>
        val (entity, outcome) = result
        val name = entity.name
        outcome match {
          case Right(_) =>
            println(s"Deleted $name")
          case Left(StatusCodes.NotFound) =>
            println(s"$name not found")
          case Left(status) =>
            println(s"Unknown status for $name - $status")
        }
        val count = processed + 1
        if (count % 100 == 0) println(s"========> [$w] Deleted $count so far")
        count
      }
    val total = Await.result(f, 5.hours)
    println(s"Processed $total entries")
    println(s"Time taken is $w")
  }

  /**
   * Parses one `name,entityType` line.
   *
   * Surrounding whitespace (including a trailing `\r` from CRLF files) is stripped from
   * both fields.
   *
   * @param line raw line from the input file
   * @return the parsed entity, or `None` for blank lines and for lines that do not consist
   *         of exactly two non-empty comma separated fields (the latter are reported on stdout)
   */
  private def parseEntity(line: String): Option[Entity] =
    line.split(",").map(_.trim) match {
      case Array(name, entityType) if name.nonEmpty && entityType.nonEmpty =>
        Some(Entity(name, entityType))
      case _ if line.trim.isEmpty =>
        None
      case _ =>
        println(s"Skipping malformed line: '$line'")
        None
    }

  /** Sends the DELETE request for `entity` and returns the client's JSON response future. */
  private def delete(entity: Entity) =
    client.requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(entity), headers = baseHeaders))

  /**
   * Builds the request path `/api/v1/namespaces/_/<entityType>s/<name>` for `e`.
   *
   * NOTE(review): the name is encoded as a single segment, so a "/" inside it is sent as
   * `%2F` - confirm this is what the server expects for names containing "/".
   */
  private def uri(e: Entity) = uri2("api", "v1", "namespaces", "_", e.entityTypeName, e.name)

  /**
   * Joins `segments` into an absolute path, percent-encoding each segment.
   *
   * `URLEncoder` implements form encoding, which turns a space into "+". In a URI path a
   * "+" is a literal plus sign, so it is rewritten to "%20". This is safe because a literal
   * "+" in the input has already been encoded as "%2B" at that point.
   *
   * @param segments path segments; each is converted with `toString` before encoding
   * @return the resulting relative [[Uri]], starting with "/"
   */
  protected def uri2(segments: Any*): Uri = {
    val encodedSegments = segments.map { s =>
      URLEncoder.encode(s.toString, StandardCharsets.UTF_8.name).replace("+", "%20")
    }
    Uri(s"/${encodedSegments.mkString("/")}")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment