-
-
Save bAcheron/71a1eb0796bbb50743b89d6e15c4a479 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import requests | |
| from googleapiclient.discovery import build | |
| import boto3 | |
| from io import StringIO | |
| import csv | |
| api_key = 'REPLACE WITH YOUR API KEY, usually in AWS key service' | |
| youtube = build('youtube', 'v3', developerKey=api_key) | |
| def fetch_video_details(video_id): | |
| # Fetch detailed information about the video | |
| video_response = youtube.videos().list( | |
| id=video_id, | |
| part='snippet,statistics,contentDetails' | |
| ).execute() | |
| if video_response['items']: | |
| video_data = video_response['items'][0] | |
| details = { | |
| 'Title': video_data['snippet']['title'], | |
| 'Published At': video_data['snippet']['publishedAt'], | |
| 'View Count': video_data['statistics'].get('viewCount', '0'), | |
| 'Like Count': video_data['statistics'].get('likeCount', '0'), | |
| 'Dislike Count': video_data['statistics'].get('dislikeCount', '0'), # Note that dislike counts may not be available due to YouTube API changes | |
| 'Comment Count': video_data['statistics'].get('commentCount', '0'), | |
| 'Duration': video_data['contentDetails']['duration'], | |
| 'Video ID': video_id | |
| } | |
| return details | |
| else: | |
| return None | |
| def lambda_handler(event, context): | |
| #the code below pulls out channel ID from the event | |
| #channel_id = event['channel_id'] | |
| #you could also hard code it to make sure everything is working | |
| channel_id='REPLACE WITH YOUR CHANNEL ID' | |
| # Get upload playlist ID | |
| channel_response = youtube.channels().list(id=channel_id, part='contentDetails').execute() | |
| playlist_id = channel_response['items'][0]['contentDetails']['relatedPlaylists']['uploads'] | |
| # Fetch videos from the playlist | |
| videos = [] | |
| next_page_token = None | |
| while True: | |
| playlist_response = youtube.playlistItems().list( | |
| playlistId=playlist_id, | |
| part='snippet', | |
| maxResults=50, | |
| pageToken=next_page_token | |
| ).execute() | |
| videos += [item['snippet']['resourceId']['videoId'] for item in playlist_response['items']] | |
| next_page_token = playlist_response.get('nextPageToken') | |
| if next_page_token is None: | |
| break | |
| csv_output = StringIO() | |
| fieldnames = ['Title', 'Published At', 'View Count', 'Like Count', 'Comment Count', 'Duration', 'Video ID'] | |
| writer = csv.DictWriter(csv_output, fieldnames=fieldnames) | |
| writer.writeheader() | |
| for video_id in videos: | |
| video_details = fetch_video_details(video_id) | |
| if video_details: | |
| writer.writerow(video_details) | |
| print(f'Video details fetched for {video_details["Title"]}') | |
| # Reset the pointer to the start of the string | |
| csv_output.seek(0) | |
| # Upload the CSV to S3 | |
| s3_client = boto3.client('s3') | |
| s3_bucket_name='REPLACE WITH YOUR BUCKET NAME' | |
| s3_file_key = 'youtube_video_details.csv' | |
| s3_client.put_object(Bucket=s3_bucket_name, Key=s3_file_key, Body=csv_output.getvalue()) | |
| print(f'Successfully uploaded the CSV to s3://{s3_bucket_name}/{s3_file_key}') | |
| return { | |
| 'statusCode': 200, | |
| 'body': 'CSV uploaded successfully' | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment