1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::ffi::CStr;
use libc::{self, c_char, c_void};
use crate::{
compaction_filter::{self, CompactionFilter},
ffi,
};
pub trait CompactionFilterFactory {
type Filter: CompactionFilter;
fn create(&mut self, context: CompactionFilterContext) -> Self::Filter;
fn name(&self) -> &CStr;
}
pub unsafe extern "C" fn destructor_callback<F>(raw_self: *mut c_void)
where
F: CompactionFilterFactory,
{
Box::from_raw(raw_self as *mut F);
}
pub unsafe extern "C" fn name_callback<F>(raw_self: *mut c_void) -> *const c_char
where
F: CompactionFilterFactory,
{
let self_ = &*(raw_self as *const c_void as *const F);
self_.name().as_ptr()
}
pub struct CompactionFilterContext {
pub is_full_compaction: bool,
pub is_manual_compaction: bool,
}
impl CompactionFilterContext {
unsafe fn from_raw(ptr: *mut ffi::rocksdb_compactionfiltercontext_t) -> Self {
let is_full_compaction = ffi::rocksdb_compactionfiltercontext_is_full_compaction(ptr) != 0;
let is_manual_compaction =
ffi::rocksdb_compactionfiltercontext_is_manual_compaction(ptr) != 0;
Self {
is_full_compaction,
is_manual_compaction,
}
}
}
pub unsafe extern "C" fn create_compaction_filter_callback<F>(
raw_self: *mut c_void,
context: *mut ffi::rocksdb_compactionfiltercontext_t,
) -> *mut ffi::rocksdb_compactionfilter_t
where
F: CompactionFilterFactory,
{
let self_ = &mut *(raw_self as *mut F);
let context = CompactionFilterContext::from_raw(context);
let filter = Box::new(self_.create(context));
let filter_ptr = Box::into_raw(filter);
ffi::rocksdb_compactionfilter_create(
filter_ptr as *mut c_void,
Some(compaction_filter::destructor_callback::<F::Filter>),
Some(compaction_filter::filter_callback::<F::Filter>),
Some(compaction_filter::name_callback::<F::Filter>),
)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::compaction_filter::Decision;
use crate::{Options, DB};
use std::ffi::CString;
struct CountFilter(u16, CString);
impl CompactionFilter for CountFilter {
fn filter(&mut self, _level: u32, _key: &[u8], _value: &[u8]) -> crate::CompactionDecision {
self.0 += 1;
if self.0 > 2 {
Decision::Remove
} else {
Decision::Keep
}
}
fn name(&self) -> &CStr {
&self.1
}
}
struct TestFactory(CString);
impl CompactionFilterFactory for TestFactory {
type Filter = CountFilter;
fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
CountFilter(0, CString::new("CountFilter").unwrap())
}
fn name(&self) -> &CStr {
&self.0
}
}
#[test]
fn compaction_filter_factory_test() {
let path = "_rust_rocksdb_filter_factory_test";
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_compaction_filter_factory(TestFactory(CString::new("TestFactory").unwrap()));
{
let db = DB::open(&opts, path).unwrap();
let _r = db.put(b"k1", b"a");
let _r = db.put(b"_rk", b"b");
let _r = db.put(b"%k", b"c");
db.compact_range(None::<&[u8]>, None::<&[u8]>);
assert_eq!(db.get(b"%k1").unwrap(), None);
}
let result = DB::destroy(&opts, path);
assert!(result.is_ok());
}
}