Files
beau-gosse-du-92/src/infrastructure/messaging/matrix/client.rs
Adrien a8a7b16e9f
All checks were successful
ci/woodpecker/pr/validate Pipeline was successful
👷 Add cargo sort-derives tool
2025-04-27 22:10:27 +02:00

842 lines
27 KiB
Rust

use std::{
borrow::Borrow,
collections::HashMap,
sync::{Arc, Mutex},
};
use async_std::stream::StreamExt;
use dioxus::prelude::Task;
use matrix_sdk::{
config::SyncSettings,
event_handler::Ctx,
media::{MediaFormat, MediaRequest, MediaThumbnailSettings, MediaThumbnailSize},
room::{ParentSpace, Room},
ruma::{
api::client::media::get_content_thumbnail::v3::Method,
events::{
room::{
avatar::{RoomAvatarEventContent, StrippedRoomAvatarEvent},
create::{RoomCreateEventContent, StrippedRoomCreateEvent},
member::{MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent},
name::{RoomNameEventContent, StrippedRoomNameEvent},
topic::{RoomTopicEventContent, StrippedRoomTopicEvent},
MediaSource,
},
SyncStateEvent,
},
uint, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomId, UserId,
},
Client as MatrixClient, RoomState,
};
use tokio::sync::{
broadcast,
broadcast::{error::SendError, Receiver, Sender},
mpsc::{unbounded_channel, UnboundedReceiver},
};
use tracing::{debug, debug_span, error, instrument, warn, Instrument, Span};
use super::{
account_event::AccountEvent,
requester::Requester,
room_event::{RoomEvent, RoomEventsReceiver},
worker_tasks::{LoginStyle, WorkerTask},
};
use crate::utils::oneshot;
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("Matrix client error: {0}")]
Matrix(#[from] matrix_sdk::Error),
}
#[derive(Clone)]
struct Senders {
account_events_sender: Sender<AccountEvent>,
room_events_senders: Arc<Mutex<HashMap<OwnedRoomId, Sender<RoomEvent>>>>,
}
impl Senders {
fn new(account_events_sender: Sender<AccountEvent>) -> Self {
Self {
account_events_sender,
room_events_senders: Arc::new(Mutex::new(HashMap::new())),
}
}
fn contains(&self, room_id: &RoomId) -> bool {
let room_senders = self.room_events_senders.lock().unwrap();
room_senders.contains_key(room_id)
}
fn send(&self, room_id: &RoomId, event: RoomEvent) -> Result<(), SendError<RoomEvent>> {
let room_senders = self.room_events_senders.lock().unwrap();
if let Some(room_sender) = room_senders.get(room_id) {
if let Err(err) = room_sender.send(event) {
warn!("Unable to send event to the {room_id} room: {err}");
return Err(err);
}
} else {
warn!("No sender found for {room_id} room");
// TODO: Return error
}
Ok(())
}
fn add_room(&self, room_id: &OwnedRoomId) -> Option<RoomEventsReceiver> {
let mut senders = self.room_events_senders.lock().unwrap();
if !senders.contains_key(room_id) {
let (room_sender, room_receiver) = broadcast::channel(32);
senders.insert(room_id.clone(), room_sender);
debug!("Create sender for {room_id} room");
Some(RoomEventsReceiver::new(room_receiver))
} else {
None
}
}
}
pub struct Client {
initialized: bool,
client: Option<Arc<MatrixClient>>,
sync_task: Option<Task>,
senders: Senders,
}
impl Client {
pub fn new(client: Arc<MatrixClient>, account_events_sender: Sender<AccountEvent>) -> Self {
Self {
initialized: false,
client: Some(client),
sync_task: None,
senders: Senders::new(account_events_sender),
}
}
#[instrument(skip_all)]
async fn create_space(
senders: &Ctx<Senders>,
room_id: &OwnedRoomId,
room: Option<&Room>,
) -> anyhow::Result<(), SendError<AccountEvent>> {
if let Some(receiver) = senders.add_room(room_id) {
let current_span = Span::current();
let mut name = None;
let mut topic = None;
if let Some(room) = room {
name = room.name();
topic = room.topic();
}
let (reply, mut response) = oneshot::<bool>();
// We can't use Room instance here, because dyn PaginableRoom is not Sync
let event = AccountEvent::NewSpace(
room_id.clone(),
name.clone(),
topic.clone(),
receiver,
reply,
current_span.clone(),
);
senders.account_events_sender.send(event)?;
// We're expecting a response indicating that the client is able to compute the next RoomEvent
response.recv().await;
let events = vec![
RoomEvent::NewTopic(topic, current_span.clone()),
RoomEvent::NewName(name, current_span),
];
for event in events {
if let Err(_err) = senders.send(room_id, event.clone()) {
// TODO: Return an error
}
}
}
Ok(())
}
#[instrument(skip_all)]
async fn create_room(
senders: &Ctx<Senders>,
room: &Room,
) -> anyhow::Result<(), SendError<AccountEvent>> {
let room_id = room.room_id().to_owned();
if let Some(receiver) = senders.add_room(&room_id) {
let (reply, mut response) = oneshot::<bool>();
let is_direct = match room.is_direct().await {
Ok(is_direct) => Some(is_direct),
Err(err) => {
warn!("Unable to know if the {room_id} room is direct: {err}");
None
}
};
let mut parents = vec![];
if let Ok(mut spaces) = room.parent_spaces().await {
while let Some(parent) = spaces.next().await {
match parent {
Ok(parent) => match parent {
ParentSpace::Reciprocal(parent) => {
parents.push(parent.room_id().to_owned());
}
_ => todo!(),
},
Err(err) => {
error!("{err}");
}
}
}
}
// We can't use Room instance here, because dyn PaginableRoom is not Sync
let event = AccountEvent::NewRoom(
room_id.clone(),
parents.clone(),
room.name(),
room.topic(),
is_direct,
room.state(),
receiver,
reply,
Span::current(),
);
senders.account_events_sender.send(event)?;
// We're expecting a response indicating that the client is able to compute the next RoomEvent
response.recv().await;
}
Ok(())
}
#[instrument(skip_all)]
async fn add_room(
senders: &Ctx<Senders>,
room: &Room,
) -> anyhow::Result<(), SendError<AccountEvent>> {
let room_id = room.room_id().to_owned();
if room.is_space() {
Self::create_space(senders, &room_id, Some(room)).await
} else {
let mut parents = vec![];
if let Ok(mut spaces) = room.parent_spaces().await {
while let Some(parent) = spaces.next().await {
match parent {
Ok(parent) => match parent {
ParentSpace::Reciprocal(parent) => {
parents.push(parent.room_id().to_owned());
}
_ => {
warn!(
"Only ParentSpace::Reciprocal taken into account, skip {:?}",
parent
);
}
},
Err(err) => {
error!("{err}");
}
}
}
}
for parent in parents {
// Create a minimal space to make the relation consistent... its content will be sync later.
if !senders.contains(&parent) {
let _ = Self::create_space(senders, &parent, None).await;
}
let event = RoomEvent::NewChild(room_id.clone(), Span::current());
if let Err(_err) = senders.send(&parent, event) {
// TODO: Return an error
}
}
Self::create_room(senders, room).await
}
}
async fn on_stripped_room_create_event(
_ev: StrippedRoomCreateEvent,
room: Room,
senders: Ctx<Senders>,
) {
let span = debug_span!("Matrix::NewRoom", r = ?room.room_id());
let _ = Self::add_room(&senders, &room).instrument(span).await;
}
// SyncStateEvent: A possibly-redacted state event without a room_id.
async fn on_sync_room_create_event(
_ev: SyncStateEvent<RoomCreateEventContent>,
room: Room,
senders: Ctx<Senders>,
) {
let span = debug_span!("Matrix::NewRoom", r = ?room.room_id());
let _ = Self::add_room(&senders, &room).instrument(span).await;
}
#[instrument(skip_all)]
fn on_invite_room_member_event(
user_id: OwnedUserId,
inviter_id: OwnedUserId,
room: &Room,
matrix_client: &MatrixClient,
senders: &Ctx<Senders>,
) {
if let Some(client_user_id) = matrix_client.user_id() {
let room_id = room.room_id();
let is_account_user = user_id == client_user_id;
debug!(
"{} (account user: {is_account_user}) invited by {} to join the {} room",
&user_id, &inviter_id, &room_id
);
let event =
RoomEvent::Invitation(user_id, inviter_id, is_account_user, Span::current());
if let Err(_err) = senders.send(room_id, event) {
// TODO: Return an error
}
}
}
#[instrument(skip_all)]
fn on_join_room_member_event(
user_id: OwnedUserId,
displayname: Option<String>,
avatar_url: Option<OwnedMxcUri>,
room: &Room,
matrix_client: &MatrixClient,
senders: &Ctx<Senders>,
) {
if let Some(client_user_id) = matrix_client.user_id() {
let is_account_user = user_id == client_user_id;
let room_id = room.room_id();
debug!("{} has joined the {} room", &user_id, &room_id);
let event = RoomEvent::Join(
user_id,
displayname,
avatar_url,
is_account_user,
Span::current(),
);
if let Err(_err) = senders.send(room_id, event) {
// TODO: Return an error
}
}
}
// This function is called on each m.room.member event for an invited room preview (room not already joined).
async fn on_stripped_room_member_event(
ev: StrippedRoomMemberEvent,
matrix_client: MatrixClient,
room: Room,
senders: Ctx<Senders>,
) {
match room.state() {
RoomState::Invited => {
let user_id = &ev.state_key;
match ev.content.membership {
MembershipState::Invite => {
let span = debug_span!("Matrix::RoomInvitation", r = ?room.room_id());
span.in_scope(|| {
Self::on_invite_room_member_event(
user_id.clone(),
ev.sender,
&room,
&matrix_client,
&senders,
)
});
}
MembershipState::Join => {
let span =
debug_span!("Matrix::RoomJoin", r = ?room.room_id(), u = ?user_id)
.entered();
span.in_scope(|| {
Self::on_join_room_member_event(
ev.sender,
ev.content.displayname,
ev.content.avatar_url,
&room,
&matrix_client,
&senders,
)
});
}
_ => {
error!("TODO: {:?}", ev);
}
}
}
_ => {
error!("TODO: {:?}", ev);
}
}
}
// SyncStateEvent: A possibly-redacted state event without a room_id.
// RoomMemberEventContent: The content of an m.room.member event.
async fn on_sync_room_member_event(
ev: SyncStateEvent<RoomMemberEventContent>,
matrix_client: MatrixClient,
room: Room,
senders: Ctx<Senders>,
) {
if let SyncStateEvent::Original(ev) = ev {
match ev.content.membership {
MembershipState::Invite => {
let span = debug_span!("Matrix::RoomInvitation", r = ?room.room_id());
span.in_scope(|| {
let invitee_id = ev.state_key;
Self::on_invite_room_member_event(
invitee_id,
ev.sender,
&room,
&matrix_client,
&senders,
)
});
}
MembershipState::Join => {
let user_id = ev.sender;
let span = debug_span!("Matrix::RoomJoin", r = ?room.room_id(), u = ?user_id)
.entered();
span.in_scope(|| {
Self::on_join_room_member_event(
user_id,
ev.content.displayname,
ev.content.avatar_url,
&room,
&matrix_client,
&senders,
)
});
}
_ => error!("TODO"),
}
}
}
#[instrument(skip_all)]
async fn on_room_avatar_event(room: &Room, senders: &Ctx<Senders>) {
let room_id = room.room_id();
let avatar = match room
.avatar(MediaFormat::Thumbnail(MediaThumbnailSettings {
size: MediaThumbnailSize {
method: Method::Scale,
width: uint!(256),
height: uint!(256),
},
animated: false,
}))
.await
{
Ok(avatar) => avatar,
Err(err) => {
warn!("Unable to fetch avatar for {}: {err}", &room_id);
None
}
};
let event = RoomEvent::NewAvatar(avatar, Span::current());
if let Err(_err) = senders.send(room_id, event) {
// TODO: Return an error
}
}
async fn on_stripped_room_avatar_event(
_ev: StrippedRoomAvatarEvent,
room: Room,
senders: Ctx<Senders>,
) {
let span = debug_span!("Matrix::RoomAvatar", r = ?room.room_id());
Self::on_room_avatar_event(&room, &senders)
.instrument(span)
.await;
}
async fn on_sync_room_avatar_event(
ev: SyncStateEvent<RoomAvatarEventContent>,
room: Room,
senders: Ctx<Senders>,
) {
if let SyncStateEvent::Original(_ev) = ev {
dioxus::prelude::spawn(async move {
let span = debug_span!("Matrix::RoomAvatar", r = ?room.room_id());
Self::on_room_avatar_event(&room, &senders)
.instrument(span)
.await;
});
}
}
#[instrument(skip_all)]
fn on_room_name_event(name: Option<String>, room: &Room, senders: &Ctx<Senders>) {
let event = RoomEvent::NewName(name, Span::current());
if let Err(_err) = senders.send(room.room_id(), event) {
// TODO: Return an error
}
}
async fn on_stripped_room_name_event(
ev: StrippedRoomNameEvent,
room: Room,
senders: Ctx<Senders>,
) {
let span = debug_span!("Matrix::RoomName", r = ?room.room_id());
span.in_scope(|| {
Self::on_room_name_event(ev.content.name, &room, &senders);
});
}
async fn on_sync_room_name_event(
ev: SyncStateEvent<RoomNameEventContent>,
room: Room,
senders: Ctx<Senders>,
) {
if let SyncStateEvent::Original(ev) = ev {
let span = debug_span!("Matrix::RoomName", r = ?room.room_id());
span.in_scope(|| {
Self::on_room_name_event(Some(ev.content.name), &room, &senders);
});
}
}
#[instrument(skip_all)]
fn on_room_topic_event(topic: Option<String>, room: &Room, senders: &Ctx<Senders>) {
let event = RoomEvent::NewTopic(topic, Span::current());
if let Err(_err) = senders.send(room.room_id(), event) {
// TODO: Return an error
}
}
async fn on_stripped_room_topic_event(
ev: StrippedRoomTopicEvent,
room: Room,
senders: Ctx<Senders>,
) {
let span = debug_span!("Matrix::RoomTopic", r = ?room.room_id());
span.in_scope(|| {
Self::on_room_topic_event(ev.content.topic, &room, &senders);
});
}
async fn on_sync_room_topic_event(
ev: SyncStateEvent<RoomTopicEventContent>,
room: Room,
senders: Ctx<Senders>,
) {
if let SyncStateEvent::Original(ev) = ev {
let span = debug_span!("Matrix::RoomTopic", r = ?room.room_id());
span.in_scope(|| {
Self::on_room_topic_event(Some(ev.content.topic), &room, &senders);
});
}
}
pub async fn spawn(homeserver_url: String) -> (Requester, Receiver<AccountEvent>) {
let matrix_client = Arc::new(
MatrixClient::builder()
.homeserver_url(&homeserver_url)
.build()
.await
.unwrap(),
);
let (worker_tasks_sender, worker_tasks_receiver) = unbounded_channel::<WorkerTask>();
let (account_events_sender, account_events_receiver) =
broadcast::channel::<AccountEvent>(32);
let mut client = Client::new(matrix_client, account_events_sender);
dioxus::prelude::spawn(async move {
client.work(worker_tasks_receiver).await;
});
(Requester::new(worker_tasks_sender), account_events_receiver)
}
fn init(&mut self) {
if let Some(client) = self.client.borrow() {
// TODO: Remove clone?
client.add_event_handler_context(self.senders.clone());
let _ = client.add_event_handler(Client::on_stripped_room_create_event);
let _ = client.add_event_handler(Client::on_sync_room_create_event);
let _ = client.add_event_handler(Client::on_stripped_room_member_event);
let _ = client.add_event_handler(Client::on_sync_room_member_event);
let _ = client.add_event_handler(Client::on_stripped_room_avatar_event);
let _ = client.add_event_handler(Client::on_sync_room_avatar_event);
let _ = client.add_event_handler(Client::on_stripped_room_name_event);
let _ = client.add_event_handler(Client::on_sync_room_name_event);
let _ = client.add_event_handler(Client::on_stripped_room_topic_event);
let _ = client.add_event_handler(Client::on_sync_room_topic_event);
self.initialized = true;
}
}
async fn login(&mut self, style: LoginStyle) -> anyhow::Result<()> {
let client = self.client.as_ref().unwrap();
match style {
LoginStyle::Password(username, password) => {
client
.matrix_auth()
.login_username(&username, &password)
.initial_device_display_name("TODO")
.send()
.await
.map_err(ClientError::from)?;
}
}
Ok(())
}
async fn run_forever(&mut self) {
let client = self.client.clone().unwrap();
let task = dioxus::prelude::spawn(async move {
// Sync once so we receive the client state and old messages
let sync_token_option = match client.sync_once(SyncSettings::default()).await {
Ok(sync_response) => Some(sync_response.next_batch),
Err(err) => {
error!("Error during sync one: {}", err);
None
}
};
if let Some(sync_token) = sync_token_option {
debug!("User connected to the homeserver, start syncing");
let settings = SyncSettings::default().token(sync_token);
let _ = client.sync(settings).await;
}
});
self.sync_task = Some(task);
}
async fn get_display_name(&mut self) -> anyhow::Result<Option<String>> {
let client = self.client.as_ref().unwrap();
match client.account().get_display_name().await {
Ok(display_name) => Ok(display_name),
Err(err) => Err(err.into()),
}
}
async fn get_avatar(&mut self) -> anyhow::Result<Option<Vec<u8>>> {
let client = self.client.as_ref().unwrap();
match client
.account()
.get_avatar(MediaFormat::Thumbnail(MediaThumbnailSettings {
size: MediaThumbnailSize {
method: Method::Scale,
width: uint!(256),
height: uint!(256),
},
animated: false,
}))
.await
{
Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
}
}
async fn get_room_avatar(&mut self, room_id: &OwnedRoomId) -> anyhow::Result<Option<Vec<u8>>> {
let client = self.client.as_ref().unwrap();
if let Some(room) = client.get_room(room_id) {
match room
.avatar(MediaFormat::Thumbnail(MediaThumbnailSettings {
size: MediaThumbnailSize {
method: Method::Scale,
width: uint!(256),
height: uint!(256),
},
animated: false,
}))
.await
{
Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
}
} else {
warn!("No room found with the \"{}\" id", room_id.as_str());
// TODO: Return an error if the room has not been found
Ok(None)
}
}
// TODO: Share MediaRequest with other media requests
async fn get_thumbnail(&self, media_url: OwnedMxcUri) -> anyhow::Result<Vec<u8>> {
let client = self.client.as_ref().unwrap();
let media = client.media();
let request = MediaRequest {
source: MediaSource::Plain(media_url),
format: MediaFormat::Thumbnail(MediaThumbnailSettings {
size: MediaThumbnailSize {
method: Method::Scale,
width: uint!(256),
height: uint!(256),
},
animated: false,
}),
};
let res = media.get_media_content(&request, true).await;
Ok(res?)
}
async fn get_room_member_avatar(
&self,
avatar_url: &Option<OwnedMxcUri>,
room_id: &RoomId,
user_id: &UserId,
) -> anyhow::Result<Option<Vec<u8>>> {
let client = self.client.as_ref().unwrap();
if let Some(room) = client.get_room(room_id) {
match avatar_url {
Some(avatar_url) => {
let thumbnail = self.get_thumbnail(avatar_url.clone()).await;
return Ok(Some(thumbnail?));
}
None => match room.get_member(user_id).await {
Ok(room_member) => {
if let Some(room_member) = room_member {
let res = match room_member
.avatar(MediaFormat::Thumbnail(MediaThumbnailSettings {
size: MediaThumbnailSize {
method: Method::Scale,
width: uint!(256),
height: uint!(256),
},
animated: false,
}))
.await
{
Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
};
return res;
}
}
Err(err) => {
warn!("Unable to get room member {user_id}: {err}");
}
},
}
}
Ok(None)
}
async fn join_room(&self, room_id: &RoomId) -> anyhow::Result<bool> {
let client = self.client.as_ref().unwrap();
if let Some(room) = client.get_room(room_id) {
return match room.join().await {
Ok(_) => Ok(true),
Err(err) => Err(err.into()),
};
}
Ok(false)
}
async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) {
while let Some(task) = rx.recv().await {
self.run(task).await;
}
if let Some(task) = self.sync_task.take() {
task.cancel()
}
}
async fn run(&mut self, task: WorkerTask) {
match task {
WorkerTask::Init(reply) => {
self.init();
reply.send(Ok(())).await;
}
WorkerTask::RunForever(reply) => {
{
self.run_forever().await;
reply.send(())
}
.await
}
WorkerTask::Login(style, reply) => {
reply.send(self.login(style).await).await;
}
WorkerTask::GetDisplayName(reply) => {
reply.send(self.get_display_name().await).await;
}
WorkerTask::GetAvatar(reply) => {
reply.send(self.get_avatar().await).await;
}
WorkerTask::GetRoomAvatar(id, reply) => {
reply.send(self.get_room_avatar(&id).await).await;
}
WorkerTask::GetRoomMemberAvatar(avatar_url, room_id, user_id, reply) => {
reply
.send(
self.get_room_member_avatar(&avatar_url, &room_id, &user_id)
.await,
)
.await;
}
WorkerTask::JoinRoom(id, reply) => {
reply.send(self.join_room(&id).await).await;
}
}
}
}