Created
December 21, 2012 13:19
-
-
Save jeffyu/4352764 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* JBoss, Home of Professional Open Source | |
* Copyright 2008-11, Red Hat Middleware LLC, and others contributors as indicated | |
* by the @authors tag. All rights reserved. | |
* See the copyright.txt in the distribution for a | |
* full listing of individual contributors. | |
* This copyrighted material is made available to anyone wishing to use, | |
* modify, copy, or redistribute it subject to the terms and conditions | |
* of the GNU Lesser General Public License, v. 2.1. | |
* This program is distributed in the hope that it will be useful, but WITHOUT A | |
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A | |
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. | |
* You should have received a copy of the GNU Lesser General Public License, | |
* v.2.1 along with this distribution; if not, write to the Free Software | |
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, | |
* MA 02110-1301, USA. | |
*/ | |
package org.overlord.bam.epn.cep; | |
import java.text.MessageFormat; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
import org.kie.KieBase; | |
import org.kie.KieServices; | |
import org.kie.builder.KieBaseModel; | |
import org.kie.builder.KieBuilder; | |
import org.kie.builder.KieFileSystem; | |
import org.kie.builder.KieModuleModel; | |
import org.kie.builder.KieRepository; | |
import org.kie.conf.EqualityBehaviorOption; | |
import org.kie.conf.EventProcessingOption; | |
import org.kie.runtime.KieContainer; | |
import org.kie.runtime.KieSession; | |
import org.kie.runtime.rule.SessionEntryPoint; | |
import org.overlord.bam.epn.EventProcessor; | |
import org.overlord.bam.internal.epn.DefaultEPNContext; | |
/** | |
* This class represents the CEP implementation of the Event | |
* Processor. | |
* | |
*/ | |
public class CEPEventProcessor extends EventProcessor { | |
private static final Logger LOG=Logger.getLogger(CEPEventProcessor.class.getName()); | |
private DefaultEPNContext _context=null; //new DefaultEPNContext(); | |
private static final java.util.Map<String,KieSession> SESSIONS= | |
new java.util.HashMap<String,KieSession>(); | |
private KieSession _session=null; | |
private String _ruleName=null; | |
/** | |
* {@inheritDoc} | |
*/ | |
public void init() throws Exception { | |
_context = new DefaultEPNContext(getServices()); | |
_session = createSession(); | |
if (LOG.isLoggable(Level.FINEST)) { | |
LOG.finest("CEPEventProcessor init: ruleName="+_ruleName+" session="+_session); | |
} | |
} | |
/** | |
* This method returns the rule name. | |
* | |
* @return The rule name | |
*/ | |
public String getRuleName() { | |
return (_ruleName); | |
} | |
/** | |
* This method sets the rule name. | |
* | |
* @param ruleName The rule name | |
*/ | |
public void setRuleName(String ruleName) { | |
_ruleName = ruleName; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
public java.io.Serializable process(String source, | |
java.io.Serializable event, int retriesLeft) throws Exception { | |
if (LOG.isLoggable(Level.FINEST)) { | |
LOG.finest("Process event '"+event+" from source '"+source | |
+"' on CEP Event Processor '"+getRuleName() | |
+"'"); | |
} | |
_context.forward(null); | |
// Get entry point | |
// TODO: If not simple lookup, then may want to cache this | |
SessionEntryPoint entryPoint=_session.getEntryPoint(source); | |
if (entryPoint != null) { | |
if (LOG.isLoggable(Level.FINEST)) { | |
LOG.finest("Insert event '"+event+" from source '"+source | |
+"' on CEP Event Processor '"+getRuleName() | |
+"' into entry point "+entryPoint); | |
} | |
entryPoint.insert(event); | |
// TODO: Not sure if possible to delay evaluation, until after | |
// all events in batch have been processed/inserted - but then | |
// how to trace the individual results?? | |
_session.fireAllRules(); | |
} else if (LOG.isLoggable(Level.FINEST)) { | |
LOG.finest("No entry point for source Event Processor '"+source | |
+"' on CEP Event Processor '"+getRuleName()+"'"); | |
} | |
return (java.io.Serializable)_context.getResult(); | |
} | |
/** | |
* This method creates a stateful knowledge session per | |
* CEP event processor. | |
* | |
* @return The stateful knowledge session | |
*/ | |
private KieSession createSession() { | |
KieSession ret=null; | |
synchronized (SESSIONS) { | |
ret = SESSIONS.get(getRuleName()); | |
if (ret == null) { | |
KieBase kbase = loadRuleBase(); | |
if (kbase != null) { | |
ret = kbase.newKieSession(); | |
if (ret != null) { | |
ret.setGlobal("epn", _context); | |
ret.fireAllRules(); | |
SESSIONS.put(getRuleName(), ret); | |
} else { | |
System.out.println("The kieSession is null!!!"); | |
} | |
} | |
} | |
} | |
return (ret); | |
} | |
/** | |
* This method loads the rule base associated with the CEP | |
* event processor. | |
* | |
* @return The knowledge base | |
*/ | |
private KieBase loadRuleBase() { | |
String cepRuleBase=getRuleName()+".drl"; | |
try { | |
KieServices ks = KieServices.Factory.get(); | |
KieRepository kr = ks.getRepository(); | |
KieModuleModel kmm = ks.newKieModuleModel(); | |
KieBaseModel kbm = kmm.newKieBaseModel(getRuleName()) | |
.setEqualsBehavior(EqualityBehaviorOption.EQUALITY) | |
.setEventProcessingMode(EventProcessingOption.STREAM); | |
kbm.setDefault(true); | |
KieFileSystem kfs = ks.newKieFileSystem(); | |
kfs.write(ks.getResources().newClassPathResource(cepRuleBase)); | |
kfs.writeKModuleXML(kmm.toXML()); | |
KieBuilder kb = ks.newKieBuilder(kfs); | |
kb.buildAll(); | |
KieContainer container = ks.newKieContainer(kr.getDefaultReleaseId()); | |
KieBase kbase = container.getKieBase(); | |
System.setProperty("kie.mbean", "enabled"); | |
return kbase; | |
} catch (Throwable e) { | |
LOG.log(Level.SEVERE, MessageFormat.format( | |
java.util.PropertyResourceBundle.getBundle( | |
"epn-cep.Messages").getString("EPN-CEP-2"), | |
cepRuleBase, getRuleName()), e); | |
} | |
return (null); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment