Nuxeo Queue Importer

Goal

nuxeo-importer-core contains several code samples that can be adapted to run imports taking advantage of:

  • thread-pooling
  • batching (importing several documents inside a single transaction)
  • event processing filtering (enabling bulk mode or skipping some events)

This is the most efficient solution to run very fast imports.

However, the default implementation comes with some limitations and constraints:

  • extending the importer is done in Java
    • this can be an issue for non-Java developers
  • the multi-threading policy can be complex
    • it depends on the source layout and on dependencies between entries
  • if the import fails in the middle, it must be restarted from scratch

The work on the queue-based importer and Kafka aims at addressing these limitations.

Principles and architecture

Decoupling Read from Write

We want the importer infrastructure to promote a clear separation between the two sides of the import process:

  • Reader / Producer: the one reading the input data (from files, a DB ...)
  • Writer / Consumer: the one writing the data into the Nuxeo Repository

By decoupling the Reader and the Writer, we gain several things:

  • we can make the Writer/Consumer part very generic
    • and have a highly optimized import engine
  • we can run the producer and the consumer separately
    • this means we can more easily re-run the import without being forced to re-run all the pre-processing
  • developers working on the import process mainly have to work on the Reader/Producer part
    • since this part is mostly decoupled from Nuxeo, they do not have to be Nuxeo developers

Queues

In order to achieve this decoupling, the idea is to add a queue between the two parts of the importer:

Source data => Producer => Queue(s) => Consumer => Import Data in Nuxeo

This is a new implementation of the importer: nuxeo-importer-queues. The import flow is clearly split into two sub-parts, and the queue system can be externalized.

  • Import part 1
    • read the data from the source
    • build an import message (this can include some transformation)
    • enqueue the message
  • Import part 2
    • read the message from the queue
    • create a document inside the repository based on the message

The queue in the middle also allows us to completely decouple the threading models of the two parts (see the small sketch after this list for an illustration of the principle):

  • part 1 can be mono-threaded if this is simpler (since it is usually not the bottleneck)
  • part 2 is multi-threaded and batched by default to increase performance
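
To illustrate the principle only, here is a plain Python analogy (not the importer's actual code): a single-threaded producer feeds a bounded in-memory queue that several writer threads drain. All names in this sketch are local to the example.

import queue
import threading

NUM_WORKERS = 4
q = queue.Queue(maxsize=1000)             # the queue decouples the two sides

def produce(entries):
    # Part 1: single-threaded reader pushing import messages onto the queue.
    for entry in entries:
        q.put(entry)                      # blocks if the writers fall behind
    for _ in range(NUM_WORKERS):
        q.put(None)                       # poison pill so each worker stops

def consume():
    # Part 2: one of several writer threads draining the queue.
    while True:
        entry = q.get()
        if entry is None:
            break
        print("importing " + entry)       # stand-in for the repository write

workers = [threading.Thread(target=consume) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()
produce("doc-%d" % i for i in range(10))  # stand-in source data
for w in workers:
    w.join()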

The queuing system can be provided by different backends; nuxeo-importer-queues currently supports two of them:

  • Chronicle Queue
    • in-JVM, but easy to set up
  • Apache Kafka
    • distributed MOM (Message-Oriented Middleware)

Kafka may be a little more complex to deploy, but in exchange for the additional setup effort it provides some additional benefits:

  • you can scale the queue across several servers
    • this means that you can run the import on different Nuxeo Server nodes
  • you are not limited by the available memory
    • Kafka queues are persisted on disk
  • you can write the client part in any language supported by Kafka
    • Java, JavaScript, Python, .NET, Ruby ...

Import message

More than the Java API, the real contract you need to implement on the producer side is the message format.

The importer requires messages in a specific format. You have to use Apache Avro to serialize the messages to that format, and we provide an Avro schema that should be used for message creation.

The format is meant to be generic enough for most use cases.

Default Avro message format:

{
  "namespace": "org.nuxeo.ecm.platform.importer.kafka.avro",
  "type": "record",
  "name": "Message",
  "fields": [
    { "name": "title", "type": "string" },
    { "name": "path", "type": "string" },
    { "name": "folderish", "type": "boolean" },
    { "name": "properties", "type": { "type": "map", "values": "string" }},
    { "name": "parent", "type": "string" },
    { "name": "type", "type": "string" }
  ]
}

The most valuable part is the properties field. It can be filled with any custom fields you have in your document.

  • Java developers can take advantage of the Avro command-line tools to generate Java classes that are then used to build the message objects
  • Python and JS developers have to create JSON objects that conform to the required schema; Avro libraries provide tools to check whether a message matches the schema, and messages that do not conform will not be imported (see the Python sketch below)
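
As an illustration for Python developers, here is a minimal sketch of how such a message could be serialized, using the fastavro library (the reference avro package works as well). The field values and paths are made up for the example, and whether the Nuxeo consumer expects this exact schemaless binary encoding depends on the importer's deserializer, so treat the encoding call as an assumption to verify against your setup.

import io
from fastavro import parse_schema, schemaless_writer

# The Message schema shown above, parsed once and reused for every record.
MESSAGE_SCHEMA = parse_schema({
    "namespace": "org.nuxeo.ecm.platform.importer.kafka.avro",
    "type": "record",
    "name": "Message",
    "fields": [
        {"name": "title", "type": "string"},
        {"name": "path", "type": "string"},
        {"name": "folderish", "type": "boolean"},
        {"name": "properties", "type": {"type": "map", "values": "string"}},
        {"name": "parent", "type": "string"},
        {"name": "type", "type": "string"},
    ],
})

def serialize(record):
    # Returns an in-memory buffer; the send() helper shown further below
    # calls .getvalue() on it before handing the bytes to Kafka.
    buf = io.BytesIO()
    schemaless_writer(buf, MESSAGE_SCHEMA, record)
    return buf

# Example record (illustrative values only).
doc = {
    "title": "invoice-001",
    "path": "/default-domain/workspaces/import/invoice-001",
    "folderish": False,
    "properties": {"dc:title": "Invoice 001", "dc:source": "legacy"},
    "parent": "/default-domain/workspaces/import",
    "type": "File",
}
buf = serialize(doc)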

Importing Binaries

Neither Chronicle Queue nor Apache Kafka is suitable for handling binaries. You should keep each message under roughly 30 KB. Binaries should be pre-imported into the Binary Manager used by your instances, and their hash codes added to the appropriate messages. The importer expects to find an empty Blob: the blob shouldn't contain any binary content, only the hash string that references the actual binary in your storage.
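
For illustration, a message referencing a pre-imported binary might look like the sketch below. The content:digest property key is purely an assumption made for this example; use whatever key your consumer-side document factory actually maps to the blob hash.

# Hypothetical message for a document whose binary was already pushed to the
# Binary Manager; only the hash travels through the queue, never the bytes.
doc_with_blob = {
    "title": "scan-42.pdf",
    "path": "/default-domain/workspaces/import/scan-42.pdf",
    "folderish": False,
    "properties": {
        "dc:title": "scan-42.pdf",
        # assumed property key; the value is the digest known to the Binary Manager
        "content:digest": "6f1ed002ab5595859014ebf0951522d9",
    },
    "parent": "/default-domain/workspaces/import",
    "type": "File",
}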

Using the importer

Setting up the importer infrastructure

Add marketplace package

  • To use Kafka you are required to create a marketplace package that deploys the DefaultKafkaComponent. The system expects to find a contribution to org.nuxeo.ecm.platform.importer.kafka.service.DefaultKafkaComponent. For more details about the KafkaConsumer and KafkaProducer configuration, read the official Kafka documentation.
    • Below you will find an example of the required fields.
    <extension target="org.nuxeo.ecm.platform.importer.kafka.service.DefaultKafkaComponent" point="kafkaConfiguration">
        <kafkaConfig
                bootstrapServer="127.0.0.1:9092">
          <topics>
            <topic>test</topic>
          </topics>
          <producerConfigs>
            <property name="bootstrap.servers">localhost:9092</property>
            <property name="acks">all</property>
            <property name="retries">0</property>
            <property name="batch.size">4194304</property>
            <property name="linger.ms">0</property>
            <property name="max.block.ms">1000</property>
            <property name="compression.type">none</property>
            <property name="key.serializer">org.apache.kafka.common.serialization.StringSerializer</property>
            <property name="value.serializer">org.apache.kafka.common.serialization.StringSerializer</property>
          </producerConfigs>

          <consumerConfigs>
            <property name="bootstrap.servers">localhost:9092</property>
            <property name="group.id">testGroup</property>
            <property name="enable.auto.commit">true</property>
            <property name="auto.offset.reset">earliest</property>
            <property name="auto.commit.interval.ms">1000</property>
            <property name="heartbeat.interval.ms">3000</property>
            <property name="session.timeout.ms">10000</property>
            <property name="request.timeout.ms">15000</property>
            <property name="max.partition.fetch.bytes">30720</property>
            <property name="max.poll.records">100</property>
            <property name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</property>
            <property name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</property>
          </consumerConfigs>
        </kafkaConfig>
      </extension>
  • Any additional fields can be added, but you should be aware that these configurations are very sensitive.
  • Pay attention to session.timeout.ms. The time a consumer operation takes should be known at the planning stage, and session.timeout.ms must be less than request.timeout.ms.
  • max.poll.records corresponds to the batch.size configuration on the Nuxeo side. You have to keep these numbers equal, otherwise the importer can produce duplicates or lose messages at the rollback step.

Setup Kafka Broker

  • Requires Kafka 0.10.1.0
  • Please follow the official documentation to deploy Kafka and to set up the broker side.
  • Below is an example of the most sensitive configurations recommended for your own instance.
# The number of threads handling network requests
num.network.threads=4

# The number of threads doing disk I/O
num.io.threads=4

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=5194304

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=5194304

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=519430400

# Enable topic deletion
delete.topic.enable=true

############################# Log Basics #############################

# A comma-separated list of directories under which to store log files
log.dirs=/var/kafka

auto.create.topics.enable=false
# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=20
#max fetch size
replica.fetch.max.bytes=5242880
# max size of a message received from a producer, per topic
message.max.bytes=262144
# compression
compression.type=gzip
  • Please note that replica.fetch.max.bytes bounds how much data a replication fetch will pull per partition (it should be at least as large as message.max.bytes), while message.max.bytes ensures that the broker won't accept a message bigger than this size. Please configure your consumers and producers accordingly.

Choosing client

  • Kafka is written in Scala and Java; however, it has an extensive list of clients available for other languages.

Example importer client in JavaScript

XXX

Example importer client in Python

Tools required
  • We recommend using a virtual environment for Python. In the example we are using Python 3; the following code should work fine with Python 2 as well.
  • You need to install kafka-python from the command line: pip install kafka-python.

First you need to create a KafkaProducer, passing it the Kafka server address and port (9092 by default).

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092'
)

In this example we will use a level-by-level import. It allows us to read the topics from Kafka and import messages in parallel with respect to their level in your document hierarchy.

# buf is an Avro-serialized message (an in-memory buffer).
from kafka.errors import KafkaError

def send(buf, level, producer):
    lv = 'level_' + str(level)
    future = producer.send(
        topic=lv,
        value=buf.getvalue(),
        key=bytes('msg', 'utf-8')
    )
    # await the asynchronous send (the timeout is in seconds)
    try:
        metadata = future.get(timeout=10)
        print(metadata)
    except KafkaError:
        print("Couldn't send message to level " + lv)
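
Putting the pieces together, a hedged driver loop could look like the following. read_level() is a placeholder for your own source-reading logic (it is not part of kafka-python or the importer), and serialize() is the Avro helper sketched earlier.

# Walk the source hierarchy level by level and enqueue every record.
for level, records in enumerate(read_level('/path/to/source')):
    for record in records:
        send(serialize(record), level, producer)

producer.flush()  # block until all queued messages have actually been sent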

For more information about kafka-python, please read its official documentation.

Configuring the server side importer

  • threads
    • The best performance can be achieved by using no more than two threads per available core.
    • Note that producers and consumers can be run within the same system.
    • No
  • document factory