Skip to content

Instantly share code, notes, and snippets.

Created February 7, 2020 11:33
Show Gist options
  • Save odeke-em/89719e9e820a5aed1ba8cbaace0708a5 to your computer and use it in GitHub Desktop.
Save odeke-em/89719e9e820a5aed1ba8cbaace0708a5 to your computer and use it in GitHub Desktop.
A Transaction drop-in replacement to deal with the need for refreshing Cloud Spanner every 10 seconds
# Copyright 2020 Google LLC
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""
The code in this file provides a drop-in replacement for spanner_v1.Transaction,
but one that auto-refreshes every 8.5 seconds, to deal with Cloud Spanner's
server-side maximum idle time of 10 seconds for transactions, as described in
the Cloud Spanner documentation on idle transactions.

It handles concurrency concerns by using an event loop, a queue and callbacks. If the queue
is empty for 8.5 seconds, it'll ping Cloud Spanner by sending the recommended:
    execute_sql('SELECT 1')
and then reading the result back.
"""
import logging
import queue
import threading
import time
class PeriodicAutoRefresher:
    """Run operations one at a time on a background thread, pinging when idle.

    Operations submitted with run_op() are executed sequentially, in
    submission order, on a dedicated thread. Whenever no operation arrives
    for period_secs seconds, ping_fn (if one was provided) is invoked on that
    same thread, so a ping never runs concurrently with a submitted operation.

    Args:
        period_secs: Idle time, in seconds, after which ping_fn is invoked.
        ping_fn: Zero-argument callable invoked after every idle period. May
            be None, in which case idle periods are ignored.
    """

    _STOPPED_MESSAGE = 'PeriodicAutoRefresher has been stopped'

    def __init__(self, period_secs=10, ping_fn=None):
        self.__period_secs = period_secs
        self.__done = threading.Event()
        self.__Q = queue.Queue()
        self.__ping_fn = ping_fn
        # Creation time; recorded only, nothing in this class reads it back.
        self.__start_time = time.time()
        # Serializes run_op() against stop() so that no operation can be
        # enqueued once the loop has been told to shut down.
        self.__lock = threading.Lock()

        # Daemon thread: a refresher that was never stopped must not keep the
        # interpreter alive at exit.
        pth = threading.Thread(
            target=self.__event_loop,
            name='period-auto-refresh',
            daemon=True,
        )
        self.__pth = pth
        pth.start()

    def __event_loop(self):
        """Execute queued operations until stop() is called, pinging when idle."""
        while self.__still_running():
            # Only the queue read is guarded by queue.Empty handling, so an
            # Empty raised by an operation or a callback is never mistaken
            # for an idle timeout.
            try:
                item = self.__Q.get(block=True, timeout=self.__period_secs)
            except queue.Empty:
                if self.__still_running():
                    self.__run_ping()
                continue

            if item is None:
                # Wake-up sentinel enqueued by stop(); re-check the loop condition.
                continue

            callback, fn, args, kwargs = item
            res, exc = None, None
            try:
                res = fn(*args, **kwargs)
            except Exception as e:
                # Hand the failure to the submitter instead of killing the loop.
                exc = e
            callback(res, exc)

        self.__fail_pending()

    def __run_ping(self):
        """Invoke ping_fn, logging (rather than propagating) any failure."""
        if self.__ping_fn is None:
            return
        try:
            self.__ping_fn()
        except Exception:
            # A failed ping must not terminate the loop: callers blocked on
            # later operations would otherwise wait forever.
            logging.getLogger(__name__).exception('Periodic ping failed')

    def __fail_pending(self):
        """Report a RuntimeError to every operation still queued at shutdown."""
        while True:
            try:
                item = self.__Q.get_nowait()
            except queue.Empty:
                return
            if item is None:
                continue
            callback = item[0]
            callback(None, RuntimeError(self._STOPPED_MESSAGE))

    def __still_running(self):
        """Return True until stop() has been called."""
        return not self.__done.is_set()

    def stop(self):
        """Shut the loop down and wait for the background thread to finish.

        Operations still queued are not executed; their callbacks receive a
        RuntimeError. Calling stop() more than once is harmless.
        """
        with self.__lock:
            self.__done.set()
            # Wake the loop immediately instead of waiting out the idle timeout.
            self.__Q.put(None)
        # A thread cannot join itself (e.g. stop() invoked from a callback).
        if threading.current_thread() is not self.__pth:
            self.__pth.join()

    def run_op(self, callback, fn, *args, **kwargs):
        """Queue fn(*args, **kwargs) for execution on the background thread.

        Args:
            callback: Called on the background thread as callback(res, exc),
                where res is fn's return value (None on failure) and exc is
                the exception fn raised (None on success).
            fn: The callable to run.
            *args: Positional arguments for fn.
            **kwargs: Keyword arguments for fn.

        Raises:
            RuntimeError: If stop() has already been called.
        """
        with self.__lock:
            if self.__done.is_set():
                raise RuntimeError(self._STOPPED_MESSAGE)
            self.__Q.put((callback, fn, args, kwargs))
class PeriodicAutoRefreshingTransaction:
    """Drop-in replacement for spanner_v1.Transaction that pings when idle.

    After begin() has been invoked, every call is funnelled through a
    PeriodicAutoRefresher created with a period of 8.5 seconds: if the
    wrapped transaction goes unused for that long, a 'SELECT 1' is executed
    against it. The refresher is stopped once commit() or rollback() has been
    attempted. Before begin(), and after commit()/rollback(), calls are
    forwarded to the wrapped transaction directly on the caller's thread.

    Args:
        txn: The transaction to wrap. This class calls its begin, execute_sql,
            execute_update, commit and rollback methods and reads its
            committed and _rolled_back attributes.
    """

    def __init__(self, txn):
        self.__txn = txn
        # Set by begin(); None whenever no refresher is running.
        self.__par = None

    def begin(self):
        """Begin the wrapped transaction, then start the idle-ping refresher."""
        res = self.__txn.begin()
        self.__par = PeriodicAutoRefresher(period_secs=8.5, ping_fn=self.__ping)
        return res

    def __ping(self):
        """Execute 'SELECT 1' on the wrapped transaction and drain the result."""
        print('Pinging Cloud Spanner at %s' % time.time())
        res = self.__txn.execute_sql('SELECT 1')
        if res:
            # Read the result back; the rows themselves are discarded.
            for _ in res:
                pass

    def execute_sql(self, *args, **kwargs):
        """Forward to the wrapped transaction's execute_sql."""
        return self.__on_event_queue(self.__txn.execute_sql, *args, **kwargs)

    def execute_update(self, *args, **kwargs):
        """Forward to the wrapped transaction's execute_update."""
        return self.__on_event_queue(self.__txn.execute_update, *args, **kwargs)

    def commit(self):
        """Commit the wrapped transaction and stop the refresher."""
        try:
            return self.__on_event_queue(self.__txn.commit)
        finally:
            # Stop even when the commit raised, so the background thread
            # does not keep pinging after a commit attempt.
            self.__stop_refresher()

    def rollback(self):
        """Roll back the wrapped transaction and stop the refresher."""
        try:
            return self.__on_event_queue(self.__txn.rollback)
        finally:
            self.__stop_refresher()

    def was_committed_or_rolledback(self):
        """Return the wrapped transaction's `committed or _rolled_back` value."""
        # Transaction._rolled_back is a private attribute of the wrapped
        # transaction. The original author considered reading it acceptable
        # for now and noted that a follow-up issue had been filed.
        return self.__on_event_queue(lambda txn: txn.committed or txn._rolled_back, self.__txn)

    def __stop_refresher(self):
        """Detach and stop the refresher, if one is running."""
        par, self.__par = self.__par, None
        if par is not None:
            par.stop()

    def __on_event_queue(self, fn, *args, **kwargs):
        """Run fn(*args, **kwargs) via the refresher and wait for the outcome.

        Converts the refresher's asynchronous callback into a synchronous
        call: returns what fn returned, or re-raises what fn raised. When no
        refresher is running, fn is simply called on the current thread.
        """
        par = self.__par
        if par is None:
            return fn(*args, **kwargs)

        finished = threading.Event()
        outcome = {}

        def callback(res, exc):
            outcome['res'] = res
            outcome['exc'] = exc
            finished.set()

        par.run_op(callback, fn, *args, **kwargs)
        # Block until the background thread has reported the outcome; reading
        # `outcome` any earlier would race with the callback.
        finished.wait()

        exc = outcome['exc']
        if exc is not None:
            raise exc
        return outcome['res']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment