mirror of
https://github.com/sqlmapproject/sqlmap.git
synced 2026-06-29 13:01:02 +00:00
521 lines
18 KiB
Python
521 lines
18 KiB
Python
#!/usr/bin/env python
|
|
|
|
"""
|
|
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
|
|
See the file 'LICENSE' for copying permission
|
|
|
|
Target-environment setup in lib/core/target.py.
|
|
|
|
target.py wires a single scan's per-host state together: it derives the output /
|
|
dump / files directories from the hostname, opens (and optionally flushes) the
|
|
HashDB session file, resumes a previously-fingerprinted DBMS/OS and stored kb
|
|
values out of that session, decides the custom injection marker, normalizes the
|
|
POST body (url-decode / base64), splits GET/POST/Cookie/header strings into the
|
|
testable paramDict, and restores the per-target merged options between targets.
|
|
|
|
None of that needs a live HTTP target or a real DBMS connection: every function
|
|
reads conf/kb globals (which are set up here per-test and restored in tearDown)
|
|
and at most touches the local filesystem (pointed at a private temp tree) or a
|
|
local SQLite session file. Those side-effecting paths are still pure with
|
|
respect to the network, so they are exercised here against real temp dirs.
|
|
|
|
All expected values below were probed from actual output, not assumed.
|
|
"""
|
|
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from _testutils import bootstrap
|
|
bootstrap()
|
|
|
|
from lib.core.data import conf
|
|
from lib.core.data import kb
|
|
from lib.core.data import mergedOptions
|
|
from lib.core.data import paths
|
|
from lib.core.common import Backend
|
|
from lib.core.common import hashDBWrite
|
|
from lib.core.enums import HASHDB_KEYS
|
|
from lib.core.enums import HTTP_HEADER
|
|
from lib.core.enums import HTTPMETHOD
|
|
from lib.core.enums import PLACE
|
|
from lib.core.exception import SqlmapGenericException
|
|
from lib.core.exception import SqlmapNoneDataException
|
|
from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR
|
|
from lib.core.settings import RESTORE_MERGED_OPTIONS
|
|
from lib.core.settings import UNENCODED_ORIGINAL_VALUE
|
|
from lib.core.threads import getCurrentThreadData
|
|
from lib.utils.hashdb import HashDB
|
|
from lib.core.target import _createDumpDir
|
|
from lib.core.target import _createFilesDir
|
|
from lib.core.target import _createTargetDirs
|
|
from lib.core.target import _resumeDBMS
|
|
from lib.core.target import _resumeHashDBValues
|
|
from lib.core.target import _resumeOS
|
|
from lib.core.target import _restoreMergedOptions
|
|
from lib.core.target import _setAuxOptions
|
|
from lib.core.target import _setHashDB
|
|
from lib.core.target import _setRequestParams
|
|
from lib.core.target import _setResultsFile
|
|
from lib.core.target import initTargetEnv
|
|
|
|
SCRATCH = "/tmp/claude-1000/-tmp-tmp-oUnlQJzlQN/fcd55d25-6313-49ed-817e-dcbe7fc2bf22/scratchpad"
|
|
|
|
# conf/kb keys that the tests below mutate; saved in setUp, restored in tearDown so
|
|
# one test can never leak global state into another (or into the rest of the suite).
|
|
_CONF_KEYS = (
|
|
"direct", "parameters", "paramDict", "method", "data", "cookie", "httpHeaders",
|
|
"testParameter", "csrfToken", "url", "forms", "crawlDepth", "hostname", "path",
|
|
"port", "dbms", "os", "offline", "tmpPath", "technique", "dumpTable", "dumpAll",
|
|
"search", "fileRead", "commonFiles", "dumpPath", "filePath", "outputPath",
|
|
"hashDB", "hashDBFile", "sessionFile", "flushSession", "freshQueries",
|
|
"multipleTargets", "resultsFP", "resultsFile", "base64Parameter", "forceDbms",
|
|
)
|
|
_KB_KEYS = (
|
|
"processUserMarks", "postHint", "customInjectionMark", "testOnlyCustom",
|
|
"resumeValues", "aliasName", "errorChunkLength", "xpCmdshellAvailable",
|
|
"chars", "originalUrls", "postUrlEncode", "postSpaceToPlus",
|
|
)
|
|
_PATH_KEYS = ("SQLMAP_OUTPUT_PATH", "SQLMAP_DUMP_PATH", "SQLMAP_FILES_PATH")
|
|
|
|
|
|
class _TargetTestBase(unittest.TestCase):
|
|
"""Snapshot/restore conf, kb and paths globals around each test."""
|
|
|
|
def setUp(self):
|
|
self._conf = {k: conf.get(k) for k in _CONF_KEYS}
|
|
self._kb = {k: kb.get(k) for k in _KB_KEYS}
|
|
self._paths = {k: paths.get(k) for k in _PATH_KEYS}
|
|
# _resumeDBMS/_resumeOS mutate these kb fields via Backend.set* (not in _KB_KEYS)
|
|
self._dbms = kb.get("dbms")
|
|
self._forcedDbms = kb.get("forcedDbms")
|
|
self._tmpdirs = []
|
|
|
|
def tearDown(self):
|
|
# close any session DB we opened before restoring globals
|
|
if conf.get("hashDB"):
|
|
try:
|
|
conf.hashDB.close()
|
|
except Exception:
|
|
pass
|
|
getCurrentThreadData().hashDBCursor = None
|
|
if conf.get("resultsFP"):
|
|
try:
|
|
conf.resultsFP.close()
|
|
except Exception:
|
|
pass
|
|
for k, v in self._conf.items():
|
|
conf[k] = v
|
|
for k, v in self._kb.items():
|
|
kb[k] = v
|
|
for k, v in self._paths.items():
|
|
paths[k] = v
|
|
kb.dbms = self._dbms # _resumeDBMS may have set an identified DBMS
|
|
kb.forcedDbms = self._forcedDbms
|
|
for d in self._tmpdirs:
|
|
shutil.rmtree(d, ignore_errors=True)
|
|
|
|
def _outdir(self, name):
|
|
d = os.path.join(SCRATCH, name)
|
|
shutil.rmtree(d, ignore_errors=True)
|
|
self._tmpdirs.append(d)
|
|
paths.SQLMAP_OUTPUT_PATH = d
|
|
paths.SQLMAP_DUMP_PATH = os.path.join(d, "%s", "dump")
|
|
paths.SQLMAP_FILES_PATH = os.path.join(d, "%s", "files")
|
|
return d
|
|
|
|
def _new_hashdb(self):
|
|
handle, path = tempfile.mkstemp(suffix=".sqlite", dir=SCRATCH)
|
|
os.close(handle)
|
|
os.remove(path)
|
|
getCurrentThreadData().hashDBCursor = None
|
|
conf.hashDB = HashDB(path)
|
|
conf.hostname = "h"
|
|
conf.path = "/"
|
|
conf.port = 80
|
|
kb.resumeValues = True
|
|
conf.flushSession = False
|
|
conf.freshQueries = False
|
|
# another test file may have force-set a DBMS via set_dbms(); a leaked forcedDbms
|
|
# takes precedence in getIdentifiedDbms() and would mask what _resumeDBMS resolves
|
|
conf.forceDbms = None
|
|
kb.forcedDbms = None
|
|
kb.dbms = None
|
|
self.addCleanup(self._cleanup_hashdb, path)
|
|
return path
|
|
|
|
def _cleanup_hashdb(self, path):
|
|
for f in (path, path + "-wal", path + "-shm"):
|
|
if os.path.exists(f):
|
|
try:
|
|
os.remove(f)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
class TestRestoreMergedOptions(_TargetTestBase):
|
|
def test_restores_each_option_from_mergedOptions(self):
|
|
saved = {}
|
|
for opt in RESTORE_MERGED_OPTIONS:
|
|
saved[opt] = mergedOptions.get(opt)
|
|
mergedOptions[opt] = "VAL_%s" % opt
|
|
conf[opt] = "tampered"
|
|
try:
|
|
_restoreMergedOptions()
|
|
for opt in RESTORE_MERGED_OPTIONS:
|
|
self.assertEqual(conf[opt], "VAL_%s" % opt,
|
|
msg="option %r not restored from mergedOptions" % opt)
|
|
finally:
|
|
for opt, v in saved.items():
|
|
mergedOptions[opt] = v
|
|
|
|
|
|
class TestSetAuxOptions(_TargetTestBase):
|
|
def test_alias_is_nonempty_string(self):
|
|
conf.hostname = "example.com"
|
|
_setAuxOptions()
|
|
self.assertIsInstance(kb.aliasName, str)
|
|
self.assertTrue(kb.aliasName)
|
|
|
|
def test_alias_deterministic_for_same_host(self):
|
|
conf.hostname = "example.com"
|
|
_setAuxOptions()
|
|
first = kb.aliasName
|
|
_setAuxOptions()
|
|
self.assertEqual(kb.aliasName, first)
|
|
|
|
def test_alias_handles_none_host(self):
|
|
conf.hostname = None
|
|
_setAuxOptions() # seed=hash("") must not raise
|
|
self.assertIsInstance(kb.aliasName, str)
|
|
|
|
|
|
class TestInitTargetEnv(_TargetTestBase):
|
|
def _base(self):
|
|
conf.url = "http://h/?id=1"
|
|
conf.data = None
|
|
conf.httpHeaders = []
|
|
conf.base64Parameter = None
|
|
conf.multipleTargets = False
|
|
|
|
def test_default_injection_marker(self):
|
|
self._base()
|
|
initTargetEnv()
|
|
self.assertEqual(kb.customInjectionMark, CUSTOM_INJECTION_MARK_CHAR)
|
|
|
|
def test_inject_here_marker_detected(self):
|
|
self._base()
|
|
conf.url = "http://h/?id=%INJECT_HERE%"
|
|
initTargetEnv()
|
|
self.assertEqual(kb.customInjectionMark, "%INJECT_HERE%")
|
|
|
|
def test_urlencoded_post_body_is_decoded(self):
|
|
self._base()
|
|
conf.url = "http://h/"
|
|
conf.data = "id=a%20b"
|
|
conf.httpHeaders = [("Content-Type", "application/x-www-form-urlencoded")]
|
|
initTargetEnv()
|
|
self.assertTrue(kb.postUrlEncode)
|
|
self.assertEqual(str(conf.data), "id=a b")
|
|
# the raw (still-encoded) original is preserved as an attribute for later re-encoding
|
|
self.assertEqual(getattr(conf.data, UNENCODED_ORIGINAL_VALUE, None), "id=a%20b")
|
|
|
|
def test_non_urlencoded_content_type_skips_decode(self):
|
|
self._base()
|
|
conf.url = "http://h/"
|
|
conf.data = "id=a%20b"
|
|
conf.httpHeaders = [("Content-Type", "application/json")]
|
|
conf.base64Parameter = None
|
|
initTargetEnv()
|
|
self.assertFalse(kb.postUrlEncode)
|
|
|
|
def test_base64_post_body_is_decoded(self):
|
|
self._base()
|
|
conf.url = "http://h/"
|
|
conf.data = "aWQ9MQ==" # base64 of "id=1"
|
|
conf.httpHeaders = [("Content-Type", "application/json")]
|
|
conf.base64Parameter = "POST"
|
|
initTargetEnv()
|
|
self.assertEqual(str(conf.data), "id=1")
|
|
self.assertEqual(getattr(conf.data, UNENCODED_ORIGINAL_VALUE, None), "aWQ9MQ==")
|
|
|
|
|
|
class TestSetRequestParams(_TargetTestBase):
|
|
def _fresh(self):
|
|
conf.direct = None
|
|
conf.parameters = {}
|
|
conf.paramDict = {}
|
|
conf.method = HTTPMETHOD.GET
|
|
conf.data = None
|
|
conf.cookie = None
|
|
conf.httpHeaders = []
|
|
conf.testParameter = None
|
|
conf.csrfToken = None
|
|
conf.url = "http://h/"
|
|
conf.forms = False
|
|
conf.crawlDepth = None
|
|
kb.processUserMarks = None
|
|
kb.postHint = None
|
|
kb.customInjectionMark = "*"
|
|
kb.testOnlyCustom = False
|
|
|
|
def test_direct_connection_shortcut(self):
|
|
self._fresh()
|
|
conf.direct = "mysql://u:p@h/db"
|
|
conf.parameters = {}
|
|
_setRequestParams()
|
|
self.assertEqual(conf.parameters[None], "direct connection")
|
|
|
|
def test_get_parameters_split(self):
|
|
self._fresh()
|
|
conf.parameters = {PLACE.GET: "id=1&name=foo"}
|
|
conf.url = "http://h/?id=1&name=foo"
|
|
_setRequestParams()
|
|
self.assertEqual(dict(conf.paramDict[PLACE.GET]), {"id": "1", "name": "foo"})
|
|
|
|
def test_post_parameters_split(self):
|
|
self._fresh()
|
|
conf.method = HTTPMETHOD.POST
|
|
conf.data = "a=1&b=2"
|
|
_setRequestParams()
|
|
self.assertEqual(dict(conf.paramDict[PLACE.POST]), {"a": "1", "b": "2"})
|
|
|
|
def test_cookie_parameters_split(self):
|
|
self._fresh()
|
|
conf.parameters = {PLACE.GET: "id=1"}
|
|
conf.url = "http://h/?id=1"
|
|
conf.cookie = "sess=abc; uid=5"
|
|
_setRequestParams()
|
|
self.assertIn(PLACE.COOKIE, conf.paramDict)
|
|
self.assertEqual(dict(conf.paramDict[PLACE.COOKIE]), {"sess": "abc", "uid": "5"})
|
|
|
|
def test_user_agent_header_is_testable(self):
|
|
self._fresh()
|
|
conf.httpHeaders = [(HTTP_HEADER.USER_AGENT, "Mozilla")]
|
|
_setRequestParams()
|
|
self.assertIn(PLACE.USER_AGENT, conf.paramDict)
|
|
|
|
def test_referer_header_is_testable(self):
|
|
self._fresh()
|
|
conf.httpHeaders = [(HTTP_HEADER.REFERER, "http://ref/")]
|
|
_setRequestParams()
|
|
self.assertIn(PLACE.REFERER, conf.paramDict)
|
|
|
|
def test_no_parameters_raises(self):
|
|
self._fresh()
|
|
with self.assertRaises(SqlmapGenericException):
|
|
_setRequestParams()
|
|
|
|
def test_empty_post_body_defaults_to_empty_string(self):
|
|
self._fresh()
|
|
conf.method = HTTPMETHOD.POST
|
|
conf.data = None
|
|
conf.parameters = {PLACE.GET: "id=1"} # keep a testable param so it doesn't raise
|
|
conf.url = "http://h/?id=1"
|
|
_setRequestParams()
|
|
self.assertEqual(conf.data, "")
|
|
|
|
|
|
class TestResumeDBMS(_TargetTestBase):
|
|
def test_resumes_dbms_with_version(self):
|
|
self._new_hashdb()
|
|
conf.dbms = None
|
|
conf.offline = False
|
|
hashDBWrite(HASHDB_KEYS.DBMS, "MySQL 5.0")
|
|
_resumeDBMS()
|
|
self.assertEqual(Backend.getIdentifiedDbms(), "MySQL")
|
|
|
|
def test_no_stored_dbms_returns_quietly(self):
|
|
self._new_hashdb()
|
|
conf.dbms = None
|
|
conf.offline = False
|
|
_resumeDBMS() # nothing stored: must just return
|
|
# the quiet-return branch must leave NO DBMS identified (a real
|
|
# side-effect assertion, not merely "did not raise")
|
|
self.assertIsNone(Backend.getIdentifiedDbms())
|
|
|
|
def test_offline_without_session_raises(self):
|
|
self._new_hashdb()
|
|
conf.dbms = None
|
|
conf.offline = True
|
|
with self.assertRaises(SqlmapNoneDataException):
|
|
_resumeDBMS()
|
|
|
|
|
|
class TestResumeOS(_TargetTestBase):
|
|
def test_resumes_os(self):
|
|
self._new_hashdb()
|
|
conf.os = None
|
|
hashDBWrite(HASHDB_KEYS.OS, "Linux")
|
|
_resumeOS()
|
|
self.assertEqual(conf.os, "Linux")
|
|
|
|
def test_no_stored_os_returns_quietly(self):
|
|
self._new_hashdb()
|
|
conf.os = None
|
|
_resumeOS()
|
|
self.assertIsNone(conf.os)
|
|
|
|
def test_stored_none_string_is_ignored(self):
|
|
self._new_hashdb()
|
|
conf.os = None
|
|
hashDBWrite(HASHDB_KEYS.OS, "None")
|
|
_resumeOS()
|
|
self.assertIsNone(conf.os)
|
|
|
|
|
|
class TestResumeHashDBValues(_TargetTestBase):
|
|
def _base(self):
|
|
self._new_hashdb()
|
|
conf.dbms = None
|
|
conf.offline = False
|
|
conf.os = None
|
|
conf.tmpPath = None
|
|
conf.technique = None
|
|
conf.paramDict = {}
|
|
|
|
def test_resumes_serialized_chars(self):
|
|
self._base()
|
|
kb.chars = None
|
|
hashDBWrite(HASHDB_KEYS.KB_CHARS, {"a": 1}, serialize=True)
|
|
_resumeHashDBValues()
|
|
self.assertEqual(kb.chars, {"a": 1})
|
|
|
|
def test_resumes_numeric_error_chunk_length(self):
|
|
self._base()
|
|
kb.errorChunkLength = None
|
|
hashDBWrite(HASHDB_KEYS.KB_ERROR_CHUNK_LENGTH, "5")
|
|
_resumeHashDBValues()
|
|
self.assertEqual(kb.errorChunkLength, 5)
|
|
|
|
def test_non_numeric_chunk_length_becomes_none(self):
|
|
self._base()
|
|
kb.errorChunkLength = 99
|
|
hashDBWrite(HASHDB_KEYS.KB_ERROR_CHUNK_LENGTH, "notanumber")
|
|
_resumeHashDBValues()
|
|
self.assertIsNone(kb.errorChunkLength)
|
|
|
|
def test_xp_cmdshell_true_coerced_to_bool(self):
|
|
self._base()
|
|
kb.xpCmdshellAvailable = False
|
|
hashDBWrite(HASHDB_KEYS.KB_XP_CMDSHELL_AVAILABLE, str(True))
|
|
_resumeHashDBValues()
|
|
self.assertIs(kb.xpCmdshellAvailable, True)
|
|
|
|
|
|
class TestSetHashDB(_TargetTestBase):
|
|
def test_derives_session_file_under_output_path(self):
|
|
out = self._outdir("hdb_out")
|
|
os.makedirs(out)
|
|
getCurrentThreadData().hashDBCursor = None
|
|
conf.hashDBFile = None
|
|
conf.sessionFile = None
|
|
conf.outputPath = out
|
|
conf.flushSession = False
|
|
conf.hashDB = None
|
|
_setHashDB()
|
|
self.assertTrue(conf.hashDBFile.startswith(out))
|
|
self.assertIsInstance(conf.hashDB, HashDB)
|
|
|
|
def test_explicit_session_file_takes_precedence(self):
|
|
out = self._outdir("hdb_out2")
|
|
os.makedirs(out)
|
|
sess = os.path.join(out, "custom.sqlite")
|
|
getCurrentThreadData().hashDBCursor = None
|
|
conf.hashDBFile = None
|
|
conf.sessionFile = sess
|
|
conf.outputPath = out
|
|
conf.flushSession = False
|
|
conf.hashDB = None
|
|
_setHashDB()
|
|
self.assertEqual(conf.hashDBFile, sess)
|
|
|
|
|
|
class TestCreateDirs(_TargetTestBase):
|
|
def test_dump_dir_skipped_without_dump_flags(self):
|
|
self._outdir("d_out")
|
|
conf.hostname = "example.com"
|
|
conf.dumpPath = None
|
|
conf.dumpTable = False
|
|
conf.dumpAll = False
|
|
conf.search = False
|
|
_createDumpDir()
|
|
self.assertIsNone(conf.dumpPath)
|
|
|
|
def test_dump_dir_created_per_host(self):
|
|
self._outdir("d_out2")
|
|
conf.hostname = "example.com"
|
|
conf.dumpTable = True
|
|
conf.dumpAll = False
|
|
conf.search = False
|
|
_createDumpDir()
|
|
self.assertTrue(os.path.isdir(conf.dumpPath))
|
|
self.assertIn("example.com", conf.dumpPath)
|
|
|
|
def test_files_dir_skipped_without_file_flags(self):
|
|
self._outdir("f_out")
|
|
conf.hostname = "example.com"
|
|
conf.filePath = None
|
|
conf.fileRead = None
|
|
conf.commonFiles = None
|
|
_createFilesDir()
|
|
self.assertIsNone(conf.filePath)
|
|
|
|
def test_files_dir_created_per_host(self):
|
|
self._outdir("f_out2")
|
|
conf.hostname = "example.com"
|
|
conf.fileRead = "/etc/passwd"
|
|
conf.commonFiles = None
|
|
_createFilesDir()
|
|
self.assertTrue(os.path.isdir(conf.filePath))
|
|
self.assertIn("example.com", conf.filePath)
|
|
|
|
def test_target_dir_and_target_txt(self):
|
|
self._outdir("t_out")
|
|
conf.hostname = "example.com"
|
|
conf.url = "http://example.com/?id=1"
|
|
conf.data = None
|
|
conf.dumpTable = False
|
|
conf.dumpAll = False
|
|
conf.search = False
|
|
conf.fileRead = None
|
|
conf.commonFiles = None
|
|
kb.originalUrls = {}
|
|
_createTargetDirs()
|
|
self.assertTrue(os.path.isdir(conf.outputPath))
|
|
target = os.path.join(conf.outputPath, "target.txt")
|
|
self.assertTrue(os.path.exists(target))
|
|
with open(target) as f:
|
|
content = f.read()
|
|
self.assertIn("http://example.com/?id=1", content)
|
|
self.assertIn("(%s)" % HTTPMETHOD.GET, content)
|
|
|
|
|
|
class TestSetResultsFile(_TargetTestBase):
|
|
def test_skipped_when_not_multiple_targets(self):
|
|
self._outdir("r_out")
|
|
conf.multipleTargets = False
|
|
conf.resultsFP = None
|
|
_setResultsFile()
|
|
self.assertIsNone(conf.resultsFP)
|
|
|
|
def test_creates_csv_with_header_in_multiple_target_mode(self):
|
|
out = self._outdir("r_out2")
|
|
os.makedirs(out)
|
|
conf.multipleTargets = True
|
|
conf.resultsFile = os.path.join(out, "res.csv")
|
|
conf.resultsFP = None
|
|
_setResultsFile()
|
|
self.assertTrue(os.path.exists(conf.resultsFile))
|
|
conf.resultsFP.flush()
|
|
with open(conf.resultsFile) as f:
|
|
header = f.readline()
|
|
self.assertIn("Target URL", header)
|
|
self.assertIn("Parameter", header)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(verbosity=2)
|