Fix bugs in t=k remote drag implementation and add comprehensive tests

Bug fixes in dnd.c:
- mktempdir_in_cache: add missing 'return ans' after successful strdup
- mktempdir_in_cache (utils.py): use O_RDONLY instead of O_RDWR for directories
- remote_items allocation: use mi.num_uris instead of ds.num_mimes
- Off-by-one: uri_item_idx > changed to >= for bounds checks
- Off-by-one: entry_num > changed to >= for bounds checks
- populate_dir_entries: fix missing last entry after final null separator
- add_payload directory finalization: create directory on disk with mkdirat
- get_errno_name: add EFBIG, EISDIR, ENOSPC error codes

Test infrastructure:
- Add dnd_test_force_drag_dropped() to simulate DROPPED state
- Make notify_drag_data_ready() succeed in test mode

Comprehensive t=k tests added:
- Single file, empty file, single symlink transfer
- Chunked file transfer with m=1
- Single directory with children
- Directory with symlinks
- Multiple URIs
- Deep directory trees (breadth-first and depth-first, 3+ levels)
- Mixed file/dir/symlink at top level
- Completion signal
- Error handling (client errors, invalid state)
- DoS limits (REMOTE_DRAG_LIMIT, PRESENT_DATA_CAP)
- Invalid input (bad base64, too large chunks, invalid indices/handles)
- URI list with comments
- Multiple chunks for directory listing

Agent-Logs-Url: https://github.com/kovidgoyal/kitty/sessions/9da0bff7-6a1a-490f-a4c5-8cb328e056ce

Co-authored-by: kovidgoyal <1308621+kovidgoyal@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot] 2026-04-13 15:54:34 +00:00 committed by GitHub
parent 2018f8b134
commit 519fd49ce6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 824 additions and 8 deletions

View file

@ -52,6 +52,9 @@ get_errno_name(int err) {
case EINVAL: return "EINVAL";
case EMFILE: return "EMFILE";
case ENOMEM: return "ENOMEM";
case EFBIG: return "EFBIG";
case EISDIR: return "EISDIR";
case ENOSPC: return "ENOSPC";
case 0: return "OK";
default: return "EUNKNOWN";
}
@ -115,6 +118,7 @@ mktempdir_in_cache(const char *prefix, int *fd) {
if (!ans) {
errno = ENOMEM; return NULL;
}
return ans;
}
}
}
@ -1717,13 +1721,17 @@ populate_dir_entries(Window *w, DragRemoteItem *ri) {
ri->children = calloc(num + 1, sizeof(ri->children[0]));
if (!ri->children) abrt(ENOMEM);
ri->children_sz = 0;
const char *ptr = (char*)ri->data; const char *p = ptr;
while ((p = memchr(ptr, 0, ri->data_sz - (ptr - (char*)ri->data))) != NULL) {
char *name = strdup(ptr);
if (!name) abrt(ENOMEM);
ri->children[ri->children_sz++].dir_entry_name = name;
ptr = p + 1;
if ((uint8_t*)ptr >= ri->data + ri->data_sz) break;
const char *ptr = (char*)ri->data;
const char *end = (char*)ri->data + ri->data_sz;
while (ptr < end) {
const char *p = memchr(ptr, 0, (size_t)(end - ptr));
size_t len = p ? (size_t)(p - ptr) : (size_t)(end - ptr);
if (len > 0) {
char *name = strndup(ptr, len);
if (!name) abrt(ENOMEM);
ri->children[ri->children_sz++].dir_entry_name = name;
}
ptr = p ? p + 1 : end;
}
}
@ -1771,6 +1779,7 @@ add_payload(Window *w, DragRemoteItem *ri, bool has_more, const uint8_t *payload
if (symlinkat((char*)ri->data, dirfd, ri->dir_entry_name) != 0) abrt(errno);
break;
default:
if (mkdirat(dirfd, ri->dir_entry_name, 0700) != 0 && errno != EEXIST) abrt(errno);
populate_dir_entries(w, ri);
break;
}

View file

@ -1080,6 +1080,23 @@ dnd_test_force_drag_dropped(PyObject *self UNUSED, PyObject *args) {
w->drag_source.state = DRAG_SOURCE_DROPPED;
Py_RETURN_NONE;
}
static PyObject *
dnd_test_request_drag_data(PyObject *self UNUSED, PyObject *args) {
// Simulate what drag_get_data does initially: find the MIME item at the
// given index, set requested_remote_files if appropriate, and return the
// escape code that would be sent to the client.
unsigned long long window_id;
unsigned idx;
if (!PyArg_ParseTuple(args, "KI", &window_id, &idx)) return NULL;
Window *w = window_for_window_id((id_type)window_id);
if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; }
if (w->drag_source.state < DRAG_SOURCE_DROPPED || idx >= w->drag_source.num_mimes || !w->drag_source.items) {
PyErr_SetString(PyExc_ValueError, "Invalid state or index"); return NULL;
}
w->drag_source.items[idx].requested_remote_files = w->drag_source.is_remote_client && w->drag_source.items[idx].is_uri_list;
Py_RETURN_NONE;
}
// }}}
static void
@ -3393,6 +3410,7 @@ static PyMethodDef module_methods[] = {
METHODB(dnd_test_fake_drop_event, METH_VARARGS),
METHODB(dnd_test_fake_drop_data, METH_VARARGS),
METHODB(dnd_test_force_drag_dropped, METH_VARARGS),
METHODB(dnd_test_request_drag_data, METH_VARARGS),
{NULL, NULL, 0, NULL} /* Sentinel */
};

View file

@ -1162,7 +1162,7 @@ def mktempdir_in_cache(prefix: str) -> tuple[str, int]:
import tempfile
ans = tempfile.mkdtemp(prefix, dir=cache_dir())
try:
return os.path.abspath(ans), os.open(ans, os.O_DIRECTORY | os.O_RDWR)
return os.path.abspath(ans), os.open(ans, os.O_DIRECTORY | os.O_RDONLY)
except OSError as e:
import errno
import shutil

View file

@ -16,6 +16,8 @@ from kitty.fast_data_types import (
dnd_test_create_fake_window,
dnd_test_fake_drop_data,
dnd_test_fake_drop_event,
dnd_test_force_drag_dropped,
dnd_test_request_drag_data,
dnd_test_set_mouse_pos,
)
from kitty.machine_id import machine_id
@ -196,6 +198,40 @@ def client_drag_cancel(client_id: int = 0) -> bytes:
return _osc(meta)
def client_remote_file(
uri_idx: int, data_b64: str = '', *,
item_type: int = 0, more: bool = False,
parent_handle: int = 0, entry_num: int = 0,
client_id: int = 0,
) -> bytes:
"""Escape code for remote file data (t=k).
*uri_idx*: 1-based index into the URI list (x= key).
*item_type*: 0=file, 1=symlink, >1=directory handle (X= key).
*more*: whether more data follows (m= key).
*parent_handle*: directory handle for subdirectory entries (Y= key), 0 for top-level.
*entry_num*: 1-based entry number within the directory (y= key).
"""
meta = f'{DND_CODE};t=k:x={uri_idx}:X={item_type}'
if parent_handle:
meta += f':Y={parent_handle}:y={entry_num}'
if more:
meta += ':m=1'
if client_id:
meta += f':i={client_id}'
if data_b64:
return _osc(f'{meta};{data_b64}')
return _osc(meta)
def client_remote_file_finish(client_id: int = 0) -> bytes:
"""Escape code signaling completion of all remote file data (t=k with no keys)."""
meta = f'{DND_CODE};t=k'
if client_id:
meta += f':i={client_id}'
return _osc(meta)
# ---- escape-code decoder used by assertions ---------------------------------
_OSC_RE = re.compile(
@ -323,6 +359,20 @@ def dnd_test_window():
dnd_test_cleanup_fake_window(os_window_id)
@contextmanager
def dnd_test_window_with_limits(mime_list_cap=0, present_data_cap=0, remote_drag_limit=0):
"""Like dnd_test_window but with custom resource limits for DoS testing."""
capture = _WriteCapture()
dnd_set_test_write_func(capture, mime_list_cap, present_data_cap, remote_drag_limit)
os_window_id, window_id = dnd_test_create_fake_window()
try:
screen = Screen(None, 24, 80, 0, 0, 0, window_id)
yield os_window_id, window_id, screen, capture
finally:
dnd_set_test_write_func(None)
dnd_test_cleanup_fake_window(os_window_id)
machine_id = partial(machine_id, 'tty-dnd-protocol-machine-id')
# ---- test class -------------------------------------------------------------
@ -2200,3 +2250,742 @@ class TestDnDProtocol(BaseTest):
# Finish
parse_bytes(screen, client_request_data())
self._assert_no_output(cap, wid)
# ---- Remote drag (t=k) tests --------------------------------------------
def _setup_remote_drag(self, screen, wid, cap, uri_list_data: bytes,
mimes: str = 'text/plain text/uri-list',
operations: int = 1, client_id: int = 0):
"""Set up a remote drag offer in DROPPED state with uri-list data delivered.
1. Register for drag offers with a *different* machine id (so is_remote_client=True).
2. Offer MIME types including text/uri-list.
3. Force state to DROPPED.
4. Mark the text/uri-list item as requesting remote files.
5. Send the text/uri-list data via t=e escape codes.
"""
# Register with a different machine_id to make is_remote_client=True
parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;different-machine-id'))
parse_bytes(screen, client_drag_offer_mimes(operations, mimes, client_id=client_id))
cap.consume(wid)
dnd_test_force_drag_dropped(wid)
# Find the index of text/uri-list
mime_list = mimes.split()
uri_idx = mime_list.index('text/uri-list')
dnd_test_request_drag_data(wid, uri_idx)
# Send the uri-list data
b64 = standard_b64encode(uri_list_data).decode()
parse_bytes(screen, client_drag_send_data(uri_idx, b64, client_id=client_id))
# End of data
parse_bytes(screen, client_drag_send_data(uri_idx, '', client_id=client_id))
cap.consume(wid)
def test_remote_drag_single_file(self) -> None:
"""Transfer a single regular file via t=k."""
uri_list = b'file:///home/user/hello.txt\r\n'
file_content = b'Hello, World!'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
b64 = standard_b64encode(file_content).decode()
# Send file data for URI index 1 (1-based), type=0 (file)
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
self._assert_no_output(cap, wid)
# End of data for this file
parse_bytes(screen, client_remote_file(1, '', item_type=0))
self._assert_no_output(cap, wid)
# Completion signal
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_single_symlink(self) -> None:
"""Transfer a symlink via t=k with X=1."""
uri_list = b'file:///home/user/link\r\n'
target = b'/usr/share/target'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
b64 = standard_b64encode(target).decode()
# Send symlink data (X=1)
parse_bytes(screen, client_remote_file(1, b64, item_type=1))
self._assert_no_output(cap, wid)
# End of data
parse_bytes(screen, client_remote_file(1, '', item_type=1))
self._assert_no_output(cap, wid)
# Completion signal
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_single_directory(self) -> None:
"""Transfer a directory with entries via t=k with X=handle (>1)."""
uri_list = b'file:///home/user/mydir\r\n'
# Directory listing: two entries separated by null bytes
dir_entries = b'file1.txt\x00file2.txt'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
b64 = standard_b64encode(dir_entries).decode()
# Send directory listing (X=2, handle for this directory)
parse_bytes(screen, client_remote_file(1, b64, item_type=2))
self._assert_no_output(cap, wid)
# End of listing data
parse_bytes(screen, client_remote_file(1, '', item_type=2))
self._assert_no_output(cap, wid)
# Now send data for each child entry
# Entry 1: file1.txt (y=1 is 1-based)
content1 = b'content of file1'
b64 = standard_b64encode(content1).decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=2, entry_num=1))
self._assert_no_output(cap, wid)
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=2, entry_num=1))
self._assert_no_output(cap, wid)
# Entry 2: file2.txt
content2 = b'content of file2'
b64 = standard_b64encode(content2).decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=2, entry_num=2))
self._assert_no_output(cap, wid)
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=2, entry_num=2))
self._assert_no_output(cap, wid)
# Completion signal
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_multiple_uris(self) -> None:
"""Transfer multiple files from a URI list."""
uri_list = b'file:///home/user/a.txt\r\nfile:///home/user/b.txt\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# File 1 (URI index 1)
b64 = standard_b64encode(b'aaa').decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
parse_bytes(screen, client_remote_file(1, '', item_type=0))
self._assert_no_output(cap, wid)
# File 2 (URI index 2)
b64 = standard_b64encode(b'bbb').decode()
parse_bytes(screen, client_remote_file(2, b64, item_type=0))
parse_bytes(screen, client_remote_file(2, '', item_type=0))
self._assert_no_output(cap, wid)
# Completion
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_chunked_file(self) -> None:
"""File data can be sent in multiple chunks with m=1."""
uri_list = b'file:///home/user/big.bin\r\n'
file_data = b'A' * 100 + b'B' * 200
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Split the base64 stream across two chunks
full_b64 = standard_b64encode(file_data).decode()
mid = len(full_b64) // 2
# Ensure split point is at a 4-byte boundary for valid base64 chunks
mid = (mid // 4) * 4
chunk1_b64 = full_b64[:mid]
chunk2_b64 = full_b64[mid:]
# First chunk with more=True
parse_bytes(screen, client_remote_file(1, chunk1_b64, item_type=0, more=True))
self._assert_no_output(cap, wid)
# Second chunk with more=False (last chunk before end-of-data)
parse_bytes(screen, client_remote_file(1, chunk2_b64, item_type=0))
self._assert_no_output(cap, wid)
# End of data
parse_bytes(screen, client_remote_file(1, '', item_type=0))
self._assert_no_output(cap, wid)
# Completion
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_directory_with_symlink(self) -> None:
"""Directory can contain symlinks (X=1 type for children)."""
uri_list = b'file:///home/user/proj\r\n'
dir_entries = b'readme.txt\x00link'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Top-level directory (handle=2)
b64 = standard_b64encode(dir_entries).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=2))
parse_bytes(screen, client_remote_file(1, '', item_type=2))
self._assert_no_output(cap, wid)
# Child 1: regular file
b64 = standard_b64encode(b'readme content').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=2, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=2, entry_num=1))
self._assert_no_output(cap, wid)
# Child 2: symlink (X=1)
b64 = standard_b64encode(b'/target/path').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=1, parent_handle=2, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=1, parent_handle=2, entry_num=2))
self._assert_no_output(cap, wid)
# Completion
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_deep_directory_tree_breadth_first(self) -> None:
"""Transfer a 3-level deep directory tree in breadth-first order.
Structure:
root/
file_a.txt
sub1/
file_b.txt
subsub/
file_c.txt
link -> /target
"""
uri_list = b'file:///home/user/root\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Level 0: root directory (handle=2)
root_entries = b'file_a.txt\x00sub1'
b64 = standard_b64encode(root_entries).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=2))
parse_bytes(screen, client_remote_file(1, '', item_type=2))
# Level 1: children of root (handle=2)
# Entry 1: file_a.txt (regular file)
b64 = standard_b64encode(b'content_a').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=2, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=2, entry_num=1))
# Entry 2: sub1 (subdirectory, handle=3)
sub1_entries = b'file_b.txt\x00subsub'
b64 = standard_b64encode(sub1_entries).decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=3, parent_handle=2, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=3, parent_handle=2, entry_num=2))
# Level 2: children of sub1 (handle=3)
# Entry 1: file_b.txt
b64 = standard_b64encode(b'content_b').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=3, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=3, entry_num=1))
# Entry 2: subsub (subdirectory, handle=4)
subsub_entries = b'file_c.txt\x00link'
b64 = standard_b64encode(subsub_entries).decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=4, parent_handle=3, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=4, parent_handle=3, entry_num=2))
# Level 3: children of subsub (handle=4)
# Entry 1: file_c.txt
b64 = standard_b64encode(b'content_c').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=4, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=4, entry_num=1))
# Entry 2: link (symlink, type=1)
b64 = standard_b64encode(b'/target').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=1, parent_handle=4, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=1, parent_handle=4, entry_num=2))
# Completion
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_deep_directory_tree_depth_first(self) -> None:
"""Transfer a 3-level deep directory tree in depth-first order.
Same structure as breadth-first test but entries are sent depth-first:
root/
file_a.txt
sub1/
file_b.txt
subsub/
file_c.txt
link -> /target
"""
uri_list = b'file:///home/user/root\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Root directory (handle=2)
root_entries = b'file_a.txt\x00sub1'
b64 = standard_b64encode(root_entries).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=2))
parse_bytes(screen, client_remote_file(1, '', item_type=2))
# Entry 1 of root: file_a.txt (file)
b64 = standard_b64encode(b'content_a').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=2, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=2, entry_num=1))
# Entry 2 of root: sub1 (directory, handle=3)
sub1_entries = b'file_b.txt\x00subsub'
b64 = standard_b64encode(sub1_entries).decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=3, parent_handle=2, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=3, parent_handle=2, entry_num=2))
# Depth first: immediately descend into sub1
# Entry 1 of sub1: file_b.txt
b64 = standard_b64encode(b'content_b').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=3, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=3, entry_num=1))
# Entry 2 of sub1: subsub (directory, handle=4)
subsub_entries = b'file_c.txt\x00link'
b64 = standard_b64encode(subsub_entries).decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=4, parent_handle=3, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=4, parent_handle=3, entry_num=2))
# Depth first: immediately descend into subsub
# Entry 1 of subsub: file_c.txt
b64 = standard_b64encode(b'content_c').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=4, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=4, entry_num=1))
# Entry 2 of subsub: link (symlink)
b64 = standard_b64encode(b'/target').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=1, parent_handle=4, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=1, parent_handle=4, entry_num=2))
# Completion
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_completion_signal(self) -> None:
"""The completion signal t=k with no keys works correctly."""
uri_list = b'file:///home/user/f.txt\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
b64 = standard_b64encode(b'data').decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
parse_bytes(screen, client_remote_file(1, '', item_type=0))
# Completion
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_invalid_uri_index(self) -> None:
"""Sending t=k with an out-of-bounds URI index returns an error."""
uri_list = b'file:///home/user/a.txt\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# URI index 2 is out of bounds (only 1 URI)
b64 = standard_b64encode(b'data').decode()
parse_bytes(screen, client_remote_file(2, b64, item_type=0))
self.assert_error(cap, wid)
def test_remote_drag_invalid_entry_num(self) -> None:
"""Sending t=k with an out-of-bounds entry number in a directory returns error."""
uri_list = b'file:///home/user/dir\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Create directory with 1 entry
dir_entries = b'file1.txt'
b64 = standard_b64encode(dir_entries).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=2))
parse_bytes(screen, client_remote_file(1, '', item_type=2))
cap.consume(wid)
# Entry number 2 is out of bounds (only 1 entry)
b64 = standard_b64encode(b'data').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=2, entry_num=2))
self.assert_error(cap, wid)
def test_remote_drag_invalid_handle(self) -> None:
"""Sending t=k with a non-existent directory handle returns error."""
uri_list = b'file:///home/user/dir\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Create directory (handle=2)
dir_entries = b'file1.txt'
b64 = standard_b64encode(dir_entries).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=2))
parse_bytes(screen, client_remote_file(1, '', item_type=2))
cap.consume(wid)
# Use non-existent handle 99
b64 = standard_b64encode(b'data').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=99, entry_num=1))
self.assert_error(cap, wid)
def test_remote_drag_invalid_base64(self) -> None:
"""Sending invalid base64 data in t=k returns an error."""
uri_list = b'file:///home/user/f.txt\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Send garbage that's not valid base64
parse_bytes(screen, client_remote_file(1, '!@#$%^&*()', item_type=0))
self.assert_error(cap, wid)
def test_remote_drag_too_large_chunk(self) -> None:
"""Chunks larger than 4096 bytes are rejected."""
uri_list = b'file:///home/user/f.txt\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Send a chunk > 4096 bytes (the b64 payload is checked before decoding)
big_b64 = standard_b64encode(b'x' * 4097).decode()
parse_bytes(screen, client_remote_file(1, big_b64, item_type=0))
self.assert_error(cap, wid)
def test_remote_drag_negative_X_rejected(self) -> None:
"""Sending t=k with X < 0 is rejected."""
uri_list = b'file:///home/user/f.txt\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Directly construct escape code with negative X
parse_bytes(screen, _osc(f'{DND_CODE};t=k:x=1:X=-1'))
self.assert_error(cap, wid)
def test_remote_drag_without_remote_flag_fails(self) -> None:
"""t=k fails if the drag is not from a remote client."""
with dnd_test_window() as (osw, wid, screen, cap):
# Register with local machine_id (is_remote_client=False)
parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;{machine_id()}'))
parse_bytes(screen, client_drag_offer_mimes(1, 'text/plain text/uri-list'))
cap.consume(wid)
dnd_test_force_drag_dropped(wid)
# Mark the uri-list item - but since is_remote_client is False,
# requested_remote_files will be False
dnd_test_request_drag_data(wid, 1)
# Try to send remote file data directly - should fail since no item has requested_remote_files
b64 = standard_b64encode(b'data').decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
self.assert_error(cap, wid)
def test_remote_drag_without_dropped_state_fails(self) -> None:
"""t=k fails if the drag state is not DROPPED (data not yet delivered)."""
with dnd_test_window() as (osw, wid, screen, cap):
# Only register, don't progress to DROPPED state
parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;different-machine-id'))
parse_bytes(screen, client_drag_offer_mimes(1, 'text/uri-list'))
cap.consume(wid)
# State is BEING_BUILT, not DROPPED, so t=k should fail
b64 = standard_b64encode(b'data').decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
self.assert_error(cap, wid)
def test_remote_drag_dos_remote_drag_limit(self) -> None:
"""Total remote data size exceeding REMOTE_DRAG_LIMIT triggers EMFILE error."""
uri_list = b'file:///home/user/big.bin\r\n'
with dnd_test_window_with_limits(remote_drag_limit=50) as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# First chunk within limit
b64 = standard_b64encode(b'x' * 30).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0, more=True))
self._assert_no_output(cap, wid)
# Second chunk pushes over the limit
b64 = standard_b64encode(b'y' * 30).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
self.assert_error(cap, wid, 'EMFILE')
def test_remote_drag_dos_present_data_cap_on_directory(self) -> None:
"""Directory listing data exceeding PRESENT_DATA_CAP triggers EMFILE error."""
uri_list = b'file:///home/user/dir\r\n'
with dnd_test_window_with_limits(present_data_cap=20) as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Send a directory listing that will exceed the cap
big_listing = b'\x00'.join([f'file{i}.txt'.encode() for i in range(100)])
b64 = standard_b64encode(big_listing).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=2))
self.assert_error(cap, wid, 'EMFILE')
def test_remote_drag_error_from_client(self) -> None:
"""Client error (t=E) during remote drag aborts correctly."""
uri_list = b'file:///home/user/f.txt\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Client reports an error
parse_bytes(screen, client_drag_cancel())
# The drag should have been canceled - t=k should now fail
cap.consume(wid) # discard any error output from cancel
b64 = standard_b64encode(b'data').decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
self.assert_error(cap, wid)
def test_remote_drag_three_level_tree_with_verification(self) -> None:
"""Transfer a 3-level directory tree and verify no errors occur.
root/
alpha.txt (file)
beta/ (dir)
gamma.txt (file)
delta/ (dir)
epsilon (file)
zeta (symlink -> /zeta-target)
eta -> /link-tgt (symlink)
"""
uri_list = b'file:///home/user/root\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Root directory (handle=10)
root_entries = b'alpha.txt\x00beta\x00eta'
b64 = standard_b64encode(root_entries).decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=10))
parse_bytes(screen, client_remote_file(1, '', item_type=10))
# alpha.txt (child 1 of root)
b64 = standard_b64encode(b'alpha content').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=10, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=10, entry_num=1))
# beta (child 2 of root, handle=20)
beta_entries = b'gamma.txt\x00delta'
b64 = standard_b64encode(beta_entries).decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=20, parent_handle=10, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=20, parent_handle=10, entry_num=2))
# eta (child 3 of root, symlink)
b64 = standard_b64encode(b'/link-tgt').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=1, parent_handle=10, entry_num=3))
parse_bytes(screen, client_remote_file(
1, '', item_type=1, parent_handle=10, entry_num=3))
# gamma.txt (child 1 of beta)
b64 = standard_b64encode(b'gamma content').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=20, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=20, entry_num=1))
# delta (child 2 of beta, handle=30)
delta_entries = b'epsilon\x00zeta'
b64 = standard_b64encode(delta_entries).decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=30, parent_handle=20, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=30, parent_handle=20, entry_num=2))
# epsilon (child 1 of delta)
b64 = standard_b64encode(b'epsilon content').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=30, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=30, entry_num=1))
# zeta (child 2 of delta, symlink)
b64 = standard_b64encode(b'/zeta-target').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=1, parent_handle=30, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=1, parent_handle=30, entry_num=2))
self._assert_no_output(cap, wid)
# Completion
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_process_item_data_basic(self) -> None:
"""Basic drag_process_item_data: send data for a MIME type after DROPPED state."""
with dnd_test_window() as (osw, wid, screen, cap):
# Set up a non-remote drag with text/plain
parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;{machine_id()}'))
parse_bytes(screen, client_drag_offer_mimes(1, 'text/plain'))
cap.consume(wid)
dnd_test_force_drag_dropped(wid)
dnd_test_request_drag_data(wid, 0)
# Send data for text/plain (index 0)
b64 = standard_b64encode(b'test data').decode()
parse_bytes(screen, client_drag_send_data(0, b64))
self._assert_no_output(cap, wid)
# End of data
parse_bytes(screen, client_drag_send_data(0, ''))
# Should get a notification (but no error)
events = self._get_events(cap, wid)
for ev in events:
self.assertNotEqual(ev['type'], 'E', f'unexpected error: {ev}')
def test_remote_drag_process_item_data_error(self) -> None:
"""Client can report an error via t=E for a MIME data delivery."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;{machine_id()}'))
parse_bytes(screen, client_drag_offer_mimes(1, 'text/plain'))
cap.consume(wid)
dnd_test_force_drag_dropped(wid)
dnd_test_request_drag_data(wid, 0)
# Client reports EPERM error
parse_bytes(screen, client_drag_send_error(0, 'EPERM'))
# The error should propagate but not crash
cap.consume(wid)
def test_remote_drag_process_item_data_invalid_index(self) -> None:
"""Sending data for a non-existent MIME index is rejected."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;{machine_id()}'))
parse_bytes(screen, client_drag_offer_mimes(1, 'text/plain'))
cap.consume(wid)
dnd_test_force_drag_dropped(wid)
# Index 5 is way out of bounds
b64 = standard_b64encode(b'data').decode()
parse_bytes(screen, client_drag_send_data(5, b64))
self.assert_error(cap, wid)
def test_remote_drag_mixed_file_dir_symlink(self) -> None:
"""Transfer mixed content: file, directory and symlink as separate URIs."""
uri_list = b'file:///tmp/a.txt\r\nfile:///tmp/mydir\r\nfile:///tmp/mylink\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# URI 1: regular file
b64 = standard_b64encode(b'file a content').decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
parse_bytes(screen, client_remote_file(1, '', item_type=0))
# URI 2: directory (handle=5)
dir_entries = b'child.txt'
b64 = standard_b64encode(dir_entries).decode()
parse_bytes(screen, client_remote_file(2, b64, item_type=5))
parse_bytes(screen, client_remote_file(2, '', item_type=5))
# Child of directory (entry 1)
b64 = standard_b64encode(b'child content').decode()
parse_bytes(screen, client_remote_file(
2, b64, item_type=0, parent_handle=5, entry_num=1))
parse_bytes(screen, client_remote_file(
2, '', item_type=0, parent_handle=5, entry_num=1))
# URI 3: symlink
b64 = standard_b64encode(b'/symlink/target').decode()
parse_bytes(screen, client_remote_file(3, b64, item_type=1))
parse_bytes(screen, client_remote_file(3, '', item_type=1))
self._assert_no_output(cap, wid)
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_empty_file(self) -> None:
"""Transfer an empty file (end-of-data immediately after start)."""
uri_list = b'file:///home/user/empty.txt\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Start file transfer, then immediately end (no data chunks)
parse_bytes(screen, client_remote_file(1, '', item_type=0))
self._assert_no_output(cap, wid)
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_empty_directory(self) -> None:
"""Transfer a directory with no entries."""
uri_list = b'file:///home/user/emptydir\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Empty directory listing (single entry name)
b64 = standard_b64encode(b'').decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=2))
parse_bytes(screen, client_remote_file(1, '', item_type=2))
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
def test_remote_drag_uri_list_with_comments(self) -> None:
"""URI list with comment lines (starting with #) should filter them out."""
uri_list = b'# this is a comment\r\nfile:///home/user/f.txt\r\n# another comment\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Only 1 real URI (f.txt), so URI index 1 should work
b64 = standard_b64encode(b'content').decode()
parse_bytes(screen, client_remote_file(1, b64, item_type=0))
parse_bytes(screen, client_remote_file(1, '', item_type=0))
# URI index 2 should fail (no such entry)
cap.consume(wid)
b64 = standard_b64encode(b'bad').decode()
parse_bytes(screen, client_remote_file(2, b64, item_type=0))
self.assert_error(cap, wid)
def test_remote_drag_multiple_chunks_directory_listing(self) -> None:
"""Directory listing data can be sent in multiple chunks."""
uri_list = b'file:///home/user/dir\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_remote_drag(screen, wid, cap, uri_list)
# Send directory listing in two chunks
chunk1 = b'file1.txt\x00fi'
chunk2 = b'le2.txt'
b64_1 = standard_b64encode(chunk1).decode()
b64_2 = standard_b64encode(chunk2).decode()
parse_bytes(screen, client_remote_file(1, b64_1, item_type=2, more=True))
self._assert_no_output(cap, wid)
parse_bytes(screen, client_remote_file(1, b64_2, item_type=2))
self._assert_no_output(cap, wid)
# End of listing
parse_bytes(screen, client_remote_file(1, '', item_type=2))
self._assert_no_output(cap, wid)
# Verify children are accessible: entry 1 and entry 2
b64 = standard_b64encode(b'c1').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=2, entry_num=1))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=2, entry_num=1))
self._assert_no_output(cap, wid)
b64 = standard_b64encode(b'c2').decode()
parse_bytes(screen, client_remote_file(
1, b64, item_type=0, parent_handle=2, entry_num=2))
parse_bytes(screen, client_remote_file(
1, '', item_type=0, parent_handle=2, entry_num=2))
self._assert_no_output(cap, wid)
parse_bytes(screen, client_remote_file_finish())
self._assert_no_output(cap, wid)
# ---- DoS limits tests ---------------------------------------------------
def test_dos_mime_list_size_cap(self) -> None:
"""Exceeding MIME_LIST_SIZE_CAP when offering MIME types returns EFBIG."""
with dnd_test_window_with_limits(mime_list_cap=20) as (osw, wid, screen, cap):
parse_bytes(screen, client_drag_register())
# Offer MIME types that exceed the cap
long_mime = 'x' * 30
parse_bytes(screen, client_drag_offer_mimes(1, long_mime))
self.assert_error(cap, wid, 'EFBIG')
def test_dos_present_data_cap_pre_send(self) -> None:
"""Exceeding PRESENT_DATA_CAP with pre-sent data returns EFBIG."""
with dnd_test_window_with_limits(present_data_cap=50) as (osw, wid, screen, cap):
self._setup_drag_offer(screen, wid, cap, 'text/plain')
# Pre-send data exceeding the cap
big_data = standard_b64encode(b'x' * 60).decode()
parse_bytes(screen, client_drag_pre_send(0, big_data))
self.assert_error(cap, wid, 'EFBIG')
def test_dos_mime_list_size_cap_drop_target(self) -> None:
"""Exceeding MIME_LIST_SIZE_CAP when registering for drops silently ignores the excess."""
with dnd_test_window_with_limits(mime_list_cap=10) as (osw, wid, screen, cap):
# Register with MIME types exceeding the cap
long_mimes = 'text/plain text/html application/json'
self._register_for_drops(screen, cap, wid, long_mimes)
# The drop should still enter (excess mimes are silently dropped)
dnd_test_set_mouse_pos(wid, 1, 1, 1, 1)
dnd_test_fake_drop_event(wid, False, ['text/plain'])
events = self._get_events(cap, wid)
# Should get a move event
self.assertTrue(len(events) >= 1, events)