diff --git a/kitty/dnd.c b/kitty/dnd.c index a2883aa9a..108a2fc22 100644 --- a/kitty/dnd.c +++ b/kitty/dnd.c @@ -52,6 +52,9 @@ get_errno_name(int err) { case EINVAL: return "EINVAL"; case EMFILE: return "EMFILE"; case ENOMEM: return "ENOMEM"; + case EFBIG: return "EFBIG"; + case EISDIR: return "EISDIR"; + case ENOSPC: return "ENOSPC"; case 0: return "OK"; default: return "EUNKNOWN"; } @@ -115,6 +118,7 @@ mktempdir_in_cache(const char *prefix, int *fd) { if (!ans) { errno = ENOMEM; return NULL; } + return ans; } } } @@ -1717,13 +1721,17 @@ populate_dir_entries(Window *w, DragRemoteItem *ri) { ri->children = calloc(num + 1, sizeof(ri->children[0])); if (!ri->children) abrt(ENOMEM); ri->children_sz = 0; - const char *ptr = (char*)ri->data; const char *p = ptr; - while ((p = memchr(ptr, 0, ri->data_sz - (ptr - (char*)ri->data))) != NULL) { - char *name = strdup(ptr); - if (!name) abrt(ENOMEM); - ri->children[ri->children_sz++].dir_entry_name = name; - ptr = p + 1; - if ((uint8_t*)ptr >= ri->data + ri->data_sz) break; + const char *ptr = (char*)ri->data; + const char *end = (char*)ri->data + ri->data_sz; + while (ptr < end) { + const char *p = memchr(ptr, 0, (size_t)(end - ptr)); + size_t len = p ? (size_t)(p - ptr) : (size_t)(end - ptr); + if (len > 0) { + char *name = strndup(ptr, len); + if (!name) abrt(ENOMEM); + ri->children[ri->children_sz++].dir_entry_name = name; + } + ptr = p ? p + 1 : end; } } @@ -1771,6 +1779,7 @@ add_payload(Window *w, DragRemoteItem *ri, bool has_more, const uint8_t *payload if (symlinkat((char*)ri->data, dirfd, ri->dir_entry_name) != 0) abrt(errno); break; default: + if (mkdirat(dirfd, ri->dir_entry_name, 0700) != 0 && errno != EEXIST) abrt(errno); populate_dir_entries(w, ri); break; } diff --git a/kitty/glfw.c b/kitty/glfw.c index fad111299..e94bd9db2 100644 --- a/kitty/glfw.c +++ b/kitty/glfw.c @@ -1080,6 +1080,23 @@ dnd_test_force_drag_dropped(PyObject *self UNUSED, PyObject *args) { w->drag_source.state = DRAG_SOURCE_DROPPED; Py_RETURN_NONE; } + +static PyObject * +dnd_test_request_drag_data(PyObject *self UNUSED, PyObject *args) { + // Simulate what drag_get_data does initially: find the MIME item at the + // given index, set requested_remote_files if appropriate, and return the + // escape code that would be sent to the client. + unsigned long long window_id; + unsigned idx; + if (!PyArg_ParseTuple(args, "KI", &window_id, &idx)) return NULL; + Window *w = window_for_window_id((id_type)window_id); + if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; } + if (w->drag_source.state < DRAG_SOURCE_DROPPED || idx >= w->drag_source.num_mimes || !w->drag_source.items) { + PyErr_SetString(PyExc_ValueError, "Invalid state or index"); return NULL; + } + w->drag_source.items[idx].requested_remote_files = w->drag_source.is_remote_client && w->drag_source.items[idx].is_uri_list; + Py_RETURN_NONE; +} // }}} static void @@ -3393,6 +3410,7 @@ static PyMethodDef module_methods[] = { METHODB(dnd_test_fake_drop_event, METH_VARARGS), METHODB(dnd_test_fake_drop_data, METH_VARARGS), METHODB(dnd_test_force_drag_dropped, METH_VARARGS), + METHODB(dnd_test_request_drag_data, METH_VARARGS), {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/kitty/utils.py b/kitty/utils.py index b604dd972..dd949a9e3 100644 --- a/kitty/utils.py +++ b/kitty/utils.py @@ -1162,7 +1162,7 @@ def mktempdir_in_cache(prefix: str) -> tuple[str, int]: import tempfile ans = tempfile.mkdtemp(prefix, dir=cache_dir()) try: - return os.path.abspath(ans), os.open(ans, os.O_DIRECTORY | os.O_RDWR) + return os.path.abspath(ans), os.open(ans, os.O_DIRECTORY | os.O_RDONLY) except OSError as e: import errno import shutil diff --git a/kitty_tests/dnd.py b/kitty_tests/dnd.py index b5a9eb668..5d5df168a 100644 --- a/kitty_tests/dnd.py +++ b/kitty_tests/dnd.py @@ -16,6 +16,8 @@ from kitty.fast_data_types import ( dnd_test_create_fake_window, dnd_test_fake_drop_data, dnd_test_fake_drop_event, + dnd_test_force_drag_dropped, + dnd_test_request_drag_data, dnd_test_set_mouse_pos, ) from kitty.machine_id import machine_id @@ -196,6 +198,40 @@ def client_drag_cancel(client_id: int = 0) -> bytes: return _osc(meta) +def client_remote_file( + uri_idx: int, data_b64: str = '', *, + item_type: int = 0, more: bool = False, + parent_handle: int = 0, entry_num: int = 0, + client_id: int = 0, +) -> bytes: + """Escape code for remote file data (t=k). + + *uri_idx*: 1-based index into the URI list (x= key). + *item_type*: 0=file, 1=symlink, >1=directory handle (X= key). + *more*: whether more data follows (m= key). + *parent_handle*: directory handle for subdirectory entries (Y= key), 0 for top-level. + *entry_num*: 1-based entry number within the directory (y= key). + """ + meta = f'{DND_CODE};t=k:x={uri_idx}:X={item_type}' + if parent_handle: + meta += f':Y={parent_handle}:y={entry_num}' + if more: + meta += ':m=1' + if client_id: + meta += f':i={client_id}' + if data_b64: + return _osc(f'{meta};{data_b64}') + return _osc(meta) + + +def client_remote_file_finish(client_id: int = 0) -> bytes: + """Escape code signaling completion of all remote file data (t=k with no keys).""" + meta = f'{DND_CODE};t=k' + if client_id: + meta += f':i={client_id}' + return _osc(meta) + + # ---- escape-code decoder used by assertions --------------------------------- _OSC_RE = re.compile( @@ -323,6 +359,20 @@ def dnd_test_window(): dnd_test_cleanup_fake_window(os_window_id) +@contextmanager +def dnd_test_window_with_limits(mime_list_cap=0, present_data_cap=0, remote_drag_limit=0): + """Like dnd_test_window but with custom resource limits for DoS testing.""" + capture = _WriteCapture() + dnd_set_test_write_func(capture, mime_list_cap, present_data_cap, remote_drag_limit) + os_window_id, window_id = dnd_test_create_fake_window() + try: + screen = Screen(None, 24, 80, 0, 0, 0, window_id) + yield os_window_id, window_id, screen, capture + finally: + dnd_set_test_write_func(None) + dnd_test_cleanup_fake_window(os_window_id) + + machine_id = partial(machine_id, 'tty-dnd-protocol-machine-id') # ---- test class ------------------------------------------------------------- @@ -2200,3 +2250,742 @@ class TestDnDProtocol(BaseTest): # Finish parse_bytes(screen, client_request_data()) self._assert_no_output(cap, wid) + + # ---- Remote drag (t=k) tests -------------------------------------------- + + def _setup_remote_drag(self, screen, wid, cap, uri_list_data: bytes, + mimes: str = 'text/plain text/uri-list', + operations: int = 1, client_id: int = 0): + """Set up a remote drag offer in DROPPED state with uri-list data delivered. + + 1. Register for drag offers with a *different* machine id (so is_remote_client=True). + 2. Offer MIME types including text/uri-list. + 3. Force state to DROPPED. + 4. Mark the text/uri-list item as requesting remote files. + 5. Send the text/uri-list data via t=e escape codes. + """ + # Register with a different machine_id to make is_remote_client=True + parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;different-machine-id')) + parse_bytes(screen, client_drag_offer_mimes(operations, mimes, client_id=client_id)) + cap.consume(wid) + dnd_test_force_drag_dropped(wid) + # Find the index of text/uri-list + mime_list = mimes.split() + uri_idx = mime_list.index('text/uri-list') + dnd_test_request_drag_data(wid, uri_idx) + # Send the uri-list data + b64 = standard_b64encode(uri_list_data).decode() + parse_bytes(screen, client_drag_send_data(uri_idx, b64, client_id=client_id)) + # End of data + parse_bytes(screen, client_drag_send_data(uri_idx, '', client_id=client_id)) + cap.consume(wid) + + def test_remote_drag_single_file(self) -> None: + """Transfer a single regular file via t=k.""" + uri_list = b'file:///home/user/hello.txt\r\n' + file_content = b'Hello, World!' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + b64 = standard_b64encode(file_content).decode() + # Send file data for URI index 1 (1-based), type=0 (file) + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + self._assert_no_output(cap, wid) + # End of data for this file + parse_bytes(screen, client_remote_file(1, '', item_type=0)) + self._assert_no_output(cap, wid) + # Completion signal + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_single_symlink(self) -> None: + """Transfer a symlink via t=k with X=1.""" + uri_list = b'file:///home/user/link\r\n' + target = b'/usr/share/target' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + b64 = standard_b64encode(target).decode() + # Send symlink data (X=1) + parse_bytes(screen, client_remote_file(1, b64, item_type=1)) + self._assert_no_output(cap, wid) + # End of data + parse_bytes(screen, client_remote_file(1, '', item_type=1)) + self._assert_no_output(cap, wid) + # Completion signal + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_single_directory(self) -> None: + """Transfer a directory with entries via t=k with X=handle (>1).""" + uri_list = b'file:///home/user/mydir\r\n' + # Directory listing: two entries separated by null bytes + dir_entries = b'file1.txt\x00file2.txt' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + b64 = standard_b64encode(dir_entries).decode() + # Send directory listing (X=2, handle for this directory) + parse_bytes(screen, client_remote_file(1, b64, item_type=2)) + self._assert_no_output(cap, wid) + # End of listing data + parse_bytes(screen, client_remote_file(1, '', item_type=2)) + self._assert_no_output(cap, wid) + + # Now send data for each child entry + # Entry 1: file1.txt (y=1 is 1-based) + content1 = b'content of file1' + b64 = standard_b64encode(content1).decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=2, entry_num=1)) + self._assert_no_output(cap, wid) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=2, entry_num=1)) + self._assert_no_output(cap, wid) + + # Entry 2: file2.txt + content2 = b'content of file2' + b64 = standard_b64encode(content2).decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=2, entry_num=2)) + self._assert_no_output(cap, wid) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=2, entry_num=2)) + self._assert_no_output(cap, wid) + + # Completion signal + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_multiple_uris(self) -> None: + """Transfer multiple files from a URI list.""" + uri_list = b'file:///home/user/a.txt\r\nfile:///home/user/b.txt\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # File 1 (URI index 1) + b64 = standard_b64encode(b'aaa').decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + parse_bytes(screen, client_remote_file(1, '', item_type=0)) + self._assert_no_output(cap, wid) + # File 2 (URI index 2) + b64 = standard_b64encode(b'bbb').decode() + parse_bytes(screen, client_remote_file(2, b64, item_type=0)) + parse_bytes(screen, client_remote_file(2, '', item_type=0)) + self._assert_no_output(cap, wid) + # Completion + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_chunked_file(self) -> None: + """File data can be sent in multiple chunks with m=1.""" + uri_list = b'file:///home/user/big.bin\r\n' + file_data = b'A' * 100 + b'B' * 200 + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Split the base64 stream across two chunks + full_b64 = standard_b64encode(file_data).decode() + mid = len(full_b64) // 2 + # Ensure split point is at a 4-byte boundary for valid base64 chunks + mid = (mid // 4) * 4 + chunk1_b64 = full_b64[:mid] + chunk2_b64 = full_b64[mid:] + # First chunk with more=True + parse_bytes(screen, client_remote_file(1, chunk1_b64, item_type=0, more=True)) + self._assert_no_output(cap, wid) + # Second chunk with more=False (last chunk before end-of-data) + parse_bytes(screen, client_remote_file(1, chunk2_b64, item_type=0)) + self._assert_no_output(cap, wid) + # End of data + parse_bytes(screen, client_remote_file(1, '', item_type=0)) + self._assert_no_output(cap, wid) + # Completion + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_directory_with_symlink(self) -> None: + """Directory can contain symlinks (X=1 type for children).""" + uri_list = b'file:///home/user/proj\r\n' + dir_entries = b'readme.txt\x00link' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Top-level directory (handle=2) + b64 = standard_b64encode(dir_entries).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=2)) + parse_bytes(screen, client_remote_file(1, '', item_type=2)) + self._assert_no_output(cap, wid) + + # Child 1: regular file + b64 = standard_b64encode(b'readme content').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=2, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=2, entry_num=1)) + self._assert_no_output(cap, wid) + + # Child 2: symlink (X=1) + b64 = standard_b64encode(b'/target/path').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=1, parent_handle=2, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=1, parent_handle=2, entry_num=2)) + self._assert_no_output(cap, wid) + + # Completion + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_deep_directory_tree_breadth_first(self) -> None: + """Transfer a 3-level deep directory tree in breadth-first order. + + Structure: + root/ + file_a.txt + sub1/ + file_b.txt + subsub/ + file_c.txt + link -> /target + """ + uri_list = b'file:///home/user/root\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + + # Level 0: root directory (handle=2) + root_entries = b'file_a.txt\x00sub1' + b64 = standard_b64encode(root_entries).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=2)) + parse_bytes(screen, client_remote_file(1, '', item_type=2)) + + # Level 1: children of root (handle=2) + # Entry 1: file_a.txt (regular file) + b64 = standard_b64encode(b'content_a').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=2, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=2, entry_num=1)) + + # Entry 2: sub1 (subdirectory, handle=3) + sub1_entries = b'file_b.txt\x00subsub' + b64 = standard_b64encode(sub1_entries).decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=3, parent_handle=2, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=3, parent_handle=2, entry_num=2)) + + # Level 2: children of sub1 (handle=3) + # Entry 1: file_b.txt + b64 = standard_b64encode(b'content_b').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=3, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=3, entry_num=1)) + + # Entry 2: subsub (subdirectory, handle=4) + subsub_entries = b'file_c.txt\x00link' + b64 = standard_b64encode(subsub_entries).decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=4, parent_handle=3, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=4, parent_handle=3, entry_num=2)) + + # Level 3: children of subsub (handle=4) + # Entry 1: file_c.txt + b64 = standard_b64encode(b'content_c').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=4, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=4, entry_num=1)) + + # Entry 2: link (symlink, type=1) + b64 = standard_b64encode(b'/target').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=1, parent_handle=4, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=1, parent_handle=4, entry_num=2)) + + # Completion + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_deep_directory_tree_depth_first(self) -> None: + """Transfer a 3-level deep directory tree in depth-first order. + + Same structure as breadth-first test but entries are sent depth-first: + root/ + file_a.txt + sub1/ + file_b.txt + subsub/ + file_c.txt + link -> /target + """ + uri_list = b'file:///home/user/root\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + + # Root directory (handle=2) + root_entries = b'file_a.txt\x00sub1' + b64 = standard_b64encode(root_entries).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=2)) + parse_bytes(screen, client_remote_file(1, '', item_type=2)) + + # Entry 1 of root: file_a.txt (file) + b64 = standard_b64encode(b'content_a').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=2, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=2, entry_num=1)) + + # Entry 2 of root: sub1 (directory, handle=3) + sub1_entries = b'file_b.txt\x00subsub' + b64 = standard_b64encode(sub1_entries).decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=3, parent_handle=2, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=3, parent_handle=2, entry_num=2)) + + # Depth first: immediately descend into sub1 + # Entry 1 of sub1: file_b.txt + b64 = standard_b64encode(b'content_b').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=3, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=3, entry_num=1)) + + # Entry 2 of sub1: subsub (directory, handle=4) + subsub_entries = b'file_c.txt\x00link' + b64 = standard_b64encode(subsub_entries).decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=4, parent_handle=3, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=4, parent_handle=3, entry_num=2)) + + # Depth first: immediately descend into subsub + # Entry 1 of subsub: file_c.txt + b64 = standard_b64encode(b'content_c').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=4, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=4, entry_num=1)) + + # Entry 2 of subsub: link (symlink) + b64 = standard_b64encode(b'/target').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=1, parent_handle=4, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=1, parent_handle=4, entry_num=2)) + + # Completion + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_completion_signal(self) -> None: + """The completion signal t=k with no keys works correctly.""" + uri_list = b'file:///home/user/f.txt\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + b64 = standard_b64encode(b'data').decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + parse_bytes(screen, client_remote_file(1, '', item_type=0)) + # Completion + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_invalid_uri_index(self) -> None: + """Sending t=k with an out-of-bounds URI index returns an error.""" + uri_list = b'file:///home/user/a.txt\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # URI index 2 is out of bounds (only 1 URI) + b64 = standard_b64encode(b'data').decode() + parse_bytes(screen, client_remote_file(2, b64, item_type=0)) + self.assert_error(cap, wid) + + def test_remote_drag_invalid_entry_num(self) -> None: + """Sending t=k with an out-of-bounds entry number in a directory returns error.""" + uri_list = b'file:///home/user/dir\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Create directory with 1 entry + dir_entries = b'file1.txt' + b64 = standard_b64encode(dir_entries).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=2)) + parse_bytes(screen, client_remote_file(1, '', item_type=2)) + cap.consume(wid) + + # Entry number 2 is out of bounds (only 1 entry) + b64 = standard_b64encode(b'data').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=2, entry_num=2)) + self.assert_error(cap, wid) + + def test_remote_drag_invalid_handle(self) -> None: + """Sending t=k with a non-existent directory handle returns error.""" + uri_list = b'file:///home/user/dir\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Create directory (handle=2) + dir_entries = b'file1.txt' + b64 = standard_b64encode(dir_entries).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=2)) + parse_bytes(screen, client_remote_file(1, '', item_type=2)) + cap.consume(wid) + + # Use non-existent handle 99 + b64 = standard_b64encode(b'data').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=99, entry_num=1)) + self.assert_error(cap, wid) + + def test_remote_drag_invalid_base64(self) -> None: + """Sending invalid base64 data in t=k returns an error.""" + uri_list = b'file:///home/user/f.txt\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Send garbage that's not valid base64 + parse_bytes(screen, client_remote_file(1, '!@#$%^&*()', item_type=0)) + self.assert_error(cap, wid) + + def test_remote_drag_too_large_chunk(self) -> None: + """Chunks larger than 4096 bytes are rejected.""" + uri_list = b'file:///home/user/f.txt\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Send a chunk > 4096 bytes (the b64 payload is checked before decoding) + big_b64 = standard_b64encode(b'x' * 4097).decode() + parse_bytes(screen, client_remote_file(1, big_b64, item_type=0)) + self.assert_error(cap, wid) + + def test_remote_drag_negative_X_rejected(self) -> None: + """Sending t=k with X < 0 is rejected.""" + uri_list = b'file:///home/user/f.txt\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Directly construct escape code with negative X + parse_bytes(screen, _osc(f'{DND_CODE};t=k:x=1:X=-1')) + self.assert_error(cap, wid) + + def test_remote_drag_without_remote_flag_fails(self) -> None: + """t=k fails if the drag is not from a remote client.""" + with dnd_test_window() as (osw, wid, screen, cap): + # Register with local machine_id (is_remote_client=False) + parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;{machine_id()}')) + parse_bytes(screen, client_drag_offer_mimes(1, 'text/plain text/uri-list')) + cap.consume(wid) + dnd_test_force_drag_dropped(wid) + # Mark the uri-list item - but since is_remote_client is False, + # requested_remote_files will be False + dnd_test_request_drag_data(wid, 1) + # Try to send remote file data directly - should fail since no item has requested_remote_files + b64 = standard_b64encode(b'data').decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + self.assert_error(cap, wid) + + def test_remote_drag_without_dropped_state_fails(self) -> None: + """t=k fails if the drag state is not DROPPED (data not yet delivered).""" + with dnd_test_window() as (osw, wid, screen, cap): + # Only register, don't progress to DROPPED state + parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;different-machine-id')) + parse_bytes(screen, client_drag_offer_mimes(1, 'text/uri-list')) + cap.consume(wid) + # State is BEING_BUILT, not DROPPED, so t=k should fail + b64 = standard_b64encode(b'data').decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + self.assert_error(cap, wid) + + def test_remote_drag_dos_remote_drag_limit(self) -> None: + """Total remote data size exceeding REMOTE_DRAG_LIMIT triggers EMFILE error.""" + uri_list = b'file:///home/user/big.bin\r\n' + with dnd_test_window_with_limits(remote_drag_limit=50) as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # First chunk within limit + b64 = standard_b64encode(b'x' * 30).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0, more=True)) + self._assert_no_output(cap, wid) + # Second chunk pushes over the limit + b64 = standard_b64encode(b'y' * 30).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + self.assert_error(cap, wid, 'EMFILE') + + def test_remote_drag_dos_present_data_cap_on_directory(self) -> None: + """Directory listing data exceeding PRESENT_DATA_CAP triggers EMFILE error.""" + uri_list = b'file:///home/user/dir\r\n' + with dnd_test_window_with_limits(present_data_cap=20) as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Send a directory listing that will exceed the cap + big_listing = b'\x00'.join([f'file{i}.txt'.encode() for i in range(100)]) + b64 = standard_b64encode(big_listing).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=2)) + self.assert_error(cap, wid, 'EMFILE') + + def test_remote_drag_error_from_client(self) -> None: + """Client error (t=E) during remote drag aborts correctly.""" + uri_list = b'file:///home/user/f.txt\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Client reports an error + parse_bytes(screen, client_drag_cancel()) + # The drag should have been canceled - t=k should now fail + cap.consume(wid) # discard any error output from cancel + b64 = standard_b64encode(b'data').decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + self.assert_error(cap, wid) + + def test_remote_drag_three_level_tree_with_verification(self) -> None: + """Transfer a 3-level directory tree and verify no errors occur. + + root/ + alpha.txt (file) + beta/ (dir) + gamma.txt (file) + delta/ (dir) + epsilon (file) + zeta (symlink -> /zeta-target) + eta -> /link-tgt (symlink) + """ + uri_list = b'file:///home/user/root\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + + # Root directory (handle=10) + root_entries = b'alpha.txt\x00beta\x00eta' + b64 = standard_b64encode(root_entries).decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=10)) + parse_bytes(screen, client_remote_file(1, '', item_type=10)) + + # alpha.txt (child 1 of root) + b64 = standard_b64encode(b'alpha content').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=10, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=10, entry_num=1)) + + # beta (child 2 of root, handle=20) + beta_entries = b'gamma.txt\x00delta' + b64 = standard_b64encode(beta_entries).decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=20, parent_handle=10, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=20, parent_handle=10, entry_num=2)) + + # eta (child 3 of root, symlink) + b64 = standard_b64encode(b'/link-tgt').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=1, parent_handle=10, entry_num=3)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=1, parent_handle=10, entry_num=3)) + + # gamma.txt (child 1 of beta) + b64 = standard_b64encode(b'gamma content').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=20, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=20, entry_num=1)) + + # delta (child 2 of beta, handle=30) + delta_entries = b'epsilon\x00zeta' + b64 = standard_b64encode(delta_entries).decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=30, parent_handle=20, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=30, parent_handle=20, entry_num=2)) + + # epsilon (child 1 of delta) + b64 = standard_b64encode(b'epsilon content').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=30, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=30, entry_num=1)) + + # zeta (child 2 of delta, symlink) + b64 = standard_b64encode(b'/zeta-target').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=1, parent_handle=30, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=1, parent_handle=30, entry_num=2)) + + self._assert_no_output(cap, wid) + # Completion + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_process_item_data_basic(self) -> None: + """Basic drag_process_item_data: send data for a MIME type after DROPPED state.""" + with dnd_test_window() as (osw, wid, screen, cap): + # Set up a non-remote drag with text/plain + parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;{machine_id()}')) + parse_bytes(screen, client_drag_offer_mimes(1, 'text/plain')) + cap.consume(wid) + dnd_test_force_drag_dropped(wid) + dnd_test_request_drag_data(wid, 0) + # Send data for text/plain (index 0) + b64 = standard_b64encode(b'test data').decode() + parse_bytes(screen, client_drag_send_data(0, b64)) + self._assert_no_output(cap, wid) + # End of data + parse_bytes(screen, client_drag_send_data(0, '')) + # Should get a notification (but no error) + events = self._get_events(cap, wid) + for ev in events: + self.assertNotEqual(ev['type'], 'E', f'unexpected error: {ev}') + + def test_remote_drag_process_item_data_error(self) -> None: + """Client can report an error via t=E for a MIME data delivery.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;{machine_id()}')) + parse_bytes(screen, client_drag_offer_mimes(1, 'text/plain')) + cap.consume(wid) + dnd_test_force_drag_dropped(wid) + dnd_test_request_drag_data(wid, 0) + # Client reports EPERM error + parse_bytes(screen, client_drag_send_error(0, 'EPERM')) + # The error should propagate but not crash + cap.consume(wid) + + def test_remote_drag_process_item_data_invalid_index(self) -> None: + """Sending data for a non-existent MIME index is rejected.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, _osc(f'{DND_CODE};t=o:x=1;{machine_id()}')) + parse_bytes(screen, client_drag_offer_mimes(1, 'text/plain')) + cap.consume(wid) + dnd_test_force_drag_dropped(wid) + # Index 5 is way out of bounds + b64 = standard_b64encode(b'data').decode() + parse_bytes(screen, client_drag_send_data(5, b64)) + self.assert_error(cap, wid) + + def test_remote_drag_mixed_file_dir_symlink(self) -> None: + """Transfer mixed content: file, directory and symlink as separate URIs.""" + uri_list = b'file:///tmp/a.txt\r\nfile:///tmp/mydir\r\nfile:///tmp/mylink\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + + # URI 1: regular file + b64 = standard_b64encode(b'file a content').decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + parse_bytes(screen, client_remote_file(1, '', item_type=0)) + + # URI 2: directory (handle=5) + dir_entries = b'child.txt' + b64 = standard_b64encode(dir_entries).decode() + parse_bytes(screen, client_remote_file(2, b64, item_type=5)) + parse_bytes(screen, client_remote_file(2, '', item_type=5)) + + # Child of directory (entry 1) + b64 = standard_b64encode(b'child content').decode() + parse_bytes(screen, client_remote_file( + 2, b64, item_type=0, parent_handle=5, entry_num=1)) + parse_bytes(screen, client_remote_file( + 2, '', item_type=0, parent_handle=5, entry_num=1)) + + # URI 3: symlink + b64 = standard_b64encode(b'/symlink/target').decode() + parse_bytes(screen, client_remote_file(3, b64, item_type=1)) + parse_bytes(screen, client_remote_file(3, '', item_type=1)) + + self._assert_no_output(cap, wid) + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_empty_file(self) -> None: + """Transfer an empty file (end-of-data immediately after start).""" + uri_list = b'file:///home/user/empty.txt\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Start file transfer, then immediately end (no data chunks) + parse_bytes(screen, client_remote_file(1, '', item_type=0)) + self._assert_no_output(cap, wid) + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_empty_directory(self) -> None: + """Transfer a directory with no entries.""" + uri_list = b'file:///home/user/emptydir\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Empty directory listing (single entry name) + b64 = standard_b64encode(b'').decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=2)) + parse_bytes(screen, client_remote_file(1, '', item_type=2)) + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + def test_remote_drag_uri_list_with_comments(self) -> None: + """URI list with comment lines (starting with #) should filter them out.""" + uri_list = b'# this is a comment\r\nfile:///home/user/f.txt\r\n# another comment\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Only 1 real URI (f.txt), so URI index 1 should work + b64 = standard_b64encode(b'content').decode() + parse_bytes(screen, client_remote_file(1, b64, item_type=0)) + parse_bytes(screen, client_remote_file(1, '', item_type=0)) + # URI index 2 should fail (no such entry) + cap.consume(wid) + b64 = standard_b64encode(b'bad').decode() + parse_bytes(screen, client_remote_file(2, b64, item_type=0)) + self.assert_error(cap, wid) + + def test_remote_drag_multiple_chunks_directory_listing(self) -> None: + """Directory listing data can be sent in multiple chunks.""" + uri_list = b'file:///home/user/dir\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_remote_drag(screen, wid, cap, uri_list) + # Send directory listing in two chunks + chunk1 = b'file1.txt\x00fi' + chunk2 = b'le2.txt' + b64_1 = standard_b64encode(chunk1).decode() + b64_2 = standard_b64encode(chunk2).decode() + parse_bytes(screen, client_remote_file(1, b64_1, item_type=2, more=True)) + self._assert_no_output(cap, wid) + parse_bytes(screen, client_remote_file(1, b64_2, item_type=2)) + self._assert_no_output(cap, wid) + # End of listing + parse_bytes(screen, client_remote_file(1, '', item_type=2)) + self._assert_no_output(cap, wid) + + # Verify children are accessible: entry 1 and entry 2 + b64 = standard_b64encode(b'c1').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=2, entry_num=1)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=2, entry_num=1)) + self._assert_no_output(cap, wid) + b64 = standard_b64encode(b'c2').decode() + parse_bytes(screen, client_remote_file( + 1, b64, item_type=0, parent_handle=2, entry_num=2)) + parse_bytes(screen, client_remote_file( + 1, '', item_type=0, parent_handle=2, entry_num=2)) + self._assert_no_output(cap, wid) + + parse_bytes(screen, client_remote_file_finish()) + self._assert_no_output(cap, wid) + + # ---- DoS limits tests --------------------------------------------------- + + def test_dos_mime_list_size_cap(self) -> None: + """Exceeding MIME_LIST_SIZE_CAP when offering MIME types returns EFBIG.""" + with dnd_test_window_with_limits(mime_list_cap=20) as (osw, wid, screen, cap): + parse_bytes(screen, client_drag_register()) + # Offer MIME types that exceed the cap + long_mime = 'x' * 30 + parse_bytes(screen, client_drag_offer_mimes(1, long_mime)) + self.assert_error(cap, wid, 'EFBIG') + + def test_dos_present_data_cap_pre_send(self) -> None: + """Exceeding PRESENT_DATA_CAP with pre-sent data returns EFBIG.""" + with dnd_test_window_with_limits(present_data_cap=50) as (osw, wid, screen, cap): + self._setup_drag_offer(screen, wid, cap, 'text/plain') + # Pre-send data exceeding the cap + big_data = standard_b64encode(b'x' * 60).decode() + parse_bytes(screen, client_drag_pre_send(0, big_data)) + self.assert_error(cap, wid, 'EFBIG') + + def test_dos_mime_list_size_cap_drop_target(self) -> None: + """Exceeding MIME_LIST_SIZE_CAP when registering for drops silently ignores the excess.""" + with dnd_test_window_with_limits(mime_list_cap=10) as (osw, wid, screen, cap): + # Register with MIME types exceeding the cap + long_mimes = 'text/plain text/html application/json' + self._register_for_drops(screen, cap, wid, long_mimes) + # The drop should still enter (excess mimes are silently dropped) + dnd_test_set_mouse_pos(wid, 1, 1, 1, 1) + dnd_test_fake_drop_event(wid, False, ['text/plain']) + events = self._get_events(cap, wid) + # Should get a move event + self.assertTrue(len(events) >= 1, events)