Skip to content

Instantly share code, notes, and snippets.

@ignitz
Last active January 20, 2020 14:17
Show Gist options
  • Save ignitz/d2848ebf2ee29ac0e5c61189841aebf6 to your computer and use it in GitHub Desktop.
Save ignitz/d2848ebf2ee29ac0e5c61189841aebf6 to your computer and use it in GitHub Desktop.
Simple example of a hard-coded three-node NiFi cluster without authentication
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
// Tags the next queued FlowFile with yesterday's date ("startDate", yyyy-MM-dd)
// and a "prefix_filename" attribute, then passes it on to REL_SUCCESS.
// `session`, `REL_SUCCESS` and `prefix_filename` are not defined in this script;
// presumably they are bound by the scripting host - confirm.
flowFile = session.get()
if (flowFile == null) {
    // Nothing queued for this invocation.
    return
}

String yesterday = LocalDate.now()
        .minusDays(1)
        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))

flowFile = session.putAttribute(flowFile, "startDate", yesterday)
flowFile = session.putAttribute(flowFile, "prefix_filename", prefix_filename as String)

session.transfer(flowFile, REL_SUCCESS)
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
// Fan-out: replaces the incoming FlowFile with one new, content-less FlowFile per
// calendar day, from the input's 'startDate' attribute (inclusive) up to today
// (exclusive). Each output carries "startDate" (ISO yyyy-MM-dd) and
// "prefix_filename". If 'startDate' is already today or later, no outputs are
// produced and the input is simply removed.
// `session`, `log`, `REL_SUCCESS`, `REL_FAILURE` and `prefix_filename` are not
// defined here; presumably they are bound by the scripting host - confirm.
def flowFile = session.get()
if (flowFile == null) {
    return
}

// Parse before creating any output so bad input cannot leave half-built results.
// A missing attribute is mapped to '' so it fails parsing like any other bad value.
LocalDate firstDay
try {
    firstDay = LocalDate.parse(flowFile.getAttribute('startDate') ?: '')
} catch (java.time.format.DateTimeParseException e) {
    log.error("Missing or invalid 'startDate' attribute; expected yyyy-MM-dd", e)
    session.transfer(flowFile, REL_FAILURE)
    return
}

LocalDate today = LocalDate.now()
def flowFiles = [] as List<FlowFile>
for (LocalDate day = firstDay; day.isBefore(today); day = day.plusDays(1)) {
    def dayFlowFile = session.create()
    dayFlowFile = session.putAttribute(dayFlowFile, "startDate", day.toString())
    dayFlowFile = session.putAttribute(dayFlowFile, "prefix_filename", prefix_filename as String)
    flowFiles << dayFlowFile
}

session.transfer(flowFiles, REL_SUCCESS)
// The input has been fully replaced by the per-day FlowFiles.
session.remove(flowFile)
AWSTemplateFormatVersion: '2010-09-09'
Description: Deploys a NiFi cluster.
# Stack inputs: SSH key pair, instance type and AMI, the client CIDR range, and
# the VPC/subnet the instances are launched into.
Parameters:
  KeyName:
    Type: AWS::EC2::KeyPair::KeyName
    Description: Name of an existing EC2 KeyPair to enable SSH access to the instance
    ConstraintDescription: must be the name of an existing EC2 KeyPair.

  NiFiInstanceType:
    Type: String
    Description: NiFi EC2 instance type
    Default: t2.large

  NiFiAMI:
    Type: String
    Description: NiFi AMI
    # Ubuntu
    Default: ami-f4cc1de2

  Location:
    Type: String
    Description: The IP address range that can be used for NiFi and SSH.
    Default: '0.0.0.0/0'
    MinLength: '9'
    MaxLength: '18'
    AllowedPattern: (\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})
    ConstraintDescription: must be a valid IP CIDR range of the form x.x.x.x/x.

  useVPC:
    Type: AWS::EC2::VPC::Id
    Description: VPC to deploy the cluster into.
    # Placeholder value; must be overridden with a real VPC id.
    Default: vpc-XXXXXXXX

  useSubnet:
    Type: AWS::EC2::Subnet::Id
    Description: Subnet to deploy the cluster into. Must be in the selected VPC.
    # Placeholder value; must be overridden with a real subnet id.
    Default: subnet-XXXXXXXXXX
Resources:
# Cluster node 1: EC2 instance tagged "NiFi 1", built from the NiFiAMI,
# NiFiInstanceType, KeyName and useSubnet parameters and attached to
# NiFiInstanceSecurityGroup. It configures itself through the UserData script.
NiFi1Instance:
Type: AWS::EC2::Instance
Properties:
InstanceType: !Ref 'NiFiInstanceType'
SecurityGroupIds:
- !Ref 'NiFiInstanceSecurityGroup'
KeyName: !Ref 'KeyName'
ImageId: !Ref 'NiFiAMI'
SubnetId: !Ref 'useSubnet'
Tags:
- Key: Name
Value: NiFi 1
# NOTE(review): /dev/xvda is assumed to be the AMI's root device name; if the
# chosen AMI uses a different root device this mapping may add a separate
# volume instead of sizing the root disk - confirm against NiFiAMI.
BlockDeviceMappings:
- DeviceName: /dev/xvda
Ebs:
VolumeSize: 100
DeleteOnTermination: true
VolumeType: gp2
# Boot script, in order: installs OpenJDK 8 and ntp; downloads and unpacks the
# NiFi 1.9.2 tarball under /opt; sets the hostname to nifi-node-1; appends the
# ZooKeeper server entries and writes myid "1"; rewrites nifi.properties
# (web/cluster/remote-input host = nifi-node-1, embedded ZooKeeper on, cluster
# protocol port 8082, remote input port 10000, flow election 1 min / 2
# candidates); runs `nifi.sh install`; deletes the tarball; reboots.
# NOTE(review): the appended ZooKeeper text starts with a bare
# "nifi-node-1:2888:3888" (no "server.1=" prefix) - presumably it completes a
# trailing "server.1=" line in the stock zookeeper.properties; confirm.
# NOTE(review): SELFIP is only referenced by the commented-out /etc/hosts line,
# and nothing visible here makes the nifi-node-N names resolvable - confirm
# that name resolution is provided elsewhere.
UserData:
Fn::Base64: !Sub |
#!/bin/bash
SELFIP=$(curl http://169.254.169.254/latest/meta-data/local-ipv4)
exec > >(tee /var/log/user-data.log|logger -t user-data -s 2>/dev/console) 2>&1
apt-get update && apt-get -y install openjdk-8-jdk ntp
cd /opt/
curl https://ds-data-lake.s3.amazonaws.com/nifi-1.9.2-bin.tar.gz -o nifi-1.9.2-bin.tar.gz
gunzip -c nifi-1.9.2-bin.tar.gz | tar xvf -
# echo -e "\n$SELFIP nifi-node-1\n$NODE2IP nifi-node-2\n" >> /etc/hosts
hostnamectl set-hostname nifi-node-1
echo -e "nifi-node-1:2888:3888\nserver.2=nifi-node-2:2888:3888\nserver.3=nifi-node-3:2888:3888\n" >> ./nifi-1.9.2/conf/zookeeper.properties
mkdir ./nifi-1.9.2/state && mkdir ./nifi-1.9.2/state/zookeeper
echo 1 > ./nifi-1.9.2/state/zookeeper/myid
sed -i "/nifi.web.http.host=/ s/=.*/=nifi-node-1/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.state.management.embedded.zookeeper.start=/ s/=.*/=true/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.zookeeper.connect.string=/ s/=.*/=nifi-node-1:2181,nifi-node-2:2181,nifi-node-3:2181/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.is.node=/ s/=.*/=true/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.node.address=/ s/=.*/=nifi-node-1/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.node.protocol.port=/ s/=.*/=8082/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.remote.input.host=/ s/=.*/=nifi-node-1/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.remote.input.socket.port=/ s/=.*/=10000/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.flow.election.max.wait.time/ s/=.*/=1 min/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.flow.election.max.candidates=/ s/=.*/=2/" ./nifi-1.9.2/conf/nifi.properties
echo "UserData config done."
./nifi-1.9.2/bin/nifi.sh install && rm nifi-1.9.2-bin.tar.gz
reboot
# Cluster node 2: EC2 instance tagged "NiFi 2", built from the NiFiAMI,
# NiFiInstanceType, KeyName and useSubnet parameters and attached to
# NiFiInstanceSecurityGroup. It configures itself through the UserData script.
NiFi2Instance:
Type: AWS::EC2::Instance
Properties:
InstanceType: !Ref 'NiFiInstanceType'
SecurityGroupIds:
- !Ref 'NiFiInstanceSecurityGroup'
KeyName: !Ref 'KeyName'
ImageId: !Ref 'NiFiAMI'
SubnetId: !Ref 'useSubnet'
Tags:
- Key: Name
Value: NiFi 2
# NOTE(review): /dev/xvda is assumed to be the AMI's root device name; if the
# chosen AMI uses a different root device this mapping may add a separate
# volume instead of sizing the root disk - confirm against NiFiAMI.
BlockDeviceMappings:
- DeviceName: /dev/xvda
Ebs:
VolumeSize: 100
DeleteOnTermination: true
VolumeType: gp2
# Boot script, in order: installs OpenJDK 8 and ntp; downloads and unpacks the
# NiFi 1.9.2 tarball under /opt; sets the hostname to nifi-node-2; appends the
# ZooKeeper server entries and writes myid "2"; rewrites nifi.properties
# (web/cluster/remote-input host = nifi-node-2, embedded ZooKeeper on, cluster
# protocol port 8082, remote input port 10000, flow election 1 min / 2
# candidates); runs `nifi.sh install`; deletes the tarball; reboots.
# NOTE(review): the appended ZooKeeper text starts with a bare
# "nifi-node-1:2888:3888" (no "server.1=" prefix) - presumably it completes a
# trailing "server.1=" line in the stock zookeeper.properties; confirm.
# NOTE(review): SELFIP is assigned but never referenced in this script, and
# nothing visible here makes the nifi-node-N names resolvable - confirm that
# name resolution is provided elsewhere.
UserData:
Fn::Base64: !Sub |
#!/bin/bash
SELFIP=$(curl http://169.254.169.254/latest/meta-data/local-ipv4)
exec > >(tee /var/log/user-data.log|logger -t user-data -s 2>/dev/console) 2>&1
apt-get update && apt-get -y install openjdk-8-jdk ntp
cd /opt/
curl https://ds-data-lake.s3.amazonaws.com/nifi-1.9.2-bin.tar.gz -o nifi-1.9.2-bin.tar.gz
gunzip -c nifi-1.9.2-bin.tar.gz | tar xvf -
hostnamectl set-hostname nifi-node-2
echo -e "nifi-node-1:2888:3888\nserver.2=nifi-node-2:2888:3888\nserver.3=nifi-node-3:2888:3888\n" >> ./nifi-1.9.2/conf/zookeeper.properties
mkdir ./nifi-1.9.2/state && mkdir ./nifi-1.9.2/state/zookeeper
echo 2 > ./nifi-1.9.2/state/zookeeper/myid
sed -i "/nifi.web.http.host=/ s/=.*/=nifi-node-2/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.state.management.embedded.zookeeper.start=/ s/=.*/=true/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.zookeeper.connect.string=/ s/=.*/=nifi-node-1:2181,nifi-node-2:2181,nifi-node-3:2181/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.is.node=/ s/=.*/=true/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.node.address=/ s/=.*/=nifi-node-2/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.node.protocol.port=/ s/=.*/=8082/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.remote.input.host=/ s/=.*/=nifi-node-2/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.remote.input.socket.port=/ s/=.*/=10000/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.flow.election.max.wait.time/ s/=.*/=1 min/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.flow.election.max.candidates=/ s/=.*/=2/" ./nifi-1.9.2/conf/nifi.properties
echo "UserData config done."
./nifi-1.9.2/bin/nifi.sh install && rm nifi-1.9.2-bin.tar.gz
reboot
# Cluster node 3: EC2 instance tagged "NiFi 3", built from the NiFiAMI,
# NiFiInstanceType, KeyName and useSubnet parameters and attached to
# NiFiInstanceSecurityGroup. It configures itself through the UserData script.
NiFi3Instance:
Type: AWS::EC2::Instance
Properties:
InstanceType: !Ref 'NiFiInstanceType'
SecurityGroupIds:
- !Ref 'NiFiInstanceSecurityGroup'
KeyName: !Ref 'KeyName'
ImageId: !Ref 'NiFiAMI'
SubnetId: !Ref useSubnet
Tags:
- Key: Name
Value: NiFi 3
# NOTE(review): /dev/xvda is assumed to be the AMI's root device name; if the
# chosen AMI uses a different root device this mapping may add a separate
# volume instead of sizing the root disk - confirm against NiFiAMI.
BlockDeviceMappings:
- DeviceName: /dev/xvda
Ebs:
VolumeSize: 100
DeleteOnTermination: true
VolumeType: gp2
# Boot script, in order: installs OpenJDK 8 and ntp; downloads and unpacks the
# NiFi 1.9.2 tarball under /opt; sets the hostname to nifi-node-3; appends the
# ZooKeeper server entries and writes myid "3"; rewrites nifi.properties
# (web/cluster/remote-input host = nifi-node-3, embedded ZooKeeper on, cluster
# protocol port 8082, remote input port 10000, flow election 1 min / 2
# candidates); runs `nifi.sh install`; deletes the tarball; reboots.
# NOTE(review): the appended ZooKeeper text starts with a bare
# "nifi-node-1:2888:3888" (no "server.1=" prefix) - presumably it completes a
# trailing "server.1=" line in the stock zookeeper.properties; confirm.
# NOTE(review): SELFIP is assigned but never referenced in this script, and
# nothing visible here makes the nifi-node-N names resolvable - confirm that
# name resolution is provided elsewhere.
UserData:
Fn::Base64:
!Sub |
#!/bin/bash
SELFIP=$(curl http://169.254.169.254/latest/meta-data/local-ipv4)
exec > >(tee /var/log/user-data.log|logger -t user-data -s 2>/dev/console) 2>&1
apt-get update && apt-get -y install openjdk-8-jdk ntp
cd /opt/
curl https://ds-data-lake.s3.amazonaws.com/nifi-1.9.2-bin.tar.gz -o nifi-1.9.2-bin.tar.gz
gunzip -c nifi-1.9.2-bin.tar.gz | tar xvf -
hostnamectl set-hostname nifi-node-3
echo -e "nifi-node-1:2888:3888\nserver.2=nifi-node-2:2888:3888\nserver.3=nifi-node-3:2888:3888\n" >> ./nifi-1.9.2/conf/zookeeper.properties
mkdir ./nifi-1.9.2/state && mkdir ./nifi-1.9.2/state/zookeeper
echo 3 > ./nifi-1.9.2/state/zookeeper/myid
sed -i "/nifi.web.http.host=/ s/=.*/=nifi-node-3/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.state.management.embedded.zookeeper.start=/ s/=.*/=true/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.zookeeper.connect.string=/ s/=.*/=nifi-node-1:2181,nifi-node-2:2181,nifi-node-3:2181/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.is.node=/ s/=.*/=true/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.node.address=/ s/=.*/=nifi-node-3/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.node.protocol.port=/ s/=.*/=8082/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.remote.input.host=/ s/=.*/=nifi-node-3/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.remote.input.socket.port=/ s/=.*/=10000/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.flow.election.max.wait.time/ s/=.*/=1 min/" ./nifi-1.9.2/conf/nifi.properties
sed -i "/nifi.cluster.flow.election.max.candidates=/ s/=.*/=2/" ./nifi-1.9.2/conf/nifi.properties
echo "UserData config done."
./nifi-1.9.2/bin/nifi.sh install && rm nifi-1.9.2-bin.tar.gz
reboot
#
# Security group shared by the three NiFi instances.
# External access is limited to the Location parameter (documented as the range
# "that can be used for NiFi and SSH") instead of a hard-coded 0.0.0.0/0, which
# previously opened every port to the internet regardless of Location. With the
# default Location of 0.0.0.0/0 the effective rules are unchanged.
NiFiInstanceSecurityGroup:
  Type: AWS::EC2::SecurityGroup
  Properties:
    GroupDescription: Group for NiFi instances
    VpcId: !Ref 'useVPC'
    SecurityGroupIngress:
      # All protocols from the allowed client range.
      - IpProtocol: '-1'
        CidrIp: !Ref 'Location'
      # Kept explicit so SSH access survives if the rule above is later
      # narrowed to specific ports.
      - IpProtocol: tcp
        FromPort: '22'
        ToPort: '22'
        CidrIp: !Ref 'Location'
# Node-to-node traffic between members of the group, so the instances can
# still reach each other when Location is narrowed to a client range.
# Declared as a separate resource because a security group cannot reference
# itself from its own inline ingress list.
NiFiInstanceSecurityGroupSelfIngress:
  Type: AWS::EC2::SecurityGroupIngress
  Properties:
    GroupId: !GetAtt 'NiFiInstanceSecurityGroup.GroupId'
    IpProtocol: '-1'
    SourceSecurityGroupId: !GetAtt 'NiFiInstanceSecurityGroup.GroupId'
Outputs: {}
import java.nio.charset.StandardCharsets
// Fan-out: replaces the incoming FlowFile with one new, content-less FlowFile per
// page of PAGE_SIZE items, for start indexes 0, 100, 200, ... below 'totalItems'.
// Each output carries "startIndex" plus the input's "startDate" and
// "prefix_filename" attributes. A 'totalItems' of zero or less produces no
// outputs and the input is simply removed.
// `session`, `log`, `REL_SUCCESS` and `REL_FAILURE` are not defined here;
// presumably they are bound by the scripting host - confirm.
def flowFile = session.get()
if (flowFile == null) {
    return
}

// Number of items covered by each generated FlowFile.
final int PAGE_SIZE = 100

// Validate before creating any output: a missing or non-numeric attribute is
// routed to failure instead of throwing out of the script.
String rawTotal = flowFile.getAttribute('totalItems')
int totalItems
try {
    totalItems = Integer.parseInt(rawTotal)
} catch (NumberFormatException e) {
    log.error("Missing or non-numeric 'totalItems' attribute: " + rawTotal, e)
    session.transfer(flowFile, REL_FAILURE)
    return
}

String startDate = flowFile.getAttribute('startDate')
String prefixFilename = flowFile.getAttribute('prefix_filename')

def flowFiles = [] as List<FlowFile>
// Loop index is a declared local so it does not leak into the script binding.
for (int startIndex = 0; startIndex < totalItems; startIndex += PAGE_SIZE) {
    def pageFlowFile = session.create()
    pageFlowFile = session.putAttribute(pageFlowFile, "startIndex", String.valueOf(startIndex))
    pageFlowFile = session.putAttribute(pageFlowFile, "startDate", startDate)
    pageFlowFile = session.putAttribute(pageFlowFile, "prefix_filename", prefixFilename)
    flowFiles << pageFlowFile
}

session.transfer(flowFiles, REL_SUCCESS)
// The input has been fully replaced by the per-page FlowFiles.
session.remove(flowFile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment