-
-
Save theSekyi/2450d7cbef2fe7f76c7c1cd5fd90d72f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DEBUG: Checking if CleanData(out_dir=data/interim/) is complete | |
INFO: Informed scheduler that task CleanData_data_interim__ced0aed8fc has status PENDING | |
INFO: Done scheduling tasks | |
INFO: Running Worker with 1 processes | |
DEBUG: Asking scheduler for work... | |
DEBUG: Pending tasks: 1 | |
INFO: [pid 11417] Worker Worker(salt=790270365, workers=1, host=lols-MacBook-Pro.local, username=socrates, pid=11417) running CleanData(out_dir=data/interim/) | |
ERROR: [pid 11417] Worker Worker(salt=790270365, workers=1, host=lols-MacBook-Pro.local, username=socrates, pid=11417) failed CleanData(out_dir=data/interim/) | |
Traceback (most recent call last): | |
File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/task.py", line 851, in getpaths | |
return [getpaths(r) for r in struct] | |
TypeError: 'NoneType' object is not iterable | |
During handling of the above exception, another exception occurred: | |
Traceback (most recent call last): | |
File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/worker.py", line 199, in run | |
new_deps = self._run_get_new_deps() | |
File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/worker.py", line 141, in _run_get_new_deps | |
task_gen = self.task.run() | |
File "/Users/socrates/Projects/pipelines/task.py", line 68, in run | |
in_csv = self.input().path | |
File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/task.py", line 645, in input | |
return getpaths(self.requires()) | |
File "/Users/socrates/.local/share/virtualenvs/pipelines-O1OuTu0F/lib/python3.7/site-packages/luigi/task.py", line 853, in getpaths | |
raise Exception('Cannot map %s to Task/dict/list' % str(struct)) | |
Exception: Cannot map None to Task/dict/list | |
DEBUG: 1 running tasks, waiting for next task to finish | |
INFO: Informed scheduler that task CleanData_data_interim__ced0aed8fc has status FAILED | |
DEBUG: Asking scheduler for work... | |
DEBUG: Done | |
DEBUG: There are no more tasks to run at this time | |
DEBUG: There are 1 pending tasks possibly being run by other workers | |
DEBUG: There are 1 pending tasks unique to this worker | |
DEBUG: There are 1 pending tasks last scheduled by this worker | |
INFO: Worker Worker(salt=790270365, workers=1, host=lols-MacBook-Pro.local, username=socrates, pid=11417) was stopped. Shutting down Keep-Alive thread | |
INFO: | |
===== Luigi Execution Summary ===== | |
Scheduled 1 tasks of which: | |
* 1 failed: | |
- 1 CleanData(out_dir=data/interim/) | |
This progress looks :( because there were failed tasks | |
===== Luigi Execution Summary ===== |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import luigi | |
import os | |
import requests | |
from pathlib import Path | |
import urllib.request | |
import os | |
import pandas as pd | |
import numpy as np | |
from utils import processing | |
class GetData(luigi.Task):
    """Download the raw CSV dataset from ``url`` into ``out_dir``.

    Parameters:
        fname: Base name (without extension) of the downloaded file.
        out_dir: Directory the CSV file is written to.
        url: Location the CSV file is fetched from.
    """

    fname = luigi.Parameter(default='wine_dataset')
    out_dir = luigi.Parameter(default='data/raw/')
    url = luigi.Parameter(
        default='https://aswer2/releases/download/0.1.0/sampled.csv'
    )

    def output(self):
        """Return the local target ``<out_dir>/<fname>.csv``.

        This method has no side effects: the scheduler calls it whenever it
        checks completeness, so directory creation is left to ``run``.
        """
        csv_path = Path(self.out_dir) / f'{self.fname}.csv'
        # Hand luigi a plain string so the target path is always a ``str``.
        return luigi.LocalTarget(path=str(csv_path))

    def run(self):
        """Fetch ``url`` and store it at the path reported by ``output``.

        The download goes to a temporary path that is renamed onto the final
        target only after ``urlretrieve`` succeeds. An interrupted download
        therefore cannot leave a partial file that would make the task look
        complete.
        """
        # ``temporary_path`` also creates the parent directory if needed.
        with self.output().temporary_path() as tmp_csv:
            urllib.request.urlretrieve(self.url, tmp_csv)
class CleanData(luigi.Task):
    """Clean the raw CSV produced by ``GetData`` (work in progress).

    Parameters:
        out_dir: Path used as this task's output target.
    """

    out_dir = luigi.Parameter(default='data/interim/')

    def requires(self):
        """Declare ``GetData`` as the upstream dependency.

        The task must be *returned*. Without the ``return`` this method
        yields ``None`` and ``self.input()`` fails with
        "Cannot map None to Task/dict/list".
        """
        return GetData()

    def output(self):
        """Return a local target pointing at ``out_dir``."""
        return luigi.LocalTarget(
            path=str(self.out_dir)
        )

    def run(self):
        """Resolve the upstream CSV path and print it.

        NOTE(review): the cleaning steps below are still commented out, so
        nothing is written to ``out_dir`` yet. Confirm the intended output
        before enabling them.
        """
        # Column dtypes intended for ``pd.read_csv`` once the draft below is
        # enabled; unused until then.
        datatypes = {
            "Unnamed: 0": "int64",
            "country": "object",
            "description": "object",
            "designation": "object",
            "points": "int64",
            "price": "float64",
            "province": "object",
            "region_1": "object",
            "region_2": "object",
            "taster_name": "object",
            "taster_twitter_handle": "object",
            "title": "object",
            "variety": "object",
            "winery": "object"
        }
        in_csv = self.input().path
        print(f"The csv path is {in_csv}")
        # NOTE(review): draft pipeline, not yet enabled. ``datatypes`` is a
        # local (not an attribute) and the directory lives in ``self.out_dir``.
        # df = pd.read_csv(in_csv, dtype=datatypes)
        # # get description and column fields out of dataframe
        # df_keep = df[['description', 'points']].loc[:]
        # # transform dataframe
        # df_keep = processing(df_keep, col="description")
        # n_samples = len(df_keep)
        # idx = np.arange(n_samples)
        # test_idx = idx[:n_samples // 10]
        # test = df_keep.loc[test_idx]
        # train_idx = idx[n_samples // 10:]
        # train = df_keep.loc[train_idx]
        # # save dataframe in feather format
        # os.makedirs(self.out_dir, exist_ok=True)
        # train.to_feather('data/interim/train')
        # test.to_feather('data/interim/test')
class TrainModel(luigi.Task):
    """Placeholder task: no parameters or behaviour are defined yet."""
class MakePredictions(luigi.Task):
    """Placeholder task: no parameters or behaviour are defined yet."""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment