more inventory management work
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
from typing import Callable, Dict, Any
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
import logging
|
||||
from apscheduler.schedulers.base import SchedulerNotRunningError
|
||||
|
||||
@ -16,18 +17,38 @@ class BaseScheduler:
|
||||
self,
|
||||
task_name: str,
|
||||
func: Callable,
|
||||
interval_seconds: int,
|
||||
interval_seconds: int = None,
|
||||
cron_expression: str = None,
|
||||
*args,
|
||||
**kwargs
|
||||
) -> None:
|
||||
"""Schedule a task to run at regular intervals using APScheduler"""
|
||||
"""Schedule a task to run at regular intervals or at specific times using APScheduler
|
||||
|
||||
Args:
|
||||
task_name: Name of the task
|
||||
func: Function to execute
|
||||
interval_seconds: Interval in seconds for periodic execution (mutually exclusive with cron_expression)
|
||||
cron_expression: Cron expression for time-based scheduling (mutually exclusive with interval_seconds)
|
||||
*args: Additional positional arguments for the function
|
||||
**kwargs: Additional keyword arguments for the function
|
||||
"""
|
||||
if task_name in self.jobs:
|
||||
logger.warning(f"Task {task_name} already exists. Removing existing job.")
|
||||
self.jobs[task_name].remove()
|
||||
|
||||
if interval_seconds and cron_expression:
|
||||
raise ValueError("Cannot specify both interval_seconds and cron_expression")
|
||||
elif not interval_seconds and not cron_expression:
|
||||
raise ValueError("Must specify either interval_seconds or cron_expression")
|
||||
|
||||
if interval_seconds:
|
||||
trigger = IntervalTrigger(seconds=interval_seconds)
|
||||
else:
|
||||
trigger = CronTrigger.from_crontab(cron_expression)
|
||||
|
||||
job = self.scheduler.add_job(
|
||||
func,
|
||||
trigger=IntervalTrigger(seconds=interval_seconds),
|
||||
trigger=trigger,
|
||||
args=args,
|
||||
kwargs=kwargs,
|
||||
id=task_name,
|
||||
@ -35,7 +56,10 @@ class BaseScheduler:
|
||||
)
|
||||
|
||||
self.jobs[task_name] = job
|
||||
logger.info(f"Scheduled task {task_name} to run every {interval_seconds} seconds")
|
||||
if interval_seconds:
|
||||
logger.info(f"Scheduled task {task_name} to run every {interval_seconds} seconds")
|
||||
else:
|
||||
logger.info(f"Scheduled task {task_name} with cron expression: {cron_expression}")
|
||||
|
||||
def remove_task(self, task_name: str) -> None:
|
||||
"""Remove a scheduled task"""
|
||||
|
@ -1,21 +1,25 @@
|
||||
from app.db.database import transaction
|
||||
from app.services.scheduler.base_scheduler import BaseScheduler
|
||||
from app.services.base_service import BaseService
|
||||
from sqlalchemy import text
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SchedulerService:
|
||||
class SchedulerService(BaseService):
|
||||
def __init__(self):
|
||||
# Initialize BaseService with None as model since this service doesn't have a specific model
|
||||
super().__init__(None)
|
||||
self.scheduler = BaseScheduler()
|
||||
# Service manager will be set during initialization
|
||||
self._service_manager = None
|
||||
|
||||
@property
|
||||
def service_manager(self):
|
||||
if self._service_manager is None:
|
||||
from app.services.service_manager import ServiceManager
|
||||
self._service_manager = ServiceManager()
|
||||
return self._service_manager
|
||||
async def update_tcgplayer_price_history_daily(self, db):
|
||||
"""
|
||||
Update the TCGPlayer price history table
|
||||
"""
|
||||
with transaction(db):
|
||||
await db.execute(text("""REFRESH MATERIALIZED VIEW CONCURRENTLY most_recent_tcgplayer_price;"""))
|
||||
logger.info("TCGPlayer price history refreshed")
|
||||
|
||||
|
||||
async def update_open_orders_hourly(self, db):
|
||||
"""
|
||||
@ -67,13 +71,19 @@ class SchedulerService:
|
||||
await self.scheduler.schedule_task(
|
||||
task_name="update_open_orders_hourly",
|
||||
func=lambda: self.update_open_orders_hourly(db),
|
||||
interval_seconds=60 * 60, # 1 hour
|
||||
cron_expression="0 * * * *" # Run at minute 0 of every hour
|
||||
)
|
||||
# Schedule all orders update to run daily at 1 AM
|
||||
# Schedule all orders update to run daily at 3 AM
|
||||
await self.scheduler.schedule_task(
|
||||
task_name="update_all_orders_daily",
|
||||
func=lambda: self.update_all_orders_daily(db),
|
||||
interval_seconds=24 * 60 * 60, # 24 hours
|
||||
cron_expression="0 3 * * *" # Run at 3:00 AM every day
|
||||
)
|
||||
# Schedule TCGPlayer price history update to run daily at 1 AM
|
||||
await self.scheduler.schedule_task(
|
||||
task_name="update_tcgplayer_price_history_daily",
|
||||
func=lambda: self.update_tcgplayer_price_history_daily(db),
|
||||
cron_expression="0 1 * * *" # Run at 1:00 AM every day
|
||||
)
|
||||
|
||||
self.scheduler.start()
|
||||
|
Reference in New Issue
Block a user