Created
November 1, 2022 16:05
-
-
Save cnlpete/061eeda3ebef1d067931c941d152900e to your computer and use it in GitHub Desktop.
Run in the background, download and generate cloud images, and call xplanet every so often.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import sched | |
import subprocess | |
import os | |
import json | |
import requests | |
from PIL import Image | |
# Satellite product ID. Use 'globalir' for realistic clouds; the full list of
# available products is at http://realearth.ssec.wisc.edu/products/
PRODUCT = 'globalir'

# Base URL of the API (change it here if the service ever moves).
API_URL = 'http://re.ssec.wisc.edu/api'

# Zoom level, from 0 (smallest image) to 20 (largest image).
#   - Level 3 gives a 4096x2048 image; downloaded once an hour that is about
#     200 MegaPixels of usage a day, well under the 500 MegaPixels free amount.
#   - Level 4 gives a 8192x4096 image; that is about 800 MegaPixels of usage
#     a day, which is under the free API access level.
# It isn't clear that zoom levels above 4 add any more detail anyway.
ZOOM = 1

# Your API key. Register for one at https://realearth.ssec.wisc.edu/users/
# Without a key, your IP address is limited to 500 MegaPixels per day; a free
# key raises that to 1000 MegaPixels per day. Past those thresholds a
# watermark appears on the images.
API_KEY = ''

# Output file location. The image is written as a PNG (easy to change in the
# code below). The location must be writable by the user running the script.
OUTPUT = 'clouds_ssec_2.png'

# Seconds between two xplanet background renders.
interval_background = 5 * 60
# Seconds between two cloud image downloads.
interval_clouds = 3 * 60 * 60
# Seconds until the next check when a cloud download was not yet due.
interval_clouds_short = 5 * 60
# JSON file that keeps the last run time and last cloud date between runs.
db_name = 'udb.db'

# ================

# Scheduler shared by both periodic jobs, driven by wall-clock time.
s = sched.scheduler(timefunc=time.time, delayfunc=time.sleep)
def startXPlanet():
    """Run xplanet once with its window hidden and wait for it to finish.

    Windows only: the call relies on ``subprocess.STARTUPINFO``. The
    ``PATH_TO_*`` placeholders in the command line must be replaced with the
    real xplanet, config and temp-dir locations before use.
    """
    # Tell Windows to honour wShowWindow and keep the child window hidden.
    startup = subprocess.STARTUPINFO()
    startup.dwFlags = subprocess.STARTF_USESHOWWINDOW
    startup.wShowWindow = subprocess.SW_HIDE

    command = r'PATH_TO_XPLANET\xplanet.exe -projection orthographic -light_time -num_times 1 -verbosity -1 -latitude 54.322 -longitude 10.122 -config PATH_TO_XPLANET_CONFIG\config.txt -save_desktop_file -tmpdir PATH_TO_XPLANET_TEMPDIR'
    subprocess.run(command, startupinfo=startup)
def read_db():
    """Load the persisted state dictionary from ``db_name``.

    The file is no longer created as a side effect of reading; ``store_db``
    creates it on the first write.

    Returns:
        dict: The stored state, or an empty dict when the file does not
        exist, is empty, is not valid JSON, or does not hold a JSON object.
    """
    try:
        # The context manager closes the file even when parsing fails.
        with open(db_name, "r") as udb:
            data = json.load(udb)
    except FileNotFoundError:
        # First run: nothing has been stored yet.
        return {}
    except ValueError:
        # Covers json.JSONDecodeError (empty or corrupt file) and undecodable
        # bytes; start over from an empty state.
        return {}
    # Callers index the result by key, so never hand back a list or scalar.
    return data if isinstance(data, dict) else {}
def store_db(db):
    """Persist the state dictionary to ``db_name`` as JSON.

    Args:
        db: JSON-serializable state to write; it replaces the file content.

    Raises:
        TypeError: If ``db`` contains a value JSON cannot represent. The
            existing file is left untouched in that case.
    """
    # Serialize before opening: opening with 'w' truncates the file, so a
    # serialization error after that point would destroy the stored state.
    payload = json.dumps(db)
    with open(db_name, 'w') as udb:
        udb.write(payload)
def shouldDoRun():
    """Tell whether a new cloud image download is due.

    Returns:
        bool: True when no usable ``lastRun`` timestamp can be read from the
        state file, or when that timestamp is more than ``interval_clouds``
        seconds in the past; False otherwise.
    """
    try:
        last_run = read_db()['lastRun']
    except (OSError, KeyError, TypeError):
        # State file inaccessible, no run recorded yet, or the stored state
        # is not a dict: treat the download as due. Unlike a bare ``except``
        # this lets KeyboardInterrupt/SystemExit stop the script.
        return True
    return last_run < time.time() - interval_clouds
def storeCurrentTimestamp():
    """Record the current time as ``lastRun`` in the state file."""
    state = read_db()
    state['lastRun'] = time.time()
    store_db(state)
def storeCurrentCloudDateString(clouddatestring):
    """Save ``clouddatestring`` under ``clouddatestring`` in the state file.

    Args:
        clouddatestring: Identifier of the cloud image to remember.
    """
    state = read_db()
    state['clouddatestring'] = clouddatestring
    store_db(state)
def getLastCloudDateString():
    """Return the cloud date string remembered in the state file.

    Returns:
        The value stored under ``clouddatestring``, or ``""`` when nothing
        usable is stored.
    """
    try:
        return read_db()['clouddatestring']
    except (OSError, KeyError, TypeError):
        # State file inaccessible, nothing stored yet, or the stored state is
        # not a dict. Unlike a bare ``except`` this does not swallow
        # KeyboardInterrupt/SystemExit.
        return ""
def renderBackground(scheduler):
    """Render the desktop background with xplanet and queue the next render.

    Args:
        scheduler: The ``sched.scheduler`` on which this job re-registers
            itself to run again in ``interval_background`` seconds.
    """
    print("render background")
    startXPlanet()
    print("render background -- done")
    # Re-arm on the scheduler that was passed in instead of the module global.
    scheduler.enter(interval_background, 1, renderBackground, (scheduler,))
def getCloudImage(timeout=60):
    """Download the newest cloud tiles and stitch them into ``OUTPUT``.

    Asks the API for the latest timestamp of ``PRODUCT``. If that timestamp
    matches the one remembered from the previous download, nothing is
    fetched. Otherwise every 256x256 tile for ``ZOOM`` is downloaded, pasted
    into one equirectangular image and saved as a PNG. The timestamp is
    remembered only after the image has been saved, so a failed download is
    retried on the next call.

    Args:
        timeout: Seconds to wait for each HTTP request, so a stalled
            connection cannot block the scheduler indefinitely.

    Raises:
        requests.RequestException: On connection errors, timeouts or
            non-success HTTP responses.
    """
    tile_size = 256  # Tiles from SSEC are 256x256
    rows = 2 ** ZOOM
    cols = rows * 2  # Equirectangular images are twice as wide

    date_req = requests.get(API_URL + '/time?products=' + PRODUCT,
                            timeout=timeout)
    date_req.raise_for_status()
    # NOTE(review): assumes the API returns a two-element sequence per
    # product, as the original indexing with [0] and [1] implies - confirm.
    date = date_req.json()[PRODUCT]
    date_string = str(date[0]) + '_' + str(date[1])

    if getLastCloudDateString() == date_string:
        # The image on disk already shows this timestamp.
        return

    url = (API_URL + '/image?products=' + PRODUCT + '_' + date_string
           + '&equirectangular=true&z=' + str(ZOOM)
           + '&width=256&height=256&accesskey=' + API_KEY)

    canvas = Image.new('RGB', (cols * tile_size, rows * tile_size))
    for x in range(cols):
        for y in range(rows):
            tile = requests.get(url + '&x=' + str(x) + '&y=' + str(y),
                                stream=True, timeout=timeout)
            try:
                tile.raise_for_status()
                # Have urllib3 undo any gzip/deflate transfer encoding so
                # PIL reads real image bytes from the raw stream.
                tile.raw.decode_content = True
                canvas.paste(im=Image.open(tile.raw),
                             box=(x * tile_size, y * tile_size))
            finally:
                # Streamed responses hold their connection until closed.
                tile.close()

    canvas.save(OUTPUT, "PNG")
    # Remember the timestamp only once the image is safely on disk.
    storeCurrentCloudDateString(date_string)
def renderCloudImage(scheduler):
    """Refresh the cloud image when it is due and queue the next check.

    When a download is due, the next check is queued ``interval_clouds``
    seconds ahead; otherwise ``interval_clouds_short`` seconds ahead.

    Args:
        scheduler: The ``sched.scheduler`` on which this job re-registers
            itself.
    """
    print("render cloud image")
    if shouldDoRun():
        # The run time is stamped before the download and again after it.
        # NOTE(review): the first stamp presumably marks the attempt in case
        # the download fails - confirm intent.
        storeCurrentTimestamp()
        getCloudImage()
        storeCurrentTimestamp()
        print("render cloud image -- done")
        delay = interval_clouds
    else:
        print("render cloud image -- skip")
        delay = interval_clouds_short
    # Re-arm on the scheduler that was passed in instead of the module global.
    scheduler.enter(delay, 1, renderCloudImage, (scheduler,))
# Queue both jobs to start immediately; the background render has the lower
# priority number, so it runs first. Each job then re-schedules itself.
s.enter(delay=0, priority=1, action=renderBackground, argument=(s,))
s.enter(delay=0, priority=2, action=renderCloudImage, argument=(s,))

# Blocks while events remain queued.
s.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment