6 Commits

8 changed files with 323 additions and 188 deletions

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from logging import getLogger
from typing import Iterable, Self, TYPE_CHECKING
from sqlalchemy import select
@@ -9,31 +10,36 @@ from sqlalchemy.orm import DeclarativeBase
if TYPE_CHECKING:
from .db import Database
logger = getLogger(__name__)
class Base(DeclarativeBase):
db: Database | None = None
@classmethod
async def add(cls, stops: Self | Iterable[Self]) -> bool:
try:
if isinstance(stops, Iterable):
cls.db.session.add_all(stops) # type: ignore
else:
cls.db.session.add(stops) # type: ignore
await cls.db.session.commit() # type: ignore
except (AttributeError, IntegrityError) as err:
print(err)
return False
async def add(cls, objs: Self | Iterable[Self]) -> bool:
if cls.db is not None and (session := await cls.db.get_session()) is not None:
async with session.begin():
try:
if isinstance(objs, Iterable):
session.add_all(objs)
else:
session.add(objs)
except (AttributeError, IntegrityError) as err:
logger.error(err)
return False
return True
@classmethod
async def get_by_id(cls, id_: int | str) -> Self | None:
try:
stmt = select(cls).where(cls.id == id_) # type: ignore
res = await cls.db.session.execute(stmt) # type: ignore
element = res.scalar_one_or_none()
except AttributeError as err:
print(err)
element = None
return element
if cls.db is not None and (session := await cls.db.get_session()) is not None:
async with session.begin():
stmt = select(cls).where(cls.id == id_)
res = await session.execute(stmt)
return res.scalar_one_or_none()
return None

View File

@@ -1,4 +1,10 @@
from logging import getLogger
from typing import Annotated, AsyncIterator
from fastapi import Depends
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import (
async_sessionmaker,
AsyncEngine,
@@ -9,40 +15,47 @@ from sqlalchemy.ext.asyncio import (
from .base_class import Base
logger = getLogger(__name__)
class Database:
def __init__(self) -> None:
self._engine: AsyncEngine | None = None
self._session_maker: async_sessionmaker[AsyncSession] | None = None
self._session: AsyncSession | None = None
self._async_engine: AsyncEngine | None = None
self._async_session_local: async_sessionmaker[AsyncSession] | None = None
@property
def session(self) -> AsyncSession | None:
if self._session is None and (session_maker := self._session_maker) is not None:
self._session = session_maker()
return self._session
async def get_session(self) -> AsyncSession | None:
try:
return self._async_session_local() # type: ignore
except (SQLAlchemyError, AttributeError) as e:
logger.exception(e)
return None
# TODO: Preserve UserLastStopSearchResults table from drop.
async def connect(self, db_path: str, clear_static_data: bool = False) -> bool:
self._async_engine = create_async_engine(
db_path, pool_pre_ping=True, pool_size=10, max_overflow=20
)
# TODO: Preserve UserLastStopSearchResults table from drop.
self._engine = create_async_engine(db_path)
if self._engine is not None:
self._session_maker = async_sessionmaker(
self._engine, expire_on_commit=False, class_=AsyncSession
if self._async_engine is not None:
SQLAlchemyInstrumentor().instrument(engine=self._async_engine.sync_engine)
self._async_session_local = async_sessionmaker(
bind=self._async_engine,
# autoflush=False,
expire_on_commit=False,
class_=AsyncSession,
)
if (session := self.session) is not None:
await session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))
async with self._engine.begin() as conn:
async with self._async_engine.begin() as session:
await session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))
if clear_static_data:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
await session.run_sync(Base.metadata.drop_all)
await session.run_sync(Base.metadata.create_all)
return True
async def disconnect(self) -> None:
if self._session is not None:
await self._session.close()
self._session = None
if self._engine is not None:
await self._engine.dispose()
if self._async_engine is not None:
await self._async_engine.dispose()

View File

@@ -15,7 +15,7 @@ from aiohttp import ClientSession
from msgspec import ValidationError
from msgspec.json import Decoder
from pyproj import Transformer
from shapefile import Reader as ShapeFileReader, ShapeRecord
from shapefile import Reader as ShapeFileReader, ShapeRecord # type: ignore
from ..db import Database
from ..models import ConnectionArea, Line, LinePicto, Stop, StopArea, StopShape
@@ -357,7 +357,6 @@ class IdfmInterface:
fields = line.fields
picto_id = fields.picto.id_ if fields.picto is not None else None
picto = await LinePicto.get_by_id(picto_id) if picto_id else None
ret.append(
Line(
@@ -384,7 +383,6 @@ class IdfmInterface:
fields.audiblesigns_available.value
),
picto_id=fields.picto.id_ if fields.picto is not None else None,
picto=picto,
record_id=line.recordid,
record_ts=int(line.record_timestamp.timestamp()),
)

View File

@@ -94,23 +94,24 @@ class Line(Base):
async def get_by_name(
cls, name: str, operator_name: None | str = None
) -> Sequence[Self] | None:
session = cls.db.session
if session is None:
return None
if (session := await cls.db.get_session()) is not None:
filters = {"name": name}
if operator_name is not None:
filters["operator_name"] = operator_name
async with session.begin():
filters = {"name": name}
if operator_name is not None:
filters["operator_name"] = operator_name
stmt = (
select(cls)
.filter_by(**filters)
.options(selectinload(cls.stops), selectinload(cls.picto))
)
res = await session.execute(stmt)
lines = res.scalars().all()
stmt = (
select(cls)
.filter_by(**filters)
.options(selectinload(cls.stops), selectinload(cls.picto))
)
res = await session.execute(stmt)
lines = res.scalars().all()
return lines
return lines
return None
@classmethod
async def _add_picto_to_line(cls, line: str | Self, picto: LinePicto) -> None:
@@ -133,57 +134,63 @@ class Line(Base):
@classmethod
async def add_pictos(cls, line_to_pictos: Iterable[tuple[str, LinePicto]]) -> bool:
session = cls.db.session
if session is None:
return False
if (session := await cls.db.get_session()) is not None:
await asyncio_gather(
*[cls._add_picto_to_line(line, picto) for line, picto in line_to_pictos]
)
async with session.begin():
await asyncio_gather(
*[
cls._add_picto_to_line(line, picto)
for line, picto in line_to_pictos
]
)
await session.commit()
return True
return True
return False
@classmethod
async def add_stops(cls, line_to_stop_ids: Iterable[tuple[str, str, int]]) -> int:
session = cls.db.session
if session is None:
return 0
if (session := await cls.db.get_session()) is not None:
line_names_ops, stop_ids = set(), set()
for line_name, operator_name, stop_id in line_to_stop_ids:
line_names_ops.add((line_name, operator_name))
stop_ids.add(stop_id)
async with session.begin():
lines_res = await session.execute(
select(Line).where(
tuple_(Line.name, Line.operator_name).in_(line_names_ops)
)
)
line_names_ops, stop_ids = set(), set()
for line_name, operator_name, stop_id in line_to_stop_ids:
line_names_ops.add((line_name, operator_name))
stop_ids.add(stop_id)
lines = defaultdict(list)
for line in lines_res.scalars():
lines[(line.name, line.operator_name)].append(line)
stops_res = await session.execute(select(_Stop).where(_Stop.id.in_(stop_ids)))
stops = {stop.id: stop for stop in stops_res.scalars()}
found = 0
for line_name, operator_name, stop_id in line_to_stop_ids:
if (stop := stops.get(stop_id)) is not None:
if (stop_lines := lines.get((line_name, operator_name))) is not None:
for stop_line in stop_lines:
stop_line.stops.append(stop)
found += 1
else:
print(f"No line found for {line_name}/{operator_name}")
else:
print(
f"No stop found for {stop_id} id"
f"(used by {line_name}/{operator_name})"
lines_res = await session.execute(
select(Line).where(
tuple_(Line.name, Line.operator_name).in_(line_names_ops)
)
)
await session.commit()
lines = defaultdict(list)
for line in lines_res.scalars():
lines[(line.name, line.operator_name)].append(line)
return found
stops_res = await session.execute(
select(_Stop).where(_Stop.id.in_(stop_ids))
)
stops = {stop.id: stop for stop in stops_res.scalars()}
found = 0
for line_name, operator_name, stop_id in line_to_stop_ids:
if (stop := stops.get(stop_id)) is not None:
if (
stop_lines := lines.get((line_name, operator_name))
) is not None:
for stop_line in stop_lines:
stop_line.stops.append(stop)
found += 1
else:
print(f"No line found for {line_name}/{operator_name}")
else:
print(
f"No stop found for {stop_id} id"
f"(used by {line_name}/{operator_name})"
)
return found
return 0

View File

@@ -1,13 +1,17 @@
from __future__ import annotations
from typing import Iterable, Sequence, TYPE_CHECKING
from logging import getLogger
from typing import Annotated, Iterable, Sequence, TYPE_CHECKING
from sqlalchemy import (
BigInteger,
Column,
Computed,
desc,
Enum,
Float,
ForeignKey,
func,
Integer,
JSON,
select,
@@ -19,9 +23,9 @@ from sqlalchemy.orm import (
Mapped,
relationship,
selectinload,
with_polymorphic,
)
from sqlalchemy.schema import Index
from sqlalchemy_utils.types.ts_vector import TSVectorType
from ..db import Base, db
from ..idfm_interface.idfm_types import TransportMode, IdfmState, StopAreaType
@@ -30,6 +34,8 @@ if TYPE_CHECKING:
from .line import Line
logger = getLogger(__name__)
stop_area_stop_association_table = Table(
"stop_area_stop_association_table",
Base.metadata,
@@ -71,40 +77,39 @@ class _Stop(Base):
back_populates="stops", lazy="selectin"
)
names_tsv = mapped_column(
TSVectorType("name", "town_name", regconfig="french"),
Computed("to_tsvector('french', name || ' ' || town_name)", persisted=True),
)
__tablename__ = "_stops"
__mapper_args__ = {"polymorphic_identity": "_stops", "polymorphic_on": kind}
__table_args__ = (
# To optimize the ilike requests
Index(
"name_idx_gin",
name,
"names_tsv_idx",
names_tsv,
postgresql_ops={"name": "gin_trgm_ops"},
postgresql_using="gin",
),
)
# TODO: Test https://www.cybertec-postgresql.com/en/postgresql-more-performance-for-like-and-ilike-statements/
# TODO: Should be able to remove with_polymorphic ?
@classmethod
async def get_by_name(cls, name: str) -> Sequence[type[_Stop]] | None:
session = cls.db.session
if session is None:
return None
async def get_by_name(cls, name: str) -> Sequence[_Stop] | None:
if (session := await cls.db.get_session()) is not None:
stop_stop_area = with_polymorphic(_Stop, [Stop, StopArea])
stmt = (
select(stop_stop_area)
.where(stop_stop_area.name.ilike(f"%{name}%"))
.options(
selectinload(stop_stop_area.areas),
selectinload(stop_stop_area.lines),
)
)
async with session.begin():
match_stmt = cls.names_tsv.match(name, postgresql_regconfig="french")
ranking_stmt = func.ts_rank_cd(
cls.names_tsv, func.plainto_tsquery("french", name)
)
stmt = select(cls).filter(match_stmt).order_by(desc(ranking_stmt))
res = await session.execute(stmt)
stops = res.scalars().all()
res = await session.execute(stmt)
stops = res.scalars().all()
return stops
return stops
return None
class Stop(_Stop):
@@ -146,41 +151,43 @@ class StopArea(_Stop):
async def add_stops(
cls, stop_area_to_stop_ids: Iterable[tuple[int, int]]
) -> int | None:
session = cls.db.session
if session is None:
return None
if (session := await cls.db.get_session()) is not None:
stop_area_ids, stop_ids = set(), set()
for stop_area_id, stop_id in stop_area_to_stop_ids:
stop_area_ids.add(stop_area_id)
stop_ids.add(stop_id)
async with session.begin():
stop_areas_res = await session.scalars(
select(StopArea)
.where(StopArea.id.in_(stop_area_ids))
.options(selectinload(StopArea.stops))
)
stop_areas: dict[int, StopArea] = {
stop_area.id: stop_area for stop_area in stop_areas_res.all()
}
stop_area_ids, stop_ids = set(), set()
for stop_area_id, stop_id in stop_area_to_stop_ids:
stop_area_ids.add(stop_area_id)
stop_ids.add(stop_id)
stop_res = await session.execute(select(Stop).where(Stop.id.in_(stop_ids)))
stops: dict[int, Stop] = {stop.id: stop for stop in stop_res.scalars()}
stop_areas_res = await session.scalars(
select(StopArea)
.where(StopArea.id.in_(stop_area_ids))
.options(selectinload(StopArea.stops))
)
stop_areas: dict[int, StopArea] = {
stop_area.id: stop_area for stop_area in stop_areas_res.all()
}
found = 0
for stop_area_id, stop_id in stop_area_to_stop_ids:
if (stop_area := stop_areas.get(stop_area_id)) is not None:
if (stop := stops.get(stop_id)) is not None:
stop_area.stops.append(stop)
found += 1
else:
print(f"No stop found for {stop_id} id")
else:
print(f"No stop area found for {stop_area_id}")
stop_res = await session.execute(
select(Stop).where(Stop.id.in_(stop_ids))
)
stops: dict[int, Stop] = {stop.id: stop for stop in stop_res.scalars()}
await session.commit()
found = 0
for stop_area_id, stop_id in stop_area_to_stop_ids:
if (stop_area := stop_areas.get(stop_area_id)) is not None:
if (stop := stops.get(stop_id)) is not None:
stop_area.stops.append(stop)
found += 1
else:
print(f"No stop found for {stop_id} id")
else:
print(f"No stop area found for {stop_area_id}")
return found
return found
return None
class StopShape(Base):
@@ -221,38 +228,40 @@ class ConnectionArea(Base):
async def add_stops(
cls, conn_area_to_stop_ids: Iterable[tuple[int, int]]
) -> int | None:
session = cls.db.session
if session is None:
return None
if (session := await cls.db.get_session()) is not None:
conn_area_ids, stop_ids = set(), set()
for conn_area_id, stop_id in conn_area_to_stop_ids:
conn_area_ids.add(conn_area_id)
stop_ids.add(stop_id)
async with session.begin():
conn_area_res = await session.execute(
select(ConnectionArea)
.where(ConnectionArea.id.in_(conn_area_ids))
.options(selectinload(ConnectionArea.stops))
)
conn_areas: dict[int, ConnectionArea] = {
conn.id: conn for conn in conn_area_res.scalars()
}
conn_area_ids, stop_ids = set(), set()
for conn_area_id, stop_id in conn_area_to_stop_ids:
conn_area_ids.add(conn_area_id)
stop_ids.add(stop_id)
stop_res = await session.execute(select(_Stop).where(_Stop.id.in_(stop_ids)))
stops: dict[int, _Stop] = {stop.id: stop for stop in stop_res.scalars()}
conn_area_res = await session.execute(
select(ConnectionArea)
.where(ConnectionArea.id.in_(conn_area_ids))
.options(selectinload(ConnectionArea.stops))
)
conn_areas: dict[int, ConnectionArea] = {
conn.id: conn for conn in conn_area_res.scalars()
}
found = 0
for conn_area_id, stop_id in conn_area_to_stop_ids:
if (conn_area := conn_areas.get(conn_area_id)) is not None:
if (stop := stops.get(stop_id)) is not None:
conn_area.stops.append(stop)
found += 1
else:
print(f"No stop found for {stop_id} id")
else:
print(f"No connection area found for {conn_area_id}")
stop_res = await session.execute(
select(Stop).where(Stop.id.in_(stop_ids))
)
stops: dict[int, Stop] = {stop.id: stop for stop in stop_res.scalars()}
await session.commit()
found = 0
for conn_area_id, stop_id in conn_area_to_stop_ids:
if (conn_area := conn_areas.get(conn_area_id)) is not None:
if (stop := stops.get(stop_id)) is not None:
conn_area.stops.append(stop)
found += 1
else:
print(f"No stop found for {stop_id} id")
else:
print(f"No connection area found for {conn_area_id}")
return found
return found
return None

View File

@@ -13,7 +13,62 @@ services:
max-size: 10m
max-file: "3"
ports:
- '127.0.0.1:5438:5432'
- "127.0.0.1:5432:5432"
volumes:
- ./docker/database/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
- ./docker/database/data:/var/lib/postgresql/data
jaeger-agent:
image: jaegertracing/jaeger-agent:latest
command:
- "--reporter.grpc.host-port=jaeger-collector:14250"
ports:
- "127.0.0.1:5775:5775/udp"
- "127.0.0.1:6831:6831/udp"
- "127.0.0.1:6832:6832/udp"
- "127.0.0.1:5778:5778"
restart: on-failure
depends_on:
- jaeger-collector
jaeger-collector:
image: jaegertracing/jaeger-collector:latest
command:
- "--cassandra.keyspace=jaeger_v1_dc1"
- "--cassandra.servers=cassandra"
- "--collector.zipkin.host-port=9411"
- "--sampling.initial-sampling-probability=.5"
- "--sampling.target-samples-per-second=.01"
environment:
- SAMPLING_CONFIG_TYPE=adaptive
- COLLECTOR_OTLP_ENABLED=true
ports:
- "127.0.0.1:4317:4317"
- "127.0.0.1:4318:4318"
# - "127.0.0.1:9411:9411"
# - "127.0.0.1:14250:14250"
# - "127.0.0.1:14268:14268"
# - "127.0.0.1:14269:14269"
restart: on-failure
depends_on:
- cassandra-schema
cassandra:
image: cassandra:latest
cassandra-schema:
image: jaegertracing/jaeger-cassandra-schema:latest
depends_on:
- cassandra
jaeger-query:
image: jaegertracing/jaeger-query:latest
command:
- "--cassandra.keyspace=jaeger_v1_dc1"
- "--cassandra.servers=cassandra"
ports:
- "127.0.0.1:16686:16686"
# - "127.0.0.1:16687:16687"
restart: on-failure
depends_on:
- cassandra-schema

45
backend/main.py Normal file → Executable file
View File

@@ -1,12 +1,26 @@
#!/usr/bin/env python3
import logging
from collections import defaultdict
from datetime import datetime
from os import environ, EX_USAGE
from typing import Sequence
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.sdk.resources import Resource as OtResource
from opentelemetry.sdk.trace import TracerProvider as OtTracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from rich import print
from starlette.types import ASGIApp
from backend.db import db
from backend.idfm_interface import Destinations as IdfmDestinations, IdfmInterface
@@ -26,8 +40,21 @@ if API_KEY is None:
print('No "API_KEY" environment variable set... abort.')
exit(EX_USAGE)
# TODO: Remove postgresql+asyncpg from environ variable
DB_PATH = "postgresql+asyncpg://cer_user:cer_password@127.0.0.1:5438/cer_db"
APP_NAME = environ.get("APP_NAME", "app")
MODE = environ.get("MODE", "grpc")
COLLECTOR_ENDPOINT_GRPC_ENDPOINT = environ.get(
"COLLECTOR_ENDPOINT_GRPC_ENDPOINT", "127.0.0.1:14250" # "jaeger-collector:14250"
)
# CREATE DATABASE "carrramba-encore-rate";
# CREATE USER cer WITH ENCRYPTED PASSWORD 'cer_password';
# GRANT ALL PRIVILEGES ON DATABASE "carrramba-encore-rate" TO cer;
# \c "carrramba-encore-rate";
# GRANT ALL ON schema public TO cer;
# CREATE EXTENSION IF NOT EXISTS pg_trgm;
# TODO: Remove postgresql+psycopg from environ variable
DB_PATH = "postgresql+psycopg://cer:cer_password@127.0.0.1:5432/carrramba-encore-rate"
app = FastAPI()
@@ -42,6 +69,14 @@ app.add_middleware(
allow_headers=["*"],
)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
tracer = trace.get_tracer(APP_NAME)
with tracer.start_as_current_span("foo"):
print("Hello world!")
idfm_interface = IdfmInterface(API_KEY, db)
@@ -247,3 +282,9 @@ async def get_stop_shape(stop_id: int) -> StopShapeSchema | None:
msg = f"No shape found for stop {stop_id}"
raise HTTPException(status_code=404, detail=msg)
FastAPIInstrumentor.instrument_app(app)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=4443, ssl_certfile="./config/cert.pem")

View File

@@ -11,13 +11,20 @@ python = "^3.11"
aiohttp = "^3.8.3"
rich = "^12.6.0"
aiofiles = "^22.1.0"
sqlalchemy = {extras = ["asyncio"], version = "^2.0.1"}
fastapi = "^0.88.0"
uvicorn = "^0.20.0"
asyncpg = "^0.27.0"
msgspec = "^0.12.0"
pyshp = "^2.3.1"
pyproj = "^3.5.0"
opentelemetry-instrumentation-fastapi = "^0.38b0"
sqlalchemy-utils = "^0.41.1"
opentelemetry-instrumentation-logging = "^0.38b0"
opentelemetry-sdk = "^1.17.0"
opentelemetry-api = "^1.17.0"
opentelemetry-exporter-otlp-proto-http = "^1.17.0"
opentelemetry-instrumentation-sqlalchemy = "^0.38b0"
sqlalchemy = "^2.0.12"
psycopg = "^3.1.9"
[build-system]
requires = ["poetry-core"]
@@ -39,7 +46,6 @@ autopep8 = "^2.0.1"
pyflakes = "^3.0.1"
yapf = "^0.32.0"
whatthepatch = "^1.0.4"
sqlalchemy = {extras = ["mypy"], version = "^2.0.1"}
mypy = "^1.0.0"
[tool.mypy]