Created
August 21, 2012 22:25
-
-
Save clebertsuconic/3420005 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
on QueueImpl: | |
// Step1: open a new transaction | |
Transaction tx = new TransactionImpl(storageManager); | |
LinkedListIterator<MessageReference> iter = iterator(); | |
try | |
{ | |
// Step1.2: iterate on every message that is in memory. (i.e. non-paged) | |
while (iter.hasNext()) | |
{ | |
MessageReference ref = iter.next(); | |
if (filter == null || filter.match(ref.getMessage())) | |
{ | |
deliveringCount.incrementAndGet(); | |
// Step3: ACK every message in memory | |
acknowledge(tx, ref); | |
iter.remove(); | |
refRemoved(ref); | |
count++; | |
} | |
} | |
// Step2: Iterate over scheduled references and ack them as well | |
// notice that we are still using the same transaction here | |
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter); | |
for (MessageReference messageReference : cancelled) | |
{ | |
deliveringCount.incrementAndGet(); | |
acknowledge(tx, messageReference); | |
count++; | |
} | |
// Step3 still as part of the same transaction, we will ACK every message that is on page mode. | |
// That means loading a single object for every message that is on the file | |
// And this is an useless operation since this will be later deleted anyways. | |
if (pageIterator != null) | |
{ | |
while (pageIterator.hasNext()) | |
{ | |
PagedReference reference = pageIterator.next(); | |
pageIterator.remove(); | |
if (filter == null || filter.match(reference.getMessage())) | |
{ | |
count++; | |
pageSubscription.ack(reference); | |
} | |
else | |
{ | |
addTail(reference, false); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment