Skip to content

Instantly share code, notes, and snippets.

@bAcheron
Created February 8, 2024 22:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bAcheron/71a1eb0796bbb50743b89d6e15c4a479 to your computer and use it in GitHub Desktop.
Save bAcheron/71a1eb0796bbb50743b89d6e15c4a479 to your computer and use it in GitHub Desktop.
import requests
from googleapiclient.discovery import build
import boto3
from io import StringIO
import csv
# YouTube Data API key. The literal is a placeholder to be replaced before
# deployment; as the placeholder text itself suggests, the real key would
# usually come from an AWS key/secret service rather than from source code.
api_key = 'REPLACE WITH YOUR API KEY, usually in AWS key service'

# Shared YouTube Data API v3 client, built once at import time and reused by
# the functions in this module.
youtube = build(serviceName='youtube', version='v3', developerKey=api_key)
def fetch_video_details(video_id):
    """Fetch one video from the YouTube Data API and flatten it into a dict.

    Uses the module-level ``youtube`` client to request the ``snippet``,
    ``statistics`` and ``contentDetails`` parts for ``video_id``.

    Args:
        video_id: ID of the video to look up.

    Returns:
        A dict with the keys 'Title', 'Published At', 'View Count',
        'Like Count', 'Dislike Count', 'Comment Count', 'Duration' and
        'Video ID' (in that order), or ``None`` when the response's
        ``items`` list is empty.
    """
    request = youtube.videos().list(
        id=video_id,
        part='snippet,statistics,contentDetails'
    )
    items = request.execute()['items']
    if not items:
        return None

    video = items[0]
    snippet = video['snippet']
    details = {
        'Title': snippet['title'],
        'Published At': snippet['publishedAt'],
    }

    # Counters absent from the response fall back to the string '0'.
    # Dislike counts in particular may not be available due to YouTube API
    # changes.
    statistics = video['statistics']
    counters = (
        ('View Count', 'viewCount'),
        ('Like Count', 'likeCount'),
        ('Dislike Count', 'dislikeCount'),
        ('Comment Count', 'commentCount'),
    )
    for column, api_field in counters:
        details[column] = statistics.get(api_field, '0')

    details['Duration'] = video['contentDetails']['duration']
    details['Video ID'] = video_id
    return details
def lambda_handler(event, context):
    """AWS Lambda entry point: export a channel's video details to S3 as CSV.

    Resolves the channel's "uploads" playlist, pages through it to collect
    every video ID, fetches the details of each video with
    ``fetch_video_details`` and uploads the resulting CSV to S3.

    Args:
        event: Invocation payload. When it is a dict with a non-empty
            ``'channel_id'`` entry, that channel is exported; otherwise the
            hard-coded placeholder below is used.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': 'CSV uploaded successfully'}`` once
        the CSV has been uploaded.

    Raises:
        LookupError: If the API response contains no channel for the
            requested channel ID.
    """
    # Prefer the channel ID supplied by the event; the hard-coded value
    # remains as a fallback so the function can still be run without one.
    channel_id = 'REPLACE WITH YOUR CHANNEL ID'
    if isinstance(event, dict) and event.get('channel_id'):
        channel_id = event['channel_id']

    # Get upload playlist ID
    channel_response = youtube.channels().list(
        id=channel_id,
        part='contentDetails',
    ).execute()
    channels = channel_response.get('items')
    if not channels:
        # Fail with a clear message instead of an opaque KeyError/IndexError
        # from indexing a missing or empty 'items' list.
        raise LookupError(f'No YouTube channel found for id {channel_id!r}')
    playlist_id = channels[0]['contentDetails']['relatedPlaylists']['uploads']

    # Fetch videos from the playlist, following nextPageToken until the
    # response no longer carries one.
    videos = []
    next_page_token = None
    while True:
        playlist_response = youtube.playlistItems().list(
            playlistId=playlist_id,
            part='snippet',
            maxResults=50,
            pageToken=next_page_token,
        ).execute()
        videos.extend(
            item['snippet']['resourceId']['videoId']
            for item in playlist_response.get('items', [])
        )
        next_page_token = playlist_response.get('nextPageToken')
        if not next_page_token:
            break

    csv_output = StringIO()
    fieldnames = ['Title', 'Published At', 'View Count', 'Like Count', 'Comment Count', 'Duration', 'Video ID']
    # fetch_video_details() also returns a 'Dislike Count' key that is not a
    # CSV column. DictWriter raises ValueError on unknown keys by default, so
    # extra keys must be ignored explicitly or the first row aborts the run.
    writer = csv.DictWriter(csv_output, fieldnames=fieldnames, extrasaction='ignore')
    writer.writeheader()
    for video_id in videos:
        video_details = fetch_video_details(video_id)
        if video_details:
            writer.writerow(video_details)
            print(f'Video details fetched for {video_details["Title"]}')

    # Upload the CSV to S3. getvalue() returns the whole buffer regardless of
    # the current stream position, so no seek(0) is needed; the text is
    # encoded explicitly so the uploaded bytes are UTF-8.
    s3_client = boto3.client('s3')
    s3_bucket_name = 'REPLACE WITH YOUR BUCKET NAME'
    s3_file_key = 'youtube_video_details.csv'
    s3_client.put_object(
        Bucket=s3_bucket_name,
        Key=s3_file_key,
        Body=csv_output.getvalue().encode('utf-8'),
    )
    print(f'Successfully uploaded the CSV to s3://{s3_bucket_name}/{s3_file_key}')
    return {
        'statusCode': 200,
        'body': 'CSV uploaded successfully'
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment