Skip to content

Instantly share code, notes, and snippets.

@rlowit
Created November 22, 2021 13:31
Show Gist options
  • Save rlowit/0abb6f4c2cecff4f1e7c90ac34151dd1 to your computer and use it in GitHub Desktop.
Save rlowit/0abb6f4c2cecff4f1e7c90ac34151dd1 to your computer and use it in GitHub Desktop.
gRPC server for NiFi dataflows
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: you may need to add the sources generated by running `mvn clean compile` to your IDE's
// configured source directories. Otherwise, the classes generated when the proto is compiled won't
// be accessible. For IntelliJ, open this module's settings and mark the following as source directories:
//
// * target/generated-sources/protobuf/grpc-java
// * target/generated-sources/protobuf/java
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.nifi.processors.grpc";
option java_outer_classname = "FFSProto";
option objc_class_prefix = "FFS";
package org.apache.nifi.processors.grpc;
// The FlowFile service definition.
//
// Exposes a single unary RPC: the client sends one FlowFileRequest and
// receives one FlowFileReply.
service FlowFileService {
// Sends a FlowFile (blocking rpc).
//
// Unary call: exactly one request message in, exactly one reply message out
// (neither side is declared as a stream).
rpc Send (FlowFileRequest) returns (FlowFileReply) {}
}
// The request message: one FlowFile (identifier, attributes, content).
message FlowFileRequest {
// Field numbers 1-15 take one byte to encode (tag + wire type) and should be
// reserved for frequently set fields. Numbers 1, 2 and 15 are used below, so
// 3-14 remain available for future fields.
//
// source: https://developers.google.com/protocol-buffers/docs/proto3#assigning-tags

// FlowFile identifier. proto3 implicit presence: a value of 0 is not
// serialized and cannot be distinguished from "unset".
// NOTE(review): how the sender assigns this value is not visible here — confirm.
int64 id = 1;
// FlowFile attributes as string key/value pairs. Map ordering on the wire is
// undefined; do not rely on iteration order.
map<string, string> attributes = 2;
// FlowFile content as raw bytes (empty content is indistinguishable from unset).
bytes content = 15;
}
// The reply message returned for each FlowFileRequest.
message FlowFileReply {
// Outcome of handling the request.
//
// NOTE(review): ERROR is the zero value and therefore the proto3 default, so
// a reply whose responseCode was never set decodes as ERROR. Renaming or
// renumbering these values would break the wire contract and all generated
// code, so they are intentionally left as-is.
enum ResponseCode {
ERROR = 0;
SUCCESS = 1;
RETRY = 2;
}
// Status of the call.
ResponseCode responseCode = 1;
// Free-form text payload of the reply (proto3 `string`: must be valid UTF-8).
// Its content is decided by the server implementation, not by this schema.
string body = 2;
}
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: flowfile.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
# Default symbol database; the descriptors and message classes defined below
# are registered in it.
_sym_db = _symbol_database.Default()
# File descriptor for flowfile.proto.
# - serialized_options encodes the file options from the .proto
#   (java_package, java_outer_classname, java_multiple_files, objc_class_prefix).
# - serialized_pb is the serialized FileDescriptorProto of the whole schema;
#   the serialized_start / serialized_end values of the descriptors below are
#   byte offsets into this string.
DESCRIPTOR = _descriptor.FileDescriptor(
name='flowfile.proto',
package='org.apache.nifi.processors.grpc',
syntax='proto3',
serialized_options=b'\n\037org.apache.nifi.processors.grpcB\010FFSProtoP\001\242\002\003FFS',
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n\x0e\x66lowfile.proto\x12\x1forg.apache.nifi.processors.grpc\"\xb7\x01\n\x0f\x46lowFileRequest\x12\n\n\x02id\x18\x01 \x01(\x03\x12T\n\nattributes\x18\x02 \x03(\x0b\x32@.org.apache.nifi.processors.grpc.FlowFileRequest.AttributesEntry\x12\x0f\n\x07\x63ontent\x18\x0f \x01(\x0c\x1a\x31\n\x0f\x41ttributesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xa3\x01\n\rFlowFileReply\x12Q\n\x0cresponseCode\x18\x01 \x01(\x0e\x32;.org.apache.nifi.processors.grpc.FlowFileReply.ResponseCode\x12\x0c\n\x04\x62ody\x18\x02 \x01(\t\"1\n\x0cResponseCode\x12\t\n\x05\x45RROR\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\t\n\x05RETRY\x10\x02\x32}\n\x0f\x46lowFileService\x12j\n\x04Send\x12\x30.org.apache.nifi.processors.grpc.FlowFileRequest\x1a..org.apache.nifi.processors.grpc.FlowFileReply\"\x00\x42\x33\n\x1forg.apache.nifi.processors.grpcB\x08\x46\x46SProtoP\x01\xa2\x02\x03\x46\x46Sb\x06proto3'
)
# Descriptor for the nested enum FlowFileReply.ResponseCode
# (ERROR = 0, SUCCESS = 1, RETRY = 2). containing_type is None here and is
# linked to _FLOWFILEREPLY further down, after that descriptor exists.
_FLOWFILEREPLY_RESPONSECODE = _descriptor.EnumDescriptor(
name='ResponseCode',
full_name='org.apache.nifi.processors.grpc.FlowFileReply.ResponseCode',
filename=None,
file=DESCRIPTOR,
create_key=_descriptor._internal_create_key,
values=[
_descriptor.EnumValueDescriptor(
name='ERROR', index=0, number=0,
serialized_options=None,
type=None,
create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='SUCCESS', index=1, number=1,
serialized_options=None,
type=None,
create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='RETRY', index=2, number=2,
serialized_options=None,
type=None,
create_key=_descriptor._internal_create_key),
],
containing_type=None,
serialized_options=None,
serialized_start=352,
serialized_end=401,
)
_sym_db.RegisterEnumDescriptor(_FLOWFILEREPLY_RESPONSECODE)
# Descriptor for the synthetic map-entry message protoc generates for
# `map<string, string> attributes`: a `key` field (number 1) and a `value`
# field (number 2), both strings (type=9 / cpp_type=9, label=1 optional).
# serialized_options b'8\001' encodes MessageOptions.map_entry = true.
_FLOWFILEREQUEST_ATTRIBUTESENTRY = _descriptor.Descriptor(
name='AttributesEntry',
full_name='org.apache.nifi.processors.grpc.FlowFileRequest.AttributesEntry',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='org.apache.nifi.processors.grpc.FlowFileRequest.AttributesEntry.key', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value', full_name='org.apache.nifi.processors.grpc.FlowFileRequest.AttributesEntry.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=b'8\001',
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=186,
serialized_end=235,
)
# Descriptor for message FlowFileRequest. The `attributes` field's
# message_type is None here and is linked to the AttributesEntry descriptor
# further down.
_FLOWFILEREQUEST = _descriptor.Descriptor(
name='FlowFileRequest',
full_name='org.apache.nifi.processors.grpc.FlowFileRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
# id: int64 (type=3, cpp_type=2), field number 1, singular.
_descriptor.FieldDescriptor(
name='id', full_name='org.apache.nifi.processors.grpc.FlowFileRequest.id', index=0,
number=1, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
# attributes: the map, encoded as repeated (label=3) AttributesEntry
# messages (type=11, cpp_type=10), field number 2.
_descriptor.FieldDescriptor(
name='attributes', full_name='org.apache.nifi.processors.grpc.FlowFileRequest.attributes', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
# content: bytes (type=12), field number 15, singular.
_descriptor.FieldDescriptor(
name='content', full_name='org.apache.nifi.processors.grpc.FlowFileRequest.content', index=2,
number=15, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=b"",
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[_FLOWFILEREQUEST_ATTRIBUTESENTRY, ],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=52,
serialized_end=235,
)
# Descriptor for message FlowFileReply. The `responseCode` field's enum_type
# is None here and is linked to the ResponseCode enum descriptor further down.
_FLOWFILEREPLY = _descriptor.Descriptor(
name='FlowFileReply',
full_name='org.apache.nifi.processors.grpc.FlowFileReply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
# responseCode: enum (type=14, cpp_type=8), field number 1; default 0 (ERROR).
_descriptor.FieldDescriptor(
name='responseCode', full_name='org.apache.nifi.processors.grpc.FlowFileReply.responseCode', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
# body: string (type=9), field number 2; default ''.
_descriptor.FieldDescriptor(
name='body', full_name='org.apache.nifi.processors.grpc.FlowFileReply.body', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
_FLOWFILEREPLY_RESPONSECODE,
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=238,
serialized_end=401,
)
# Link descriptors that reference each other (these were left None when the
# descriptors above were constructed), expose the top-level messages on the
# file descriptor, then register the file descriptor.
_FLOWFILEREQUEST_ATTRIBUTESENTRY.containing_type = _FLOWFILEREQUEST
_FLOWFILEREQUEST.fields_by_name['attributes'].message_type = _FLOWFILEREQUEST_ATTRIBUTESENTRY
_FLOWFILEREPLY.fields_by_name['responseCode'].enum_type = _FLOWFILEREPLY_RESPONSECODE
_FLOWFILEREPLY_RESPONSECODE.containing_type = _FLOWFILEREPLY
DESCRIPTOR.message_types_by_name['FlowFileRequest'] = _FLOWFILEREQUEST
DESCRIPTOR.message_types_by_name['FlowFileReply'] = _FLOWFILEREPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
# Build the concrete Python message classes from their descriptors and
# register them. AttributesEntry is built as a nested class of
# FlowFileRequest. '__module__' is set to 'flowfile_pb2', the name this module
# is imported under elsewhere.
FlowFileRequest = _reflection.GeneratedProtocolMessageType('FlowFileRequest', (_message.Message,), {
'AttributesEntry' : _reflection.GeneratedProtocolMessageType('AttributesEntry', (_message.Message,), {
'DESCRIPTOR' : _FLOWFILEREQUEST_ATTRIBUTESENTRY,
'__module__' : 'flowfile_pb2'
# @@protoc_insertion_point(class_scope:org.apache.nifi.processors.grpc.FlowFileRequest.AttributesEntry)
})
,
'DESCRIPTOR' : _FLOWFILEREQUEST,
'__module__' : 'flowfile_pb2'
# @@protoc_insertion_point(class_scope:org.apache.nifi.processors.grpc.FlowFileRequest)
})
_sym_db.RegisterMessage(FlowFileRequest)
_sym_db.RegisterMessage(FlowFileRequest.AttributesEntry)
# Reply message class.
FlowFileReply = _reflection.GeneratedProtocolMessageType('FlowFileReply', (_message.Message,), {
'DESCRIPTOR' : _FLOWFILEREPLY,
'__module__' : 'flowfile_pb2'
# @@protoc_insertion_point(class_scope:org.apache.nifi.processors.grpc.FlowFileReply)
})
_sym_db.RegisterMessage(FlowFileReply)
# Clear the cached options objects on descriptors that declare options.
# NOTE(review): presumably so the runtime re-derives them from
# serialized_options on demand — this is standard protoc output, not hand code.
DESCRIPTOR._options = None
_FLOWFILEREQUEST_ATTRIBUTESENTRY._options = None
# Descriptor for service FlowFileService with its single method Send
# (FlowFileRequest -> FlowFileReply); registered and attached to DESCRIPTOR.
_FLOWFILESERVICE = _descriptor.ServiceDescriptor(
name='FlowFileService',
full_name='org.apache.nifi.processors.grpc.FlowFileService',
file=DESCRIPTOR,
index=0,
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_start=403,
serialized_end=528,
methods=[
_descriptor.MethodDescriptor(
name='Send',
full_name='org.apache.nifi.processors.grpc.FlowFileService.Send',
index=0,
containing_service=None,
input_type=_FLOWFILEREQUEST,
output_type=_FLOWFILEREPLY,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
])
_sym_db.RegisterServiceDescriptor(_FLOWFILESERVICE)
DESCRIPTOR.services_by_name['FlowFileService'] = _FLOWFILESERVICE
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import flowfile_pb2 as flowfile__pb2
class FlowFileServiceStub(object):
    """The FlowFile service definition.

    Client-side stub: wraps a channel and exposes each RPC of the service as
    an attribute (here only ``Send``).
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        # Unary-unary callable bound to the fully-qualified method path; it
        # serializes a FlowFileRequest and parses the reply as FlowFileReply.
        self.Send = channel.unary_unary(
                '/org.apache.nifi.processors.grpc.FlowFileService/Send',
                request_serializer=flowfile__pb2.FlowFileRequest.SerializeToString,
                response_deserializer=flowfile__pb2.FlowFileReply.FromString,
                )
class FlowFileServiceServicer(object):
    """The FlowFile service definition.

    Server-side base class: subclass it and override ``Send``. The default
    implementation reports UNIMPLEMENTED.
    """

    def Send(self, request, context):
        """Sends a FlowFile (blocking rpc)
        """
        # Default handler: set UNIMPLEMENTED status and details on the call
        # context, then raise NotImplementedError.
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
def add_FlowFileServiceServicer_to_server(servicer, server):
    """Register ``servicer`` on ``server`` under FlowFileService.

    Maps the 'Send' method of 'org.apache.nifi.processors.grpc.FlowFileService'
    to ``servicer.Send``, using the protobuf parsers/serializers for
    FlowFileRequest (incoming) and FlowFileReply (outgoing).
    """
    rpc_method_handlers = {
            'Send': grpc.unary_unary_rpc_method_handler(
                    servicer.Send,
                    request_deserializer=flowfile__pb2.FlowFileRequest.FromString,
                    response_serializer=flowfile__pb2.FlowFileReply.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'org.apache.nifi.processors.grpc.FlowFileService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class FlowFileService(object):
    """The FlowFile service definition.

    Convenience wrapper over ``grpc.experimental.unary_unary``: ``Send`` takes
    a ``target`` address instead of a channel or stub.
    """

    @staticmethod
    def Send(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        # Arguments are forwarded positionally; the order must match
        # grpc.experimental.unary_unary's parameter list, where ``insecure``
        # comes before ``call_credentials`` (unlike this method's signature).
        return grpc.experimental.unary_unary(request, target, '/org.apache.nifi.processors.grpc.FlowFileService/Send',
            flowfile__pb2.FlowFileRequest.SerializeToString,
            flowfile__pb2.FlowFileReply.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
##########################################################
##########################################################
# RESTART THE DOCKER CONTAINER WHEN YOU CHANGE THIS CODE #
##########################################################
##########################################################
import grpc
import asyncio
from grpc import aio
import sys
from concurrent import futures
import flowfile_pb2_grpc as pb2_grpc
import flowfile_pb2 as pb2
import logging
import modelpredict
from pympler import muppy, summary
class FlowFileServiceServicer(pb2_grpc.FlowFileServiceServicer):
    """Async implementation of FlowFileService.Send for NiFi dataflows.

    The FlowFile attribute ``invokegrpc.pythonda01.method`` selects the
    operation. Only ``modelpredict`` is implemented: it passes the FlowFile
    content and selected attributes to ``modelpredict.modelpredict`` and
    returns that function's result as the reply ``body``.

    Reply codes:
      * SUCCESS, with the modelpredict result as ``body``.
      * ERROR, with a human-readable ``body``, when the method is unsupported
        or a required attribute is missing or malformed.
    Exceptions raised by ``modelpredict.modelpredict`` are not caught; they
    propagate out of ``Send`` and gRPC reports the call as failed.
    """

    # Directories passed through to modelpredict.modelpredict.
    DEFAULT_PATH_FILES = '/home/pythonda01/modelpredict_files/'
    DEFAULT_PATH_MODEL_FILES = '/home/pythonda01/model_files/'

    # Attributes the 'modelpredict' method requires.
    _MODELPREDICT_ATTRIBUTES = ('uid', 'captureTime', 'maxTime', 'modelFileName')

    def __init__(self, *args, path_files=DEFAULT_PATH_FILES,
                 path_model_files=DEFAULT_PATH_MODEL_FILES, **kwargs):
        """Create the servicer.

        Args:
            path_files: directory handed to modelpredict.modelpredict
                (defaults to DEFAULT_PATH_FILES).
            path_model_files: model directory handed to
                modelpredict.modelpredict (defaults to DEFAULT_PATH_MODEL_FILES).
            *args, **kwargs: accepted and ignored.
        """
        # Configure logging once at construction instead of on every request.
        logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
        self.path_files = path_files
        self.path_model_files = path_model_files
        # modelpredict.modelpredict is synchronous. Running it directly inside
        # the coroutine would block the asyncio event loop for the whole
        # prediction; a single dedicated worker keeps the event loop free while
        # still running at most one prediction at a time.
        self._executor = futures.ThreadPoolExecutor(max_workers=1)

    @staticmethod
    def _error_reply(sid, method, message):
        """Log ``message`` and return an ERROR reply carrying it as ``body``."""
        logging.warning('[%s]:[%s] %s', sid, method, message)
        return pb2.FlowFileReply(
            responseCode=pb2.FlowFileReply.ResponseCode.ERROR, body=message)

    async def Send(self, request, context):
        """Handle one FlowFile and reply with the result of the selected method."""
        attributes = request.attributes
        # Protobuf map containers do not raise KeyError for a missing key when
        # indexed (they yield the default ''), so presence is checked
        # explicitly with `in` / `.get` instead of relying on exceptions.
        sid = attributes.get('sid', '')
        method = attributes.get('invokegrpc.pythonda01.method', '')

        if method != 'modelpredict':
            return self._error_reply(sid, method, 'unsupported method: %r' % method)

        missing = [key for key in self._MODELPREDICT_ATTRIBUTES if key not in attributes]
        if missing:
            return self._error_reply(
                sid, method, 'missing attribute(s): %s' % ', '.join(missing))

        try:
            capture_time = int(attributes['captureTime'])
        except ValueError:
            return self._error_reply(
                sid, method,
                'captureTime is not an integer: %r' % attributes['captureTime'])

        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            self._executor,
            modelpredict.modelpredict,
            request.content,
            attributes['uid'],
            capture_time,
            attributes['maxTime'],  # passed as the raw attribute string
            attributes['modelFileName'],
            self.path_files,
            self.path_model_files,
        )
        # Lazy %-formatting: no string building unless the record is emitted.
        logging.info('[%s]:[%s]=%s', sid, method, response)
        return pb2.FlowFileReply(
            responseCode=pb2.FlowFileReply.ResponseCode.SUCCESS, body=response)
async def serve(address='[::]:3010', grace=None):
    """Start the FlowFileService gRPC server and run until it terminates.

    Args:
        address: ``host:port`` to bind without TLS. The default '[::]:3010'
            listens on all interfaces, port 3010.
        grace: seconds passed to ``server.stop`` when waiting is interrupted
            (for example, the task is cancelled by Ctrl+C under asyncio.run).
            None aborts in-flight RPCs immediately.

    Raises:
        RuntimeError: if the server could not bind ``address``.
    """
    # grpc.aio uses this pool only for synchronous handlers; Send is a
    # coroutine, so requests are not dispatched through it.
    server = aio.server(futures.ThreadPoolExecutor(max_workers=1))
    pb2_grpc.add_FlowFileServiceServicer_to_server(FlowFileServiceServicer(), server)
    # A return value of 0 means the bind failed; some grpcio versions report
    # this instead of raising, which would otherwise leave a server that
    # silently accepts nothing.
    if not server.add_insecure_port(address):
        raise RuntimeError('failed to bind gRPC server to %s' % address)
    await server.start()
    try:
        await server.wait_for_termination()
    finally:
        # Release the port and give in-flight RPCs `grace` seconds if the wait
        # was interrupted; harmless if the server has already stopped.
        await server.stop(grace)
if __name__ == '__main__':
    # Drive serve() on a fresh event loop until the server terminates.
    server_coroutine = serve()
    asyncio.run(server_coroutine)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment