Created
January 31, 2023 18:19
-
-
Save ecumene/080c20dcadb5c5ecf8cf1a18e5af8edf to your computer and use it in GitHub Desktop.
Service Discovery for Dagster ECS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Run Dagster service discovery every minute from DAGSTER_HOME so the script's
# relative workspace.yaml path resolves there. stderr is merged into stdout so
# Python tracebacks end up in the same place as the script's normal output.
*/1 * * * * cd /opt/dagster/dagster_home && ./service_discovery.py > /dev/stdout 2>&1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Install cron. `apt-get update` runs in the same layer as the install so the
# package index is present and fresh; the lists are removed afterwards to keep
# the layer small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends cron \
    && rm -rf /var/lib/apt/lists/*
# Register the service-discovery schedule as the current user's crontab.
COPY crontab /etc/cron.d/crontab
RUN crontab /etc/cron.d/crontab
# The cron entry executes ./service_discovery.py directly, so the script must
# carry the executable bit regardless of its mode in the build context.
COPY service_discovery.py $DAGSTER_HOME/service_discovery.py
RUN chmod +x $DAGSTER_HOME/service_discovery.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import requests | |
import boto3 | |
import yaml | |
def list_dagster_namespaces():
    """Return the names of all Cloud Map namespaces starting with ``dagster-``.

    Uses the ``list_namespaces`` paginator so that namespaces beyond the first
    page of results are included as well.

    Returns:
        A list of namespace name strings, in the order the API returns them.
    """
    client = boto3.client('servicediscovery')
    paginator = client.get_paginator('list_namespaces')
    dagster_namespaces = []
    for page in paginator.paginate():
        dagster_namespaces.extend(
            namespace['Name']
            for namespace in page['Namespaces']
            if namespace['Name'].startswith('dagster-')
        )
    return dagster_namespaces
def create_grpc_servers(namespaces):
    """Build the workspace ``load_from`` entries for the given namespaces.

    Each namespace yields one ``grpc_server`` entry whose host is
    ``user_code.<namespace>`` on port 4000. The location name is the text after
    the last ``-`` in the namespace, cut at the first ``.``.

    Args:
        namespaces: Iterable of namespace name strings.

    Returns:
        A list of ``{'grpc_server': {...}}`` dicts, one per namespace.
    """
    servers = []
    for ns in namespaces:
        last_segment = ns.split("-")[-1]
        location = last_segment.split(".")[0]
        entry = {
            'host': f"user_code.{ns}",
            'port': 4000,
            'location_name': location,
        }
        servers.append({'grpc_server': entry})
    return servers
# Discover the Dagster namespaces and derive the desired workspace entries.
namespaces = list_dagster_namespaces()
load_from = create_grpc_servers(namespaces)

# GraphQL mutation asking Dagit to re-read workspace.yaml.
mutation = """
mutation {
  reloadWorkspace {
    __typename
    ... on Workspace {
      locationEntries {
        id
        name
      }
    }
  }
}
"""
host = "http://dagit.dagster.local:3000/graphql"

# Read the previous workspace in plain read mode. Append mode ('a+') starts at
# end-of-file, so a read there returns nothing and every location looks new.
# A missing or unparsable file is treated as an empty workspace, because the
# file is rewritten from scratch below anyway.
try:
    with open('workspace.yaml') as f:
        old_workspace = yaml.load(f, Loader=yaml.SafeLoader) or {}
except (FileNotFoundError, yaml.YAMLError):
    old_workspace = {}
if not isinstance(old_workspace, dict):
    old_workspace = {}

# Location names already present in the old workspace; entries that are not
# grpc_server locations are ignored rather than raising KeyError.
location_names = set()
for entry in old_workspace.get('load_from') or []:
    grpc_server = entry.get('grpc_server') if isinstance(entry, dict) else None
    if isinstance(grpc_server, dict) and 'location_name' in grpc_server:
        location_names.add(grpc_server['location_name'])

# Rewrite the workspace with 'w' so old content is replaced. In append mode
# writes land at end-of-file even after seek(0), which grows the file forever.
with open('workspace.yaml', 'w') as f:
    yaml.dump({"load_from": load_from}, f)

new_locations = [
    server['grpc_server']['location_name']
    for server in load_from
    if server['grpc_server']['location_name'] not in location_names
]
for location_name in new_locations:
    print(f"Adding {location_name} to workspace")

# Reload once, and only after the new file is on disk so Dagit picks up the
# updated locations. The timeout keeps a hung Dagit from stacking up cron runs.
if new_locations:
    r = requests.post(host, json={'query': mutation}, timeout=30)
    print(r.text)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment