Making S3A Hadoop connector workable with Druid

Apache Druid is a high-performance real-time analytics database. Druid is a unique type of database that combines ideas from OLAP/analytic databases, timeseries databases, and search systems to enable new use cases in real-time architectures. For building a framework for time-series trend analysis, prediction modelling, and anomaly detection, I decided to use Druid. As per the requirements, apart from real-time data ingestion, there is also a need for batch-based data ingestion into Druid. After reading several blogs and articles about setting up a production Druid cluster for handling petabytes of data, I decided to follow the architecture below:

  • 2 nodes as Druid masters, running the Druid Coordinator and Overlord processes. Two nodes are used for high availability.
  • 2 nodes as Druid query servers, running the Druid Broker process. Two nodes are used for high availability and for defining two query tiers, i.e. hot and _default_tier. Additionally, a Router process runs on one of the nodes as a single gateway for all Druid API access.
  • 3 nodes as Druid data servers, running the Druid Historical and MiddleManager processes; the three Zookeeper nodes are also leveraged for running the Druid Historical process. Caffeine is used for caching query results.
  • 3 Zookeeper nodes for managing the current Druid cluster state.
  • A Druid metadata database node running Postgres, along with Grafana for visualizing Druid cluster metrics.
  • S3 as Druid deep storage and also as storage for job logs (a sketch of the matching runtime properties follows this list).
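For context, a minimal sketch of the cluster-wide common.runtime.properties matching this layout is shown below; hosts, bucket names, and prefixes are placeholders, while the property names are the standard Druid 0.12.x ones (the S3 deep-storage properties themselves are covered in the S3A section further down).

    # Zookeeper ensemble used for cluster-state management
    druid.zk.service.host=zk1,zk2,zk3

    # Metadata store on the Postgres node
    druid.metadata.storage.type=postgresql
    druid.metadata.storage.connector.connectURI=jdbc:postgresql://metadata-host:5432/druid
    druid.metadata.storage.connector.user=druid

    # Indexing-task logs kept on S3
    druid.indexer.logs.type=s3
    druid.indexer.logs.s3Bucket=experiment-druid
    druid.indexer.logs.s3Prefix=task-logs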

The image below depicts the Druid architecture. (Druid architecture image courtesy of Florian Troßbach, taken from his article Realtime Fast Data Analytics with Druid.)

Why is a workable S3A Hadoop connector required for Druid?

  • Firstly, S3A is the newest S3 connector for Hadoop. As per the AmazonS3 Hadoop wiki, the previous connectors, i.e. S3 and S3N, are now deprecated. With the S3A connector there is no need to explicitly provide AWS_ACCESS_KEY & AWS_SECRET_KEY; the connector determines the credentials and role from the EC2 IAM profile, which makes the Druid common runtime properties file much simpler and more generic.
  • Secondly, even if you are not planning to run Druid indexing jobs on a Hadoop cluster, ingesting Parquet-format data into Druid requires an index job of type index_hadoop, and hence a workable S3A connector. Druid supports the Parquet format through the parquet extension. Since in my case the data is in Parquet format, I needed to make the connector work; a sketch of such a job spec follows below.
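As a hedged sketch, the relevant part of such an index_hadoop spec would look roughly like the snippet below. The inputFormat class name is the one documented for the druid-parquet-extensions module in the 0.12.x line, the parser type in the dataSchema is parquet, and the bucket and file path are placeholders.

    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "inputFormat" : "io.druid.data.input.parquet.DruidParquetInputFormat",
        "paths" : "s3a://experiment-druid/input_data/sample-data.parquet"
      }
    }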

Current state of Druid with the S3A Hadoop connector

I am writing this article from the perspective of Druid version 0.12.3. Druid recently released a newer version, but I haven't yet evaluated it for the S3A connector.

  • If you try to run an index_hadoop job using a Druid index job spec like the one below,
"tuningConfig": {
   "type": "hadoop",
   "jobProperties": {
      "fs.s3a.endpoint": "s3.ca-central-1.amazonaws.com",
      "fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
      "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
      "io.compression.codecs":  "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
   }
}

you will get an exception with a stack trace like the one below:

Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287) ~[?:?]
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669) ~[?:?]
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94) ~[?:?]
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703) ~[?:?]
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685) ~[?:?]
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373) ~[?:?]
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) ~[?:?]

This is caused by incompatible versions of the hadoop-client libraries and the aws-sdk library in the Druid code. The issue can be fixed either by bumping the hadoop.compile.version variable from 2.7.3 to 2.8.3, or by downgrading aws.sdk.version to a version lower than 1.10.77 in the Maven pom.xml file. I decided to follow the first option and, after bumping the version, re-built the Druid distribution; a sketch of the change follows.
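As a minimal sketch, the change amounts to the single property below; I am assuming it sits in the properties section of the top-level pom.xml of the druid-0.12.3 source tree referenced above.

    <!-- top-level pom.xml, <properties> section -->
    <!-- bumped from 2.7.3 so that the hadoop-aws module matches the bundled aws-java-sdk -->
    <hadoop.compile.version>2.8.3</hadoop.compile.version>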

  • After fixing the library incompatibility issue, I faced another issue related to the storage path of the segment files. Since Druid defaults to the S3N connector, the default segmentOutputPath value is based on an s3n:// URI instead of s3a://. Below is the generated sample job spec for the index job.
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "s3a://experiment-druid/input_data/wikiticker-2015-09-12-sampled.json.gz"
      },
      "metadataUpdateSpec" : null,
      "segmentOutputPath" : "s3n://experiment-druid/deepstorage"
    },

As per the hadoop-index documentation, we can provide segmentOutputPath in the ioConfig of the index job spec; however, I was getting an error while providing a segment path. To fix this issue, I found the property useS3aSchema in the class S3DataSegmentPusherConfig in the Druid source code. Setting the property below in the Druid runtime properties fixes the issue.

druid.storage.useS3aSchema=true
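In context, a minimal sketch of the deep-storage section of the runtime properties with this flag enabled might look as follows; the bucket and base key are the illustrative ones from the sample spec above. With the flag set, the generated segmentOutputPath uses the s3a:// scheme instead of s3n://.

    # S3 deep storage, pushing segments via the s3a:// scheme
    druid.storage.type=s3
    druid.storage.bucket=experiment-druid
    druid.storage.baseKey=deepstorage
    druid.storage.useS3aSchema=true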
  • The final issue I faced in making it workable was a runtime exception while pushing segments to the S3 deep storage. Below is the stack trace of the exception:
java.io.IOException: Output Stream closed
at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83) ~[hadoop-aws-2.8.3.jar:?]
at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89) ~[hadoop-aws-2.8.3.jar:?]
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140) ~[?:1.8.0_191]
at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[?:1.8.0_191]

Thanks to the Druid user discussion thread and a pull request raised by Christoph Hösler, I was able to fix the problem by commenting out the flush statement in the class JobHelper.

Making the S3A Hadoop connector workable as a whole

Compiling the fixes for all the issues mentioned above, follow these steps to make the S3A Hadoop connector viable with Druid (a command-line sketch of the steps follows the list):

  • Rebuild the Druid-0.12.3 source-code branch, after changing the hadoop.compile.version property value to 2.8.3 and commenting out the flush statement in the class JobHelper, using the command mvn clean package -DskipTests.
  • Replace druid-hdfs-storage with the newly built artifacts in the Druid extensions folder.
  • Copy the hadoop-client 2.8.3 libraries into the Druid hadoop-dependencies folder.
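One way to carry out these steps is sketched below; DRUID_SRC (the patched druid-0.12.3 source tree), DIST (the rebuilt distribution unpacked from the build output), and DRUID_HOME (the running Druid installation) are placeholder locations, not paths from the original setup.

    # Rebuild after the pom.xml and JobHelper changes
    cd "$DRUID_SRC"
    mvn clean package -DskipTests

    # Replace the druid-hdfs-storage extension with the freshly built artifacts
    cp "$DIST"/extensions/druid-hdfs-storage/*.jar "$DRUID_HOME"/extensions/druid-hdfs-storage/

    # Make the hadoop-client 2.8.3 libraries available under hadoop-dependencies
    cp -r "$DIST"/hadoop-dependencies/hadoop-client/2.8.3 "$DRUID_HOME"/hadoop-dependencies/hadoop-client/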

Finally, for testing, you can use the example index jobs published in my GitHub repo.

Happy Druid Exploration!!
