Skip to content

Instantly share code, notes, and snippets.

@clebertsuconic
Created August 21, 2012 22:25
Show Gist options
  • Save clebertsuconic/3420005 to your computer and use it in GitHub Desktop.
Save clebertsuconic/3420005 to your computer and use it in GitHub Desktop.
on QueueImpl:
// Step1: open a new transaction
Transaction tx = new TransactionImpl(storageManager);
LinkedListIterator<MessageReference> iter = iterator();
try
{
// Step1.2: iterate on every message that is in memory. (i.e. non-paged)
while (iter.hasNext())
{
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage()))
{
deliveringCount.incrementAndGet();
// Step3: ACK every message in memory
acknowledge(tx, ref);
iter.remove();
refRemoved(ref);
count++;
}
}
// Step2: Iterate over scheduled references and ack them as well
// notice that we are still using the same transaction here
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
for (MessageReference messageReference : cancelled)
{
deliveringCount.incrementAndGet();
acknowledge(tx, messageReference);
count++;
}
// Step3 still as part of the same transaction, we will ACK every message that is on page mode.
// That means loading a single object for every message that is on the file
// And this is an useless operation since this will be later deleted anyways.
if (pageIterator != null)
{
while (pageIterator.hasNext())
{
PagedReference reference = pageIterator.next();
pageIterator.remove();
if (filter == null || filter.match(reference.getMessage()))
{
count++;
pageSubscription.ack(reference);
}
else
{
addTail(reference, false);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment