Compare commits
6 Commits
reduce-fro
...
b713042359
Author | SHA1 | Date | |
---|---|---|---|
b713042359
|
|||
5505209760
|
|||
6aa28f7bfb
|
|||
07d43bfcb4
|
|||
6eb78d7307
|
|||
bcedf32bec
|
@@ -1,5 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from logging import getLogger
|
||||||
from typing import Iterable, Self, TYPE_CHECKING
|
from typing import Iterable, Self, TYPE_CHECKING
|
||||||
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
@@ -9,31 +10,36 @@ from sqlalchemy.orm import DeclarativeBase
|
|||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .db import Database
|
from .db import Database
|
||||||
|
|
||||||
|
logger = getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Base(DeclarativeBase):
|
class Base(DeclarativeBase):
|
||||||
db: Database | None = None
|
db: Database | None = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def add(cls, stops: Self | Iterable[Self]) -> bool:
|
async def add(cls, objs: Self | Iterable[Self]) -> bool:
|
||||||
|
if cls.db is not None and (session := await cls.db.get_session()) is not None:
|
||||||
|
|
||||||
|
async with session.begin():
|
||||||
try:
|
try:
|
||||||
if isinstance(stops, Iterable):
|
if isinstance(objs, Iterable):
|
||||||
cls.db.session.add_all(stops) # type: ignore
|
session.add_all(objs)
|
||||||
else:
|
else:
|
||||||
cls.db.session.add(stops) # type: ignore
|
session.add(objs)
|
||||||
await cls.db.session.commit() # type: ignore
|
|
||||||
except (AttributeError, IntegrityError) as err:
|
except (AttributeError, IntegrityError) as err:
|
||||||
print(err)
|
logger.error(err)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def get_by_id(cls, id_: int | str) -> Self | None:
|
async def get_by_id(cls, id_: int | str) -> Self | None:
|
||||||
try:
|
if cls.db is not None and (session := await cls.db.get_session()) is not None:
|
||||||
stmt = select(cls).where(cls.id == id_) # type: ignore
|
|
||||||
res = await cls.db.session.execute(stmt) # type: ignore
|
async with session.begin():
|
||||||
element = res.scalar_one_or_none()
|
stmt = select(cls).where(cls.id == id_)
|
||||||
except AttributeError as err:
|
res = await session.execute(stmt)
|
||||||
print(err)
|
return res.scalar_one_or_none()
|
||||||
element = None
|
|
||||||
return element
|
return None
|
||||||
|
@@ -1,4 +1,10 @@
|
|||||||
|
from logging import getLogger
|
||||||
|
from typing import Annotated, AsyncIterator
|
||||||
|
|
||||||
|
from fastapi import Depends
|
||||||
|
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
from sqlalchemy.ext.asyncio import (
|
from sqlalchemy.ext.asyncio import (
|
||||||
async_sessionmaker,
|
async_sessionmaker,
|
||||||
AsyncEngine,
|
AsyncEngine,
|
||||||
@@ -9,40 +15,47 @@ from sqlalchemy.ext.asyncio import (
|
|||||||
from .base_class import Base
|
from .base_class import Base
|
||||||
|
|
||||||
|
|
||||||
|
logger = getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Database:
|
class Database:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._engine: AsyncEngine | None = None
|
self._async_engine: AsyncEngine | None = None
|
||||||
self._session_maker: async_sessionmaker[AsyncSession] | None = None
|
self._async_session_local: async_sessionmaker[AsyncSession] | None = None
|
||||||
self._session: AsyncSession | None = None
|
|
||||||
|
|
||||||
@property
|
async def get_session(self) -> AsyncSession | None:
|
||||||
def session(self) -> AsyncSession | None:
|
try:
|
||||||
if self._session is None and (session_maker := self._session_maker) is not None:
|
return self._async_session_local() # type: ignore
|
||||||
self._session = session_maker()
|
|
||||||
return self._session
|
|
||||||
|
|
||||||
async def connect(self, db_path: str, clear_static_data: bool = False) -> bool:
|
except (SQLAlchemyError, AttributeError) as e:
|
||||||
|
logger.exception(e)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
# TODO: Preserve UserLastStopSearchResults table from drop.
|
# TODO: Preserve UserLastStopSearchResults table from drop.
|
||||||
self._engine = create_async_engine(db_path)
|
async def connect(self, db_path: str, clear_static_data: bool = False) -> bool:
|
||||||
if self._engine is not None:
|
self._async_engine = create_async_engine(
|
||||||
self._session_maker = async_sessionmaker(
|
db_path, pool_pre_ping=True, pool_size=10, max_overflow=20
|
||||||
self._engine, expire_on_commit=False, class_=AsyncSession
|
|
||||||
)
|
)
|
||||||
if (session := self.session) is not None:
|
|
||||||
await session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))
|
|
||||||
|
|
||||||
async with self._engine.begin() as conn:
|
if self._async_engine is not None:
|
||||||
|
SQLAlchemyInstrumentor().instrument(engine=self._async_engine.sync_engine)
|
||||||
|
|
||||||
|
self._async_session_local = async_sessionmaker(
|
||||||
|
bind=self._async_engine,
|
||||||
|
# autoflush=False,
|
||||||
|
expire_on_commit=False,
|
||||||
|
class_=AsyncSession,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with self._async_engine.begin() as session:
|
||||||
|
await session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))
|
||||||
if clear_static_data:
|
if clear_static_data:
|
||||||
await conn.run_sync(Base.metadata.drop_all)
|
await session.run_sync(Base.metadata.drop_all)
|
||||||
await conn.run_sync(Base.metadata.create_all)
|
await session.run_sync(Base.metadata.create_all)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def disconnect(self) -> None:
|
async def disconnect(self) -> None:
|
||||||
if self._session is not None:
|
if self._async_engine is not None:
|
||||||
await self._session.close()
|
await self._async_engine.dispose()
|
||||||
self._session = None
|
|
||||||
|
|
||||||
if self._engine is not None:
|
|
||||||
await self._engine.dispose()
|
|
||||||
|
@@ -15,7 +15,7 @@ from aiohttp import ClientSession
|
|||||||
from msgspec import ValidationError
|
from msgspec import ValidationError
|
||||||
from msgspec.json import Decoder
|
from msgspec.json import Decoder
|
||||||
from pyproj import Transformer
|
from pyproj import Transformer
|
||||||
from shapefile import Reader as ShapeFileReader, ShapeRecord
|
from shapefile import Reader as ShapeFileReader, ShapeRecord # type: ignore
|
||||||
|
|
||||||
from ..db import Database
|
from ..db import Database
|
||||||
from ..models import ConnectionArea, Line, LinePicto, Stop, StopArea, StopShape
|
from ..models import ConnectionArea, Line, LinePicto, Stop, StopArea, StopShape
|
||||||
@@ -357,7 +357,6 @@ class IdfmInterface:
|
|||||||
fields = line.fields
|
fields = line.fields
|
||||||
|
|
||||||
picto_id = fields.picto.id_ if fields.picto is not None else None
|
picto_id = fields.picto.id_ if fields.picto is not None else None
|
||||||
picto = await LinePicto.get_by_id(picto_id) if picto_id else None
|
|
||||||
|
|
||||||
ret.append(
|
ret.append(
|
||||||
Line(
|
Line(
|
||||||
@@ -384,7 +383,6 @@ class IdfmInterface:
|
|||||||
fields.audiblesigns_available.value
|
fields.audiblesigns_available.value
|
||||||
),
|
),
|
||||||
picto_id=fields.picto.id_ if fields.picto is not None else None,
|
picto_id=fields.picto.id_ if fields.picto is not None else None,
|
||||||
picto=picto,
|
|
||||||
record_id=line.recordid,
|
record_id=line.recordid,
|
||||||
record_ts=int(line.record_timestamp.timestamp()),
|
record_ts=int(line.record_timestamp.timestamp()),
|
||||||
)
|
)
|
||||||
|
@@ -94,10 +94,9 @@ class Line(Base):
|
|||||||
async def get_by_name(
|
async def get_by_name(
|
||||||
cls, name: str, operator_name: None | str = None
|
cls, name: str, operator_name: None | str = None
|
||||||
) -> Sequence[Self] | None:
|
) -> Sequence[Self] | None:
|
||||||
session = cls.db.session
|
if (session := await cls.db.get_session()) is not None:
|
||||||
if session is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
async with session.begin():
|
||||||
filters = {"name": name}
|
filters = {"name": name}
|
||||||
if operator_name is not None:
|
if operator_name is not None:
|
||||||
filters["operator_name"] = operator_name
|
filters["operator_name"] = operator_name
|
||||||
@@ -112,6 +111,8 @@ class Line(Base):
|
|||||||
|
|
||||||
return lines
|
return lines
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def _add_picto_to_line(cls, line: str | Self, picto: LinePicto) -> None:
|
async def _add_picto_to_line(cls, line: str | Self, picto: LinePicto) -> None:
|
||||||
formatted_line: Self | None = None
|
formatted_line: Self | None = None
|
||||||
@@ -133,23 +134,25 @@ class Line(Base):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def add_pictos(cls, line_to_pictos: Iterable[tuple[str, LinePicto]]) -> bool:
|
async def add_pictos(cls, line_to_pictos: Iterable[tuple[str, LinePicto]]) -> bool:
|
||||||
session = cls.db.session
|
if (session := await cls.db.get_session()) is not None:
|
||||||
if session is None:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
async with session.begin():
|
||||||
await asyncio_gather(
|
await asyncio_gather(
|
||||||
*[cls._add_picto_to_line(line, picto) for line, picto in line_to_pictos]
|
*[
|
||||||
|
cls._add_picto_to_line(line, picto)
|
||||||
|
for line, picto in line_to_pictos
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def add_stops(cls, line_to_stop_ids: Iterable[tuple[str, str, int]]) -> int:
|
async def add_stops(cls, line_to_stop_ids: Iterable[tuple[str, str, int]]) -> int:
|
||||||
session = cls.db.session
|
if (session := await cls.db.get_session()) is not None:
|
||||||
if session is None:
|
|
||||||
return 0
|
async with session.begin():
|
||||||
|
|
||||||
line_names_ops, stop_ids = set(), set()
|
line_names_ops, stop_ids = set(), set()
|
||||||
for line_name, operator_name, stop_id in line_to_stop_ids:
|
for line_name, operator_name, stop_id in line_to_stop_ids:
|
||||||
@@ -166,13 +169,17 @@ class Line(Base):
|
|||||||
for line in lines_res.scalars():
|
for line in lines_res.scalars():
|
||||||
lines[(line.name, line.operator_name)].append(line)
|
lines[(line.name, line.operator_name)].append(line)
|
||||||
|
|
||||||
stops_res = await session.execute(select(_Stop).where(_Stop.id.in_(stop_ids)))
|
stops_res = await session.execute(
|
||||||
|
select(_Stop).where(_Stop.id.in_(stop_ids))
|
||||||
|
)
|
||||||
stops = {stop.id: stop for stop in stops_res.scalars()}
|
stops = {stop.id: stop for stop in stops_res.scalars()}
|
||||||
|
|
||||||
found = 0
|
found = 0
|
||||||
for line_name, operator_name, stop_id in line_to_stop_ids:
|
for line_name, operator_name, stop_id in line_to_stop_ids:
|
||||||
if (stop := stops.get(stop_id)) is not None:
|
if (stop := stops.get(stop_id)) is not None:
|
||||||
if (stop_lines := lines.get((line_name, operator_name))) is not None:
|
if (
|
||||||
|
stop_lines := lines.get((line_name, operator_name))
|
||||||
|
) is not None:
|
||||||
for stop_line in stop_lines:
|
for stop_line in stop_lines:
|
||||||
stop_line.stops.append(stop)
|
stop_line.stops.append(stop)
|
||||||
found += 1
|
found += 1
|
||||||
@@ -184,6 +191,6 @@ class Line(Base):
|
|||||||
f"(used by {line_name}/{operator_name})"
|
f"(used by {line_name}/{operator_name})"
|
||||||
)
|
)
|
||||||
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
return found
|
return found
|
||||||
|
|
||||||
|
return 0
|
||||||
|
@@ -1,13 +1,17 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Iterable, Sequence, TYPE_CHECKING
|
from logging import getLogger
|
||||||
|
from typing import Annotated, Iterable, Sequence, TYPE_CHECKING
|
||||||
|
|
||||||
from sqlalchemy import (
|
from sqlalchemy import (
|
||||||
BigInteger,
|
BigInteger,
|
||||||
Column,
|
Column,
|
||||||
|
Computed,
|
||||||
|
desc,
|
||||||
Enum,
|
Enum,
|
||||||
Float,
|
Float,
|
||||||
ForeignKey,
|
ForeignKey,
|
||||||
|
func,
|
||||||
Integer,
|
Integer,
|
||||||
JSON,
|
JSON,
|
||||||
select,
|
select,
|
||||||
@@ -19,9 +23,9 @@ from sqlalchemy.orm import (
|
|||||||
Mapped,
|
Mapped,
|
||||||
relationship,
|
relationship,
|
||||||
selectinload,
|
selectinload,
|
||||||
with_polymorphic,
|
|
||||||
)
|
)
|
||||||
from sqlalchemy.schema import Index
|
from sqlalchemy.schema import Index
|
||||||
|
from sqlalchemy_utils.types.ts_vector import TSVectorType
|
||||||
|
|
||||||
from ..db import Base, db
|
from ..db import Base, db
|
||||||
from ..idfm_interface.idfm_types import TransportMode, IdfmState, StopAreaType
|
from ..idfm_interface.idfm_types import TransportMode, IdfmState, StopAreaType
|
||||||
@@ -30,6 +34,8 @@ if TYPE_CHECKING:
|
|||||||
from .line import Line
|
from .line import Line
|
||||||
|
|
||||||
|
|
||||||
|
logger = getLogger(__name__)
|
||||||
|
|
||||||
stop_area_stop_association_table = Table(
|
stop_area_stop_association_table = Table(
|
||||||
"stop_area_stop_association_table",
|
"stop_area_stop_association_table",
|
||||||
Base.metadata,
|
Base.metadata,
|
||||||
@@ -71,41 +77,40 @@ class _Stop(Base):
|
|||||||
back_populates="stops", lazy="selectin"
|
back_populates="stops", lazy="selectin"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
names_tsv = mapped_column(
|
||||||
|
TSVectorType("name", "town_name", regconfig="french"),
|
||||||
|
Computed("to_tsvector('french', name || ' ' || town_name)", persisted=True),
|
||||||
|
)
|
||||||
|
|
||||||
__tablename__ = "_stops"
|
__tablename__ = "_stops"
|
||||||
__mapper_args__ = {"polymorphic_identity": "_stops", "polymorphic_on": kind}
|
__mapper_args__ = {"polymorphic_identity": "_stops", "polymorphic_on": kind}
|
||||||
__table_args__ = (
|
__table_args__ = (
|
||||||
# To optimize the ilike requests
|
|
||||||
Index(
|
Index(
|
||||||
"name_idx_gin",
|
"names_tsv_idx",
|
||||||
name,
|
names_tsv,
|
||||||
postgresql_ops={"name": "gin_trgm_ops"},
|
postgresql_ops={"name": "gin_trgm_ops"},
|
||||||
postgresql_using="gin",
|
postgresql_using="gin",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: Test https://www.cybertec-postgresql.com/en/postgresql-more-performance-for-like-and-ilike-statements/
|
|
||||||
# TODO: Should be able to remove with_polymorphic ?
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def get_by_name(cls, name: str) -> Sequence[type[_Stop]] | None:
|
async def get_by_name(cls, name: str) -> Sequence[_Stop] | None:
|
||||||
session = cls.db.session
|
if (session := await cls.db.get_session()) is not None:
|
||||||
if session is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
stop_stop_area = with_polymorphic(_Stop, [Stop, StopArea])
|
async with session.begin():
|
||||||
stmt = (
|
match_stmt = cls.names_tsv.match(name, postgresql_regconfig="french")
|
||||||
select(stop_stop_area)
|
ranking_stmt = func.ts_rank_cd(
|
||||||
.where(stop_stop_area.name.ilike(f"%{name}%"))
|
cls.names_tsv, func.plainto_tsquery("french", name)
|
||||||
.options(
|
|
||||||
selectinload(stop_stop_area.areas),
|
|
||||||
selectinload(stop_stop_area.lines),
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
stmt = select(cls).filter(match_stmt).order_by(desc(ranking_stmt))
|
||||||
|
|
||||||
res = await session.execute(stmt)
|
res = await session.execute(stmt)
|
||||||
stops = res.scalars().all()
|
stops = res.scalars().all()
|
||||||
|
|
||||||
return stops
|
return stops
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
class Stop(_Stop):
|
class Stop(_Stop):
|
||||||
|
|
||||||
@@ -146,9 +151,9 @@ class StopArea(_Stop):
|
|||||||
async def add_stops(
|
async def add_stops(
|
||||||
cls, stop_area_to_stop_ids: Iterable[tuple[int, int]]
|
cls, stop_area_to_stop_ids: Iterable[tuple[int, int]]
|
||||||
) -> int | None:
|
) -> int | None:
|
||||||
session = cls.db.session
|
if (session := await cls.db.get_session()) is not None:
|
||||||
if session is None:
|
|
||||||
return None
|
async with session.begin():
|
||||||
|
|
||||||
stop_area_ids, stop_ids = set(), set()
|
stop_area_ids, stop_ids = set(), set()
|
||||||
for stop_area_id, stop_id in stop_area_to_stop_ids:
|
for stop_area_id, stop_id in stop_area_to_stop_ids:
|
||||||
@@ -164,7 +169,9 @@ class StopArea(_Stop):
|
|||||||
stop_area.id: stop_area for stop_area in stop_areas_res.all()
|
stop_area.id: stop_area for stop_area in stop_areas_res.all()
|
||||||
}
|
}
|
||||||
|
|
||||||
stop_res = await session.execute(select(Stop).where(Stop.id.in_(stop_ids)))
|
stop_res = await session.execute(
|
||||||
|
select(Stop).where(Stop.id.in_(stop_ids))
|
||||||
|
)
|
||||||
stops: dict[int, Stop] = {stop.id: stop for stop in stop_res.scalars()}
|
stops: dict[int, Stop] = {stop.id: stop for stop in stop_res.scalars()}
|
||||||
|
|
||||||
found = 0
|
found = 0
|
||||||
@@ -178,10 +185,10 @@ class StopArea(_Stop):
|
|||||||
else:
|
else:
|
||||||
print(f"No stop area found for {stop_area_id}")
|
print(f"No stop area found for {stop_area_id}")
|
||||||
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
return found
|
return found
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
class StopShape(Base):
|
class StopShape(Base):
|
||||||
|
|
||||||
@@ -221,9 +228,9 @@ class ConnectionArea(Base):
|
|||||||
async def add_stops(
|
async def add_stops(
|
||||||
cls, conn_area_to_stop_ids: Iterable[tuple[int, int]]
|
cls, conn_area_to_stop_ids: Iterable[tuple[int, int]]
|
||||||
) -> int | None:
|
) -> int | None:
|
||||||
session = cls.db.session
|
if (session := await cls.db.get_session()) is not None:
|
||||||
if session is None:
|
|
||||||
return None
|
async with session.begin():
|
||||||
|
|
||||||
conn_area_ids, stop_ids = set(), set()
|
conn_area_ids, stop_ids = set(), set()
|
||||||
for conn_area_id, stop_id in conn_area_to_stop_ids:
|
for conn_area_id, stop_id in conn_area_to_stop_ids:
|
||||||
@@ -239,8 +246,10 @@ class ConnectionArea(Base):
|
|||||||
conn.id: conn for conn in conn_area_res.scalars()
|
conn.id: conn for conn in conn_area_res.scalars()
|
||||||
}
|
}
|
||||||
|
|
||||||
stop_res = await session.execute(select(_Stop).where(_Stop.id.in_(stop_ids)))
|
stop_res = await session.execute(
|
||||||
stops: dict[int, _Stop] = {stop.id: stop for stop in stop_res.scalars()}
|
select(Stop).where(Stop.id.in_(stop_ids))
|
||||||
|
)
|
||||||
|
stops: dict[int, Stop] = {stop.id: stop for stop in stop_res.scalars()}
|
||||||
|
|
||||||
found = 0
|
found = 0
|
||||||
for conn_area_id, stop_id in conn_area_to_stop_ids:
|
for conn_area_id, stop_id in conn_area_to_stop_ids:
|
||||||
@@ -253,6 +262,6 @@ class ConnectionArea(Base):
|
|||||||
else:
|
else:
|
||||||
print(f"No connection area found for {conn_area_id}")
|
print(f"No connection area found for {conn_area_id}")
|
||||||
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
return found
|
return found
|
||||||
|
|
||||||
|
return None
|
||||||
|
@@ -13,7 +13,62 @@ services:
|
|||||||
max-size: 10m
|
max-size: 10m
|
||||||
max-file: "3"
|
max-file: "3"
|
||||||
ports:
|
ports:
|
||||||
- '127.0.0.1:5438:5432'
|
- "127.0.0.1:5432:5432"
|
||||||
volumes:
|
volumes:
|
||||||
- ./docker/database/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
|
- ./docker/database/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
|
||||||
- ./docker/database/data:/var/lib/postgresql/data
|
- ./docker/database/data:/var/lib/postgresql/data
|
||||||
|
|
||||||
|
jaeger-agent:
|
||||||
|
image: jaegertracing/jaeger-agent:latest
|
||||||
|
command:
|
||||||
|
- "--reporter.grpc.host-port=jaeger-collector:14250"
|
||||||
|
ports:
|
||||||
|
- "127.0.0.1:5775:5775/udp"
|
||||||
|
- "127.0.0.1:6831:6831/udp"
|
||||||
|
- "127.0.0.1:6832:6832/udp"
|
||||||
|
- "127.0.0.1:5778:5778"
|
||||||
|
restart: on-failure
|
||||||
|
depends_on:
|
||||||
|
- jaeger-collector
|
||||||
|
|
||||||
|
jaeger-collector:
|
||||||
|
image: jaegertracing/jaeger-collector:latest
|
||||||
|
command:
|
||||||
|
- "--cassandra.keyspace=jaeger_v1_dc1"
|
||||||
|
- "--cassandra.servers=cassandra"
|
||||||
|
- "--collector.zipkin.host-port=9411"
|
||||||
|
- "--sampling.initial-sampling-probability=.5"
|
||||||
|
- "--sampling.target-samples-per-second=.01"
|
||||||
|
environment:
|
||||||
|
- SAMPLING_CONFIG_TYPE=adaptive
|
||||||
|
- COLLECTOR_OTLP_ENABLED=true
|
||||||
|
ports:
|
||||||
|
- "127.0.0.1:4317:4317"
|
||||||
|
- "127.0.0.1:4318:4318"
|
||||||
|
# - "127.0.0.1:9411:9411"
|
||||||
|
# - "127.0.0.1:14250:14250"
|
||||||
|
# - "127.0.0.1:14268:14268"
|
||||||
|
# - "127.0.0.1:14269:14269"
|
||||||
|
restart: on-failure
|
||||||
|
depends_on:
|
||||||
|
- cassandra-schema
|
||||||
|
|
||||||
|
cassandra:
|
||||||
|
image: cassandra:latest
|
||||||
|
|
||||||
|
cassandra-schema:
|
||||||
|
image: jaegertracing/jaeger-cassandra-schema:latest
|
||||||
|
depends_on:
|
||||||
|
- cassandra
|
||||||
|
|
||||||
|
jaeger-query:
|
||||||
|
image: jaegertracing/jaeger-query:latest
|
||||||
|
command:
|
||||||
|
- "--cassandra.keyspace=jaeger_v1_dc1"
|
||||||
|
- "--cassandra.servers=cassandra"
|
||||||
|
ports:
|
||||||
|
- "127.0.0.1:16686:16686"
|
||||||
|
# - "127.0.0.1:16687:16687"
|
||||||
|
restart: on-failure
|
||||||
|
depends_on:
|
||||||
|
- cassandra-schema
|
||||||
|
45
backend/main.py
Normal file → Executable file
45
backend/main.py
Normal file → Executable file
@@ -1,12 +1,26 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import logging
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from os import environ, EX_USAGE
|
from os import environ, EX_USAGE
|
||||||
from typing import Sequence
|
from typing import Sequence
|
||||||
|
|
||||||
|
import uvicorn
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, HTTPException
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
|
from opentelemetry import trace
|
||||||
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
|
|
||||||
|
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||||||
|
from opentelemetry.instrumentation.logging import LoggingInstrumentor
|
||||||
|
from opentelemetry.sdk.resources import Resource as OtResource
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider as OtTracerProvider
|
||||||
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
from rich import print
|
from rich import print
|
||||||
|
from starlette.types import ASGIApp
|
||||||
|
|
||||||
from backend.db import db
|
from backend.db import db
|
||||||
from backend.idfm_interface import Destinations as IdfmDestinations, IdfmInterface
|
from backend.idfm_interface import Destinations as IdfmDestinations, IdfmInterface
|
||||||
@@ -26,8 +40,21 @@ if API_KEY is None:
|
|||||||
print('No "API_KEY" environment variable set... abort.')
|
print('No "API_KEY" environment variable set... abort.')
|
||||||
exit(EX_USAGE)
|
exit(EX_USAGE)
|
||||||
|
|
||||||
# TODO: Remove postgresql+asyncpg from environ variable
|
APP_NAME = environ.get("APP_NAME", "app")
|
||||||
DB_PATH = "postgresql+asyncpg://cer_user:cer_password@127.0.0.1:5438/cer_db"
|
MODE = environ.get("MODE", "grpc")
|
||||||
|
COLLECTOR_ENDPOINT_GRPC_ENDPOINT = environ.get(
|
||||||
|
"COLLECTOR_ENDPOINT_GRPC_ENDPOINT", "127.0.0.1:14250" # "jaeger-collector:14250"
|
||||||
|
)
|
||||||
|
|
||||||
|
# CREATE DATABASE "carrramba-encore-rate";
|
||||||
|
# CREATE USER cer WITH ENCRYPTED PASSWORD 'cer_password';
|
||||||
|
# GRANT ALL PRIVILEGES ON DATABASE "carrramba-encore-rate" TO cer;
|
||||||
|
# \c "carrramba-encore-rate";
|
||||||
|
# GRANT ALL ON schema public TO cer;
|
||||||
|
# CREATE EXTENSION IF NOT EXISTS pg_trgm;
|
||||||
|
|
||||||
|
# TODO: Remove postgresql+psycopg from environ variable
|
||||||
|
DB_PATH = "postgresql+psycopg://cer:cer_password@127.0.0.1:5432/carrramba-encore-rate"
|
||||||
|
|
||||||
|
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
@@ -42,6 +69,14 @@ app.add_middleware(
|
|||||||
allow_headers=["*"],
|
allow_headers=["*"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
trace.set_tracer_provider(TracerProvider())
|
||||||
|
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
|
||||||
|
tracer = trace.get_tracer(APP_NAME)
|
||||||
|
|
||||||
|
with tracer.start_as_current_span("foo"):
|
||||||
|
print("Hello world!")
|
||||||
|
|
||||||
|
|
||||||
idfm_interface = IdfmInterface(API_KEY, db)
|
idfm_interface = IdfmInterface(API_KEY, db)
|
||||||
|
|
||||||
|
|
||||||
@@ -247,3 +282,9 @@ async def get_stop_shape(stop_id: int) -> StopShapeSchema | None:
|
|||||||
|
|
||||||
msg = f"No shape found for stop {stop_id}"
|
msg = f"No shape found for stop {stop_id}"
|
||||||
raise HTTPException(status_code=404, detail=msg)
|
raise HTTPException(status_code=404, detail=msg)
|
||||||
|
|
||||||
|
|
||||||
|
FastAPIInstrumentor.instrument_app(app)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
uvicorn.run(app, host="0.0.0.0", port=4443, ssl_certfile="./config/cert.pem")
|
||||||
|
@@ -11,13 +11,20 @@ python = "^3.11"
|
|||||||
aiohttp = "^3.8.3"
|
aiohttp = "^3.8.3"
|
||||||
rich = "^12.6.0"
|
rich = "^12.6.0"
|
||||||
aiofiles = "^22.1.0"
|
aiofiles = "^22.1.0"
|
||||||
sqlalchemy = {extras = ["asyncio"], version = "^2.0.1"}
|
|
||||||
fastapi = "^0.88.0"
|
fastapi = "^0.88.0"
|
||||||
uvicorn = "^0.20.0"
|
uvicorn = "^0.20.0"
|
||||||
asyncpg = "^0.27.0"
|
|
||||||
msgspec = "^0.12.0"
|
msgspec = "^0.12.0"
|
||||||
pyshp = "^2.3.1"
|
pyshp = "^2.3.1"
|
||||||
pyproj = "^3.5.0"
|
pyproj = "^3.5.0"
|
||||||
|
opentelemetry-instrumentation-fastapi = "^0.38b0"
|
||||||
|
sqlalchemy-utils = "^0.41.1"
|
||||||
|
opentelemetry-instrumentation-logging = "^0.38b0"
|
||||||
|
opentelemetry-sdk = "^1.17.0"
|
||||||
|
opentelemetry-api = "^1.17.0"
|
||||||
|
opentelemetry-exporter-otlp-proto-http = "^1.17.0"
|
||||||
|
opentelemetry-instrumentation-sqlalchemy = "^0.38b0"
|
||||||
|
sqlalchemy = "^2.0.12"
|
||||||
|
psycopg = "^3.1.9"
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["poetry-core"]
|
requires = ["poetry-core"]
|
||||||
@@ -39,7 +46,6 @@ autopep8 = "^2.0.1"
|
|||||||
pyflakes = "^3.0.1"
|
pyflakes = "^3.0.1"
|
||||||
yapf = "^0.32.0"
|
yapf = "^0.32.0"
|
||||||
whatthepatch = "^1.0.4"
|
whatthepatch = "^1.0.4"
|
||||||
sqlalchemy = {extras = ["mypy"], version = "^2.0.1"}
|
|
||||||
mypy = "^1.0.0"
|
mypy = "^1.0.0"
|
||||||
|
|
||||||
[tool.mypy]
|
[tool.mypy]
|
||||||
|
Reference in New Issue
Block a user