Skip to content

Instantly share code, notes, and snippets.

@Cdaprod
Created February 23, 2024 14:59
Show Gist options
  • Save Cdaprod/9bebc6b0c1befb695b36755052530c58 to your computer and use it in GitHub Desktop.
Save Cdaprod/9bebc6b0c1befb695b36755052530c58 to your computer and use it in GitHub Desktop.
MinIO SDK Client and Bucket Validation
from typing import Optional

from minio import Minio
from pydantic import BaseModel, Field, PrivateAttr
class MinIoConfig(BaseModel):
    """Connection settings for a MinIO server plus an optional bucket to check.

    Attributes:
        endpoint: Endpoint string handed to the ``Minio`` client.
        access_key: Access key handed to the ``Minio`` client.
        secret_key: Secret key handed to the ``Minio`` client.
        secure: Forwarded as the client's ``secure`` flag; defaults to ``True``.
        bucket: Name of the bucket whose existence is validated, if any.
    """

    endpoint: str
    access_key: str
    secret_key: str
    secure: bool = Field(default=True, title="Use SSL?")
    bucket: Optional[str] = None

    # Cache slot for the lazily built client. It is declared as a pydantic
    # private attribute so that assigning it on an instance is permitted and
    # it is not treated as a model field; assigning an undeclared attribute
    # on a model is rejected by pydantic v1.
    _instance: Optional[Minio] = PrivateAttr(default=None)

    @property
    def _minio_client(self) -> Minio:
        """Return the ``Minio`` client, creating and caching it on first use.

        Raises:
            ValueError: If constructing the client fails; the original
                exception is attached as ``__cause__``.
        """
        if self._instance is None:
            try:
                # Keyword arguments keep the call unambiguous regardless of
                # the positional order the installed SDK version expects.
                self._instance = Minio(
                    endpoint=self.endpoint,
                    access_key=self.access_key,
                    secret_key=self.secret_key,
                    secure=self.secure,
                )
            except Exception as e:
                raise ValueError(
                    f"Unable to establish MinIO connection: {e}"
                ) from e
        return self._instance

    def validate_client_and_bucket(self) -> None:
        """Build the client and, if a bucket is configured, check it exists.

        The client is always constructed, so invalid connection settings are
        reported even when no bucket is configured.

        Raises:
            ValueError: If the client cannot be constructed, or if ``bucket``
                is set and ``bucket_exists`` reports it as missing.
        """
        client = self._minio_client
        if self.bucket and not client.bucket_exists(self.bucket):
            raise ValueError(f"Bucket '{self.bucket}' does not exist.")
# Example usage: build a configuration (no bucket is given here), run the
# validation step, and report success if it did not raise.
config = MinIoConfig(
    endpoint='play.min.io:443',
    access_key='minioadmin',
    secret_key='minioadmin',
)
config.validate_client_and_bucket()
print("MinIO Connection & Bucket validation succeeded.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment