Skip to content

Instantly share code, notes, and snippets.

@haizaar
Last active September 16, 2022 21:09
Show Gist options
  • Save haizaar/63b941ec747f71d076494847fef49317 to your computer and use it in GitHub Desktop.
Save haizaar/63b941ec747f71d076494847fef49317 to your computer and use it in GitHub Desktop.
APIConnector: a thread-safe wrapper for the Google API Python client
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Hashable, List, Optional
import google.auth
import httplib2
import structlog
from google_auth_httplib2 import AuthorizedHttp
from googleapiclient.http import HttpRequest
logger = structlog.get_logger(__name__)
@dataclass
class MemCache:
    """In-memory key/value cache backed by a plain dict.

    Provides the ``get``/``set``/``delete`` trio; ``APIConnector.new`` passes
    an instance of this class to ``httplib2.Http(cache=...)``.
    """

    # Backing store; default_factory gives every instance its own dict.
    data: dict[Hashable, Any] = field(default_factory=dict)

    def get(self, key: Hashable) -> Any:
        """Return the value stored under *key*, or ``None`` on a miss."""
        hit = self.data.get(key)
        # Compare against None rather than truthiness so that a stored
        # falsy value (e.g. an empty byte string) still counts as a hit.
        if hit is not None:
            logger.debug("Cache hit", key=key)
        return hit

    def set(self, key: Hashable, data: Any) -> None:
        """Store *data* under *key*, replacing any previous value."""
        self.data[key] = data

    def delete(self, key: Hashable) -> None:
        """Remove *key* from the cache; a missing key is silently ignored."""
        # The entry to drop is the caller's key (not a hard-coded index).
        self.data.pop(key, None)
@dataclass
class APIConnector:
    """
    Thread-safe wrapper around the ``HttpRequest.execute()`` method.

    It keeps a pool of ``AuthorizedHttp`` objects and makes sure there is
    only one in-flight request for each: a transport is removed from the
    pool for the duration of a request and put back afterwards.
    """

    # Zero-argument callable that builds a fresh transport when the pool
    # has none left.
    factory: Callable[[], AuthorizedHttp]
    # Idle transports. ``default_factory`` must be a callable, so pass the
    # ``list`` type itself rather than a list instance.
    pool: List[AuthorizedHttp] = field(default_factory=list)

    @classmethod
    def new(
        cls,
        credentials: google.auth.Credentials,
        initial_size: int = 5,
        timeout_seconds: int = 3,
        cache: Optional[MemCache] = None,
    ) -> APIConnector:
        """Create a connector whose pool is pre-filled with transports.

        Args:
            credentials: Credentials handed to every ``AuthorizedHttp``.
            initial_size: Number of transports created up front.
            timeout_seconds: Passed as ``timeout`` to each ``httplib2.Http``.
            cache: Optional cache object passed as ``cache`` to each
                ``httplib2.Http``.

        Returns:
            A new ``APIConnector`` holding ``initial_size`` transports.
        """

        def factory() -> AuthorizedHttp:
            return AuthorizedHttp(
                credentials, http=httplib2.Http(timeout=timeout_seconds, cache=cache)
            )

        return cls(factory, pool=[factory() for _ in range(initial_size)])

    def execute(self, request: HttpRequest) -> Any:
        """Run *request* on a transport borrowed from the pool.

        The transport goes back into the pool whether the request succeeds
        or raises; any exception from ``request.execute`` propagates.
        """
        # If provisioning itself fails there is nothing to hand back, so
        # it happens outside the try/finally.
        http = self._provision_http()
        try:
            return request.execute(http=http)
        finally:
            self.pool.append(http)

    def _provision_http(self) -> AuthorizedHttp:
        """Take a transport from the pool, creating one if it is empty."""
        # This function can run in parallel in multiple threads; a single
        # list.pop() either returns an item or raises IndexError, so no
        # separate "is the pool empty?" check is made.
        try:
            return self.pool.pop()
        except IndexError:
            logger.info("Transport pool exhausted. Creating new transport")
            return self.factory()

    def close(self) -> None:
        """Close the underlying HTTP object of every pooled transport."""
        for ahttp in self.pool:
            ahttp.http.close()

    def __del__(self) -> None:
        # __init__ may have raised before ``pool`` was assigned; skip the
        # cleanup in that case instead of failing inside the finalizer.
        if hasattr(self, "pool"):
            self.close()
@orcohenadiv-zoomin
Copy link

Hey, I'm trying to use your code in a project.
I have created an APICONNECTOR.py module and a GSuiteUserManager.py module.
My question is: how do I change the function below so that it uses the new code?

def connect_logic(config: dict[str]) -> googleapiclient.discovery.Resource:
    """Build a Google Drive v3 client from the stored user tokens.

    Args:
        config: Settings mapping. ``config[GD_CONFIG][TOKEN_CONFIG]`` is
            used as the authorized-user info and
            ``config[GD_CONFIG][SCOPES]`` as the scopes.

    Returns:
        The object returned by ``build("drive", "v3", ...)``.
    """
    logging.info("Connecting to google drive...")
    # The token.json section of the config holds the user's access and
    # refresh tokens; it is created automatically the first time the
    # authorization flow completes.
    token_info = config[GD_CONFIG][TOKEN_CONFIG]
    scopes = config[GD_CONFIG][SCOPES]
    creds = Credentials.from_authorized_user_info(info=token_info, scopes=scopes)
    drive = build("drive", "v3", credentials=creds)
    logging.info("Successfully logged into google drive")
    return drive

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment