sqlmap/tests/test_target_parsing.py
Miroslav Štampar 2297c81309 Update of tests
2026-06-28 18:27:59 +02:00

521 lines
18 KiB
Python

#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
Target-environment setup in lib/core/target.py.
target.py wires a single scan's per-host state together: it derives the output /
dump / files directories from the hostname, opens (and optionally flushes) the
HashDB session file, resumes a previously-fingerprinted DBMS/OS and stored kb
values out of that session, decides the custom injection marker, normalizes the
POST body (url-decode / base64), splits GET/POST/Cookie/header strings into the
testable paramDict, and restores the per-target merged options between targets.
None of that needs a live HTTP target or a real DBMS connection: every function
reads conf/kb globals (which are set up here per-test and restored in tearDown)
and at most touches the local filesystem (pointed at a private temp tree) or a
local SQLite session file. Those side-effecting paths are still pure with
respect to the network, so they are exercised here against real temp dirs.
All expected values below were probed from actual output, not assumed.
"""
import os
import shutil
import sys
import tempfile
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap
bootstrap()
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import mergedOptions
from lib.core.data import paths
from lib.core.common import Backend
from lib.core.common import hashDBWrite
from lib.core.enums import HASHDB_KEYS
from lib.core.enums import HTTP_HEADER
from lib.core.enums import HTTPMETHOD
from lib.core.enums import PLACE
from lib.core.exception import SqlmapGenericException
from lib.core.exception import SqlmapNoneDataException
from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR
from lib.core.settings import RESTORE_MERGED_OPTIONS
from lib.core.settings import UNENCODED_ORIGINAL_VALUE
from lib.core.threads import getCurrentThreadData
from lib.utils.hashdb import HashDB
from lib.core.target import _createDumpDir
from lib.core.target import _createFilesDir
from lib.core.target import _createTargetDirs
from lib.core.target import _resumeDBMS
from lib.core.target import _resumeHashDBValues
from lib.core.target import _resumeOS
from lib.core.target import _restoreMergedOptions
from lib.core.target import _setAuxOptions
from lib.core.target import _setHashDB
from lib.core.target import _setRequestParams
from lib.core.target import _setResultsFile
from lib.core.target import initTargetEnv
SCRATCH = "/tmp/claude-1000/-tmp-tmp-oUnlQJzlQN/fcd55d25-6313-49ed-817e-dcbe7fc2bf22/scratchpad"
# conf/kb keys that the tests below mutate; saved in setUp, restored in tearDown so
# one test can never leak global state into another (or into the rest of the suite).
_CONF_KEYS = (
"direct", "parameters", "paramDict", "method", "data", "cookie", "httpHeaders",
"testParameter", "csrfToken", "url", "forms", "crawlDepth", "hostname", "path",
"port", "dbms", "os", "offline", "tmpPath", "technique", "dumpTable", "dumpAll",
"search", "fileRead", "commonFiles", "dumpPath", "filePath", "outputPath",
"hashDB", "hashDBFile", "sessionFile", "flushSession", "freshQueries",
"multipleTargets", "resultsFP", "resultsFile", "base64Parameter", "forceDbms",
)
_KB_KEYS = (
"processUserMarks", "postHint", "customInjectionMark", "testOnlyCustom",
"resumeValues", "aliasName", "errorChunkLength", "xpCmdshellAvailable",
"chars", "originalUrls", "postUrlEncode", "postSpaceToPlus",
)
_PATH_KEYS = ("SQLMAP_OUTPUT_PATH", "SQLMAP_DUMP_PATH", "SQLMAP_FILES_PATH")
class _TargetTestBase(unittest.TestCase):
"""Snapshot/restore conf, kb and paths globals around each test."""
def setUp(self):
self._conf = {k: conf.get(k) for k in _CONF_KEYS}
self._kb = {k: kb.get(k) for k in _KB_KEYS}
self._paths = {k: paths.get(k) for k in _PATH_KEYS}
# _resumeDBMS/_resumeOS mutate these kb fields via Backend.set* (not in _KB_KEYS)
self._dbms = kb.get("dbms")
self._forcedDbms = kb.get("forcedDbms")
self._tmpdirs = []
def tearDown(self):
# close any session DB we opened before restoring globals
if conf.get("hashDB"):
try:
conf.hashDB.close()
except Exception:
pass
getCurrentThreadData().hashDBCursor = None
if conf.get("resultsFP"):
try:
conf.resultsFP.close()
except Exception:
pass
for k, v in self._conf.items():
conf[k] = v
for k, v in self._kb.items():
kb[k] = v
for k, v in self._paths.items():
paths[k] = v
kb.dbms = self._dbms # _resumeDBMS may have set an identified DBMS
kb.forcedDbms = self._forcedDbms
for d in self._tmpdirs:
shutil.rmtree(d, ignore_errors=True)
def _outdir(self, name):
d = os.path.join(SCRATCH, name)
shutil.rmtree(d, ignore_errors=True)
self._tmpdirs.append(d)
paths.SQLMAP_OUTPUT_PATH = d
paths.SQLMAP_DUMP_PATH = os.path.join(d, "%s", "dump")
paths.SQLMAP_FILES_PATH = os.path.join(d, "%s", "files")
return d
def _new_hashdb(self):
handle, path = tempfile.mkstemp(suffix=".sqlite", dir=SCRATCH)
os.close(handle)
os.remove(path)
getCurrentThreadData().hashDBCursor = None
conf.hashDB = HashDB(path)
conf.hostname = "h"
conf.path = "/"
conf.port = 80
kb.resumeValues = True
conf.flushSession = False
conf.freshQueries = False
# another test file may have force-set a DBMS via set_dbms(); a leaked forcedDbms
# takes precedence in getIdentifiedDbms() and would mask what _resumeDBMS resolves
conf.forceDbms = None
kb.forcedDbms = None
kb.dbms = None
self.addCleanup(self._cleanup_hashdb, path)
return path
def _cleanup_hashdb(self, path):
for f in (path, path + "-wal", path + "-shm"):
if os.path.exists(f):
try:
os.remove(f)
except OSError:
pass
class TestRestoreMergedOptions(_TargetTestBase):
def test_restores_each_option_from_mergedOptions(self):
saved = {}
for opt in RESTORE_MERGED_OPTIONS:
saved[opt] = mergedOptions.get(opt)
mergedOptions[opt] = "VAL_%s" % opt
conf[opt] = "tampered"
try:
_restoreMergedOptions()
for opt in RESTORE_MERGED_OPTIONS:
self.assertEqual(conf[opt], "VAL_%s" % opt,
msg="option %r not restored from mergedOptions" % opt)
finally:
for opt, v in saved.items():
mergedOptions[opt] = v
class TestSetAuxOptions(_TargetTestBase):
def test_alias_is_nonempty_string(self):
conf.hostname = "example.com"
_setAuxOptions()
self.assertIsInstance(kb.aliasName, str)
self.assertTrue(kb.aliasName)
def test_alias_deterministic_for_same_host(self):
conf.hostname = "example.com"
_setAuxOptions()
first = kb.aliasName
_setAuxOptions()
self.assertEqual(kb.aliasName, first)
def test_alias_handles_none_host(self):
conf.hostname = None
_setAuxOptions() # seed=hash("") must not raise
self.assertIsInstance(kb.aliasName, str)
class TestInitTargetEnv(_TargetTestBase):
def _base(self):
conf.url = "http://h/?id=1"
conf.data = None
conf.httpHeaders = []
conf.base64Parameter = None
conf.multipleTargets = False
def test_default_injection_marker(self):
self._base()
initTargetEnv()
self.assertEqual(kb.customInjectionMark, CUSTOM_INJECTION_MARK_CHAR)
def test_inject_here_marker_detected(self):
self._base()
conf.url = "http://h/?id=%INJECT_HERE%"
initTargetEnv()
self.assertEqual(kb.customInjectionMark, "%INJECT_HERE%")
def test_urlencoded_post_body_is_decoded(self):
self._base()
conf.url = "http://h/"
conf.data = "id=a%20b"
conf.httpHeaders = [("Content-Type", "application/x-www-form-urlencoded")]
initTargetEnv()
self.assertTrue(kb.postUrlEncode)
self.assertEqual(str(conf.data), "id=a b")
# the raw (still-encoded) original is preserved as an attribute for later re-encoding
self.assertEqual(getattr(conf.data, UNENCODED_ORIGINAL_VALUE, None), "id=a%20b")
def test_non_urlencoded_content_type_skips_decode(self):
self._base()
conf.url = "http://h/"
conf.data = "id=a%20b"
conf.httpHeaders = [("Content-Type", "application/json")]
conf.base64Parameter = None
initTargetEnv()
self.assertFalse(kb.postUrlEncode)
def test_base64_post_body_is_decoded(self):
self._base()
conf.url = "http://h/"
conf.data = "aWQ9MQ==" # base64 of "id=1"
conf.httpHeaders = [("Content-Type", "application/json")]
conf.base64Parameter = "POST"
initTargetEnv()
self.assertEqual(str(conf.data), "id=1")
self.assertEqual(getattr(conf.data, UNENCODED_ORIGINAL_VALUE, None), "aWQ9MQ==")
class TestSetRequestParams(_TargetTestBase):
def _fresh(self):
conf.direct = None
conf.parameters = {}
conf.paramDict = {}
conf.method = HTTPMETHOD.GET
conf.data = None
conf.cookie = None
conf.httpHeaders = []
conf.testParameter = None
conf.csrfToken = None
conf.url = "http://h/"
conf.forms = False
conf.crawlDepth = None
kb.processUserMarks = None
kb.postHint = None
kb.customInjectionMark = "*"
kb.testOnlyCustom = False
def test_direct_connection_shortcut(self):
self._fresh()
conf.direct = "mysql://u:p@h/db"
conf.parameters = {}
_setRequestParams()
self.assertEqual(conf.parameters[None], "direct connection")
def test_get_parameters_split(self):
self._fresh()
conf.parameters = {PLACE.GET: "id=1&name=foo"}
conf.url = "http://h/?id=1&name=foo"
_setRequestParams()
self.assertEqual(dict(conf.paramDict[PLACE.GET]), {"id": "1", "name": "foo"})
def test_post_parameters_split(self):
self._fresh()
conf.method = HTTPMETHOD.POST
conf.data = "a=1&b=2"
_setRequestParams()
self.assertEqual(dict(conf.paramDict[PLACE.POST]), {"a": "1", "b": "2"})
def test_cookie_parameters_split(self):
self._fresh()
conf.parameters = {PLACE.GET: "id=1"}
conf.url = "http://h/?id=1"
conf.cookie = "sess=abc; uid=5"
_setRequestParams()
self.assertIn(PLACE.COOKIE, conf.paramDict)
self.assertEqual(dict(conf.paramDict[PLACE.COOKIE]), {"sess": "abc", "uid": "5"})
def test_user_agent_header_is_testable(self):
self._fresh()
conf.httpHeaders = [(HTTP_HEADER.USER_AGENT, "Mozilla")]
_setRequestParams()
self.assertIn(PLACE.USER_AGENT, conf.paramDict)
def test_referer_header_is_testable(self):
self._fresh()
conf.httpHeaders = [(HTTP_HEADER.REFERER, "http://ref/")]
_setRequestParams()
self.assertIn(PLACE.REFERER, conf.paramDict)
def test_no_parameters_raises(self):
self._fresh()
with self.assertRaises(SqlmapGenericException):
_setRequestParams()
def test_empty_post_body_defaults_to_empty_string(self):
self._fresh()
conf.method = HTTPMETHOD.POST
conf.data = None
conf.parameters = {PLACE.GET: "id=1"} # keep a testable param so it doesn't raise
conf.url = "http://h/?id=1"
_setRequestParams()
self.assertEqual(conf.data, "")
class TestResumeDBMS(_TargetTestBase):
def test_resumes_dbms_with_version(self):
self._new_hashdb()
conf.dbms = None
conf.offline = False
hashDBWrite(HASHDB_KEYS.DBMS, "MySQL 5.0")
_resumeDBMS()
self.assertEqual(Backend.getIdentifiedDbms(), "MySQL")
def test_no_stored_dbms_returns_quietly(self):
self._new_hashdb()
conf.dbms = None
conf.offline = False
_resumeDBMS() # nothing stored: must just return
# the quiet-return branch must leave NO DBMS identified (a real
# side-effect assertion, not merely "did not raise")
self.assertIsNone(Backend.getIdentifiedDbms())
def test_offline_without_session_raises(self):
self._new_hashdb()
conf.dbms = None
conf.offline = True
with self.assertRaises(SqlmapNoneDataException):
_resumeDBMS()
class TestResumeOS(_TargetTestBase):
def test_resumes_os(self):
self._new_hashdb()
conf.os = None
hashDBWrite(HASHDB_KEYS.OS, "Linux")
_resumeOS()
self.assertEqual(conf.os, "Linux")
def test_no_stored_os_returns_quietly(self):
self._new_hashdb()
conf.os = None
_resumeOS()
self.assertIsNone(conf.os)
def test_stored_none_string_is_ignored(self):
self._new_hashdb()
conf.os = None
hashDBWrite(HASHDB_KEYS.OS, "None")
_resumeOS()
self.assertIsNone(conf.os)
class TestResumeHashDBValues(_TargetTestBase):
def _base(self):
self._new_hashdb()
conf.dbms = None
conf.offline = False
conf.os = None
conf.tmpPath = None
conf.technique = None
conf.paramDict = {}
def test_resumes_serialized_chars(self):
self._base()
kb.chars = None
hashDBWrite(HASHDB_KEYS.KB_CHARS, {"a": 1}, serialize=True)
_resumeHashDBValues()
self.assertEqual(kb.chars, {"a": 1})
def test_resumes_numeric_error_chunk_length(self):
self._base()
kb.errorChunkLength = None
hashDBWrite(HASHDB_KEYS.KB_ERROR_CHUNK_LENGTH, "5")
_resumeHashDBValues()
self.assertEqual(kb.errorChunkLength, 5)
def test_non_numeric_chunk_length_becomes_none(self):
self._base()
kb.errorChunkLength = 99
hashDBWrite(HASHDB_KEYS.KB_ERROR_CHUNK_LENGTH, "notanumber")
_resumeHashDBValues()
self.assertIsNone(kb.errorChunkLength)
def test_xp_cmdshell_true_coerced_to_bool(self):
self._base()
kb.xpCmdshellAvailable = False
hashDBWrite(HASHDB_KEYS.KB_XP_CMDSHELL_AVAILABLE, str(True))
_resumeHashDBValues()
self.assertIs(kb.xpCmdshellAvailable, True)
class TestSetHashDB(_TargetTestBase):
def test_derives_session_file_under_output_path(self):
out = self._outdir("hdb_out")
os.makedirs(out)
getCurrentThreadData().hashDBCursor = None
conf.hashDBFile = None
conf.sessionFile = None
conf.outputPath = out
conf.flushSession = False
conf.hashDB = None
_setHashDB()
self.assertTrue(conf.hashDBFile.startswith(out))
self.assertIsInstance(conf.hashDB, HashDB)
def test_explicit_session_file_takes_precedence(self):
out = self._outdir("hdb_out2")
os.makedirs(out)
sess = os.path.join(out, "custom.sqlite")
getCurrentThreadData().hashDBCursor = None
conf.hashDBFile = None
conf.sessionFile = sess
conf.outputPath = out
conf.flushSession = False
conf.hashDB = None
_setHashDB()
self.assertEqual(conf.hashDBFile, sess)
class TestCreateDirs(_TargetTestBase):
def test_dump_dir_skipped_without_dump_flags(self):
self._outdir("d_out")
conf.hostname = "example.com"
conf.dumpPath = None
conf.dumpTable = False
conf.dumpAll = False
conf.search = False
_createDumpDir()
self.assertIsNone(conf.dumpPath)
def test_dump_dir_created_per_host(self):
self._outdir("d_out2")
conf.hostname = "example.com"
conf.dumpTable = True
conf.dumpAll = False
conf.search = False
_createDumpDir()
self.assertTrue(os.path.isdir(conf.dumpPath))
self.assertIn("example.com", conf.dumpPath)
def test_files_dir_skipped_without_file_flags(self):
self._outdir("f_out")
conf.hostname = "example.com"
conf.filePath = None
conf.fileRead = None
conf.commonFiles = None
_createFilesDir()
self.assertIsNone(conf.filePath)
def test_files_dir_created_per_host(self):
self._outdir("f_out2")
conf.hostname = "example.com"
conf.fileRead = "/etc/passwd"
conf.commonFiles = None
_createFilesDir()
self.assertTrue(os.path.isdir(conf.filePath))
self.assertIn("example.com", conf.filePath)
def test_target_dir_and_target_txt(self):
self._outdir("t_out")
conf.hostname = "example.com"
conf.url = "http://example.com/?id=1"
conf.data = None
conf.dumpTable = False
conf.dumpAll = False
conf.search = False
conf.fileRead = None
conf.commonFiles = None
kb.originalUrls = {}
_createTargetDirs()
self.assertTrue(os.path.isdir(conf.outputPath))
target = os.path.join(conf.outputPath, "target.txt")
self.assertTrue(os.path.exists(target))
with open(target) as f:
content = f.read()
self.assertIn("http://example.com/?id=1", content)
self.assertIn("(%s)" % HTTPMETHOD.GET, content)
class TestSetResultsFile(_TargetTestBase):
def test_skipped_when_not_multiple_targets(self):
self._outdir("r_out")
conf.multipleTargets = False
conf.resultsFP = None
_setResultsFile()
self.assertIsNone(conf.resultsFP)
def test_creates_csv_with_header_in_multiple_target_mode(self):
out = self._outdir("r_out2")
os.makedirs(out)
conf.multipleTargets = True
conf.resultsFile = os.path.join(out, "res.csv")
conf.resultsFP = None
_setResultsFile()
self.assertTrue(os.path.exists(conf.resultsFile))
conf.resultsFP.flush()
with open(conf.resultsFile) as f:
header = f.readline()
self.assertIn("Target URL", header)
self.assertIn("Parameter", header)
if __name__ == "__main__":
unittest.main(verbosity=2)