From afa63ccec76627d5d0763713b0e40fb242cfeec3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 07:35:09 +0000 Subject: [PATCH] Implement new drop protocol X/Y key handling from b466f8b - dnd.c: drop_send_dir_listing now uses drop_append_request_keys (echoes all request keys including Y for sub-dirs) and emits :X=handle instead of :Y=handle:X=2. Directory handles are now the X value itself. - dnd.c: drop_alloc_dir_handle starts handle counter at 1 so first handle is 2, keeping 0 (absent) and 1 (symlink) reserved as per protocol. - dnd.py: add is_dir_event() / dir_handle() helpers; update all tests to use int(X) > 1 as directory indicator and X as the handle value. - dnd.py: rename test_Y_key_in_dir_listing_response to test_X_key_is_handle_in_dir_listing_response with updated assertions. - dnd.py: test_uri_directory_transfer_tree expanded to verify unambiguous identification (Y=parent, x=entry echoed) at all three directory levels. Agent-Logs-Url: https://github.com/kovidgoyal/kitty/sessions/6973699c-a979-4d97-8213-1a4a501044a1 Co-authored-by: kovidgoyal <1308621+kovidgoyal@users.noreply.github.com> --- kitty/dnd.c | 18 ++++-- kitty_tests/dnd.py | 156 ++++++++++++++++++++++++++++++++------------- 2 files changed, 124 insertions(+), 50 deletions(-) diff --git a/kitty/dnd.c b/kitty/dnd.c index 433f029ec..415b905a9 100644 --- a/kitty/dnd.c +++ b/kitty/dnd.c @@ -691,7 +691,9 @@ static uint32_t drop_alloc_dir_handle(Window *w, const char *path, char **entries, size_t num_entries) { ensure_space_for(&w->drop, dir_handles, DirHandle, w->drop.num_dir_handles + 1, dir_handles_capacity, 4, true); w->drop.next_dir_handle_id++; - if (w->drop.next_dir_handle_id == 0) w->drop.next_dir_handle_id = 1; + /* Handles 0 and 1 are reserved (0 = absent, 1 = symlink indicator), so + * valid directory handles must be >= 2. */ + if (w->drop.next_dir_handle_id <= 1) w->drop.next_dir_handle_id = 2; DirHandle *h = &w->drop.dir_handles[w->drop.num_dir_handles++]; zero_at_ptr(h); h->id = w->drop.next_dir_handle_id; @@ -793,12 +795,14 @@ drop_send_dir_listing(Window *w, const char *path) { char hdr[128]; int hdr_sz = snprintf(hdr, sizeof(hdr), "\x1b]%d;t=r", DND_CODE); - /* For dir listings, echo the x and y keys from the request, then add Y=new_handle:X=2 */ - if (w->drop.current_request_x) - hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":x=%d", (int)w->drop.current_request_x); - if (w->drop.current_request_y) - hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":y=%d", (int)w->drop.current_request_y); - hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":Y=%u:X=2", (unsigned)handle_id); + /* Echo all request keys (x, y, Y) so the client can unambiguously identify + * which filesystem object this listing corresponds to. For top-level URI + * file requests Y is absent; for sub-dir reads Y holds the parent handle + * and x holds the 1-based entry index. The new handle is X itself (a value + * > 1 distinguishes directories from regular files (X absent / X=0) and + * symlinks (X=1)). */ + hdr_sz += drop_append_request_keys(w, hdr + hdr_sz, sizeof(hdr) - hdr_sz); + hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":X=%u", (unsigned)handle_id); /* payload_sz includes a trailing null; omit it – the null-separated format * does not require a trailing null after the last entry. */ size_t send_sz = payload_sz > 0 ? payload_sz - 1 : 0; diff --git a/kitty_tests/dnd.py b/kitty_tests/dnd.py index eea75e34c..793abafed 100644 --- a/kitty_tests/dnd.py +++ b/kitty_tests/dnd.py @@ -266,6 +266,19 @@ def parse_escape_codes_b64(data: bytes) -> list[dict]: return result +def is_dir_event(e: dict) -> bool: + """Return True if the event is a directory listing response (X > 1).""" + try: + return int(e['meta'].get('X', '0')) > 1 + except (ValueError, TypeError): + return False + + +def dir_handle(e: dict) -> int: + """Return the directory handle from a directory listing event (value of X).""" + return int(e['meta']['X']) + + # ---- test context manager --------------------------------------------------- class _WriteCapture: @@ -740,7 +753,15 @@ class TestDnDProtocol(BaseTest): self.ae(events[0]['payload'].strip(), b'EINVAL') def test_uri_directory_transfer_tree(self) -> None: - """Full directory tree transfer: listing, sub-dirs, file integrity.""" + """Full directory tree (≥ 3 levels deep) transfer: listing, sub-dirs, file integrity. + + Also verifies that every response from the terminal unambiguously + identifies the filesystem object it refers to. For sub-directory + listing responses the echoed Y= (parent handle) and x= (1-based entry + index within the parent) together with X= (new child handle) make the + response unambiguous. For file/error responses Y= and x= alone + suffice. + """ import hashlib import os import tempfile @@ -764,18 +785,28 @@ class TestDnDProtocol(BaseTest): with dnd_test_window() as (osw, wid, screen, cap): self._setup_uri_drop(screen, wid, cap, uri_list) - # Request the root directory (idx=0) + # Request the root directory (mime_idx=2, file_idx=1) parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)] self.assertTrue(d_events, 'expected directory listing for root') root_listing_payload = b''.join( chunk for e in d_events for chunk in e['chunks'] if chunk ) - root_handle_id = int(d_events[0]['meta']['Y']) - self.assertGreater(root_handle_id, 0) + root_handle_id = dir_handle(d_events[0]) + self.assertGreater(root_handle_id, 1, + 'root directory handle (X=) must be > 1') + # For a top-level URI request the response echoes x= and y= from the + # request; Y= must be absent because the request had no Y. + for ev in d_events: + self.ae(ev['meta'].get('x'), '2', + 'mime index must be echoed in root dir response') + self.ae(ev['meta'].get('y'), '1', + 'file index must be echoed in root dir response') + self.assertIsNone(ev['meta'].get('Y'), + 'Y= must not be present in top-level dir response') # Decode null-separated entries (no unique identifier prefix) root_entries = [e for e in root_listing_payload.split(b'\x00') if e] @@ -788,26 +819,39 @@ class TestDnDProtocol(BaseTest): a_idx = entries_list.index('a.txt') + 1 b_idx = entries_list.index('b') + 1 - # Read a.txt + # Read a.txt — response must echo Y=root_handle_id, x=a_idx parse_bytes(screen, client_dir_read(root_handle_id, a_idx)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) r_events = [e for e in events if e['type'] == 'r'] + for ev in r_events: + self.ae(ev['meta'].get('Y'), str(root_handle_id), + 'parent handle must be echoed in file response') + self.ae(ev['meta'].get('x'), str(a_idx), + 'entry index must be echoed in file response') a_data = b''.join(e['payload'] for e in r_events if e['payload']) self.ae(a_data, a_content) # Read sub-directory b → should get a new directory listing + # Response must echo Y=root_handle_id, x=b_idx; X= is new handle parse_bytes(screen, client_dir_read(root_handle_id, b_idx)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - b_d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + b_d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)] self.assertTrue(b_d_events, 'expected directory listing for b/') b_listing_payload = b''.join( chunk for e in b_d_events for chunk in e['chunks'] if chunk ) - b_handle_id = int(b_d_events[0]['meta']['Y']) + b_handle_id = dir_handle(b_d_events[0]) self.assertNotEqual(b_handle_id, root_handle_id) + # Unambiguous identification: the response must identify both the + # parent dir (Y=) and the entry within it (x=). + for ev in b_d_events: + self.ae(ev['meta'].get('Y'), str(root_handle_id), + 'parent handle must be echoed in sub-dir listing response') + self.ae(ev['meta'].get('x'), str(b_idx), + 'entry index must be echoed in sub-dir listing response') b_entries = [e for e in b_listing_payload.split(b'\x00') if e] b_names = {e.decode() for e in b_entries} @@ -818,28 +862,41 @@ class TestDnDProtocol(BaseTest): bc_idx = b_entries_list.index('c.txt') + 1 bd_idx = b_entries_list.index('d') + 1 - # Read b/c.txt (binary integrity) + # Read b/c.txt (binary integrity); response echoes Y=b_handle_id, x=bc_idx parse_bytes(screen, client_dir_read(b_handle_id, bc_idx)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) r_events = [e for e in events if e['type'] == 'r'] + for ev in r_events: + self.ae(ev['meta'].get('Y'), str(b_handle_id), + 'parent handle must be echoed in file response') + self.ae(ev['meta'].get('x'), str(bc_idx), + 'entry index must be echoed in file response') bc_data = b''.join(e['payload'] for e in r_events if e['payload']) self.ae(bc_data, bc_content) # Check SHA-256 integrity self.ae(hashlib.sha256(bc_data).digest(), hashlib.sha256(bc_content).digest()) - # Read sub-directory b/d → yet another directory listing + # Read sub-directory b/d → yet another directory listing (level 3) + # Response must echo Y=b_handle_id, x=bd_idx; X= is new handle parse_bytes(screen, client_dir_read(b_handle_id, bd_idx)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - bd_d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + bd_d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)] self.assertTrue(bd_d_events, 'expected directory listing for b/d/') bd_listing_payload = b''.join( chunk for e in bd_d_events for chunk in e['chunks'] if chunk ) - bd_handle_id = int(bd_d_events[0]['meta']['Y']) + bd_handle_id = dir_handle(bd_d_events[0]) + # Unambiguous identification for third-level directory. + for ev in bd_d_events: + self.ae(ev['meta'].get('Y'), str(b_handle_id), + 'parent handle must be echoed in level-3 sub-dir listing response') + self.ae(ev['meta'].get('x'), str(bd_idx), + 'entry index must be echoed in level-3 sub-dir listing response') + bd_entries = [e for e in bd_listing_payload.split(b'\x00') if e] bd_names = {e.decode() for e in bd_entries} self.assertIn('e.txt', bd_names) @@ -847,11 +904,16 @@ class TestDnDProtocol(BaseTest): bd_entries_list = [e.decode() for e in bd_entries] bde_idx = bd_entries_list.index('e.txt') + 1 - # Read b/d/e.txt + # Read b/d/e.txt; response echoes Y=bd_handle_id, x=bde_idx parse_bytes(screen, client_dir_read(bd_handle_id, bde_idx)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) r_events = [e for e in events if e['type'] == 'r'] + for ev in r_events: + self.ae(ev['meta'].get('Y'), str(bd_handle_id), + 'parent handle must be echoed in deep file response') + self.ae(ev['meta'].get('x'), str(bde_idx), + 'entry index must be echoed in deep file response') bde_data = b''.join(e['payload'] for e in r_events if e['payload']) self.ae(bde_data, bde_content) @@ -874,9 +936,9 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] self.assertTrue(d_ev) - hid = int(d_ev[0]['meta']['Y']) + hid = dir_handle(d_ev[0]) # Close the handle parse_bytes(screen, client_dir_read(hid)) @@ -901,8 +963,8 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] - hid = int(d_ev[0]['meta']['Y']) + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] + hid = dir_handle(d_ev[0]) # Entry 999 does not exist parse_bytes(screen, client_dir_read(hid, 999)) @@ -923,7 +985,7 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] payload = b''.join( chunk for e in d_ev for chunk in e['chunks'] if chunk ) @@ -946,11 +1008,11 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] payload = b''.join( chunk for e in d_ev for chunk in e['chunks'] if chunk ) - hid = int(d_ev[0]['meta']['Y']) + hid = dir_handle(d_ev[0]) entries = [e.decode() for e in payload.split(b'\x00') if e] self.assertIn('link.txt', entries) self.assertIn('real.txt', entries) @@ -982,11 +1044,11 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] payload = b''.join( chunk for e in d_ev for chunk in e['chunks'] if chunk ) - hid = int(d_ev[0]['meta']['Y']) + hid = dir_handle(d_ev[0]) entries = [e.decode() for e in payload.split(b'\x00') if e] self.assertIn('link_to_dir', entries) link_idx = entries.index('link_to_dir') + 1 @@ -1016,11 +1078,11 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] payload = b''.join( chunk for e in d_ev for chunk in e['chunks'] if chunk ) - hid = int(d_ev[0]['meta']['Y']) + hid = dir_handle(d_ev[0]) entries = [e.decode() for e in payload.split(b'\x00') if e] link_idx = entries.index('abs_link.txt') + 1 @@ -1046,11 +1108,11 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] payload = b''.join( chunk for e in d_ev for chunk in e['chunks'] if chunk ) - hid = int(d_ev[0]['meta']['Y']) + hid = dir_handle(d_ev[0]) entries = [e.decode() for e in payload.split(b'\x00') if e] reg_idx = entries.index('regular.txt') + 1 @@ -1079,11 +1141,11 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] payload = b''.join( chunk for e in d_ev for chunk in e['chunks'] if chunk ) - hid = int(d_ev[0]['meta']['Y']) + hid = dir_handle(d_ev[0]) entries = [e.decode() for e in payload.split(b'\x00') if e] # Read regular file @@ -1122,11 +1184,11 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] payload = b''.join( chunk for e in d_ev for chunk in e['chunks'] if chunk ) - root_hid = int(d_ev[0]['meta']['Y']) + root_hid = dir_handle(d_ev[0]) entries = [e.decode() for e in payload.split(b'\x00') if e] sub_idx = entries.index('sub') + 1 @@ -1134,11 +1196,11 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_dir_read(root_hid, sub_idx)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] sub_payload = b''.join( chunk for e in d_ev for chunk in e['chunks'] if chunk ) - sub_hid = int(d_ev[0]['meta']['Y']) + sub_hid = dir_handle(d_ev[0]) sub_entries = [e.decode() for e in sub_payload.split(b'\x00') if e] self.assertIn('nested_link.txt', sub_entries) @@ -1164,8 +1226,8 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] - hid = int(d_ev[0]['meta']['Y']) + d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)] + hid = dir_handle(d_ev[0]) # Index 1 should read the first entry parse_bytes(screen, client_dir_read(hid, 1)) @@ -1214,7 +1276,7 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)] self.assertTrue(d_events, 'top-level symlink to dir should return directory listing') payload = b''.join( chunk for e in d_events for chunk in e['chunks'] if chunk @@ -1928,8 +1990,13 @@ class TestDnDProtocol(BaseTest): self.ae(events[0]['meta'].get('x'), '2') self.ae(events[0]['meta'].get('y'), '1') - def test_Y_key_in_dir_listing_response(self) -> None: - """Y= key (new handle) and X=2 are present in directory listing responses.""" + def test_X_key_is_handle_in_dir_listing_response(self) -> None: + """X= key acts as directory handle (> 1) in directory listing responses. + + For top-level URI directory requests the request keys x= (mime index) + and y= (file index) are echoed in the response. X= holds the new + directory handle; Y= is absent because the original request had no Y. + """ import os import tempfile with tempfile.TemporaryDirectory() as root: @@ -1940,12 +2007,15 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)] self.assertTrue(d_events, 'expected directory listing') for ev in d_events: self.ae(ev['meta'].get('x'), '2') self.ae(ev['meta'].get('y'), '1') - self.assertTrue(int(ev['meta'].get('Y', '0')) > 0, 'Y= must be non-zero handle') + handle = dir_handle(ev) + self.assertGreater(handle, 1, 'X= must be a directory handle (> 1)') + self.assertIsNone(ev['meta'].get('Y'), + 'Y= must not be present in top-level dir response') def test_Y_and_x_keys_in_dir_entry_file_response(self) -> None: """Y= and x= keys are echoed when reading a file via directory handle.""" @@ -1961,9 +2031,9 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] + d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)] self.assertTrue(d_events) - handle_id = int(d_events[0]['meta']['Y']) + handle_id = dir_handle(d_events[0]) listing = b''.join(chunk for e in d_events for chunk in e['chunks'] if chunk) entries = [e.decode() for e in listing.split(b'\x00') if e] f_idx = entries.index('f.txt') + 1 @@ -1992,8 +2062,8 @@ class TestDnDProtocol(BaseTest): parse_bytes(screen, client_request_uri_data(2, 1)) raw = cap.consume(wid) events = parse_escape_codes_b64(raw) - d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2'] - handle_id = int(d_events[0]['meta']['Y']) + d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)] + handle_id = dir_handle(d_events[0]) # Out-of-range entry parse_bytes(screen, client_dir_read(handle_id, 999))