10 Commits

Author SHA1 Message Date
bc6b02bc34 ♻️ Rework the Matrix messaging Client 2024-05-10 22:32:35 +02:00
0a936dd12b ♻️ Rework the Matrix messaging Requester 2024-05-10 22:20:32 +02:00
ef41c0bd48 Add events shared by Matrix client and Requester 2024-05-10 22:20:11 +02:00
e3a6ec9858 Add new messaging WorkerTask 2024-05-10 22:16:49 +02:00
692a71faef 🚨 Fix clippy warnings 2024-05-10 20:11:48 +02:00
c2918fbc78 🚧 Add RoomMember value object 2024-05-10 19:56:39 +02:00
bfa1539d23 🚧 Add Space identity 2024-05-10 19:29:42 +02:00
0190cf9165 ♻️ Rework the Room entity 2024-05-10 19:13:46 +02:00
4f9e5c538e 🚧 Add a first version of the mozaik builder service 2024-05-10 18:56:54 +02:00
79e8dea622 🚧 Add Account identity and messaging and store interfaces 2024-05-10 18:56:37 +02:00
22 changed files with 1499 additions and 333 deletions

View File

@@ -36,6 +36,9 @@ tracing = "0.1.40"
tracing-web = "0.1.3" tracing-web = "0.1.3"
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"
git-version = "0.3.9" git-version = "0.3.9"
async-trait = "0.1.80"
tokio-stream = "0.1.15"
image = "0.25.1"
[target.'cfg(target_family = "wasm")'.dependencies] [target.'cfg(target_family = "wasm")'.dependencies]
web-sys = "0.3.69" web-sys = "0.3.69"

127
src/domain/model/account.rs Normal file
View File

@@ -0,0 +1,127 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use async_trait::async_trait;
use tracing::error;
use super::{
common::PresenceState,
messaging_interface::{
AccountMessagingConsumerInterface, AccountMessagingProviderInterface,
RoomMessagingConsumerInterface, SpaceMessagingConsumerInterface,
},
room::{Room, RoomId},
space::{Space, SpaceId},
store_interface::AccountStoreProviderInterface,
};
type Rooms = HashMap<RoomId, Rc<Room>>;
type Spaces = HashMap<SpaceId, Rc<Space>>;
pub struct Account {
display_name: RefCell<Option<String>>,
avatar: RefCell<Option<Vec<u8>>>,
#[allow(dead_code)]
presence_state: RefCell<Option<PresenceState>>,
by_id_rooms: RefCell<Rooms>,
by_id_spaces: RefCell<Spaces>,
messaging_provider: Option<Rc<dyn AccountMessagingProviderInterface>>,
store: &'static dyn AccountStoreProviderInterface,
}
impl Account {
pub fn new(store: &'static dyn AccountStoreProviderInterface) -> Self {
Self {
display_name: RefCell::new(None),
avatar: RefCell::new(None),
presence_state: RefCell::new(None),
by_id_rooms: RefCell::new(Rooms::new()),
by_id_spaces: RefCell::new(Spaces::new()),
messaging_provider: None,
store,
}
}
pub fn set_messaging_provider(&mut self, provider: Rc<dyn AccountMessagingProviderInterface>) {
self.messaging_provider = Some(provider.clone());
}
pub fn get_room(&self, room_id: &RoomId) -> Option<Rc<Room>> {
self.by_id_rooms.borrow().get(room_id).cloned()
}
pub async fn get_display_name(&self) -> &RefCell<Option<String>> {
if self.display_name.borrow().is_none() {
if let Some(requester) = &self.messaging_provider {
let resp = requester.get_display_name().await;
if let Ok(display_name) = resp {
if let Some(display_name) = display_name {
self.display_name.borrow_mut().replace(display_name);
} else {
self.display_name.borrow_mut().take();
}
} else {
error!("err={:?}", resp);
}
}
}
&self.display_name
}
pub async fn get_avatar(&self) -> &RefCell<Option<Vec<u8>>> {
if self.avatar.borrow().is_none() {
if let Some(requester) = &self.messaging_provider {
let resp = requester.get_avatar().await;
if let Ok(avatar) = resp {
if let Some(avatar) = avatar {
self.avatar.borrow_mut().replace(avatar);
} else {
self.avatar.borrow_mut().take();
}
} else {
error!("err={:?}", resp);
}
}
}
&self.avatar
}
}
#[async_trait(?Send)]
impl AccountMessagingConsumerInterface for Account {
async fn on_new_room(&self, room: Room) -> Rc<dyn RoomMessagingConsumerInterface> {
let room_id = room.id().clone();
let room = Rc::new(room);
self.by_id_rooms
.borrow_mut()
.insert(room_id, Rc::clone(&room));
let room_store = Box::new(self.store.on_new_room(Rc::clone(&room)));
room.set_store(Some(room_store));
room
}
async fn on_new_space(&self, space: Space) -> Rc<dyn SpaceMessagingConsumerInterface> {
let space_id = space.id().clone();
let space = Rc::new(space);
self.by_id_spaces
.borrow_mut()
.insert(space_id, Rc::clone(&space));
let space_store = Box::new(self.store.on_new_space(Rc::clone(&space)));
space.set_store(Some(space_store));
space
}
}

View File

@@ -0,0 +1,7 @@
use matrix_sdk::ruma::{presence::PresenceState as MatrixPresenceState, OwnedUserId};
pub type Avatar = Vec<u8>;
pub type PresenceState = MatrixPresenceState;
pub type UserId = OwnedUserId;

View File

@@ -0,0 +1,67 @@
use std::rc::Rc;
use async_trait::async_trait;
use tokio::sync::broadcast::Receiver;
use super::{
common::{Avatar, UserId},
room::{Room, RoomId},
room_member::RoomMember,
space::Space,
};
use crate::infrastructure::messaging::matrix::account_event::AccountEvent;
#[async_trait(?Send)]
pub trait AccountMessagingConsumerInterface {
async fn on_new_room(&self, room: Room) -> Rc<dyn RoomMessagingConsumerInterface>;
async fn on_new_space(&self, space: Space) -> Rc<dyn SpaceMessagingConsumerInterface>;
}
#[async_trait(?Send)]
pub trait AccountMessagingProviderInterface {
async fn get_display_name(&self) -> anyhow::Result<Option<String>>;
async fn get_avatar(&self) -> anyhow::Result<Option<Vec<u8>>>;
async fn run_forever(
&self,
account_events_consumer: &dyn AccountMessagingConsumerInterface,
account_events_receiver: Receiver<AccountEvent>,
) -> anyhow::Result<()>;
}
#[async_trait(?Send)]
pub trait RoomMessagingConsumerInterface {
async fn on_invitation(&self) {}
async fn on_new_topic(&self, _topic: Option<String>) {}
async fn on_new_name(&self, _name: Option<String>) {}
#[allow(dead_code)]
async fn on_membership(&self, _member: RoomMember) {}
}
#[async_trait(?Send)]
pub trait RoomMessagingProviderInterface {
async fn get_avatar(&self, id: &RoomId) -> anyhow::Result<Option<Avatar>>;
async fn get_members(&self, id: &RoomId) -> anyhow::Result<Vec<RoomMember>>;
}
#[async_trait(?Send)]
pub trait SpaceMessagingConsumerInterface {
async fn on_child(&self, _room_id: RoomId) {}
async fn on_new_topic(&self, _topic: Option<String>) {}
async fn on_new_name(&self, _name: Option<String>) {}
}
#[async_trait(?Send)]
pub trait SpaceMessagingProviderInterface {}
// TODO: Rework
#[async_trait(?Send)]
pub trait MemberMessagingProviderInterface {
async fn get_avatar(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> anyhow::Result<Option<Avatar>>;
}

View File

@@ -1,2 +1,8 @@
pub(crate) mod account;
pub(crate) mod common;
pub(crate) mod messaging_interface;
pub(crate) mod room; pub(crate) mod room;
pub(crate) mod room_member;
pub(crate) mod session; pub(crate) mod session;
pub(crate) mod space;
pub(crate) mod store_interface;

View File

@@ -1,24 +1,52 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::{collections::HashMap, sync::Arc}; use std::collections::HashMap;
use std::rc::Rc;
use async_trait::async_trait;
use futures::future::{join, join_all};
use matrix_sdk::ruma::OwnedRoomId; use matrix_sdk::ruma::OwnedRoomId;
use matrix_sdk::{Room as MatrixRoom, RoomState as MatrixRoomState}; use matrix_sdk::RoomState as MatrixRoomState;
use tracing::error; use tracing::{debug, error, trace};
pub(crate) type RoomId = OwnedRoomId; use super::{
common::{Avatar, UserId},
messaging_interface::{RoomMessagingConsumerInterface, RoomMessagingProviderInterface},
room_member::RoomMember,
space::SpaceId,
store_interface::RoomStoreProviderInterface,
};
#[derive(Clone, Debug)] use crate::infrastructure::services::mozaik_builder::create_mozaik;
pub(crate) struct Room {
pub type RoomId = OwnedRoomId;
pub struct Room {
id: RoomId, id: RoomId,
name: Option<String>,
#[allow(dead_code)]
spaces: Vec<SpaceId>,
name: RefCell<Option<String>>,
topic: Option<String>, topic: Option<String>,
is_direct: Option<bool>, is_direct: Option<bool>,
state: Option<MatrixRoomState>, state: Option<MatrixRoomState>,
avatar: RefCell<Option<Avatar>>,
members: RefCell<HashMap<UserId, RoomMember>>,
messaging_provider: Option<Rc<dyn RoomMessagingProviderInterface>>,
store: RefCell<Option<Box<dyn RoomStoreProviderInterface>>>,
}
impl PartialEq for Room {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
} }
impl Room { impl Room {
fn new( pub fn new(
id: RoomId, id: RoomId,
spaces: Vec<SpaceId>,
name: Option<String>, name: Option<String>,
topic: Option<String>, topic: Option<String>,
is_direct: Option<bool>, is_direct: Option<bool>,
@@ -26,120 +54,158 @@ impl Room {
) -> Self { ) -> Self {
Self { Self {
id, id,
name,
spaces,
name: RefCell::new(name),
topic, topic,
is_direct, is_direct,
state, state,
avatar: RefCell::new(None),
members: RefCell::new(HashMap::new()),
messaging_provider: None,
store: RefCell::new(None),
} }
} }
// TODO: Use a factory instead... pub fn set_messaging_provider(
pub async fn from_matrix_room(matrix_room: &MatrixRoom) -> Self { &mut self,
// let room_topic = matrix_room.topic().map(RefCell::new); messaging_provider: Rc<dyn RoomMessagingProviderInterface>,
) {
let id = RoomId::from(matrix_room.room_id()); self.messaging_provider = Some(messaging_provider);
let name = matrix_room.name();
let room_topic = matrix_room.topic();
let is_direct = match matrix_room.is_direct().await {
Ok(is_direct) => Some(is_direct),
Err(err) => {
error!("Unable to know if the room \"{id}\" is direct: {err}");
None
}
};
let state = Some(matrix_room.state());
Self::new(id, name, room_topic, is_direct, state)
// room.timeline.subscribe().await
// Arc::new(matrix_room.to_owned()),
} }
pub fn id(&self) -> &OwnedRoomId { pub fn set_store(&self, store: Option<Box<dyn RoomStoreProviderInterface>>) {
*self.store.borrow_mut() = store;
}
pub fn id(&self) -> &RoomId {
&self.id &self.id
} }
pub fn name(&self) -> &Option<String> { pub fn name(&self) -> Option<String> {
&self.name self.name.borrow().clone()
} }
#[allow(dead_code)]
pub fn topic(&self) -> &Option<String> { pub fn topic(&self) -> &Option<String> {
&self.topic &self.topic
} }
#[allow(dead_code)]
pub fn set_topic(&mut self, topic: Option<String>) { pub fn set_topic(&mut self, topic: Option<String>) {
self.topic = topic; self.topic = topic;
} }
#[allow(dead_code)]
pub fn is_direct(&self) -> &Option<bool> { pub fn is_direct(&self) -> &Option<bool> {
&self.is_direct &self.is_direct
} }
#[allow(dead_code)]
pub fn state(&self) -> &Option<MatrixRoomState> { pub fn state(&self) -> &Option<MatrixRoomState> {
&self.state &self.state
} }
#[allow(dead_code)]
pub fn is_invited(&self) -> Option<bool> { pub fn is_invited(&self) -> Option<bool> {
match self.state { self.state.map(|state| state == MatrixRoomState::Invited)
Some(state) => Some(state == MatrixRoomState::Invited),
None => None,
} }
#[allow(dead_code)]
fn add_member(&self, member: RoomMember) {
self.members
.borrow_mut()
.insert(member.id().clone(), member);
}
pub async fn get_avatar(&self) -> Option<Avatar> {
if self.avatar.borrow().is_none() {
if let Some(requester) = &self.messaging_provider {
let resp = requester.get_avatar(&self.id).await;
if let Ok(avatar) = resp {
if let Some(avatar) = avatar {
return Some(avatar);
} else {
debug!("The room has no avatar... let's generate one");
match self.render_room_avatar_with_members().await {
Ok(avatar) => {
if let Some(avatar) = avatar {
return Some(avatar);
}
}
Err(err) => {
error!("err={}", err);
}
}
}
} else {
error!("err={:?}", resp);
}
}
}
self.avatar.borrow().clone()
}
async fn render_room_avatar_with_members(&self) -> anyhow::Result<Option<Avatar>> {
if let Some(requester) = &self.messaging_provider {
match requester.get_members(&self.id).await {
Ok(members) => {
let mut account_member = None::<&RoomMember>;
let mut other_members = Vec::<&RoomMember>::new();
for member in &members {
if member.is_account_user() {
account_member = Some(member);
} else {
other_members.push(member);
}
}
let other_avatars_futures =
join_all(other_members.iter().map(|member| member.get_avatar()));
let (other_avatars, account_avatar) = if let Some(account_member) =
account_member
{
join(other_avatars_futures, account_member.get_avatar()).await
} else {
(
join_all(other_members.iter().map(|member| member.get_avatar())).await,
None,
)
};
let other_avatars: Vec<Vec<u8>> = other_avatars.into_iter().flatten().collect();
return Ok(Some(create_mozaik(
256,
256,
&other_avatars,
&account_avatar,
)));
}
Err(err) => {
error!("err={}", err);
}
}
}
Ok(None)
} }
} }
pub type ByIdRooms = HashMap<OwnedRoomId, RefCell<Room>>; #[async_trait(?Send)]
impl RoomMessagingConsumerInterface for Room {
// pub type ByIdRooms = HashMap<OwnedRoomId, RefCell<Room>>; async fn on_invitation(&self) {
trace!("Room::on_invitation");
// #[derive(Clone)] }
// pub struct Room { async fn on_membership(&self, member: RoomMember) {
// // pub matrix_room: Arc<MatrixRoom>, trace!("Room::on_membership({:?})", member);
// pub topic: Option<RefCell<String>>, }
// pub members: HashMap<OwnedUserId, RoomMember>, async fn on_new_topic(&self, topic: Option<String>) {
// pub is_direct: Option<bool>, trace!("Room::on_new_topic({:?})", topic);
// // pub timeline: Arc<Timeline>, }
// } async fn on_new_name(&self, name: Option<String>) {
trace!("Room::on_new_name({:?})", name);
// impl Room { }
// pub async fn new( }
// matrix_room: Arc<MatrixRoom>,
// topic: Option<RefCell<String>>,
// is_direct: Option<bool>,
// ) -> Self {
// // TODO: Filter events
// // let timeline = Arc::new(matrix_room.timeline_builder().build().await.ok().unwrap());
// Self {
// matrix_room,
// topic,
// members: HashMap::new(),
// is_direct,
// // timeline,
// }
// }
// pub async fn from_matrix_room(matrix_room: &MatrixRoom) -> Self {
// let room_topic = matrix_room.topic().map(RefCell::new);
// Self::new(
// Arc::new(matrix_room.to_owned()),
// room_topic,
// matrix_room.is_direct().await.ok(),
// )
// .await
// // room.timeline.subscribe().await
// }
// pub fn name(&self) -> Option<String> {
// self.matrix_room.name()
// }
// pub fn id(&self) -> OwnedRoomId {
// OwnedRoomId::from(self.matrix_room.room_id())
// }
// }
// impl PartialEq for Room {
// fn eq(&self, other: &Self) -> bool {
// // TODO: Look for a better way to compare Matrix rooms
// self.matrix_room.room_id() == other.matrix_room.room_id()
// }
// }

View File

@@ -0,0 +1,90 @@
use std::{
cell::RefCell,
fmt::{Debug, Formatter},
rc::Rc,
};
use matrix_sdk::{room::RoomMember as MatrixRoomMember, ruma::OwnedRoomId};
use tracing::error;
use super::{
common::{Avatar, UserId},
messaging_interface::MemberMessagingProviderInterface,
room::RoomId,
};
#[derive(Clone)]
pub struct RoomMember {
id: UserId,
room_id: RoomId,
is_account_user: bool,
#[allow(dead_code)]
avatar: RefCell<Option<Avatar>>,
messaging_provider: Rc<dyn MemberMessagingProviderInterface>,
}
impl RoomMember {
fn new(
id: UserId,
room_id: RoomId,
is_account_user: bool,
messaging_provider: Rc<dyn MemberMessagingProviderInterface>,
) -> Self {
Self {
id,
room_id,
is_account_user,
avatar: RefCell::new(None),
messaging_provider,
}
}
// TODO: Use a factory instead...
pub async fn from_matrix(
matrix_room_member: &MatrixRoomMember,
room_id: &OwnedRoomId,
messaging_provider: Rc<dyn MemberMessagingProviderInterface>,
) -> Self {
Self::new(
UserId::from(matrix_room_member.user_id()),
room_id.clone(),
matrix_room_member.is_account_user(),
messaging_provider,
)
}
pub fn id(&self) -> &UserId {
&self.id
}
#[allow(dead_code)]
pub fn room_id(&self) -> &RoomId {
&self.room_id
}
pub fn is_account_user(&self) -> bool {
self.is_account_user
}
pub async fn get_avatar(&self) -> Option<Avatar> {
match self
.messaging_provider
.get_avatar(&self.room_id, &self.id)
.await
{
Ok(avatar) => avatar,
Err(err) => {
error!("err={}", err);
None
}
}
}
}
impl Debug for RoomMember {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
f.debug_struct("RoomMember").field("id", &self.id).finish()
}
}

91
src/domain/model/space.rs Normal file
View File

@@ -0,0 +1,91 @@
use std::{cell::RefCell, collections::HashSet, rc::Rc};
use async_trait::async_trait;
use matrix_sdk::ruma::OwnedRoomId;
use tracing::error;
use super::{
common::Avatar,
messaging_interface::{SpaceMessagingConsumerInterface, SpaceMessagingProviderInterface},
room::RoomId,
store_interface::SpaceStoreProviderInterface,
};
pub type SpaceId = OwnedRoomId;
// TODO: Add membership?
pub struct Space {
id: SpaceId,
name: RefCell<Option<String>>,
topic: RefCell<Option<String>>,
#[allow(dead_code)]
avatar: RefCell<Option<Avatar>>,
children: RefCell<HashSet<RoomId>>, // We don´t expect to manage nested spaces
messaging_provider: Option<Rc<dyn SpaceMessagingProviderInterface>>,
store: RefCell<Option<Box<dyn SpaceStoreProviderInterface>>>,
}
impl PartialEq for Space {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Space {
pub fn new(id: SpaceId, name: Option<String>, topic: Option<String>) -> Self {
Self {
id,
name: RefCell::new(name),
topic: RefCell::new(topic),
#[allow(dead_code)]
avatar: RefCell::new(None),
children: RefCell::new(HashSet::new()),
messaging_provider: None,
store: RefCell::new(None),
}
}
pub fn set_messaging_provider(&mut self, provider: Rc<dyn SpaceMessagingProviderInterface>) {
self.messaging_provider = Some(provider);
}
pub fn set_store(&self, store: Option<Box<dyn SpaceStoreProviderInterface>>) {
*self.store.borrow_mut() = store;
}
pub fn id(&self) -> &SpaceId {
&self.id
}
pub fn name(&self) -> Option<String> {
self.name.borrow().clone()
}
}
#[async_trait(?Send)]
impl SpaceMessagingConsumerInterface for Space {
async fn on_child(&self, room_id: RoomId) {
error!("Space::on_child({room_id})");
self.children.borrow_mut().insert(room_id);
}
async fn on_new_topic(&self, topic: Option<String>) {
error!("Space::on_new_topic({:?})", topic);
*self.topic.borrow_mut() = topic;
}
async fn on_new_name(&self, name: Option<String>) {
error!("Space::on_new_name({:?})", name);
self.name.borrow_mut().clone_from(&name);
if let Some(store) = self.store.borrow_mut().as_mut() {
store.set_name(name);
}
}
}

View File

@@ -0,0 +1,23 @@
use std::rc::Rc;
use super::room::Room;
use super::space::Space;
use crate::base::{StoreRoom, StoreSpace};
#[allow(dead_code)]
pub trait AccountStoreConsumerInterface {}
pub trait AccountStoreProviderInterface {
fn on_new_room(&self, room: Rc<Room>) -> StoreRoom;
fn on_new_space(&self, space: Rc<Space>) -> StoreSpace;
}
#[allow(dead_code)]
pub trait RoomStoreConsumerInterface {}
pub trait RoomStoreProviderInterface {}
#[allow(dead_code)]
pub trait SpaceStoreConsumerInterface {}
pub trait SpaceStoreProviderInterface {
fn set_name(&mut self, _name: Option<String>) {}
}

View File

@@ -0,0 +1,51 @@
use std::fmt::{Debug, Formatter};
use matrix_sdk::{ruma::OwnedRoomId, RoomState};
use super::room_event::RoomEventsReceiver;
use crate::{domain::model::space::SpaceId, utils::Sender};
#[derive(Clone)]
pub enum AccountEvent {
NewRoom(
OwnedRoomId,
Vec<SpaceId>,
Option<String>,
Option<String>,
Option<bool>,
RoomState,
RoomEventsReceiver,
Sender<bool>,
),
NewSpace(
OwnedRoomId,
Option<String>,
Option<String>,
RoomEventsReceiver,
Sender<bool>,
),
}
impl Debug for AccountEvent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::NewRoom(id, spaces, name, topic, is_direct, state, _events_receiver, _sender) => {
f.debug_tuple("AccountEvent::NewRoom")
.field(id)
.field(spaces)
.field(name)
.field(topic)
.field(is_direct)
.field(state)
.finish()
}
Self::NewSpace(id, name, topic, _events_receiver, _sender) => f
.debug_tuple("AccountEvent::NewSpace")
.field(id)
.field(name)
.field(topic)
.finish(),
}
}
}

View File

@@ -1,37 +1,46 @@
use std::borrow::Borrow; use std::{
use std::cell::RefCell; borrow::Borrow,
use std::sync::Arc; collections::HashMap,
use std::time::Duration; sync::{Arc, Mutex},
};
use async_std::task; use async_std::stream::StreamExt;
use dioxus::prelude::Task; use dioxus::prelude::Task;
use log::{debug, error};
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use matrix_sdk::{ use matrix_sdk::{
config::SyncSettings, config::SyncSettings,
event_handler::Ctx, event_handler::Ctx,
room::Room as MatrixRoom, media::{MediaFormat, MediaThumbnailSize},
room::{Room, RoomMember},
ruma::{ ruma::{
api::client::media::get_content_thumbnail::v3::Method,
events::{ events::{
room::{ room::{
member::{RoomMemberEventContent, StrippedRoomMemberEvent}, member::{
OriginalSyncRoomMemberEvent, RoomMemberEventContent, StrippedRoomMemberEvent,
},
topic::RoomTopicEventContent, topic::RoomTopicEventContent,
}, },
SyncStateEvent, SyncStateEvent,
}, },
OwnedRoomId, uint, OwnedRoomId, RoomId, UserId,
}, },
Client as MatrixClient, RoomState as MatrixRoomState, Client as MatrixClient, RoomMemberships, RoomState,
}; };
use super::requester::{Receivers, Requester}; use tokio::sync::{
use super::worker_tasks::{LoginStyle, WorkerTask}; broadcast,
use crate::domain::model::room::Room; broadcast::{error::SendError, Receiver, Sender},
mpsc::{unbounded_channel, UnboundedReceiver},
};
use tracing::{debug, error, warn};
use super::{
account_event::AccountEvent,
requester::Requester,
room_event::{RoomEvent, RoomEventsReceiver},
worker_tasks::{LoginStyle, WorkerTask},
};
use crate::utils::oneshot;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum ClientError { pub enum ClientError {
@@ -39,21 +48,45 @@ pub enum ClientError {
Matrix(#[from] matrix_sdk::Error), Matrix(#[from] matrix_sdk::Error),
} }
#[derive(Clone)]
pub enum RoomEvent {
TopicEvent(OwnedRoomId, String),
MemberEvent(OwnedRoomId, Room),
InviteEvent(OwnedRoomId, Room),
}
#[derive(Clone)] #[derive(Clone)]
struct Senders { struct Senders {
room_events_sender: Sender<RoomEvent>, account_events_sender: Sender<AccountEvent>,
room_events_senders: Arc<Mutex<HashMap<OwnedRoomId, Sender<RoomEvent>>>>,
} }
impl Senders { impl Senders {
fn new(room_events_sender: Sender<RoomEvent>) -> Self { fn new(account_events_sender: Sender<AccountEvent>) -> Self {
Self { room_events_sender } Self {
account_events_sender,
room_events_senders: Arc::new(Mutex::new(HashMap::new())),
}
}
fn contains(&self, room_id: &RoomId) -> bool {
let room_senders = self.room_events_senders.lock().unwrap();
room_senders.contains_key(room_id)
}
fn send(&self, room_id: &RoomId, event: RoomEvent) -> Result<usize, SendError<RoomEvent>> {
let room_senders = self.room_events_senders.lock().unwrap();
if let Some(room_sender) = room_senders.get(room_id) {
room_sender.send(event)
} else {
error!("No sender found for \"{}\" room id", room_id);
Ok(0)
}
}
fn add_room(&self, room_id: &OwnedRoomId) -> Option<RoomEventsReceiver> {
let mut senders = self.room_events_senders.lock().unwrap();
if !senders.contains_key(room_id) {
let (room_sender, room_receiver) = broadcast::channel(32);
senders.insert(room_id.clone(), room_sender);
Some(RoomEventsReceiver::new(room_receiver))
} else {
None
}
} }
} }
@@ -65,16 +98,173 @@ pub struct Client {
} }
impl Client { impl Client {
pub fn new(client: Arc<MatrixClient>, room_events_sender: Sender<RoomEvent>) -> Self { pub fn new(client: Arc<MatrixClient>, account_events_sender: Sender<AccountEvent>) -> Self {
Self { Self {
initialized: false, initialized: false,
client: Some(client), client: Some(client),
sync_task: None, sync_task: None,
senders: Senders::new(room_events_sender), senders: Senders::new(account_events_sender),
} }
} }
// async fn on_sync_typing_event(_ev: SyncTypingEvent, room: MatrixRoom) { async fn create_space(
senders: &Ctx<Senders>,
room_id: OwnedRoomId,
room: Option<&Room>,
) -> anyhow::Result<(), SendError<AccountEvent>> {
let mut name = None;
let mut topic = None;
if let Some(room) = room {
name = room.name();
topic = room.topic();
}
if let Some(receiver) = senders.add_room(&room_id) {
let (reply, mut response) = oneshot::<bool>();
let event = AccountEvent::NewSpace(room_id.clone(), name, topic, receiver, reply);
if let Err(err) = senders.account_events_sender.send(event) {
error!(
"Unable to publish the new room with \"{}\" id: {}",
room_id, err
);
return Err(err);
}
// We're expecting a response indicating that the client is able to compute the next RoomEvent
response.recv().await;
} else {
let events = vec![RoomEvent::NewTopic(topic), RoomEvent::NewName(name)];
for event in events {
if let Err(err) = senders.send(&room_id, event.clone()) {
error!(
"Unable to publish the {:?} event to the \"{}\" room: {}",
event, room_id, err
);
// return Err(err);
}
}
}
Ok(())
}
async fn create_room(
senders: &Ctx<Senders>,
room: &Room,
) -> anyhow::Result<(), SendError<AccountEvent>> {
let room_id = room.room_id().to_owned();
if let Some(receiver) = senders.add_room(&room_id) {
let (reply, mut response) = oneshot::<bool>();
let is_direct = match room.is_direct().await {
Ok(is_direct) => Some(is_direct),
Err(err) => {
error!("Unable to know if the room \"{room_id}\" is direct: {err}");
None
}
};
let mut parents = vec![];
// TODO: Remove unwrap
let mut spaces = room.parent_spaces().await.unwrap();
while let Some(parent) = spaces.next().await {
match parent {
Ok(parent) => match parent {
matrix_sdk::room::ParentSpace::Reciprocal(parent) => {
parents.push(parent.room_id().to_owned());
}
_ => todo!(),
},
Err(err) => {
error!("{}", err);
}
}
}
let event = AccountEvent::NewRoom(
room_id.clone(),
parents.clone(),
room.name(),
room.topic(),
is_direct,
room.state(),
receiver,
reply,
);
if let Err(err) = senders.account_events_sender.send(event) {
error!(
"Unable to publish the new room with \"{}\" id: {}",
room.room_id(),
err
);
return Err(err);
}
// We're expecting a response indicating that the client is able to compute the next RoomEvent
response.recv().await;
}
Ok(())
}
async fn add_room(
senders: &Ctx<Senders>,
room: &Room,
) -> anyhow::Result<(), SendError<AccountEvent>> {
let room_id = room.room_id().to_owned();
if room.is_space() {
Self::create_space(senders, room_id, Some(room)).await
} else {
let ret = Self::create_room(senders, room).await;
let mut parents = vec![];
// TODO: Remove unwrap
let mut spaces = room.parent_spaces().await.unwrap();
while let Some(parent) = spaces.next().await {
match parent {
Ok(parent) => match parent {
matrix_sdk::room::ParentSpace::Reciprocal(parent) => {
parents.push(parent.room_id().to_owned());
}
_ => {
warn!(
"Only ParentSpace::Reciprocal taken into account, skip {:?}",
parent
);
}
},
Err(err) => {
error!("{}", err);
}
}
}
error!("parents={:?}", &parents);
for parent in parents {
// Create a minimal space to make the relation consistent... its content will be sync later.
if !senders.contains(&parent) {
let _ = Self::create_space(senders, parent.clone(), None).await;
}
let event = RoomEvent::NewChild(room_id.clone());
if let Err(err) = senders.send(parent.as_ref(), event.clone()) {
error!(
"Unable to send the {:?} event to the \"{}\": {:?}",
event, parent, err
);
}
}
ret
}
}
// async fn on_sync_typing_event(_ev: SyncTypingEvent, room: Room) {
// debug!("== on_sync_typing_event =="); // debug!("== on_sync_typing_event ==");
// let room_id = room.room_id().to_owned(); // let room_id = room.room_id().to_owned();
// dbg!(room_id); // dbg!(room_id);
@@ -85,7 +275,7 @@ impl Client {
// dbg!(_ev); // dbg!(_ev);
// } // }
// async fn on_sync_state_event(ev: SyncStateEvent<RoomNameEventContent>, _room: MatrixRoom) { // async fn on_sync_state_event(ev: SyncStateEvent<RoomNameEventContent>, _room: Room) {
// error!("== on_sync_state_event =="); // error!("== on_sync_state_event ==");
// if let SyncStateEvent::Original(ev) = ev { // if let SyncStateEvent::Original(ev) = ev {
// dbg!(ev); // dbg!(ev);
@@ -94,32 +284,33 @@ impl Client {
// async fn on_original_sync_room_message_event( // async fn on_original_sync_room_message_event(
// ev: OriginalSyncRoomMessageEvent, // ev: OriginalSyncRoomMessageEvent,
// _matrix_room: MatrixRoom, // _room: Room,
// _senders: Ctx<Senders>, // _senders: Ctx<Senders>,
// ) { // ) {
// error!("== on_original_sync_room_message_event =="); // error!("== on_original_sync_room_message_event ==");
// error!("ev={:?}", ev.content); // error!("ev={:?}", ev.content);
// }
async fn on_stripped_room_member_event( async fn on_stripped_room_member_event(
ev: StrippedRoomMemberEvent, ev: StrippedRoomMemberEvent,
matrix_client: MatrixClient, matrix_client: MatrixClient,
matrix_room: MatrixRoom, room: Room,
senders: Ctx<Senders>, senders: Ctx<Senders>,
) { ) {
if ev.state_key == matrix_client.user_id().unwrap() error!("*** on_stripped_room_member_event ***");
&& matrix_room.state() == MatrixRoomState::Invited // error!("ev={:?}", ev);
{
let room_id = matrix_room.room_id();
let room = Room::from_matrix_room(&matrix_room).await;
if let Err(err) = senders if ev.state_key == matrix_client.user_id().unwrap()
.room_events_sender && room.state() == RoomState::Invited
.send(RoomEvent::InviteEvent(room_id.to_owned(), room)) && Self::add_room(&senders, &room).await.is_ok()
{ {
let room_id = room.room_id();
let event = RoomEvent::Invitation();
if let Err(err) = senders.send(room_id, event) {
error!( error!(
"Unable to publish the new room with \"{}\" id: {}", "Unable to publish the room \"{}\" invitation: {}",
room_id, err room.room_id(),
err
); );
} }
} }
@@ -127,45 +318,56 @@ impl Client {
async fn on_room_topic_event( async fn on_room_topic_event(
ev: SyncStateEvent<RoomTopicEventContent>, ev: SyncStateEvent<RoomTopicEventContent>,
matrix_room: MatrixRoom, room: Room,
senders: Ctx<Senders>, senders: Ctx<Senders>,
) { ) {
if let SyncStateEvent::Original(ev) = ev { error!("*** on_room_topic_event ***");
let room_id = matrix_room.room_id(); // error!("ev={:?}", ev);
if let Err(err) = senders if let SyncStateEvent::Original(ev) = ev {
.room_events_sender let _ = Self::add_room(&senders, &room).await;
.send(RoomEvent::TopicEvent(room_id.to_owned(), ev.content.topic))
{ let room_id = room.room_id();
error!("Unable to publish the \"{}\" new topic: {}", room_id, err); let event = RoomEvent::NewTopic(Some(ev.content.topic));
if let Err(err) = senders.send(room_id, event) {
error!(
"Unable to publish the room \"{}\" topic: {}",
room.room_id(),
err
);
} }
} }
} }
async fn on_room_member_event( async fn on_room_member_event(
ev: SyncStateEvent<RoomMemberEventContent>, ev: SyncStateEvent<RoomMemberEventContent>,
matrix_room: MatrixRoom, room: Room,
senders: Ctx<Senders>, senders: Ctx<Senders>,
) { ) {
if let SyncStateEvent::Original(_ev) = ev { error!("*** on_room_member_event ***");
let room_id = matrix_room.room_id(); // error!("ev={:?}", ev);
let room = Room::from_matrix_room(&matrix_room).await;
if let Err(err) = senders if let SyncStateEvent::Original(_ev) = ev {
.room_events_sender if Self::add_room(&senders, &room).await.is_ok() {
.send(RoomEvent::MemberEvent(room_id.to_owned(), room)) // let room_id = room.room_id();
{ // // TODO: Client shall only manage Matrix object... not BG92 ones.
error!( // let event = RoomEvent::Membership(RoomMember::new(ev.sender, room_id));
"Unable to publish the new room with \"{}\" id: {}", // if let Some(result) = senders.send(room_id, event) {
room_id, err // if let Err(err) = result {
); // error!(
// "Unable to publish the room \"{}\" membership: {}",
// room.room_id(),
// err
// );
// }
// }
} }
} }
} }
// async fn on_sync_message_like_room_message_event( // async fn on_sync_message_like_room_message_event(
// ev: SyncMessageLikeEvent<RoomMessageEventContent>, // ev: SyncMessageLikeEvent<RoomMessageEventContent>,
// _room: MatrixRoom, // _room: Room,
// _client: MatrixClient, // _client: MatrixClient,
// ) { // ) {
// debug!("== on_sync_message_like_room_message_event =="); // debug!("== on_sync_message_like_room_message_event ==");
@@ -174,7 +376,7 @@ impl Client {
// async fn on_sync_message_like_reaction_event( // async fn on_sync_message_like_reaction_event(
// ev: SyncMessageLikeEvent<ReactionEventContent>, // ev: SyncMessageLikeEvent<ReactionEventContent>,
// _room: MatrixRoom, // _room: Room,
// ) { // ) {
// debug!("== on_sync_message_like_reaction_event =="); // debug!("== on_sync_message_like_reaction_event ==");
// dbg!(ev); // dbg!(ev);
@@ -182,19 +384,19 @@ impl Client {
// async fn on_original_sync_room_redaction_event( // async fn on_original_sync_room_redaction_event(
// ev: OriginalSyncRoomRedactionEvent, // ev: OriginalSyncRoomRedactionEvent,
// _room: MatrixRoom, // _room: Room,
// ) { // ) {
// debug!("== on_original_sync_room_redaction_event =="); // debug!("== on_original_sync_room_redaction_event ==");
// dbg!(ev); // dbg!(ev);
// } // }
// async fn on_original_sync_room_member_event( async fn on_original_sync_room_member_event(
// _ev: OriginalSyncRoomMemberEvent, _ev: OriginalSyncRoomMemberEvent,
// _room: MatrixRoom, _room: Room,
// _client: MatrixClient, _client: MatrixClient,
// ) { ) {
// debug!("== on_original_sync_room_member_event =="); // debug!("== on_original_sync_room_member_event ==");
// error!("room={:?}", room);
// let mut store = store_ctx.read().unwrap().to_owned(); // let mut store = store_ctx.read().unwrap().to_owned();
// dbg!(store.rooms.keys()); // dbg!(store.rooms.keys());
// let is_direct = room.is_direct().await.ok(); // let is_direct = room.is_direct().await.ok();
@@ -203,7 +405,7 @@ impl Client {
// Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))), // Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))),
// ); // );
// let _ = store_ctx.write(store); // let _ = store_ctx.write(store);
// } }
// async fn on_original_sync_key_verif_start_event( // async fn on_original_sync_key_verif_start_event(
// ev: OriginalSyncKeyVerificationStartEvent, // ev: OriginalSyncKeyVerificationStartEvent,
@@ -265,11 +467,7 @@ impl Client {
// debug!("== on_room_event({}) ==", ev.) // debug!("== on_room_event({}) ==", ev.)
// } // }
pub async fn spawn(homeserver_url: String) -> Requester { pub async fn spawn(homeserver_url: String) -> (Requester, Receiver<AccountEvent>) {
let (tx, rx) = unbounded_channel::<WorkerTask>();
let (room_sender, room_receiver) = broadcast::channel(32);
let matrix_client = Arc::new( let matrix_client = Arc::new(
MatrixClient::builder() MatrixClient::builder()
.homeserver_url(&homeserver_url) .homeserver_url(&homeserver_url)
@@ -278,23 +476,22 @@ impl Client {
.unwrap(), .unwrap(),
); );
let mut client = Client::new(matrix_client.clone(), room_sender); let (worker_tasks_sender, worker_tasks_receiver) = unbounded_channel::<WorkerTask>();
let (account_events_sender, account_events_receiver) =
broadcast::channel::<AccountEvent>(32);
let mut client = Client::new(matrix_client, account_events_sender);
dioxus::prelude::spawn(async move { dioxus::prelude::spawn(async move {
client.work(rx).await; client.work(worker_tasks_receiver).await;
}); });
Requester { (Requester::new(worker_tasks_sender), account_events_receiver)
matrix_client,
tx,
receivers: Receivers {
room_receiver: RefCell::new(room_receiver),
},
}
} }
fn init(&mut self) { fn init(&mut self) {
if let Some(client) = self.client.borrow() { if let Some(client) = self.client.borrow() {
// TODO: Remove clone?
client.add_event_handler_context(self.senders.clone()); client.add_event_handler_context(self.senders.clone());
let _ = client.add_event_handler(Client::on_stripped_room_member_event); let _ = client.add_event_handler(Client::on_stripped_room_member_event);
@@ -309,7 +506,9 @@ impl Client {
// let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event); // let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event);
// let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event); // let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event);
// let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event); // let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event);
// let _ = client.add_event_handler(Client::on_original_sync_room_member_event);
let _ = client.add_event_handler(Client::on_original_sync_room_member_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event); // let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event); // let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event); // let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event);
@@ -322,12 +521,12 @@ impl Client {
} }
} }
async fn login_and_sync(&mut self, style: LoginStyle) -> anyhow::Result<()> { async fn login(&mut self, style: LoginStyle) -> anyhow::Result<()> {
let client = self.client.clone().unwrap(); let client = self.client.as_ref().unwrap();
match style { match style {
LoginStyle::Password(username, password) => { LoginStyle::Password(username, password) => {
let _resp = client client
.matrix_auth() .matrix_auth()
.login_username(&username, &password) .login_username(&username, &password)
.initial_device_display_name("TODO") .initial_device_display_name("TODO")
@@ -337,7 +536,11 @@ impl Client {
} }
} }
let (synchronized_tx, synchronized_rx) = oneshot::channel::<bool>(); Ok(())
}
async fn run_forever(&mut self) {
let client = self.client.clone().unwrap();
let task = dioxus::prelude::spawn(async move { let task = dioxus::prelude::spawn(async move {
// Sync once so we receive the client state and old messages // Sync once so we receive the client state and old messages
@@ -350,87 +553,107 @@ impl Client {
}; };
if let Some(sync_token) = sync_token_option { if let Some(sync_token) = sync_token_option {
let settings = SyncSettings::default().token(sync_token);
debug!("User connected to the homeserver, start syncing"); debug!("User connected to the homeserver, start syncing");
if let Err(err) = synchronized_tx.send(true) { let settings = SyncSettings::default().token(sync_token);
error!("Unable to notify that the Matrix client is now synchronized ({err})");
}
let _ = client.sync(settings).await; let _ = client.sync(settings).await;
} }
}); });
self.sync_task = Some(task); self.sync_task = Some(task);
// self.start_background_tasks(synchronized_rx);
Ok(())
} }
// async fn register_room_events(&self, room_id: OwnedRoomId) { async fn get_display_name(&mut self) -> anyhow::Result<Option<String>> {
// let client = self.client.unwrap(); let client = self.client.as_ref().unwrap();
// client.add_room_event_handler(&room_id, Client::on_room_event); match client.account().get_display_name().await {
// } Ok(display_name) => Ok(display_name),
Err(err) => Err(err.into()),
}
}
// async fn refresh_rooms( async fn get_avatar(&mut self) -> anyhow::Result<Option<Vec<u8>>> {
// matrix_client: &Arc<MatrixClient>, let client = self.client.as_ref().unwrap();
// room_events_sender: &Sender<RoomEvent>,
// ) {
// let joined_matrix_rooms_ref = &matrix_client.joined_rooms();
// let invited_matrix_rooms_ref = &matrix_client.invited_rooms();
// for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] { match client
// for matrix_room in matrix_rooms.iter() { .account()
// let room = Room::from_matrix_room(matrix_room).await; .get_avatar(MediaFormat::Thumbnail(MediaThumbnailSize {
// let event = RoomEvent::MemberEvent(room.id().clone(), room); method: Method::Scale,
width: uint!(256),
height: uint!(256),
}))
.await
{
Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
}
}
// if let Err(err) = room_events_sender.send(event) { async fn get_room_avatar(&mut self, room_id: &OwnedRoomId) -> anyhow::Result<Option<Vec<u8>>> {
// error!("Error: {}", err); let client = self.client.as_ref().unwrap();
// }
// }
// }
// }
// async fn refresh_rooms_forever( if let Some(room) = client.get_room(room_id) {
// matrix_client: Arc<MatrixClient>, match room
// room_events_sender: &Sender<RoomEvent>, .avatar(MediaFormat::Thumbnail(MediaThumbnailSize {
// ) { method: Method::Scale,
// // TODO: Add interval to config width: uint!(256),
// let period_sec = Duration::from_secs(5); height: uint!(256),
}))
.await
{
Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
}
} else {
error!("No room found with the \"{}\" id", room_id.as_str());
// TODO: Return an error if the room has not been found
Ok(None)
}
}
// loop { async fn get_room_members(&mut self, room_id: &OwnedRoomId) -> anyhow::Result<Vec<RoomMember>> {
// Self::refresh_rooms(&matrix_client, room_events_sender).await; let client = self.client.as_ref().unwrap();
// task::sleep(period_sec).await; if let Some(room) = client.get_room(room_id) {
// } match room.members(RoomMemberships::ACTIVE).await {
// } Ok(room_members) => Ok(room_members),
Err(err) => Err(err.into()),
}
} else {
error!("No room found with the \"{}\" id", room_id.as_str());
// TODO: Return an error if the room has not been found
Ok(vec![])
}
}
// fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver<bool>) { async fn get_room_member_avatar(
// let client = self.client.clone().unwrap(); &self,
// let room_events_sender = self.senders.room_events_sender.clone(); room_id: &RoomId,
user_id: &UserId,
) -> anyhow::Result<Option<Vec<u8>>> {
let client = self.client.as_ref().unwrap();
// let task = dioxus::prelude::spawn(async move { if let Some(room) = client.get_room(room_id) {
// if let Err(err) = synchronized_rx.await { if let Ok(Some(room_member)) = room.get_member(user_id).await {
// error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); let res = match room_member
// } .avatar(MediaFormat::Thumbnail(MediaThumbnailSize {
method: Method::Scale,
// debug!("Start room refreshing forever"); width: uint!(256),
height: uint!(256),
// let _ = Self::refresh_rooms_forever(client, &room_events_sender).await; }))
// }); .await
// self.background_task = Some(task); {
// } Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
};
return res;
}
}
Ok(None)
}
async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) { async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) {
loop { while let Some(task) = rx.recv().await {
match rx.recv().await { self.run(task).await;
Some(task) => self.run(task).await,
None => {
break;
}
}
} }
if let Some(task) = self.sync_task.take() { if let Some(task) = self.sync_task.take() {
@@ -441,17 +664,38 @@ impl Client {
async fn run(&mut self, task: WorkerTask) { async fn run(&mut self, task: WorkerTask) {
match task { match task {
WorkerTask::Init(reply) => { WorkerTask::Init(reply) => {
assert!(!self.initialized);
self.init(); self.init();
reply.send(()).await; reply.send(Ok(())).await;
}
WorkerTask::RunForever(reply) => {
{
self.run_forever().await;
reply.send(())
}
.await
} }
WorkerTask::Login(style, reply) => { WorkerTask::Login(style, reply) => {
assert!(self.initialized); reply.send(self.login(style).await).await;
reply.send(self.login_and_sync(style).await).await; }
} // WorkerTask::registerRoomEvents(room_id, reply) => { WorkerTask::GetDisplayName(reply) => {
// assert!(self.initialized); reply.send(self.get_display_name().await).await;
// reply.send(self.register_room_events(room_id).await).await; }
// } WorkerTask::GetAvatar(reply) => {
reply.send(self.get_avatar().await).await;
}
WorkerTask::GetRoomAvatar(id, reply) => {
reply.send(self.get_room_avatar(&id).await).await;
}
WorkerTask::GetRoomMembers(id, reply) => {
reply.send(self.get_room_members(&id).await).await;
}
WorkerTask::GetRoomMemberAvatar(room_id, user_id, reply) => {
reply
.send(self.get_room_member_avatar(&room_id, &user_id).await)
.await;
}
} }
} }
} }

View File

@@ -1,3 +1,5 @@
pub(crate) mod account_event;
pub(crate) mod client; pub(crate) mod client;
pub(crate) mod requester; pub(crate) mod requester;
pub(crate) mod room_event;
pub(crate) mod worker_tasks; pub(crate) mod worker_tasks;

View File

@@ -1,58 +1,65 @@
use std::cell::RefCell; use std::{collections::HashMap, rc::Rc};
use std::sync::Arc;
use matrix_sdk::Client as MatrixClient; use async_trait::async_trait;
use tokio::sync::broadcast::Receiver; use futures::future::join_all;
use tokio::sync::mpsc::UnboundedSender; use tokio::{
select,
sync::{broadcast::Receiver, mpsc::UnboundedSender},
};
use tokio_stream::{wrappers::BroadcastStream, StreamExt, StreamMap};
use tracing::error;
use super::client::RoomEvent; use super::{
use super::worker_tasks::{LoginStyle, WorkerTask}; account_event::AccountEvent,
use crate::utils::oneshot; room_event::RoomEvent,
worker_tasks::{LoginStyle, WorkerTask},
pub struct Receivers { };
pub room_receiver: RefCell<Receiver<RoomEvent>>, use crate::{
} domain::model::{
impl Clone for Receivers { common::{Avatar, UserId},
fn clone(&self) -> Self { messaging_interface::{
Self { AccountMessagingConsumerInterface, AccountMessagingProviderInterface,
room_receiver: RefCell::new(self.room_receiver.borrow().resubscribe()), MemberMessagingProviderInterface, RoomMessagingConsumerInterface,
} RoomMessagingProviderInterface, SpaceMessagingConsumerInterface,
} SpaceMessagingProviderInterface,
} },
impl PartialEq for Receivers { room::{Room, RoomId},
fn eq(&self, other: &Self) -> bool { room_member::RoomMember,
self.room_receiver space::Space,
.borrow() },
.same_channel(&other.room_receiver.borrow()) utils::oneshot,
} };
}
pub struct Requester { pub struct Requester {
pub matrix_client: Arc<MatrixClient>, worker_tasks_sender: UnboundedSender<WorkerTask>,
pub tx: UnboundedSender<WorkerTask>, }
pub receivers: Receivers,
impl Clone for Requester {
fn clone(&self) -> Self {
Self {
worker_tasks_sender: self.worker_tasks_sender.clone(),
}
}
} }
impl Requester { impl Requester {
pub async fn init(&self) -> anyhow::Result<()> { pub fn new(worker_tasks_sender: UnboundedSender<WorkerTask>) -> Self {
Self {
worker_tasks_sender,
}
}
}
// TODO: Is there a way to avoid this duplication?
macro_rules! request_to_worker {
($self:ident, $task:expr) => {
{
let (reply, mut response) = oneshot(); let (reply, mut response) = oneshot();
if let Err(err) = self.tx.send(WorkerTask::Init(reply)) { let task = $task(reply);
let msg = format!("Unable to request the init of the Matrix client: {err}");
return Err(anyhow::Error::msg(msg));
}
match response.recv().await { if let Err(err) = $self.worker_tasks_sender.send(task) {
Some(result) => Ok(result), let msg = format!("Unable to request to the Matrix client: {err}");
None => Err(anyhow::Error::msg("TBD")),
}
}
pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> {
let (reply, mut response) = oneshot();
if let Err(err) = self.tx.send(WorkerTask::Login(style, reply)) {
let msg = format!("Unable to request login to the Matrix client: {err}");
return Err(anyhow::Error::msg(msg)); return Err(anyhow::Error::msg(msg));
} }
@@ -61,4 +68,196 @@ impl Requester {
None => Err(anyhow::Error::msg("TBD")), None => Err(anyhow::Error::msg("TBD")),
} }
} }
};
($self:ident, $task:expr $(, $arg:expr)+) => {
{
let (reply, mut response) = oneshot();
let task = $task($($arg),*, reply);
if let Err(err) = $self.worker_tasks_sender.send(task) {
let msg = format!("Unable to request to the Matrix client: {err}");
return Err(anyhow::Error::msg(msg));
}
match response.recv().await {
Some(result) => result,
None => Err(anyhow::Error::msg("TBD")),
}
}
};
}
impl Requester {
pub async fn init(&self) -> anyhow::Result<()> {
request_to_worker!(self, WorkerTask::Init)
}
pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> {
request_to_worker!(self, WorkerTask::Login, style)
}
}
#[async_trait(?Send)]
impl AccountMessagingProviderInterface for Requester {
async fn get_display_name(&self) -> anyhow::Result<Option<String>> {
request_to_worker!(self, WorkerTask::GetDisplayName)
}
async fn get_avatar(&self) -> anyhow::Result<Option<Avatar>> {
request_to_worker!(self, WorkerTask::GetAvatar)
}
async fn run_forever(
&self,
account_events_consumer: &dyn AccountMessagingConsumerInterface,
mut account_events_receiver: Receiver<AccountEvent>,
) -> anyhow::Result<()> {
// TODO: manage the result provided by response
let (run_forever_tx, _run_forever_rx) = oneshot();
if let Err(err) = self
.worker_tasks_sender
.send(WorkerTask::RunForever(run_forever_tx))
{
let msg = format!("Unable to request login to the Matrix client: {err}");
return Err(anyhow::Error::msg(msg));
}
let mut rooms_events_streams = StreamMap::new();
let mut spaces_events_streams = StreamMap::new();
let mut room_events_consumers =
HashMap::<RoomId, Rc<dyn RoomMessagingConsumerInterface>>::new();
let mut space_events_consumers =
HashMap::<RoomId, Rc<dyn SpaceMessagingConsumerInterface>>::new();
// TODO: Fix this...
let client = Rc::new(self.clone());
loop {
select! {
res = account_events_receiver.recv() => {
if let Ok(account_event) = res {
match account_event {
AccountEvent::NewRoom(id, spaces, name, topic, is_direct, state, receiver, new_room_tx) => {
let mut room = Room::new(id, spaces, name, topic, is_direct, Some(state));
let room_id = room.id().clone();
room.set_messaging_provider(client.clone());
let stream = BroadcastStream::new(receiver.into());
rooms_events_streams.insert(room_id.clone(), stream);
let room_events_consumer = account_events_consumer.on_new_room(room).await;
room_events_consumers.insert(room_id, room_events_consumer);
// We're now ready to recv and compute RoomEvent.
new_room_tx.send(true).await;
},
AccountEvent::NewSpace(id, name, topic, receiver, new_space_tx) => {
let mut space = Space::new(id, name, topic);
let space_id = space.id().clone();
space.set_messaging_provider(client.clone());
let stream = BroadcastStream::new(receiver.into());
spaces_events_streams.insert(space_id.clone(), stream);
let space_events_consumer = account_events_consumer.on_new_space(space).await;
space_events_consumers.insert(space_id, space_events_consumer);
// We're now ready to recv and compute SpaceEvent.
new_space_tx.send(true).await;
}
};
}
},
Some((room_id, room_event)) = rooms_events_streams.next() => {
if let Ok(room_event) = room_event {
if let Some(consumer) = room_events_consumers.get(&room_id) {
match room_event {
RoomEvent::Invitation() => {
consumer.on_invitation().await;
},
// RoomEvent::Membership(user_id, is_account_user) => {
// let member = RoomMember::new(UserId::from(user_id), room_id, is_account_user);
// consumer.on_membership(member).await;
// },
RoomEvent::NewTopic(topic) => {
consumer.on_new_topic(topic).await;
},
RoomEvent::NewName(name) => {
consumer.on_new_name(name).await;
},
_ => {}
}
} else {
error!("No consumer found for \"{}\" room", room_id);
}
}
},
Some((space_id, room_event)) = spaces_events_streams.next() => {
if let Ok(room_event) = room_event {
if let Some(consumer) = space_events_consumers.get(&space_id) {
match room_event {
RoomEvent::NewTopic(topic) => {
consumer.on_new_topic(topic).await;
},
RoomEvent::NewName(name) => {
consumer.on_new_name(name).await;
},
RoomEvent::NewChild(child_id) => {
consumer.on_child(child_id).await;
},
_ => {}
}
} else {
error!("No consumer found for \"{}\" space", space_id);
}
}
}
}
}
}
}
#[async_trait(?Send)]
impl RoomMessagingProviderInterface for Requester {
async fn get_avatar(&self, room_id: &RoomId) -> anyhow::Result<Option<Avatar>> {
request_to_worker!(self, WorkerTask::GetRoomAvatar, room_id.clone())
}
// TODO: Fix return code
async fn get_members(&self, room_id: &RoomId) -> anyhow::Result<Vec<RoomMember>> {
match request_to_worker!(self, WorkerTask::GetRoomMembers, room_id.clone()) {
Ok(matrix_room_members) => {
Ok(join_all(matrix_room_members.iter().map(|member| async {
RoomMember::from_matrix(member, room_id, Rc::new(self.clone())).await
}))
.await)
}
Err(err) => Err(err),
}
}
}
#[async_trait(?Send)]
impl SpaceMessagingProviderInterface for Requester {}
#[async_trait(?Send)]
impl MemberMessagingProviderInterface for Requester {
async fn get_avatar(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> anyhow::Result<Option<Avatar>> {
request_to_worker!(
self,
WorkerTask::GetRoomMemberAvatar,
room_id.clone(),
user_id.clone()
)
}
} }

View File

@@ -0,0 +1,58 @@
use std::fmt::{Debug, Formatter};
use matrix_sdk::ruma::{OwnedRoomId, OwnedUserId};
use tokio::sync::broadcast::Receiver;
#[derive(Clone)]
pub enum RoomEvent {
Invitation(),
#[allow(dead_code)]
Membership(OwnedUserId, bool),
NewTopic(Option<String>),
NewName(Option<String>),
NewChild(OwnedRoomId),
}
impl Debug for RoomEvent {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::Invitation() => f
.debug_tuple("RoomEvent::Invitation")
.field(&format_args!("_"))
.finish(),
Self::Membership(user_id, is_account_user) => f
.debug_tuple("RoomEvent::Membership")
.field(user_id)
.field(is_account_user)
.finish(),
Self::NewTopic(topic) => f.debug_tuple("RoomEvent::NewTopic").field(topic).finish(),
Self::NewName(name) => f.debug_tuple("RoomEvent::NewName").field(name).finish(),
Self::NewChild(room_id) => f
.debug_tuple("SpaceEvent::NewChild")
.field(room_id)
.finish(),
}
}
}
pub struct RoomEventsReceiver(Receiver<RoomEvent>);
impl Clone for RoomEventsReceiver {
fn clone(&self) -> Self {
Self(self.0.resubscribe())
}
}
impl RoomEventsReceiver {
pub fn new(inner: Receiver<RoomEvent>) -> Self {
Self(inner)
}
}
impl From<RoomEventsReceiver> for Receiver<RoomEvent> {
fn from(val: RoomEventsReceiver) -> Self {
val.0
}
}

View File

@@ -1,19 +1,32 @@
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use matrix_sdk::{
room::RoomMember,
ruma::{OwnedRoomId, OwnedUserId},
};
use crate::utils::Sender; use crate::utils::Sender;
#[derive(Debug)] #[derive(Debug)]
pub enum LoginStyle { pub enum LoginStyle {
// SessionRestore(Session),
Password(String, String), Password(String, String),
} }
pub enum WorkerTask { pub enum WorkerTask {
// Init(AsyncProgramStore, ClientReply<()>), Init(Sender<anyhow::Result<()>>),
// Init(ClientReply<()>),
Init(Sender<()>),
//Login(LoginStyle, ClientReply<EditInfo>),
Login(LoginStyle, Sender<anyhow::Result<()>>), Login(LoginStyle, Sender<anyhow::Result<()>>),
RunForever(Sender<()>),
GetDisplayName(Sender<anyhow::Result<Option<String>>>),
GetAvatar(Sender<anyhow::Result<Option<Vec<u8>>>>),
GetRoomAvatar(OwnedRoomId, Sender<anyhow::Result<Option<Vec<u8>>>>),
GetRoomMembers(OwnedRoomId, Sender<anyhow::Result<Vec<RoomMember>>>),
GetRoomMemberAvatar(
OwnedRoomId,
OwnedUserId,
Sender<anyhow::Result<Option<Vec<u8>>>>,
),
} }
impl Debug for WorkerTask { impl Debug for WorkerTask {
@@ -24,11 +37,38 @@ impl Debug for WorkerTask {
.field(&format_args!("_")) .field(&format_args!("_"))
// .field(&format_args!("_")) // .field(&format_args!("_"))
.finish(), .finish(),
WorkerTask::RunForever(_) => f
.debug_tuple("WorkerTask::RunForever")
.field(&format_args!("_"))
.finish(),
WorkerTask::Login(style, _) => f WorkerTask::Login(style, _) => f
.debug_tuple("WorkerTask::Login") .debug_tuple("WorkerTask::Login")
.field(style) .field(style)
// .field(&format_args!("_")) // .field(&format_args!("_"))
.finish(), .finish(),
WorkerTask::GetDisplayName(_) => f
.debug_tuple("WorkerTask::GetDisplayName")
.field(&format_args!("_"))
.finish(),
WorkerTask::GetAvatar(_) => f
.debug_tuple("WorkerTask::GetAvatar")
.field(&format_args!("_"))
.finish(),
WorkerTask::GetRoomAvatar(id, _) => f
.debug_tuple("WorkerTask::GetRoomAvatar")
.field(id)
.finish(),
WorkerTask::GetRoomMembers(id, _) => f
.debug_tuple("WorkerTask::GetRoomMembers")
.field(id)
.finish(),
WorkerTask::GetRoomMemberAvatar(room_id, user_id, _) => f
.debug_tuple("WorkerTask::GetRoomMemberAvatar")
.field(room_id)
.field(user_id)
.finish(),
} }
} }
} }

View File

@@ -1 +1,2 @@
pub(crate) mod mozaik_builder;
pub(crate) mod random_svg_generators; pub(crate) mod random_svg_generators;

View File

@@ -0,0 +1,92 @@
use std::io::Cursor;
use image::imageops::FilterType;
use image::io::Reader;
use image::{DynamicImage, ImageFormat};
use image::{GenericImage, RgbImage};
use tracing::{error, warn};
fn from_raw_to_image(raw: &Vec<u8>) -> Option<DynamicImage> {
match Reader::new(Cursor::new(raw)).with_guessed_format() {
Ok(reader) => match reader.decode() {
Ok(image) => return Some(image),
Err(err) => error!("Unable to decode the image: {}", err),
},
Err(err) => {
error!("Unable to read the image: {}", err)
}
}
None
}
pub fn create_mozaik(
width_px: u32,
height_px: u32,
images: &[Vec<u8>],
padding_image: &Option<Vec<u8>>,
) -> Vec<u8> {
let placeholder = DynamicImage::new_rgb8(128, 128);
let images: Vec<Option<DynamicImage>> = images.iter().map(from_raw_to_image).collect();
let padding_image = if let Some(padding_image) = padding_image {
from_raw_to_image(padding_image)
} else {
None
};
let mut bytes: Vec<u8> = Vec::new();
let mut allocations: Vec<&Option<DynamicImage>> = vec![];
let mut images_per_row = 1;
let mut images_per_col = 1;
match images.len() {
0 => {
allocations.push(&padding_image);
}
1 => {
allocations.push(&images[0]);
}
2 => {
allocations.extend_from_slice(&[&images[0], &images[1], &images[1], &images[0]]);
images_per_row = 2;
images_per_col = 2;
}
_ => {
// TODO: Manage other cases
warn!("For now, we only manage the rendering of mozaic with less than 3 images");
return bytes;
}
}
let image_width_px = width_px / images_per_row;
let image_height_px = height_px / images_per_col;
let mut output = RgbImage::new(width_px, height_px);
let mut row_pos = 0;
for (index, image) in allocations.iter().enumerate() {
if index > 0 && index % images_per_row as usize == 0 {
row_pos += 1;
}
let col_pos = index - (images_per_row as usize * row_pos);
let image = *image;
let scaled = image
.as_ref()
.unwrap_or(&placeholder)
.resize_to_fill(image_width_px, image_height_px, FilterType::Nearest)
.into_rgb8();
let output_image_pos_x = col_pos as u32 * image_width_px;
let output_image_pos_y = row_pos as u32 * image_height_px;
let _ = output.copy_from(&scaled, output_image_pos_x, output_image_pos_y);
}
let _ = output.write_to(&mut Cursor::new(&mut bytes), ImageFormat::Jpeg);
bytes
}

View File

@@ -116,7 +116,7 @@ async fn fetch_text(req: String) -> RequestResult<String> {
async fn fetch_dicebear_svg( async fn fetch_dicebear_svg(
r#type: &DicebearType, r#type: &DicebearType,
req_fields: &Vec<String>, req_fields: &[String],
placeholder_fetcher: Option<Box<impl Future<Output = Option<String>>>>, placeholder_fetcher: Option<Box<impl Future<Output = Option<String>>>>,
) -> String { ) -> String {
// TODO: Use configuration file // TODO: Use configuration file
@@ -146,7 +146,7 @@ async fn fetch_dicebear_svg(
} }
#[cfg(feature = "desktop")] #[cfg(feature = "desktop")]
fn gen_placeholder_fetcher<'a>(path: &'static str) -> Box<impl Future<Output = Option<String>>> { fn gen_placeholder_fetcher(path: &'static str) -> Box<impl Future<Output = Option<String>>> {
let path = format!("./public/{}", &path); let path = format!("./public/{}", &path);
Box::new(async move { Box::new(async move {
match read_to_string(&path).await { match read_to_string(&path).await {

View File

@@ -3,7 +3,6 @@ use dioxus::prelude::*;
use dioxus_free_icons::icons::fa_solid_icons::{ use dioxus_free_icons::icons::fa_solid_icons::{
FaComments, FaLayerGroup, FaMagnifyingGlass, FaPeopleGroup, FaComments, FaLayerGroup, FaMagnifyingGlass, FaPeopleGroup,
}; };
use dioxus_free_icons::icons::md_navigation_icons::MdArrowDropDown;
use dioxus_free_icons::{Icon, IconShape}; use dioxus_free_icons::{Icon, IconShape};
turf::style_sheet!("src/ui/components/icons.scss"); turf::style_sheet!("src/ui/components/icons.scss");
@@ -26,9 +25,6 @@ macro_rules! transparent_icon {
}; };
} }
// TODO: Remove this icon once the conversation panel finished
transparent_icon!(DownArrowIcon, MdArrowDropDown);
transparent_icon!(SearchIcon, FaMagnifyingGlass); transparent_icon!(SearchIcon, FaMagnifyingGlass);
transparent_icon!(SpacesIcon, FaLayerGroup); transparent_icon!(SpacesIcon, FaLayerGroup);
transparent_icon!(ChatsIcon, FaComments); transparent_icon!(ChatsIcon, FaComments);

View File

@@ -538,7 +538,7 @@ fn generate_modal(
on_confirm: on_confirm, on_confirm: on_confirm,
div { div {
{rendered_suggestions.into_iter()} {rendered_suggestions.iter()}
} }
} }
} }
@@ -608,15 +608,16 @@ pub fn Login() -> Element {
generate_random_svg_shape(Some(&shape_config)).await generate_random_svg_shape(Some(&shape_config)).await
}); });
let avatar = match &*random_avatar_future.read_unchecked() { let avatar = (*random_avatar_future.read_unchecked())
Some(svg) => Some(rsx! { .as_ref()
.map(|svg| {
rsx! {
div { div {
class: ClassName::LOGIN_AVATAR_CONTENT, class: ClassName::LOGIN_AVATAR_CONTENT,
dangerous_inner_html: svg.as_str(), dangerous_inner_html: svg.as_str(),
} }
}), }
None => None, });
};
if *spinner_animated.read() && SESSION.read().is_logged { if *spinner_animated.read() && SESSION.read().is_logged {
debug!("Stop spinner"); debug!("Stop spinner");

View File

@@ -59,15 +59,16 @@ pub fn Modal(props: ModalProps) -> Element {
let random_figure_future = let random_figure_future =
use_resource(move || async move { generate_random_svg_avatar(avatar_config).await }); use_resource(move || async move { generate_random_svg_avatar(avatar_config).await });
let icon = match &*random_figure_future.read_unchecked() { let icon = (*random_figure_future.read_unchecked())
Some(svg) => Some(rsx! { .as_ref()
.map(|svg| {
rsx! {
div { div {
class: ClassName::MODAL_CONTENT_ICON_PLACEHOLDER, class: ClassName::MODAL_CONTENT_ICON_PLACEHOLDER,
dangerous_inner_html: svg.as_str(), dangerous_inner_html: svg.as_str(),
} }
}), }
None => None, });
};
let button_class = match &props.severity { let button_class = match &props.severity {
Severity::Ok => SuccessButton, Severity::Ok => SuccessButton,

View File

@@ -8,6 +8,7 @@ impl<T> Receiver<T> {
} }
} }
#[derive(Clone)]
pub struct Sender<T>(_Sender<T>); pub struct Sender<T>(_Sender<T>);
// TODO: Handle error // TODO: Handle error