Skip to content

Instantly share code, notes, and snippets.

@Dierk
Created September 13, 2011 19:32
Show Gist options
  • Save Dierk/1214832 to your computer and use it in GitHub Desktop.
Save Dierk/1214832 to your computer and use it in GitHub Desktop.
Classic concurrent producer-consumer problem with using a kanban system to avoid buffer overflows when consumers are slow
import groovyx.gpars.dataflow.DataFlowQueue
import groovyx.gpars.dataflow.operator.DataFlowPoisson
import static groovyx.gpars.dataflow.DataFlow.operator
import java.util.concurrent.atomic.AtomicInteger
def upstream = new DataFlowQueue() // empty trays travel back upstream to the producer
def downstream = new DataFlowQueue() // trays with products travel to the consumer downstream
def prodWiring = [inputs: [upstream], outputs: [downstream], maxForks: 3 ] // maxForks is optional
def consWiring = [inputs: [downstream], outputs: [upstream], maxForks: 3 ] // maxForks is optional
class Tray { int card; def product }
int wip = prodWiring.maxForks + consWiring.maxForks // work in progress == max # of products in the system
wip.times { upstream << new Tray(card: it) } // put empty trays in the system along with its kanban card
def product = new AtomicInteger(0) // a dummy example product; could be anything
def soMany = 1000
operator prodWiring, { tray ->
def prod = product.andIncrement // producer is used concurrently: be careful with shared state
if (prod > soMany) { // we do not want to produce endlessly in this example
downstream << DataFlowPoisson.instance // let the consumer finish his work, then stop
return
}
def zero = tray.card ? '' : "\n" // new line for tray number zero
print "$zero[$tray.card:$prod] " // visualize production point
tray.product = prod // put product in tray
downstream << tray // send tray with product inside to consumer
}
def consumer = operator consWiring, { tray ->
print " $tray.card:$tray.product " // visualize product consumption and card association
tray.product == null // optionally remove product from tray
upstream << tray // send empty tray back upstream
}
consumer.join() // wait for the overall example to finish
@Dierk
Copy link
Author

Dierk commented Sep 13, 2011

groovy 1.8.2, gpars 0.11

@Dierk
Copy link
Author

Dierk commented Sep 15, 2011

updated the example to keep the card/product association stable without introducing synchronization. Cards are replaced with trays, which contain the product - inspired by having gone through too many airport security checks.

@Dierk
Copy link
Author

Dierk commented Sep 17, 2011

Description of the KanbanFlow pattern available at http://people.canoo.com/mittie/kanbanflow.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment