Skip to content

Instantly share code, notes, and snippets.

@icoxfog417
Created March 29, 2019 06:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save icoxfog417/3539980a8d69eed16df7e02363e93cc9 to your computer and use it in GitHub Desktop.
Save icoxfog417/3539980a8d69eed16df7e02363e93cc9 to your computer and use it in GitHub Desktop.
class EDINETGetDocumentSensor(BaseSensorOperator):
    """Sensor that fetches one EDINET document on each poke.

    The document ids are pulled from the XCom of the
    ``edinet_get_document_list`` task. Every call to :meth:`poke` handles
    the next id in that list; the sensor reports success once every id has
    been handled (or immediately when there is nothing to handle).

    When the Airflow Variable ``gcp_bucket`` is set, the fetched file is
    also uploaded to that Google Cloud Storage bucket.

    NOTE(review): progress is tracked on the operator instance
    (``_next_document_index``), so this relies on the same instance being
    poked repeatedly -- confirm this holds for the sensor mode in use.
    """

    @apply_defaults
    def __init__(self, document_type="xbrl", *args, **kwargs):
        """Configure the sensor.

        Args:
            document_type: ``"pdf"`` fetches the PDF; any other value
                fetches the XBRL file.
            *args: Forwarded to the base sensor operator.
            **kwargs: Forwarded to the base sensor operator.
        """
        self.document_type = document_type
        # A negative index means no poke has run yet; it is used to emit
        # the start-up log message exactly once.
        self._next_document_index = -1
        super().__init__(*args, **kwargs)

    def poke(self, context):
        """Fetch the next pending document.

        Args:
            context: Airflow task context; ``context["task_instance"]`` is
                used to pull the document id list from XCom.

        Returns:
            ``True`` when every document id has been handled (including
            the case of an empty or missing list), ``False`` while ids
            remain.
        """
        # xcom_pull yields None when nothing was pushed; treat that the
        # same as an empty list instead of failing on len(None).
        document_ids = context["task_instance"].xcom_pull(
            task_ids="edinet_get_document_list") or []
        total = len(document_ids)

        if self._next_document_index < 0:
            self.log.info("Download the {} documents from EDINET.".format(
                total))
            self._next_document_index = 0

        # Guard before indexing: an empty list (or an index that is already
        # past the end) would otherwise raise IndexError below.
        if self._next_document_index >= total:
            self.log.info("No documents left to download.")
            return True

        edinet_client = api.DocumentClient()
        default_path = os.path.join(os.path.dirname(__file__), "../../data")
        gcp_bucket = Variable.get("gcp_bucket", default_var="")
        # With a bucket configured the file is only staged before upload,
        # so no explicit local directory is passed to the client.
        save_dir = "" if gcp_bucket else default_path

        document_id = document_ids[self._next_document_index]
        self.log.info("Dealing {}/{} documents.".format(
            (self._next_document_index + 1), total))

        # presumably both calls return the path of the saved file --
        # confirm against the EDINET client API.
        if self.document_type == "pdf":
            path = edinet_client.get_pdf(document_id, save_dir)
        else:
            path = edinet_client.get_xbrl(document_id, save_dir)

        if gcp_bucket:
            # Imported lazily so the GCS dependency is only required when
            # a bucket is actually configured.
            from google.cloud import storage

            storage_client = storage.Client()
            bucket = storage_client.get_bucket(gcp_bucket)
            # The blob is named after the part of the path that follows
            # the last "__".
            exact_name = str(path).split("__")[-1]
            bucket.blob(exact_name).upload_from_filename(filename=str(path))

        self._next_document_index += 1
        return self._next_document_index >= total
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment