mirror of
https://github.com/kovidgoyal/kitty.git
synced 2026-05-13 08:26:56 +00:00
Implement new drop protocol X/Y key handling from b466f8b
- dnd.c: drop_send_dir_listing now uses drop_append_request_keys (echoes all request keys including Y for sub-dirs) and emits :X=handle instead of :Y=handle:X=2. Directory handles are now the X value itself. - dnd.c: drop_alloc_dir_handle starts handle counter at 1 so first handle is 2, keeping 0 (absent) and 1 (symlink) reserved as per protocol. - dnd.py: add is_dir_event() / dir_handle() helpers; update all tests to use int(X) > 1 as directory indicator and X as the handle value. - dnd.py: rename test_Y_key_in_dir_listing_response to test_X_key_is_handle_in_dir_listing_response with updated assertions. - dnd.py: test_uri_directory_transfer_tree expanded to verify unambiguous identification (Y=parent, x=entry echoed) at all three directory levels. Agent-Logs-Url: https://github.com/kovidgoyal/kitty/sessions/6973699c-a979-4d97-8213-1a4a501044a1 Co-authored-by: kovidgoyal <1308621+kovidgoyal@users.noreply.github.com>
This commit is contained in:
parent
b466f8b4df
commit
afa63ccec7
2 changed files with 124 additions and 50 deletions
18
kitty/dnd.c
18
kitty/dnd.c
|
|
@ -691,7 +691,9 @@ static uint32_t
|
|||
drop_alloc_dir_handle(Window *w, const char *path, char **entries, size_t num_entries) {
|
||||
ensure_space_for(&w->drop, dir_handles, DirHandle, w->drop.num_dir_handles + 1, dir_handles_capacity, 4, true);
|
||||
w->drop.next_dir_handle_id++;
|
||||
if (w->drop.next_dir_handle_id == 0) w->drop.next_dir_handle_id = 1;
|
||||
/* Handles 0 and 1 are reserved (0 = absent, 1 = symlink indicator), so
|
||||
* valid directory handles must be >= 2. */
|
||||
if (w->drop.next_dir_handle_id <= 1) w->drop.next_dir_handle_id = 2;
|
||||
DirHandle *h = &w->drop.dir_handles[w->drop.num_dir_handles++];
|
||||
zero_at_ptr(h);
|
||||
h->id = w->drop.next_dir_handle_id;
|
||||
|
|
@ -793,12 +795,14 @@ drop_send_dir_listing(Window *w, const char *path) {
|
|||
|
||||
char hdr[128];
|
||||
int hdr_sz = snprintf(hdr, sizeof(hdr), "\x1b]%d;t=r", DND_CODE);
|
||||
/* For dir listings, echo the x and y keys from the request, then add Y=new_handle:X=2 */
|
||||
if (w->drop.current_request_x)
|
||||
hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":x=%d", (int)w->drop.current_request_x);
|
||||
if (w->drop.current_request_y)
|
||||
hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":y=%d", (int)w->drop.current_request_y);
|
||||
hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":Y=%u:X=2", (unsigned)handle_id);
|
||||
/* Echo all request keys (x, y, Y) so the client can unambiguously identify
|
||||
* which filesystem object this listing corresponds to. For top-level URI
|
||||
* file requests Y is absent; for sub-dir reads Y holds the parent handle
|
||||
* and x holds the 1-based entry index. The new handle is X itself (a value
|
||||
* > 1 distinguishes directories from regular files (X absent / X=0) and
|
||||
* symlinks (X=1)). */
|
||||
hdr_sz += drop_append_request_keys(w, hdr + hdr_sz, sizeof(hdr) - hdr_sz);
|
||||
hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":X=%u", (unsigned)handle_id);
|
||||
/* payload_sz includes a trailing null; omit it – the null-separated format
|
||||
* does not require a trailing null after the last entry. */
|
||||
size_t send_sz = payload_sz > 0 ? payload_sz - 1 : 0;
|
||||
|
|
|
|||
|
|
@ -266,6 +266,19 @@ def parse_escape_codes_b64(data: bytes) -> list[dict]:
|
|||
return result
|
||||
|
||||
|
||||
def is_dir_event(e: dict) -> bool:
|
||||
"""Return True if the event is a directory listing response (X > 1)."""
|
||||
try:
|
||||
return int(e['meta'].get('X', '0')) > 1
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
|
||||
def dir_handle(e: dict) -> int:
|
||||
"""Return the directory handle from a directory listing event (value of X)."""
|
||||
return int(e['meta']['X'])
|
||||
|
||||
|
||||
# ---- test context manager ---------------------------------------------------
|
||||
|
||||
class _WriteCapture:
|
||||
|
|
@ -740,7 +753,15 @@ class TestDnDProtocol(BaseTest):
|
|||
self.ae(events[0]['payload'].strip(), b'EINVAL')
|
||||
|
||||
def test_uri_directory_transfer_tree(self) -> None:
|
||||
"""Full directory tree transfer: listing, sub-dirs, file integrity."""
|
||||
"""Full directory tree (≥ 3 levels deep) transfer: listing, sub-dirs, file integrity.
|
||||
|
||||
Also verifies that every response from the terminal unambiguously
|
||||
identifies the filesystem object it refers to. For sub-directory
|
||||
listing responses the echoed Y= (parent handle) and x= (1-based entry
|
||||
index within the parent) together with X= (new child handle) make the
|
||||
response unambiguous. For file/error responses Y= and x= alone
|
||||
suffice.
|
||||
"""
|
||||
import hashlib
|
||||
import os
|
||||
import tempfile
|
||||
|
|
@ -764,18 +785,28 @@ class TestDnDProtocol(BaseTest):
|
|||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
|
||||
# Request the root directory (idx=0)
|
||||
# Request the root directory (mime_idx=2, file_idx=1)
|
||||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
self.assertTrue(d_events, 'expected directory listing for root')
|
||||
|
||||
root_listing_payload = b''.join(
|
||||
chunk for e in d_events for chunk in e['chunks'] if chunk
|
||||
)
|
||||
root_handle_id = int(d_events[0]['meta']['Y'])
|
||||
self.assertGreater(root_handle_id, 0)
|
||||
root_handle_id = dir_handle(d_events[0])
|
||||
self.assertGreater(root_handle_id, 1,
|
||||
'root directory handle (X=) must be > 1')
|
||||
# For a top-level URI request the response echoes x= and y= from the
|
||||
# request; Y= must be absent because the request had no Y.
|
||||
for ev in d_events:
|
||||
self.ae(ev['meta'].get('x'), '2',
|
||||
'mime index must be echoed in root dir response')
|
||||
self.ae(ev['meta'].get('y'), '1',
|
||||
'file index must be echoed in root dir response')
|
||||
self.assertIsNone(ev['meta'].get('Y'),
|
||||
'Y= must not be present in top-level dir response')
|
||||
|
||||
# Decode null-separated entries (no unique identifier prefix)
|
||||
root_entries = [e for e in root_listing_payload.split(b'\x00') if e]
|
||||
|
|
@ -788,26 +819,39 @@ class TestDnDProtocol(BaseTest):
|
|||
a_idx = entries_list.index('a.txt') + 1
|
||||
b_idx = entries_list.index('b') + 1
|
||||
|
||||
# Read a.txt
|
||||
# Read a.txt — response must echo Y=root_handle_id, x=a_idx
|
||||
parse_bytes(screen, client_dir_read(root_handle_id, a_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
for ev in r_events:
|
||||
self.ae(ev['meta'].get('Y'), str(root_handle_id),
|
||||
'parent handle must be echoed in file response')
|
||||
self.ae(ev['meta'].get('x'), str(a_idx),
|
||||
'entry index must be echoed in file response')
|
||||
a_data = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(a_data, a_content)
|
||||
|
||||
# Read sub-directory b → should get a new directory listing
|
||||
# Response must echo Y=root_handle_id, x=b_idx; X= is new handle
|
||||
parse_bytes(screen, client_dir_read(root_handle_id, b_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
b_d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
b_d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
self.assertTrue(b_d_events, 'expected directory listing for b/')
|
||||
|
||||
b_listing_payload = b''.join(
|
||||
chunk for e in b_d_events for chunk in e['chunks'] if chunk
|
||||
)
|
||||
b_handle_id = int(b_d_events[0]['meta']['Y'])
|
||||
b_handle_id = dir_handle(b_d_events[0])
|
||||
self.assertNotEqual(b_handle_id, root_handle_id)
|
||||
# Unambiguous identification: the response must identify both the
|
||||
# parent dir (Y=) and the entry within it (x=).
|
||||
for ev in b_d_events:
|
||||
self.ae(ev['meta'].get('Y'), str(root_handle_id),
|
||||
'parent handle must be echoed in sub-dir listing response')
|
||||
self.ae(ev['meta'].get('x'), str(b_idx),
|
||||
'entry index must be echoed in sub-dir listing response')
|
||||
|
||||
b_entries = [e for e in b_listing_payload.split(b'\x00') if e]
|
||||
b_names = {e.decode() for e in b_entries}
|
||||
|
|
@ -818,28 +862,41 @@ class TestDnDProtocol(BaseTest):
|
|||
bc_idx = b_entries_list.index('c.txt') + 1
|
||||
bd_idx = b_entries_list.index('d') + 1
|
||||
|
||||
# Read b/c.txt (binary integrity)
|
||||
# Read b/c.txt (binary integrity); response echoes Y=b_handle_id, x=bc_idx
|
||||
parse_bytes(screen, client_dir_read(b_handle_id, bc_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
for ev in r_events:
|
||||
self.ae(ev['meta'].get('Y'), str(b_handle_id),
|
||||
'parent handle must be echoed in file response')
|
||||
self.ae(ev['meta'].get('x'), str(bc_idx),
|
||||
'entry index must be echoed in file response')
|
||||
bc_data = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(bc_data, bc_content)
|
||||
# Check SHA-256 integrity
|
||||
self.ae(hashlib.sha256(bc_data).digest(),
|
||||
hashlib.sha256(bc_content).digest())
|
||||
|
||||
# Read sub-directory b/d → yet another directory listing
|
||||
# Read sub-directory b/d → yet another directory listing (level 3)
|
||||
# Response must echo Y=b_handle_id, x=bd_idx; X= is new handle
|
||||
parse_bytes(screen, client_dir_read(b_handle_id, bd_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
bd_d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
bd_d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
self.assertTrue(bd_d_events, 'expected directory listing for b/d/')
|
||||
|
||||
bd_listing_payload = b''.join(
|
||||
chunk for e in bd_d_events for chunk in e['chunks'] if chunk
|
||||
)
|
||||
bd_handle_id = int(bd_d_events[0]['meta']['Y'])
|
||||
bd_handle_id = dir_handle(bd_d_events[0])
|
||||
# Unambiguous identification for third-level directory.
|
||||
for ev in bd_d_events:
|
||||
self.ae(ev['meta'].get('Y'), str(b_handle_id),
|
||||
'parent handle must be echoed in level-3 sub-dir listing response')
|
||||
self.ae(ev['meta'].get('x'), str(bd_idx),
|
||||
'entry index must be echoed in level-3 sub-dir listing response')
|
||||
|
||||
bd_entries = [e for e in bd_listing_payload.split(b'\x00') if e]
|
||||
bd_names = {e.decode() for e in bd_entries}
|
||||
self.assertIn('e.txt', bd_names)
|
||||
|
|
@ -847,11 +904,16 @@ class TestDnDProtocol(BaseTest):
|
|||
bd_entries_list = [e.decode() for e in bd_entries]
|
||||
bde_idx = bd_entries_list.index('e.txt') + 1
|
||||
|
||||
# Read b/d/e.txt
|
||||
# Read b/d/e.txt; response echoes Y=bd_handle_id, x=bde_idx
|
||||
parse_bytes(screen, client_dir_read(bd_handle_id, bde_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
for ev in r_events:
|
||||
self.ae(ev['meta'].get('Y'), str(bd_handle_id),
|
||||
'parent handle must be echoed in deep file response')
|
||||
self.ae(ev['meta'].get('x'), str(bde_idx),
|
||||
'entry index must be echoed in deep file response')
|
||||
bde_data = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(bde_data, bde_content)
|
||||
|
||||
|
|
@ -874,9 +936,9 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
self.assertTrue(d_ev)
|
||||
hid = int(d_ev[0]['meta']['Y'])
|
||||
hid = dir_handle(d_ev[0])
|
||||
|
||||
# Close the handle
|
||||
parse_bytes(screen, client_dir_read(hid))
|
||||
|
|
@ -901,8 +963,8 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
hid = int(d_ev[0]['meta']['Y'])
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
hid = dir_handle(d_ev[0])
|
||||
|
||||
# Entry 999 does not exist
|
||||
parse_bytes(screen, client_dir_read(hid, 999))
|
||||
|
|
@ -923,7 +985,7 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
|
|
@ -946,11 +1008,11 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['Y'])
|
||||
hid = dir_handle(d_ev[0])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
self.assertIn('link.txt', entries)
|
||||
self.assertIn('real.txt', entries)
|
||||
|
|
@ -982,11 +1044,11 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['Y'])
|
||||
hid = dir_handle(d_ev[0])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
self.assertIn('link_to_dir', entries)
|
||||
link_idx = entries.index('link_to_dir') + 1
|
||||
|
|
@ -1016,11 +1078,11 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['Y'])
|
||||
hid = dir_handle(d_ev[0])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
link_idx = entries.index('abs_link.txt') + 1
|
||||
|
||||
|
|
@ -1046,11 +1108,11 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['Y'])
|
||||
hid = dir_handle(d_ev[0])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
reg_idx = entries.index('regular.txt') + 1
|
||||
|
||||
|
|
@ -1079,11 +1141,11 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['Y'])
|
||||
hid = dir_handle(d_ev[0])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
|
||||
# Read regular file
|
||||
|
|
@ -1122,11 +1184,11 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
root_hid = int(d_ev[0]['meta']['Y'])
|
||||
root_hid = dir_handle(d_ev[0])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
sub_idx = entries.index('sub') + 1
|
||||
|
||||
|
|
@ -1134,11 +1196,11 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_dir_read(root_hid, sub_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
sub_payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
sub_hid = int(d_ev[0]['meta']['Y'])
|
||||
sub_hid = dir_handle(d_ev[0])
|
||||
sub_entries = [e.decode() for e in sub_payload.split(b'\x00') if e]
|
||||
self.assertIn('nested_link.txt', sub_entries)
|
||||
|
||||
|
|
@ -1164,8 +1226,8 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
hid = int(d_ev[0]['meta']['Y'])
|
||||
d_ev = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
hid = dir_handle(d_ev[0])
|
||||
|
||||
# Index 1 should read the first entry
|
||||
parse_bytes(screen, client_dir_read(hid, 1))
|
||||
|
|
@ -1214,7 +1276,7 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
self.assertTrue(d_events, 'top-level symlink to dir should return directory listing')
|
||||
payload = b''.join(
|
||||
chunk for e in d_events for chunk in e['chunks'] if chunk
|
||||
|
|
@ -1928,8 +1990,13 @@ class TestDnDProtocol(BaseTest):
|
|||
self.ae(events[0]['meta'].get('x'), '2')
|
||||
self.ae(events[0]['meta'].get('y'), '1')
|
||||
|
||||
def test_Y_key_in_dir_listing_response(self) -> None:
|
||||
"""Y= key (new handle) and X=2 are present in directory listing responses."""
|
||||
def test_X_key_is_handle_in_dir_listing_response(self) -> None:
|
||||
"""X= key acts as directory handle (> 1) in directory listing responses.
|
||||
|
||||
For top-level URI directory requests the request keys x= (mime index)
|
||||
and y= (file index) are echoed in the response. X= holds the new
|
||||
directory handle; Y= is absent because the original request had no Y.
|
||||
"""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
|
|
@ -1940,12 +2007,15 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
self.assertTrue(d_events, 'expected directory listing')
|
||||
for ev in d_events:
|
||||
self.ae(ev['meta'].get('x'), '2')
|
||||
self.ae(ev['meta'].get('y'), '1')
|
||||
self.assertTrue(int(ev['meta'].get('Y', '0')) > 0, 'Y= must be non-zero handle')
|
||||
handle = dir_handle(ev)
|
||||
self.assertGreater(handle, 1, 'X= must be a directory handle (> 1)')
|
||||
self.assertIsNone(ev['meta'].get('Y'),
|
||||
'Y= must not be present in top-level dir response')
|
||||
|
||||
def test_Y_and_x_keys_in_dir_entry_file_response(self) -> None:
|
||||
"""Y= and x= keys are echoed when reading a file via directory handle."""
|
||||
|
|
@ -1961,9 +2031,9 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
self.assertTrue(d_events)
|
||||
handle_id = int(d_events[0]['meta']['Y'])
|
||||
handle_id = dir_handle(d_events[0])
|
||||
listing = b''.join(chunk for e in d_events for chunk in e['chunks'] if chunk)
|
||||
entries = [e.decode() for e in listing.split(b'\x00') if e]
|
||||
f_idx = entries.index('f.txt') + 1
|
||||
|
|
@ -1992,8 +2062,8 @@ class TestDnDProtocol(BaseTest):
|
|||
parse_bytes(screen, client_request_uri_data(2, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_events = [e for e in events if e['type'] == 'r' and e['meta'].get('X') == '2']
|
||||
handle_id = int(d_events[0]['meta']['Y'])
|
||||
d_events = [e for e in events if e['type'] == 'r' and is_dir_event(e)]
|
||||
handle_id = dir_handle(d_events[0])
|
||||
|
||||
# Out-of-range entry
|
||||
parse_bytes(screen, client_dir_read(handle_id, 999))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue