mirror of
https://github.com/kovidgoyal/kitty.git
synced 2026-05-13 08:26:56 +00:00
Merge branch 'copilot/update-dnd-protocol-symlink-handling' of https://github.com/kovidgoyal/kitty
This commit is contained in:
commit
019158c168
4 changed files with 334 additions and 50 deletions
|
|
@ -179,8 +179,8 @@ encoded and might be chunked if the directory has a lot of entries.
|
|||
|
||||
``idx`` is an arbitrary 32 bit integer that acts as a handle to this
|
||||
directory. The client can now read the files in this directory using requests of the form
|
||||
``t=d:x=idx:y=num:r=request_id``, here ``num`` is the 0-based index into the list of
|
||||
directory entries previously transmitted to the client, where, ``0`` will
|
||||
``t=d:x=idx:y=num:r=request_id``, here ``num`` is the 1-based index into the list of
|
||||
directory entries previously transmitted to the client, where, ``1`` will
|
||||
correspond to the first entry in the directory. Once the client is done
|
||||
reading a directory it should transmit ``t=d:x=idx:r=request_id`` to the terminal. The
|
||||
terminal can then free any resources associated with that directory. The
|
||||
|
|
|
|||
|
|
@ -114,6 +114,7 @@ def generate(
|
|||
payload_is_base64: bool = True,
|
||||
start_parsing_at: int = 1,
|
||||
field_sep: str = ',',
|
||||
post_init: str = '',
|
||||
) -> str:
|
||||
type_map = resolve_keys(keymap)
|
||||
keys_enum = enum(keymap)
|
||||
|
|
@ -121,6 +122,7 @@ def generate(
|
|||
flag_keys = parse_flag(keymap, type_map, command_class)
|
||||
int_keys, uint_keys = parse_number(keymap)
|
||||
report_cmd = cmd_for_report(report_name, keymap, type_map, payload_allowed, payload_is_base64)
|
||||
post_init_line = f'\n {post_init}' if post_init else ''
|
||||
extra_init = ''
|
||||
if payload_allowed:
|
||||
payload_after_value = "case ';': state = PAYLOAD; break;"
|
||||
|
|
@ -163,7 +165,7 @@ static inline void
|
|||
{extra_init}
|
||||
enum PARSER_STATES {{ KEY, EQUAL, UINT, INT, FLAG, AFTER_VALUE {payload} }};
|
||||
enum PARSER_STATES state = KEY, value_state = FLAG;
|
||||
{command_class} g = {{0}};
|
||||
{command_class} g = {{0}};{post_init_line}
|
||||
unsigned int i, code;
|
||||
uint64_t lcode; int64_t accumulator;
|
||||
bool is_negative; (void)is_negative;
|
||||
|
|
|
|||
43
kitty/dnd.c
43
kitty/dnd.c
|
|
@ -710,9 +710,6 @@ drop_find_dir_handle(Window *w, uint32_t id) {
|
|||
* send the listing to the client as a t=d:x=handle_id response. */
|
||||
static void
|
||||
drop_send_dir_listing(Window *w, const char *path) {
|
||||
struct stat st;
|
||||
if (stat(path, &st) < 0) { drop_send_error(w, EIO); return; }
|
||||
|
||||
DIR *dir = opendir(path);
|
||||
if (!dir) {
|
||||
switch (errno) {
|
||||
|
|
@ -723,17 +720,11 @@ drop_send_dir_listing(Window *w, const char *path) {
|
|||
return;
|
||||
}
|
||||
|
||||
/* Build null-separated payload: unique_id\0entry1\0entry2\0... */
|
||||
/* Build null-separated payload: entry1\0entry2\0... */
|
||||
size_t payload_cap = 4096, payload_sz = 0;
|
||||
char *payload = malloc(payload_cap);
|
||||
if (!payload) { closedir(dir); drop_send_error(w, EIO); return; }
|
||||
|
||||
/* First entry: unique identifier (device:inode) */
|
||||
char uid[64];
|
||||
int uid_len = snprintf(uid, sizeof(uid), "%llu:%llu",
|
||||
(unsigned long long)st.st_dev,
|
||||
(unsigned long long)st.st_ino);
|
||||
|
||||
#define APPEND(s, n) do { \
|
||||
size_t _n = (size_t)(n); \
|
||||
size_t _need = payload_sz + _n + 1; \
|
||||
|
|
@ -748,8 +739,6 @@ drop_send_dir_listing(Window *w, const char *path) {
|
|||
payload[payload_sz++] = 0; \
|
||||
} while(0)
|
||||
|
||||
APPEND(uid, uid_len);
|
||||
|
||||
/* Collect directory entries */
|
||||
size_t ents_cap = 16, ents_num = 0;
|
||||
char **ents = malloc(sizeof(char *) * ents_cap);
|
||||
|
|
@ -909,8 +898,8 @@ do_drop_handle_dir_request(Window *w, uint32_t handle_id, int32_t entry_num) {
|
|||
drop_send_error(w, EIO); return true;
|
||||
}
|
||||
|
||||
struct stat st;
|
||||
if (stat(full, &st) < 0) {
|
||||
struct stat lst;
|
||||
if (lstat(full, &lst) < 0) {
|
||||
switch (errno) {
|
||||
case ENOENT: case ENOTDIR: case ELOOP: drop_send_error(w, ENOENT); break;
|
||||
case EACCES: case EPERM: drop_send_error(w, EPERM); break;
|
||||
|
|
@ -919,10 +908,32 @@ do_drop_handle_dir_request(Window *w, uint32_t handle_id, int32_t entry_num) {
|
|||
return true;
|
||||
}
|
||||
|
||||
if (S_ISDIR(st.st_mode)) {
|
||||
if (S_ISLNK(lst.st_mode)) {
|
||||
/* Symlink: send the symlink target as t=r:X=1 */
|
||||
char target[PATH_MAX];
|
||||
ssize_t tlen = readlink(full, target, sizeof(target) - 1);
|
||||
if (tlen < 0) {
|
||||
switch (errno) {
|
||||
case ENOENT: case ENOTDIR: drop_send_error(w, ENOENT); break;
|
||||
case EACCES: case EPERM: drop_send_error(w, EPERM); break;
|
||||
default: drop_send_error(w, EIO); break;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
target[tlen] = '\0';
|
||||
char hdr[128];
|
||||
int hdr_sz = snprintf(hdr, sizeof(hdr), "\x1b]%d;t=r:X=1", DND_CODE);
|
||||
if (w->drop.current_request_id)
|
||||
hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":r=%u", (unsigned)w->drop.current_request_id);
|
||||
queue_payload_to_child(w->id, w->drop.client_id, &w->drop.pending, hdr, hdr_sz, target, (size_t)tlen, true);
|
||||
queue_payload_to_child(w->id, w->drop.client_id, &w->drop.pending, hdr, hdr_sz, NULL, 0, true);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (S_ISDIR(lst.st_mode)) {
|
||||
drop_send_dir_listing(w, full);
|
||||
return true;
|
||||
} else if (S_ISREG(st.st_mode)) {
|
||||
} else if (S_ISREG(lst.st_mode)) {
|
||||
return drop_send_file_data(w, full);
|
||||
} else {
|
||||
drop_send_error(w, EINVAL);
|
||||
|
|
|
|||
|
|
@ -773,17 +773,14 @@ class TestDnDProtocol(BaseTest):
|
|||
root_handle_id = int(d_events[0]['meta']['x'])
|
||||
self.assertGreater(root_handle_id, 0)
|
||||
|
||||
# Decode null-separated entries
|
||||
# Decode null-separated entries (no unique identifier prefix)
|
||||
root_entries = [e for e in root_listing_payload.split(b'\x00') if e]
|
||||
# First entry is the unique identifier; remainder are file/dir names
|
||||
self.assertGreater(len(root_entries), 1,
|
||||
f'expected entries, got {root_entries}')
|
||||
entry_names = {e.decode() for e in root_entries[1:]}
|
||||
entry_names = {e.decode() for e in root_entries}
|
||||
self.assertIn('a.txt', entry_names)
|
||||
self.assertIn('b', entry_names)
|
||||
|
||||
# Find index of 'a.txt' in the entries list (1-based for t=d:y=)
|
||||
entries_list = [e.decode() for e in root_entries[1:]]
|
||||
entries_list = [e.decode() for e in root_entries]
|
||||
a_idx = entries_list.index('a.txt') + 1
|
||||
b_idx = entries_list.index('b') + 1
|
||||
|
||||
|
|
@ -809,11 +806,11 @@ class TestDnDProtocol(BaseTest):
|
|||
self.assertNotEqual(b_handle_id, root_handle_id)
|
||||
|
||||
b_entries = [e for e in b_listing_payload.split(b'\x00') if e]
|
||||
b_names = {e.decode() for e in b_entries[1:]}
|
||||
b_names = {e.decode() for e in b_entries}
|
||||
self.assertIn('c.txt', b_names)
|
||||
self.assertIn('d', b_names)
|
||||
|
||||
b_entries_list = [e.decode() for e in b_entries[1:]]
|
||||
b_entries_list = [e.decode() for e in b_entries]
|
||||
bc_idx = b_entries_list.index('c.txt') + 1
|
||||
bd_idx = b_entries_list.index('d') + 1
|
||||
|
||||
|
|
@ -840,10 +837,10 @@ class TestDnDProtocol(BaseTest):
|
|||
)
|
||||
bd_handle_id = int(bd_d_events[0]['meta']['x'])
|
||||
bd_entries = [e for e in bd_listing_payload.split(b'\x00') if e]
|
||||
bd_names = {e.decode() for e in bd_entries[1:]}
|
||||
bd_names = {e.decode() for e in bd_entries}
|
||||
self.assertIn('e.txt', bd_names)
|
||||
|
||||
bd_entries_list = [e.decode() for e in bd_entries[1:]]
|
||||
bd_entries_list = [e.decode() for e in bd_entries]
|
||||
bde_idx = bd_entries_list.index('e.txt') + 1
|
||||
|
||||
# Read b/d/e.txt
|
||||
|
|
@ -910,13 +907,12 @@ class TestDnDProtocol(BaseTest):
|
|||
self.ae(events[0]['type'], 'R')
|
||||
self.ae(events[0]['payload'].strip(), b'ENOENT')
|
||||
|
||||
def test_dir_unique_identifier_prevents_loops(self) -> None:
|
||||
"""Each directory listing starts with a unique id (dev:inode format)."""
|
||||
def test_dir_no_unique_identifier(self) -> None:
|
||||
"""Directory listings should not contain a unique identifier prefix."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
sub = os.path.join(root, 'sub')
|
||||
os.mkdir(sub)
|
||||
open(os.path.join(root, 'hello.txt'), 'w').close()
|
||||
uri_list = f'file://{root}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
|
|
@ -924,28 +920,303 @@ class TestDnDProtocol(BaseTest):
|
|||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
root_payload = b''.join(
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
root_handle_id = int(d_ev[0]['meta']['x'])
|
||||
root_uid = root_payload.split(b'\x00')[0].decode()
|
||||
# uid must be non-empty and contain a colon (dev:inode)
|
||||
self.assertIn(':', root_uid, f'uid={root_uid!r}')
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
# All entries should be actual file/dir names, no dev:inode prefix
|
||||
self.assertEqual(entries, ['hello.txt'])
|
||||
|
||||
# Get the sub directory listing to compare identifiers
|
||||
entries = [e.decode() for e in root_payload.split(b'\x00')[1:] if e]
|
||||
sub_idx = entries.index('sub') + 1
|
||||
parse_bytes(screen, client_dir_read(root_handle_id, sub_idx))
|
||||
def test_dir_symlink_to_file(self) -> None:
|
||||
"""Symlinks to files inside directories are reported with t=r:X=1 and the symlink target."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
real_file = os.path.join(root, 'real.txt')
|
||||
with open(real_file, 'w') as f:
|
||||
f.write('real content')
|
||||
os.symlink('real.txt', os.path.join(root, 'link.txt'))
|
||||
uri_list = f'file://{root}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev2 = [e for e in events if e['type'] == 'd']
|
||||
sub_payload = b''.join(
|
||||
chunk for e in d_ev2 for chunk in e['chunks'] if chunk
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
sub_uid = sub_payload.split(b'\x00')[0].decode() if sub_payload else ''
|
||||
self.assertIn(':', sub_uid, f'sub uid={sub_uid!r}')
|
||||
# Root and sub must have different identifiers
|
||||
self.assertNotEqual(root_uid, sub_uid)
|
||||
hid = int(d_ev[0]['meta']['x'])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
self.assertIn('link.txt', entries)
|
||||
self.assertIn('real.txt', entries)
|
||||
link_idx = entries.index('link.txt') + 1
|
||||
|
||||
# Read the symlink entry → should get t=r with X=1 and target path
|
||||
parse_bytes(screen, client_dir_read(hid, link_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertTrue(r_events, 'expected t=r response for symlink')
|
||||
# Check X=1 flag indicating symlink
|
||||
self.assertEqual(r_events[0]['meta'].get('X'), '1',
|
||||
'symlink response must have X=1')
|
||||
# Payload should be the symlink target
|
||||
target = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(target, b'real.txt')
|
||||
|
||||
def test_dir_symlink_to_directory(self) -> None:
|
||||
"""Symlinks to directories inside directories are reported with t=r:X=1."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
os.mkdir(os.path.join(root, 'subdir'))
|
||||
os.symlink('subdir', os.path.join(root, 'link_to_dir'))
|
||||
uri_list = f'file://{root}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['x'])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
self.assertIn('link_to_dir', entries)
|
||||
link_idx = entries.index('link_to_dir') + 1
|
||||
|
||||
# Read the symlink → should get t=r with X=1
|
||||
parse_bytes(screen, client_dir_read(hid, link_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertTrue(r_events, 'expected t=r response for dir symlink')
|
||||
self.assertEqual(r_events[0]['meta'].get('X'), '1')
|
||||
target = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(target, b'subdir')
|
||||
|
||||
def test_dir_symlink_absolute_target(self) -> None:
|
||||
"""Symlinks with absolute targets report the full absolute path."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
real_file = os.path.join(root, 'abs_target.txt')
|
||||
with open(real_file, 'w') as f:
|
||||
f.write('content')
|
||||
os.symlink(real_file, os.path.join(root, 'abs_link.txt'))
|
||||
uri_list = f'file://{root}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['x'])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
link_idx = entries.index('abs_link.txt') + 1
|
||||
|
||||
parse_bytes(screen, client_dir_read(hid, link_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertTrue(r_events)
|
||||
self.assertEqual(r_events[0]['meta'].get('X'), '1')
|
||||
target = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(target, real_file.encode())
|
||||
|
||||
def test_dir_regular_file_no_symlink_flag(self) -> None:
|
||||
"""Regular files in directories must NOT have the X=1 flag."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
with open(os.path.join(root, 'regular.txt'), 'w') as f:
|
||||
f.write('hello')
|
||||
uri_list = f'file://{root}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['x'])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
reg_idx = entries.index('regular.txt') + 1
|
||||
|
||||
parse_bytes(screen, client_dir_read(hid, reg_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertTrue(r_events)
|
||||
# Regular files must not have X=1
|
||||
self.assertNotEqual(r_events[0]['meta'].get('X'), '1',
|
||||
'regular file must not have X=1 symlink flag')
|
||||
data = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(data, b'hello')
|
||||
|
||||
def test_dir_symlink_and_regular_mixed(self) -> None:
|
||||
"""Directory with both regular files and symlinks handles each correctly."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
with open(os.path.join(root, 'data.bin'), 'wb') as f:
|
||||
f.write(b'\x00\x01\x02\x03')
|
||||
os.symlink('data.bin', os.path.join(root, 'alias.bin'))
|
||||
uri_list = f'file://{root}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
hid = int(d_ev[0]['meta']['x'])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
|
||||
# Read regular file
|
||||
data_idx = entries.index('data.bin') + 1
|
||||
parse_bytes(screen, client_dir_read(hid, data_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertNotEqual(r_events[0]['meta'].get('X'), '1')
|
||||
self.ae(b''.join(e['payload'] for e in r_events if e['payload']),
|
||||
b'\x00\x01\x02\x03')
|
||||
|
||||
# Read symlink
|
||||
alias_idx = entries.index('alias.bin') + 1
|
||||
parse_bytes(screen, client_dir_read(hid, alias_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertEqual(r_events[0]['meta'].get('X'), '1')
|
||||
self.ae(b''.join(e['payload'] for e in r_events if e['payload']),
|
||||
b'data.bin')
|
||||
|
||||
def test_dir_nested_symlink_in_subdir(self) -> None:
|
||||
"""Symlinks inside nested subdirectories are handled correctly."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
sub = os.path.join(root, 'sub')
|
||||
os.mkdir(sub)
|
||||
with open(os.path.join(sub, 'target.txt'), 'w') as f:
|
||||
f.write('nested target')
|
||||
os.symlink('target.txt', os.path.join(sub, 'nested_link.txt'))
|
||||
uri_list = f'file://{root}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
root_hid = int(d_ev[0]['meta']['x'])
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
sub_idx = entries.index('sub') + 1
|
||||
|
||||
# Open subdirectory
|
||||
parse_bytes(screen, client_dir_read(root_hid, sub_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
sub_payload = b''.join(
|
||||
chunk for e in d_ev for chunk in e['chunks'] if chunk
|
||||
)
|
||||
sub_hid = int(d_ev[0]['meta']['x'])
|
||||
sub_entries = [e.decode() for e in sub_payload.split(b'\x00') if e]
|
||||
self.assertIn('nested_link.txt', sub_entries)
|
||||
|
||||
link_idx = sub_entries.index('nested_link.txt') + 1
|
||||
parse_bytes(screen, client_dir_read(sub_hid, link_idx))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertEqual(r_events[0]['meta'].get('X'), '1')
|
||||
self.ae(b''.join(e['payload'] for e in r_events if e['payload']),
|
||||
b'target.txt')
|
||||
|
||||
def test_dir_entry_one_based_index(self) -> None:
|
||||
"""Directory entry index 1 reads the first entry (1-based)."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
with open(os.path.join(root, 'first.txt'), 'w') as f:
|
||||
f.write('first file')
|
||||
uri_list = f'file://{root}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_ev = [e for e in events if e['type'] == 'd']
|
||||
hid = int(d_ev[0]['meta']['x'])
|
||||
|
||||
# Index 1 should read the first entry
|
||||
parse_bytes(screen, client_dir_read(hid, 1))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertTrue(r_events, 'entry index 1 should read the first entry')
|
||||
data = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(data, b'first file')
|
||||
|
||||
def test_top_level_symlink_to_file_resolved(self) -> None:
|
||||
"""Top-level symlink in URI list resolves to file and sends file data."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
real = os.path.join(root, 'real.txt')
|
||||
with open(real, 'w') as f:
|
||||
f.write('resolved content')
|
||||
link = os.path.join(root, 'link.txt')
|
||||
os.symlink(real, link)
|
||||
uri_list = f'file://{link}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
r_events = [e for e in events if e['type'] == 'r']
|
||||
self.assertTrue(r_events, 'top-level symlink should resolve and send file data')
|
||||
data = b''.join(e['payload'] for e in r_events if e['payload'])
|
||||
self.ae(data, b'resolved content')
|
||||
|
||||
def test_top_level_symlink_to_dir_resolved(self) -> None:
|
||||
"""Top-level symlink to directory in URI list resolves and returns directory listing."""
|
||||
import os
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as root:
|
||||
sub = os.path.join(root, 'realdir')
|
||||
os.mkdir(sub)
|
||||
with open(os.path.join(sub, 'inside.txt'), 'w') as f:
|
||||
f.write('inside')
|
||||
link = os.path.join(root, 'linkdir')
|
||||
os.symlink(sub, link)
|
||||
uri_list = f'file://{link}\r\n'.encode()
|
||||
with dnd_test_window() as (osw, wid, screen, cap):
|
||||
self._setup_uri_drop(screen, wid, cap, uri_list)
|
||||
parse_bytes(screen, client_request_uri_data(0))
|
||||
raw = cap.consume(wid)
|
||||
events = parse_escape_codes_b64(raw)
|
||||
d_events = [e for e in events if e['type'] == 'd']
|
||||
self.assertTrue(d_events, 'top-level symlink to dir should return directory listing')
|
||||
payload = b''.join(
|
||||
chunk for e in d_events for chunk in e['chunks'] if chunk
|
||||
)
|
||||
entries = [e.decode() for e in payload.split(b'\x00') if e]
|
||||
self.assertIn('inside.txt', entries)
|
||||
|
||||
def test_window_close_during_transfer_no_leak(self) -> None:
|
||||
"""Closing the window while dir handles are open frees all resources (no crash)."""
|
||||
|
|
@ -1707,7 +1978,7 @@ class TestDnDProtocol(BaseTest):
|
|||
self.assertTrue(d_events)
|
||||
handle_id = int(d_events[0]['meta']['x'])
|
||||
listing = b''.join(chunk for e in d_events for chunk in e['chunks'] if chunk)
|
||||
entries = [e.decode() for e in listing.split(b'\x00')[1:] if e]
|
||||
entries = [e.decode() for e in listing.split(b'\x00') if e]
|
||||
f_idx = entries.index('f.txt') + 1
|
||||
|
||||
# Read file with request_id
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue