Skip to content

Instantly share code, notes, and snippets.

@odeke-em
Created February 4, 2020 23:01
Show Gist options
  • Save odeke-em/a17aa49854aeae1d83ffc14715f52d79 to your computer and use it in GitHub Desktop.
Save odeke-em/a17aa49854aeae1d83ffc14715f52d79 to your computer and use it in GitHub Desktop.
Auto-refreshing transaction prototype that works around Spanner's 10-second idle-transaction limit: https://cloud.google.com/spanner/docs/reference/rest/v1/TransactionOptions#idle-transactions
# Copyright 2020 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
import threading
import time
from google.cloud import spanner_v1 as spanner
class AutoRefreshingTransaction(spanner.transaction.Transaction):
    """Transaction that keeps itself fresh with periodic "SELECT 1=1" pings.

    After ``begin()``, a background thread runs ``periodic_ping``, which
    issues a trivial query whenever no statement has been recorded on this
    transaction for ``__MAX_TIMEOUT_SECS`` (8) seconds. ``commit()`` and
    ``rollback()`` stop that thread via ``end()``.
    """

    __MAX_TIMEOUT_SECS = 8

    def initialize_auto_refresh(self):
        """Create the shared ping state and start the pinging thread."""
        # This method exists because getting a Transaction from a sessionPool
        # returns an already instantiated Transaction, thus we can't have an
        # __init__ method.
        self.__shared_mem = {'running': True, 'last_active_time': 0}
        self.__lock = threading.RLock()

        # Spawn the pinging thread. It is a daemon so that a transaction that
        # is never committed or rolled back cannot keep the process alive.
        pth = threading.Thread(
            target=periodic_ping,
            name='txn-freshness-ping',
            args=(
                self.__MAX_TIMEOUT_SECS,
                self.__ping,
                self.__shared_mem,
                self.__lock,
            ),
            daemon=True,
        )
        pth.start()
        self.__pth = pth

    def begin(self, *args, **kwargs):
        """Begin the transaction, then start auto-refreshing it.

        Arguments are forwarded unchanged to the parent ``begin``.
        """
        res = super().begin(*args, **kwargs)
        self.initialize_auto_refresh()
        return res

    def execute_sql(self, *args, **kwargs):
        """Record activity, run the statement and return the parent's result."""
        self.__record_as_active()
        return super().execute_sql(*args, **kwargs)

    def execute_update(self, *args, **kwargs):
        """Record activity, run the DML and return the parent's result."""
        self.__record_as_active()
        return super().execute_update(*args, **kwargs)

    def commit(self, *args, **kwargs):
        """Commit the transaction and stop the pinging thread.

        The thread is stopped even when the commit raises, so a failed
        commit does not leave a pinger running.
        """
        self.__record_as_active()
        try:
            return super().commit(*args, **kwargs)
        finally:
            self.end()

    def rollback(self, *args, **kwargs):
        """Roll back the transaction and stop the pinging thread.

        The thread is stopped even when the rollback raises.
        """
        self.__record_as_active()
        try:
            return super().rollback(*args, **kwargs)
        finally:
            self.end()

    def __record_as_active(self):
        """Stamp the shared state with the current time."""
        with self.__lock:
            self.__shared_mem['last_active_time'] = time.time()

    def end(self):
        """Tell the pinging thread to stop and wait for it to exit.

        The join waits for the thread to notice the flag, i.e. until it next
        wakes from its sleep.
        """
        with self.__lock:
            self.__shared_mem['running'] = False
        self.__pth.join()
        print('ended')

    def __ping(self):
        """Run a trivial query, drain its rows and record the activity."""
        # Call the parent directly so the ping does not go through the
        # overridden execute_sql above.
        res = super().execute_sql('SELECT 1=1')
        print('pinging')
        # The result is iterated so that the rows are actually retrieved.
        for it in res:
            print(it)
        self.__record_as_active()
def periodic_ping(max_timeout_secs, ping, shared_mem, lock):
    """Call ``ping`` whenever the shared state has been idle for too long.

    Loops until ``shared_mem['running']`` is falsy. The first check happens
    after sleeping ``max_timeout_secs``; afterwards the loop sleeps only until
    the moment the idle time would reach ``max_timeout_secs``, so the idle
    period never stretches to almost twice the limit.

    Args:
        max_timeout_secs: Idle time, in seconds, after which ``ping`` is
            called.
        ping: Zero-argument callable invoked (outside the lock) when the
            idle limit is reached.
        shared_mem: Dict with the keys ``'running'`` (loop flag) and
            ``'last_active_time'`` (a ``time.time()`` timestamp).
        lock: Lock guarding ``shared_mem``; must support ``with``.
    """
    wait_secs = max_timeout_secs
    while True:
        with lock:
            running = shared_mem['running']
        if not running:
            break

        time.sleep(wait_secs)

        # Read both fields under the lock so they form a consistent snapshot.
        with lock:
            running = shared_mem['running']
            last_active_time = shared_mem['last_active_time']
        if not running:
            break

        idle_secs = time.time() - last_active_time
        if idle_secs >= max_timeout_secs:
            ping()
            wait_secs = max_timeout_secs
        else:
            # Recent activity: wake up exactly when the idle limit would be
            # hit instead of sleeping a whole extra interval.
            wait_secs = max_timeout_secs - idle_secs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment