Skip to content

Instantly share code, notes, and snippets.

@dacr
Created June 28, 2024 23:14
Show Gist options
  • Save dacr/24c9fa53a451121051230a1aae81774f to your computer and use it in GitHub Desktop.
Save dacr/24c9fa53a451121051230a1aae81774f to your computer and use it in GitHub Desktop.
neo4j cypher - stream inserted records using zio / published by https://github.com/dacr/code-examples-manager #42059935-7404-4166-8e74-5ecb74844aa7/18c9aa07d4c6103405495898000d87fd395b4e2a
// summary : neo4j cypher - stream inserted records using zio
// keywords : scala, zio, scalatest, neo4j, neotypes, cypher, dsl, @testable
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 42059935-7404-4166-8e74-5ecb74844aa7
// created-on : 2024-06-26T15:39:57+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.4.2"
//> using dep io.github.neotypes::neotypes-core:1.1.0
//> using dep io.github.neotypes::neotypes-zio:1.1.0
//> using dep io.github.neotypes::neotypes-zio-stream:1.1.0
//> using dep dev.zio::zio:2.1.4
//> using dep dev.zio::zio-streams:2.1.4
//> using dep dev.zio::zio-interop-reactivestreams:2.0.2
//> using dep org.neo4j.test:neo4j-harness:5.20.0
////> using options -Ykind-projector:underscores
////> using objectWrapper
// ---------------------
import zio.*
import zio.stream.ZStream
import org.neo4j.driver.AuthTokens
import neotypes.{AsyncDriver, GraphDatabase}
import neotypes.mappers.ResultMapper
import neotypes.syntax.all.*
import neotypes.zio.implicits.*
import neotypes.zio.stream.ZioStream
import neotypes.zio.stream.implicits.*
object NeoTypesApp extends ZIOAppDefault {
case class Record(name: String, age: Int)
val insertsQueries = for {
num <- LazyList.from(1).take(100_000)
name = s"joe$num"
age = scala.util.Random.between(1, 130)
qry = c"CREATE (:Person {name: $name, age: $age})"
} yield qry
val buildStreamDriver =
ZIO.acquireRelease(
ZIO
.attempt(org.neo4j.harness.Neo4jBuilders.newInProcessBuilder().build())
.flatMap(embedded => GraphDatabase.streamDriver[ZioStream](embedded.boltURI(), AuthTokens.none()))
)(neo4jDriver => neo4jDriver.close.ignoreLogged)
val app = for {
_ <- Console.printLine("stream demo using ZIO streams")
streamDriver <- buildStreamDriver
batches = insertsQueries.grouped(100).map(_.reduce(_ + _))
_ <- ZStream
.fromIterator(batches)
.mapZIO(batch => batch.execute.void(streamDriver))
.runDrain
_ <- Console.printLine("collecting data")
results = c"MATCH (p: Person) return p"
.query(ResultMapper.fromFunction(Record.apply))
.stream(streamDriver)
count <- results
// .mapZIO(result => Console.printLine(result.name))
.runCount
_ <- Console.printLine(s"found $count results")
} yield ()
override def run = ZIO.scoped(app)
}
NeoTypesApp.main(Array.empty)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment