Created
March 3, 2014 13:29
-
-
Save mariofusco/9324938 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.drools.core.phreak; | |
import org.drools.core.common.InternalWorkingMemory; | |
import org.drools.core.common.Memory; | |
import org.drools.core.common.MemoryFactory; | |
import org.drools.core.common.NetworkNode; | |
import org.drools.core.common.SynchronizedLeftTupleSets; | |
import org.drools.core.reteoo.AccumulateNode; | |
import org.drools.core.reteoo.AccumulateNode.AccumulateMemory; | |
import org.drools.core.reteoo.AlphaNode; | |
import org.drools.core.reteoo.BetaMemory; | |
import org.drools.core.reteoo.BetaNode; | |
import org.drools.core.reteoo.CompositeObjectSinkAdapter; | |
import org.drools.core.reteoo.ConditionalBranchNode; | |
import org.drools.core.reteoo.ConditionalBranchNode.ConditionalBranchMemory; | |
import org.drools.core.reteoo.EntryPointNode; | |
import org.drools.core.reteoo.EvalConditionNode; | |
import org.drools.core.reteoo.EvalConditionNode.EvalMemory; | |
import org.drools.core.reteoo.FromNode; | |
import org.drools.core.reteoo.FromNode.FromMemory; | |
import org.drools.core.reteoo.LeftInputAdapterNode; | |
import org.drools.core.reteoo.LeftInputAdapterNode.LiaNodeMemory; | |
import org.drools.core.reteoo.LeftTupleSink; | |
import org.drools.core.reteoo.LeftTupleSinkNode; | |
import org.drools.core.reteoo.LeftTupleSinkPropagator; | |
import org.drools.core.reteoo.LeftTupleSource; | |
import org.drools.core.reteoo.NodeTypeEnums; | |
import org.drools.core.reteoo.NotNode; | |
import org.drools.core.reteoo.ObjectSink; | |
import org.drools.core.reteoo.ObjectSource; | |
import org.drools.core.reteoo.ObjectTypeNode; | |
import org.drools.core.reteoo.PathMemory; | |
import org.drools.core.reteoo.QueryElementNode; | |
import org.drools.core.reteoo.QueryElementNode.QueryElementNodeMemory; | |
import org.drools.core.reteoo.ReteooRuleBase; | |
import org.drools.core.reteoo.RiaPathMemory; | |
import org.drools.core.reteoo.RightInputAdapterNode; | |
import org.drools.core.reteoo.RightInputAdapterNode.RiaNodeMemory; | |
import org.drools.core.reteoo.SegmentMemory; | |
import org.drools.core.reteoo.TimerNode; | |
import org.drools.core.reteoo.TimerNode.TimerNodeMemory; | |
import org.drools.core.rule.Rule; | |
import org.drools.core.rule.constraint.QueryNameConstraint; | |
import org.drools.core.util.Iterator; | |
import org.drools.core.util.ObjectHashMap.ObjectEntry; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Queue; | |
public class SegmentUtilities { | |
// public static RightInputAdapterNode getOuterMostRiaNode(RightInputAdapterNode riaNode, LeftTupleSource startLTs) { | |
// if ( riaNode.getStartTupleSource() != startLTs ) { | |
// // This is a nested subnetwork, so we know there must be atleast one outer subnetwork | |
// LeftTupleSource lts = riaNode.getLeftTupleSource(); | |
// while ( true ) { | |
// if ( NodeTypeEnums.isBetaNode(lts) && (( BetaNode )lts).isRightInputIsRiaNode() ) { | |
// return getOuterMostRiaNode( ( RightInputAdapterNode ) ((BetaNode)lts).getRightInput(), startLTs ); | |
// } | |
// lts = lts.getLeftTupleSource(); | |
// } | |
// } else { | |
// return riaNode; | |
// } | |
// } | |
/** | |
* Initialises the NodeSegment memory for all nodes in the segment. | |
* | |
* @param wm | |
*/ | |
public static synchronized SegmentMemory createSegmentMemory(LeftTupleSource tupleSource, | |
final InternalWorkingMemory wm) { | |
SegmentMemory smem = wm.getNodeMemory((MemoryFactory) tupleSource).getSegmentMemory(); | |
if ( smem != null ) { | |
return smem; // this can happen when multiple threads are trying to initialize the segment | |
} | |
tupleSource = findSegmentRoot(tupleSource); | |
LeftTupleSource segmentRoot = tupleSource; | |
smem = restoreSegmentFromPrototype(wm, segmentRoot); | |
if ( smem != null ) { | |
return smem; | |
} | |
smem = new SegmentMemory(segmentRoot); | |
// Iterate all nodes on the same segment, assigning their position as a bit mask value | |
// allLinkedTestMask is the resulting mask used to test if all nodes are linked in | |
long nodePosMask = 1; | |
long allLinkedTestMask = 0; | |
boolean updateNodeBit = true; // nodes after a branch CE can notify, but they cannot impact linking | |
while (true) { | |
if ( tupleSource.isStreamMode() && smem.getTupleQueue() == null ) { | |
// need to make sure there is one Queue, for the rule, when a stream mode node is found. | |
Queue<TupleEntry> queue = initAndGetTupleQueue(tupleSource, wm); | |
smem.setTupleQueue( queue ); | |
} | |
if (NodeTypeEnums.isBetaNode(tupleSource)) { | |
allLinkedTestMask = processBetaNode(tupleSource, wm, smem, nodePosMask, allLinkedTestMask, updateNodeBit); | |
} else { | |
switch (tupleSource.getType()) { | |
case NodeTypeEnums.LeftInputAdapterNode: | |
allLinkedTestMask = processLiaNode((LeftInputAdapterNode) tupleSource, wm, smem, nodePosMask, allLinkedTestMask); | |
break; | |
case NodeTypeEnums.EvalConditionNode: | |
processEvalNode((EvalConditionNode) tupleSource, wm, smem); | |
break; | |
case NodeTypeEnums.ConditionalBranchNode: | |
updateNodeBit = processBranchNode((ConditionalBranchNode) tupleSource, wm, smem); | |
break; | |
case NodeTypeEnums.FromNode: | |
processFromNode((FromNode) tupleSource, wm, smem); | |
break; | |
case NodeTypeEnums.TimerConditionNode: | |
processTimerNode((TimerNode) tupleSource, wm, smem, nodePosMask); | |
break; | |
case NodeTypeEnums.QueryElementNode: | |
updateNodeBit = processQueryNode((QueryElementNode) tupleSource, wm, segmentRoot, smem, nodePosMask); | |
break; | |
} | |
} | |
nodePosMask = nodePosMask << 1; | |
if (tupleSource.getSinkPropagator().size() == 1) { | |
LeftTupleSinkNode sink = (LeftTupleSinkNode) tupleSource.getSinkPropagator().getFirstLeftTupleSink(); | |
if (NodeTypeEnums.isLeftTupleSource(sink)) { | |
tupleSource = (LeftTupleSource) sink; | |
} else { | |
// rtn or rian | |
// While not technically in a segment, we want to be able to iterate easily from the last node memory to the ria/rtn memory | |
// we don't use createNodeMemory, as these may already have been created by, but not added, by the method updateRiaAndTerminalMemory | |
Memory memory = wm.getNodeMemory((MemoryFactory) sink); | |
if (sink.getType() == NodeTypeEnums.RightInputAdaterNode) { | |
PathMemory riaPmem = ((RiaNodeMemory)memory).getRiaPathMemory(); | |
smem.getNodeMemories().add( riaPmem ); | |
RightInputAdapterNode rian = ( RightInputAdapterNode ) sink; | |
ObjectSink[] nodes = rian.getSinkPropagator().getSinks(); | |
for ( ObjectSink node : nodes ) { | |
if ( NodeTypeEnums.isLeftTupleSource(node) ) { | |
SegmentMemory parentSmem = createSegmentMemory( (LeftTupleSource) node, wm ); | |
} | |
} | |
} else if (NodeTypeEnums.isTerminalNode(sink)) { | |
smem.getNodeMemories().add((PathMemory)memory); | |
} | |
memory.setSegmentMemory(smem); | |
smem.setTipNode(sink); | |
break; | |
} | |
} else { | |
// not in same segment | |
smem.setTipNode(tupleSource); | |
break; | |
} | |
} | |
smem.setAllLinkedMaskTest(allLinkedTestMask); | |
// iterate to find root and determine the SegmentNodes position in the RuleSegment | |
LeftTupleSource pathRoot = segmentRoot; | |
int ruleSegmentPosMask = 1; | |
int counter = 0; | |
while (pathRoot.getType() != NodeTypeEnums.LeftInputAdapterNode) { | |
if (!SegmentUtilities.parentInSameSegment(pathRoot, null)) { | |
// for each new found segment, increase the mask bit position | |
ruleSegmentPosMask = ruleSegmentPosMask << 1; | |
counter++; | |
} | |
pathRoot = pathRoot.getLeftTupleSource(); | |
} | |
smem.setSegmentPosMaskBit(ruleSegmentPosMask); | |
smem.setPos(counter); | |
if (smem.getRootNode().getType() != NodeTypeEnums.LeftInputAdapterNode && | |
((LeftTupleSource)smem.getRootNode()).getLeftTupleSource().getType() == NodeTypeEnums.LeftInputAdapterNode ) { | |
// If LiaNode is in it's own segment, then the segment first after that must use SynchronizedLeftTupleSets | |
smem.setStagedTuples( new SynchronizedLeftTupleSets() ); | |
} | |
updateRiaAndTerminalMemory(tupleSource, tupleSource, smem, wm, false); | |
((ReteooRuleBase)wm.getRuleBase()).registerSegmentPrototype(segmentRoot, smem); | |
return smem; | |
} | |
private static LeftTupleSource findSegmentRoot(LeftTupleSource tupleSource) { | |
while (tupleSource.getType() != NodeTypeEnums.LeftInputAdapterNode && | |
SegmentUtilities.parentInSameSegment(tupleSource, null)) { | |
tupleSource = tupleSource.getLeftTupleSource(); | |
} | |
return tupleSource; | |
} | |
private static SegmentMemory restoreSegmentFromPrototype(InternalWorkingMemory wm, LeftTupleSource segmentRoot) { | |
SegmentMemory smem = ((ReteooRuleBase)wm.getRuleBase()).getSegmentFromPrototype(wm, segmentRoot); | |
if ( smem != null ) { | |
// there is a prototype for this segment memory | |
for (NetworkNode node : smem.getNodesInSegment()) { | |
wm.getNodeMemory((MemoryFactory) node).setSegmentMemory(smem); | |
} | |
updateRiaAndTerminalMemory(segmentRoot, segmentRoot, smem, wm, true); | |
} | |
return smem; | |
} | |
private static boolean processQueryNode(QueryElementNode tupleSource, InternalWorkingMemory wm, LeftTupleSource segmentRoot, SegmentMemory smem, long nodePosMask) { | |
// Initialize the QueryElementNode and have it's memory reference the actual query SegmentMemory | |
QueryElementNode queryNode = (QueryElementNode) tupleSource; | |
SegmentMemory querySmem = getQuerySegmentMemory(wm, segmentRoot, queryNode); | |
QueryElementNodeMemory queryNodeMem = (QueryElementNodeMemory) smem.createNodeMemory(queryNode, wm); | |
queryNodeMem.setNodePosMaskBit(nodePosMask); | |
queryNodeMem.setQuerySegmentMemory(querySmem); | |
queryNodeMem.setSegmentMemory(smem); | |
return ! queryNode.getQueryElement().isAbductive(); | |
} | |
public static SegmentMemory getQuerySegmentMemory(InternalWorkingMemory wm, LeftTupleSource segmentRoot, QueryElementNode queryNode) { | |
LeftInputAdapterNode liaNode = getQueryLiaNode(queryNode.getQueryElement().getQueryName(), getQueryOtn(segmentRoot)); | |
LiaNodeMemory liam = (LiaNodeMemory) wm.getNodeMemory((MemoryFactory) liaNode); | |
SegmentMemory querySmem = liam.getSegmentMemory(); | |
if (querySmem == null) { | |
querySmem = createSegmentMemory(liaNode, wm); | |
} | |
return querySmem; | |
} | |
private static void processFromNode(FromNode tupleSource, InternalWorkingMemory wm, SegmentMemory smem) { | |
FromMemory fromMemory = (FromMemory) smem.createNodeMemory((FromNode) tupleSource, wm); | |
fromMemory.getBetaMemory().setSegmentMemory(smem); | |
} | |
private static boolean processBranchNode(ConditionalBranchNode tupleSource, InternalWorkingMemory wm, SegmentMemory smem) { | |
boolean updateNodeBit; | |
ConditionalBranchMemory branchMem = (ConditionalBranchMemory) smem.createNodeMemory((ConditionalBranchNode) tupleSource, wm); | |
branchMem.setSegmentMemory(smem); | |
updateNodeBit = false; // nodes after a branch CE can notify, but they cannot impact linking | |
return updateNodeBit; | |
} | |
private static void processEvalNode(EvalConditionNode tupleSource, InternalWorkingMemory wm, SegmentMemory smem) { | |
EvalMemory evalMem = (EvalMemory) smem.createNodeMemory((EvalConditionNode) tupleSource, wm); | |
evalMem.setSegmentMemory(smem); | |
} | |
private static void processTimerNode(TimerNode tupleSource, InternalWorkingMemory wm, SegmentMemory smem, long nodePosMask) { | |
TimerNodeMemory tnMem = (TimerNodeMemory) smem.createNodeMemory( ( TimerNode ) tupleSource, wm ); | |
tnMem.setNodePosMaskBit(nodePosMask); | |
tnMem.setSegmentMemory(smem); | |
} | |
private static long processLiaNode(LeftInputAdapterNode tupleSource, InternalWorkingMemory wm, SegmentMemory smem, long nodePosMask, long allLinkedTestMask) { | |
LiaNodeMemory liaMemory = (LiaNodeMemory) smem.createNodeMemory((LeftInputAdapterNode) tupleSource, wm); | |
smem.setStagedTuples( new SynchronizedLeftTupleSets() ); // LiaNode SegmentMemory must have Synchronized LeftTupleSets | |
liaMemory.setSegmentMemory(smem); | |
liaMemory.setNodePosMaskBit(nodePosMask); | |
allLinkedTestMask = allLinkedTestMask | nodePosMask; | |
return allLinkedTestMask; | |
} | |
private static long processBetaNode(LeftTupleSource tupleSource, InternalWorkingMemory wm, SegmentMemory smem, long nodePosMask, long allLinkedTestMask, boolean updateNodeBit) { | |
BetaMemory bm; | |
BetaNode betaNode = (BetaNode) tupleSource; | |
if (NodeTypeEnums.AccumulateNode == tupleSource.getType()) { | |
bm = ((AccumulateMemory) smem.createNodeMemory((AccumulateNode) tupleSource, wm)).getBetaMemory(); | |
} else { | |
bm = (BetaMemory) smem.createNodeMemory(betaNode, wm); | |
} | |
// this must be set first, to avoid recursion as sub networks can be initialised multiple ways | |
// and bm.getSegmentMemory == null check can be used to avoid recursion. | |
bm.setSegmentMemory(smem); | |
if (betaNode.isRightInputIsRiaNode()) { | |
// Iterate to find outermost rianode | |
RightInputAdapterNode riaNode = (RightInputAdapterNode) betaNode.getRightInput(); | |
//riaNode = getOuterMostRiaNode(riaNode, betaNode.getLeftTupleSource()); | |
// Iterat | |
LeftTupleSource subnetworkLts = riaNode.getLeftTupleSource(); | |
while (subnetworkLts.getLeftTupleSource() != riaNode.getStartTupleSource()) { | |
subnetworkLts = subnetworkLts.getLeftTupleSource(); | |
} | |
Memory rootSubNetwokrMem = wm.getNodeMemory((MemoryFactory) subnetworkLts); | |
SegmentMemory subNetworkSegmentMemory = rootSubNetwokrMem.getSegmentMemory(); | |
if (subNetworkSegmentMemory == null) { | |
// we need to stop recursion here | |
createSegmentMemory((LeftTupleSource) subnetworkLts, wm); | |
} | |
RiaNodeMemory riaMem = (RiaNodeMemory) wm.getNodeMemory((MemoryFactory) riaNode); | |
bm.setRiaRuleMemory(riaMem.getRiaPathMemory()); | |
if (updateNodeBit && riaMem.getRiaPathMemory().getAllLinkedMaskTest() > 0) { | |
// only ria's with reactive subnetworks can be disabled and thus need checking | |
allLinkedTestMask = allLinkedTestMask | nodePosMask; | |
} | |
} else if (updateNodeBit && | |
(!(NodeTypeEnums.NotNode == tupleSource.getType() && !((NotNode) tupleSource).isEmptyBetaConstraints()) && | |
NodeTypeEnums.AccumulateNode != tupleSource.getType())) { | |
// non empty not nodes and accumulates can never be disabled and thus don't need checking | |
allLinkedTestMask = allLinkedTestMask | nodePosMask; | |
} | |
bm.setNodePosMaskBit(nodePosMask); | |
if (NodeTypeEnums.NotNode == tupleSource.getType()) { | |
// not nodes start up linked in | |
smem.linkNodeWithoutRuleNotify(bm.getNodePosMaskBit()); | |
} | |
return allLinkedTestMask; | |
} | |
public static synchronized void createChildSegments(final InternalWorkingMemory wm, | |
SegmentMemory smem, | |
LeftTupleSinkPropagator sinkProp) { | |
if ( !smem.isEmpty() ) { | |
return; // this can happen when multiple threads are trying to initialize the segment | |
} | |
for (LeftTupleSinkNode sink = (LeftTupleSinkNode) sinkProp.getFirstLeftTupleSink(); sink != null; sink = sink.getNextLeftTupleSinkNode()) { | |
Memory memory = wm.getNodeMemory((MemoryFactory) sink); | |
SegmentMemory childSmem = createChildSegment(wm, sink, memory); | |
smem.add(childSmem); | |
} | |
} | |
public static SegmentMemory createChildSegment(InternalWorkingMemory wm, LeftTupleSink sink, Memory memory) { | |
if (!(NodeTypeEnums.isTerminalNode(sink) || sink.getType() == NodeTypeEnums.RightInputAdaterNode)) { | |
if (memory.getSegmentMemory() == null) { | |
SegmentUtilities.createSegmentMemory((LeftTupleSource) sink, wm); | |
} | |
} else { | |
// RTNS and RiaNode's have their own segment, if they are the child of a split. | |
if (memory.getSegmentMemory() == null) { | |
SegmentMemory childSmem = new SegmentMemory(sink); // rtns or riatns don't need a queue | |
if ( sink.getLeftTupleSource().getType() == NodeTypeEnums.LeftInputAdapterNode ) { | |
// If LiaNode is in it's own segment, then the segment first after that must use SynchronizedLeftTupleSets | |
childSmem.setStagedTuples( new SynchronizedLeftTupleSets() ); | |
} | |
PathMemory pmem; | |
if (NodeTypeEnums.isTerminalNode(sink)) { | |
pmem = (PathMemory) memory; | |
} else { | |
pmem = ((RiaNodeMemory) memory).getRiaPathMemory(); | |
} | |
pmem.getSegmentMemories()[pmem.getSegmentMemories().length - 1] = childSmem; | |
pmem.setSegmentMemory(childSmem); | |
childSmem.getPathMemories().add(pmem); | |
childSmem.setTipNode(sink); | |
childSmem.setSinkFactory(sink); | |
} | |
} | |
return memory.getSegmentMemory(); | |
} | |
/** | |
* Is the LeftTupleSource a node in the sub network for the RightInputAdapterNode | |
* To be in the same network, it must be a node is after the two output of the parent | |
* and before the rianode. | |
* | |
* @param riaNode | |
* @param leftTupleSource | |
* @return | |
*/ | |
public static boolean inSubNetwork(RightInputAdapterNode riaNode, LeftTupleSource leftTupleSource) { | |
LeftTupleSource startTupleSource = riaNode.getStartTupleSource(); | |
LeftTupleSource parent = riaNode.getLeftTupleSource(); | |
while (parent != startTupleSource) { | |
if (parent == leftTupleSource) { | |
return true; | |
} | |
parent = parent.getLeftTupleSource(); | |
} | |
return false; | |
} | |
/** | |
* A Not node has to be eagerly initialized unless in its segment there is at least a join or exists node | |
*/ | |
private static boolean requireEagerInitialization(NotNode notNode) { | |
LeftTupleSource node = findSegmentRoot(notNode); | |
while (true) { | |
if (NodeTypeEnums.JoinNode == node.getType() || NodeTypeEnums.ExistsNode == node.getType()) { | |
return false; | |
} | |
if (node.getSinkPropagator().size() == 1) { | |
LeftTupleSinkNode sink = (LeftTupleSinkNode) node.getSinkPropagator().getFirstLeftTupleSink(); | |
if (NodeTypeEnums.isLeftTupleSource(sink)) { | |
node = (LeftTupleSource) sink; | |
continue; | |
} | |
} | |
return true; | |
} | |
} | |
/** | |
* This adds the segment memory to the terminal node or ria node's list of memories. | |
* In the case of the terminal node this allows it to know that all segments from | |
* the tip to root are linked. | |
* In the case of the ria node its all the segments up to the start of the subnetwork. | |
* This is because the rianode only cares if all of it's segments are linked, then | |
* it sets the bit of node it is the right input for. | |
* | |
* @param lt | |
* @param originalLt | |
* @param smem | |
* @param wm | |
*/ | |
private static void updateRiaAndTerminalMemory( LeftTupleSource lt, | |
LeftTupleSource originalLt, | |
SegmentMemory smem, | |
InternalWorkingMemory wm, | |
boolean fromPrototype ) { | |
for (LeftTupleSink sink : lt.getSinkPropagator().getSinks()) { | |
if (NodeTypeEnums.isLeftTupleSource(sink)) { | |
if (NodeTypeEnums.NotNode == sink.getType() && requireEagerInitialization((NotNode) sink)) { | |
BetaMemory bm = (BetaMemory) wm.getNodeMemory((MemoryFactory) sink); | |
if (bm.getSegmentMemory() == null) { | |
// Not nodes must be initialised | |
createSegmentMemory((NotNode) sink, wm); | |
} | |
} | |
updateRiaAndTerminalMemory((LeftTupleSource) sink, originalLt, smem, wm, fromPrototype); | |
} else if (sink.getType() == NodeTypeEnums.RightInputAdaterNode) { | |
// Only add the RIANode, if the LeftTupleSource is part of the RIANode subnetwork. | |
if (inSubNetwork((RightInputAdapterNode) sink, originalLt)) { | |
RiaNodeMemory riaMem = (RiaNodeMemory) wm.getNodeMemory((MemoryFactory) sink); | |
PathMemory pmem = (PathMemory) riaMem.getRiaPathMemory(); | |
smem.getPathMemories().add(pmem); | |
pmem.getSegmentMemories()[smem.getPos()] = smem; | |
if (fromPrototype) { | |
ObjectSink[] nodes = ((RightInputAdapterNode) sink).getSinkPropagator().getSinks(); | |
for ( ObjectSink node : nodes ) { | |
// check if the SegmentMemory has been already created by the BetaNode and if so avoid to build it twice | |
if ( NodeTypeEnums.isLeftTupleSource(node) && wm.getNodeMemory((MemoryFactory) node).getSegmentMemory() == null ) { | |
restoreSegmentFromPrototype(wm, (LeftTupleSource) node); | |
} | |
} | |
} else if ( ( pmem.getAllLinkedMaskTest() & ( 1L << pmem.getSegmentMemories().length ) ) == 0 ) { | |
// must eagerly initialize child segment memories | |
ObjectSink[] nodes = ((RightInputAdapterNode) sink).getSinkPropagator().getSinks(); | |
for ( ObjectSink node : nodes ) { | |
if ( NodeTypeEnums.isLeftTupleSource(node) ) { | |
createSegmentMemory( (LeftTupleSource) node, wm ); | |
} | |
} | |
} | |
} | |
} else if (NodeTypeEnums.isTerminalNode(sink)) { | |
PathMemory pmem = (PathMemory) wm.getNodeMemory((MemoryFactory) sink); | |
smem.getPathMemories().add(pmem); | |
pmem.getSegmentMemories()[smem.getPos()] = smem; | |
smem.setTupleQueue( pmem.getTupleQueue() ); | |
if (smem.isSegmentLinked()) { | |
// not's can cause segments to be linked, and the rules need to be notified for evaluation | |
smem.notifyRuleLinkSegment(wm); | |
} | |
} | |
} | |
} | |
public static Queue<TupleEntry> initAndGetTupleQueue(NetworkNode node, InternalWorkingMemory wm) { | |
// get's or initializes the queue, if it does not exist. It recurse to the outer most PathMemory | |
// and then trickle the Queue back up to the inner PathMememories | |
LeftTupleSink sink = null; | |
if ( !(NodeTypeEnums.isTerminalNode(node) || NodeTypeEnums.RightInputAdaterNode == node.getType()) ) { | |
// iterate to the terminal of this segment - either rian or rtn | |
sink = ((LeftTupleSource)node).getSinkPropagator().getLastLeftTupleSink(); | |
while ( sink.getType() != NodeTypeEnums.RightInputAdaterNode && !NodeTypeEnums.isTerminalNode( sink) ) { | |
sink = ((LeftTupleSource)sink).getSinkPropagator().getLastLeftTupleSink(); | |
} | |
} else { | |
sink = (LeftTupleSink)node; | |
} | |
Queue<TupleEntry> queue = null; | |
if (NodeTypeEnums.RightInputAdaterNode == sink.getType()) { | |
RightInputAdapterNode rian = (RightInputAdapterNode) sink; | |
RiaNodeMemory riaMem = (RiaNodeMemory) wm.getNodeMemory((MemoryFactory)sink); | |
RiaPathMemory pmem = riaMem.getRiaPathMemory(); | |
queue = pmem.getTupleQueue(); | |
if ( queue == null ) { | |
ObjectSink[] nodes = rian.getSinkPropagator().getSinks(); | |
// iterate the first child sink, we only need the first, as all reach the same outer rtn | |
queue = initAndGetTupleQueue((LeftTupleSource) nodes[0], wm); | |
} | |
} else if (NodeTypeEnums.isTerminalNode(sink)) { | |
PathMemory pmem = (PathMemory) wm.getNodeMemory((MemoryFactory) sink); | |
queue = pmem.getTupleQueue(); | |
if ( queue == null ) { | |
pmem.initQueue(); | |
queue = pmem.getTupleQueue(); | |
} | |
} | |
return queue; | |
} | |
public static boolean parentInSameSegment(LeftTupleSource lt, Rule removingRule) { | |
LeftTupleSource parentLt = lt.getLeftTupleSource(); | |
int size = parentLt.getSinkPropagator().size(); | |
if (removingRule != null && size == 2 && parentLt.getAssociations().containsKey(removingRule)) { | |
// looks like a split, but one of the branches may be removed. | |
LeftTupleSink first = parentLt.getSinkPropagator().getFirstLeftTupleSink(); | |
LeftTupleSink last = parentLt.getSinkPropagator().getLastLeftTupleSink(); | |
if (first.getAssociations().size() == 1 && first.getAssociations().containsKey(removingRule)) { | |
return true; | |
} else if (last.getAssociations().size() == 1 && last.getAssociations().containsKey(removingRule)) { | |
return true; | |
} else { | |
return false; | |
} | |
} else { | |
return size == 1; | |
} | |
// comments out for now, as the optimization to preserve subnetwork segments down one side is troublesome. | |
// LeftTupleSource parent = lt.getLeftTupleSource(); | |
// if ( parent != null && ( parent.getSinkPropagator().size() == 1 || | |
// // same segment, if it's a subnetwork split and we are on the non subnetwork side of the split | |
// ( parent.getSinkPropagator().size() == 2 && | |
// NodeTypeEnums.isBetaNode( lt ) && | |
// ((BetaNode)lt).isRightInputIsRiaNode() ) ) ) { | |
// return true; | |
// } else { | |
// return false; | |
// } | |
} | |
public static ObjectTypeNode getQueryOtn(LeftTupleSource lts) { | |
while (!(lts instanceof LeftInputAdapterNode)) { | |
lts = lts.getLeftTupleSource(); | |
} | |
LeftInputAdapterNode liaNode = (LeftInputAdapterNode) lts; | |
ObjectSource os = liaNode.getObjectSource(); | |
while (!(os instanceof EntryPointNode)) { | |
os = os.getParentObjectSource(); | |
} | |
return ((EntryPointNode) os).getQueryNode(); | |
} | |
public static LeftInputAdapterNode getQueryLiaNode(String queryName, ObjectTypeNode queryOtn) { | |
if (queryOtn.getSinkPropagator() instanceof CompositeObjectSinkAdapter) { | |
CompositeObjectSinkAdapter sink = (CompositeObjectSinkAdapter) queryOtn.getSinkPropagator(); | |
if (sink.getHashableSinks() != null) { | |
for (AlphaNode alphaNode = (AlphaNode) sink.getHashableSinks().getFirst(); alphaNode != null; alphaNode = (AlphaNode) alphaNode.getNextObjectSinkNode()) { | |
QueryNameConstraint nameConstraint = (QueryNameConstraint) alphaNode.getConstraint(); | |
if (queryName.equals(nameConstraint.getQueryName())) { | |
return (LeftInputAdapterNode) alphaNode.getSinkPropagator().getSinks()[0]; | |
} | |
} | |
} | |
Iterator it = sink.getHashedSinkMap().iterator(); | |
for (ObjectEntry entry = (ObjectEntry) it.next(); entry != null; entry = (ObjectEntry) it.next()) { | |
AlphaNode alphaNode = (AlphaNode) entry.getValue(); | |
QueryNameConstraint nameConstraint = (QueryNameConstraint) alphaNode.getConstraint(); | |
if (queryName.equals(nameConstraint.getQueryName())) { | |
return (LeftInputAdapterNode) alphaNode.getSinkPropagator().getSinks()[0]; | |
} | |
} | |
} else { | |
AlphaNode alphaNode = (AlphaNode) queryOtn.getSinkPropagator().getSinks()[0]; | |
QueryNameConstraint nameConstraint = (QueryNameConstraint) alphaNode.getConstraint(); | |
if (queryName.equals(nameConstraint.getQueryName())) { | |
return (LeftInputAdapterNode) alphaNode.getSinkPropagator().getSinks()[0]; | |
} | |
return (LeftInputAdapterNode) queryOtn.getSinkPropagator().getSinks()[0]; | |
} | |
throw new RuntimeException("Unable to find query '" + queryName + "'"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment