Skip to content

Instantly share code, notes, and snippets.

@vvagias
Created January 17, 2020 15:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vvagias/3836c11680133691e5b69293d67253f7 to your computer and use it in GitHub Desktop.
Save vvagias/3836c11680133691e5b69293d67253f7 to your computer and use it in GitHub Desktop.
NiFi template: load IoT data into Kafka for the Flink lab
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description>push IoT Data into Kafka highTemp Topic.
</description>
<groupId>3790f4bc-b7c1-3c67-b301-dbe48fc844a3</groupId>
<name>IoT Data To Kafka</name>
<snippet>
<!--
  Controller service "JsonRecordSetWriter"
  (type org.apache.nifi.json.JsonRecordSetWriter, bundle
  nifi-record-serialization-services-nar 1.9.0.1.0.1.0-12).
  The id below is the value of the record-writer property on the
  PublishKafkaRecord_2_0 processor in this template.
-->
<controllerServices>
<id>88022ac5-b2a7-33c8-0000-000000000000</id>
<parentGroupId>a488add2-cd6f-3753-0000-000000000000</parentGroupId>
<versionedComponentId>50b0ca6d-8b12-3a34-864b-a0f46e8e21fa</versionedComponentId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.0.1.0.1.0-12</version>
</bundle>
<comments></comments>
<!--
  Descriptors: one entry per property name declared for this service.
  Entries that carry identifiesControllerService (schema-cache,
  schema-registry) name the service type that property refers to.
  The configured values are in the properties section further down.
-->
<descriptors>
<entry>
<key>Schema Write Strategy</key>
<value>
<name>Schema Write Strategy</name>
</value>
</entry>
<entry>
<key>schema-cache</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordSchemaCacheService</identifiesControllerService>
<name>schema-cache</name>
</value>
</entry>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-version</key>
<value>
<name>schema-version</name>
</value>
</entry>
<entry>
<key>schema-branch</key>
<value>
<name>schema-branch</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>Pretty Print JSON</key>
<value>
<name>Pretty Print JSON</name>
</value>
</entry>
<entry>
<key>suppress-nulls</key>
<value>
<name>suppress-nulls</name>
</value>
</entry>
<entry>
<key>output-grouping</key>
<value>
<name>output-grouping</name>
</value>
</entry>
<entry>
<key>compression-format</key>
<value>
<name>compression-format</name>
</value>
</entry>
<entry>
<key>compression-level</key>
<value>
<name>compression-level</name>
</value>
</entry>
</descriptors>
<name>JsonRecordSetWriter</name>
<persistsState>false</persistsState>
<!--
  Properties: the configured values. An entry that has a key but no
  value element has no value set in this template.
-->
<properties>
<!-- no-schema: presumably the writer attaches no schema information to its output; confirm against the NiFi documentation. -->
<entry>
<key>Schema Write Strategy</key>
<value>no-schema</value>
</entry>
<entry>
<key>schema-cache</key>
</entry>
<!-- inherit-record-schema: presumably the writer reuses the schema of the records it is handed; confirm against the NiFi documentation. -->
<entry>
<key>schema-access-strategy</key>
<value>inherit-record-schema</value>
</entry>
<entry>
<key>schema-registry</key>
</entry>
<!--
  schema-name and schema-text hold attribute references (schema.name and
  avro.schema). NOTE(review): with schema-access-strategy set to
  inherit-record-schema and no schema-registry configured, these two
  values are presumably not consulted; confirm.
-->
<entry>
<key>schema-name</key>
<value>${schema.name}</value>
</entry>
<entry>
<key>schema-version</key>
</entry>
<entry>
<key>schema-branch</key>
</entry>
<entry>
<key>schema-text</key>
<value>${avro.schema}</value>
</entry>
<!-- Date, Time and Timestamp formats are left unset. -->
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>Pretty Print JSON</key>
<value>false</value>
</entry>
<entry>
<key>suppress-nulls</key>
<value>never-suppress</value>
</entry>
<entry>
<key>output-grouping</key>
<value>output-array</value>
</entry>
<!-- compression-format is none; compression-level 1 is presumably ignored while no compression is selected; confirm. -->
<entry>
<key>compression-format</key>
<value>none</value>
</entry>
<entry>
<key>compression-level</key>
<value>1</value>
</entry>
</properties>
<!-- The service is exported in the ENABLED state. -->
<state>ENABLED</state>
<type>org.apache.nifi.json.JsonRecordSetWriter</type>
</controllerServices>
<!--
  Controller service "JsonTreeReader"
  (type org.apache.nifi.json.JsonTreeReader, bundle
  nifi-record-serialization-services-nar 1.9.0.1.0.1.0-12).
  The id below is the value of the record-reader property on the
  PublishKafkaRecord_2_0 processor in this template.
-->
<controllerServices>
<id>d5d1e0fb-737f-3b83-0000-000000000000</id>
<parentGroupId>a488add2-cd6f-3753-0000-000000000000</parentGroupId>
<versionedComponentId>bf84f94d-619b-3320-bd2c-674c09307010</versionedComponentId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.0.1.0.1.0-12</version>
</bundle>
<comments></comments>
<!--
  Descriptors: one entry per property name declared for this service.
  Entries that carry identifiesControllerService (schema-registry,
  schema-inference-cache) name the service type that property refers to.
  The configured values are in the properties section further down.
-->
<descriptors>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-version</key>
<value>
<name>schema-version</name>
</value>
</entry>
<entry>
<key>schema-branch</key>
<value>
<name>schema-branch</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>schema-inference-cache</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordSchemaCacheService</identifiesControllerService>
<name>schema-inference-cache</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
</descriptors>
<name>JsonTreeReader</name>
<persistsState>false</persistsState>
<!--
  Properties: the configured values. An entry that has a key but no
  value element has no value set in this template.
-->
<properties>
<!-- infer-schema: presumably the reader derives the record schema from the JSON it reads; confirm against the NiFi documentation. -->
<entry>
<key>schema-access-strategy</key>
<value>infer-schema</value>
</entry>
<entry>
<key>schema-registry</key>
</entry>
<!--
  schema-name and schema-text hold attribute references (schema.name and
  avro.schema). NOTE(review): with schema-access-strategy set to
  infer-schema and no schema-registry configured, these two values are
  presumably not consulted; confirm.
-->
<entry>
<key>schema-name</key>
<value>${schema.name}</value>
</entry>
<entry>
<key>schema-version</key>
</entry>
<entry>
<key>schema-branch</key>
</entry>
<entry>
<key>schema-text</key>
<value>${avro.schema}</value>
</entry>
<!-- No schema-inference-cache service is referenced. -->
<entry>
<key>schema-inference-cache</key>
</entry>
<!-- Date, Time and Timestamp formats are left unset. -->
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
</properties>
<!-- The service is exported in the ENABLED state. -->
<state>ENABLED</state>
<type>org.apache.nifi.json.JsonTreeReader</type>
</controllerServices>
<!--
  Processor "PublishKafkaRecord_2_0"
  (type org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0,
  bundle nifi-kafka-2-0-nar 1.9.0.1.0.1.0-12).
  It is configured to publish to the Kafka topic highTemp, using the
  JsonTreeReader and JsonRecordSetWriter controller services defined in
  this template as its record reader and record writer.
-->
<processors>
<id>35e5be48-0ba1-3132-0000-000000000000</id>
<parentGroupId>a488add2-cd6f-3753-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-kafka-2-0-nar</artifact>
<group>org.apache.nifi</group>
<version>1.9.0.1.0.1.0-12</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<!--
  Descriptors: one entry per property name declared for this processor.
  Entries that carry identifiesControllerService (record-reader,
  record-writer, kerberos-credentials-service, ssl.context.service) name
  the service type that property refers to. The configured values are in
  the properties section further down.
-->
<descriptors>
<entry>
<key>bootstrap.servers</key>
<value>
<name>bootstrap.servers</name>
</value>
</entry>
<entry>
<key>topic</key>
<value>
<name>topic</name>
</value>
</entry>
<entry>
<key>record-reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>record-reader</name>
</value>
</entry>
<entry>
<key>record-writer</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
<name>record-writer</name>
</value>
</entry>
<entry>
<key>use-transactions</key>
<value>
<name>use-transactions</name>
</value>
</entry>
<entry>
<key>acks</key>
<value>
<name>acks</name>
</value>
</entry>
<entry>
<key>attribute-name-regex</key>
<value>
<name>attribute-name-regex</name>
</value>
</entry>
<entry>
<key>message-header-encoding</key>
<value>
<name>message-header-encoding</name>
</value>
</entry>
<entry>
<key>security.protocol</key>
<value>
<name>security.protocol</name>
</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
<value>
<identifiesControllerService>org.apache.nifi.kerberos.KerberosCredentialsService</identifiesControllerService>
<name>kerberos-credentials-service</name>
</value>
</entry>
<entry>
<key>sasl.kerberos.service.name</key>
<value>
<name>sasl.kerberos.service.name</name>
</value>
</entry>
<entry>
<key>sasl.kerberos.principal</key>
<value>
<name>sasl.kerberos.principal</name>
</value>
</entry>
<entry>
<key>sasl.kerberos.keytab</key>
<value>
<name>sasl.kerberos.keytab</name>
</value>
</entry>
<entry>
<key>ssl.context.service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>ssl.context.service</name>
</value>
</entry>
<entry>
<key>message-key-field</key>
<value>
<name>message-key-field</name>
</value>
</entry>
<entry>
<key>max.request.size</key>
<value>
<name>max.request.size</name>
</value>
</entry>
<entry>
<key>ack.wait.time</key>
<value>
<name>ack.wait.time</name>
</value>
</entry>
<entry>
<key>max.block.ms</key>
<value>
<name>max.block.ms</name>
</value>
</entry>
<entry>
<key>partitioner.class</key>
<value>
<name>partitioner.class</name>
</value>
</entry>
<entry>
<key>compression.type</key>
<value>
<name>compression.type</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<!--
  Properties: the configured values. An entry that has a key but no
  value element has no value set in this template.
-->
<properties>
<!--
  NOTE(review): the broker address is a fixed host name and port. It looks
  specific to the environment this template was exported from and will
  presumably need editing after import elsewhere; confirm.
-->
<entry>
<key>bootstrap.servers</key>
<value>edge2ai-1.dim.local:9092</value>
</entry>
<!-- Target topic; matches the topic named in the template description. -->
<entry>
<key>topic</key>
<value>highTemp</value>
</entry>
<!-- Id of the JsonTreeReader controller service defined in this template. -->
<entry>
<key>record-reader</key>
<value>d5d1e0fb-737f-3b83-0000-000000000000</value>
</entry>
<!-- Id of the JsonRecordSetWriter controller service defined in this template. -->
<entry>
<key>record-writer</key>
<value>88022ac5-b2a7-33c8-0000-000000000000</value>
</entry>
<!-- Transactions are switched on and acks is set to all. -->
<entry>
<key>use-transactions</key>
<value>true</value>
</entry>
<entry>
<key>acks</key>
<value>all</value>
</entry>
<entry>
<key>attribute-name-regex</key>
</entry>
<entry>
<key>message-header-encoding</key>
<value>UTF-8</value>
</entry>
<!--
  security.protocol is PLAINTEXT, and the Kerberos and SSL related
  properties that follow are all left unset.
-->
<entry>
<key>security.protocol</key>
<value>PLAINTEXT</value>
</entry>
<entry>
<key>kerberos-credentials-service</key>
</entry>
<entry>
<key>sasl.kerberos.service.name</key>
</entry>
<entry>
<key>sasl.kerberos.principal</key>
</entry>
<entry>
<key>sasl.kerberos.keytab</key>
</entry>
<entry>
<key>ssl.context.service</key>
</entry>
<!-- No message key field is configured. -->
<entry>
<key>message-key-field</key>
</entry>
<entry>
<key>max.request.size</key>
<value>1 MB</value>
</entry>
<entry>
<key>ack.wait.time</key>
<value>5 secs</value>
</entry>
<entry>
<key>max.block.ms</key>
<value>5 sec</value>
</entry>
<entry>
<key>partitioner.class</key>
<value>org.apache.kafka.clients.producer.internals.DefaultPartitioner</value>
</entry>
<entry>
<key>compression.type</key>
<value>none</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PublishKafkaRecord_2_0</name>
<!--
  Both relationships are marked autoTerminate.
  NOTE(review): with failure auto-terminated, flow files that fail to
  publish are presumably discarded without a retry or error path;
  confirm this is intended outside a lab setting.
-->
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<!-- The processor is exported in the STOPPED state. -->
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0</type>
</processors>
</snippet>
<timestamp>01/17/2020 14:21:47 UTC</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment