Last active
December 26, 2018 06:51
-
-
Save y13i/1bd1e01e549a9dbfccc9d2ac97af4c8d to your computer and use it in GitHub Desktop.
Cursor-based Pagination な API を RxJS でリアクティブに処理してみる ref: https://qiita.com/y13i/items/1fdd33a49bce80712fe8
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Subscribe to the paginated listing and log the size of each emitted result.
// The awaited promise settles once the observable completes.
await listObjectsAsObservable(s3, { Bucket: bucket, Prefix: prefix, MaxKeys: 50 })
  .pipe(
    tap(page => console.log(`!!! list contains ${page.Contents!.length} items.`))
  )
  .toPromise();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[AWS s3 200 0.096s 0 retries] listObjectsV2({ Bucket: 's3-observable-example', | |
Prefix: 'test1/', | |
MaxKeys: 50, | |
ContinuationToken: undefined }) | |
!!! list contains 50 items. | |
[AWS s3 200 0.076s 0 retries] listObjectsV2({ Bucket: 's3-observable-example', | |
Prefix: 'test1/', | |
MaxKeys: 50, | |
ContinuationToken: | |
'1KI8JxWWj6YE3WmkKojZ2JeionQ3wtMNsZDGPXICX4e0wk0jhsiYZFh1EjUao850UqILlYHBZUVn45tLUh0bkcA==' }) | |
!!! list contains 50 items. | |
[AWS s3 200 0.073s 0 retries] listObjectsV2({ Bucket: 's3-observable-example', | |
Prefix: 'test1/', | |
MaxKeys: 50, | |
ContinuationToken: | |
'1MQ1MDhm6mD4vblrhuvafELPq5mKZQA1V6gHy0wxjfjWB8nSP+DkTdEG5XHLZUjWCkqMBEDnDXyggNokdHMKRig==' }) | |
!!! list contains 30 items. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Count the listed objects whose key ends in "<digits>.json" where <digits>,
// read as an integer, is a multiple of 13. Each hit is logged as it is found.
const divisibleBy13Count = await listObjectsAsObservable(s3, {
  Bucket: bucket,
  Prefix: prefix,
  MaxKeys: 50
})
  .pipe(
    // Flatten each listing result into a stream of individual objects.
    mergeMap(page => from(page.Contents!)),
    filter(entry => {
      const digits = /(\d+)\.json$/.exec(entry.Key!);
      return digits !== null && parseInt(digits[1]!) % 13 === 0;
    }),
    tap(entry => {
      console.log(`The timestamp in object key ${entry.Key} is divisible by 13`);
    }),
    count()
  )
  .toPromise();

console.log(
  `There are ${divisibleBy13Count} objects with 13-divisible-timestamps.`
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The timestamp in object key test1/113-1545384463792.json is divisible by 13 | |
The timestamp in object key test1/15-1545384453717.json is divisible by 13 | |
The timestamp in object key test1/90-1545384461985.json is divisible by 13 | |
There are 3 objects with 13-divisible-timestamps. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Download the first seven listed objects whose key ends in "<digits>.json"
// with <digits> divisible by 7, and log the running sum of their `value` fields.
await listObjectsAsObservable(s3, {
  Bucket: bucket,
  Prefix: prefix,
  MaxKeys: 50
})
  .pipe(
    // Flatten each listing result into a stream of individual objects.
    mergeMap(listObjectsResult => from(listObjectsResult.Contents!)),
    filter(object => {
      const match = object.Key!.match(/(\d+)\.json$/);
      if (!match) return false;
      // Explicit radix: the captured group is always a decimal digit string.
      return parseInt(match[1]!, 10) % 7 === 0;
    }),
    // Complete after seven matches; this unsubscribes from the listing source,
    // so its teardown runs and no further pages are requested.
    take(7),
    // Downloads are started as matches arrive and may resolve in any order, so
    // intermediate sums can differ between runs; the final sum does not.
    mergeMap(object =>
      s3.getObject({ Bucket: bucket, Key: object.Key! }).promise()
    ),
    map(getObjectResult => {
      // NOTE(review): the body is assumed to be JSON shaped like
      // { value: number }; nothing here validates that.
      const parsed: { value: number } = JSON.parse(
        getObjectResult.Body!.toString()
      );
      return parsed.value;
    }),
    scan((acc, x) => acc + x, 0),
    tap(currentSum =>
      console.log(
        `The current sum of values in the first 7 objects with 7-divisible-timestamps is ${currentSum}`
      )
    )
  )
  .toPromise();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The current sum of values in the first 7 objects with 7-divisible-timestamps is 51 | |
The current sum of values in the first 7 objects with 7-divisible-timestamps is 94 | |
The current sum of values in the first 7 objects with 7-divisible-timestamps is 179 | |
The current sum of values in the first 7 objects with 7-divisible-timestamps is 257 | |
The current sum of values in the first 7 objects with 7-divisible-timestamps is 285 | |
The current sum of values in the first 7 objects with 7-divisible-timestamps is 315 | |
The current sum of values in the first 7 objects with 7-divisible-timestamps is 379 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable } from "rxjs"; | |
import { S3 } from "aws-sdk"; | |
type Request = S3.ListObjectsV2Request; | |
/**
 * Exposes the cursor-paginated `listObjectsV2` call as an Observable that
 * emits one `ListObjectsV2Output` per request.
 *
 * Requests are issued sequentially: each one carries the
 * `NextContinuationToken` of the previous result, and the stream completes
 * when a result has no such token. Unsubscribing stops the loop, so no
 * further request is started and a page that arrives afterwards is dropped.
 *
 * @param s3 Client used to issue the `listObjectsV2` requests.
 * @param request Base request. Its fields are sent with every call; a
 *   `ContinuationToken` on it is used as the starting cursor.
 * @returns A cold Observable of result pages. A rejected request is delivered
 *   as an error notification.
 */
export function listObjectsAsObservable(
  s3: S3,
  request: Request
): Observable<S3.ListObjectsV2Output> {
  return new Observable<S3.ListObjectsV2Output>(subscriber => {
    // Set by the teardown function below when the consumer unsubscribes.
    let cancelled = false;

    const fetchAllPages = async (): Promise<void> => {
      // Start from the caller's cursor, if any, instead of discarding it.
      let continuationToken: string | undefined = request.ContinuationToken;
      try {
        do {
          const req: Request = {
            ...request,
            ContinuationToken: continuationToken
          };
          const result = await s3.listObjectsV2(req).promise();
          // The consumer may have unsubscribed while the request was in flight.
          if (cancelled) return;
          continuationToken = result.NextContinuationToken;
          subscriber.next(result);
        } while (continuationToken && !cancelled);
      } catch (error) {
        subscriber.error(error);
        // An error is terminal; do not follow it with complete().
        return;
      }
      subscriber.complete();
    };

    // Fire and forget: every outcome is reported through the subscriber.
    void fetchAllPages();

    return () => {
      cancelled = true;
    };
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment