Skip to content

Instantly share code, notes, and snippets.

@wckdouglas
Last active July 30, 2021 11:44
Show Gist options
  • Save wckdouglas/62bb760510dafc0a8261029e7f3c3d30 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import subprocess
from pathlib import Path
import numpy as np
import pandas as pd
import luigi
from luigi.contrib.sge import SGEJobTask # fixed SGE in this branch https://github.com/wckdouglas/luigi
from luigi.mock import MockTarget
# Root of the pipeline's working area; EndTask also places the SGE
# shared_tmp_dir under it (WORKING_DIR / "tmp").
WORKING_DIR = Path("/home")
RESULT_DIR = WORKING_DIR / "result"

# Create output directories up front. The loop variable is deliberately not
# named ``dir`` so the builtin of that name is not shadowed at module level;
# ``parents=True`` lets this succeed even if WORKING_DIR does not exist yet.
for output_dir in (RESULT_DIR,):
    output_dir.mkdir(parents=True, exist_ok=True)
class SecondTask(SGEJobTask):
    """Copy FirstTask's output into ``file2.txt``, stripping each line."""

    def requires(self):
        """Depend on a single FirstTask constructed with default parameters."""
        return [FirstTask()]

    def output(self):
        """Local file ``file2.txt`` written by this task."""
        return luigi.LocalTarget('file2.txt')

    def work(self):
        """Copy every line of the upstream output with surrounding whitespace removed."""
        # self.input() returns the outputs of self.requires(), in the same
        # order, so index 0 is FirstTask's target. (A bare ``requires()`` is
        # not defined at module scope and would raise NameError.)
        with self.input()[0].open('r') as infile, \
                self.output().open('w') as outfile:
            for line in infile:
                print(line.strip(), file=outfile)
class FirstTask(SGEJobTask):
    """Write the stdout of the command ``echo here`` to ``file1.txt``."""

    def output(self):
        """Local file ``file1.txt`` written by this task."""
        return luigi.LocalTarget('file1.txt')

    def work(self):
        """Run the command and store its stdout in the output target.

        Raises:
            subprocess.CalledProcessError: if the command exits non-zero.
        """
        import shlex

        cmd = "echo here"
        # check=True turns a failing command into a task failure. Capturing
        # stdout first and writing only after the command has succeeded means
        # no output file is created for a failed run, so luigi cannot mistake
        # the task for complete.
        result = subprocess.run(
            shlex.split(cmd),
            stdout=subprocess.PIPE,
            check=True,
            universal_newlines=True,
        )
        with self.output().open("w") as out:
            out.write(result.stdout)
class EndTask(luigi.Task):
    """Final task: requires SecondTask and writes an ``End`` marker."""

    def requires(self):
        """Depend on one SecondTask configured with explicit SGE job settings."""
        return [
            SecondTask(
                n_cpu=4,
                queue="dev-short",
                poll_time=40,
                parallel_env="smp",
                dont_remove_tmp_dir=True,
                shared_tmp_dir=str(WORKING_DIR / "tmp"),
            )
        ]

    def output(self):
        """luigi MockTarget named ``end``.

        NOTE(review): mirror_on_stderr=True presumably echoes written data to
        stderr as well — confirm against the luigi.mock docs.
        """
        return MockTarget('end', mirror_on_stderr=True)

    def run(self):
        """Write the ``End`` marker line to the mock target."""
        # Write through the target's own open(). A plain open() on .path would
        # create a real file named "end" on disk and leave the MockTarget (and
        # therefore this task's completion check) untouched.
        with self.output().open("w") as out:
            print('End', file=out)
if __name__ == "__main__":
    # Build EndTask through the central luigid scheduler on port 2020
    # (local_scheduler=False) with up to 100 worker processes.
    luigi.build(
        [EndTask()],
        workers=100,
        scheduler_port=2020,
        local_scheduler=False,
        log_level="INFO",
    )
@wckdouglas
Copy link
Author

Start the central Luigi scheduler server (the script connects to it on port 2020) with:

luigid --pidfile=./luigi_server/pid --logdir ./luigi_server/log --state-path ./luigi_server/state.pickle --port 2020

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment