1
0
Fork 0
forked from mirror/grapevine

Compare commits

...

3 commits

Author SHA1 Message Date
Charles Hall
132bd3ae3a
add scan_prefix method 2024-08-11 20:26:17 -07:00
Charles Hall
029e32971e
add basic typed key value store abstraction
Should eliminate a few classes of footgun when it's done.
2024-08-11 18:30:22 -07:00
Charles Hall
41c01ad1b7
fix mod/use order
I wish rustfmt could figure this out.
2024-08-11 18:24:19 -07:00
6 changed files with 620 additions and 4 deletions

53
Cargo.lock generated
View file

@ -713,6 +713,58 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "frunk"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11a351b59e12f97b4176ee78497dff72e4276fb1ceb13e19056aca7fa0206287"
dependencies = [
"frunk_core",
"frunk_derives",
"frunk_proc_macros",
]
[[package]]
name = "frunk_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af2469fab0bd07e64ccf0ad57a1438f63160c69b2e57f04a439653d68eb558d6"
[[package]]
name = "frunk_derives"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0fa992f1656e1707946bbba340ad244f0814009ef8c0118eb7b658395f19a2e"
dependencies = [
"frunk_proc_macro_helpers",
"quote",
"syn",
]
[[package]]
name = "frunk_proc_macro_helpers"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35b54add839292b743aeda6ebedbd8b11e93404f902c56223e51b9ec18a13d2c"
dependencies = [
"frunk_core",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "frunk_proc_macros"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71b85a1d4a9a6b300b41c05e8e13ef2feca03e0334127f29eca9506a7fe13a93"
dependencies = [
"frunk_core",
"frunk_proc_macro_helpers",
"quote",
"syn",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@ -840,6 +892,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"clap",
"frunk",
"futures-util",
"hmac",
"html-escape",

View file

@ -95,6 +95,7 @@ axum-server = { version = "0.6.0", features = ["tls-rustls"] }
base64 = "0.22.1"
bytes = "1.6.0"
clap = { version = "4.5.4", default-features = false, features = ["std", "derive", "help", "usage", "error-context", "string", "wrap_help"] }
frunk = "0.4.2"
futures-util = { version = "0.3.30", default-features = false }
hmac = "0.12.1"
html-escape = "0.2.13"

View file

@ -1,6 +1,3 @@
pub(crate) mod abstraction;
pub(crate) mod key_value;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fs,
@ -10,7 +7,6 @@ use std::{
sync::{Arc, Mutex, RwLock},
};
use abstraction::{KeyValueDatabaseEngine, KvTree};
use lru_cache::LruCache;
use ruma::{
events::{
@ -30,6 +26,12 @@ use crate::{
PduEvent, Result, Services, SERVICES,
};
pub(crate) mod abstraction;
pub(crate) mod key_value;
mod map;
use abstraction::{KeyValueDatabaseEngine, KvTree};
pub(crate) struct KeyValueDatabase {
db: Arc<dyn KeyValueDatabaseEngine>,

209
src/database/map.rs Normal file
View file

@ -0,0 +1,209 @@
//! A high-level strongly-typed abstraction over key-value stores
#![warn(missing_docs, clippy::missing_docs_in_private_items)]
use std::{
any::TypeId,
borrow::{Borrow, Cow},
error::Error,
};
use frunk::{HCons, HNil};
use futures_util::Stream;
#[cfg(test)]
mod tests;
/// Errors that can occur during key-value store operations
// Missing docs are allowed here since that kind of information should be
// encoded in the error messages themselves anyway.
#[allow(clippy::missing_docs_in_private_items, dead_code)]
#[derive(thiserror::Error, Debug)]
pub(crate) enum MapError {
#[cfg(any(feature = "sqlite", feature = "rocksdb"))]
#[error(transparent)]
Database(Box<dyn Error>),
#[error("failed to convert stored value into structured data")]
FromBytes(#[source] Box<dyn Error>),
}
/// A high-level representation of a key-value relation in a key-value store
#[allow(dead_code)]
pub(crate) trait Map {
/// The key type of this relation
type Key: ToBytes + FromBytes;
/// The value type of this relation
type Value: ToBytes + FromBytes;
/// Load a value based on its corresponding key
async fn get<K>(&self, key: &K) -> Result<Option<Self::Value>, MapError>
where
Self::Key: Borrow<K>,
K: ToBytes + ?Sized;
/// Insert or update a key-value pair
async fn set<K, V>(&self, key: &K, value: &V) -> Result<(), MapError>
where
Self::Key: Borrow<K>,
Self::Value: Borrow<V>,
K: ToBytes + ?Sized,
V: ToBytes + ?Sized;
/// Remove a key-value pair by its key
///
/// It is not an error to remove a key-value pair that is not present in the
/// store.
async fn del<K>(&self, key: &K) -> Result<(), MapError>
where
Self::Key: Borrow<K>,
K: ToBytes + ?Sized;
/// Get a stream of all key-value pairs whose key matches a key prefix
///
/// While it's possible to provide an entire key as the prefix, it's likely
/// more ergonomic and more performant to use [`Map::get`] in that case
/// instead.
#[rustfmt::skip]
async fn scan_prefix<P>(
&self,
key: &P,
) -> Result<
impl Stream<
Item = (Result<Self::Key, MapError>, Result<Self::Value, MapError>)
>,
MapError,
>
where
P: ToBytes + IsPrefixOf<Self::Key>;
}
/// Convert `Self` into bytes for storage in a key-value store
///
/// Implementations on types other than `HList`s must not contain `0xFF` bytes
/// in their serialized form.
///
/// [`FromBytes`] must be the exact inverse of this operation.
#[allow(dead_code)]
pub(crate) trait ToBytes {
/// Perform the conversion
fn to_bytes(&self) -> Cow<'_, [u8]>;
}
impl ToBytes for () {
fn to_bytes(&self) -> Cow<'_, [u8]> {
Cow::Borrowed(&[])
}
}
impl ToBytes for HNil {
fn to_bytes(&self) -> Cow<'_, [u8]> {
Cow::Borrowed(&[])
}
}
impl<H, T> ToBytes for HCons<H, T>
where
H: ToBytes,
T: ToBytes + 'static,
{
fn to_bytes(&self) -> Cow<'_, [u8]> {
let buf = self.head.to_bytes();
if TypeId::of::<T>() == TypeId::of::<HNil>() {
buf
} else {
let mut buf = buf.into_owned();
buf.push(0xFF);
buf.extend_from_slice(self.tail.to_bytes().as_ref());
Cow::Owned(buf)
}
}
}
impl ToBytes for String {
fn to_bytes(&self) -> Cow<'_, [u8]> {
Cow::Borrowed(self.as_bytes())
}
}
/// Convert from bytes stored in a key-value store into structured data
///
/// This should generally only be implemented by owned types.
///
/// [`ToBytes`] must be the exact inverse of this operation.
#[allow(dead_code)]
pub(crate) trait FromBytes
where
Self: Sized,
{
/// Perform the conversion
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Box<dyn Error>>;
}
impl FromBytes for () {
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Box<dyn Error>> {
bytes
.is_empty()
.then_some(())
.ok_or_else(|| "got bytes when none were expected".into())
}
}
impl FromBytes for HNil {
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Box<dyn Error>> {
bytes
.is_empty()
.then_some(HNil)
.ok_or_else(|| "got bytes when none were expected".into())
}
}
impl<H, T> FromBytes for HCons<H, T>
where
H: FromBytes,
T: FromBytes + 'static,
{
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Box<dyn Error>> {
let (head, tail) = if TypeId::of::<T>() == TypeId::of::<HNil>() {
// There is no spoon. I mean, tail.
(bytes, Vec::new())
} else {
let boundary = bytes
.iter()
.copied()
.position(|x| x == 0xFF)
.ok_or("map entry is missing a boundary")?;
// Don't include the boundary in the head or tail
let head = &bytes[..boundary];
let tail = &bytes[boundary + 1..];
(head.to_owned(), tail.to_owned())
};
Ok(HCons {
head: H::from_bytes(head)?,
tail: T::from_bytes(tail)?,
})
}
}
impl FromBytes for String {
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Box<dyn Error>> {
String::from_utf8(bytes).map_err(Into::into)
}
}
/// Ensures, at compile time, that one `HList` is a prefix of another
pub(crate) trait IsPrefixOf<HList> {}
impl<HList> IsPrefixOf<HList> for HNil {}
impl<Head, PrefixTail, Tail> IsPrefixOf<HCons<Head, Tail>>
for HCons<Head, PrefixTail>
where
PrefixTail: IsPrefixOf<Tail>,
{
}

246
src/database/map/tests.rs Normal file
View file

@ -0,0 +1,246 @@
use std::{
borrow::Borrow, collections::BTreeMap, marker::PhantomData,
mem::ManuallyDrop, sync::RwLock,
};
use frunk::{hlist, HList};
use futures_util::{stream, Stream, StreamExt};
use super::{FromBytes, Map, MapError, ToBytes};
mod conversions;
struct Iter<T, I> {
inner: ManuallyDrop<I>,
guard_ref: *mut T,
}
impl<T, I> Drop for Iter<T, I> {
fn drop(&mut self) {
// SAFETY: The following things must be true for this to be sound:
//
// * `inner`'s `Iterator` impl reads into memory held by `guard_ref` so
// the former must dropped first
// * `Self` must not impl `Clone` or else `guard_ref` could get
// double-free'd
// * `guard_ref` must be constructed by `Box::leak` for `Box::from_raw`
// to work
unsafe {
ManuallyDrop::drop(&mut self.inner);
drop(Box::from_raw(self.guard_ref));
}
}
}
impl<T, I> Iterator for Iter<T, I>
where
I: Iterator,
{
type Item = I::Item;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}
struct TestMap<K, V> {
storage: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
types: PhantomData<(K, V)>,
}
impl<K, V> TestMap<K, V> {
fn new() -> Self {
Self {
storage: RwLock::new(BTreeMap::new()),
types: PhantomData,
}
}
}
impl<K, V> Map for TestMap<K, V>
where
K: ToBytes + FromBytes,
V: ToBytes + FromBytes,
{
type Key = K;
type Value = V;
async fn get<KB>(&self, key: &KB) -> Result<Option<Self::Value>, MapError>
where
Self::Key: Borrow<KB>,
KB: ToBytes + ?Sized,
{
self.storage
.read()
.expect("lock should not be poisoned")
.get(key.borrow().to_bytes().as_ref())
.map(|v| {
Self::Value::from_bytes(v.to_owned())
.map_err(MapError::FromBytes)
})
.transpose()
}
async fn set<KB, VB>(&self, key: &KB, value: &VB) -> Result<(), MapError>
where
Self::Key: Borrow<KB>,
Self::Value: Borrow<VB>,
KB: ToBytes + ?Sized,
VB: ToBytes + ?Sized,
{
self.storage.write().expect("lock should not be poisoned").insert(
key.borrow().to_bytes().into_owned(),
value.borrow().to_bytes().into_owned(),
);
Ok(())
}
async fn del<KB>(&self, key: &KB) -> Result<(), MapError>
where
Self::Key: Borrow<KB>,
KB: ToBytes + ?Sized,
{
self.storage
.write()
.expect("lock should not be poisoned")
.remove(key.borrow().to_bytes().as_ref());
Ok(())
}
#[rustfmt::skip]
async fn scan_prefix<P>(
&self,
key: &P,
) -> Result<
impl Stream<
Item = (Result<Self::Key, MapError>, Result<Self::Value, MapError>)
>,
MapError,
>
where
P: ToBytes,
{
let guard = self
.storage
.read()
.expect("lock should not be poisoned");
let guard = Box::leak(Box::new(guard));
let guard_ref: *mut _ = guard;
let inner = guard
.iter()
.filter(|(kb, _)| kb.starts_with(key.borrow().to_bytes().as_ref()))
.map(|(kb, vb)| {
(
Self::Key::from_bytes(kb.to_owned())
.map_err(MapError::FromBytes),
Self::Value::from_bytes(vb.to_owned())
.map_err(MapError::FromBytes),
)
});
Ok(stream::iter(Iter {
inner: ManuallyDrop::new(inner),
guard_ref,
}))
}
}
#[tokio::test]
async fn string_to_string() {
let test_map = TestMap::<String, String>::new();
let key = "hello".to_owned();
let value = "world".to_owned();
test_map.set(&key, &value).await.expect("insertion should succed");
let actual_value = test_map.get(&key).await.expect("lookup should succeed");
assert_eq!(Some(value), actual_value);
test_map.del(&key).await.expect("deletion should succeed");
let actual_value = test_map.get(&key).await.expect("lookup should succeed");
assert_eq!(None, actual_value);
}
#[tokio::test]
async fn hlist_to_hlist() {
let test_map =
TestMap::<HList![String, String], HList![String, String]>::new();
let key = hlist!["hello".to_owned(), "world".to_owned()];
let value = hlist!["test".to_owned(), "suite".to_owned()];
test_map.set(&key, &value).await.expect("insertion should succed");
let actual_value = test_map.get(&key).await.expect("lookup should succeed");
assert_eq!(Some(value), actual_value);
test_map.del(&key).await.expect("deletion should succeed");
let actual_value = test_map.get(&key).await.expect("lookup should succeed");
assert_eq!(None, actual_value);
}
#[tokio::test]
async fn hlist_scan_prefix() {
let test_map =
TestMap::<HList![String, String], HList![String, String]>::new();
let key = hlist!["hello".to_owned(), "world".to_owned()];
let value = hlist!["test".to_owned(), "suite".to_owned()];
test_map.set(&key, &value).await.expect("insertion should succed");
let key = hlist!["hello".to_owned(), "debugger".to_owned()];
let value = hlist!["tester".to_owned(), "suiter".to_owned()];
test_map.set(&key, &value).await.expect("insertion should succed");
let key = hlist!["shouldn't".to_owned(), "appear".to_owned()];
let value = hlist!["in".to_owned(), "assertions".to_owned()];
test_map.set(&key, &value).await.expect("insertion should succed");
let prefix = hlist!["hello".to_owned()];
let mut stream = test_map
.scan_prefix(&prefix)
.await
.expect("scanning should succeed")
.enumerate();
while let Some((i, next)) = stream.next().await {
let (key, value) = next;
let (key, value) = (
key.expect("key decoding should succeed"),
value.expect("value decoding should succeed"),
);
// Ordering is guaranteed because BTreeMap
match i {
0 => {
assert_eq!(
key,
hlist!["hello".to_owned(), "debugger".to_owned()]
);
assert_eq!(
value,
hlist!["tester".to_owned(), "suiter".to_owned()]
);
}
1 => {
assert_eq!(key, hlist!["hello".to_owned(), "world".to_owned()]);
assert_eq!(
value,
hlist!["test".to_owned(), "suite".to_owned()]
);
}
_ => unreachable!(),
}
}
}

View file

@ -0,0 +1,105 @@
use frunk::{hlist, HList};
use super::super::{FromBytes, ToBytes};
#[test]
pub(crate) fn serialize_hlist_0() {
let expected: &[u8] = &[];
let actual = hlist![];
let actual_bytes = actual.to_bytes();
assert_eq!(expected, actual_bytes.as_ref());
}
#[test]
pub(crate) fn serialize_hlist_1() {
let expected =
[b"hello"].into_iter().flatten().copied().collect::<Vec<_>>();
let actual = hlist!["hello".to_owned()];
let actual_bytes = actual.to_bytes();
assert_eq!(expected.as_slice(), actual_bytes.as_ref());
}
#[test]
pub(crate) fn serialize_hlist_2() {
let expected = [b"hello", [0xFF].as_slice(), b"world"]
.into_iter()
.flatten()
.copied()
.collect::<Vec<_>>();
let actual = hlist!["hello".to_owned(), "world".to_owned()];
let actual_bytes = actual.to_bytes();
assert_eq!(expected.as_slice(), actual_bytes.as_ref());
}
#[test]
pub(crate) fn serialize_hlist_3() {
let expected =
[b"what's", [0xFF].as_slice(), b"up", [0xFF].as_slice(), b"world"]
.into_iter()
.flatten()
.copied()
.collect::<Vec<_>>();
let actual =
hlist!["what's".to_owned(), "up".to_owned(), "world".to_owned()];
let actual_bytes = actual.to_bytes();
assert_eq!(expected.as_slice(), actual_bytes.as_ref());
}
#[test]
pub(crate) fn deserialize_hlist_0() {
let actual = <HList![]>::from_bytes(Vec::new())
.expect("should be able to deserialize");
assert_eq!(hlist![], actual);
}
#[test]
pub(crate) fn deserialize_hlist_1() {
let serialized =
[b"hello"].into_iter().flatten().copied().collect::<Vec<_>>();
let actual = <HList![String]>::from_bytes(serialized)
.expect("should be able to deserialize");
assert_eq!(hlist!["hello".to_owned()], actual);
}
#[test]
pub(crate) fn deserialize_hlist_2() {
let serialized = [b"hello", [0xFF].as_slice(), b"world"]
.into_iter()
.flatten()
.copied()
.collect::<Vec<_>>();
let actual = <HList![String, String]>::from_bytes(serialized)
.expect("should be able to deserialize");
assert_eq!(hlist!["hello".to_owned(), "world".to_owned()], actual);
}
#[test]
pub(crate) fn deserialize_hlist_3() {
let serialized =
[b"what's", [0xFF].as_slice(), b"up", [0xFF].as_slice(), b"world"]
.into_iter()
.flatten()
.copied()
.collect::<Vec<_>>();
let actual = <HList![String, String, String]>::from_bytes(serialized)
.expect("should be able to deserialize");
assert_eq!(
hlist!["what's".to_owned(), "up".to_owned(), "world".to_owned()],
actual
);
}