use std::{collections::HashMap, rc::Rc}; use async_trait::async_trait; use matrix_sdk::ruma::{OwnedMxcUri, OwnedRoomId, OwnedUserId}; use tokio::{ select, sync::{broadcast::Receiver, mpsc::UnboundedSender}, }; use tokio_stream::{wrappers::BroadcastStream, StreamExt, StreamMap}; use tracing::{error, instrument, Instrument}; use super::{ account_event::AccountEvent, room_event::RoomEvent, worker_tasks::{LoginStyle, WorkerTask}, }; use crate::{ domain::model::{ common::{Avatar, UserId}, messaging_interface::{ AccountMessagingConsumerInterface, AccountMessagingProviderInterface, MemberMessagingProviderInterface, RoomMessagingConsumerInterface, RoomMessagingProviderInterface, SpaceMessagingConsumerInterface, SpaceMessagingProviderInterface, }, room::{Invitation, Room, RoomId}, room_member::{AvatarUrl, RoomMember}, space::Space, }, utils::oneshot, }; pub struct Requester { worker_tasks_sender: UnboundedSender, } impl Clone for Requester { fn clone(&self) -> Self { Self { worker_tasks_sender: self.worker_tasks_sender.clone(), } } } impl Requester { pub fn new(worker_tasks_sender: UnboundedSender) -> Self { Self { worker_tasks_sender, } } } // TODO: Is there a way to avoid this duplication? macro_rules! request_to_worker { ($self:ident, $task:expr) => { { let (reply, mut response) = oneshot(); let task = $task(reply); if let Err(err) = $self.worker_tasks_sender.send(task) { let msg = format!("Unable to request to the Matrix client: {err}"); return Err(anyhow::Error::msg(msg)); } match response.recv().await { Some(result) => result, None => Err(anyhow::Error::msg("TBD")), } } }; ($self:ident, $task:expr $(, $arg:expr)+) => { { let (reply, mut response) = oneshot(); let task = $task($($arg),*, reply); if let Err(err) = $self.worker_tasks_sender.send(task) { let msg = format!("Unable to request to the Matrix client: {err}"); return Err(anyhow::Error::msg(msg)); } match response.recv().await { Some(result) => result, None => Err(anyhow::Error::msg("TBD")), } } }; } impl Requester { pub async fn init(&self) -> anyhow::Result<()> { request_to_worker!(self, WorkerTask::Init) } pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> { request_to_worker!(self, WorkerTask::Login, style) } #[instrument(skip_all)] async fn on_room_invitation( consumer: &Rc, user_id: OwnedUserId, sender_id: OwnedUserId, is_account_user: bool, ) { let invitation = Invitation::new(user_id, sender_id, is_account_user); consumer.on_invitation(invitation).await; } #[instrument(skip_all)] async fn on_room_join( consumer: &Rc, room_id: OwnedRoomId, user_id: OwnedUserId, user_name: Option, avatar_url: Option, is_account_user: bool, messaging_provider: Rc, ) { let member = RoomMember::new( UserId::from(user_id), user_name, avatar_url, room_id, is_account_user, messaging_provider, ); consumer.on_membership(member).await; } #[instrument(skip_all)] async fn on_room_new_topic( consumer: &Rc, topic: Option, ) { consumer.on_new_topic(topic).await; } #[instrument(skip_all)] async fn on_room_new_name( consumer: &Rc, name: Option, ) { consumer.on_new_name(name).await; } #[instrument(skip_all)] async fn on_room_new_avatar( consumer: &Rc, avatar: Option, ) { consumer.on_new_avatar(avatar).await; } #[instrument(skip_all)] async fn on_space_new_child( consumer: &Rc, child_id: RoomId, ) { // TODO: Make name consistent consumer.on_child(child_id).await; } #[instrument(skip_all)] async fn on_space_new_topic( consumer: &Rc, topic: Option, ) { consumer.on_new_topic(topic).await; } #[instrument(skip_all)] async fn on_space_new_name( consumer: &Rc, name: Option, ) { consumer.on_new_name(name).await; } // #[instrument(name="SpaceAvatar", skip_all, fields(s = %space_id, a = avatar.is_some()))] // async fn on_space_new_avatar( // consumer: &Rc, // space_id: OwnedRoomId, // avatar: Option, // ) { // consumer.on_new_avatar(avatar).await; // } } #[async_trait(?Send)] impl AccountMessagingProviderInterface for Requester { async fn get_display_name(&self) -> anyhow::Result> { request_to_worker!(self, WorkerTask::GetDisplayName) } async fn get_avatar(&self) -> anyhow::Result> { request_to_worker!(self, WorkerTask::GetAvatar) } async fn run_forever( &self, account_events_consumer: &dyn AccountMessagingConsumerInterface, mut account_events_receiver: Receiver, ) -> anyhow::Result<()> { // TODO: manage the result provided by response let (run_forever_tx, _run_forever_rx) = oneshot(); if let Err(err) = self .worker_tasks_sender .send(WorkerTask::RunForever(run_forever_tx)) { let msg = format!("Unable to request login to the Matrix client: {err}"); return Err(anyhow::Error::msg(msg)); } let mut rooms_events_streams = StreamMap::new(); let mut spaces_events_streams = StreamMap::new(); let mut room_events_consumers = HashMap::>::new(); let mut space_events_consumers = HashMap::>::new(); // TODO: Fix this... let client = Rc::new(self.clone()); loop { select! { res = account_events_receiver.recv() => { if let Ok(account_event) = res { match account_event { AccountEvent::NewRoom( id, spaces, name, topic, is_direct, state, receiver, new_room_tx, span ) => { let mut room = Room::new(id, spaces, name, topic, is_direct, Some(state)); let room_id = room.id().clone(); room.set_messaging_provider(client.clone()); let room = Rc::new(room); let stream = BroadcastStream::new(receiver.into()); rooms_events_streams.insert(room_id.clone(), stream); let room_events_consumer = account_events_consumer.on_new_room(room) .instrument(span) .await; room_events_consumers.insert(room_id, room_events_consumer); // We're now ready to recv and compute RoomEvent. new_room_tx.send(true).await; }, AccountEvent::NewSpace(id, name, topic, receiver, new_space_tx, span) => { let mut space = Space::new(id, name, topic); let space_id = space.id().clone(); space.set_messaging_provider(client.clone()); let space = Rc::new(space); let stream = BroadcastStream::new(receiver.into()); spaces_events_streams.insert(space_id.clone(), stream); let space_events_consumer = account_events_consumer.on_new_space(space) .instrument(span) .await; space_events_consumers.insert(space_id, space_events_consumer); // We're now ready to recv and compute SpaceEvent. new_space_tx.send(true).await; }, }; } }, Some((room_id, room_event)) = rooms_events_streams.next() => { if let Ok(room_event) = room_event { if let Some(consumer) = room_events_consumers.get(&room_id) { match room_event { RoomEvent::Invitation(user_id, sender_id, is_account_user, span) => { Self::on_room_invitation(consumer, user_id, sender_id, is_account_user) .instrument(span) .await; }, RoomEvent::Join(user_id, user_name, avatar_url, is_account_user, span) => { Self::on_room_join( consumer, room_id, user_id, user_name, avatar_url, is_account_user, client.clone()) .instrument(span) .await; }, RoomEvent::NewTopic(topic, span) => { Self::on_room_new_topic(consumer, topic) .instrument(span) .await; }, RoomEvent::NewName(name, span) => { Self::on_room_new_name(consumer, name) .instrument(span) .await; }, RoomEvent::NewAvatar(avatar, span) => { Self::on_room_new_avatar(consumer, avatar) .instrument(span) .await; } // RoomEvent::NewAvatar(avatar) => Self::on_room_new_avatar(consumer, avatar).await, _ => error!("TODO: {:?}", &room_event), } } else { error!("No consumer found for {} room", &room_id); } } }, Some((space_id, room_event)) = spaces_events_streams.next() => { if let Ok(room_event) = room_event { if let Some(consumer) = space_events_consumers.get(&space_id) { match room_event { RoomEvent::NewTopic(topic, span) => { Self::on_space_new_topic(consumer, topic) .instrument(span) .await; }, RoomEvent::NewName(name, span) => { Self::on_space_new_name(consumer, name) .instrument(span) .await; }, RoomEvent::NewChild(child_id, span) => { Self::on_space_new_child(consumer, child_id) .instrument(span) .await; }, _ => error!("TODO: {:?}", &room_event), } } else { error!("No consumer found for {} space", &space_id); } } } } } } } #[async_trait(?Send)] impl RoomMessagingProviderInterface for Requester { async fn get_avatar(&self, room_id: &RoomId) -> anyhow::Result> { request_to_worker!(self, WorkerTask::GetRoomAvatar, room_id.clone()) } // TODO: Fix return code async fn get_members(&self, room_id: &RoomId) -> anyhow::Result> { match request_to_worker!(self, WorkerTask::GetRoomMembers, room_id.clone()) { Ok(matrix_room_members) => { Ok(join_all(matrix_room_members.iter().map(|member| async { RoomMember::from_matrix(member, room_id, Rc::new(self.clone())).await })) .await) } Err(err) => Err(err), } } } #[async_trait(?Send)] impl SpaceMessagingProviderInterface for Requester {} #[async_trait(?Send)] impl MemberMessagingProviderInterface for Requester { async fn get_avatar( &self, room_id: &RoomId, user_id: &UserId, avatar_url: &Option, ) -> anyhow::Result> { request_to_worker!( self, WorkerTask::GetRoomMemberAvatar, room_id.clone(), user_id.clone(), avatar_url.clone() ) } }