Skip to content

Instantly share code, notes, and snippets.

@jimwhite
Last active January 11, 2024 03:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jimwhite/df999c61f4c71a23e32a21979eb0c136 to your computer and use it in GitHub Desktop.
Save jimwhite/df999c61f4c71a23e32a21979eb0c136 to your computer and use it in GitHub Desktop.
Fetch historical news using the Alpaca API and save it in raw JSON format, one article per file
import os
import json
from datetime import datetime, timezone
import time
import argparse
from alpaca.data.historical.news import NewsClient
from alpaca.data.requests import NewsRequest
from alpaca.data.timeframe import TimeFrame
def isoformatutc(d):
    """Return ``d.isoformat()`` with a ``+00:00`` offset rewritten as ``Z``.

    :param d: object exposing ``isoformat()`` (a ``datetime`` in this file)
    :return: the ISO string; text without a ``+00:00`` offset is unchanged
    """
    iso_text = d.isoformat()
    zulu_text = iso_text.replace("+00:00", "Z")
    return zulu_text
def write_news_to_json(n):
    """Write one news article to its own JSON file.

    The article is stamped in place with a ``fetched_at`` UTC timestamp and
    dumped to ``data/news_data_<prefix>/<id>_<updated_at>.json``, resolved
    against the current working directory. ``<prefix>`` is the first seven
    characters of ``updated_at`` (presumably ``YYYY-MM``; confirm against the
    API's timestamp format).

    :param n: article dict with ``id`` and a string ``updated_at``; mutated
        to add ``fetched_at``
    :return: None
    """
    n["fetched_at"] = isoformatutc(datetime.now(timezone.utc))
    year_mo = n["updated_at"][:7]
    news_data_path = os.path.abspath(f"data/news_data_{year_mo}")
    # exist_ok=True replaces the check-then-create pair, which could fail if
    # the directory appeared between the check and the makedirs call.
    os.makedirs(news_data_path, exist_ok=True)
    file_path = os.path.join(news_data_path, f"{n['id']}_{n['updated_at']}.json")
    # Explicit encoding so the output does not depend on the platform default.
    with open(file_path, "wt", encoding="utf-8") as outfile:
        json.dump(n, outfile)
def _parse_utc_timestamp(text):
    """Parse an ISO-8601 timestamp string, accepting a trailing ``Z``."""
    # datetime.fromisoformat only understands the "Z" suffix from Python 3.11
    # on; rewriting it as an explicit offset works on every version.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    return datetime.fromisoformat(text)


def _store_news_page(news_list):
    """Write each article of a non-empty page to disk; return the last article's ``updated_at``."""
    for news in news_list:
        write_news_to_json(news)
    return _parse_utc_timestamp(news_list[-1]["updated_at"])


def get_all_the_news(start, end, timeframe: TimeFrame = TimeFrame.Day):
    """
    Fetch news page by page and write every article to its own JSON file.

    The first request carries ``start``, ``end`` and ``timeframe``; follow-up
    requests carry only the page token. Paging stops when no token is
    returned, a page is empty, or the last stored article's ``updated_at`` is
    later than ``end``. That check runs between pages, so the final page that
    was written may contain articles later than ``end``.

    Credentials are read from the ``APCA_API_KEY_ID`` and
    ``APCA_API_SECRET_KEY`` environment variables.

    :param start: start of the requested range; printed and sent with the
        first request
    :param end: end of the requested range; must be comparable with the
        timezone-aware datetimes parsed from ``updated_at``
    :param timeframe: forwarded to the first ``NewsRequest``
    :return: None
    :raises KeyError: if a credential environment variable is unset or the
        first response has no ``news`` key
    """
    msg = "Getting news "
    msg += f", timeframe: {timeframe}" if timeframe else ""
    msg += f" between dates: start={start}, end={end}"
    print(msg)

    step_size = 50
    news_client = NewsClient(
        api_key=os.environ["APCA_API_KEY_ID"],
        secret_key=os.environ["APCA_API_SECRET_KEY"],
        raw_data=True,
    )
    request_params = NewsRequest(
        include_content=True,
        sort="asc",
        timeframe=timeframe,
        start=start,
        end=end,
        limit=step_size,
    )
    print(f"getting news from {start}")
    news_response = news_client.get_news(request_params)
    news_list = news_response["news"]
    print(len(news_list))
    if not news_list:
        print("empty news list")
        return
    start = _store_news_page(news_list)
    next_page_token = news_response.get("next_page_token")

    while next_page_token and start <= end:
        request_params = NewsRequest(
            include_content=True,
            sort="asc",
            page_token=next_page_token,
            limit=step_size,
        )
        news_response = news_client.get_news(request_params)
        news_list = news_response.get("news")
        if not news_list:
            print("empty news list")
            break
        start = _store_news_page(news_list)
        # .get() mirrors the first-page lookup, so a response without the key
        # ends the loop instead of raising KeyError.
        next_page_token = news_response.get("next_page_token")
def download_news_year(year):
    """Fetch the news for one calendar year.

    The range passed on runs from 1 January of ``year`` to 1 January of the
    following year, both at midnight UTC, with a daily timeframe.

    :param year: calendar year as an int
    :return: None
    """
    first_day, first_day_next_year = (
        datetime(y, 1, 1, tzinfo=timezone.utc) for y in (year, year + 1)
    )
    get_all_the_news(first_day, first_day_next_year, TimeFrame.Day)
def main(argv=None):
    """Command-line entry point: download one year of news.

    :param argv: argument list to parse; ``None`` makes argparse read
        ``sys.argv[1:]``
    :raises SystemExit: with status 1 when no ``--year`` is given
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--year", help="Enter the year", type=int)
    args = parser.parse_args(argv)
    if not args.year:
        print("No year provided")
        # SystemExit is raised directly because the exit() builtin is only
        # installed by the site module.
        raise SystemExit(1)
    print(f"year: {args.year}")
    # perf_counter is monotonic, so the elapsed time is unaffected by
    # wall-clock adjustments during a long download.
    start_time = time.perf_counter()
    download_news_year(args.year)
    print(f"{args.year} took {time.perf_counter() - start_time} sec")


# Guarded so that importing this module does not parse arguments or start a
# download.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment