
Proposal for a plugin system in MLflow

Motivation

MLflow has an internally pluggable architecture that supports different backends for both the tracking store and the artifact store. This makes it easy to add new backends inside the mlflow package, but does not allow other packages to provide handlers for new backends.

This would be useful for several reasons:

  • Allowing external contributors to publish their own handlers in separate packages that are managed by those external contributors. In particular, this would allow for integration of upstream MLflow with cloud platforms that provide MLflow tracking capabilities.
  • Using an mlflow-plugins package as a contrib location to put handlers that are either too specific or not mature enough so they don't add maintenance overhead to the main package. An example of such a handler is the DynamoDB backend in PR #640.

Effect on user's workflow

If the user wishes to use a backend provided by Foo Platform, they could use the mlflow-foo package published by the Foo Platform team. Once they have installed both mlflow-foo and mlflow, they should be able to use tracking and artifact URIs of the format foo://project/bar to track and store experiments in the Foo platform under project bar.

When running the UI server, specifying such a foo://project/bar URI for tracking and artifacts would show the metrics and artifacts stored in the Foo platform.

The mlflow-foo package would manage credentials and authorization with the Foo backend, so there would be no need for any Foo-specific code inside of upstream MLflow.

Using entrypoints as plugin manager

Documentation: https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins

This mechanism is used by many packages (e.g., pytest, papermill, click...) to register external plugins.

Plugin system for tracking client

A plugin system would have a TrackingStoreRegistry class that holds the mapping from URI scheme to a function that returns a Store instance. It would be instantiated at import time, and the instance would be available at the top level of the module as mlflow.utils.tracking_store_registry.

At import time, all the built-in handlers would be registered by calling tracking_store_registry.register, followed by a call to tracking_store_registry.register_entrypoints that registers handlers from external packages.

Example minimal implementation:

```python
import urllib.parse

import entrypoints

...

class TrackingStoreRegistry:

    def __init__(self):
        self.registry = {}

    def register(self, scheme, store_builder):
        self.registry[scheme] = store_builder

    def register_entrypoints(self):
        # Load handlers provided by other packages
        for entrypoint in entrypoints.get_group_all("mlflow.tracking_store"):
            self.register(entrypoint.name, entrypoint.load())

    def get_store(self, store_uri):
        scheme = urllib.parse.urlparse(store_uri).scheme
        store_builder = self.registry.get(scheme)
        if store_builder is not None:
            return store_builder(store_uri)
        else:
            valid_schemes = ", ".join(sorted(self.registry.keys()))
            raise Exception(
                "Unsupported tracking URI scheme {!r}; "
                "must be one of: {}".format(scheme, valid_schemes)
            )

tracking_store_registry = TrackingStoreRegistry()
tracking_store_registry.register('', _get_file_store)
tracking_store_registry.register('file', _get_file_store)
tracking_store_registry.register('databricks', _get_databricks_rest_store)
for scheme in _DBENGINES:
    tracking_store_registry.register(scheme, _get_sqlalchemy_store)

tracking_store_registry.register_entrypoints()
```

Calls to mlflow._get_store in other parts of the codebase would be replaced by tracking_store_registry.get_store.
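The dispatch behaviour can be sketched with toy store classes standing in for MLflow's real file and REST stores (FakeFileStore and FakeRestStore are illustrative names, not real MLflow classes):

```python
import urllib.parse

# Toy stand-ins for MLflow's real store classes.
class FakeFileStore:
    def __init__(self, store_uri):
        self.store_uri = store_uri

class FakeRestStore:
    def __init__(self, store_uri):
        self.store_uri = store_uri

class TrackingStoreRegistry:
    """Minimal copy of the registry sketched above."""
    def __init__(self):
        self.registry = {}

    def register(self, scheme, store_builder):
        self.registry[scheme] = store_builder

    def get_store(self, store_uri):
        scheme = urllib.parse.urlparse(store_uri).scheme
        store_builder = self.registry.get(scheme)
        if store_builder is None:
            raise ValueError("Unsupported URI scheme: {!r}".format(scheme))
        return store_builder(store_uri)

registry = TrackingStoreRegistry()
registry.register("", FakeFileStore)        # bare local paths have no scheme
registry.register("file", FakeFileStore)
registry.register("databricks", FakeRestStore)

# Scheme-based dispatch: file:// and bare paths hit the file store,
# databricks:// hits the REST store.
print(type(registry.get_store("file:///tmp/mlruns")).__name__)   # FakeFileStore
print(type(registry.get_store("databricks://my-profile")).__name__)  # FakeRestStore
```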

How does an external package register a tracking store handler

An external package would register its handler by declaring an entry point in its setup.py:

```python
setup(
    name="mlflow-foo",
    ...
    entry_points={
        "mlflow.tracking_store": "foo=mlflow_foo.custom_mlflow_store:custom_builder"
    }
)
```

Installing mlflow-foo would make it possible to set the tracking URI to foo://project/bar, and mlflow would use the designated function from mlflow-foo to get the store. The store returned by mlflow_foo.custom_mlflow_store.custom_builder(store_uri) could be a RestStore with custom credentials or a completely new subclass of AbstractStore.
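A plugin-side builder might look like the following sketch; FooStore, _read_foo_token, and the URI layout are hypothetical stand-ins for whatever the Foo Platform team would actually implement:

```python
import urllib.parse

# Hypothetical contents of mlflow_foo/custom_mlflow_store.py.
class FooStore:
    """Would subclass MLflow's AbstractStore in a real plugin."""
    def __init__(self, project, token):
        self.project = project
        self.token = token

def _read_foo_token():
    # A real plugin would load credentials from its own config or env vars.
    return "dummy-token"

def custom_builder(store_uri):
    # foo://project/bar parses to netloc="project", path="/bar",
    # so the project name is the path with the leading slash stripped.
    parsed = urllib.parse.urlparse(store_uri)
    project = parsed.path.lstrip("/")
    return FooStore(project=project, token=_read_foo_token())
```

The important point is that credential handling lives entirely in the plugin: upstream MLflow only ever calls the builder with the URI.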

Plugin system for artifact store

To instantiate the appropriate artifact store, the constructor method ArtifactRepository.from_artifact_uri is used. This returns an implementation of ArtifactRepository appropriate to the scheme in the URI, with the following mapping:

  • s3:/ -> S3ArtifactRepository
  • gs:/ -> GCSArtifactRepository
  • wasbs:/ -> AzureBlobArtifactRepository
  • ftp:/ -> FTPArtifactRepository
  • sftp:/ -> SFTPArtifactRepository
  • dbfs:/ -> DbfsArtifactRepository
  • any other -> LocalArtifactRepository

The system would work in a similar way to the TrackingStoreRegistry, but keep track of the mapping from artifact URI scheme to implementations of ArtifactRepository.

```python
import urllib.parse

import entrypoints

...

class ArtifactRepositoryRegistry:

    def __init__(self):
        self.registry = {}

    def register(self, scheme, repository):
        self.registry[scheme] = repository

    def register_entrypoints(self):
        # Load handlers provided by other packages
        for entrypoint in entrypoints.get_group_all("mlflow.artifact_repository"):
            self.register(entrypoint.name, entrypoint.load())

    def get_artifact_repository(self, artifact_uri):
        scheme = urllib.parse.urlparse(artifact_uri).scheme
        repository = self.registry.get(scheme)
        if repository is not None:
            return repository(artifact_uri)
        else:
            valid_schemes = ", ".join(sorted(self.registry.keys()))
            raise Exception(
                "Unsupported artifact URI scheme {!r}; "
                "must be one of: {}".format(scheme, valid_schemes)
            )

artifact_repository_registry = ArtifactRepositoryRegistry()
# Register by parsed URI scheme (e.g. "s3"), since get_artifact_repository
# looks up urlparse(...).scheme rather than the "s3:/" prefixes used by
# from_artifact_uri today.
artifact_repository_registry.register('s3', S3ArtifactRepository)
artifact_repository_registry.register('gs', GCSArtifactRepository)
artifact_repository_registry.register('wasbs', AzureBlobArtifactRepository)
artifact_repository_registry.register('ftp', FTPArtifactRepository)
artifact_repository_registry.register('sftp', SFTPArtifactRepository)
artifact_repository_registry.register('dbfs', DbfsArtifactRepository)

artifact_repository_registry.register_entrypoints()
```

Calls to ArtifactRepository.from_artifact_uri in other parts of the codebase would be replaced by artifact_repository_registry.get_artifact_repository.
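On the plugin side, an external package would expose a class with the shape of MLflow's ArtifactRepository; the sketch below is illustrative (FooArtifactRepository and its method bodies are hypothetical), showing only the constructor contract the registry relies on:

```python
# Hypothetical plugin-side artifact repository, mirroring the shape of
# MLflow's ArtifactRepository. The registry only requires that the
# registered object be callable with the artifact URI.
class FooArtifactRepository:
    def __init__(self, artifact_uri):
        self.artifact_uri = artifact_uri

    def log_artifact(self, local_file, artifact_path=None):
        # A real plugin would upload local_file to the Foo platform here.
        raise NotImplementedError

    def list_artifacts(self, path=None):
        # A real plugin would list stored artifacts here.
        return []

# The plugin's setup.py would expose this class under the
# "mlflow.artifact_repository" entry point group with the name "foo", so
# that get_artifact_repository("foo://project/bar") constructs
# FooArtifactRepository("foo://project/bar").
repo = FooArtifactRepository("foo://project/bar")
print(repo.artifact_uri)
```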

Note on implementation

In the implementations above, both registries have been built as separate classes to account for possible differences that might be needed down the line. They could also be built from a single HandlerRegistry class that takes as an argument the entrypoint group to load handlers from:

```python
import urllib.parse

import entrypoints

...

class HandlerRegistry:

    def __init__(self, entrypoint_group):
        self.entrypoint_group = entrypoint_group
        self.registry = {}

    def register(self, scheme, handler):
        self.registry[scheme] = handler

    def register_entrypoints(self):
        # Load handlers provided by other packages
        for entrypoint in entrypoints.get_group_all(self.entrypoint_group):
            self.register(entrypoint.name, entrypoint.load())

    def get_store(self, store_uri):
        scheme = urllib.parse.urlparse(store_uri).scheme
        handler = self.registry.get(scheme)
        if handler is not None:
            return handler(store_uri)
        else:
            valid_schemes = ", ".join(sorted(self.registry.keys()))
            raise Exception("URI scheme must be one of {}".format(valid_schemes))
```

Then the two registry instances would differ only in which entrypoint group is specified and which handlers are registered within the package:

```python
tracking_store_registry = HandlerRegistry("mlflow.tracking_store")
artifact_repository_registry = HandlerRegistry("mlflow.artifact_repository")
```

We could also consider whether entrypoint registration should happen on instantiation of the HandlerRegistry, which would give external handlers lower priority than internal ones, or be done manually after registering all internal handlers.
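Since the registry is backed by a plain dict, the priority question reduces to registration order: whichever register() call happens last for a given scheme wins. A toy illustration:

```python
# Toy illustration of registration order. Later register() calls overwrite
# earlier ones, so calling register_entrypoints() after the built-in
# registrations lets external handlers shadow internal ones; calling it
# first (e.g. in __init__) gives internal handlers the last word.
registry = {}

def register(scheme, handler):
    registry[scheme] = handler

register("foo", "internal-handler")
register("foo", "external-handler")  # simulates a later entrypoint load

print(registry["foo"])  # external-handler
```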
