@daschl
Created July 1, 2022 06:36
Couchbase / Reactor Classpath Issues

Overview

The issue we are running into with the Couchbase Spark Connector on Databricks is a classpath conflict which, based on my analysis and limited understanding of the underlying runtime, should not happen.

I'm sharing the rough details below. If someone needs Couchbase cloud credentials to try to reproduce this, please ping me directly (@daschl on Twitter) and I'm more than happy to provide them.

Background

The connector uses the Couchbase Scala SDK underneath, which has a direct dependency on Project Reactor and makes heavy use of it.

Looking at the classpath, there is more than one Reactor version present:

%sh

ls -l /databricks/jars/*reactor*
-r-xr-xr-x 1 root root  570335 Jan  1  1970 /databricks/jars/----workspace_spark_3_2--third_party--mssql--io.projectreactor.netty__reactor-netty__0.9.11.RELEASE_shaded.jar
-r-xr-xr-x 1 root root 1598395 Jan  1  1970 /databricks/jars/----workspace_spark_3_2--third_party--mssql--io.projectreactor__reactor-core__3.3.9.RELEASE_shaded.jar
-r-xr-xr-x 1 root root  377742 Jan  1  1970 /databricks/jars/common--encryption--io.projectreactor.netty__reactor-netty-core__1.0.9_shaded.jar
-r-xr-xr-x 1 root root   30282 Jan  1  1970 /databricks/jars/common--encryption--io.projectreactor.netty__reactor-netty-http-brave__1.0.9_shaded.jar
-r-xr-xr-x 1 root root  294654 Jan  1  1970 /databricks/jars/common--encryption--io.projectreactor.netty__reactor-netty-http__1.0.9_shaded.jar
-r-xr-xr-x 1 root root     363 Jan  1  1970 /databricks/jars/common--encryption--io.projectreactor.netty__reactor-netty__1.0.9_shaded.jar
-r-xr-xr-x 1 root root 1746574 Jan  1  1970 /databricks/jars/common--encryption--io.projectreactor__reactor-core__3.4.8_shaded.jar

Now we run the following Couchbase code:

import com.couchbase.spark._
import org.apache.spark.sql._
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get
import com.couchbase.client.scala.kv.MutateInSpec
import com.couchbase.spark.kv.MutateIn
import com.couchbase.client.scala.kv.LookupInSpec
import com.couchbase.spark.kv.LookupIn
import com.couchbase.client.scala.query.QueryOptions
import com.couchbase.spark.query.QueryOptions
import com.couchbase.client.scala.analytics.AnalyticsOptions

sc
 .couchbaseGet(Seq(Get("airline_10"), Get("airline_10642")))
 .collect()
 .foreach(result => println(result.contentAs[JsonObject]))

With the connection configured properly, this now fails:

NoSuchMethodError: reactor.util.retry.Retry.generateCompanion(Lreactor/core/publisher/Flux;)Lorg/reactivestreams/Publisher;
	at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:68)
	at reactor.core.publisher.FluxRetryWhen.subscribeOrReturn(FluxRetryWhen.java:83)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8455)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
	at com.couchbase.client.core.transaction.cleanup.CoreTransactionsCleanup.runRegularAttemptsCleanupThread(CoreTransactionsCleanup.java:185)
	at com.couchbase.client.core.transaction.cleanup.CoreTransactionsCleanup.<init>(CoreTransactionsCleanup.java:79)
	at com.couchbase.client.core.Core.<init>(Core.java:278)
	at com.couchbase.client.core.Core.create(Core.java:234)
	at com.couchbase.client.scala.AsyncCluster.<init>(AsyncCluster.scala:84)
	at com.couchbase.client.scala.Cluster.<init>(Cluster.scala:66)
	at com.couchbase.client.scala.Cluster$.$anonfun$connect$1(Cluster.scala:355)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at com.couchbase.client.scala.Cluster$.connect(Cluster.scala:353)
	at com.couchbase.spark.config.CouchbaseConnection.cluster(CouchbaseConnection.scala:71)
	at com.couchbase.spark.kv.KeyValuePartition$.partitionsForIds(KeyValuePartition.scala:34)
	at com.couchbase.spark.kv.GetRDD.getPartitions(GetRDD.scala:66)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:307)
	at scala.Option.getOrElse(Option.scala:189)

Whoops, so why would this Reactor method not be found? I looked at all the conflicting Reactor versions on the classpath and they all seem to have this method.

Next I checked which class actually gets loaded:

val classLoader = sc.getClass.getClassLoader()
val resource = classLoader.getResource("reactor/util/retry/Retry.class")
System.out.println(resource)

and it is...

jar:file:/databricks/jars/common--encryption--io.projectreactor__reactor-core__3.4.8_shaded.jar!/reactor/util/retry/Retry.class
classLoader: ClassLoader = sun.misc.Launcher$AppClassLoader@1b26f7b2
resource: java.net.URL = jar:file:/databricks/jars/common--encryption--io.projectreactor__reactor-core__3.4.8_shaded.jar!/reactor/util/retry/Retry.class

So it's definitely not the one from our Couchbase jar, but the 3.4.8 version should also have that method. I downloaded the class file from inside the jar and decompiled it, and the method is there.
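One caveat with the check above: `getResource` only reports the first match visible to the loader you ask, which is not necessarily the copy the JVM actually defined the class from. The following is a minimal sketch for listing every visible copy and for asking an already loaded class where it came from. The object and method names (`ClassOrigin`, `allCopies`, `loadedFrom`) are invented for illustration and are not part of any Couchbase or Databricks API.

```scala
import scala.collection.JavaConverters._

object ClassOrigin {
  // Every copy of the .class file visible through `cl`, in lookup order.
  // Unlike getResource, this does not stop at the first match.
  def allCopies(className: String, cl: ClassLoader): List[java.net.URL] = {
    val path = className.replace('.', '/') + ".class"
    cl.getResources(path).asScala.toList
  }

  // Location the JVM actually defined `cls` from.
  // None for classes that have no code source (e.g. JDK bootstrap classes).
  def loadedFrom(cls: Class[_]): Option[java.net.URL] =
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
}
```

In the notebook, something like `ClassOrigin.allCopies("reactor.util.retry.Retry", getClass.getClassLoader).foreach(println)` followed by `println(ClassOrigin.loadedFrom(Class.forName("reactor.util.retry.Retry")))` should show whether the copy that is first on the path is also the one that was linked.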

More Info

After attaching my jar as a Maven dependency, these are all the Reactor jars on the classpath. The list includes "my" jar, but it is not the one being used.

def urlses(cl: ClassLoader): Array[java.net.URL] = cl match {
  case null => Array()
  case u: java.net.URLClassLoader => u.getURLs() ++ urlses(cl.getParent)
  case _ => urlses(cl.getParent)
}

urlses(getClass.getClassLoader).filter(_.toString.contains("reactor")).foreach(println)
file:/local_disk0/tmp/addedFile7119892486979666926reactor_core_3_4_17-64d92.jar
file:/local_disk0/tmp/addedFile1425383701293167504reactor_scala_extensions_2_12_0_8_0-b2022.jar
file:/databricks/jars/----workspace_spark_3_2--third_party--mssql--io.projectreactor.netty__reactor-netty__0.9.11.RELEASE_shaded.jar
file:/databricks/jars/----workspace_spark_3_2--third_party--mssql--io.projectreactor__reactor-core__3.3.9.RELEASE_shaded.jar
file:/databricks/jars/common--encryption--io.projectreactor.netty__reactor-netty-core__1.0.9_shaded.jar
file:/databricks/jars/common--encryption--io.projectreactor.netty__reactor-netty-http-brave__1.0.9_shaded.jar
file:/databricks/jars/common--encryption--io.projectreactor.netty__reactor-netty-http__1.0.9_shaded.jar
file:/databricks/jars/common--encryption--io.projectreactor.netty__reactor-netty__1.0.9_shaded.jar
file:/databricks/jars/common--encryption--io.projectreactor__reactor-core__3.4.8_shaded.jar
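The flattened list above does not show which class loader owns which jar, and that matters because Java class loaders conventionally ask their parent first. Below is a minimal sketch that keeps the loader hierarchy visible. `LoaderChain` and its method names are made up for illustration, and whether Databricks attaches library jars to a child loader with parent-first delegation is an assumption here, not something verified.

```scala
object LoaderChain {
  // Walks from `cl` up to the bootstrap loader (represented as null), child first.
  def chain(cl: ClassLoader): List[ClassLoader] =
    Iterator.iterate(cl)(_.getParent).takeWhile(_ != null).toList

  // URLs owned directly by one loader; empty when it is not a URLClassLoader
  // (on Java 9+ the application class loader is not a URLClassLoader).
  def ownUrls(cl: ClassLoader): Seq[java.net.URL] = cl match {
    case u: java.net.URLClassLoader => u.getURLs.toSeq
    case _                          => Seq.empty
  }

  // One line per matching jar: depth in the chain, loader class, jar URL.
  // Depth 0 is the loader you started from; larger numbers are ancestors.
  def describe(cl: ClassLoader, needle: String): List[String] =
    chain(cl).zipWithIndex.flatMap { case (loader, depth) =>
      ownUrls(loader)
        .filter(_.toString.contains(needle))
        .map(url => s"$depth ${loader.getClass.getName} $url")
    }
}
```

Running `LoaderChain.describe(getClass.getClassLoader, "reactor").foreach(println)` in the notebook would show whether the `addedFile...reactor_core_3_4_17` jar sits on a loader below the one holding `/databricks/jars`. If it does, and delegation is parent-first, the 3.4.8 copy would be expected to win.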

So the ultimate question is: why is the method not found when there are clearly multiple Reactor jars on the classpath and they all seem to have that method?
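The `NoSuchMethodError` names an exact JVM descriptor, so a method with the same name but, for example, a different return type would still count as missing. One way to check this at runtime, rather than by decompiling, is to print the descriptors that the loaded class really declares and compare them with the string in the error. This is only a sketch: `Descriptors` and `signatures` are hypothetical helper names, and nothing here confirms what the cause actually is.

```scala
import java.lang.invoke.MethodType
import java.lang.reflect.Method

object Descriptors {
  // JVM descriptor of a method, e.g. (Ljava/lang/Object;)Z
  def descriptor(m: Method): String =
    MethodType
      .methodType(m.getReturnType, m.getParameterTypes)
      .toMethodDescriptorString

  // All methods declared by `cls` with the given name, rendered the way
  // NoSuchMethodError prints them: name followed by descriptor.
  // Note: getDeclaredMethods resolves the types used in signatures and may
  // itself throw NoClassDefFoundError if one of them cannot be loaded.
  def signatures(cls: Class[_], name: String): Seq[String] =
    cls.getDeclaredMethods.toSeq
      .filter(_.getName == name)
      .map(m => name + descriptor(m))
      .sorted
}
```

In the notebook, `Descriptors.signatures(Class.forName("reactor.util.retry.Retry"), "generateCompanion").foreach(println)` can be compared directly against `generateCompanion(Lreactor/core/publisher/Flux;)Lorg/reactivestreams/Publisher;` from the stack trace.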

{
  "num_workers": 0,
  "cluster_name": "test-cluster",
  "spark_version": "10.4.x-scala2.12",
  "spark_conf": {
    "spark.couchbase.password": "fixme",
    "spark.couchbase.implicitBucket": "travel-sample",
    "spark.couchbase.connectionString": "couchbases://cb.mycluster.cloud.couchbase.com",
    "spark.couchbase.username": "fixme",
    "spark.databricks.delta.preview.enabled": "true",
    "spark.couchbase.security.enableTls": "true"
  },
  "aws_attributes": {
    "first_on_demand": 0,
    "availability": "ON_DEMAND",
    "zone_id": "us-west-2c",
    "spot_bid_price_percent": 100,
    "ebs_volume_count": 0
  },
  "node_type_id": "dev-tier-node",
  "driver_node_type_id": "dev-tier-node",
  "ssh_public_keys": [],
  "custom_tags": {},
  "spark_env_vars": {
    "PYSPARK_PYTHON": "/databricks/python3/bin/python3",
    "A": "true"
  },
  "autotermination_minutes": 120,
  "enable_elastic_disk": false,
  "cluster_source": "UI",
  "init_scripts": [],
  "cluster_id": "0630-122359-519auijs"
}
Library added to the cluster (type Maven):
com.couchbase.client:spark-connector_2.12:3.2.1