This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def _get_data(self): | |
| url = f"https://tgftp.nws.noaa.gov/data/observations/metar/cycles/{self.hour:02d}Z.TXT" | |
| text = requests.get(url, verify=False).text | |
| sections = text.split("\n\n") | |
| data = {"dates": [], "metars": []} | |
| for s in sections: | |
| try: | |
| split_s = s.split("\n") | |
| date = split_s[0].strip() | |
| metar = split_s[1].strip() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| select a.id, a.title , a."source" , v.sentiment, array_agg(k.keyword) as keywords | |
| from sentiment.articles a | |
| inner join | |
| sentiment.keyword k on a.id=k.article_id | |
| inner join | |
| sentiment.sentiment_values v on a.id=v.article_id | |
| where a.id=2 | |
| group by a.id, a.title , a."source" , v.sentiment |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def get_sentiment(**kwargs): | |
| """Based on news headlines get openai sentiment values for articles and persist to db.""" | |
| ti = kwargs["ti"] | |
| article_id_range = ti.xcom_pull(task_ids="get_articles") | |
| session = db_conn() | |
| for _id in range(article_id_range[0], article_id_range[1] + 1): | |
| title = get_article_title_from_id(_id) | |
| openai_resp = get_open_ai_answer(title, "sentiment") | |
| try: | |
| value = int(openai_resp) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def get_keywords(**kwargs): | |
| """Based on news headlines get openai keywords for articles and persist to db.""" | |
| ti = kwargs["ti"] | |
| article_id_range = ti.xcom_pull(task_ids="get_articles") | |
| session = db_conn() | |
| for _id in range(article_id_range[0], article_id_range[1] + 1): | |
| title = get_article_title_from_id(_id) | |
| openai_resp = get_open_ai_answer(title) | |
| try: | |
| data = json.loads(openai_resp) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def get_article_title_from_id(_id): | |
| """Get article title from db from article id.""" | |
| session = db_conn() | |
| record = ( | |
| session.query(SentimentArticles).filter(SentimentArticles.id == _id).first() | |
| ) | |
| session.close() | |
| return record.title |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def get_keyword_prompt(headline): | |
| """Prompt for openai to get headline keywords.""" | |
| return f"""Based on the news headline, | |
| provide the most important word bubble | |
| terms associated with who or what the article is about. | |
| If the term is a name, provider only the last name. | |
| Provide a json list. | |
| Desired Format: json list |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def get_open_ai_answer(headline, prompt_type="keyword"): | |
| """Get result from OpenAI.""" | |
| if prompt_type == "keyword": | |
| prompt = get_keyword_prompt(headline) | |
| elif prompt_type == "sentiment": | |
| prompt = get_sentiment_prompt(headline) | |
| else: | |
| raise ValueError(f"prompt_type '{prompt_type}' is invalid") | |
| openai_key = Variable.get("openai_key") | |
| openai.api_key = openai_key |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| from datetime import datetime, timedelta | |
| from airflow.models import DAG, Variable | |
| from airflow.operators.dummy import DummyOperator | |
| from airflow.operators.python import PythonOperator | |
| from demo.models import SentimentArticles, SentimentKeywords, SentimentValues | |
| from demo.utils import db_conn, get_article_title_from_id, get_open_ai_answer | |
| from newsapi import NewsApiClient |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import openai | |
| from airflow.hooks.base import BaseHook | |
| from airflow.models import Variable | |
| from demo.models import SentimentArticles | |
| from sqlalchemy import create_engine | |
| from sqlalchemy.orm import sessionmaker | |
| def db_conn(): | |
| """Get DB connection.""" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from sqlalchemy import Column, DateTime, ForeignKey, Integer, String | |
| from sqlalchemy.orm import declarative_base | |
| from sqlalchemy.sql import func | |
| Base = declarative_base() | |
| class SentimentArticles(Base): | |
| """News API articles.""" |
NewerOlder