use std::{ borrow::Borrow, collections::HashMap, sync::{Arc, Mutex}, }; use async_std::stream::StreamExt; use dioxus::prelude::Task; use matrix_sdk::{ config::SyncSettings, event_handler::Ctx, media::{MediaFormat, MediaRequest, MediaThumbnailSettings, MediaThumbnailSize}, room::{ParentSpace, Room}, ruma::{ api::client::media::get_content_thumbnail::v3::Method, events::{ room::{ avatar::{RoomAvatarEventContent, StrippedRoomAvatarEvent}, create::{RoomCreateEventContent, StrippedRoomCreateEvent}, member::{MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent}, name::{RoomNameEventContent, StrippedRoomNameEvent}, topic::{RoomTopicEventContent, StrippedRoomTopicEvent}, MediaSource, }, SyncStateEvent, }, uint, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomId, UserId, }, Client as MatrixClient, RoomState, }; use tokio::sync::{ broadcast, broadcast::{error::SendError, Receiver, Sender}, mpsc::{unbounded_channel, UnboundedReceiver}, }; use tracing::{debug, debug_span, error, instrument, warn, Instrument, Span}; use super::{ account_event::AccountEvent, requester::Requester, room_event::{RoomEvent, RoomEventsReceiver}, worker_tasks::{LoginStyle, WorkerTask}, }; use crate::utils::oneshot; #[derive(Debug, thiserror::Error)] pub enum ClientError { #[error("Matrix client error: {0}")] Matrix(#[from] matrix_sdk::Error), } #[derive(Clone)] struct Senders { account_events_sender: Sender, room_events_senders: Arc>>>, } impl Senders { fn new(account_events_sender: Sender) -> Self { Self { account_events_sender, room_events_senders: Arc::new(Mutex::new(HashMap::new())), } } fn contains(&self, room_id: &RoomId) -> bool { let room_senders = self.room_events_senders.lock().unwrap(); room_senders.contains_key(room_id) } fn send(&self, room_id: &RoomId, event: RoomEvent) -> Result<(), SendError> { let room_senders = self.room_events_senders.lock().unwrap(); if let Some(room_sender) = room_senders.get(room_id) { if let Err(err) = room_sender.send(event) { warn!("Unable to send event to the {room_id} room: {err}"); return Err(err); } } else { warn!("No sender found for {room_id} room"); // TODO: Return error } Ok(()) } fn add_room(&self, room_id: &OwnedRoomId) -> Option { let mut senders = self.room_events_senders.lock().unwrap(); if !senders.contains_key(room_id) { let (room_sender, room_receiver) = broadcast::channel(32); senders.insert(room_id.clone(), room_sender); debug!("Create sender for {room_id} room"); Some(RoomEventsReceiver::new(room_receiver)) } else { None } } } pub struct Client { initialized: bool, client: Option>, sync_task: Option, senders: Senders, } impl Client { pub fn new(client: Arc, account_events_sender: Sender) -> Self { Self { initialized: false, client: Some(client), sync_task: None, senders: Senders::new(account_events_sender), } } #[instrument(skip_all)] async fn create_space( senders: &Ctx, room_id: &OwnedRoomId, room: Option<&Room>, ) -> anyhow::Result<(), SendError> { if let Some(receiver) = senders.add_room(room_id) { let current_span = Span::current(); let mut name = None; let mut topic = None; if let Some(room) = room { name = room.name(); topic = room.topic(); } let (reply, mut response) = oneshot::(); // We can't use Room instance here, because dyn PaginableRoom is not Sync let event = AccountEvent::NewSpace( room_id.clone(), name.clone(), topic.clone(), receiver, reply, current_span.clone(), ); senders.account_events_sender.send(event)?; // We're expecting a response indicating that the client is able to compute the next RoomEvent response.recv().await; let events = vec![ RoomEvent::NewTopic(topic, current_span.clone()), RoomEvent::NewName(name, current_span), ]; for event in events { if let Err(_err) = senders.send(room_id, event.clone()) { // TODO: Return an error } } } Ok(()) } #[instrument(skip_all)] async fn create_room( senders: &Ctx, room: &Room, ) -> anyhow::Result<(), SendError> { let room_id = room.room_id().to_owned(); if let Some(receiver) = senders.add_room(&room_id) { let (reply, mut response) = oneshot::(); let is_direct = match room.is_direct().await { Ok(is_direct) => Some(is_direct), Err(err) => { warn!("Unable to know if the {room_id} room is direct: {err}"); None } }; let mut parents = vec![]; if let Ok(mut spaces) = room.parent_spaces().await { while let Some(parent) = spaces.next().await { match parent { Ok(parent) => match parent { ParentSpace::Reciprocal(parent) => { parents.push(parent.room_id().to_owned()); } _ => todo!(), }, Err(err) => { error!("{err}"); } } } } // We can't use Room instance here, because dyn PaginableRoom is not Sync let event = AccountEvent::NewRoom( room_id.clone(), parents.clone(), room.name(), room.topic(), is_direct, room.state(), receiver, reply, Span::current(), ); senders.account_events_sender.send(event)?; // We're expecting a response indicating that the client is able to compute the next RoomEvent response.recv().await; } Ok(()) } #[instrument(skip_all)] async fn add_room( senders: &Ctx, room: &Room, ) -> anyhow::Result<(), SendError> { let room_id = room.room_id().to_owned(); if room.is_space() { Self::create_space(senders, &room_id, Some(room)).await } else { let mut parents = vec![]; if let Ok(mut spaces) = room.parent_spaces().await { while let Some(parent) = spaces.next().await { match parent { Ok(parent) => match parent { ParentSpace::Reciprocal(parent) => { parents.push(parent.room_id().to_owned()); } _ => { warn!( "Only ParentSpace::Reciprocal taken into account, skip {:?}", parent ); } }, Err(err) => { error!("{err}"); } } } } for parent in parents { // Create a minimal space to make the relation consistent... its content will be sync later. if !senders.contains(&parent) { let _ = Self::create_space(senders, &parent, None).await; } let event = RoomEvent::NewChild(room_id.clone(), Span::current()); if let Err(_err) = senders.send(&parent, event) { // TODO: Return an error } } Self::create_room(senders, room).await } } async fn on_stripped_room_create_event( _ev: StrippedRoomCreateEvent, room: Room, senders: Ctx, ) { let span = debug_span!("Matrix::NewRoom", r = ?room.room_id()); let _ = Self::add_room(&senders, &room).instrument(span).await; } // SyncStateEvent: A possibly-redacted state event without a room_id. async fn on_sync_room_create_event( _ev: SyncStateEvent, room: Room, senders: Ctx, ) { let span = debug_span!("Matrix::NewRoom", r = ?room.room_id()); let _ = Self::add_room(&senders, &room).instrument(span).await; } #[instrument(skip_all)] fn on_invite_room_member_event( user_id: OwnedUserId, inviter_id: OwnedUserId, room: &Room, matrix_client: &MatrixClient, senders: &Ctx, ) { if let Some(client_user_id) = matrix_client.user_id() { let room_id = room.room_id(); let is_account_user = user_id == client_user_id; debug!( "{} (account user: {is_account_user}) invited by {} to join the {} room", &user_id, &inviter_id, &room_id ); let event = RoomEvent::Invitation(user_id, inviter_id, is_account_user, Span::current()); if let Err(_err) = senders.send(room_id, event) { // TODO: Return an error } } } #[instrument(skip_all)] fn on_join_room_member_event( user_id: OwnedUserId, displayname: Option, avatar_url: Option, room: &Room, matrix_client: &MatrixClient, senders: &Ctx, ) { if let Some(client_user_id) = matrix_client.user_id() { let is_account_user = user_id == client_user_id; let room_id = room.room_id(); debug!("{} has joined the {} room", &user_id, &room_id); let event = RoomEvent::Join( user_id, displayname, avatar_url, is_account_user, Span::current(), ); if let Err(_err) = senders.send(room_id, event) { // TODO: Return an error } } } // This function is called on each m.room.member event for an invited room preview (room not already joined). async fn on_stripped_room_member_event( ev: StrippedRoomMemberEvent, matrix_client: MatrixClient, room: Room, senders: Ctx, ) { match room.state() { RoomState::Invited => { let user_id = &ev.state_key; match ev.content.membership { MembershipState::Invite => { let span = debug_span!("Matrix::RoomInvitation", r = ?room.room_id()); span.in_scope(|| { Self::on_invite_room_member_event( user_id.clone(), ev.sender, &room, &matrix_client, &senders, ) }); } MembershipState::Join => { let span = debug_span!("Matrix::RoomJoin", r = ?room.room_id(), u = ?user_id) .entered(); span.in_scope(|| { Self::on_join_room_member_event( ev.sender, ev.content.displayname, ev.content.avatar_url, &room, &matrix_client, &senders, ) }); } _ => { error!("TODO: {:?}", ev); } } } _ => { error!("TODO: {:?}", ev); } } } // SyncStateEvent: A possibly-redacted state event without a room_id. // RoomMemberEventContent: The content of an m.room.member event. async fn on_sync_room_member_event( ev: SyncStateEvent, matrix_client: MatrixClient, room: Room, senders: Ctx, ) { if let SyncStateEvent::Original(ev) = ev { match ev.content.membership { MembershipState::Invite => { let span = debug_span!("Matrix::RoomInvitation", r = ?room.room_id()); span.in_scope(|| { let invitee_id = ev.state_key; Self::on_invite_room_member_event( invitee_id, ev.sender, &room, &matrix_client, &senders, ) }); } MembershipState::Join => { let user_id = ev.sender; let span = debug_span!("Matrix::RoomJoin", r = ?room.room_id(), u = ?user_id) .entered(); span.in_scope(|| { Self::on_join_room_member_event( user_id, ev.content.displayname, ev.content.avatar_url, &room, &matrix_client, &senders, ) }); } _ => error!("TODO"), } } } #[instrument(skip_all)] async fn on_room_avatar_event(room: &Room, senders: &Ctx) { let room_id = room.room_id(); let avatar = match room .avatar(MediaFormat::Thumbnail(MediaThumbnailSettings { size: MediaThumbnailSize { method: Method::Scale, width: uint!(256), height: uint!(256), }, animated: false, })) .await { Ok(avatar) => avatar, Err(err) => { warn!("Unable to fetch avatar for {}: {err}", &room_id); None } }; let event = RoomEvent::NewAvatar(avatar, Span::current()); if let Err(_err) = senders.send(room_id, event) { // TODO: Return an error } } async fn on_stripped_room_avatar_event( _ev: StrippedRoomAvatarEvent, room: Room, senders: Ctx, ) { let span = debug_span!("Matrix::RoomAvatar", r = ?room.room_id()); Self::on_room_avatar_event(&room, &senders) .instrument(span) .await; } async fn on_sync_room_avatar_event( ev: SyncStateEvent, room: Room, senders: Ctx, ) { if let SyncStateEvent::Original(_ev) = ev { dioxus::prelude::spawn(async move { let span = debug_span!("Matrix::RoomAvatar", r = ?room.room_id()); Self::on_room_avatar_event(&room, &senders) .instrument(span) .await; }); } } #[instrument(skip_all)] fn on_room_name_event(name: Option, room: &Room, senders: &Ctx) { let event = RoomEvent::NewName(name, Span::current()); if let Err(_err) = senders.send(room.room_id(), event) { // TODO: Return an error } } async fn on_stripped_room_name_event( ev: StrippedRoomNameEvent, room: Room, senders: Ctx, ) { let span = debug_span!("Matrix::RoomName", r = ?room.room_id()); span.in_scope(|| { Self::on_room_name_event(ev.content.name, &room, &senders); }); } async fn on_sync_room_name_event( ev: SyncStateEvent, room: Room, senders: Ctx, ) { if let SyncStateEvent::Original(ev) = ev { let span = debug_span!("Matrix::RoomName", r = ?room.room_id()); span.in_scope(|| { Self::on_room_name_event(Some(ev.content.name), &room, &senders); }); } } #[instrument(skip_all)] fn on_room_topic_event(topic: Option, room: &Room, senders: &Ctx) { let event = RoomEvent::NewTopic(topic, Span::current()); if let Err(_err) = senders.send(room.room_id(), event) { // TODO: Return an error } } async fn on_stripped_room_topic_event( ev: StrippedRoomTopicEvent, room: Room, senders: Ctx, ) { let span = debug_span!("Matrix::RoomTopic", r = ?room.room_id()); span.in_scope(|| { Self::on_room_topic_event(ev.content.topic, &room, &senders); }); } async fn on_sync_room_topic_event( ev: SyncStateEvent, room: Room, senders: Ctx, ) { if let SyncStateEvent::Original(ev) = ev { let span = debug_span!("Matrix::RoomTopic", r = ?room.room_id()); span.in_scope(|| { Self::on_room_topic_event(Some(ev.content.topic), &room, &senders); }); } } pub async fn spawn(homeserver_url: String) -> (Requester, Receiver) { let matrix_client = Arc::new( MatrixClient::builder() .homeserver_url(&homeserver_url) .build() .await .unwrap(), ); let (worker_tasks_sender, worker_tasks_receiver) = unbounded_channel::(); let (account_events_sender, account_events_receiver) = broadcast::channel::(32); let mut client = Client::new(matrix_client, account_events_sender); dioxus::prelude::spawn(async move { client.work(worker_tasks_receiver).await; }); (Requester::new(worker_tasks_sender), account_events_receiver) } fn init(&mut self) { if let Some(client) = self.client.borrow() { // TODO: Remove clone? client.add_event_handler_context(self.senders.clone()); let _ = client.add_event_handler(Client::on_stripped_room_create_event); let _ = client.add_event_handler(Client::on_sync_room_create_event); let _ = client.add_event_handler(Client::on_stripped_room_member_event); let _ = client.add_event_handler(Client::on_sync_room_member_event); let _ = client.add_event_handler(Client::on_stripped_room_avatar_event); let _ = client.add_event_handler(Client::on_sync_room_avatar_event); let _ = client.add_event_handler(Client::on_stripped_room_name_event); let _ = client.add_event_handler(Client::on_sync_room_name_event); let _ = client.add_event_handler(Client::on_stripped_room_topic_event); let _ = client.add_event_handler(Client::on_sync_room_topic_event); self.initialized = true; } } async fn login(&mut self, style: LoginStyle) -> anyhow::Result<()> { let client = self.client.as_ref().unwrap(); match style { LoginStyle::Password(username, password) => { client .matrix_auth() .login_username(&username, &password) .initial_device_display_name("TODO") .send() .await .map_err(ClientError::from)?; } } Ok(()) } async fn run_forever(&mut self) { let client = self.client.clone().unwrap(); let task = dioxus::prelude::spawn(async move { // Sync once so we receive the client state and old messages let sync_token_option = match client.sync_once(SyncSettings::default()).await { Ok(sync_response) => Some(sync_response.next_batch), Err(err) => { error!("Error during sync one: {}", err); None } }; if let Some(sync_token) = sync_token_option { debug!("User connected to the homeserver, start syncing"); let settings = SyncSettings::default().token(sync_token); let _ = client.sync(settings).await; } }); self.sync_task = Some(task); } async fn get_display_name(&mut self) -> anyhow::Result> { let client = self.client.as_ref().unwrap(); match client.account().get_display_name().await { Ok(display_name) => Ok(display_name), Err(err) => Err(err.into()), } } async fn get_avatar(&mut self) -> anyhow::Result>> { let client = self.client.as_ref().unwrap(); match client .account() .get_avatar(MediaFormat::Thumbnail(MediaThumbnailSettings { size: MediaThumbnailSize { method: Method::Scale, width: uint!(256), height: uint!(256), }, animated: false, })) .await { Ok(avatar) => Ok(avatar), Err(err) => Err(err.into()), } } async fn get_room_avatar(&mut self, room_id: &OwnedRoomId) -> anyhow::Result>> { let client = self.client.as_ref().unwrap(); if let Some(room) = client.get_room(room_id) { match room .avatar(MediaFormat::Thumbnail(MediaThumbnailSettings { size: MediaThumbnailSize { method: Method::Scale, width: uint!(256), height: uint!(256), }, animated: false, })) .await { Ok(avatar) => Ok(avatar), Err(err) => Err(err.into()), } } else { warn!("No room found with the \"{}\" id", room_id.as_str()); // TODO: Return an error if the room has not been found Ok(None) } } // TODO: Share MediaRequest with other media requests async fn get_thumbnail(&self, media_url: OwnedMxcUri) -> anyhow::Result> { let client = self.client.as_ref().unwrap(); let media = client.media(); let request = MediaRequest { source: MediaSource::Plain(media_url), format: MediaFormat::Thumbnail(MediaThumbnailSettings { size: MediaThumbnailSize { method: Method::Scale, width: uint!(256), height: uint!(256), }, animated: false, }), }; let res = media.get_media_content(&request, true).await; Ok(res?) } async fn get_room_member_avatar( &self, avatar_url: &Option, room_id: &RoomId, user_id: &UserId, ) -> anyhow::Result>> { let client = self.client.as_ref().unwrap(); if let Some(room) = client.get_room(room_id) { match avatar_url { Some(avatar_url) => { let thumbnail = self.get_thumbnail(avatar_url.clone()).await; return Ok(Some(thumbnail?)); } None => match room.get_member(user_id).await { Ok(room_member) => { if let Some(room_member) = room_member { let res = match room_member .avatar(MediaFormat::Thumbnail(MediaThumbnailSettings { size: MediaThumbnailSize { method: Method::Scale, width: uint!(256), height: uint!(256), }, animated: false, })) .await { Ok(avatar) => Ok(avatar), Err(err) => Err(err.into()), }; return res; } } Err(err) => { warn!("Unable to get room member {user_id}: {err}"); } }, } } Ok(None) } async fn join_room(&self, room_id: &RoomId) -> anyhow::Result { let client = self.client.as_ref().unwrap(); if let Some(room) = client.get_room(room_id) { return match room.join().await { Ok(_) => Ok(true), Err(err) => Err(err.into()), }; } Ok(false) } async fn work(&mut self, mut rx: UnboundedReceiver) { while let Some(task) = rx.recv().await { self.run(task).await; } if let Some(task) = self.sync_task.take() { task.cancel() } } async fn run(&mut self, task: WorkerTask) { match task { WorkerTask::Init(reply) => { self.init(); reply.send(Ok(())).await; } WorkerTask::RunForever(reply) => { { self.run_forever().await; reply.send(()) } .await } WorkerTask::Login(style, reply) => { reply.send(self.login(style).await).await; } WorkerTask::GetDisplayName(reply) => { reply.send(self.get_display_name().await).await; } WorkerTask::GetAvatar(reply) => { reply.send(self.get_avatar().await).await; } WorkerTask::GetRoomAvatar(id, reply) => { reply.send(self.get_room_avatar(&id).await).await; } WorkerTask::GetRoomMemberAvatar(avatar_url, room_id, user_id, reply) => { reply .send( self.get_room_member_avatar(&avatar_url, &room_id, &user_id) .await, ) .await; } WorkerTask::JoinRoom(id, reply) => { reply.send(self.join_room(&id).await).await; } } } }