1
0
Fork 0
forked from mirror/grapevine

Properly type stored EDUs

This commit is contained in:
Lambda 2024-08-26 17:10:43 +00:00
parent 26322d5a95
commit cce83beedb
3 changed files with 29 additions and 20 deletions

View file

@ -5,6 +5,7 @@ use ruma::{
client::{error::ErrorKind, to_device::send_event_to_device},
federation::{self, transactions::edu::DirectDeviceContent},
},
serde::Raw,
to_device::DeviceIdOrAllDevices,
};
@ -40,7 +41,7 @@ pub(crate) async fn send_event_to_device_route(
services().sending.send_reliable_edu(
target_user_id.server_name(),
serde_json::to_vec(
Raw::new(
&federation::transactions::edu::Edu::DirectToDevice(
DirectDeviceContent {
sender: sender_user.clone(),

View file

@ -1,4 +1,4 @@
use ruma::{ServerName, UserId};
use ruma::{serde::Raw, ServerName, UserId};
use crate::{
database::KeyValueDatabase,
@ -69,7 +69,7 @@ impl service::sending::Data for KeyValueDatabase {
);
}
let value = if let SendingEventType::Edu(value) = &event {
&**value
value.json().get().as_bytes()
} else {
&[]
};
@ -100,7 +100,7 @@ impl service::sending::Data for KeyValueDatabase {
) -> Result<()> {
for (e, key) in events {
let value = if let SendingEventType::Edu(value) = &e {
&**value
value.json().get().as_bytes()
} else {
&[]
};
@ -205,7 +205,13 @@ fn parse_servercurrentevent(
if value.is_empty() {
SendingEventType::Pdu(PduId::new(event.to_vec()))
} else {
SendingEventType::Edu(value)
SendingEventType::Edu(
Raw::from_json_string(
String::from_utf8(value)
.expect("EDU content in database should be a string"),
)
.expect("EDU content in database should be valid JSON"),
)
},
))
}

View file

@ -28,8 +28,10 @@ use ruma::{
push_rules::PushRulesEvent, receipt::ReceiptType,
AnySyncEphemeralRoomEvent, GlobalAccountDataEventType,
},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId,
ServerName, UInt, UserId,
push,
serde::Raw,
uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, ServerName,
UInt, UserId,
};
use tokio::{
select,
@ -81,12 +83,12 @@ impl Destination {
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug)]
pub(crate) enum SendingEventType {
// pduid
Pdu(PduId),
// pdu json
Edu(Vec<u8>),
Edu(Raw<Edu>),
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -444,7 +446,7 @@ impl Service {
pub(crate) fn select_edus(
&self,
server_name: &ServerName,
) -> Result<(Vec<Vec<u8>>, u64)> {
) -> Result<(Vec<Raw<Edu>>, u64)> {
// u64: count of last edu
let since = self.db.get_latest_educount(server_name)?;
let mut events = Vec::new();
@ -532,7 +534,7 @@ impl Service {
};
events.push(
serde_json::to_vec(&federation_event)
Raw::new(&federation_event)
.expect("json can be serialized"),
);
@ -555,9 +557,7 @@ impl Service {
keys: None,
});
events.push(
serde_json::to_vec(&edu).expect("json can be serialized"),
);
events.push(Raw::new(&edu).expect("json can be serialized"));
}
Ok((events, max_edu_count))
@ -622,7 +622,7 @@ impl Service {
pub(crate) fn send_reliable_edu(
&self,
server: &ServerName,
serialized: Vec<u8>,
serialized: Raw<Edu>,
id: u64,
) -> Result<()> {
let destination = Destination::Normal(server.to_owned());
@ -759,7 +759,9 @@ async fn handle_appservice_event(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b) => &**b,
SendingEventType::Edu(b) => {
b.json().get().as_bytes()
}
SendingEventType::Pdu(b) => b.as_bytes(),
})
.collect::<Vec<_>>(),
@ -885,9 +887,7 @@ async fn handle_federation_event(
));
}
SendingEventType::Edu(edu) => {
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
}
edu_jsons.push(edu.clone());
}
}
}
@ -906,7 +906,9 @@ async fn handle_federation_event(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b) => &**b,
SendingEventType::Edu(b) => {
b.json().get().as_bytes()
}
SendingEventType::Pdu(b) => b.as_bytes(),
})
.collect::<Vec<_>>(),