Last active
June 1, 2023 23:32
-
-
Save jeffehobbs/57e4a692903fcdac16bb4953ae43aab4 to your computer and use it in GitHub Desktop.
record radio, transcribe it, make art
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# radiodreambot | jeffehobbs@gmail.com | |
# record radio, transcribe it, make art | |
import requests, os, shutil, configparser, datetime, time, multiprocessing, openai, tweepy | |
from mastodon import Mastodon | |
from faster_whisper import WhisperModel | |
# --- runtime settings -------------------------------------------------------
STREAM_URL = 'https://live.wostreaming.net/direct/saga-wrsifmaac-ibc2'
LOCAL_STREAM_OUTPUT_DIRECTORY = '/tmp/'
RUN_TIME = 60  # seconds of audio to capture before the recorder is stopped
# Base name shared by the .mp3, .txt and .jpg files of one run.
# NOTE(review): this is computed at import time. If the recorder child process
# re-imports this module (multiprocessing "spawn" start method) it recomputes
# TIMESTAMP and could land on a different minute than the parent -- confirm on
# the target platform.
TIMESTAMP = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M")
DEBUG = False  # when True the script stops before posting anything

# --- API keys, read from the secrets.txt file that sits next to this script --
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did parse, so an empty result means the secrets file is missing.
# Fail here with a clear message instead of a NoSectionError further down.
if not config.read(SCRIPT_PATH + '/secrets.txt'):
    raise FileNotFoundError(
        f'could not read API keys from {SCRIPT_PATH}/secrets.txt'
    )
OPENAI_APIKEY = config.get('openai', 'api_key')
TWITTER_CONSUMER_KEY = config.get('twitter', 'api_key')
TWITTER_CONSUMER_SECRET = config.get('twitter', 'api_key_secret')
TWITTER_ACCESS_TOKEN = config.get('twitter', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('twitter', 'access_token_secret')
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')
def main():
    """Record the live radio stream to ``<output dir>/<TIMESTAMP>.mp3``.

    Streams STREAM_URL to disk in 1 KiB chunks until the stream ends. The
    function does not enforce RUN_TIME itself; the ``__main__`` driver runs it
    in a separate process and terminates that process after RUN_TIME seconds.

    Raises:
        requests.HTTPError: if the stream URL answers with an error status.
    """
    print(f'recording for {RUN_TIME} seconds...')
    output_path = LOCAL_STREAM_OUTPUT_DIRECTORY + TIMESTAMP + '.mp3'
    # timeout=(connect, read): fail instead of hanging on a dead server.
    with requests.get(STREAM_URL, stream=True, timeout=(10, 30)) as r:
        # Without this an HTML error page would be saved as the "recording".
        r.raise_for_status()
        with open(output_path, 'wb') as f:
            for block in r.iter_content(1024):
                f.write(block)
    return
def transcribe():
    """Transcribe this run's recording and save the transcript next to it.

    Reads ``<LOCAL_STREAM_OUTPUT_DIRECTORY><TIMESTAMP>.mp3``, prints every
    segment as it is produced, and writes the full transcript to the matching
    ``.txt`` file.

    Returns:
        str: the transcript, one segment per line, each newline-terminated.
    """
    print(f'transcribing {TIMESTAMP}.mp3 ...')
    base_path = LOCAL_STREAM_OUTPUT_DIRECTORY + TIMESTAMP
    # tiny.en, tiny, base.en, base, small.en, small, medium.en, medium, large-v1, large-v2
    # adjust the model depending on how much RAM you have to toss at this
    model_size = 'large-v2'
    model = WhisperModel(model_size, device="cpu", compute_type="int8")
    segments, _info = model.transcribe(base_path + '.mp3', beam_size=5)
    lines = []
    for segment in segments:
        # Strip leading whitespace rather than slicing [1:], so a segment that
        # does not start with a space keeps its first character.
        line = segment.text.lstrip()
        print(line)
        lines.append(line + '\n')
    text = ''.join(lines)
    # Explicit UTF-8 so non-ASCII transcript text does not depend on the
    # locale's default encoding; the context manager closes the file on error.
    with open(base_path + '.txt', 'w', encoding='utf-8') as f:
        f.write(text)
    return text
# generate image from post text | |
def get_openai_image(text, num_images):
    """Generate an image from ``text`` with the OpenAI image API and save it.

    Args:
        text: prompt text; only the first 400 characters are sent.
        num_images: number of images to request. Only the first image of the
            response is downloaded.

    Returns:
        str: path of the saved file,
        ``<LOCAL_STREAM_OUTPUT_DIRECTORY><TIMESTAMP>.jpg``.

    Raises:
        requests.HTTPError: if downloading the generated image fails.
    """
    openai.api_key = OPENAI_APIKEY
    # Same directory as the recording and transcript of this run.
    file_path = f'{LOCAL_STREAM_OUTPUT_DIRECTORY}{TIMESTAMP}.jpg'
    prompt = text[:400]
    response = openai.Image.create(prompt=prompt, n=num_images, size="1024x1024")
    image_url = response['data'][0]['url']
    with requests.get(image_url, stream=True, timeout=60) as download:
        # Do not save an error body under an image file name.
        download.raise_for_status()
        with open(file_path, 'wb') as out_file:
            # iter_content() applies any transfer decoding, unlike the raw
            # socket stream.
            for chunk in download.iter_content(chunk_size=8192):
                out_file.write(chunk)
    return file_path
# tweet that stuff | |
def send_tweet(status, image_file_path):
    """Tweet ``status`` with the image at ``image_file_path`` attached.

    The image is uploaded with ``tweepy.API.media_upload`` and the tweet is
    created with ``tweepy.Client.create_tweet``; both use the Twitter
    credentials loaded from the secrets file. A status longer than 270
    characters is cut to its first 267 characters plus "...".

    Args:
        status: text of the tweet.
        image_file_path: path of the image file to attach.
    """
    twitter = tweepy.Client(
        consumer_key=TWITTER_CONSUMER_KEY,
        consumer_secret=TWITTER_CONSUMER_SECRET,
        access_token=TWITTER_ACCESS_TOKEN,
        access_token_secret=TWITTER_ACCESS_TOKEN_SECRET,
    )
    oauth = tweepy.OAuth1UserHandler(
        TWITTER_CONSUMER_KEY,
        TWITTER_CONSUMER_SECRET,
        TWITTER_ACCESS_TOKEN,
        TWITTER_ACCESS_TOKEN_SECRET,
    )
    uploader = tweepy.API(oauth)
    uploaded = uploader.media_upload(image_file_path)
    attachments = [uploaded.media_id]

    # keep long transcripts within the length this script allows for a tweet
    if len(status) > 270:
        status = status[:267] + "..."

    twitter.create_tweet(text=status, user_auth=True, media_ids=attachments)
    return
def send_mastodon(status, image_file_path):
    """Post ``status`` to Mastodon (botsin.space) with the image attached.

    The image is uploaded first with an empty description. A status longer
    than 500 characters is cut to its first 497 characters plus "...".

    Args:
        status: text of the post.
        image_file_path: path of the image file to attach.
    """
    client = Mastodon(
        access_token=MASTODON_ACCESS_TOKEN,
        api_base_url='https://botsin.space/',
    )
    attachment = client.media_post(image_file_path, description="")
    toot = status if len(status) <= 500 else status[:497] + "..."
    client.status_post(toot, media_ids=attachment)
    return
if __name__ == '__main__':
    print('starting...')

    # Record in a child process so the open-ended stream download can be
    # stopped from outside once RUN_TIME seconds have passed.
    p = multiprocessing.Process(target=main, name="Main")
    p.start()
    time.sleep(RUN_TIME)
    p.terminate()
    p.join()

    # If the recorder failed (bad URL, network error) there is nothing to
    # transcribe; stop with a clear message instead of failing later.
    recording = LOCAL_STREAM_OUTPUT_DIRECTORY + TIMESTAMP + '.mp3'
    if not os.path.isfile(recording) or os.path.getsize(recording) == 0:
        raise SystemExit(f'no audio was recorded to {recording}, giving up.')

    text = transcribe()
    # An empty transcript cannot serve as an image prompt or a post.
    if not text.strip():
        raise SystemExit('transcript is empty, nothing to illustrate or post.')

    image_file = get_openai_image(text, 1)
    if DEBUG:
        print('debug mode, exiting early,')
    else:
        print('posting...')
        send_tweet(text, image_file)
        send_mastodon(text, image_file)
    print('...end.')
# fin |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment