Skip to content

Instantly share code, notes, and snippets.

@hamletbatista
Created October 8, 2020 04:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hamletbatista/4f733d8148747126152353ea71406414 to your computer and use it in GitHub Desktop.
Save hamletbatista/4f733d8148747126152353ea71406414 to your computer and use it in GitHub Desktop.
from git import Repo
from lxml import etree
import advertools as adv
from datetime import datetime
import yaml
import os
full_path="/tmp/wordpress-updates"
#get credentials from environment variables
username=os.environ.get('username', "(not set)")
password=os.environ.get('password', "(not set)")
sitemap_url="https://www.yoursite.com/sitemap_index.xml"
#update to use your repo
remote=f"https://{username}:{password}@github.com/hamletbatista/wordpress-updates.git"
name="Your Name"
email="your@email.com"
def get_lastest_urls():
global sitemap_url
df = adv.sitemap_to_df(sitemap_url)
today = str(datetime.today()).split()[0] #remove hour, minutes, etc.
changed = set(df[df["lastmod"] > today]["loc"])
return changed
def update_yaml(changed_urls):
with open(f"{full_path}/.github/workflows/main.yml", "r") as f:
main_workflow = yaml.load(f)
#fix bug
#del main_workflow[True]
#main_workflow["on"] = "push"
#add changed urls to the workflow yaml
main_workflow["jobs"]["lighthouse"]["steps"][1]["with"]["urls"] = "\n".join(changed_urls)
#Persist it back
with open(f"{full_path}/.github/workflows/main.yml", "w") as f:
f.write(yaml.dump(main_workflow, default_flow_style=False))
def push_to_github():
global name
global email
repo=Repo(full_path)
with repo.config_writer() as git_config:
git_config.set_value('user', 'email', email)
git_config.set_value('user', 'name', name)
repo.git.add([f'{full_path}/.github/workflows/main.yml'])
# Provide a commit message
repo.index.commit('new URL changes to check')
# Push changes
origin = repo.remote(name="origin")
origin.push()
def wordpress_ping_post(request):
global full_path
global remote
try:
Repo.clone_from(remote, full_path)
except:
#repo already exists, let's fetch recent changes
repo=Repo(full_path)
origin = repo.remote(name="origin")
origin.pull()
#Get ping payload from POST request
content_type = request.headers['content-type']
if content_type == 'text/xml':
payload = request.data
root = etree.fromstring(payload)
r = root.xpath('//param/value/string')
#review ping URLs
for value in r:
print(f"{value.tag} - {value.text}")
changed_urls = get_lastest_urls()
update_yaml(changed_urls)
push_to_github()
return "Ok"
elif content_type == 'application/json': #for debugging
changed_urls = get_lastest_urls()
update_yaml(changed_urls)
push_to_github()
return "Ok"
else:
raise ValueError("Unknown content type: {}".format(content_type))
return "Failure"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment