@inchoate
Last active February 2, 2024 11:40
Adding an extra package to a Python Dataflow project to run on GCP

The Problem

The documentation on deploying a pipeline with extra, non-PyPI, pure-Python packages on GCP is missing some detail. This gist shows how to package and deploy an external pure-Python, non-PyPI dependency to a managed Dataflow pipeline on GCP.

TL;DR: Your external package needs to be a proper Python source (or binary) distribution, packaged and shipped alongside your pipeline. It is not enough to specify a tar file that merely contains a setup.py.

Preparing the External Package

Your external package must have a proper setup.py. What follows is an example setup.py for our ETL package. It packages version 1.1.1 of the etl library. The library requires three PyPI packages to run, specified in the install_requires field. The package also ships with custom external JSON data, declared in the package_data section. Last, the setuptools.find_packages function searches for all available packages and returns that list:

# ETL's setup.py
from setuptools import setup, find_packages
setup(
    name='etl',
    version='1.1.1',
    install_requires=[
        'nose==1.3.7',
        'datadiff==2.0.0',
        'unicodecsv==0.14.1'
    ],
    description='ETL tools for API v2',
    packages=find_packages(),
    package_data={
        'etl.lib': ['*.json']
    }
)

Otherwise, there is nothing special about this setup.py file.
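
Because the JSON files are declared as package_data, they travel inside the distribution and should be read relative to the package rather than from a hard-coded filesystem path. A minimal sketch, assuming a hypothetical lookup.json inside etl/lib:

# somewhere in the etl package; 'lookup.json' is a hypothetical example file
import json
import pkgutil

raw = pkgutil.get_data('etl.lib', 'lookup.json')  # returns bytes (or None if absent)
lookup = json.loads(raw.decode('utf-8'))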

Building the External Package

You need to create a real source distribution of your external package (a binary distribution may also work, but this gist uses an sdist). To do so, run the following in your external package's directory:

python setup.py sdist --formats=gztar

If it succeeds, the last few lines of output will look like this:

hard linking etl/transform/user.py -> etl-1.1.1/etl/transform
Writing etl-1.1.1/setup.cfg
Creating tar archive
removing 'etl-1.1.1' (and everything under it)

The output of this command is a source distribution of your package, suitable for inclusion in the pipeline project. Look in your ./dist directory for the file:

14:55 $ ls dist
etl-1.1.1.tar.gz
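
Before wiring this into the pipeline, it's worth confirming the archive really contains everything. Listing the tarball is enough:

tar tzf dist/etl-1.1.1.tar.gz

The listing should include setup.py, your etl/ modules, and the *.json files from etl/lib.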

Preparing the Pipeline Project

  • Create a dist/ (or similar extra-packages) directory in your pipeline project, and copy in the file you built in the previous step:
cd pipeline-project
mkdir dist/
cp ~/etl-project/dist/etl-1.1.1.tar.gz dist/
  • Let the pipeline know you intend to include this package by using the --extra_package command-line argument:
18:38 $ python dataflow_main.py \
    --input=staging \
    --output=bigquery-staging \
    --runner=DataflowPipelineRunner \
    --project=realmassive-staging \
    --job_name dataflow-project-1 \
    --setup_file ./setup.py \
    --staging_location gs://dataflow/staging \
    --temp_location gs://dataflow/temp \
    --requirements_file requirements.txt \
    --extra_package dist/etl-1.1.1.tar.gz
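
Note that --setup_file points at the pipeline project's own setup.py, which is separate from the ETL package's setup.py above. The gist doesn't show that file, but a minimal sketch (name and version are placeholders) would be:

# pipeline project's setup.py -- a minimal sketch, not the author's actual file
import setuptools

setuptools.setup(
    name='pipeline-project',
    version='0.0.1',
    packages=setuptools.find_packages(),
)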
@dovy commented Nov 7, 2016

I had a more specific need: I needed to install mysql_client on the workers as well. So rather than packaging up my external package ahead of time, I ended up running my requirements as root commands and packaging my directory accordingly. In fact, I can just specify the path (using a symbolic link) and let Dataflow process it properly.

Here's my directory structure:

Dataflow
-- Core
-- owlet

owlet is my pipeline codebase and is a relative symbolic link since all my code is in the same directory.
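
Creating that relative symlink is a one-liner; the paths here are assumptions based on the layout described above:

cd Dataflow
ln -s ../owlet owlet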

Here's my setup.py file:

from distutils.command.build import build as _build
import subprocess

import setuptools

# This class handles the pip install mechanism.
class build(_build):  # pylint: disable=invalid-name
    """A build command class that will be invoked during package install.
    The package built using the current setup.py will be staged and later
    installed in the worker using `pip install package'. This class will be
    instantiated during install for this specific scenario and will trigger
    running the custom commands specified.
    """
    sub_commands = _build.sub_commands + [('CustomCommands', None)]


# Some custom command to run during setup. The command is not essential for this
# workflow. It is used here as an example. Each command will spawn a child
# process. Typically, these commands will include steps to install non-Python
# packages. 
#
# First, note that there is no need to use the sudo command because the setup
# script runs with appropriate access.
# Second, if apt-get tool is used then the first command needs to be 'apt-get
# update' so the tool refreshes itself and initializes links to download
# repositories.  Without this initial step the other apt-get install commands
# will fail with package not found errors. Note also --assume-yes option which
# shortcuts the interactive confirmation.
#
# The output of custom commands (including failures) will be logged in the
# worker-startup log.
CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', '--assume-yes', 'install', 'libmysqlclient-dev'],
    ['apt-get', '--assume-yes', 'install', 'python-dev'],
    ['apt-get', '--assume-yes', 'install', 'libssl1.0.0'],
    ['apt-get', '--assume-yes', 'install', 'libffi-dev'],
    ['apt-get', '--assume-yes', 'install', 'libssl-dev'],
    ['apt-get', '--assume-yes', 'install', 'libxml2-dev'],
    ['apt-get', '--assume-yes', 'install', 'libxslt1-dev'],
    ['pip', 'install', 'pyga==2.5.1'],
    ['pip', 'install', 'MySQL-python==1.2.5'],
    ['pip', 'install', 'fluent-logger==0.4.4'],
    ['pip', 'install', 'phonenumbers==7.7.2'],
    ['pip', 'install', 'python-dateutil==2.5.3'],
    ['pip', 'install', 'google-api-python-client==1.5.4'],
    ['pip', 'install', 'suds==0.4'],
    ['pip', 'install', 'websocket-client==0.37.0'],
    ['pip', 'install', 'tornado==4.4.2'],
    ['pip', 'install', 'progressbar2==3.10.1'],
    ['pip', 'install', 'pyOpenSSL==16.2.0'],
    ['pip', 'install', 'futures==3.0.5'],
    ['pip', 'install', 'requests==2.4.3'],
    ['pip', 'install', 'SQLAlchemy==1.1.2']
]


class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print('Running command: %s' % command_list)
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()
        print('Command output: %s' % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


# Configure the required packages and scripts to install.
# Note that the Python Dataflow containers come with numpy already installed,
# so this dependency will not trigger anything to be installed unless a
# version restriction is specified.
REQUIRED_PACKAGES = []

setuptools.setup(
    name='core',
    version='0.0.1',
    description='My primary codebase.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during pip install scenarios.
        'build': build,
        'CustomCommands': CustomCommands,
    }
)
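
As in the main gist, this setup.py is handed to Dataflow via --setup_file, so the workers run the apt-get and pip commands at startup; the script name below is a placeholder:

python your_pipeline.py --setup_file ./setup.py  # plus the usual Dataflow options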

@max-sixty
🙏

@bobbui commented Sep 4, 2018

Thanks for the gist, it's really useful. I got it working with a batch job but not with a streaming job. Any ideas?

@GuangsZuo
(quoting @dovy's comment above in full)

COOL !

@tjwebb commented Jan 4, 2021

This is cool. One note to add: this doesn't really work in combination with requirements.txt, since the workers run pip install -r requirements.txt before running setup.py. So any Python modules that rely on OS packages need to be installed from setup.py instead, as sketched below.
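
Concretely, that means an OS-dependent module such as MySQL-python (which needs libmysqlclient-dev present at build time) belongs in the setup.py custom commands rather than requirements.txt. A sketch reusing the CUSTOM_COMMANDS pattern from @dovy's comment above:

CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', '--assume-yes', 'install', 'libmysqlclient-dev'],
    # OS-dependent Python module installed here, not via requirements.txt
    ['pip', 'install', 'MySQL-python==1.2.5'],
]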

@elitongadotti
What about when the Dataflow pipeline is a package itself and I need to reference one package from another?
I've tried every option I could find, but no success so far 😢
