diff --git a/kitty/fast_data_types.pyi b/kitty/fast_data_types.pyi
index bc05af971..c19725cde 100644
--- a/kitty/fast_data_types.pyi
+++ b/kitty/fast_data_types.pyi
@@ -1672,6 +1672,8 @@ class AES256GCMDecrypt:
 class Shlex:
     def __init__(self, src: str, allow_ansi_quoted_strings: bool = False): ...
     def next_word(self) -> Tuple[int, str]: ...
+    def __next__(self) -> str: ...
+    def __iter__(self) -> Iterator[str]: ...
 
 
 class SingleKey:
diff --git a/kitty/launcher/shlex.h b/kitty/launcher/shlex.h
new file mode 100644
index 000000000..51194bc14
--- /dev/null
+++ b/kitty/launcher/shlex.h
@@ -0,0 +1,255 @@
+/*
+ * shlex.h
+ * Copyright (C) 2025 Kovid Goyal
+ *
+ * Distributed under terms of the GPL3 license.
+ */
+
+#pragma once
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <sys/types.h>
+
+// Lexer states: between words, in an unquoted word, or inside one of the three kinds of quoted string
+typedef enum { NORMAL, WORD, STRING_WITHOUT_ESCAPES, STRING_WITH_ESCAPES, ANSI_C_QUOTED } ShlexEnum;
+
+typedef struct {
+    const char *src;  // input bytes, not copied
+    bool support_ansi_c_quoting;
+    char *buf;  // bytes of the word currently being built
+    size_t src_sz, src_pos, word_start, buf_pos;
+    ShlexEnum state;
+    const char *err;  // message for the most recent failure
+} ShlexState;
+
+
+// Prepare *s for lexing the src_sz bytes at src, which must remain valid while s is in use.
+// Returns false if the word buffer could not be allocated.
+static bool
+alloc_shlex_state(ShlexState *s, const char *src, size_t src_sz, bool support_ansi_c_quoting) {
+    // Request at least one byte: malloc(0) may legitimately return NULL, which is not an out of memory condition
+    *s = (ShlexState){.src=src, .src_sz=src_sz, .support_ansi_c_quoting=support_ansi_c_quoting, .buf=malloc(src_sz ? src_sz : 1)};
+    return s->buf != NULL;
+}
+
+// Free the word buffer and reset *s to all zeroes
+static void
+dealloc_shlex_state(ShlexState *s) {
+    free(s->buf); s->buf = NULL;
+    *s = (ShlexState){0};
+}
+#define WHITESPACE ' ': case '\n': case '\t': case '\r'
+#define STRING_WITH_ESCAPES_DELIM '"'
+#define STRING_WITHOUT_ESCAPES_DELIM '\''
+#define ESCAPE_CHAR '\\'
+
+static void
+start_word(ShlexState *self) {
+    self->word_start = self->src_pos - 1;
+    self->buf_pos = 0;
+}
+
+static void
+write_ch(ShlexState *self, char ch) {
+    self->buf[self->buf_pos++] = ch;
+}
+
+// Encode the code point ch as UTF-8 into dest, returning the number of bytes written or 0 if ch is out of range
+static unsigned
+encode_utf8(unsigned long ch, char* dest) {
+    if (ch < 0x80) { // only lower 7 bits can be 1
+        dest[0] = (char)ch; // 0xxxxxxx
+        return 1;
+    }
+    if (ch < 0x800) { // only lower 11 bits can be 1
+        dest[0] = (ch>>6) | 0xC0; // 110xxxxx
+        dest[1] = (ch & 0x3F) | 0x80; // 10xxxxxx
+        return 2;
+    }
+    if (ch < 0x10000) { // only lower 16 bits can be 1
+        dest[0] = (ch>>12) | 0xE0; // 1110xxxx
+        dest[1] = ((ch>>6) & 0x3F) | 0x80; // 10xxxxxx
+        dest[2] = (ch & 0x3F) | 0x80; // 10xxxxxx
+        return 3;
+    }
+    if (ch < 0x110000) { // only lower 21 bits can be 1
+        dest[0] = (ch>>18) | 0xF0; // 11110xxx
+        dest[1] = ((ch>>12) & 0x3F) | 0x80; // 10xxxxxx
+        dest[2] = ((ch>>6) & 0x3F) | 0x80; // 10xxxxxx
+        dest[3] = (ch & 0x3F) | 0x80; // 10xxxxxx
+        return 4;
+    }
+    return 0;
+}
+
+static void
+write_unich(ShlexState *self, unsigned long ch) {
+    self->buf_pos += encode_utf8(ch, self->buf + self->buf_pos);
+}
+
+
+static size_t
+get_word(ShlexState *self) {
+    size_t ans = self->buf_pos; self->buf_pos = 0;
+    return ans;
+}
+
+static char
+read_ch(ShlexState *self) {
+    return self->src[self->src_pos++];
+}
+
+static bool
+write_escape_ch(ShlexState *self) {
+    if (self->src_pos < self->src_sz) {
+        char nch = read_ch(self);
+        write_ch(self, nch);
+        return true;
+    }
+    return false;
+}
+
+static bool
+write_control_ch(ShlexState *self) {
+    if (self->src_pos >= self->src_sz) {
+        self->err = "Trailing \\c escape at end of input data";
+        return false;
+    }
+    char ch = read_ch(self);
+    write_ch(self, ch & 0x1f);
+    return true;
+}
+
+static void
+read_valid_digits(ShlexState *self, int max, char *output, bool(*is_valid)(char ch)) {
+    for (int i = 0; i < max && self->src_pos < self->src_sz; i++, output++) {
+        char ch = read_ch(self);
+        if (!is_valid(ch)) { self->src_pos--; break; }
+        *output = ch;
+    }
+}
+
+static bool
+is_octal_digit(char ch) { return '0' <= ch && ch <= '7'; }
+
+static bool
+is_hex_digit(char ch) { return ('0' <= ch && ch <= '9') || ('a' <= ch && ch <= 'f') || ('A' <= ch && ch <= 'F'); }
+
+static void
+write_octal_ch(ShlexState *self, char ch) {
+    char chars[4] = {ch, 0, 0, 0};
+    read_valid_digits(self, 2, chars + 1, is_octal_digit);
+    write_unich(self, strtol(chars, NULL, 8));
+}
+
+static bool
+write_unicode_ch(ShlexState *self, int max) {
+    char chars[16] = {0};
+    read_valid_digits(self, max, chars, is_hex_digit);
+    if (!chars[0]) { self->err = "Trailing unicode escape at end of input data"; return false; }
+    write_unich(self, strtol(chars, NULL, 16));
+    return true;
+}
+
+// Decode the escape sequence that follows a backslash inside an ANSI C quoted ($'...') string
+static bool
+write_ansi_escape_ch(ShlexState *self) {
+    if (self->src_pos >= self->src_sz) { self->err = "Trailing backslash at end of input data"; return false; }
+    char ch = read_ch(self);
+    switch(ch) {
+        case 'a': write_ch(self, '\a'); return true;
+        case 'b': write_ch(self, '\b'); return true;
+        case 'e': case 'E': write_ch(self, 0x1b); return true;
+        case 'f': write_ch(self, '\f'); return true;
+        case 'n': write_ch(self, '\n'); return true;
+        case 'r': write_ch(self, '\r'); return true;
+        case 't': write_ch(self, '\t'); return true;
+        case 'v': write_ch(self, '\v'); return true;
+        case '\\': write_ch(self, '\\'); return true;
+        case '\'': write_ch(self, '\''); return true;
+        case '\"': write_ch(self, '\"'); return true;
+        case '\?': write_ch(self, '\?'); return true;
+
+        case 'c': return write_control_ch(self);
+        case 'x': return write_unicode_ch(self, 2);
+        case 'u': return write_unicode_ch(self, 4);
+        case 'U': return write_unicode_ch(self, 8);
+        case '0': case '1': case '2': case '3': case '4': case '5': case '6': case '7': write_octal_ch(self, ch); return true;
+        default:
+            write_ch(self, ch); return true;
+    }
+}
+
+static void
+set_state(ShlexState *self, ShlexEnum s) {
+    self->state = s;
+}
+
+// Lex the next word into self->buf and return its length in bytes.
+// Returns -1 on error (self->err is set) and -2 when the input is exhausted.
+// self->word_start is the offset in src at which the returned word began.
+static ssize_t
+next_word(ShlexState *self) {
+#define write_escaped_or_fail() if (!write_escape_ch(self)) { self->err = "Trailing backslash at end of input data"; return -1; }
+    char prev_word_ch = 0;
+    while (self->src_pos < self->src_sz) {
+        char ch = read_ch(self);
+        switch(self->state) {
+            case NORMAL:
+                switch(ch) {
+                    case WHITESPACE: break;
+                    case STRING_WITH_ESCAPES_DELIM: set_state(self, STRING_WITH_ESCAPES); start_word(self); break;
+                    case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, STRING_WITHOUT_ESCAPES); start_word(self); break;
+                    case ESCAPE_CHAR: start_word(self); write_escaped_or_fail(); set_state(self, WORD); break;
+                    default: set_state(self, WORD); start_word(self); write_ch(self, ch); prev_word_ch = ch; break;
+                }
+                break;
+            case WORD:
+                // Anything other than a plain character clears prev_word_ch, so a stale '$' can never cause a second buf_pos-- and underflow buf_pos
+                switch(ch) {
+                    case WHITESPACE: set_state(self, NORMAL); if (self->buf_pos) return get_word(self); break;
+                    case STRING_WITH_ESCAPES_DELIM: set_state(self, STRING_WITH_ESCAPES); prev_word_ch = 0; break;
+                    case STRING_WITHOUT_ESCAPES_DELIM:
+                        if (self->support_ansi_c_quoting && prev_word_ch == '$') { self->buf_pos--; set_state(self, ANSI_C_QUOTED); }
+                        else set_state(self, STRING_WITHOUT_ESCAPES);
+                        prev_word_ch = 0; break;
+                    case ESCAPE_CHAR: write_escaped_or_fail(); prev_word_ch = 0; break;
+                    default: write_ch(self, ch); prev_word_ch = ch; break;
+                } break;
+            case STRING_WITHOUT_ESCAPES:
+                switch(ch) {
+                    case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, WORD); break;
+                    default: write_ch(self, ch); break;
+                } break;
+            case STRING_WITH_ESCAPES:
+                switch(ch) {
+                    case STRING_WITH_ESCAPES_DELIM: set_state(self, WORD); break;
+                    case ESCAPE_CHAR: write_escaped_or_fail(); break;
+                    default: write_ch(self, ch); break;
+                } break;
+            case ANSI_C_QUOTED:
+                switch(ch) {
+                    case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, WORD); break;
+                    case ESCAPE_CHAR: if (!write_ansi_escape_ch(self)) return -1; break;
+                    default: write_ch(self, ch); break;
+                } break;
+        }
+    }
+    switch (self->state) {
+        case WORD:
+            self->state = NORMAL;
+            if (self->buf_pos) return get_word(self);
+            break;
+        case STRING_WITH_ESCAPES: case STRING_WITHOUT_ESCAPES: case ANSI_C_QUOTED:
+            self->err = "Unterminated string at the end of input";
+            self->state = NORMAL;
+            return -1;
+        case NORMAL:
+            break;
+    }
+    return -2;
+#undef write_escaped_or_fail
+}
+
+
diff --git a/kitty/shlex.c b/kitty/shlex.c
index 4846e41d8..55cdc77a9 100644
--- a/kitty/shlex.c
+++ b/kitty/shlex.c
@@ -6,16 +6,16 @@
  */
 
 #include "data-types.h"
+#include "unicodeobject.h"
+#include
"launcher/shlex.h" -typedef enum { NORMAL, WORD, STRING_WITHOUT_ESCAPES, STRING_WITH_ESCAPES, ANSI_C_QUOTED } State; typedef struct { PyObject_HEAD - + ShlexState state; PyObject *src; - Py_UCS4 *buf; - Py_ssize_t src_sz, src_pos, word_start, buf_pos; - int kind, support_ansi_c_quoting; void *src_data; - State state; + bool yielded; + void *data; int kind; + size_t unicode_pos, src_pos_at_last_unicode_pos; } Shlex; @@ -24,205 +24,73 @@ new_shlex_object(PyTypeObject *type, PyObject *args, PyObject UNUSED *kwds) { Shlex *self; self = (Shlex *)type->tp_alloc(type, 0); if (self) { - PyObject *src; - if (!PyArg_ParseTuple(args, "U|p", &src, &self->support_ansi_c_quoting)) return NULL; - self->src_sz = PyUnicode_GET_LENGTH(src); - self->buf = malloc(sizeof(Py_UCS4) * self->src_sz); - if (self->buf) { - self->src = src; - Py_INCREF(src); - self->kind = PyUnicode_KIND(src); - self->src_data = PyUnicode_DATA(src); - } else { Py_CLEAR(self); PyErr_NoMemory(); } + const char *src; Py_ssize_t sz; + int support_ansi_c_quoting; + if (!PyArg_ParseTuple(args, "s#|p", &src, &sz, &support_ansi_c_quoting)) return NULL; + if (!alloc_shlex_state(&self->state, src, sz, support_ansi_c_quoting != 0)) return PyErr_NoMemory(); + self->src = PyTuple_GetItem(args, 0); + self->data = PyUnicode_DATA(self->src); + self->kind = PyUnicode_KIND(self->src); + Py_INCREF(self->src); } return (PyObject*) self; } static void dealloc(Shlex* self) { - Py_CLEAR(self->src); free(self->buf); - Py_TYPE(self)->tp_free((PyObject*)self); + Py_CLEAR(self->src); dealloc_shlex_state(&self->state); } -#define WHITESPACE ' ': case '\n': case '\t': case '\r' -#define STRING_WITH_ESCAPES_DELIM '"' -#define STRING_WITHOUT_ESCAPES_DELIM '\'' -#define ESCAPE_CHAR '\\' - -static void -start_word(Shlex *self) { - self->word_start = self->src_pos - 1; - self->buf_pos = 0; -} - -static void -write_ch(Shlex *self, Py_UCS4 ch) { - self->buf[self->buf_pos++] = ch; +static size_t +advance_unicode_pos(Shlex *self) { + ssize_t 
num_bytes = self->state.word_start - self->src_pos_at_last_unicode_pos; + self->src_pos_at_last_unicode_pos = self->state.word_start; + char buf[8]; + while (num_bytes > 0) { + Py_UCS4 ch = PyUnicode_READ(self->kind, self->data, self->unicode_pos); + num_bytes -= encode_utf8(ch, buf); + self->unicode_pos++; + } + return self->unicode_pos; } static PyObject* -get_word(Shlex *self) { - Py_ssize_t pos = self->buf_pos; self->buf_pos = 0; - return Py_BuildValue("nN", self->word_start, PyUnicode_FromKindAndData(PyUnicode_4BYTE_KIND, self->buf, pos)); -} - -static Py_UCS4 -read_ch(Shlex *self) { - Py_UCS4 nch = PyUnicode_READ(self->kind, self->src_data, self->src_pos); self->src_pos++; - return nch; -} - -static bool -write_escape_ch(Shlex *self) { - if (self->src_pos < self->src_sz) { - Py_UCS4 nch = read_ch(self); - write_ch(self, nch); - return true; - } - return false; -} - -static bool -write_control_ch(Shlex *self) { - if (self->src_pos >= self->src_sz) { PyErr_SetString(PyExc_ValueError, "Trailing \\c escape at end of input data"); return false; } - Py_UCS4 ch = read_ch(self); - write_ch(self, ch & 0x1f); - return true; -} - -static void -read_valid_digits(Shlex *self, int max, char *output, bool(*is_valid)(Py_UCS4 ch)) { - for (int i = 0; i < max && self->src_pos < self->src_sz; i++, output++) { - Py_UCS4 ch = read_ch(self); - if (!is_valid(ch)) { self->src_pos--; break; } - *output = ch; - } -} - -static bool -is_octal_digit(Py_UCS4 ch) { return '0' <= ch && ch <= '7'; } - -static bool -is_hex_digit(Py_UCS4 ch) { return ('0' <= ch && ch <= '9') || ('a' <= ch && ch <= 'f') || ('A' <= ch && ch <= 'F'); } - -static void -write_octal_ch(Shlex *self, Py_UCS4 ch) { - char chars[4] = {ch, 0, 0, 0}; - read_valid_digits(self, 2, chars + 1, is_octal_digit); - write_ch(self, strtol(chars, NULL, 8)); -} - -static bool -write_unicode_ch(Shlex *self, int max) { - char chars[16] = {0}; - read_valid_digits(self, max, chars, is_hex_digit); - if (!chars[0]) { 
PyErr_SetString(PyExc_ValueError, "Trailing unicode escape at end of input data"); return false; } - write_ch(self, strtol(chars, NULL, 16)); - return true; -} - -static bool -write_ansi_escape_ch(Shlex *self) { - if (self->src_pos >= self->src_sz) { PyErr_SetString(PyExc_ValueError, "Trailing backslash at end of input data"); return false; } - Py_UCS4 ch = read_ch(self); - switch(ch) { - case 'a': write_ch(self, '\a'); return true; - case 'b': write_ch(self, '\b'); return true; - case 'e': case 'E': write_ch(self, 0x1b); return true; - case 'f': write_ch(self, '\f'); return true; - case 'n': write_ch(self, '\n'); return true; - case 'r': write_ch(self, '\r'); return true; - case 't': write_ch(self, '\t'); return true; - case 'v': write_ch(self, '\v'); return true; - case '\\': write_ch(self, '\\'); return true; - case '\'': write_ch(self, '\''); return true; - case '\"': write_ch(self, '\"'); return true; - case '\?': write_ch(self, '\?'); return true; - - case 'c': return write_control_ch(self); - case 'x': return write_unicode_ch(self, 2); - case 'u': return write_unicode_ch(self, 4); - case 'U': return write_unicode_ch(self, 8); -START_ALLOW_CASE_RANGE - case '0' ... 
'7': write_octal_ch(self, ch); return true; -END_ALLOW_CASE_RANGE - +next_word_with_position(Shlex *self, PyObject *args UNUSED) { + ssize_t len = next_word(&self->state); + unsigned long pos = advance_unicode_pos(self); + switch(len) { + case -1: PyErr_SetString(PyExc_ValueError, self->state.err); return NULL; + case -2: + if (self->yielded) return Py_BuildValue("is#", -1, self->state.buf, 0); + len = 0; + /* fallthrough */ default: - write_ch(self, ch); return true; + self->yielded = true; + return Py_BuildValue("ks#", pos, self->state.buf, (Py_ssize_t)len); } } -static void -set_state(Shlex *self, State s) { - self->state = s; -} - static PyObject* -next_word(Shlex *self, PyObject *args UNUSED) { -#define write_escaped_or_fail() if (!write_escape_ch(self)) { PyErr_SetString(PyExc_ValueError, "Trailing backslash at end of input data"); return NULL; } - - Py_UCS4 prev_word_ch = 0; - while (self->src_pos < self->src_sz) { - Py_UCS4 ch = read_ch(self); - switch(self->state) { - case NORMAL: - switch(ch) { - case WHITESPACE: break; - case STRING_WITH_ESCAPES_DELIM: set_state(self, STRING_WITH_ESCAPES); start_word(self); break; - case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, STRING_WITHOUT_ESCAPES); start_word(self); break; - case ESCAPE_CHAR: start_word(self); write_escaped_or_fail(); set_state(self, WORD); break; - default: set_state(self, WORD); start_word(self); write_ch(self, ch); prev_word_ch = ch; break; - } - break; - case WORD: - switch(ch) { - case WHITESPACE: set_state(self, NORMAL); if (self->buf_pos) return get_word(self); break; - case STRING_WITH_ESCAPES_DELIM: set_state(self, STRING_WITH_ESCAPES); break; - case STRING_WITHOUT_ESCAPES_DELIM: - if (self->support_ansi_c_quoting && prev_word_ch == '$') { self->buf_pos--; set_state(self, ANSI_C_QUOTED); } - else set_state(self, STRING_WITHOUT_ESCAPES); - break; - case ESCAPE_CHAR: write_escaped_or_fail(); break; - default: write_ch(self, ch); prev_word_ch = ch; break; - } break; - case 
STRING_WITHOUT_ESCAPES: - switch(ch) { - case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, WORD); break; - default: write_ch(self, ch); break; - } break; - case STRING_WITH_ESCAPES: - switch(ch) { - case STRING_WITH_ESCAPES_DELIM: set_state(self, WORD); break; - case ESCAPE_CHAR: write_escaped_or_fail(); break; - default: write_ch(self, ch); break; - } break; - case ANSI_C_QUOTED: - switch(ch) { - case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, WORD); break; - case ESCAPE_CHAR: if (!write_ansi_escape_ch(self)) return NULL; break; - default: write_ch(self, ch); break; - } break; - } +next(PyObject *self_) { + Shlex *self = (Shlex*)self_; + ssize_t len = next_word(&self->state); + switch(len) { + case -1: PyErr_SetString(PyExc_ValueError, self->state.err); return NULL; + case -2: + if (self->yielded) { PyErr_SetNone(PyExc_StopIteration); return NULL; } + len = 0; + /* fallthrough */ + default: + self->yielded = true; + return PyUnicode_FromStringAndSize(self->state.buf, (Py_ssize_t)len); } - switch (self->state) { - case WORD: - self->state = NORMAL; - if (self->buf_pos) return get_word(self); - break; - case STRING_WITH_ESCAPES: case STRING_WITHOUT_ESCAPES: case ANSI_C_QUOTED: - PyErr_SetString(PyExc_ValueError, "Unterminated string at the end of input"); - self->state = NORMAL; - return NULL; - case NORMAL: - break; - } - return Py_BuildValue("is", -1, ""); -#undef write_escaped_or_fail } +static PyObject* +iter(PyObject *s) { return Py_NewRef(s); } static PyMethodDef methods[] = { - METHODB(next_word, METH_NOARGS), + {"next_word", (PyCFunction)next_word_with_position, METH_NOARGS, ""}, {NULL} /* Sentinel */ }; @@ -233,8 +101,10 @@ PyTypeObject Shlex_Type = { .tp_dealloc = (destructor)dealloc, .tp_flags = Py_TPFLAGS_DEFAULT, .tp_doc = "Lexing like a shell", - .tp_methods = methods, + .tp_iternext = next, .tp_new = new_shlex_object, + .tp_iter = iter, + .tp_methods = methods, }; INIT_TYPE(Shlex) diff --git a/kitty/utils.py b/kitty/utils.py index 
e9f1a0f39..f3ee4fdb3 100644
--- a/kitty/utils.py
+++ b/kitty/utils.py
@@ -1105,23 +1105,13 @@ def key_val_matcher(items: Iterable[tuple[str, str]], key_pat: 're.Pattern[str]'
 
 
 def shlex_split(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[str]:
-    s = Shlex(text, allow_ansi_quoted_strings)
-    yielded = False
-    while (q := s.next_word())[0] > -1:
-        yield q[1]
-        yielded = True
-    if not yielded:
-        yield ''
+    yield from Shlex(text, allow_ansi_quoted_strings)  # Shlex is iterable and itself yields '' for input with no words
 
 
 def shlex_split_with_positions(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[tuple[int, str]]:
     s = Shlex(text, allow_ansi_quoted_strings)
-    yielded = False
     while (q := s.next_word())[0] > -1:
         yield q
-        yielded = True
-    if not yielded:
-        yield 0, ''
 
 
 def timed_debug_print(*a: Any, sep: str = ' ', end: str = '\n') -> None:
diff --git a/kitty_tests/datatypes.py b/kitty_tests/datatypes.py
index 3a220165b..9f4db3f26 100644
--- a/kitty_tests/datatypes.py
+++ b/kitty_tests/datatypes.py
@@ -28,7 +28,7 @@ from kitty.fast_data_types import (
 )
 from kitty.fast_data_types import Cursor as C
 from kitty.rgb import to_color
-from kitty.utils import is_ok_to_read_image_file, is_path_in_temp_dir, sanitize_title, sanitize_url_for_dispay_to_user, shlex_split_with_positions
+from kitty.utils import is_ok_to_read_image_file, is_path_in_temp_dir, sanitize_title, sanitize_url_for_dispay_to_user, shlex_split, shlex_split_with_positions
 
 from . import BaseTest, filled_cursor, filled_history_buf, filled_line_buf
 
@@ -664,6 +664,8 @@ class TestDataTypes(BaseTest):
         ):
             with self.assertRaises(ValueError, msg=f'Failed to raise exception for {bad!r}'):
                 tuple(shlex_split_with_positions(bad))
+            with self.assertRaises(ValueError, msg=f'Failed to raise exception for {bad!r}'):
+                tuple(shlex_split(bad))
 
         for q, expected in {
             '"ab"': ((0, 'ab'),),
@@ -672,9 +674,16 @@ class TestDataTypes(BaseTest):
             r'\abc\ d': ((0, 'abc d'),),
             '': ((0, ''),), ' ': ((0, ''),), ' \tabc\n\t\r ': ((2, 'abc'),),
             "$'ab'": ((0, '$ab'),),
+            '😀': ((0, '😀'),),  # non-ASCII input: positions are indices into the str, not UTF-8 byte offsets
+            '"a😀"': ((0, 'a😀'),),
+            '😀 a': ((0, '😀'), (2, 'a')),
+            ' \t😀a': ((2, '😀a'),),
         }.items():
             actual = tuple(shlex_split_with_positions(q))
             self.ae(expected, actual, f'Failed for text: {q!r}')
+            ex = tuple(x[1] for x in expected)  # the words alone, without their positions
+            actual = tuple(shlex_split(q))
+            self.ae(ex, actual, f'Failed for text: {q!r}')
 
         for q, expected in {
             "$'ab'": ((0, 'ab'),),
@@ -692,6 +701,9 @@ class TestDataTypes(BaseTest):
         }.items():
             actual = tuple(shlex_split_with_positions(q, True))
             self.ae(expected, actual, f'Failed for text: {q!r}')
+            actual = tuple(shlex_split(q, True))
+            ex = tuple(x[1] for x in expected)  # the words alone, without their positions
+            self.ae(ex, actual, f'Failed for text: {q!r}')
 
     def test_split_into_graphemes(self):
         self.assertEqual(char_props_for('\ue000')['category'], 'Co')