package com.twitter.service.flock.edges.check

import scala.collection.immutable.HashMap
import org.scalacheck._
import net.lag.configgy.{Config, Configgy}
import org.specs.{ScalaCheck, Specification}

/**
 * Model-based ScalaCheck specification for `Edges`.
 *
 * Randomly generated commands (add, remove, count, contains, get) are run
 * against the `edges` instance while an in-memory `State` tracks the edges the
 * generated commands are expected to have produced. Each command's
 * post-condition compares what `edges` reports with that model.
 */
object EdgesSpec extends Specification with ScalaCheck with Waiter {
  Configgy.configure(System.getProperty("basedir") + "/config/test.conf")

  // Credentials taken from JVM system properties; used for the direct
  // "localhost" connection that drops the test database.
  val username = System.getProperty("db.test.user")
  val password = System.getProperty("db.test.password")

  // Assigned in doBefore; null until then.
  var edges: Edges = null

  /** Drops the `flock_shard_005` database on localhost if it exists. */
  private def dropTestDatabase(): Unit = {
    val queryEvaluatorFactory = new QueryEvaluatorFactory(username, password)
    val hostConnection = queryEvaluatorFactory("localhost", null)
    hostConnection.execute("DROP DATABASE IF EXISTS flock_shard_005")
  }

  "Edges" should {
    doBefore {
      val config = Config.fromFile(
        System.getProperty("basedir") + "/config/" + System.getProperty("stage") + ".conf")
      // NOTE(review): these credentials come from the stage config file, whereas
      // dropTestDatabase() uses the db.test.* system properties -- confirm that
      // the difference is intentional.
      val configUsername = config("db.username")
      val configPassword = config("db.password")
      val queryEvaluatorFactory = new QueryEvaluatorFactory(configUsername, configPassword)
      val nameServerQueryEvaluator =
        queryEvaluatorFactory(config("db.name_server_host"), config("db.name_server_database"))
      // Only the first element of the returned pair is used by this spec.
      val (createdEdges, _) =
        Edges(config.configMap("edges"), nameServerQueryEvaluator, queryEvaluatorFactory)
      edges = createdEdges
    }

    doAfter {
      edges.stop_writes()
      dropTestDatabase()
    }

    "be (eventually) consistent" in {
      new Commands {
        /**
         * In-memory model of the expected edges.
         *
         * @param forward  (sourceId, listId) -> destination ids
         * @param backward (destinationId, listId) -> source ids
         */
        case class State(forward: Map[(Long, Int), Set[Long]], backward: Map[(Long, Int), Set[Long]]) {
          /** Returns a new State that additionally contains the given edge. */
          def add(sourceId: Long, listId: Int, destinationId: Long) = {
            val forwardNew =
              forward.update((sourceId, listId), forward((sourceId, listId)) + destinationId)
            val backwardNew =
              backward.update((destinationId, listId), backward((destinationId, listId)) + sourceId)
            new State(forwardNew, backwardNew)
          }

          /** Returns a new State without the given edge. */
          def remove(sourceId: Long, listId: Int, destinationId: Long) = {
            val forwardNew =
              forward.update((sourceId, listId), forward((sourceId, listId)) - destinationId)
            val backwardNew =
              backward.update((destinationId, listId), backward((destinationId, listId)) - sourceId)
            new State(forwardNew, backwardNew)
          }

          /** Number of destinations recorded for (sourceId, listId). */
          def count_of_destinations_for(sourceId: Long, listId: Int) =
            forward((sourceId, listId)).size

          /** Number of sources recorded for (destinationId, listId). */
          def count_of_sources_for(destinationId: Long, listId: Int) =
            backward((destinationId, listId)).size

          /** True when the model holds the edge sourceId -> destinationId in listId. */
          def contains(sourceId: Long, listId: Int, destinationId: Long) =
            forward((sourceId, listId)).contains(destinationId)
        }

        /**
         * Drops the test database, creates one forward/backward shard pair per
         * list on `edges`, and returns an empty model.
         */
        def initialState = {
          dropTestDatabase()

          // Every shard lives in the same database on localhost; only the table differs.
          def shardInfo(table: String) =
            new gen.ShardInfo(table, "flock_shard_005", "localhost", "localhost")

          val forwardShard1 = shardInfo("table_005")
          val backwardShard1 = shardInfo("table_006")
          val forwardShard2 = shardInfo("table_007")
          val backwardShard2 = shardInfo("table_008")
          val forwardShard3 = shardInfo("table_009")
          val backwardShard3 = shardInfo("table_010")
          val forwardShard4 = shardInfo("table_011")
          val backwardShard4 = shardInfo("table_012")

          edges.create_shard(0, gen.List.Follows, forwardShard1, backwardShard1)
          edges.create_shard(0, gen.List.Blocks, forwardShard2, backwardShard2)
          edges.create_shard(0, gen.List.FollowsSms, forwardShard3, backwardShard3)
          edges.create_shard(0, gen.List.RequestsToFollow, forwardShard4, backwardShard4)

          // Missing keys read as the empty set so add/remove never need a presence check.
          val map = new HashMap[(Long, Int), Set[Long]] withDefaultValue Set()
          State(map, map)
        }

        /** Adds an edge, then waits until `edges` reports it as present. */
        case class Add(sourceId: Long, listId: Int, destinationId: Long) extends Command {
          // Captured when the command object is constructed, not when it is run.
          val time = System.currentTimeMillis

          def run(s: State) = {
            println("Adding " + (sourceId, listId, destinationId))
            edges.add_at(sourceId, listId, destinationId, time)
          }

          def nextState(s: State) = s.add(sourceId, listId, destinationId)

          preCondition = s => true
          postCondition = { (s, r) =>
            waitUntil { edges.contains(sourceId, listId, destinationId) }
          }
        }

        /** Removes an edge, then waits until `edges` reports it as absent. */
        case class Remove(sourceId: Long, listId: Int, destinationId: Long) extends Command {
          // Captured when the command object is constructed, not when it is run.
          val time = System.currentTimeMillis

          def run(s: State) = {
            println("Removing " + (sourceId, listId, destinationId))
            edges.remove_at(sourceId, listId, destinationId, time)
          }

          def nextState(s: State) = s.remove(sourceId, listId, destinationId)

          preCondition = s => true
          postCondition = { (s, r) =>
            waitUntil { !edges.contains(sourceId, listId, destinationId) }
          }
        }

        /** Checks that both edge counts for an id match the model. */
        case class Count(sourceId: Long, listId: Int) extends Command {
          def run(s: State) = {
            println("Count " + (sourceId, listId))
          }

          def nextState(s: State) = s

          preCondition = s => true
          postCondition = { (s, r) =>
            // Both comparisons must hold. Previously they were two separate
            // statements, so the destination-count result was discarded and
            // only the source-count comparison decided the outcome.
            edges.count_of_destinations_for(sourceId, listId) == s.count_of_destinations_for(sourceId, listId) &&
            edges.count_of_sources_for(sourceId, listId) == s.count_of_sources_for(sourceId, listId)
          }
        }

        /** Checks that `edges.contains` agrees with the model. */
        case class Contains(sourceId: Long, listId: Int, destinationId: Long) extends Command {
          def run(s: State) = {
            println("Contains " + (sourceId, listId, destinationId))
          }

          def nextState(s: State) = s

          preCondition = s => true
          postCondition = { (s, r) =>
            edges.contains(sourceId, listId, destinationId) == s.contains(sourceId, listId, destinationId)
          }
        }

        /**
         * Checks `edges.get`: when the model contains the edge, the returned
         * membership must carry the requested destination id and a creation
         * time that is not in the future; when it does not, `get` must throw.
         */
        case class Get(sourceId: Long, listId: Int, destinationId: Long) extends Command {
          def run(s: State) = {
            println("Get " + (sourceId, listId, destinationId))
          }

          def nextState(s: State) = s

          preCondition = s => true
          postCondition = { (s, r) =>
            if (s.contains(sourceId, listId, destinationId)) {
              val membership = edges.get(sourceId, listId, destinationId)
              membership.destination_id == destinationId &&
                membership.created_at <= System.currentTimeMillis
            } else {
              try {
                edges.get(sourceId, listId, destinationId)
                // get returned normally for an edge the model does not contain.
                false
              } catch {
                // Only ordinary exceptions count as "edge not found"; JVM errors
                // (e.g. OutOfMemoryError) propagate instead of passing the check.
                case e: Exception => true
              }
            }
          }
        }

        // One of the four lists for which initialState creates shards.
        val genListId = Gen.elements(
          gen.List.Follows, gen.List.Blocks, gen.List.RequestsToFollow, gen.List.FollowsSms)

        // Note the tuple order: (sourceId, destinationId, listId).
        val key = for (
          sourceId <- Gen.choose(1L, 10000L);
          destinationId <- Gen.choose(1L, 10000L);
          listId <- genListId
        ) yield (sourceId, destinationId, listId)

        val add = for (
          (sourceId, destinationId, listId) <- key
        ) yield Add(sourceId, listId, destinationId)

        val remove = for (
          (sourceId, destinationId, listId) <- key
        ) yield Remove(sourceId, listId, destinationId)

        val count = for (
          sourceId <- Gen.choose(1L, 10000L);
          listId <- genListId
        ) yield Count(sourceId, listId)

        val contains = for (
          (sourceId, destinationId, listId) <- key
        ) yield Contains(sourceId, listId, destinationId)

        val get = for (
          (sourceId, destinationId, listId) <- key
        ) yield Get(sourceId, listId, destinationId)

        /** Picks one of the five command generators; the current state is not consulted. */
        def genCommand(s: State): Gen[Command] = for (
          command <- Gen.oneOf(add, remove, count, contains, get)
        ) yield command
      }.check
    }
  }
}