Skip to content

Instantly share code, notes, and snippets.

@stanislav-chetvertkov
Created July 23, 2018 10:27
Show Gist options
  • Save stanislav-chetvertkov/75c012d33488fafa15cbd0e0c9f00eef to your computer and use it in GitHub Desktop.
Save stanislav-chetvertkov/75c012d33488fafa15cbd0e0c9f00eef to your computer and use it in GitHub Desktop.
Twilio Sink Client
import java.time.Instant
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import cats.data.EitherT
import cats.implicits._
import com.twilio.service.encoding.json.CirceJsonSupport
import com.twilio.service.http.ServiceHttpClient.{ HttpClient, HttpError, IgnoredEntity, ServiceClientError, ServiceClientFailure, ServiceClientResult }
import com.twilio.service.http.{ ServiceHttpClient, SingleRequestHttpClient }
import io.circe.syntax._
import io.circe.{ Decoder, Encoder }
import org.slf4j.LoggerFactory
import akka.http.scaladsl.model.headers._
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import SyncClient._
/**
* The reason for this client to exist is that the current sdk client does not support
* https://www.twilio.com/docs/sync/mutation-and-conflict-resolution
*/
class SyncClient(account: String, password: String, realm: String)
(implicit system: ActorSystem, mt: ActorMaterializer, ec: ExecutionContext) extends CirceJsonSupport {
private val logger = LoggerFactory.getLogger(getClass)
private val client: HttpClient = SingleRequestHttpClient.singleRequestHttpClient(toStrictProcessingTime = Some(3.seconds))
private val authorization = Authorization(BasicHttpCredentials(account, password))
private val BaseUrl = baseUrl(realm)
private val Data = "Data"
private val Key = "Key"
private val UniqueName = "UniqueName"
def createDocument(name: String): ServiceClientResult[Unit] = {
val request = HttpRequest(
uri = Uri(s"$BaseUrl/Documents"),
method = HttpMethods.POST,
headers = List(authorization),
entity = FormData(Map(UniqueName -> name)).toEntity(HttpCharsets.`UTF-8`)
)
ServiceHttpClient.executeRequest[IgnoredEntity](client)(request).map(_ => ())
}
/**
* Queries sync document
*
* @return T which is a value of `data` field
*/
def queryDocument[T](name: String)
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = {
val request = HttpRequest(uri = Uri(s"$BaseUrl/Documents/$name"), method = HttpMethods.GET, headers = List(authorization))
ServiceHttpClient.executeRequest[DocumentUpdateResponse[T]](client)(request)
}
/**
* Tries to update the value in a Sync Map, fails with Http (412 PRECONDITION FAILED)
* if current revision of the Sync Map does not match - meaning it has been changed since and there could be conflicts
*
* @param revision - https://www.twilio.com/docs/sync/mutation-and-conflict-resolution
* @param documentName - name of a Sync Document, defaults to client's one
*/
def optimisticUpdate[T: Encoder](value: T, revision: String, documentName: String)
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = {
val ifMatch = `If-Match`(EntityTag(revision))
val request = HttpRequest(
uri = Uri(s"$BaseUrl/Documents/$documentName"),
method = HttpMethods.POST,
headers = List(authorization, ifMatch),
entity = FormData(Map(Data -> value.asJson.toString())).toEntity(HttpCharsets.`UTF-8`)
)
ServiceHttpClient.executeRequest[DocumentUpdateResponse[T]](client)(request)
}
def updateDocument[T: Encoder](value: T, documentName: String)
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = {
val request = HttpRequest(
uri = Uri(s"$BaseUrl/Documents/$documentName/Items"),
method = HttpMethods.POST,
headers = List(authorization),
entity = FormData(Map(Data -> value.asJson.toString())).toEntity(HttpCharsets.`UTF-8`)
)
ServiceHttpClient.executeRequest[DocumentUpdateResponse[T]](client)(request)
}
def updateDocument[T: Encoder: Decoder](value: T, revision: String, documentName: String, retries: Int = 3)
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = {
optimisticUpdate(value = value, revision = revision, documentName = documentName)
.recoverWith {
case e: HttpError if retries > 0 && e.response.status == StatusCodes.PreconditionFailed =>
logger.info(e.toString)
queryDocument(documentName)
.flatMap(r =>
updateDocument(value = value, revision = r.revision, documentName = documentName, retries = retries - 1)
)
case _ =>
val error: ServiceClientError = ServiceClientFailure(s"Couldn't update $documentName")
EitherT.left[DocumentUpdateResponse[T]](Future.successful(error))
}
}
/**
* Tries to update value in a Sync map and creates it if it does not exist
* @param value - value to create or override with
* @param overrideIf - update only if this precondition evaluates to true
*/
def updateOrCreateDocument[T: Encoder: Decoder](value: T,
overrideIf: (T, T) => Boolean = (current: T, update: T) => current != update,
documentName: String)
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = {
queryDocument[T](documentName).flatMap { r =>
if (overrideIf(r.data, value)) {
updateDocument[T](value, r.revision, documentName)
} else {
EitherT.leftT[Future, DocumentUpdateResponse[T]](
ServiceClientFailure(s"Can't update value for $documentName because precondition failed for arguments(${r.data} $value)"): ServiceClientError
)
}
}.recoverWith {
case e: HttpError if e.response.status == StatusCodes.NotFound =>
updateDocument[T](value, documentName)
}
}
}
object SyncClient {
/** `default` here is a name alias for IS token that Sync automatically generates for every twilio account */
def baseUrl(realm: String): String = {
val realmName: String = (realm + ".").replace("prod.", "")
s"https://sync.${realmName}twilio.com/v1/Services/default"
}
/**
* Number lock metadata
*/
case class CommittedEventsLock(host: String, lastEventOffset: Long, lastModified: Instant)
val EmptyLock: CommittedEventsLock = CommittedEventsLock("", 0, Instant.now())
/**
* @param revision - version of Sync's Map entry used for conflict resolution
* @param data - locking state [[CommittedEventsLock]]
*/
case class DocumentUpdateResponse[T](revision: String, data: T)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment