@scottmarlow
Created September 18, 2013 11:11
Batch 11.7 Partitioned Chunk Processing
1. <Create StepContext>
2. <Store step level properties in StepContext>
3. <->[StepListener.beforeStep...] // thread A
4. <->[PartitionReducer.beginPartitionedStep] // thread A
5. <->[PartitionMapper.mapPartitions] // thread A
   // per partition - on thread Px:
   a. [<begin transaction> ]
   b. <->ItemReader.open // thread Px
   c. <->ItemWriter.open // thread Px
   d. [<commit transaction> ]
   e. <repeat until no more items> {
      i. <begin checkpoint [<begin transaction> ]>
      ii. <repeat until commit criteria reached> {
         1. <->ItemReader.readItem // thread Px
         2. <->ItemProcessor.processItem // thread Px
         3. <add item to buffer>
      iii. }
      iv. <->ItemWriter.writeItems // thread Px
      v. <->[ItemReader.checkpointInfo] // thread Px
      vi. <->[ItemWriter.checkpointInfo] // thread Px
      vii. <Store (partition-local) StepContext persistent area>
      viii. <commit checkpoint (commit transaction)>
      ix. <->[PartitionCollector.collectPartitionData] // thread Px
   f. }
   g. [<begin transaction> ]
   h. <->ItemWriter.close // thread Px
   i. <->ItemReader.close // thread Px
   j. [<commit transaction> ]
   k. <->[PartitionCollector.collectPartitionData] // thread Px
6. [<begin transaction> ] // thread A
7. // Actions 9-12 run continuously until all partitions end.
8. // when collector payload arrives:
9. <->[PartitionAnalyzer.analyzeCollectorData] // thread A
10. // when partition ends:
11. <->[PartitionAnalyzer.analyzeStatus] // thread A
12. // Remaining actions run after all partitions end:
13. // if rollback condition occurs:
14. <->[PartitionReducer.rollbackPartitionedStep] // thread A
15. [<rollback transaction> ]
16. // else not rollback
17. <->[PartitionReducer.beforePartitionedStepCompletion] // thread A
18. [<commit transaction> ] // thread A
19. <->[PartitionReducer.afterPartitionedStepCompletion] // thread A
20. <->[StepListener.afterStep...] // thread A
21. <Store StepContext persistent area>
22. <Destroy StepContext>
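
The per-partition actions under step 5 (open, checkpoint loop, close, collect) can be sketched in plain Java as below. This is only an illustrative trace generator, not the batch API: the class `PartitionedChunkSketch`, the method `runPartition`, and the `itemCount` parameter are hypothetical names, the reader is replaced by an `Iterator`, the processor by `toUpperCase()`, and the commit criterion is assumed to be a simple item count. Transactions and checkpoints are recorded as strings rather than performed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for one partition thread (Px); no javax.batch types are used.
class PartitionedChunkSketch {

    /** Returns the ordered trace of actions performed for one partition. */
    static List<String> runPartition(List<String> items, int itemCount) {
        List<String> trace = new ArrayList<>();
        Iterator<String> reader = items.iterator(); // assumption: stands in for ItemReader

        // Open reader and writer inside one transaction.
        trace.add("begin tx");
        trace.add("ItemReader.open");
        trace.add("ItemWriter.open");
        trace.add("commit tx");

        // One pass of this loop is one checkpoint interval.
        while (reader.hasNext()) {
            trace.add("begin checkpoint");
            List<String> buffer = new ArrayList<>();
            // Read and process until the commit criterion (assumed: item count) is reached.
            while (buffer.size() < itemCount && reader.hasNext()) {
                String item = reader.next();     // ItemReader.readItem
                buffer.add(item.toUpperCase());  // ItemProcessor.processItem, then add to buffer
            }
            trace.add("ItemWriter.writeItems " + buffer);
            trace.add("checkpointInfo");         // reader and writer checkpoint data
            trace.add("commit checkpoint");
            trace.add("PartitionCollector.collectPartitionData");
        }

        // Close writer, then reader, inside one transaction, then collect once more.
        trace.add("begin tx");
        trace.add("ItemWriter.close");
        trace.add("ItemReader.close");
        trace.add("commit tx");
        trace.add("PartitionCollector.collectPartitionData");
        return trace;
    }

    public static void main(String[] args) {
        // Three items with a commit interval of two yields two checkpoint intervals.
        for (String line : runPartition(Arrays.asList("a", "b", "c"), 2)) {
            System.out.println(line);
        }
    }
}
```

Note that the collector runs after every checkpoint commit and once more after the close transaction, which is why the analyzer on thread A can receive payloads while partitions are still running.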