Parser for multi cursor escape code

This commit is contained in:
Kovid Goyal 2025-08-22 15:45:44 +05:30
parent e6c1597834
commit 8f5dc42a61
No known key found for this signature in database
GPG key ID: 06BC317B515ACE7C
5 changed files with 200 additions and 7 deletions

View file

@ -299,10 +299,14 @@ def multicell_command(payload: str) -> None:
write(f'{OSC}{TEXT_SIZE_CODE};{m.rstrip(":")};{text}\a')
def screen_multi_cursor(rest: str) -> None:
write(f'{CSI}>{rest.strip()} q')
def replay(raw: str) -> None:
specials = frozenset({
'draw', 'set_title', 'set_icon', 'set_dynamic_color', 'set_color_table_color', 'select_graphic_rendition',
'process_cwd_notification', 'clipboard_control', 'shell_prompt_marking', 'multicell_command',
'process_cwd_notification', 'clipboard_control', 'shell_prompt_marking', 'multicell_command', 'screen_multi_cursor',
})
for line in raw.splitlines():
if line.strip() and not line.startswith('#'):

View file

@ -31,6 +31,7 @@
#include "char-props.h"
#include "wcswidth.h"
#include <stdalign.h>
#include <stdio.h>
#include "keys.h"
#include "vt-parser.h"
#include "resize.h"
@ -2851,6 +2852,106 @@ screen_set_cursor(Screen *self, unsigned int mode, uint8_t secondary) {
}
}
#define NAME multi_cursor_map
#define KEY_TY index_type
#define VAL_TY uint8_t
#include "kitty-verstable.h"
void
screen_multi_cursor(Screen *self, int queried_shape, int *params, unsigned num_params) {
// printf("%d;", queried_shape); for (unsigned i = 0; i < num_params; i++) {printf("%d:", params[i]);} printf("\n");
if (!num_params) {
if (params == NULL) {
write_escape_code_to_child(self, ESC_CSI, ">-1;1;2;3 q");
} else if (queried_shape == -2) {
size_t sz = self->extra_cursors.count * 32 + 64;
RAII_ALLOC(char, buf, malloc(sz));
if (buf) {
char *p = buf + snprintf(buf, sz, ">-2;");
for (unsigned i = 0; i < self->extra_cursors.count; i++) {
index_type cell = self->extra_cursors.locations[i].cell, shape = self->extra_cursors.locations[i].shape;
index_type y = cell / self->columns, x = cell - (y * self->columns);
p += snprintf(p, sz - (p - buf), "%d:2:%u:%u;", shape > 3 ? -1 : (int)shape, y+1, x+1);
}
if (*(p-1) == ';') p--;
p += snprintf(p, sz - (p - buf), " q"); *p = 0;
write_escape_code_to_child(self, ESC_CSI, buf);
}
}
return;
}
uint8_t shape = 0;
if (queried_shape < 0) {
shape = 4;
} else {
shape = MIN(queried_shape, 3);
}
self->extra_cursors.dirty = true;
int type = params[0]; params++; num_params--;
switch (type) {
case 2: {
multi_cursor_map s; vt_init(&s);
for (unsigned i = 0; i < self->extra_cursors.count; i++) {
vt_insert(&s, self->extra_cursors.locations[i].cell, self->extra_cursors.locations[i].shape);
}
for (unsigned i = 0; i+1 < num_params; i+=2) {
index_type y = params[i]-1, x = params[i+1]-1;
if (!shape) { vt_erase(&s, y * self->columns + x); }
else if (y < self->lines && x < self->columns) vt_insert(&s, y * self->columns + x, shape);
}
self->extra_cursors.count = vt_size(&s);
ensure_space_for(&self->extra_cursors, locations, ExtraCursor, self->extra_cursors.count, capacity, 20 * 80, false);
self->extra_cursors.count = 0;
vt_create_for_loop(multi_cursor_map_itr, i, &s) {
self->extra_cursors.locations[self->extra_cursors.count++] = (ExtraCursor){
.shape = i.data->val, .cell = i.data->key};
}
vt_cleanup(&s);
} break;
case 4: {
if (!num_params) { // full screen
switch(shape) {
default: self->extra_cursors.count = 0; break;
case 1: case 2: case 3: case 4:
ensure_space_for(&self->extra_cursors, locations, ExtraCursor, self->lines * self->columns, capacity, 20 * 80, false);
self->extra_cursors.count = self->lines * self->columns;
for (index_type cell = 0; cell < self->lines * self->columns; cell++) {
self->extra_cursors.locations[cell].shape = shape;
self->extra_cursors.locations[cell].cell = cell;
}
break;
}
break;
}
unsigned count = 0;
for (unsigned i = 0; i < self->extra_cursors.count; i++) {
bool in_some_region = false;
index_type y = self->extra_cursors.locations[i].cell / self->columns, x = self->extra_cursors.locations[i].cell - (self->columns * y);
for (unsigned i = 0; i + 3 < num_params && !in_some_region; i += 4) {
index_type top = params[i]-1, left = params[i+1]-1, bottom = params[i+2]-1, right = params[i+3]-1;
in_some_region = top <= y && y <= bottom && left <= x && x <= right;
}
if (!in_some_region) self->extra_cursors.locations[count++] = self->extra_cursors.locations[i];
}
self->extra_cursors.count = count;
if (shape) {
for (unsigned i = 0; i + 3 < num_params; i += 4) {
index_type top = params[i]-1, left = params[i+1]-1, bottom = params[i+2]-1, right = params[i+3]-1;
index_type xnum = right + 1 - left, ynum = bottom + 1 - top;
ensure_space_for(&self->extra_cursors, locations, ExtraCursor,
self->extra_cursors.count + xnum * ynum, capacity, 20 * 80, false);
for (index_type y = top; y <= bottom; y++) {
for (index_type x = left; x <= right; x++) {
self->extra_cursors.locations[self->extra_cursors.count++] = (ExtraCursor){
.shape=shape, .cell=y*self->columns + x};
}
}
}
}
} break;
}
}
void
set_title(Screen *self, PyObject *title) {
CALLBACK("title_changed", "O", title);

View file

@ -305,6 +305,7 @@ bool parse_sgr(Screen *screen, const uint8_t *buf, unsigned int num, const char
bool screen_pause_rendering(Screen *self, bool pause, int for_in_ms);
void screen_check_pause_rendering(Screen *self, monotonic_t now);
void screen_designate_charset(Screen *self, uint32_t which, uint32_t as);
void screen_multi_cursor(Screen *self, int queried_shape, int *params, unsigned num_params);
#define DECLARE_CH_SCREEN_HANDLER(name) void screen_##name(Screen *screen);
DECLARE_CH_SCREEN_HANDLER(bell)
DECLARE_CH_SCREEN_HANDLER(backspace)

View file

@ -72,6 +72,21 @@ _report_params(PyObject *dump_callback, id_type window_id, const char *name, int
Py_XDECREF(PyObject_CallFunction(dump_callback, "Kss", window_id, name, buf)); PyErr_Clear();
}
static void
_report_params_with_first(PyObject *dump_callback, id_type window_id, const char *name, int first_param, int *params, unsigned count) {
static char buf[MAX_CSI_PARAMS*3] = {0};
unsigned int i, p=0;
p += snprintf(buf + p, sizeof(buf) - 2, "%d;", first_param);
for(i = 0; i < count && p < arraysz(buf)-20; i++) {
int n = snprintf(buf + p, arraysz(buf) - p, "%i:", params[i]);
if (n < 0) break;
p += n;
}
buf[count ? p-1 : p] = 0;
Py_XDECREF(PyObject_CallFunction(dump_callback, "Kss", window_id, name, buf)); PyErr_Clear();
}
#define DUMP_UNUSED
#define REPORT_ERROR(...) _report_error(self->dump_callback, self->window_id, __VA_ARGS__);
@ -110,7 +125,9 @@ _report_params(PyObject *dump_callback, id_type window_id, const char *name, int
}
#define REPORT_PARAMS(name, params, num, is_group, region) _report_params(self->dump_callback, self->window_id, name, params, num_params, is_group, region)
#define REPORT_PARAMS(name, params, num, is_group, region) _report_params(self->dump_callback, self->window_id, name, params, num, is_group, region)
#define REPORT_PARAMS_WITH_FIRST(name, first, params, num) _report_params_with_first(self->dump_callback, self->window_id, name, first, params, num)
#define REPORT_OSC(name, string) \
Py_XDECREF(PyObject_CallFunction(self->dump_callback, "KsO", self->window_id, #name, string)); PyErr_Clear();
@ -127,6 +144,7 @@ _report_params(PyObject *dump_callback, id_type window_id, const char *name, int
#define REPORT_VA_COMMAND(...)
#define REPORT_DRAW(...)
#define REPORT_PARAMS(...)
#define REPORT_PARAMS_WITH_FIRST(...)
#define REPORT_OSC(name, string)
#define REPORT_OSC2(name, code, string)
#define REPORT_HYPERLINK(id, url)
@ -875,6 +893,35 @@ consume_csi(PS *self) {
return csi_parse_loop(self, &self->csi, self->buf, &self->read.pos, self->read.sz, self->read.consumed);
}
static void
_parse_multi_cursors(PS *self, ParsedCSI *csi) {
switch(csi->num_params) {
case 0:
REPORT_COMMAND("screen_multi_cursor");
screen_multi_cursor(self->screen, 0, NULL, 0);
break;
case 1:
REPORT_PARAMS_WITH_FIRST("screen_multi_cursor", csi->params[0], csi->params, 0);
screen_multi_cursor(self->screen, csi->params[0], csi->params, 0);
break;
default: {
unsigned pos = 1, first_param = pos;
for (; pos < csi->num_params; pos++) {
if (pos > first_param) {
if (!csi->is_sub_param[pos]) {
REPORT_PARAMS_WITH_FIRST("screen_multi_cursor", csi->params[0], csi->params + first_param, pos - first_param);
screen_multi_cursor(self->screen, csi->params[0], csi->params + first_param, pos - first_param);
first_param = pos;
}
}
}
if (pos > first_param) {
REPORT_PARAMS_WITH_FIRST("screen_multi_cursor", csi->params[0], csi->params + first_param, pos - first_param);
screen_multi_cursor(self->screen, csi->params[0], csi->params + first_param, pos - first_param);
}}}
}
static unsigned int
parse_region(const ParsedCSI *csi, Region *r) {
switch(csi->num_params) {
@ -895,7 +942,6 @@ parse_region(const ParsedCSI *csi, Region *r) {
}
}
static bool
_parse_sgr(PS *self, ParsedCSI *csi) {
#define SEND_SGR if (num_params) { \
@ -1297,10 +1343,13 @@ dispatch_csi(PS *self) {
REPORT_ERROR("Unknown CSI x sequence with start and end modifiers: '%c' '%c'", start_modifier, end_modifier);
break;
case DECSCUSR:
if (!start_modifier && end_modifier == ' ') {
CALL_CSI_HANDLER1M(screen_set_cursor, 1);
}
if (start_modifier == '>' && !end_modifier) {
if (end_modifier == ' ') {
if (!start_modifier) { CALL_CSI_HANDLER1M(screen_set_cursor, 1); }
if (start_modifier == '>') {
_parse_multi_cursors(self, &self->csi);
break;
}
} else if (end_modifier == 0 && start_modifier == '>') {
CALL_CSI_HANDLER1(screen_xtversion, 0);
}
REPORT_ERROR("Unknown CSI q sequence with start and end modifiers: '%c' '%c'", start_modifier, end_modifier);

View file

@ -1589,6 +1589,44 @@ class TestScreen(BaseTest):
q({'transparent_background_color2': '#ffffff@-1'})
q({'transparent_background_color2': '?'}, {'transparent_background_color2': (Color(255, 255, 255), 255)})
def test_multi_cursors(self):
s = self.create_screen()
c = s.callbacks
# Test detection
parse_bytes(s, b'\x1b[> q') # ]
self.ae(c.wtcbuf, b'\x1b[>-1;1;2;3 q') # ]
def current() -> dict[int, tuple[int, int]]:
ans = {}
c.clear()
parse_bytes(s, '\x1b[>-2 q'.encode()) # ]
for entry in c.wtcbuf[6:-2].decode().split(';'):
if entry:
which, _, y, x = map(int, entry.split(':'))
ans.setdefault(which, set()).add((x-1, y-1))
return ans
self.ae({}, current())
def a(which: int, *positions: tuple[int, int], region=None) -> dict[int, tuple[int, int]]:
if positions:
buf = [f'\x1b[>{which};'] # ]
buf.extend(f'2:{y+1}:{x+1};' for x, y in positions)
parse_bytes(s, ''.join(buf).encode() + b' q')
if region:
if region is True:
parse_bytes(s, f'\x1b[>{which};4 q'.encode()) # ]
else:
left, top, right, bottom = region
parse_bytes(s, f'\x1b[>{which};4:{top+1}:{left+1}:{bottom+1}:{right+1} q'.encode()) # ]
return current()
self.ae(a(1, region=True), {1:{(x, y) for x in range(s.columns) for y in range(s.lines)}})
self.ae(a(0, region=True), {})
self.ae(a(-1, region=(1, 2, 2, 3)), {-1: {(1, 2), (2, 2), (1, 3), (2, 3)}})
self.ae(a(2, (1, 2), (1, 3)), {-1: {(2, 3), (2, 2)}, 2: {(1, 2), (1, 3)}})
self.ae(a(0, (1, 2), (2, 3)), {-1: {(2, 2)}, 2: {(1, 3)}})
self.ae(a(0, region=True), {})
def detect_url(self, scale=1):
s = self.create_screen(cols=30 * scale)