Skip to content

Instantly share code, notes, and snippets.

@mumrah
Last active October 27, 2015 19:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mumrah/81f189a920283e5b7b29 to your computer and use it in GitHub Desktop.
Save mumrah/81f189a920283e5b7b29 to your computer and use it in GitHub Desktop.

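The following must be kept consistent for a stage to be registered correctly:

  • The @JsonTypeName value and the @Schema "type" parameter in the config class, and the @AutoDiscover "value" in the factory interface, must all be the same string (e.g. RegexFilterConfig.TYPE).
  • The generic parameter of the stage's superclass must be the config class (e.g. DocumentProcessor<RegexFilterConfig>).
  • The generic parameter of the factory's StageAssistFactory must be the stage class.
  • The @AutoDiscover "to" parameter must be the stage class.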
package com.lucidworks.apollo;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.MapBinder;
import com.lucidworks.apollo.common.pipeline.PipelineDocument;
import com.lucidworks.apollo.component.ItemStore;
import com.lucidworks.fusion.pipeline.*;
import com.lucidworks.fusion.pipeline.impl.AssistedStageProvider;
import com.lucidworks.fusion.pipeline.impl.GuiceStageFactory;
import com.lucidworks.fusion.pipeline.impl.ZooKeeperPipelineDefinitionRegistry;
import com.lucidworks.fusion.pipeline.index.IndexPipelineManager;
import com.lucidworks.fusion.pipeline.impl.MapPipelineDefinitionRegistry;
import com.lucidworks.fusion.pipeline.index.IndexStageClassRegistry;
import com.lucidworks.fusion.pipeline.index.PipelineDocumentMessageFactory;
import com.lucidworks.fusion.pipeline.index.ZkIndexStageConfigurationRegistry;
import com.netflix.governator.annotations.AutoBindSingleton;
import com.netflix.governator.lifecycle.ClasspathScanner;
import java.lang.annotation.Annotation;
import java.util.Collections;
/**
 * Guice module that wires the index-pipeline services and registers every stage discovered on the
 * classpath through {@link AutoDiscover}.
 *
 * <p>For each class annotated with {@code @AutoDiscover(value = TYPE, to = StageClass)} found under
 * {@link #STAGES_PACKAGE}, this module:
 * <ul>
 *   <li>binds the annotated interface as the assisted-inject factory for key {@code TYPE};</li>
 *   <li>installs a {@link FactoryModuleBuilder} that implements {@code Stage} with {@code StageClass};</li>
 *   <li>records {@code StageClass} under key {@code TYPE}.</li>
 * </ul>
 * A misdeclared {@code @AutoDiscover} class is reported as a Guice configuration error naming the
 * offending class, instead of failing later with an unrelated proxy or cast error.
 */
@AutoBindSingleton
public class IndexPipelineGuiceModule extends AbstractModule {

  /** Package scanned for {@link AutoDiscover}-annotated stage factories. */
  private static final String STAGES_PACKAGE = "com.lucidworks.fusion.pipeline.index.stages";

  @Override
  protected void configure() {
    bind(new TypeLiteral<PipelineManager<PipelineDocument>>(){}).to(IndexPipelineManager.class).in(Singleton.class);
    bind(PipelineDefinitionRegistry.class).to(ZooKeeperPipelineDefinitionRegistry.class).in(Singleton.class);
    bind(new TypeLiteral<ItemStore<PipelineDefinition>>(){}).to(ZooKeeperPipelineDefinitionRegistry.class).in(Singleton.class);
    bind(StageClassRegistry.class).to(IndexStageClassRegistry.class).in(Singleton.class);
    bind(StageFactory.class).to(GuiceStageFactory.class);
    bind(new TypeLiteral<MessageFactory<PipelineDocument>>(){}).to(PipelineDocumentMessageFactory.class);
    bind(StageProvider.class).to(AssistedStageProvider.class);

    // Binder for the Assisted-Inject stage factories, keyed by stage type.
    MapBinder<String, StageAssistFactory<? extends Stage<PipelineDocument, ? extends Configuration>>> stageFactoryBinder =
        MapBinder.newMapBinder(
            binder(),
            new TypeLiteral<String>(){},
            new TypeLiteral<StageAssistFactory<? extends Stage<PipelineDocument, ? extends Configuration>>>(){});

    // Binder for the stage classes themselves, keyed by stage type.
    MapBinder<String, Class<? extends Stage<PipelineDocument, ? extends Configuration>>> stageClassBinder =
        MapBinder.newMapBinder(
            binder(),
            new TypeLiteral<String>(){},
            new TypeLiteral<Class<? extends Stage<PipelineDocument, ? extends Configuration>>>(){});

    // Bind the stage factories and classes found by classpath scanning.
    // TODO eventually we will load these classes from the plugin system, but for now we scan our own classpath
    ClasspathScanner scanner = new ClasspathScanner(
        Collections.singletonList(STAGES_PACKAGE),
        Collections.<Class<? extends Annotation>>singletonList(AutoDiscover.class));
    System.err.println("Found @AutoDiscover classes: " + scanner.getClasses());

    for (Class<?> clazz : scanner.getClasses()) {
      AutoDiscover autoDiscover = clazz.getAnnotation(AutoDiscover.class);
      if (autoDiscover == null) {
        // Defensive: only a class-level @AutoDiscover describes a stage factory.
        continue;
      }
      Class<?> stageClass = autoDiscover.to();
      if (!isValidAutoDiscover(clazz, stageClass)) {
        continue;
      }

      // Safe after the isAssignableFrom checks above; the generic arguments are erased at runtime.
      @SuppressWarnings("unchecked")
      Class<? extends StageAssistFactory<? extends Stage<PipelineDocument, ? extends Configuration>>> factoryClass =
          (Class<? extends StageAssistFactory<? extends Stage<PipelineDocument, ? extends Configuration>>>) clazz;
      @SuppressWarnings("unchecked")
      Class<? extends Stage<PipelineDocument, ? extends Configuration>> typedStageClass =
          (Class<? extends Stage<PipelineDocument, ? extends Configuration>>) stageClass;

      stageFactoryBinder.addBinding(autoDiscover.value()).to(factoryClass);
      install(new FactoryModuleBuilder()
          .implement(Stage.class, typedStageClass)
          .build(factoryClass));
      stageClassBinder.addBinding(autoDiscover.value()).toInstance(typedStageClass);
    }
  }

  /**
   * Checks the shape of an {@code @AutoDiscover} declaration and records a Guice configuration error
   * for each violation.
   *
   * @param factoryClass the class carrying {@code @AutoDiscover}
   * @param stageClass the class named by {@code @AutoDiscover(to = ...)}
   * @return {@code true} if the declaration can be bound
   */
  private boolean isValidAutoDiscover(Class<?> factoryClass, Class<?> stageClass) {
    boolean valid = true;
    // FactoryModuleBuilder can only generate implementations of interfaces.
    if (!factoryClass.isInterface()) {
      addError("@AutoDiscover class %s must be an interface", factoryClass.getName());
      valid = false;
    }
    if (!StageAssistFactory.class.isAssignableFrom(factoryClass)) {
      addError("@AutoDiscover class %s must extend %s",
          factoryClass.getName(), StageAssistFactory.class.getName());
      valid = false;
    }
    if (!Stage.class.isAssignableFrom(stageClass)) {
      addError("@AutoDiscover(to = %s) on %s must name a %s implementation",
          stageClass.getName(), factoryClass.getName(), Stage.class.getName());
      valid = false;
    }
    return valid;
  }
}
/**
 * Configuration for the {@code regex-filter} index stage: a list of rules, each pairing a source
 * field name with a regular expression.
 *
 * <p>Instances are immutable; the rule list is copied at construction time.
 */
@JsonTypeName(RegexFilterConfig.TYPE)
@Schema(
    type = RegexFilterConfig.TYPE,
    title = "Regular Expression Filter",
    description = "This stage can filter entities using regular expressions"
)
public class RegexFilterConfig extends Configuration {
  /** Type identifier shared by {@code @JsonTypeName}, {@code @Schema} and the stage factory. */
  public static final String TYPE = "regex-filter";

  @SchemaProperty(title = "Filters")
  private final List<RegexFilterRule> filters;

  /**
   * @param id the stage configuration id
   * @param rules the filter rules; {@code null} is treated as "no rules"
   */
  @JsonCreator
  public RegexFilterConfig(
      @JsonProperty("id") String id,
      @JsonProperty("filters") List<RegexFilterRule> rules) {
    super(id);
    if (rules != null) {
      // Copy before wrapping: unmodifiableList alone is only a view, so later changes to the
      // caller's list would otherwise show through this "immutable" config.
      this.filters = Collections.unmodifiableList(new java.util.ArrayList<RegexFilterRule>(rules));
    } else {
      this.filters = Collections.emptyList();
    }
  }

  /** @return the configured rules as an unmodifiable list (never {@code null}) */
  @JsonProperty("filters")
  public List<RegexFilterRule> getFilters() {
    return filters;
  }

  /** A single rule: drop values of {@code sourceField} that match {@code pattern}. */
  public static class RegexFilterRule {
    @SchemaProperty(name = "sourceField", title = "Source Field", required = true)
    private final String sourceField;

    @SchemaProperty(name = "pattern", title = "Filter Pattern", required = true)
    private final Pattern pattern;

    /**
     * @param sourceField name of the field this rule applies to
     * @param pattern regular expression source, compiled via {@code SchemaValidator.parseRegex}
     * @throws RESTValidationError if {@code SchemaValidator.parseRegex} rejects the pattern
     */
    @JsonCreator
    public RegexFilterRule(
        @JsonProperty("sourceField") String sourceField,
        @JsonProperty("pattern") String pattern) throws RESTValidationError {
      this.sourceField = sourceField;
      this.pattern = SchemaValidator.parseRegex("pattern", pattern);
    }

    @JsonProperty("sourceField")
    public String getSourceField() {
      return sourceField;
    }

    @JsonProperty("pattern")
    public Pattern getPattern() {
      return pattern;
    }
  }
}
/**
 * Index stage that drops document fields whose string value fully matches a configured regular
 * expression.
 *
 * <p>A field is removed when any rule whose source field equals the field name (ignoring case) has
 * a pattern that matches the entire {@code toString()} of the field value. Fields with a
 * {@code null} value are always kept. The output document keeps the input's id and metadata.
 */
public class RegexFilterStage extends DocumentProcessor<RegexFilterConfig> {

  @Inject
  public RegexFilterStage(@Assisted String id) {
    super(id);
  }

  /** No per-configuration setup is needed; the rules already carry their patterns. */
  @Override
  public void init(RegexFilterConfig config) {
  }

  /**
   * Emits a copy of {@code document} without the fields rejected by {@code config}'s rules. Each
   * removed field increments the {@code removed.field} counter.
   */
  @Override
  public void process(PipelineDocument document, Context context, RegexFilterConfig config, StageOutput<PipelineDocument> output) {
    PipelineDocument filtered = new PipelineDocument();
    filtered.setId(document.getId());
    filtered.setMetadata(document.getMetadata());
    for (String name : document.getFieldNames()) {
      for (PipelineField field : document.getFields(name)) {
        if (isFilteredOut(name, field, config)) {
          // Count the fields that were actually removed, as the metric name says.
          getMetricRegistry().counter("removed.field").inc();
        } else {
          filtered.addField(field);
        }
      }
    }
    output.send(filtered, context);
  }

  /**
   * Returns {@code true} when some rule targeting {@code name} fully matches the field's value.
   * Rules with a {@code null} source field never match.
   */
  private static boolean isFilteredOut(String name, PipelineField field, RegexFilterConfig config) {
    if (field.getValue() == null) {
      return false;
    }
    for (RegexFilterConfig.RegexFilterRule rule : config.getFilters()) {
      String sourceField = rule.getSourceField();
      if (sourceField != null
          && sourceField.equalsIgnoreCase(name)
          && rule.getPattern().matcher(field.getValue().toString()).matches()) {
        return true;
      }
    }
    return false;
  }

  @Override
  public Class<RegexFilterConfig> getConfigurationClass() {
    return RegexFilterConfig.class;
  }
}
/**
 * Assisted-inject factory for {@link RegexFilterStage}.
 *
 * <p>Picked up by classpath scanning for {@code @AutoDiscover}: {@code value} ties it to the
 * {@code regex-filter} configuration type, and {@code to} names the stage class to instantiate.
 */
@AutoDiscover(to = RegexFilterStage.class, value = RegexFilterConfig.TYPE)
public interface RegexFilterStageFactory extends StageAssistFactory<RegexFilterStage> {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment