updated folder structure
This commit is contained in:
180
alert-scheduler/src/watchdog.py
Normal file
180
alert-scheduler/src/watchdog.py
Normal file
@@ -0,0 +1,180 @@
|
||||
import datetime as dt
|
||||
from scheduler import Scheduler
|
||||
import scheduler.trigger as trigger
|
||||
import json
|
||||
import os.path
|
||||
import time
|
||||
|
||||
def foo():
|
||||
# print("\nJust chilling...\n")
|
||||
return
|
||||
|
||||
def printScheduler():
|
||||
date = dt.datetime.now()
|
||||
print(str(date)+": The currently available jobs are:\n")
|
||||
print(schedule)
|
||||
|
||||
def addParameter(name, object, dict):
|
||||
if name in object:
|
||||
if type(object[name]) is int:
|
||||
dict[name] = object[name]
|
||||
elif type(object[name]) is str:
|
||||
dict[name] = int(object[name])
|
||||
else:
|
||||
print("ERROR: Element cannot be interpreted correctly:",
|
||||
object[name])
|
||||
|
||||
def addDtTimeDeltaJob(job, schedule: Scheduler, function, once):
|
||||
dtTimeDelta = job['dtTimeDelta']
|
||||
if 'minutes' in dtTimeDelta:
|
||||
parameters = {}
|
||||
addParameter('minutes', dtTimeDelta, parameters)
|
||||
if once:
|
||||
schedule.once(dt.timedelta(**parameters), function)
|
||||
else:
|
||||
schedule.cyclic(dt.timedelta(**parameters), function)
|
||||
else:
|
||||
print("ERROR: missing minutes in job: ", job['jobName'])
|
||||
return
|
||||
|
||||
def scheduleDayOfWeek(day, daylist, schedule, function, parameters, once):
|
||||
if day in daylist:
|
||||
day_to_call = getattr(trigger, day)
|
||||
if once:
|
||||
if len(parameters) > 0:
|
||||
# schedule.once(day_to_call(dt.time(**parameters)),function)
|
||||
print("WARN: weekly jobs are currenlty not supported...")
|
||||
else:
|
||||
# schedule.once(day_to_call(),function())
|
||||
print("WARN: weekly jobs are currenlty not supported...")
|
||||
else:
|
||||
if len(parameters) > 0:
|
||||
# schedule.weekly(day_to_call(dt.time(**parameters)),function)
|
||||
print("WARN: weekly jobs are currenlty not supported...")
|
||||
else:
|
||||
# schedule.weekly(day_to_call(),function)
|
||||
print("WARN: weekly jobs are currenlty not supported...")
|
||||
|
||||
def addDtTimeJob(job, schedule: Scheduler, function, once):
|
||||
dtTime = job['dtTime']
|
||||
jobType = job['jobType']
|
||||
parameters = {}
|
||||
addParameter('second', dtTime, parameters)
|
||||
addParameter('minute', dtTime, parameters)
|
||||
addParameter('hour', dtTime, parameters)
|
||||
if (jobType == "minutely") or (jobType == "hourly") or (jobType == "daily"):
|
||||
cycle_to_call = getattr(schedule, jobType)
|
||||
cycle_to_call(dt.time(**parameters), function)
|
||||
pass
|
||||
elif (jobType == 'once') or (jobType == 'weekly'):
|
||||
if 'dayOfWeek' in dtTime:
|
||||
scheduleDayOfWeek(
|
||||
'Monday', dtTime['dayOfWeek'], schedule, function, parameters, once)
|
||||
scheduleDayOfWeek(
|
||||
'Tuesday', dtTime['dayOfWeek'], schedule, function, parameters, once)
|
||||
scheduleDayOfWeek(
|
||||
'Wednesday', dtTime['dayOfWeek'], schedule, function, parameters, once)
|
||||
scheduleDayOfWeek(
|
||||
'Thursday', dtTime['dayOfWeek'], schedule, function, parameters, once)
|
||||
scheduleDayOfWeek(
|
||||
'Friday', dtTime['dayOfWeek'], schedule, function, parameters, once)
|
||||
scheduleDayOfWeek(
|
||||
'Saturday', dtTime['dayOfWeek'], schedule, function, parameters, once)
|
||||
scheduleDayOfWeek(
|
||||
'Sunday', dtTime['dayOfWeek'], schedule, function, parameters, once)
|
||||
else:
|
||||
print(
|
||||
"ERROR: dayOfWeek is mandatory for once and weekly jobs in job:", job['jobName'])
|
||||
return
|
||||
|
||||
def addDtDateTimeOnceJob(job, schedule: Scheduler, function):
|
||||
dtDateTime = job['dtDateTime']
|
||||
parameters = {}
|
||||
addParameter('year', dtDateTime, parameters)
|
||||
addParameter('month', dtDateTime, parameters)
|
||||
addParameter('day', dtDateTime, parameters)
|
||||
addParameter('hour', dtDateTime, parameters)
|
||||
addParameter('minute', dtDateTime, parameters)
|
||||
schedule.once(dt.datetime(**parameters), function)
|
||||
return
|
||||
|
||||
def addJob(job, schedule: Scheduler):
|
||||
if 'jobFunction' in job:
|
||||
jobFunction = job['jobFunction']
|
||||
else:
|
||||
print("ERROR: jobFunction is missing in job: ", job['jobName'])
|
||||
return
|
||||
if 'jobType' in job:
|
||||
jobType = job['jobType']
|
||||
else:
|
||||
print("ERROR: jobType is missing in job: ", job['jobName'])
|
||||
return
|
||||
|
||||
if jobFunction in globals():
|
||||
function = globals()[jobFunction]
|
||||
if jobType == "cyclic":
|
||||
if 'dtTimeDelta' in job:
|
||||
addDtTimeDeltaJob(job, schedule, function, once=False)
|
||||
else:
|
||||
print("ERROR: Missing dtTimeDelta in: ", job['jobName'])
|
||||
elif (jobType == "minutely") or (jobType == "hourly") or (jobType == "daily") or (jobType == "weekly"):
|
||||
if 'dtTime' in job:
|
||||
addDtTimeJob(job, schedule, function, once=False)
|
||||
else:
|
||||
print("ERROR: Missing dtTime in: ", job['jobName'])
|
||||
elif jobType == "once":
|
||||
found = False
|
||||
if 'dtTimeDelta' in job:
|
||||
addDtTimeDeltaJob(job, schedule, function, once=True)
|
||||
found = True
|
||||
if 'dtDateTime' in job:
|
||||
addDtDateTimeOnceJob(job, schedule, function)
|
||||
found = True
|
||||
if 'dtTime' in job:
|
||||
addDtTimeJob(job, schedule, function, once=True)
|
||||
found = True
|
||||
else:
|
||||
if not found:
|
||||
print(
|
||||
"ERROR: Missing either dtTime, dtTimeDelta or dtDateTime in: ", job['jobName'])
|
||||
else:
|
||||
print("Unkown JobType: ", jobType)
|
||||
else:
|
||||
print("\nERROR: Unknown Function: ", jobFunction)
|
||||
print(" Ignoring job: ", job['jobName'])
|
||||
print("\n")
|
||||
|
||||
def addJobs(schedule: Scheduler):
|
||||
cfgFile = "./config/runCfg.json"
|
||||
if os.path.isfile(cfgFile):
|
||||
f = open("./config/runCfg.json")
|
||||
data = json.load(f)
|
||||
f.close()
|
||||
if 'jobs' in data:
|
||||
for job in data['jobs']:
|
||||
if 'jobName' in job:
|
||||
print("Setting up job: ", job['jobName'])
|
||||
addJob(job, schedule)
|
||||
else:
|
||||
print("ERROR: Element is missing a jobName: ", job)
|
||||
print("\n")
|
||||
else:
|
||||
print("WARN: No jobs configured in config file: ", cfgFile)
|
||||
else:
|
||||
print("ERROR: No config file found for scheduler: ", cfgFile)
|
||||
|
||||
|
||||
|
||||
# schedule = Scheduler(n_threads=0)
|
||||
schedule = Scheduler()
|
||||
addJobs(schedule)
|
||||
printScheduler()
|
||||
|
||||
while True:
|
||||
start_time = time.perf_counter()
|
||||
n_exec = schedule.exec_jobs()
|
||||
total_seconds = time.perf_counter() - start_time
|
||||
if n_exec > 0:
|
||||
print("Workers started: "+str(n_exec) +
|
||||
". Total execution time: {:10.4f} s.\n\n".format(total_seconds))
|
||||
time.sleep(1)
|
||||
Reference in New Issue
Block a user