Skip to content

Instantly share code, notes, and snippets.

@LordBastor
Created February 5, 2021 16:21
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save LordBastor/9dd64cef3cc69c3f987d0f341e39af2e to your computer and use it in GitHub Desktop.
import requests
import prefect
from prefect import Task
class ExtractPokemon(Task):
    """Prefect task that downloads the Pokemon index from the PokeAPI.

    The request is limited to 151 entries via the ``limit`` query parameter.
    """

    # Upper bound, in seconds, for the HTTP request. Without a timeout
    # ``requests`` waits forever on a stalled connection, which would hang
    # the task instead of letting it fail.
    REQUEST_TIMEOUT_SECONDS = 10

    def run(self, **kwargs):
        """Fetch the Pokemon index.

        Args:
            **kwargs: Accepted for call compatibility; not used by this task.

        Returns:
            The decoded JSON body when the response status is OK, otherwise
            ``{"results": []}`` (a warning with the status code is logged).

        Raises:
            requests.RequestException: If the request cannot be completed,
                including when it exceeds ``REQUEST_TIMEOUT_SECONDS``.
        """
        logger = prefect.context.get("logger")
        url = "https://pokeapi.co/api/v2/pokemon?limit=151"
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)
        if response.ok:
            return response.json()
        logger.warning(
            "Could not load pokemon list! Error {}".format(response.status_code)
        )
        # Fall back to an empty result set rather than failing on an HTTP
        # error status.
        return {"results": []}
class TransformPokemon(Task):
    """Prefect task that downloads the detail record for a single Pokemon."""

    # Upper bound, in seconds, for the HTTP request. Without a timeout
    # ``requests`` waits forever on a stalled connection, which would hang
    # the task instead of letting it fail.
    REQUEST_TIMEOUT_SECONDS = 10

    def run(self, pokemon):
        """Fetch the detail payload referenced by ``pokemon``.

        Args:
            pokemon: Mapping with a ``"url"`` entry (the address to request)
                and a ``"name"`` entry (used, title-cased, in log messages).

        Returns:
            The decoded JSON body when the response status is OK, otherwise
            an empty dict (a warning with the status code is logged).

        Raises:
            KeyError: If ``pokemon`` lacks ``"url"`` or ``"name"``.
            requests.RequestException: If the request cannot be completed,
                including when it exceeds ``REQUEST_TIMEOUT_SECONDS``.
        """
        logger = prefect.context.get("logger")
        url = pokemon["url"]
        name = pokemon["name"].title()
        logger.info("Getting {} from {}".format(name, url))
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)
        if response.ok:
            return response.json()
        logger.warning(
            "Could not load pokemon {}! Error {}".format(name, response.status_code)
        )
        # Fall back to an empty record rather than failing on an HTTP error
        # status.
        return {}
class LoadPokemon(Task):
    """Prefect task that reports the size of the value it receives."""

    def run(self, pokemon):
        """Log ``len(pokemon)`` at INFO level through the Prefect context logger.

        Args:
            pokemon: Any sized object; only its length is used.
        """
        log = prefect.context.get("logger")
        size = len(pokemon)
        log.info(size)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment