Created
July 23, 2018 10:27
-
-
Save stanislav-chetvertkov/75c012d33488fafa15cbd0e0c9f00eef to your computer and use it in GitHub Desktop.
Twilio Sink Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.time.Instant | |
import akka.actor.ActorSystem | |
import akka.http.scaladsl.model._ | |
import akka.stream.ActorMaterializer | |
import cats.data.EitherT | |
import cats.implicits._ | |
import com.twilio.service.encoding.json.CirceJsonSupport | |
import com.twilio.service.http.ServiceHttpClient.{ HttpClient, HttpError, IgnoredEntity, ServiceClientError, ServiceClientFailure, ServiceClientResult } | |
import com.twilio.service.http.{ ServiceHttpClient, SingleRequestHttpClient } | |
import io.circe.syntax._ | |
import io.circe.{ Decoder, Encoder } | |
import org.slf4j.LoggerFactory | |
import akka.http.scaladsl.model.headers._ | |
import scala.concurrent.duration._ | |
import scala.concurrent.{ ExecutionContext, Future } | |
import SyncClient._ | |
/** | |
* The reason for this client to exist is that the current sdk client does not support | |
* https://www.twilio.com/docs/sync/mutation-and-conflict-resolution | |
*/ | |
class SyncClient(account: String, password: String, realm: String) | |
(implicit system: ActorSystem, mt: ActorMaterializer, ec: ExecutionContext) extends CirceJsonSupport { | |
private val logger = LoggerFactory.getLogger(getClass) | |
private val client: HttpClient = SingleRequestHttpClient.singleRequestHttpClient(toStrictProcessingTime = Some(3.seconds)) | |
private val authorization = Authorization(BasicHttpCredentials(account, password)) | |
private val BaseUrl = baseUrl(realm) | |
private val Data = "Data" | |
private val Key = "Key" | |
private val UniqueName = "UniqueName" | |
def createDocument(name: String): ServiceClientResult[Unit] = { | |
val request = HttpRequest( | |
uri = Uri(s"$BaseUrl/Documents"), | |
method = HttpMethods.POST, | |
headers = List(authorization), | |
entity = FormData(Map(UniqueName -> name)).toEntity(HttpCharsets.`UTF-8`) | |
) | |
ServiceHttpClient.executeRequest[IgnoredEntity](client)(request).map(_ => ()) | |
} | |
/** | |
* Queries sync document | |
* | |
* @return T which is a value of `data` field | |
*/ | |
def queryDocument[T](name: String) | |
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = { | |
val request = HttpRequest(uri = Uri(s"$BaseUrl/Documents/$name"), method = HttpMethods.GET, headers = List(authorization)) | |
ServiceHttpClient.executeRequest[DocumentUpdateResponse[T]](client)(request) | |
} | |
/** | |
* Tries to update the value in a Sync Map, fails with Http (412 PRECONDITION FAILED) | |
* if current revision of the Sync Map does not match - meaning it has been changed since and there could be conflicts | |
* | |
* @param revision - https://www.twilio.com/docs/sync/mutation-and-conflict-resolution | |
* @param documentName - name of a Sync Document, defaults to client's one | |
*/ | |
def optimisticUpdate[T: Encoder](value: T, revision: String, documentName: String) | |
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = { | |
val ifMatch = `If-Match`(EntityTag(revision)) | |
val request = HttpRequest( | |
uri = Uri(s"$BaseUrl/Documents/$documentName"), | |
method = HttpMethods.POST, | |
headers = List(authorization, ifMatch), | |
entity = FormData(Map(Data -> value.asJson.toString())).toEntity(HttpCharsets.`UTF-8`) | |
) | |
ServiceHttpClient.executeRequest[DocumentUpdateResponse[T]](client)(request) | |
} | |
def updateDocument[T: Encoder](value: T, documentName: String) | |
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = { | |
val request = HttpRequest( | |
uri = Uri(s"$BaseUrl/Documents/$documentName/Items"), | |
method = HttpMethods.POST, | |
headers = List(authorization), | |
entity = FormData(Map(Data -> value.asJson.toString())).toEntity(HttpCharsets.`UTF-8`) | |
) | |
ServiceHttpClient.executeRequest[DocumentUpdateResponse[T]](client)(request) | |
} | |
def updateDocument[T: Encoder: Decoder](value: T, revision: String, documentName: String, retries: Int = 3) | |
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = { | |
optimisticUpdate(value = value, revision = revision, documentName = documentName) | |
.recoverWith { | |
case e: HttpError if retries > 0 && e.response.status == StatusCodes.PreconditionFailed => | |
logger.info(e.toString) | |
queryDocument(documentName) | |
.flatMap(r => | |
updateDocument(value = value, revision = r.revision, documentName = documentName, retries = retries - 1) | |
) | |
case _ => | |
val error: ServiceClientError = ServiceClientFailure(s"Couldn't update $documentName") | |
EitherT.left[DocumentUpdateResponse[T]](Future.successful(error)) | |
} | |
} | |
/** | |
* Tries to update value in a Sync map and creates it if it does not exist | |
* @param value - value to create or override with | |
* @param overrideIf - update only if this precondition evaluates to true | |
*/ | |
def updateOrCreateDocument[T: Encoder: Decoder](value: T, | |
overrideIf: (T, T) => Boolean = (current: T, update: T) => current != update, | |
documentName: String) | |
(implicit decoder: Decoder[DocumentUpdateResponse[T]]): ServiceClientResult[DocumentUpdateResponse[T]] = { | |
queryDocument[T](documentName).flatMap { r => | |
if (overrideIf(r.data, value)) { | |
updateDocument[T](value, r.revision, documentName) | |
} else { | |
EitherT.leftT[Future, DocumentUpdateResponse[T]]( | |
ServiceClientFailure(s"Can't update value for $documentName because precondition failed for arguments(${r.data} $value)"): ServiceClientError | |
) | |
} | |
}.recoverWith { | |
case e: HttpError if e.response.status == StatusCodes.NotFound => | |
updateDocument[T](value, documentName) | |
} | |
} | |
} | |
object SyncClient { | |
/** `default` here is a name alias for IS token that Sync automatically generates for every twilio account */ | |
def baseUrl(realm: String): String = { | |
val realmName: String = (realm + ".").replace("prod.", "") | |
s"https://sync.${realmName}twilio.com/v1/Services/default" | |
} | |
/** | |
* Number lock metadata | |
*/ | |
case class CommittedEventsLock(host: String, lastEventOffset: Long, lastModified: Instant) | |
val EmptyLock: CommittedEventsLock = CommittedEventsLock("", 0, Instant.now()) | |
/** | |
* @param revision - version of Sync's Map entry used for conflict resolution | |
* @param data - locking state [[CommittedEventsLock]] | |
*/ | |
case class DocumentUpdateResponse[T](revision: String, data: T) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment