import logging
from typing import Any, Callable, Dict

from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.base import SchedulerNotRunningError
from apscheduler.triggers.interval import IntervalTrigger

logger = logging.getLogger(__name__)


class BaseScheduler:
    """Thin wrapper around an APScheduler ``AsyncIOScheduler``.

    Jobs are registered under a caller-chosen task name. The job handle
    returned by ``add_job`` is kept in ``self.jobs`` so a task can later
    be removed or replaced by name.
    """

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        # Maps task name -> job handle returned by ``scheduler.add_job``.
        self.jobs: Dict[str, Any] = {}
        # Local view of whether ``start()`` has been called without a
        # matching ``shutdown()``.
        self._is_running = False

    @staticmethod
    def _discard_job(task_name: str, job: Any) -> None:
        """Remove *job* from its scheduler, tolerating an already-gone job.

        A stored handle can outlive the job it refers to. In that case
        ``job.remove()`` is expected to raise ``JobLookupError``; that is
        not an error for callers whose goal is "make sure it is gone".
        """
        try:
            job.remove()
        except JobLookupError:
            logger.debug(
                "Job for task %s was no longer registered with the scheduler",
                task_name,
            )

    async def schedule_task(
        self,
        task_name: str,
        func: Callable,
        interval_seconds: int,
        *args,
        **kwargs
    ) -> None:
        """Schedule a task to run at regular intervals using APScheduler.

        If a task with the same name is already tracked, it is removed
        first and replaced by the new definition.

        Args:
            task_name: Identifier for the task; also used as the job id.
            func: Callable the scheduler should invoke.
            interval_seconds: Seconds between invocations.
            *args: Positional arguments passed to ``func``.
            **kwargs: Keyword arguments passed to ``func``.
        """
        existing = self.jobs.pop(task_name, None)
        if existing is not None:
            logger.warning(
                "Task %s already exists. Removing existing job.", task_name
            )
            # A stale handle must not prevent the task from being
            # rescheduled; ``replace_existing=True`` below covers the
            # case where the id is still present in the job store.
            self._discard_job(task_name, existing)

        job = self.scheduler.add_job(
            func,
            trigger=IntervalTrigger(seconds=interval_seconds),
            args=args,
            kwargs=kwargs,
            id=task_name,
            replace_existing=True,
        )
        self.jobs[task_name] = job
        logger.info(
            "Scheduled task %s to run every %s seconds",
            task_name,
            interval_seconds,
        )

    def remove_task(self, task_name: str) -> None:
        """Remove a scheduled task.

        Unknown task names are ignored. The name is dropped from
        ``self.jobs`` even when the underlying job has already
        disappeared from the scheduler, so the bookkeeping cannot get
        stuck on a stale entry.

        Args:
            task_name: Identifier the task was scheduled under.
        """
        job = self.jobs.pop(task_name, None)
        if job is None:
            return
        self._discard_job(task_name, job)
        logger.info("Removed task %s", task_name)

    def start(self) -> None:
        """Start the scheduler if this wrapper has not already started it."""
        if self._is_running:
            return
        self.scheduler.start()
        self._is_running = True
        logger.info("Scheduler started")

    async def shutdown(self) -> None:
        """Shut down the scheduler if this wrapper believes it is running.

        Raises:
            Exception: Any error raised by the scheduler's ``shutdown``
                other than ``SchedulerNotRunningError`` is logged and
                re-raised.
        """
        if not self._is_running:
            return
        try:
            self.scheduler.shutdown()
        except SchedulerNotRunningError:
            # The scheduler is stopped regardless of how it got there;
            # resync the flag so a later ``start()`` is not skipped.
            self._is_running = False
            logger.warning("Scheduler was already stopped")
        except Exception as e:
            logger.exception("Error shutting down scheduler: %s", e)
            raise
        else:
            self._is_running = False
            logger.info("Scheduler stopped")