Skip to content

Instantly share code, notes, and snippets.

@8bit-pixies
Last active December 10, 2015 02:58
Show Gist options
  • Save 8bit-pixies/4371763 to your computer and use it in GitHub Desktop.
Save 8bit-pixies/4371763 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
##############################################################################
# Emilio Schapira
# Copyright (C) 2003 Advanced Interface Technologies, Inc.
# http://www.advancedinterfaces.com
# http://sourceforge.net/projects/pycron/
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##############################################################################
#
# Chapman Siu
# chappers.github.com
# Matching function changed to allow for cron behaviour for dashes
"""
usage: cron [crontab_file_name [log_file_name [pid_file_name]]]
This is a minimal cron clone that can be used in windows.
It writes a log in the current directory, and looks for the
crontab file in the current directory unless otherwise specified by
the command line argument.
"""
import time
import os
import sys
import string
def log(msg):
f = file(logFileName,'a')
f.write(time.ctime(time.time()) + " " + msg + "\n")
def run(command):
log(command)
try:
# This is the only windows dependant call
# We use start because we want to spawn the
# shell command in another process, and not wait
if os.name == 'nt':
os.system('start ' + command)
else:
os.system(command)
except:
log('cron: last command failed to start')
#
# Matching Algorithm chapman's edit
#
def listing(expr):
listing = [dash(x) for x in string.split(expr,',')]
return [item for sublist in listing for item in sublist]
def dash(expr):
value = [int(x) for x in string.split(expr,'-')]
return range(min(value),max(value)+1)
def match(value, expr):
if expr == '*':
return 1
values = listing(expr)
#chapman edit to account for dashes
for v in values:
if v == value:
#here for if there are no dashes
return 1
return 0
#
# end matching algorith
#
def calcNextTime():
# Calculate next time with exact seconds.
tt = time.localtime(time.time() + 60)
tt = tt[:5] + (0,) + tt[6:]
return time.mktime(tt)
crontabFileName = "crontab.tab"
logFileName = "cron.log"
pidFileName = "cron.pid"
try:
crontabFileName = sys.argv[1]
logFileName = sys.argv[2]
pidFileName = sys.argv[3]
except:
pass
#Write the pid in the current directory.
f = file(pidFileName,'w')
f.write(str(os.getpid()))
f.close()
# Log cron start
log('cron: started with file %s' % crontabFileName)
nextTime = calcNextTime()
# Loop forever
while 1:
# Sleep for the time remaining until the next exact minute
currentTime = time.time()
if currentTime < nextTime:
# Add a fraction of a second to make sure we are in
# the right second (just for prettier logs)
time.sleep(nextTime-currentTime+.1)
# Check if the time has changed by more than two minutes. This
# case arises when the system clock is changed. We must reset the timer.
if abs(time.time() - nextTime) > 120:
log('Adjusted system clock.')
nextTime = calcNextTime()
# Build a tuple with the current time
curTuple = time.localtime(nextTime)
# Read the crontab file
try:
lines = file(crontabFileName,'r').readlines()
lineNum = 1
for line in lines:
# Ignore comments
if line[0] != '#' and len(string.strip(line)) != 0:
try:
tokens = string.split(line)
# See if the cron entry matches the current time
# minute
timeMatch = match(curTuple[4],tokens[0])
# hour
timeMatch = timeMatch and match(curTuple[3],tokens[1])
# day
timeMatch = timeMatch and match(curTuple[2],tokens[2])
# month
timeMatch = timeMatch and match(curTuple[1],tokens[3])
# weekday (in crontab 0 or 7 is Sunday)
weekday = curTuple[6]+1
matchWeekday = match(weekday,tokens[4]) or (weekday == 7 and match(0,tokens[4]))
timeMatch = timeMatch and matchWeekday
if timeMatch:
run(string.join(tokens[5:]))
except:
log('cron: error parsing line %i of %s' % (lineNum, crontabFileName))
lineNum = lineNum + 1
except:
log('cron: error opening %s file' % crontabFileName)
print 'error'
# Calculate the time for the next iteration (60 seconds later)
nextTime = nextTime + 60
"""
cron scheduler
by Chapman Siu
Based on script written by Emilio Schapira
---
This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
from datetime import datetime, timedelta
import time
import subprocess
import os
import re
import sys
def log(msg):
log_msg = datetime.now().time().strftime('%H:%M:%S') + " " + msg + "\n"
with file(log_file, 'a') as f: f.write(log_msg)
def run(command):
log(' '.join(command))
try: subprocess.call(command)
except:
print command
log('cron: last command failed to start')
def cleanCommand(txt):
"""remove extra quotation marks if it exists"""
if txt[0] == '"' and txt[-1] == '"':
return txt[1:-1]
else:
return txt
def match(date_part, cron_date):
"""
Parameters:
date_part : the date which is gained from the system
cron_date : the date which is parsed from the crontab.tab file
"""
time_cron = {0:4, 1:3, 2:2, 3:1, 4:6}
# necessary to 'convert' python time to cron time (and vice versa)
cron_index = [(index, dt) for index, dt in enumerate(cron_date) if dt != '*']
if len(cron_index) == 0: return True # must be "* * * * *"
for (index, date) in cron_index:
if '-' not in date:
if str(date_part[time_cron[index]]) != date: return False
return True
else:
dt_range = [str(x) for x in range(int(date.split('-')[0]),int(date.split('-')[1])+1)]
if str(date_part[time_cron[index]]) not in dt_range: return False
return True
# main
cron_tab = "crontab.tab"
log_file = "cron.log"
pid_file = "cron.pid"
try:
(cron_tab, log_file, pid_file) = sys.argv[1:]
except:
pass
with file(pid_file, 'w') as f: f.write(str(os.getpid()))
# perform scheduling, lambda function removes seconds component of time
next_time = (lambda t: t - timedelta(seconds = t.second))(datetime.now() + timedelta(minutes=1))
while True:
current_time = datetime.now()
if current_time < next_time:
time.sleep((next_time-current_time).seconds + .1)
try:
tab = file(cron_tab, 'r').readlines()
for entry in tab:
if entry[0] == '#' or len(entry.strip()) == 0: continue
try:
# ensure that quoted are correctly split
tokens = [cleanCommand(x) for x in re.findall(r'\"[^\"]*\"|\S+', entry)]
(date, command) = (tokens[:5], tokens[5:])
if match(next_time.timetuple(), date): run(command)
except:
log('cron: error parsing %s' % entry)
except:
log('cron: error opening %s file' % cron_tab)
print 'error'
next_time = next_time + timedelta(minutes=1)
from datetime import datetime, timedelta
import time
import subprocess
import os
import re
import sys
def log(msg):
log_msg = datetime.now().time().strftime('%H:%M:%S') + " " + msg + "\n"
with file(log_file, 'a') as f: f.write(log_msg)
def run(command):
log(' '.join(command))
try: subprocess.call(command)
except:
print command
log('cron: last command failed to start')
def cleanCommand(txt):
"""remove extra quotation marks if it exists"""
if txt[0] == '"' and txt[-1] == '"':
return txt[1:-1]
else:
return txt
def match(date_part, cron_date):
# mapping from python's time library to cron
time_cron = {0:4, 1:3, 2:2, 3:1, 4:6}
cron_index = [(index, dt) for index, dt in enumerate(cron_date)
if dt != '*']
if len(cron_index) == 0: return True # must be "* * * * *"
for (index, date) in cron_index:
if str(date_part[time_cron[index]]) != date: return False
return True
# main
cron_tab = "crontab.tab"
log_file = "cron.log"
pid_file = "cron.pid"
try:
(cron_tab, log_file, pid_file) = sys.argv[1:]
except:
pass
with file(pid_file, 'w') as f: f.write(str(os.getpid()))
# perform scheduling, lambda function removes seconds component of time
next_time = (lambda t: t - timedelta(seconds = t.second))(datetime.now() +
timedelta(minutes=1))
while True:
current_time = datetime.now()
if current_time < next_time:
time.sleep((next_time-current_time).seconds + .1)
try:
tab = file(cron_tab, 'r').readlines()
for entry in tab:
if entry[0] == '#' or len(entry.strip()) == 0: continue
try:
# ensure that quoted are correctly split
tokens = [cleanCommand(x) for x in re.findall(r'\"[^\"]*\"|\S+', entry)]
(date, command) = (tokens[:5], tokens[5:])
if match(next_time.timetuple(), date): run(command)
except:
log('cron: error parsing %s' % entry)
except:
log('cron: error opening %s file' % cron_tab)
print 'error'
next_time = next_time + timedelta(minutes=1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment