Skip to content

Instantly share code, notes, and snippets.

@hazcod
Created May 28, 2021 13:40
Show Gist options
  • Save hazcod/0671f5bd996570dbff90ccf9fb5b8ae1 to your computer and use it in GitHub Desktop.
Save hazcod/0671f5bd996570dbff90ccf9fb5b8ae1 to your computer and use it in GitHub Desktop.
This helper class wraps psycopg2 and automatically retries failed connection attempts and queries.
class PGDB:
    """Thin psycopg2 wrapper that retries failed connections and queries.

    One connection and one cursor are kept per instance and are created
    eagerly by the constructor.  Connection attempts are retried only when
    ``reconnect`` is true; queries are always retried.  Both give up after
    ``LIMIT_RETRIES`` retries and re-raise the last psycopg2 error.

    The class expects ``psycopg2``, ``time`` and a ``ctx`` object exposing
    ``ctx.log.info`` to be available in the enclosing module.
    NOTE(review): ``ctx`` is presumably mitmproxy's ``ctx`` -- confirm.
    """

    # Maximum number of retries before the last error is re-raised.
    LIMIT_RETRIES = 5

    def __init__(self, user: str, password: str, host: str, port: int,
                 database: str, sslmode: str, reconnect: bool):
        """Store the connection settings and open the connection and cursor.

        Args:
            user: Database user name.
            password: Password for ``user``.
            host: Server host.
            port: Server port.
            database: Name of the database to connect to.
            sslmode: Value passed to psycopg2 as ``sslmode``.
            reconnect: Whether failed connection attempts are retried.

        Raises:
            psycopg2.OperationalError: If the connection cannot be
                established (after retries when ``reconnect`` is true).
        """
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.database = database
        self._connection = None
        self._cursor = None
        self.reconnect = reconnect
        self.sslmode = sslmode
        self.init()

    def connect(self, retry_counter: int = 0):
        """Return the open connection, establishing it first if necessary.

        Args:
            retry_counter: Number of retries already consumed.

        Returns:
            The psycopg2 connection held by this instance.

        Raises:
            psycopg2.OperationalError: If connecting fails and either
                ``reconnect`` is false or the retry budget is exhausted.
        """
        if self._connection:
            return self._connection

        # A loop (rather than recursion) guarantees that the connection is
        # returned to the caller no matter how many attempts were needed.
        while True:
            try:
                self._connection = psycopg2.connect(
                    user=self.user, password=self.password,
                    host=self.host, port=self.port,
                    database=self.database, sslmode=self.sslmode,
                    connect_timeout=3,
                )
            except psycopg2.OperationalError as error:
                if not self.reconnect or retry_counter >= self.LIMIT_RETRIES:
                    raise
                retry_counter += 1
                ctx.log.info("got error {}. reconnecting {}".format(str(error).strip(), retry_counter))
                time.sleep(5)
            else:
                self._connection.autocommit = False
                return self._connection

    def cursor(self):
        """Return the current cursor, creating a new one if it is missing or closed."""
        if not self._cursor or self._cursor.closed:
            self._cursor = self.connect().cursor()
        return self._cursor

    def execute(self, query, args=None, retry_counter: int = 0):
        """Run ``query`` and return the first row of its result.

        On a ``psycopg2.DatabaseError`` (``OperationalError`` is a subclass)
        the connection is reset and the query is run again, up to
        ``LIMIT_RETRIES`` retries.
        NOTE(review): non-transient errors such as SQL syntax errors are
        ``DatabaseError`` subclasses too and are therefore retried as well;
        this mirrors the previous behaviour.

        Args:
            query: SQL text handed to ``cursor.execute``.
            args: Query parameters handed to ``cursor.execute`` (optional).
            retry_counter: Number of retries already consumed.

        Returns:
            Whatever ``cursor.fetchone()`` yields for the successful run.

        Raises:
            psycopg2.DatabaseError: If the query still fails once the retry
                budget is exhausted.
            psycopg2.OperationalError: If reconnecting during a reset fails.
        """
        while True:
            # Obtained outside the ``try`` so that a failure to (re)connect
            # propagates directly instead of being counted as a query retry.
            cursor = self.cursor()
            try:
                cursor.execute(query, args)
            except psycopg2.DatabaseError as error:
                if retry_counter >= self.LIMIT_RETRIES:
                    raise
                retry_counter += 1
                ctx.log.info("got error {}. retrying {}".format(str(error).strip(), retry_counter))
                time.sleep(1)
                self.reset()
            else:
                # Fetch exactly once, from the cursor that ran the query.
                return cursor.fetchone()

    def is_master(self):
        """Return False when ``pg_is_in_recovery()`` reports true, else True."""
        row = self.execute("select pg_is_in_recovery();")
        return not (row and row[0])

    def reset(self):
        """Drop the current connection and cursor and open fresh ones."""
        self.close()
        self.connect()
        self.cursor()

    def close(self):
        """Close the cursor and the connection, if any, and forget both."""
        connection, cursor = self._connection, self._cursor
        # Clear the references first so the instance never keeps pointing at
        # half-closed objects, even if one of the close() calls raises.
        self._connection = None
        self._cursor = None
        if not connection:
            return
        try:
            if cursor:
                cursor.close()
        finally:
            connection.close()
        ctx.log.info("PostgreSQL connection is closed")

    def init(self):
        """Open the connection and create the cursor."""
        self.connect()
        self.cursor()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment