♻️ Add Room domain entity

This commit is contained in:
2024-04-10 13:39:45 +02:00
parent a7bccfa779
commit c580fba315
8 changed files with 447 additions and 339 deletions

View File

@@ -1,10 +1,15 @@
use std::borrow::Borrow;
use std::cell::RefCell;
use std::sync::Arc;
use std::time::Duration;
use async_std::task;
use dioxus::prelude::Task;
use log::{debug, error};
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use matrix_sdk::{
@@ -13,25 +18,11 @@ use matrix_sdk::{
room::Room as MatrixRoom,
ruma::{
events::{
key::verification::{
done::{OriginalSyncKeyVerificationDoneEvent, ToDeviceKeyVerificationDoneEvent},
key::{OriginalSyncKeyVerificationKeyEvent, ToDeviceKeyVerificationKeyEvent},
request::ToDeviceKeyVerificationRequestEvent,
start::{OriginalSyncKeyVerificationStartEvent, ToDeviceKeyVerificationStartEvent},
},
presence::PresenceEvent,
reaction::ReactionEventContent,
room::{
member::{
OriginalSyncRoomMemberEvent, RoomMemberEventContent, StrippedRoomMemberEvent,
},
message::RoomMessageEventContent,
name::RoomNameEventContent,
redaction::OriginalSyncRoomRedactionEvent,
member::{RoomMemberEventContent, StrippedRoomMemberEvent},
topic::RoomTopicEventContent,
},
typing::SyncTypingEvent,
SyncMessageLikeEvent, SyncStateEvent,
SyncStateEvent,
},
OwnedRoomId,
},
@@ -40,7 +31,7 @@ use matrix_sdk::{
use super::requester::{Receivers, Requester};
use super::worker_tasks::{LoginStyle, WorkerTask};
use crate::base::Room;
use crate::domain::model::room::Room;
#[derive(thiserror::Error, Debug)]
pub enum ClientError {
@@ -57,49 +48,58 @@ pub enum RoomEvent {
#[derive(Clone)]
struct Senders {
room_sender: Sender<RoomEvent>,
room_events_sender: Sender<RoomEvent>,
}
impl Senders {
fn new(room_sender: Sender<RoomEvent>) -> Self {
Self { room_sender }
fn new(room_events_sender: Sender<RoomEvent>) -> Self {
Self { room_events_sender }
}
}
pub struct Client {
initialized: bool,
client: Option<Arc<MatrixClient>>,
sync_handle: Option<JoinHandle<()>>,
sync_task: Option<Task>,
senders: Senders,
}
impl Client {
pub fn new(client: Arc<MatrixClient>, room_sender: Sender<RoomEvent>) -> Self {
pub fn new(client: Arc<MatrixClient>, room_events_sender: Sender<RoomEvent>) -> Self {
Self {
initialized: false,
client: Some(client),
sync_handle: None,
senders: Senders::new(room_sender),
sync_task: None,
senders: Senders::new(room_events_sender),
}
}
async fn on_sync_typing_event(_ev: SyncTypingEvent, room: MatrixRoom) {
debug!("== on_sync_typing_event ==");
let room_id = room.room_id().to_owned();
dbg!(room_id);
}
// async fn on_sync_typing_event(_ev: SyncTypingEvent, room: MatrixRoom) {
// debug!("== on_sync_typing_event ==");
// let room_id = room.room_id().to_owned();
// dbg!(room_id);
// }
async fn on_presence_event(_ev: PresenceEvent) {
debug!("== on_presence_event ==");
dbg!(_ev);
}
// async fn on_presence_event(_ev: PresenceEvent) {
// debug!("== on_presence_event ==");
// dbg!(_ev);
// }
async fn on_sync_state_event(ev: SyncStateEvent<RoomNameEventContent>, _room: MatrixRoom) {
error!("== on_sync_state_event ==");
if let SyncStateEvent::Original(ev) = ev {
dbg!(ev);
}
}
// async fn on_sync_state_event(ev: SyncStateEvent<RoomNameEventContent>, _room: MatrixRoom) {
// error!("== on_sync_state_event ==");
// if let SyncStateEvent::Original(ev) = ev {
// dbg!(ev);
// }
// }
// async fn on_original_sync_room_message_event(
// ev: OriginalSyncRoomMessageEvent,
// _matrix_room: MatrixRoom,
// _senders: Ctx<Senders>,
// ) {
// error!("== on_original_sync_room_message_event ==");
// error!("ev={:?}", ev.content);
// }
async fn on_stripped_room_member_event(
ev: StrippedRoomMemberEvent,
@@ -107,20 +107,20 @@ impl Client {
matrix_room: MatrixRoom,
senders: Ctx<Senders>,
) {
if ev.state_key == matrix_client.user_id().unwrap() {
if matrix_room.state() == MatrixRoomState::Invited {
let room_id = matrix_room.room_id();
let room = Room::from_matrix_room(&matrix_room).await;
if ev.state_key == matrix_client.user_id().unwrap()
&& matrix_room.state() == MatrixRoomState::Invited
{
let room_id = matrix_room.room_id();
let room = Room::from_matrix_room(&matrix_room).await;
if let Err(err) = senders
.room_sender
.send(RoomEvent::InviteEvent(room_id.to_owned(), room))
{
error!(
"Unable to publish the new room with \"{}\" id: {}",
room_id, err
);
}
if let Err(err) = senders
.room_events_sender
.send(RoomEvent::InviteEvent(room_id.to_owned(), room))
{
error!(
"Unable to publish the new room with \"{}\" id: {}",
room_id, err
);
}
}
}
@@ -134,7 +134,7 @@ impl Client {
let room_id = matrix_room.room_id();
if let Err(err) = senders
.room_sender
.room_events_sender
.send(RoomEvent::TopicEvent(room_id.to_owned(), ev.content.topic))
{
error!("Unable to publish the \"{}\" new topic: {}", room_id, err);
@@ -148,12 +148,13 @@ impl Client {
senders: Ctx<Senders>,
) {
if let SyncStateEvent::Original(_ev) = ev {
let room_sender = &senders.room_sender;
let room_id = matrix_room.room_id();
let room = Room::from_matrix_room(&matrix_room).await;
if let Err(err) = room_sender.send(RoomEvent::MemberEvent(room_id.to_owned(), room)) {
if let Err(err) = senders
.room_events_sender
.send(RoomEvent::MemberEvent(room_id.to_owned(), room))
{
error!(
"Unable to publish the new room with \"{}\" id: {}",
room_id, err
@@ -162,103 +163,107 @@ impl Client {
}
}
async fn on_sync_message_like_room_message_event(
ev: SyncMessageLikeEvent<RoomMessageEventContent>,
_room: MatrixRoom,
_client: MatrixClient,
) {
debug!("== on_sync_message_like_room_message_event ==");
dbg!(ev);
}
// async fn on_sync_message_like_room_message_event(
// ev: SyncMessageLikeEvent<RoomMessageEventContent>,
// _room: MatrixRoom,
// _client: MatrixClient,
// ) {
// debug!("== on_sync_message_like_room_message_event ==");
// dbg!(ev);
// }
async fn on_sync_message_like_reaction_event(
ev: SyncMessageLikeEvent<ReactionEventContent>,
_room: MatrixRoom,
) {
debug!("== on_sync_message_like_reaction_event ==");
dbg!(ev);
}
// async fn on_sync_message_like_reaction_event(
// ev: SyncMessageLikeEvent<ReactionEventContent>,
// _room: MatrixRoom,
// ) {
// debug!("== on_sync_message_like_reaction_event ==");
// dbg!(ev);
// }
async fn on_original_sync_room_redaction_event(
ev: OriginalSyncRoomRedactionEvent,
_room: MatrixRoom,
) {
debug!("== on_original_sync_room_redaction_event ==");
dbg!(ev);
}
// async fn on_original_sync_room_redaction_event(
// ev: OriginalSyncRoomRedactionEvent,
// _room: MatrixRoom,
// ) {
// debug!("== on_original_sync_room_redaction_event ==");
// dbg!(ev);
// }
async fn on_original_sync_room_member_event(
_ev: OriginalSyncRoomMemberEvent,
_room: MatrixRoom,
_client: MatrixClient,
) {
debug!("== on_original_sync_room_member_event ==");
// async fn on_original_sync_room_member_event(
// _ev: OriginalSyncRoomMemberEvent,
// _room: MatrixRoom,
// _client: MatrixClient,
// ) {
// debug!("== on_original_sync_room_member_event ==");
// let mut store = store_ctx.read().unwrap().to_owned();
// dbg!(store.rooms.keys());
// let is_direct = room.is_direct().await.ok();
// store.rooms.insert(
// OwnedRoomId::from(room_id),
// Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))),
// );
// let _ = store_ctx.write(store);
}
// let mut store = store_ctx.read().unwrap().to_owned();
// dbg!(store.rooms.keys());
// let is_direct = room.is_direct().await.ok();
// store.rooms.insert(
// OwnedRoomId::from(room_id),
// Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))),
// );
// let _ = store_ctx.write(store);
// }
async fn on_original_sync_key_verif_start_event(
ev: OriginalSyncKeyVerificationStartEvent,
_client: MatrixClient,
) {
debug!("== on_original_sync_key_verif_start_event ==");
dbg!(ev);
}
// async fn on_original_sync_key_verif_start_event(
// ev: OriginalSyncKeyVerificationStartEvent,
// _client: MatrixClient,
// ) {
// debug!("== on_original_sync_key_verif_start_event ==");
// dbg!(ev);
// }
async fn on_original_sync_key_verif_key_event(
ev: OriginalSyncKeyVerificationKeyEvent,
_client: MatrixClient,
) {
debug!("== on_original_sync_key_verif_key_event ==");
dbg!(ev);
}
// async fn on_original_sync_key_verif_key_event(
// ev: OriginalSyncKeyVerificationKeyEvent,
// _client: MatrixClient,
// ) {
// debug!("== on_original_sync_key_verif_key_event ==");
// dbg!(ev);
// }
async fn on_original_sync_key_verif_done_event(
ev: OriginalSyncKeyVerificationDoneEvent,
_client: MatrixClient,
) {
debug!("== on_original_sync_key_verif_done_event ==");
dbg!(ev);
}
// async fn on_original_sync_key_verif_done_event(
// ev: OriginalSyncKeyVerificationDoneEvent,
// _client: MatrixClient,
// ) {
// debug!("== on_original_sync_key_verif_done_event ==");
// dbg!(ev);
// }
async fn on_device_key_verif_req_event(
ev: ToDeviceKeyVerificationRequestEvent,
_client: MatrixClient,
) {
debug!("== on_device_key_verif_req_event ==");
dbg!(ev);
}
// async fn on_device_key_verif_req_event(
// ev: ToDeviceKeyVerificationRequestEvent,
// _client: MatrixClient,
// ) {
// debug!("== on_device_key_verif_req_event ==");
// dbg!(ev);
// }
async fn on_device_key_verif_start_event(
ev: ToDeviceKeyVerificationStartEvent,
_client: MatrixClient,
) {
debug!("== on_device_key_verif_start_event ==");
dbg!(ev);
}
// async fn on_device_key_verif_start_event(
// ev: ToDeviceKeyVerificationStartEvent,
// _client: MatrixClient,
// ) {
// debug!("== on_device_key_verif_start_event ==");
// dbg!(ev);
// }
async fn on_device_key_verif_key_event(
ev: ToDeviceKeyVerificationKeyEvent,
_client: MatrixClient,
) {
debug!("== on_device_key_verif_key_event ==");
dbg!(ev);
}
// async fn on_device_key_verif_key_event(
// ev: ToDeviceKeyVerificationKeyEvent,
// _client: MatrixClient,
// ) {
// debug!("== on_device_key_verif_key_event ==");
// dbg!(ev);
// }
async fn on_device_key_verif_done_event(
ev: ToDeviceKeyVerificationDoneEvent,
_client: MatrixClient,
) {
debug!("== on_device_key_verif_done_event ==");
dbg!(ev);
}
// async fn on_device_key_verif_done_event(
// ev: ToDeviceKeyVerificationDoneEvent,
// _client: MatrixClient,
// ) {
// debug!("== on_device_key_verif_done_event ==");
// dbg!(ev);
// }
// async fn on_room_event(ev: SomeEvent, _senders: Ctx<Senders>) {
// debug!("== on_room_event({}) ==", ev.)
// }
pub async fn spawn(homeserver_url: String) -> Requester {
let (tx, rx) = unbounded_channel::<WorkerTask>();
@@ -275,10 +280,8 @@ impl Client {
let mut client = Client::new(matrix_client.clone(), room_sender);
tokio::spawn({
async move {
client.work(rx).await;
}
dioxus::prelude::spawn(async move {
client.work(rx).await;
});
Requester {
@@ -291,62 +294,34 @@ impl Client {
}
fn init(&mut self) {
let client = self.client.clone().unwrap();
if let Some(client) = self.client.borrow() {
client.add_event_handler_context(self.senders.clone());
client.add_event_handler_context(self.senders.clone());
let _ = client.add_event_handler(Client::on_stripped_room_member_event);
let _ = client.add_event_handler(Client::on_room_topic_event);
let _ = client.add_event_handler(Client::on_room_member_event);
let _ = client.add_event_handler(Client::on_sync_typing_event);
let _ = client.add_event_handler(Client::on_presence_event);
let _ = client.add_event_handler(Client::on_sync_state_event);
let _ = client.add_event_handler(Client::on_stripped_room_member_event);
let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event);
let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event);
let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event);
let _ = client.add_event_handler(Client::on_original_sync_room_member_event);
let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event);
let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event);
let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event);
let _ = client.add_event_handler(Client::on_device_key_verif_req_event);
let _ = client.add_event_handler(Client::on_device_key_verif_start_event);
let _ = client.add_event_handler(Client::on_device_key_verif_key_event);
let _ = client.add_event_handler(Client::on_device_key_verif_done_event);
let _ = client.add_event_handler(Client::on_room_topic_event);
let _ = client.add_event_handler(Client::on_room_member_event);
// let _ = client.add_event_handler(Client::on_sync_typing_event);
// let _ = client.add_event_handler(Client::on_presence_event);
// let _ = client.add_event_handler(Client::on_sync_state_event);
// let _ = client.add_event_handler(Client::on_original_sync_room_message_event);
self.initialized = true;
// let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event);
// let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event);
// let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event);
// let _ = client.add_event_handler(Client::on_original_sync_room_member_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event);
// let _ = client.add_event_handler(Client::on_device_key_verif_req_event);
// let _ = client.add_event_handler(Client::on_device_key_verif_start_event);
// let _ = client.add_event_handler(Client::on_device_key_verif_key_event);
// let _ = client.add_event_handler(Client::on_device_key_verif_done_event);
self.initialized = true;
}
}
// async fn refresh_rooms(matrix_client: &MatrixClient, room_sender: &Sender<RoomMemberEvent>) {
// let joined_matrix_rooms_ref = &matrix_client.joined_rooms();
// let invited_matrix_rooms_ref = &matrix_client.invited_rooms();
// for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] {
// for matrix_room in matrix_rooms.iter() {
// let topic = matrix_room.topic().map(RefCell::new);
// let room = Room::new(
// Arc::new(matrix_room.to_owned()),
// topic,
// matrix_room.is_direct().await.ok(),
// );
// if let Err(err) = room_sender.send(room) {
// warn!("Error: {}", err);
// }
// }
// }
// }
// async fn refresh_rooms_forever(matrix_client: &MatrixClient, room_channel: &Sender<RoomEvent>) {
// // TODO: Add interval to config
// let mut interval = tokio::time::interval(Duration::from_secs(5));
// loop {
// // Self::refresh_rooms(matrix_client, room_channel).await;
// interval.tick().await;
// }
// }
async fn login_and_sync(&mut self, style: LoginStyle) -> anyhow::Result<()> {
let client = self.client.clone().unwrap();
@@ -362,62 +337,95 @@ impl Client {
}
}
// let (synchronized_tx, synchronized_rx) = oneshot::channel();
let (synchronized_tx, synchronized_rx) = oneshot::channel::<bool>();
self.sync_handle = tokio::spawn({
async move {
// Sync once so we receive the client state and old messages
let sync_token_option = match client.sync_once(SyncSettings::default()).await {
Ok(sync_response) => Some(sync_response.next_batch),
Err(err) => {
error!("Error during sync one: {}", err);
None
}
};
if let Some(sync_token) = sync_token_option {
let settings = SyncSettings::default().token(sync_token);
debug!("User connected to the homeserver, start syncing");
let _ = client.sync(settings).await;
let task = dioxus::prelude::spawn(async move {
// Sync once so we receive the client state and old messages
let sync_token_option = match client.sync_once(SyncSettings::default()).await {
Ok(sync_response) => Some(sync_response.next_batch),
Err(err) => {
error!("Error during sync one: {}", err);
None
}
};
if let Some(sync_token) = sync_token_option {
let settings = SyncSettings::default().token(sync_token);
debug!("User connected to the homeserver, start syncing");
if let Err(err) = synchronized_tx.send(true) {
error!("Unable to notify that the Matrix client is now synchronized ({err})");
}
let _ = client.sync(settings).await;
}
})
.into();
});
self.sync_task = Some(task);
// self.start_background_tasks(synchronized_rx);
Ok(())
}
// async fn register_room_events(&self, room_id: OwnedRoomId) {
// let client = self.client.unwrap();
// client.add_room_event_handler(&room_id, Client::on_room_event);
// }
// async fn refresh_rooms(
// matrix_client: &Arc<MatrixClient>,
// room_events_sender: &Sender<RoomEvent>,
// ) {
// let joined_matrix_rooms_ref = &matrix_client.joined_rooms();
// let invited_matrix_rooms_ref = &matrix_client.invited_rooms();
// for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] {
// for matrix_room in matrix_rooms.iter() {
// let room = Room::from_matrix_room(matrix_room).await;
// let event = RoomEvent::MemberEvent(room.id().clone(), room);
// if let Err(err) = room_events_sender.send(event) {
// error!("Error: {}", err);
// }
// }
// }
// }
// async fn refresh_rooms_forever(
// matrix_client: Arc<MatrixClient>,
// room_events_sender: &Sender<RoomEvent>,
// ) {
// // TODO: Add interval to config
// let period_sec = Duration::from_secs(5);
// loop {
// Self::refresh_rooms(&matrix_client, room_events_sender).await;
// task::sleep(period_sec).await;
// }
// }
// fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver<bool>) {
// let client = self.client.clone().unwrap();
// let room_sender_ref = &self.senders.room_sender;
// let room_events_sender = self.senders.room_events_sender.clone();
// self.load_handle = tokio::spawn({
// to_owned![room_sender_ref];
// async move {
// if let Err(err) = synchronized_rx.await {
// error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})");
// }
// let rooms_refresh = Self::refresh_rooms_forever(
// client.as_ref(),
// &room_sender_ref
// );
// let ((),) = tokio::join!(rooms_refresh);
// let task = dioxus::prelude::spawn(async move {
// if let Err(err) = synchronized_rx.await {
// error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})");
// }
// })
// .into();
// debug!("Start room refreshing forever");
// let _ = Self::refresh_rooms_forever(client, &room_events_sender).await;
// });
// self.background_task = Some(task);
// }
async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) {
loop {
let task = rx.recv().await;
match task {
match rx.recv().await {
Some(task) => self.run(task).await,
None => {
break;
@@ -425,8 +433,8 @@ impl Client {
}
}
if let Some(handle) = self.sync_handle.take() {
handle.abort();
if let Some(task) = self.sync_task.take() {
task.cancel()
}
}
@@ -440,7 +448,10 @@ impl Client {
WorkerTask::Login(style, reply) => {
assert!(self.initialized);
reply.send(self.login_and_sync(style).await).await;
}
} // WorkerTask::registerRoomEvents(room_id, reply) => {
// assert!(self.initialized);
// reply.send(self.register_room_events(room_id).await).await;
// }
}
}
}