Skip to content

Instantly share code, notes, and snippets.

View LordBastor's full-sized avatar

Boris Gaganelov LordBastor

  • Sofia, Bulgaria
View GitHub Profile
from datetime import timedelta
from prefect import Flow
from prefect import task
from tasks.pokemontasks import ExtractPokemon, TransformPokemon, LoadPokemon
from utils.config import kubernetes_run, azure_store
# Create one instance of each task class imported from tasks.pokemontasks.
# NOTE(review): presumably these instances are wired together inside a Flow;
# the Flow definition is not visible in this snippet -- confirm.
extract_pokemon = ExtractPokemon()
transform_pokemon = TransformPokemon()
load_pokemon = LoadPokemon()
import requests
import prefect
from prefect import Task
class ExtractPokemon(Task):
def run(self, **kwargs):
logger = prefect.context.get("logger")
url = "https://pokeapi.co/api/v2/pokemon?limit=151"
response = requests.get(url)
if response.ok:
from prefect.storage import Azure
from prefect.run_configs import KubernetesRun
import os
# Prefect Azure storage object pointing at the "prefect" container.
azure_store = Azure(container="prefect")
kubernetes_run = KubernetesRun(
image=os.environ.get("IMAGE_URL"),
job_template_path=os.environ.get("JOB_TEMPLATE_PATH", "./job_template.yaml"),
labels=["prefect"],
from prefect import Flow, task
@task
def extract():
    """Return the hard-coded sample data ``[1, 2, 3]``."""
    return [1, 2, 3]
@task
def transform(x):
    """Return a new list with every element of ``x`` multiplied by 10.

    Args:
        x: Iterable whose elements support ``* 10``.

    Returns:
        list: One scaled value per input element, in iteration order.
    """
    return [i * 10 for i in x]
# Run the pipeline on the ubuntu-latest VM image.
pool:
  vmImage: ubuntu-latest
variables:
- name: ACR
value: ACR_URL
- name: AZURE_STORAGE_CONNECTION_STRING
value: AZURE_STORAGE_CONNECTION_STRING
- name: ACR_USER
value: ACR_USER