@y13i
Last active December 26, 2018 06:51
Reactively processing a cursor-based pagination API with RxJS. ref: https://qiita.com/y13i/items/1fdd33a49bce80712fe8
await listObjectsAsObservable(s3, {
  Bucket: bucket,
  Prefix: prefix,
  MaxKeys: 50
})
  .pipe(
    tap(listObjectsResult => {
      console.log(
        `!!! list contains ${listObjectsResult.Contents!.length} items.`
      );
    })
  )
  .toPromise();
[AWS s3 200 0.096s 0 retries] listObjectsV2({ Bucket: 's3-observable-example',
Prefix: 'test1/',
MaxKeys: 50,
ContinuationToken: undefined })
!!! list contains 50 items.
[AWS s3 200 0.076s 0 retries] listObjectsV2({ Bucket: 's3-observable-example',
Prefix: 'test1/',
MaxKeys: 50,
ContinuationToken:
'1KI8JxWWj6YE3WmkKojZ2JeionQ3wtMNsZDGPXICX4e0wk0jhsiYZFh1EjUao850UqILlYHBZUVn45tLUh0bkcA==' })
!!! list contains 50 items.
[AWS s3 200 0.073s 0 retries] listObjectsV2({ Bucket: 's3-observable-example',
Prefix: 'test1/',
MaxKeys: 50,
ContinuationToken:
'1MQ1MDhm6mD4vblrhuvafELPq5mKZQA1V6gHy0wxjfjWB8nSP+DkTdEG5XHLZUjWCkqMBEDnDXyggNokdHMKRig==' })
!!! list contains 30 items.
const divisibleBy13Count = await listObjectsAsObservable(s3, {
  Bucket: bucket,
  Prefix: prefix,
  MaxKeys: 50
})
  .pipe(
    // Flatten each page into a stream of individual objects.
    mergeMap(listObjectsResult => {
      return from(listObjectsResult.Contents!);
    }),
    // Keep only keys whose trailing timestamp is divisible by 13.
    filter(object => {
      const match = object.Key!.match(/(\d+)\.json$/);
      if (!match) return false;
      const number = parseInt(match[1]!, 10);
      return number % 13 === 0;
    }),
    tap(object => {
      console.log(
        `The timestamp in object key ${object.Key} is divisible by 13`
      );
    }),
    count()
  )
  .toPromise();
console.log(
  `There are ${divisibleBy13Count} objects with 13-divisible-timestamps.`
);
The timestamp in object key test1/113-1545384463792.json is divisible by 13
The timestamp in object key test1/15-1545384453717.json is divisible by 13
The timestamp in object key test1/90-1545384461985.json is divisible by 13
There are 3 objects with 13-divisible-timestamps.
await listObjectsAsObservable(s3, {
  Bucket: bucket,
  Prefix: prefix,
  MaxKeys: 50
})
  .pipe(
    // Flatten each page into a stream of individual objects.
    mergeMap(listObjectsResult => {
      return from(listObjectsResult.Contents!);
    }),
    // Keep only keys whose trailing timestamp is divisible by 7.
    filter(object => {
      const match = object.Key!.match(/(\d+)\.json$/);
      if (!match) return false;
      const number = parseInt(match[1]!, 10);
      return number % 7 === 0;
    }),
    // Stop after 7 matches; unsubscribing also stops further page requests.
    take(7),
    mergeMap(object => {
      return s3.getObject({ Bucket: bucket, Key: object.Key! }).promise();
    }),
    map(getObjectResult => {
      const parsed: { value: number } = JSON.parse(
        getObjectResult.Body!.toString()
      );
      return parsed.value;
    }),
    // Emit the running total after each object is read.
    scan((acc, x) => {
      return acc + x;
    }, 0),
    tap(currentSum =>
      console.log(
        `The current sum of values in the first 7 objects with 7-divisible-timestamps is ${currentSum}`
      )
    )
  )
  .toPromise();
The current sum of values in the first 7 objects with 7-divisible-timestamps is 51
The current sum of values in the first 7 objects with 7-divisible-timestamps is 94
The current sum of values in the first 7 objects with 7-divisible-timestamps is 179
The current sum of values in the first 7 objects with 7-divisible-timestamps is 257
The current sum of values in the first 7 objects with 7-divisible-timestamps is 285
The current sum of values in the first 7 objects with 7-divisible-timestamps is 315
The current sum of values in the first 7 objects with 7-divisible-timestamps is 379
import { Observable } from "rxjs";
import { S3 } from "aws-sdk";

type Request = S3.ListObjectsV2Request;

export function listObjectsAsObservable(
  s3: S3,
  request: Request
): Observable<S3.ListObjectsV2Output> {
  return new Observable<S3.ListObjectsV2Output>(observer => {
    // Set by the teardown function so the loop stops after unsubscribe.
    let finished = false;
    let continuationToken: string | undefined = undefined;
    (async () => {
      try {
        do {
          const req: Request = {
            ...request,
            ContinuationToken: continuationToken
          };
          const result = await s3.listObjectsV2(req).promise();
          // The next request resumes from the token returned by this page.
          continuationToken = result.NextContinuationToken;
          observer.next(result);
        } while (continuationToken && !finished);
        // Complete only on success; error() already terminates the stream.
        observer.complete();
      } catch (error) {
        observer.error(error);
      }
    })();
    return () => {
      finished = true;
    };
  });
}
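The cursor loop inside listObjectsAsObservable can be tried out without AWS or RxJS. What follows is a minimal sketch, assuming a hypothetical in-memory page source: `Page`, `fakeListPage`, `pageSizes`, and the numeric-offset token are illustrative names and are not part of the gist or the AWS SDK. It only mirrors the pattern of feeding each response's token into the next request until no token comes back.

```typescript
// Hypothetical page shape, loosely modeled on a cursor-paginated list response.
interface Page {
  keys: string[];
  nextToken: string | undefined;
}

// Hypothetical stand-in for a paginated API: the token is simply the next offset.
function fakeListPage(allKeys: string[], maxKeys: number, token?: string): Page {
  const start = token === undefined ? 0 : Number(token);
  const end = Math.min(start + maxKeys, allKeys.length);
  return {
    keys: allKeys.slice(start, end),
    nextToken: end < allKeys.length ? String(end) : undefined
  };
}

// Walks every page, passing each returned token into the next call,
// and records how many keys each page contained.
function pageSizes(total: number, maxKeys: number): number[] {
  const allKeys: string[] = [];
  for (let i = 0; i < total; i++) allKeys.push(`test1/${i}.json`);

  const sizes: number[] = [];
  let token: string | undefined = undefined;
  do {
    const page: Page = fakeListPage(allKeys, maxKeys, token);
    sizes.push(page.keys.length);
    token = page.nextToken;
  } while (token !== undefined);
  return sizes;
}

console.log(pageSizes(130, 50)); // prints [ 50, 50, 30 ]
```

With 130 keys and a page size of 50, the sketch produces pages of 50, 50, and 30 items, the same shape as the listObjectsV2 log output earlier in the gist.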