事件调度器
调度器主要工作就是执行事件,首先是对事件进行实例化,然后将事件的run方法注册在APScheduler提供的调度器中,代码实例如下
from apscheduler.schedulers.background import BlockingScheduler, BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED
from core.result.reporter import ResultReporter
from core.result.logger import logger
from core.config.setting import static_setting
import core.testengine.caserunner
import importlib
import os
import datetime
import uuid
class EventScheduler:
def __init__(self, reporter: ResultReporter):
self.reporter = reporter
self.scheduler = BlockingScheduler()
self.events = list()
log_path = static_setting.settings["CaseRunner"].log_path
log_file = os.path.join(log_path, "event_scheduler_log.log")
self.log = logger.register("EventScheduler", filename=log_file, for_test=True)
self.scheduler.add_listener(self._event_listen, EVENT_JOB_EXECUTED)
def add_event(self, event, package, args, is_background, need_lock,
start_time, interval=5, loop_count=1, description=""):
m = importlib.import_module(package)
event_cls = getattr(m, event)
new_event = event_cls(description, log=self.log)
new_event.need_lock = need_lock
new_event.back_ground = is_background
new_event.arguments = args
new_event.interval = interval
new_event.loop_count = loop_count
# 生成一个STEP 的节点给Event操作
new_event.reporter = self.reporter.add_event_group(f"Event: {event}")
if is_background:
new_event.job = self.scheduler.add_job(new_event.run, "interval",
seconds=interval,
start_date=start_time,
id=f"{event}{uuid.uuid4()}"
)
else:
new_event.job = self.scheduler.add_job(new_event.run, "date",
run_date=start_time,
id=f"{event}{uuid.uuid4()}"
)
self.events.append(new_event)
def remove_event(self, event_id):
job = self.scheduler.get_job(event_id)
if job:
event_to_remove = None
for event in self.events:
if event.job == job:
event_to_remove = event
self.scheduler.remove_job(event_id)
break
if event_to_remove:
self.events.remove(event_to_remove)
def start(self):
self.scheduler.start()
def _event_listen(self, job):
for event in self.events:
if event.job.id == job.job_id:
if event.back_ground:
return
else:
if event.loop_count == 1:
return
delta = datetime.timedelta(seconds=event.interval)
next_date = job.scheduled_run_time + delta
event.job = self.scheduler.add_job(event.run, "date",
run_date=next_date,
id=f"{event.name}{uuid.uuid4()}")
event.loop_count -= 1
return
if __name__=="__main__":
report = ResultReporter(logger.register("TestLog"))
scheduler = EventScheduler(report)
scheduler.add_event("DemoEvent1", "product.event.demo_event",
args=None, is_background=False, need_lock=False,
loop_count=2, interval=5,
start_time="2020-3-18 00:48:59", description="a test")
scheduler.start()
input("wait")
- APScheduler包含多种scheduler,其中BlockingScheduler表示这个调度器在执行的时候会阻塞当前的线程,而BackgroundScheduler表示该调度器在执行的时候不阻塞当前线程
- APScheduler中的调度器通过add_job()函数来添加新的job,并且会返回一个job类型的对象,job的类型有interval表示该job是一个间隔一定时间执行的循环任务,date表示在所定义的时间执行任务
- EventScheduler中定义了scheduler属性用于存放APScheduler中的调度器实例,通过add_event方法来添加新的事件,而参数event和package表示测试开发者自己实现的时间类名及模块所在的路径名,通过动态引用的方式实例化该事件;is_backgroud用来判断该事件是否是一个背景事件,如果是则用interval方式添加job,如果是一个正常事件则用date方法添加job
- APScheduler的job对象有id属性,因此可以通过唯一的id来对job进行识别
- 最后将实例化的event对象实例添加到EventScheduler的events中作为事件池
- 定义了remove_event方法,APScheduler的scheduler提供了get_job方法,因此可以通过注册job时使用的id来获得job对象实例,然后将job对应的event从EventScheduler的events属性内删除,同时从scheduler中删除job
- 如果要修改job,则可以通过job的modify方法修改除id以外的属性
- 定义了_event_listen方法,EventScheduler类定义了loop_count属性,代表事件执行的次数用于非背景事件,而如果loop_count不是1,则需要重新将其加入scheduler的job中,APScheduler提供了事件注册机制,因此可以实现一个job执行完触发的事件后,重新将job添加到APScheduler中
- 最后使用start方法来启动调度器
测试代码
定义一个事件,代码实例如下
from eventbase import EventBase, EventStatus
from reporter import StepResult
Class DemoEvent(EventBase):
name = "DemoEvent1"
def action(self):
self.reporter.add(StepResult.INFO, "This is a Demo Event")
self.result = EventStatus.SUCCESS
实例化调度器并添加该事件
report = ResultReporter(logger.register("TestLog"))
scheduler = EventScheduler(report)
scheduler.add_event("DemoEvent1", "product.event.demo_event",
args=None, is_background=False, need_lock=False,
loop_count=2, interval=5,
start_time="2020-8-01 04:20:59", description="a test")
scheduler.start()
input("wait")
执行结果是:
E:\Programs\Python\Demo\venv\Scripts\python.exe E:/Programs/Python/Demo/pom/scheduler.py
[2020-08-01 04:22:17,180][TestLog]-<thread:142708>-(line:313), [INFO]: [Event] Event: DemoEvent1
[2020-08-01 04:22:59,001][TestLog]-<thread:78492>-(line:76), [INFO]: This is a Demo Event
[2020-08-01 04:23:04,002][TestLog]-<thread:78492>-(line:76), [INFO]: This is a Demo Event