PCollection<byte[]> impulseOut = input.apply(Impulse.create());
PCollection<DataChangeRecord> results =
    impulseOut
        .apply("Generate change stream sources", MapElements.into(TypeDescriptor.of(ChangeStreamSourceDescriptor.class))
            .via(_ -> {
              return ChangeStreamSourceDescriptor.of(
                  getChangeStreamName(), getInclusiveStartAt(), getInclusiveEndAt());
            }))
        .apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn))
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"cloud.google.com/go/spanner"
@hengfengli
hengfengli / binary_search_implementation.py
Created May 20, 2019 04:20
binary search implementation
# From https://github.com/python/cpython/blob/master/Lib/bisect.py
def bisect_right(a, x, lo=0, hi=None):
    """Return the index where to insert item x in list a, assuming a is sorted.

    The return value i is such that all e in a[:i] have e <= x, and all e in
    a[i:] have e > x.  So if x already appears in the list, a.insert(i, x) will
    insert just after the rightmost x already there.

    Optional args lo (default 0) and hi (default len(a)) bound the
    slice of a to be searched.
    """
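
The preview above stops at the docstring, so the function body is not shown. As a minimal sketch of a binary search that satisfies the contract the docstring describes, something like the following would work. The name `bisect_right_sketch` is hypothetical, chosen to avoid implying this is the gist's or CPython's exact code.

```python
def bisect_right_sketch(a, x, lo=0, hi=None):
    """Hypothetical sketch: return i such that all e in a[:i] have e <= x
    and all e in a[i:] have e > x, assuming a is sorted."""
    if lo < 0:
        raise ValueError("lo must be non-negative")
    if hi is None:
        hi = len(a)
    # Invariant: a[:lo] holds only elements <= x, a[hi:] only elements > x.
    while lo < hi:
        mid = (lo + hi) // 2
        if x < a[mid]:
            hi = mid          # a[mid] > x, so the answer is at or before mid
        else:
            lo = mid + 1      # a[mid] <= x, so the answer is after mid
    return lo


if __name__ == "__main__":
    print(bisect_right_sketch([1, 2, 2, 3], 2))  # 3
```

Comparing with `x < a[mid]` (rather than `<=`) is what pushes the result to the right of any existing copies of `x`; flipping the comparison to `a[mid] < x` on the other branch would give the left-biased variant instead.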
@hengfengli
hengfengli / convert-2d-enumerable-to-array.rb
Created May 16, 2019 13:24
Convert 2d enumerable to an array in ruby
class TestA
  include Enumerable

  def each(&block)
    items.each(&block)
  end

  def items
    [1, 2, 3, 4]
  end