Avoid using lseek() to track disk cache file write offset

It's slow and not thread safe.
We use pwrite() so it's not reliable anyway.
This commit is contained in:
Kovid Goyal 2025-11-17 11:02:07 +05:30
parent bfca1763f2
commit e6d7e91000
No known key found for this signature in database
GPG key ID: 06BC317B515ACE7C
4 changed files with 50 additions and 55 deletions

View file

@ -87,6 +87,7 @@ typedef struct {
cache_map map;
Holes holes;
unsigned long long total_size;
off_t end_of_data_offset;
} DiskCache;
#define mutex(op) pthread_mutex_##op(&self->lock)
@ -136,23 +137,6 @@ open_cache_file(const char *cache_path) {
// Write loop {{{
static off_t
size_of_cache_file(DiskCache *self) {
off_t pos = lseek(self->cache_file_fd, 0, SEEK_CUR);
off_t ans = lseek(self->cache_file_fd, 0, SEEK_END);
lseek(self->cache_file_fd, pos, SEEK_SET);
return ans;
}
size_t
disk_cache_size_on_disk(PyObject *self) {
if (((DiskCache*)self)->cache_file_fd > -1) {
off_t ans = size_of_cache_file((DiskCache*)self);
return MAX(0, ans);
}
return 0;
}
typedef struct {
CacheKey key;
off_t old_offset, new_offset;
@ -181,7 +165,7 @@ defrag(DiskCache *self) {
RAII_FreeFastFileCopyBuffer(fcb);
bool lock_released = false, ok = false;
off_t size_on_disk = size_of_cache_file(self);
off_t size_on_disk = self->end_of_data_offset;
if (size_on_disk <= 0) goto cleanup;
size_t num_entries = vt_size(&self->map);
if (!num_entries) goto cleanup;
@ -236,6 +220,7 @@ cleanup:
if (!vt_is_end(i)) i.data->val->pos_in_cache_file = e->new_offset;
free(e->key.hash_key);
}
self->end_of_data_offset = lseek(self->cache_file_fd, 0, SEEK_CUR);
}
if (new_cache_file > -1) safe_close(new_cache_file, __FILE__, __LINE__);
}
@ -306,8 +291,7 @@ find_hole_to_use(DiskCache *self, const off_t required_sz) {
static inline bool
needs_defrag(DiskCache *self) {
off_t size_on_disk = size_of_cache_file(self);
return self->total_size && size_on_disk > 0 && (size_t)size_on_disk > self->total_size * self->defrag_factor;
return self->total_size && self->end_of_data_offset > 0 && (size_t)self->end_of_data_offset > self->total_size * self->defrag_factor;
}
static void
@ -395,7 +379,7 @@ write_dirty_entry(DiskCache *self) {
size_t left = self->currently_writing.val.data_sz;
uint8_t *p = self->currently_writing.val.data;
if (self->currently_writing.val.pos_in_cache_file < 0) {
self->currently_writing.val.pos_in_cache_file = size_of_cache_file(self);
self->currently_writing.val.pos_in_cache_file = self->end_of_data_offset;
if (self->currently_writing.val.pos_in_cache_file < 0) {
perror("Failed to seek in disk cache file");
return false;
@ -418,6 +402,7 @@ write_dirty_entry(DiskCache *self) {
left -= n;
p += n;
offset += n;
self->end_of_data_offset = MAX(self->end_of_data_offset, offset);
}
return true;
}
@ -434,6 +419,18 @@ retire_currently_writing(DiskCache *self) {
self->currently_writing.val.data_sz = 0;
}
static int
clear_disk_cache_with_lock_held(DiskCache *self) {
vt_cleanup(&self->map);
cleanup_holes(&self->holes);
self->total_size = 0;
self->end_of_data_offset = 0;
if (self->cache_file_fd > -1) {
if (ftruncate(self->cache_file_fd, 0) == -1) return errno;
}
return 0;
}
static void*
write_loop(void *data) {
DiskCache *self = (DiskCache*)data;
@ -457,9 +454,7 @@ write_loop(void *data) {
} else if (!count) {
mutex(lock);
count = vt_size(&self->map);
if (!count && self->cache_file_fd > -1) {
if (ftruncate(self->cache_file_fd, 0) == 0) lseek(self->cache_file_fd, 0, SEEK_END);
}
if (!count) clear_disk_cache_with_lock_held(self); // failure to truncate is not fatal
mutex(unlock);
}
@ -628,17 +623,18 @@ remove_from_disk_cache(PyObject *self_, const void *key, size_t key_sz) {
return removed;
}
void
static int
clear_disk_cache(PyObject *self_) {
// This is currently only used in testing
DiskCache *self = (DiskCache*)self_;
if (!ensure_state(self)) return;
if (!ensure_state(self)) return 0;
int saved_errno = 0;
disk_cache_wait_for_write(self_, 0);
mutex(lock);
vt_cleanup(&self->map);
cleanup_holes(&self->holes);
self->total_size = 0;
if (self->cache_file_fd > -1) add_hole(self, 0, size_of_cache_file(self));
saved_errno = clear_disk_cache_with_lock_held(self);
mutex(unlock);
wakeup_write_loop(self);
return saved_errno;
}
static void
@ -780,7 +776,7 @@ PYWRAP(read_from_cache_file) {
Py_ssize_t pos = 0, sz = -1;
PA("|nn", &pos, &sz);
mutex(lock);
if (sz < 0) sz = size_of_cache_file(self);
if (sz < 0) sz = self->end_of_data_offset;
mutex(unlock);
PyObject *ans = PyBytes_FromStringAndSize(NULL, sz);
if (ans) {
@ -798,17 +794,20 @@ wait_for_write(PyObject *self, PyObject *args) {
}
static PyObject*
size_on_disk(PyObject *self_, PyObject *args UNUSED) {
end_of_data_offset(PyObject *self_, PyObject *args UNUSED) {
// Only used for testing
DiskCache *self = (DiskCache*)self_;
unsigned long long ans = 0;
mutex(lock);
unsigned long long ans = disk_cache_size_on_disk(self_);
if (self->cache_file_fd > -1) ans = MAX(0, self->end_of_data_offset);
mutex(unlock);
return PyLong_FromUnsignedLongLong(ans);
}
static PyObject*
clear(PyObject *self, PyObject *args UNUSED) {
clear_disk_cache(self);
int saved_errno = clear_disk_cache(self);
if (saved_errno) return PyErr_SetFromErrno(PyExc_OSError);
Py_RETURN_NONE;
}
@ -910,7 +909,7 @@ static PyMethodDef methods[] = {
{"num_cached_in_ram", num_cached_in_ram, METH_NOARGS, NULL},
{"get", get, METH_VARARGS, NULL},
{"wait_for_write", wait_for_write, METH_VARARGS, NULL},
{"size_on_disk", size_on_disk, METH_NOARGS, NULL},
{"end_of_data_offset", end_of_data_offset, METH_NOARGS, NULL},
{"clear", clear, METH_NOARGS, NULL},
{"holes", holes, METH_NOARGS, NULL},

View file

@ -16,7 +16,6 @@ PyObject* read_from_disk_cache_python(PyObject *self_, const void *key, size_t k
bool disk_cache_wait_for_write(PyObject *self, monotonic_t timeout);
size_t disk_cache_total_size(PyObject *self);
size_t disk_cache_size_on_disk(PyObject *self);
void clear_disk_cache(PyObject *self);
size_t disk_cache_clear_from_ram(PyObject *self_, bool(matches)(void* data, void *key, unsigned keysz), void*);
size_t disk_cache_num_cached_in_ram(PyObject *self_);

View file

@ -1812,5 +1812,5 @@ class DiskCache:
def remove_from_ram(self, predicate: Callable[[bytes], bool]) -> int: ...
def num_cached_in_ram(self) -> int: ...
def get(self, key: bytes, store_in_ram: bool = False) -> bytes: ... # raises KeyError if not found
def size_on_disk(self) -> int: ...
def end_of_data_offset(self) -> int: ...
def clear(self) -> None: ...

View file

@ -253,7 +253,7 @@ class TestGraphics(BaseTest):
self.assertEqual(dc.total_size, sum(map(len, data.values())))
self.assertTrue(dc.wait_for_write())
check_data()
sz = dc.size_on_disk()
sz = dc.end_of_data_offset()
self.assertEqual(sz, sum(map(len, data.values())))
self.assertFalse(dc.holes())
holes = set()
@ -262,9 +262,9 @@ class TestGraphics(BaseTest):
holes.add(x)
check_data()
self.assertRaises(KeyError, dc.get, key_as_bytes(x))
self.assertEqual(sz, dc.size_on_disk())
self.assertEqual(sz, dc.end_of_data_offset())
self.assertEqual(holes, {x[1] for x in dc.holes()})
self.assertEqual(sz, dc.size_on_disk())
self.assertEqual(sz, dc.end_of_data_offset())
# fill holes largest first to ensure small one doesn't go into large accidentally causing fragmentation
for i, x in enumerate(sorted(holes, reverse=True)):
x = 'ABCDEFGH'[i] * x
@ -273,13 +273,10 @@ class TestGraphics(BaseTest):
check_data()
holes.discard(len(x))
self.assertEqual(holes, {x[1] for x in dc.holes()})
self.assertEqual(sz, dc.size_on_disk(), f'Disk cache has unexpectedly grown from {sz} to {dc.size_on_disk} with data: {x!r}')
self.assertEqual(sz, dc.end_of_data_offset(), f'Disk cache has unexpectedly grown from {sz} to {dc.end_of_data_offset()} with data: {x!r}')
check_data()
dc.clear()
st = time.monotonic()
while dc.size_on_disk() and time.monotonic() - st < 2:
time.sleep(0.001)
self.assertEqual(dc.size_on_disk(), 0)
self.assertEqual(dc.end_of_data_offset(), 0)
data.clear()
for i in range(25):
@ -287,25 +284,25 @@ class TestGraphics(BaseTest):
dc.wait_for_write()
check_data()
before = dc.size_on_disk()
before = dc.end_of_data_offset()
while dc.total_size > before // 3:
key = random.choice(tuple(data))
self.assertTrue(remove(key))
check_data()
add('trigger defrag', 'XXX')
dc.wait_for_write()
self.assertLess(dc.size_on_disk(), before)
self.assertLess(dc.end_of_data_offset(), before)
check_data()
dc.clear()
st = time.monotonic()
while dc.size_on_disk() and time.monotonic() - st < 20:
while dc.end_of_data_offset() and time.monotonic() - st < 20:
time.sleep(0.01)
self.assertEqual(dc.size_on_disk(), 0)
self.assertEqual(dc.end_of_data_offset(), 0)
for frame in range(32):
add(f'1:{frame}', f'{frame:02d}' * 8)
dc.wait_for_write()
self.assertEqual(dc.size_on_disk(), 32 * 16)
self.assertEqual(dc.end_of_data_offset(), 32 * 16)
self.assertEqual(dc.num_cached_in_ram(), 0)
num_in_ram = 0
for frame in range(32):
@ -326,18 +323,18 @@ class TestGraphics(BaseTest):
self.assertIsNone(add(1, '1' * 1024))
self.assertIsNone(add(2, '2' * 1024))
dc.wait_for_write()
sz = dc.size_on_disk()
sz = dc.end_of_data_offset()
remove(1)
self.ae(sz, dc.size_on_disk())
self.ae(sz, dc.end_of_data_offset())
self.ae({x[1] for x in dc.holes()}, {1024})
self.assertIsNone(add(3, '3' * 800))
dc.wait_for_write()
self.assertFalse(dc.holes())
self.ae(sz, dc.size_on_disk())
self.ae(sz, dc.end_of_data_offset())
self.assertIsNone(add(4, '4' * 100))
sz += 100
dc.wait_for_write()
self.ae(sz, dc.size_on_disk())
self.ae(sz, dc.end_of_data_offset())
check_data()
self.assertFalse(dc.holes())
remove(4)
@ -345,7 +342,7 @@ class TestGraphics(BaseTest):
self.assertIsNone(add(5, '5' * 10))
sz += 10
dc.wait_for_write()
self.ae(sz, dc.size_on_disk())
self.ae(sz, dc.end_of_data_offset())
# test hole coalescing
reset(defrag_factor=20)