from collections import defaultdict
from datetime import datetime
from typing import Sequence

from fastapi import APIRouter, HTTPException
from fastapi_cache.decorator import cache

from idfm_interface import Destinations as IdfmDestinations, TrainStatus
from models import Stop, StopArea, StopShape
from schemas import (
    NextPassage as NextPassageSchema,
    NextPassages as NextPassagesSchema,
    Stop as StopSchema,
    StopArea as StopAreaSchema,
    StopShape as StopShapeSchema,
)
from dependencies import idfm_interface


router = APIRouter(prefix="/stop", tags=["stop"])


def _format_stop(stop: Stop) -> StopSchema:
    """Convert a ``Stop`` model into its ``StopSchema`` representation.

    Lines are reduced to their identifiers.
    """
    return StopSchema(
        id=stop.id,
        name=stop.name,
        town=stop.town_name,
        epsg3857_x=stop.epsg3857_x,
        epsg3857_y=stop.epsg3857_y,
        lines=[line.id for line in stop.lines],
    )


def optional_datetime_to_ts(dt: datetime | None) -> int | None:
    """Return ``dt`` as an integer POSIX timestamp, or ``None`` if ``dt`` is ``None``.

    The fractional part of the timestamp is truncated by ``int()``.
    """
    return int(dt.timestamp()) if dt is not None else None


# TODO: Add limit support
@router.get("/")
@cache(namespace="stop")
async def get_stop(
    name: str = "", limit: int = 10
) -> Sequence[StopAreaSchema | StopSchema] | None:
    """Search stops and stop areas by name.

    Stop areas are listed first, each with its member stops nested inside;
    matching stops that are not part of any returned area follow.
    ``limit`` is accepted but not applied yet.
    """
    matching_stops = await Stop.get_by_name(name)
    if matching_stops is None:
        return None

    # Split the matches by concrete type. StopArea is tested first, as in the
    # original ordering of the checks.
    stop_areas: dict[int, StopArea] = {}
    stops: dict[int, Stop] = {}
    for stop in matching_stops:
        if isinstance(stop, StopArea):
            stop_areas[stop.id] = stop
        elif isinstance(stop, Stop):
            stops[stop.id] = stop

    formatted: list[StopAreaSchema | StopSchema] = []

    for stop_area in stop_areas.values():
        formatted_stops = []
        for stop in stop_area.stops:
            formatted_stops.append(_format_stop(stop))
            # A stop nested in an area must not be repeated at top level.
            # It may legitimately be absent from the direct matches, so a
            # missing key is not an error.
            stops.pop(stop.id, None)

        formatted.append(
            StopAreaSchema(
                id=stop_area.id,
                name=stop_area.name,
                town=stop_area.town_name,
                type=stop_area.type,
                lines=[line.id for line in stop_area.lines],
                stops=formatted_stops,
            )
        )

    # Whatever is left did not belong to any of the returned areas.
    formatted.extend(_format_stop(stop) for stop in stops.values())

    return formatted


@router.get("/{stop_id}/nextPassages")
@cache(namespace="stop-nextPassages", expire=30)
async def get_next_passages(stop_id: int) -> NextPassagesSchema | None:
    """Return the next passages at a stop, grouped by line then destination.

    Responds with 404 when a monitored journey carries a line reference that
    cannot be parsed.
    """
    res = await idfm_interface.get_next_passages(stop_id)
    if res is None:
        return None

    service_delivery = res.Siri.ServiceDelivery
    stop_monitoring_deliveries = service_delivery.StopMonitoringDelivery

    # line id -> destination name -> passages
    by_line_by_dst_passages: dict[
        int, dict[str, list[NextPassageSchema]]
    ] = defaultdict(lambda: defaultdict(list))

    for delivery in stop_monitoring_deliveries:
        for stop_visit in delivery.MonitoredStopVisit:
            journey = stop_visit.MonitoredVehicleJourney

            # re.match will return None if the given journey.LineRef.value is
            # not valid; the resulting AttributeError is mapped to a 404.
            try:
                line_id_match = idfm_interface.LINE_RE.match(journey.LineRef.value)
                line_id = int(line_id_match.group(1))  # type: ignore
            except (AttributeError, TypeError, ValueError) as err:
                raise HTTPException(
                    status_code=404, detail=f'Line "{journey.LineRef.value}" not found'
                ) from err

            call = journey.MonitoredCall

            dst_names = call.DestinationDisplay
            dsts = [dst.value for dst in dst_names] if dst_names else []

            arrivalPlatformName = (
                call.ArrivalPlatformName.value if call.ArrivalPlatformName else None
            )

            next_passage = NextPassageSchema(
                line=line_id,
                operator=journey.OperatorRef.value,
                destinations=dsts,
                atStop=call.VehicleAtStop,
                aimedArrivalTs=optional_datetime_to_ts(call.AimedArrivalTime),
                expectedArrivalTs=optional_datetime_to_ts(call.ExpectedArrivalTime),
                arrivalPlatformName=arrivalPlatformName,
                aimedDepartTs=optional_datetime_to_ts(call.AimedDepartureTime),
                expectedDepartTs=optional_datetime_to_ts(call.ExpectedDepartureTime),
                arrivalStatus=call.ArrivalStatus
                if call.ArrivalStatus is not None
                else TrainStatus.unknown,
                departStatus=call.DepartureStatus
                if call.DepartureStatus is not None
                else TrainStatus.unknown,
            )

            by_line_passages = by_line_by_dst_passages[line_id]

            # The same passage is registered once per destination name.
            # NOTE(review): a visit with no destination display is therefore
            # never added to the result - confirm this is intended.
            # TODO: by_line_passages[dst].extend(dsts) instead ?
            for dst in dsts:
                by_line_passages[dst].append(next_passage)

    return NextPassagesSchema(
        ts=int(service_delivery.ResponseTimestamp.timestamp()),
        passages=by_line_by_dst_passages,
    )


@router.get("/{stop_id}/destinations")
@cache(namespace="stop-destinations", expire=30)
async def get_stop_destinations(
    stop_id: int,
) -> IdfmDestinations | None:
    """Return the destinations reported by the IDFM interface for a stop."""
    return await idfm_interface.get_destinations(stop_id)


@router.get("/{stop_id}/shape")
@cache(namespace="stop-shape")
async def get_stop_shape(stop_id: int) -> StopShapeSchema | None:
    """Return the shape of a stop or stop area.

    Responds with 404 when the identifier matches neither a stop nor a stop
    area, or when no shape is stored for it.
    """
    # The stop area lookup only runs when no stop has this identifier.
    if (await Stop.get_by_id(stop_id)) is not None or (
        await StopArea.get_by_id(stop_id)
    ) is not None:
        # Shapes are looked up with the same identifier as the stop.
        if (shape := await StopShape.get_by_id(stop_id)) is not None:
            return StopShapeSchema(
                id=shape.id,
                type=shape.type,
                epsg3857_bbox=shape.epsg3857_bbox,
                epsg3857_points=shape.epsg3857_points,
            )

    msg = f"No shape found for stop {stop_id}"
    raise HTTPException(status_code=404, detail=msg)