Skip to content

Instantly share code, notes, and snippets.

@thomassuckow
Created November 10, 2017 16:30
Show Gist options
  • Save thomassuckow/d409c006d780d4dbf7979276626f95bd to your computer and use it in GitHub Desktop.
Save thomassuckow/d409c006d780d4dbf7979276626f95bd to your computer and use it in GitHub Desktop.
NiFi Extract Email Body Text Processor
package foo;
import javax.mail.BodyPart;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
import java.util.*;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"split", "email"})
@CapabilityDescription("Extracts the plain/text body from a multipart email")
public class ExtractEmailBodyText extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All successfully extracted FlowFiles are routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are transferred to this relationship when an error occurs")
.build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(new Relationship[]{REL_SUCCESS, REL_FAILURE})));
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList());
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
private String getTextFromMessage(Message message) throws MessagingException, IOException {
if (message.isMimeType("text/plain")){
return message.getContent().toString();
}else if (message.isMimeType("multipart/*")) {
String result = "";
MimeMultipart mimeMultipart = (MimeMultipart)message.getContent();
int count = mimeMultipart.getCount();
for (int i = 0; i < count; i ++){
BodyPart bodyPart = mimeMultipart.getBodyPart(i);
if (bodyPart.isMimeType("text/plain")){
result = result + "\n" + bodyPart.getContent();
break; //without break same text appears twice in my tests
}
}
return result;
}
return "";
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
try {
final FlowFile newFlowFile = session.write(flowFile, (inputStream, outputStream) -> {
try {
MimeMessage mime = new MimeMessage(null, inputStream);
outputStream.write(getTextFromMessage(mime).getBytes(StandardCharsets.UTF_8));
} catch( MessagingException e) {
throw new ProcessException("Not an Email", e);
}
});
session.putAttribute(newFlowFile, "mime.type", "text/plain");
session.transfer(newFlowFile, REL_SUCCESS);
}
catch (ProcessException e) {
logger.error("Error: " + e.getLocalizedMessage());
session.transfer(flowFile, REL_FAILURE);
}
session.commit();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment