Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save harmeetsingh0013/1c9e89ef7061caba46fe9363bb13a2c9 to your computer and use it in GitHub Desktop.
Save harmeetsingh0013/1c9e89ef7061caba46fe9363bb13a2c9 to your computer and use it in GitHub Desktop.
Full stack trace:
- should add new user *** FAILED ***
[info] akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://application/user/jdbcCreateTables-singletonProxy#-322546342]] after [20000 ms]. Sender[null] sent message of type "com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTaskActor$Execute$".
[info] at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
[info] at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
[info] at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
[info] at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
[info] at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
[info] at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
[info] at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
[info] at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
[info] at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
[info] at java.lang.Thread.run(Thread.java:748)
Event Processor Test Case:
```
/**
 * Async spec for the user read-side event processor.
 *
 * Starts a Lagom test server, feeds `UserEvent`s through the application's
 * `ReadSideTestDriver`, and checks the outcome via the user repository.
 */
class UserEventProcessorSpec extends AsyncWordSpec with BeforeAndAfterAll
  with Matchers with MockitoSugar {

  // Assigned while the test application is being constructed (see `server`).
  var producerStub: ProducerStub[api.UserEvent] = _

  // Clustering is enabled explicitly: with the plain default setup the ask sent to the
  // `jdbcCreateTables-singletonProxy` actor timed out after 20 s (AskTimeoutException),
  // which is consistent with the cluster singleton behind that proxy never starting.
  private lazy val server = ServiceTest.startServer(ServiceTest.defaultSetup.withCluster(true)) { ctx =>
    new LagomApplication(ctx) with UserComponents with AhcWSComponents with LagomKafkaComponents {
      val stubFactory = new ProducerStubFactory(actorSystem, materializer)
      producerStub = stubFactory.producer[api.UserEvent](UserKafkaTopic.NAME)

      override def serviceLocator = NoServiceLocator

      // Replace the read side with the test driver so events can be fed in by hand.
      override lazy val readSide: ReadSideTestDriver = new ReadSideTestDriver
    }
  }

  /** Stops the test server once every test in this spec has run. */
  override def afterAll(): Unit = server.stop()

  private val testDriver = server.application.readSide
  private val userRepo = server.application.userRepository

  // Source of the sequence offsets handed to the test driver; incremented per fed event.
  private val offset = new AtomicInteger()

  "The user event processor" should {
    "add new user " in {
      val id = UUID.randomUUID()
      val user = User(
        id = Some(id),
        date = Some(new Timestamp(System.currentTimeMillis())),
        name = "James",
        mobile1 = "147852369",
        company = "James ENT",
        vatNo = "5558746",
        pinNo = "142001"
      )
      for {
        // Use the known `id` directly instead of unwrapping `user.id` with `.get`.
        _ <- feed(id, UserAdded(user))
        repoUser <- userRepo.findUserById(id)
      } yield {
        repoUser should === (user)
      }
    }
  }

  /**
   * Feeds a single event to the read-side test driver.
   *
   * @param itemId entity id the event belongs to (passed to the driver as a string)
   * @param event  the event to feed
   * @return whatever `testDriver.feed` returns for the fed event
   */
  private def feed(itemId: UUID, event: UserEvent) = {
    testDriver.feed(itemId.toString, event, Sequence(offset.getAndIncrement))
  }
}
```
Event Processor Code:
```
/**
 * Read-side processor for `UserEvent`s backed by a JDBC read side.
 *
 * On global prepare it creates the `users` table and its unique indexes; each
 * supported event is then forwarded to the matching `UserRepository` method.
 *
 * @param readSide       JDBC read-side used to build the handler
 * @param userRepository repository the event handlers delegate to
 * @param ec             execution context (implicit)
 */
class UserEventProcessor(readSide: JdbcReadSide, userRepository: UserRepository)
                        (implicit ec: ExecutionContext)
  extends ReadSideProcessor[UserEvent] {

  /** Builds the handler: offset id "read_side_offsets", global prepare, and one handler per event type. */
  override def buildHandler(): ReadSideHandler[UserEvent] = {
    val prepared = readSide
      .builder[UserEvent]("read_side_offsets")
      .setGlobalPrepare(createUserTable)

    val withHandlers = prepared
      .setEventHandler[UserAdded](insertNewUser)
      .setEventHandler[UserUpdated](updateUserRecord)
      .setEventHandler[UserDeleted](deactivateUser)

    withHandlers.build()
  }

  /** The single aggregate tag this processor subscribes to. */
  override def aggregateTags: Set[AggregateEventTag[UserEvent]] = Set(UserEvent.INSTANCE)

  /**
   * Creates the `users` table and its three unique indexes if they do not exist yet.
   *
   * @param connection JDBC connection the DDL statements are executed on
   */
  private def createUserTable(connection: Connection): Unit = {
    val createTableSql =
      """
        |CREATE TABLE IF NOT EXISTS users (
        | id uuid NOT NULL,
        | date timestamp DEFAULT CURRENT_TIMESTAMP,
        | name varchar(100) NOT NULL,
        | mobile_1 varchar(15) NOT NULL,
        | mobile_2 varchar(45) DEFAULT NULL,
        | email varchar(70) DEFAULT NULL,
        | company varchar(150) NOT NULL,
        | vat_no varchar(45) NOT NULL,
        | status varchar(45) NOT NULL DEFAULT 'active',
        | address text DEFAULT NULL,
        | country varchar(45) DEFAULT NULL,
        | state varchar(45) DEFAULT NULL,
        | city varchar(45) DEFAULT NULL,
        | pin_no varchar(8) NOT NULL,
        | PRIMARY KEY (id)
        | )
        |""".stripMargin

    // Table first, then the indexes that depend on it.
    val ddl = Vector(
      createTableSql,
      "CREATE UNIQUE INDEX IF NOT EXISTS mobile_1 on users (mobile_1)",
      "CREATE UNIQUE INDEX IF NOT EXISTS vat_no on users (vat_no);",
      "CREATE UNIQUE INDEX IF NOT EXISTS pinNoIndex on users (pin_no);"
    )

    executePrepareStatements(connection, ddl)
  }

  /**
   * Executes each SQL string, in order, as a prepared statement on the given connection.
   *
   * @param connection connection used to prepare every statement
   * @param statements SQL strings to run sequentially
   */
  private def executePrepareStatements(connection: Connection, statements: Vector[String]): Unit = {
    for (sql <- statements) {
      tryWith(connection.prepareStatement(sql)) { prepared =>
        prepared.execute()
      }
    }
  }

  // NOTE(review): the three handlers below ignore the supplied `connection` and discard
  // whatever the repository call returns. Presumably the repository uses its own
  // connection/session; confirm whether these writes are meant to take part in the
  // read side's offset update.

  /** Handles `UserAdded` by passing the event's user to `userRepository.addNewUser`. */
  private def insertNewUser(connection: Connection, started: EventStreamElement[UserAdded]) = {
    val added = started.event
    userRepository.addNewUser(added.user)
  }

  /** Handles `UserUpdated` by passing the event's user to `userRepository.updateUserDetail`. */
  private def updateUserRecord(connection: Connection, started: EventStreamElement[UserUpdated]) = {
    val updated = started.event
    userRepository.updateUserDetail(updated.user)
  }

  /** Handles `UserDeleted` by passing the event's id to `userRepository.deactivateUser`. */
  private def deactivateUser(connection: Connection, started: EventStreamElement[UserDeleted]) = {
    val deleted = started.event
    userRepository.deactivateUser(deleted.id)
  }
}
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment