@theSekyi
Created July 15, 2019 00:25
DEBUG: Checking if CleanData(out_dir=data/interim/) is complete
INFO: Informed scheduler that task CleanData_data_interim__ced0aed8fc has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 11417] Worker Worker(salt=790270365, workers=1, host=lols-MacBook-Pro.local, username=socrates, pid=11417) running CleanData(out_dir=data/interim/)
ERROR: [pid 11417] Worker Worker(salt=790270365, workers=1, host=lols-MacBook-Pro.local, username=socrates, pid=11417) failed CleanData(out_dir=data/interim/)
Traceback (most recent call last):
  File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/task.py", line 851, in getpaths
    return [getpaths(r) for r in struct]
TypeError: 'NoneType' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/worker.py", line 141, in _run_get_new_deps
    task_gen = self.task.run()
  File "/Users/socrates/Projects/pipelines/task.py", line 68, in run
    in_csv = self.input().path
  File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/task.py", line 645, in input
    return getpaths(self.requires())
  File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/task.py", line 853, in getpaths
    raise Exception('Cannot map %s to Task/dict/list' % str(struct))
Exception: Cannot map None to Task/dict/list
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task CleanData_data_interim__ced0aed8fc has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=790270365, workers=1, host=lols-MacBook-Pro.local, username=socrates, pid=11417) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
- 1 CleanData(out_dir=data/interim/)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
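
The traceback pinpoints the cause: CleanData.run() calls self.input().path, and Task.input() applies getpaths() to whatever requires() returns. In the task.py below, CleanData.requires() constructs GetData() but never returns it, so getpaths() receives None and Luigi raises "Cannot map None to Task/dict/list". A minimal sketch of the fix:

    def requires(self):
        # Return the upstream task so self.input() resolves to GetData's output target
        return GetData()

The full task.py from the gist follows.
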
import luigi
import os
import requests
from pathlib import Path
import urllib.request
import pandas as pd
import numpy as np
from utils import processing


class GetData(luigi.Task):
    """Download the raw wine CSV into data/raw/."""

    fname = luigi.Parameter(default='wine_dataset')
    out_dir = luigi.Parameter(default='data/raw/')
    url = luigi.Parameter(
        default='https://aswer2/releases/download/0.1.0/sampled.csv'
    )

    def output(self):
        # The target is the downloaded CSV; make sure its directory exists.
        out_dir = Path(self.out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        return luigi.LocalTarget(path=out_dir / f'{self.fname}.csv')

    def run(self):
        # Fetch the CSV straight into the output path.
        out_path = Path(self.out_dir) / f'{self.fname}.csv'
        d_file = urllib.request.urlretrieve(self.url, out_path)
        return d_file


class CleanData(luigi.Task):
    """Clean the downloaded CSV into train/test splits (body mostly commented out)."""

    out_dir = luigi.Parameter(default='data/interim/')

    def requires(self):
        # BUG: the upstream task is constructed but never returned, so
        # requires() evaluates to None. This is what produces the
        # "Cannot map None to Task/dict/list" failure in the log above.
        GetData()

    def output(self):
        return luigi.LocalTarget(path=str(self.out_dir))

    def run(self):
        datatypes = {
            "Unnamed: 0": "int64",
            "country": "object",
            "description": "object",
            "designation": "object",
            "points": "int64",
            "price": "float64",
            "province": "object",
            "region_1": "object",
            "region_2": "object",
            "taster_name": "object",
            "taster_twitter_handle": "object",
            "title": "object",
            "variety": "object",
            "winery": "object"
        }
        in_csv = self.input().path
        print(f"The csv path is {in_csv}")
        # df = pd.read_csv(in_csv, dtype=datatypes)
        # # keep only the description and points columns
        # df_keep = df[['description', 'points']].loc[:]
        # # transform dataframe
        # df_keep = processing(df_keep, col="description")
        # n_samples = len(df_keep)
        # idx = np.arange(n_samples)
        # test_idx = idx[:n_samples // 10]
        # test = df_keep.loc[test_idx]
        # train_idx = idx[n_samples // 10:]
        # train = df_keep.loc[train_idx]
        # # save dataframes in feather format
        # os.makedirs(self.out_dir, exist_ok=True)
        # train.to_feather('data/interim/train')
        # test.to_feather('data/interim/test')


class TrainModel(luigi.Task):
    pass


class MakePredictions(luigi.Task):
    pass
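
For reference, here is a minimal sketch of what a working CleanData might look like once the missing return is added and the commented-out body is restored. Everything beyond the gist's own code is an assumption: it presumes processing() from utils accepts a dataframe plus a col keyword exactly as the comments use it, it leaves the dtype mapping as a comment for brevity, it resets the index before to_feather() because feather rejects non-default indexes, and it exposes the train/test files as output targets, which is a design choice rather than what the gist does (the gist targets the interim directory itself).

class CleanData(luigi.Task):
    out_dir = luigi.Parameter(default='data/interim/')

    def requires(self):
        # returning the upstream task is the fix for the failure in the log
        return GetData()

    def output(self):
        # hypothetical per-file targets so the task only reports complete
        # once both splits exist (the gist uses the directory instead)
        out_dir = Path(self.out_dir)
        return {
            'train': luigi.LocalTarget(path=str(out_dir / 'train')),
            'test': luigi.LocalTarget(path=str(out_dir / 'test')),
        }

    def run(self):
        in_csv = self.input().path
        # pass dtype=datatypes here, with the same mapping as in the gist
        df = pd.read_csv(in_csv)
        # keep only the description and points columns
        df_keep = df[['description', 'points']].copy()
        df_keep = processing(df_keep, col="description")
        # first 10% of rows become the test split, the rest the train split
        n_samples = len(df_keep)
        idx = np.arange(n_samples)
        test = df_keep.loc[idx[:n_samples // 10]].reset_index(drop=True)
        train = df_keep.loc[idx[n_samples // 10:]].reset_index(drop=True)
        # feather needs a default RangeIndex, hence the reset_index calls
        os.makedirs(self.out_dir, exist_ok=True)
        train.to_feather(self.output()['train'].path)
        test.to_feather(self.output()['test'].path)

A run like the one in the log can then be started with luigi.build([CleanData()], local_scheduler=True) from Python, or with python -m luigi --module task CleanData --local-scheduler on the command line (assuming the module is named task, as the traceback paths suggest).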