Skip to content

Instantly share code, notes, and snippets.

View khuyentran1401's full-sized avatar
🏠
Working from home

Khuyen Tran khuyentran1401

🏠
Working from home
View GitHub Profile
app
├── Visualize.py
├── __init__.py
└── pages
    └── 1_🚦_Topics.py
import json
from datetime import timedelta
from typing import List

import pandas as pd
from omegaconf import DictConfig
from prefect import flow, task
from prefect.tasks import task_input_hash
from pydash import py_

from helper import load_config
@task
def create_dataframe_from_dict(data: dict) -> pd.DataFrame:
    """Build a pandas DataFrame from ``data``.

    Args:
        data: Dict handed unchanged to the ``pd.DataFrame`` constructor.

    Returns:
        The DataFrame constructed from ``data``.
    """
    return pd.DataFrame(data)
@task
def remove_duplicates(data: pd.DataFrame, config: DictConfig) -> pd.DataFrame:
    """Remove the duplicates of a repository.

    Args:
        data: DataFrame of repository records.
        config: Configuration whose ``relevant_info`` entry lists the
            column names to compare when looking for duplicate rows.

    Returns:
        ``data`` with duplicate rows dropped, where two rows count as
        duplicates when they agree on every ``relevant_info`` column
        other than ``"topics"``.
    """
    # "topics" is excluded from the comparison columns.
    # NOTE(review): presumably because each cell holds a list, which
    # pandas cannot hash when detecting duplicates -- confirm.
    subset = [column for column in config.relevant_info if column != "topics"]
    return data.drop_duplicates(subset=subset)
# Cache the task result for one day, keyed on a hash of the task inputs.
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def get_relevant_info(data: List[dict], config: DictConfig) -> dict:
    """Get only the information that we care about in each repo.

    Args:
        data: One dict per repository.
        config: Configuration whose ``relevant_info`` entry lists the
            fields to extract.

    Returns:
        A dict mapping each name in ``config.relevant_info`` to the list
        produced by mapping that name over ``data``.
    """
    # pydash's ``map`` with a string iteratee looks that property up on
    # every element of ``data``.
    return {
        field: py_(data).map(field).value() for field in config.relevant_info
    }
import json
import pandas as pd
from typing import List
import pandas as pd
from helper import load_config
from omegaconf import DictConfig
from prefect import flow, task
from prefect.tasks import task_input_hash
from pydash import py_
from prefect import task, flow
from prefect.tasks import task_input_hash
...
@task(
cache_key_fn=task_input_hash,
cache_expiration=timedelta(days=1),
retries=3, # new code
retry_delay_seconds=60, # new code
from prefect import task, flow
from prefect.tasks import task_input_hash
...
@task(
cache_key_fn=task_input_hash, # new code
cache_expiration=timedelta(days=1), # new code
)
def get_general_info_of_repos(auth: dict):
10:50:06.667 | INFO | prefect.engine - Created flow run 'prudent-echidna' for flow 'get-data'
10:50:06.667 | INFO | Flow run 'prudent-echidna' - Using task runner 'ConcurrentTaskRunner'
10:50:06.725 | INFO | Flow run 'prudent-echidna' - Created task run 'load_config-69ae3b92-0' for task 'load_config'
10:50:06.741 | INFO | Flow run 'prudent-echidna' - Created task run 'get_authentication-aa770635-0' for task 'get_authentication'
10:50:06.771 | INFO | Flow run 'prudent-echidna' - Created task run 'get_general_info_of_repos-c422ad50-0' for task 'get_general_info_of_repos'
10:50:06.782 | INFO | Task run 'get_authentication-aa770635-0' - Finished in state Completed()
10:50:06.794 | INFO | Flow run 'prudent-echidna' - Created task run 'get_starred_repo_urls-8a266957-0' for task 'get_starred_repo_urls'
10:50:06.823 | INFO | Flow run 'prudent-echidna' - Created task run 'get_specific_info_of_repos-c9bc4fc2-0' for task 'get_specific_info_of_repos'
10:50:06.885 | INFO | Flow run 'prudent-echi
from prefect import task, flow # new code
@task  # new code
def get_authentication():
    """Placeholder task; the implementation is elided in this snippet."""
    ...
@task  # new code
def get_general_info_of_repos(auth: dict):
    """Placeholder task taking ``auth``; the implementation is elided in this snippet."""
    ...
import json
import pandas as pd
from typing import List
import pandas as pd
from helper import load_config
from omegaconf import DictConfig
from pydash import py_