sqlmap/tests/test_entries.py
Miroslav Štampar 2297c81309 Update of tests
2026-06-28 18:27:59 +02:00

802 lines
30 KiB
Python

#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
Unit tests for plugins/generic/entries.py (Entries), exercising dumpTable /
dumpAll / dumpFoundTables / dumpFoundColumn by MOCKING the injection layer
(lib.request.inject.getValue) and the dumper.
No network and no DBMS are involved: conf.direct=True selects the simple inband
branches, or conf.direct=False with a BOOLEAN injection state selects the
inference (blind) branches; inject.getValue is patched to return canned rows in
the exact shape the methods parse, and conf.dumper is replaced with a recording
stub so we can assert on what each method produced (kb.data caches / returned
dicts). Every test restores all touched conf.* / kb.* / patched module attributes
in tearDown so nothing leaks.
"""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap, set_dbms
bootstrap()
from lib.core.common import Backend
from lib.core.data import conf, kb
from lib.core.enums import EXPECTED, PAYLOAD
import plugins.generic.search as smod
import plugins.generic.entries as emod
import plugins.generic.custom as cmod
import plugins.generic.misc as mmod
from plugins.generic.entries import Entries
# --------------------------------------------------------------------------- #
# Helpers/base from tests/test_search_enum.py (inband TestEntries)
# --------------------------------------------------------------------------- #
class _RecordingDumperSE(object):
"""Minimal stand-in for conf.dumper that records calls instead of printing/writing."""
def __init__(self):
self.reset()
def reset(self):
self.listed = [] # (header, elements)
self.dbTablesArg = None
self.dbColumnsArg = None
self.dbTableColumnsArg = None
self.tableValues = []
def lister(self, header, elements, content_type=None, sort=True):
self.listed.append((header, list(elements) if elements else []))
def dbTables(self, dbTables):
self.dbTablesArg = dbTables
def dbColumns(self, dbColumnsDict, colConsider, dbs):
self.dbColumnsArg = (dbColumnsDict, colConsider, dbs)
def dbTableColumns(self, tableColumns, content_type=None):
self.dbTableColumnsArg = tableColumns
def dbTableValues(self, tableValues):
self.tableValues.append(tableValues)
class _TestEntriesSE(Entries):
"""Entries with cross-mixin collaborators stubbed (forceDbmsEnum/getCurrentDb/getColumns/getTables)."""
def __init__(self):
Entries.__init__(self)
self.getColumnsResult = {} # {db: {tbl: {col: type}}}
self.getTablesResult = {} # value assigned to kb.data.cachedTables
self.getColumnsCalls = []
def forceDbmsEnum(self):
pass
def getCurrentDb(self):
return "testdb"
def getColumns(self, onlyColNames=False, colTuple=None, bruteForce=None, dumpMode=False):
self.getColumnsCalls.append((conf.db, conf.tbl))
kb.data.cachedColumns = dict(self.getColumnsResult)
def getTables(self, bruteForce=None):
kb.data.cachedTables = dict(self.getTablesResult)
class _SearchEnumBase(unittest.TestCase):
def setUp(self):
# Save mutated globals
self._saved_conf = {k: conf.get(k) for k in (
"db", "tbl", "col", "direct", "excludeSysDbs", "exclude", "search",
"disableHashing", "noKeyset", "keyset", "forcePivoting",
)}
self._saved_dumper = conf.get("dumper")
self._search_getValue = smod.inject.getValue
self._entries_getValue = emod.inject.getValue
self._search_readInput = smod.readInput
self._entries_readInput = emod.readInput
self._saved_has_is = kb.data.get("has_information_schema")
self._saved_cachedColumns = kb.data.get("cachedColumns")
self._saved_cachedTables = kb.data.get("cachedTables")
self._saved_dumpedTable = kb.data.get("dumpedTable")
self._saved_dumpKbInt = kb.get("dumpKeyboardInterrupt")
self._saved_permissionFlag = kb.get("permissionFlag")
set_dbms("MySQL")
conf.direct = True
conf.excludeSysDbs = False
conf.exclude = None
conf.search = True
conf.disableHashing = True
conf.noKeyset = True
conf.keyset = False
conf.forcePivoting = False
conf.dumper = _RecordingDumperSE()
kb.data.has_information_schema = True
kb.data.cachedColumns = {}
kb.data.cachedTables = {}
kb.data.dumpedTable = {}
kb.dumpKeyboardInterrupt = False
kb.permissionFlag = False
# Non-interactive prompts: collapse readInput to its default.
def _readInput(message, default=None, checkBatch=True, boolean=False):
if boolean:
return True if (default in (None, 'Y', 'y', True)) else False
return default
smod.readInput = _readInput
emod.readInput = _readInput
def tearDown(self):
for k, v in self._saved_conf.items():
conf[k] = v
conf.dumper = self._saved_dumper
smod.inject.getValue = self._search_getValue
emod.inject.getValue = self._entries_getValue
smod.readInput = self._search_readInput
emod.readInput = self._entries_readInput
kb.data.has_information_schema = self._saved_has_is
kb.data.cachedColumns = self._saved_cachedColumns
kb.data.cachedTables = self._saved_cachedTables
kb.data.dumpedTable = self._saved_dumpedTable
kb.dumpKeyboardInterrupt = self._saved_dumpKbInt
kb.permissionFlag = self._saved_permissionFlag
class TestEntries(_SearchEnumBase):
def _entries_with_cols(self, db="testdb", tbl="users", cols=("id", "name")):
e = _TestEntriesSE()
e.getColumnsResult = {db: {tbl: {c: "varchar" for c in cols}}}
return e
# --- dumpTable: inband (conf.direct) ------------------------------------
def test_dump_table_inband_rows(self):
e = self._entries_with_cols(cols=("id", "name"))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
# MySQL inband dump returns a list of [colVal, colVal] rows.
emod.inject.getValue = lambda *a, **k: [["1", "alice"], ["2", "bob"]]
e.dumpTable()
dumped = conf.dumper.tableValues[-1]
self.assertEqual(dumped["__infos__"]["count"], 2)
self.assertEqual(dumped["__infos__"]["table"], "users")
self.assertEqual(dumped["__infos__"]["db"], "testdb")
self.assertEqual(list(dumped["id"]["values"]), ["1", "2"])
self.assertEqual(list(dumped["name"]["values"]), ["alice", "bob"])
def test_dump_table_uses_foundData(self):
e = _TestEntriesSE()
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
emod.inject.getValue = lambda *a, **k: [["x"]]
foundData = {"testdb": {"users": {"id": "int"}}}
e.dumpTable(foundData=foundData)
# foundData short-circuits column discovery: getColumns must not run.
self.assertEqual(e.getColumnsCalls, [])
self.assertIn("id", conf.dumper.tableValues[-1])
def test_dump_table_no_columns_skips(self):
e = _TestEntriesSE()
e.getColumnsResult = {} # discovery yields nothing
conf.db = "testdb"
conf.tbl = "ghost"
conf.col = None
emod.inject.getValue = lambda *a, **k: self.fail("should not fetch entries")
e.dumpTable()
# No columns => no values dumped.
self.assertEqual(conf.dumper.tableValues, [])
def test_dump_table_empty_entries(self):
e = self._entries_with_cols(cols=("id",))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
emod.inject.getValue = lambda *a, **k: None # no rows
e.dumpTable()
# Nothing retrieved => dumpedTable empty => dbTableValues not called.
self.assertEqual(conf.dumper.tableValues, [])
def test_dump_table_current_db(self):
e = self._entries_with_cols(db="testdb", tbl="users", cols=("id",))
conf.db = None # triggers getCurrentDb() -> "testdb"
conf.tbl = "users"
conf.col = None
emod.inject.getValue = lambda *a, **k: [["7"]]
e.dumpTable()
self.assertEqual(conf.db, "testdb")
self.assertEqual(list(conf.dumper.tableValues[-1]["id"]["values"]), ["7"])
def test_dump_table_multiple_db_error(self):
e = _TestEntriesSE()
conf.db = "a,b"
conf.tbl = "users"
conf.col = None
from lib.core.exception import SqlmapMissingMandatoryOptionException
self.assertRaises(SqlmapMissingMandatoryOptionException, e.dumpTable)
def test_dump_table_get_tables_when_no_tbl(self):
e = _TestEntriesSE()
e.getTablesResult = {"testdb": ["users"]}
e.getColumnsResult = {"testdb": {"users": {"id": "int"}}}
conf.db = "testdb"
conf.tbl = None
conf.col = None
emod.inject.getValue = lambda *a, **k: [["42"]]
e.dumpTable()
# Tables were discovered via getTables, then the row dumped.
self.assertEqual(list(conf.dumper.tableValues[-1]["id"]["values"]), ["42"])
# --- dumpAll: single-db delegation --------------------------------------
def test_dump_all_single_db_delegates(self):
e = self._entries_with_cols(db="testdb", tbl="users", cols=("id",))
# dumpAll with db set & tbl None must delegate straight to dumpTable.
conf.db = "testdb"
conf.tbl = None
conf.col = None
e.getTablesResult = {"testdb": ["users"]}
emod.inject.getValue = lambda *a, **k: [["9"]]
e.dumpAll()
self.assertTrue(conf.dumper.tableValues)
# --------------------------------------------------------------------------- #
# Helpers/base from tests/test_generic_more.py (inband dump branches)
# --------------------------------------------------------------------------- #
class _RecordingDumperGM(object):
"""Recording stand-in for conf.dumper (no printing / file writing)."""
def __init__(self):
self.tableValues = []
self.sqlQueries = []
def dbTableValues(self, tableValues):
self.tableValues.append(tableValues)
def sqlQuery(self, query, queryRes):
self.sqlQueries.append((query, queryRes))
class _TestEntriesGM(Entries):
"""Entries with cross-mixin collaborators stubbed.
forceDbmsEnum / getCurrentDb / getColumns / getTables are normally supplied by
sibling mixins; we emulate column/table discovery by populating kb.data.cached*
from canned attributes, exactly as the production plugins do.
"""
def __init__(self):
Entries.__init__(self)
self.getColumnsResult = {} # assigned to kb.data.cachedColumns
self.getTablesResult = {} # assigned to kb.data.cachedTables
self.getColumnsCalls = []
self.getTablesCalls = 0
def forceDbmsEnum(self):
pass
def getCurrentDb(self):
return "testdb"
def getColumns(self, onlyColNames=False, colTuple=None, bruteForce=None, dumpMode=False):
self.getColumnsCalls.append((conf.db, conf.tbl))
kb.data.cachedColumns = dict(self.getColumnsResult)
def getTables(self, bruteForce=None):
self.getTablesCalls += 1
kb.data.cachedTables = dict(self.getTablesResult)
class _GenericBase(unittest.TestCase):
"""Snapshot/restore for everything the generic mixins touch."""
_CONF_KEYS = (
"db", "tbl", "col", "direct", "batch", "exclude", "search",
"disableHashing", "noKeyset", "keyset", "forcePivoting", "dumpWhere",
"tmpPath", "sqlQuery", "sqlFile", "regKey", "regVal", "regData",
"regType", "osPwn", "osShell", "cleanup", "privEsc",
)
def setUp(self):
self._saved_conf = {k: conf.get(k) for k in self._CONF_KEYS}
self._saved_dumper = conf.get("dumper")
self._saved_getValue = {
emod: emod.inject.getValue,
cmod: cmod.inject.getValue,
mmod: mmod.inject.getValue,
}
self._saved_goStacked = {
cmod: cmod.inject.goStacked,
mmod: mmod.inject.goStacked,
}
self._saved_emod_readInput = emod.readInput
self._saved_mmod_readInput = mmod.readInput
self._saved_kb = {
"cachedColumns": kb.data.get("cachedColumns"),
"cachedTables": kb.data.get("cachedTables"),
"dumpedTable": kb.data.get("dumpedTable"),
"has_information_schema": kb.data.get("has_information_schema"),
"dumpKeyboardInterrupt": kb.get("dumpKeyboardInterrupt"),
"permissionFlag": kb.get("permissionFlag"),
"hintValue": kb.get("hintValue"),
"injection_data": kb.injection.data,
"bannerFp": kb.get("bannerFp"),
"os": kb.get("os"),
}
self._saved_forceDbms = kb.get("forcedDbms")
conf.direct = True
conf.batch = True
conf.exclude = None
conf.search = False
conf.disableHashing = True
conf.noKeyset = True
conf.keyset = False
conf.forcePivoting = False
conf.dumpWhere = None
conf.dumper = _RecordingDumperGM()
kb.data.cachedColumns = {}
kb.data.cachedTables = {}
kb.data.dumpedTable = {}
kb.data.has_information_schema = True
kb.dumpKeyboardInterrupt = False
kb.permissionFlag = False
def _readInput(message, default=None, checkBatch=True, boolean=False):
if boolean:
return default in (None, 'Y', 'y', True)
return default
emod.readInput = _readInput
mmod.readInput = _readInput
def tearDown(self):
for k, v in self._saved_conf.items():
conf[k] = v
conf.dumper = self._saved_dumper
for mod, fn in self._saved_getValue.items():
mod.inject.getValue = fn
for mod, fn in self._saved_goStacked.items():
mod.inject.goStacked = fn
emod.readInput = self._saved_emod_readInput
mmod.readInput = self._saved_mmod_readInput
kb.data.cachedColumns = self._saved_kb["cachedColumns"]
kb.data.cachedTables = self._saved_kb["cachedTables"]
kb.data.dumpedTable = self._saved_kb["dumpedTable"]
kb.data.has_information_schema = self._saved_kb["has_information_schema"]
kb.dumpKeyboardInterrupt = self._saved_kb["dumpKeyboardInterrupt"]
kb.permissionFlag = self._saved_kb["permissionFlag"]
kb.hintValue = self._saved_kb["hintValue"]
kb.injection.data = self._saved_kb["injection_data"]
kb.bannerFp = self._saved_kb["bannerFp"]
kb.os = self._saved_kb["os"]
kb.forcedDbms = self._saved_forceDbms
@staticmethod
def _force_os(os_name):
# Backend.setOs only assigns when kb.os is currently None; reset first so
# tests can deterministically pin the back-end OS.
kb.os = None
Backend.setOs(os_name)
class TestEntriesDumpTable(_GenericBase):
def _entries(self, db="testdb", tbl="users", cols=("id", "name")):
e = _TestEntriesGM()
e.getColumnsResult = {db: {tbl: {c: "varchar" for c in cols}}}
return e
def test_exclude_filters_columns(self):
set_dbms("MySQL")
e = self._entries(cols=("id", "secret"))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
conf.exclude = "secret"
emod.inject.getValue = lambda *a, **k: [["1"]]
e.dumpTable()
dumped = conf.dumper.tableValues[-1]
self.assertIn("id", dumped)
self.assertNotIn("secret", dumped)
def test_exclude_all_columns_skips(self):
set_dbms("MySQL")
e = self._entries(cols=("secret",))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
conf.exclude = "secret"
emod.inject.getValue = lambda *a, **k: self.fail("should not fetch entries")
e.dumpTable()
# all columns excluded => "no usable column names" => nothing dumped
self.assertEqual(conf.dumper.tableValues, [])
def test_dumpwhere_rewrites_query(self):
set_dbms("MySQL")
e = self._entries(cols=("id",))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
conf.dumpWhere = "id>5"
captured = {}
def gv(query, *a, **k):
captured["query"] = query
return [["9"]]
emod.inject.getValue = gv
e.dumpTable()
# agent.whereQuery folds conf.dumpWhere into the dump query
self.assertIn("id>5", captured["query"])
self.assertEqual(list(conf.dumper.tableValues[-1]["id"]["values"]), ["9"])
def test_disablehashing_false_path(self):
# conf.disableHashing False => attackDumpedTable() is invoked; with no
# hashes present it must complete without raising and still emit values.
set_dbms("MySQL")
e = self._entries(cols=("id", "name"))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
conf.disableHashing = False
emod.inject.getValue = lambda *a, **k: [["1", "alice"]]
# Spy on attackDumpedTable: with disableHashing False it MUST be invoked
# after the values are dumped. A recorder replaces it so we can assert the
# call happened (and no real dictionary attack runs).
saved_attack = emod.attackDumpedTable
calls = {"n": 0}
emod.attackDumpedTable = lambda *a, **k: calls.__setitem__("n", calls["n"] + 1)
try:
e.dumpTable()
finally:
emod.attackDumpedTable = saved_attack
self.assertEqual(calls["n"], 1)
self.assertEqual(conf.dumper.tableValues[-1]["__infos__"]["count"], 1)
def test_missing_columns_skips_table(self):
# getColumns yields nothing for the targeted table => skip without fetching.
set_dbms("MySQL")
e = _TestEntriesGM()
e.getColumnsResult = {"testdb": {"other": {"id": "int"}}}
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
emod.inject.getValue = lambda *a, **k: self.fail("should not fetch entries")
e.dumpTable()
self.assertEqual(conf.dumper.tableValues, [])
def test_multiple_tables_one_dumped(self):
set_dbms("MySQL")
e = _TestEntriesGM()
e.getColumnsResult = {"testdb": {"users": {"id": "int"}, "posts": {"pid": "int"}}}
conf.db = "testdb"
conf.tbl = "users,posts"
conf.col = None
emod.inject.getValue = lambda *a, **k: [["1"]]
e.dumpTable()
# both tables share the same cachedColumns dict => both dumped
tables = [tv["__infos__"]["table"] for tv in conf.dumper.tableValues]
self.assertIn("users", tables)
self.assertIn("posts", tables)
def test_metadb_suffix_db(self):
# A db whose name carries the METADB_SUFFIX must not get a "db" prefix in
# kb.dumpTable, and dumping still succeeds.
from lib.core.settings import METADB_SUFFIX
set_dbms("MySQL")
metadb = "x%s" % METADB_SUFFIX
e = self._entries(db=metadb, tbl="t", cols=("c",))
conf.db = metadb
conf.tbl = "t"
conf.col = None
emod.inject.getValue = lambda *a, **k: [["v"]]
e.dumpTable()
self.assertEqual(list(conf.dumper.tableValues[-1]["c"]["values"]), ["v"])
class TestEntriesDumpAll(_GenericBase):
def test_dumpall_multiple_dbs_tables(self):
set_dbms("MySQL")
e = _TestEntriesGM()
conf.db = None
conf.tbl = None
conf.col = None
e.getTablesResult = {"db1": ["t1"], "db2": ["t2"]}
# dumpTable re-discovers columns per (db, tbl); supply both.
e.getColumnsResult = {
"db1": {"t1": {"a": "int"}},
"db2": {"t2": {"b": "int"}},
}
emod.inject.getValue = lambda *a, **k: [["x"]]
e.dumpAll()
# Every table contributed a values batch.
self.assertEqual(len(conf.dumper.tableValues), 2)
def test_dumpall_list_cached_tables(self):
# cachedTables as a bare list => wrapped under {None: [...]}.
set_dbms("MySQL")
e = _TestEntriesGM()
conf.db = None
conf.tbl = None
conf.col = None
# getTables sets cachedTables; emulate the list shape directly.
class _ListTables(_TestEntriesGM):
def getTables(self_inner, bruteForce=None):
kb.data.cachedTables = ["users"]
e = _ListTables()
# dumpAll wraps a bare list as {None: [...]}; dumpTable then resolves the
# None db via getCurrentDb() -> "testdb", so columns live under "testdb".
e.getColumnsResult = {"testdb": {"users": {"id": "int"}}}
emod.inject.getValue = lambda *a, **k: [["1"]]
e.dumpAll()
self.assertTrue(conf.dumper.tableValues)
# The bare-list None db must be resolved via getCurrentDb() -> "testdb"
# before the dump; assert the dumped __infos__ carries the real db (not
# None) for the requested "users" table.
infos = conf.dumper.tableValues[-1]["__infos__"]
self.assertEqual(infos["db"], "testdb")
self.assertEqual(infos["table"], "users")
def test_dumpall_exclude_skips_table(self):
set_dbms("MySQL")
e = _TestEntriesGM()
conf.db = None
conf.tbl = None
conf.col = None
conf.exclude = "secret"
e.getTablesResult = {"db1": ["secret", "users"]}
e.getColumnsResult = {"db1": {"users": {"id": "int"}, "secret": {"id": "int"}}}
emod.inject.getValue = lambda *a, **k: [["1"]]
e.dumpAll()
tables = [tv["__infos__"]["table"] for tv in conf.dumper.tableValues]
self.assertIn("users", tables)
self.assertNotIn("secret", tables)
class TestEntriesDumpFound(_GenericBase):
def _entries(self):
e = _TestEntriesGM()
e.getColumnsResult = {"testdb": {"users": {"id": "int"}}}
return e
def test_dump_found_tables_yes_all(self):
set_dbms("MySQL")
e = self._entries()
emod.inject.getValue = lambda *a, **k: [["1"]]
# batch readInput -> 'Y' (boolean True) and 'a'/'a' for db/table choices.
e.dumpFoundTables({"testdb": ["users"]})
self.assertTrue(conf.dumper.tableValues)
# The interactive selection must dump the REQUESTED db/table, not just
# "something": assert the dumped __infos__ maps to testdb.users.
infos = conf.dumper.tableValues[-1]["__infos__"]
self.assertEqual(infos["db"], "testdb")
self.assertEqual(infos["table"], "users")
def test_dump_found_tables_declined(self):
set_dbms("MySQL")
e = self._entries()
def _no(message, default=None, checkBatch=True, boolean=False):
if boolean:
return False
return default
emod.readInput = _no
emod.inject.getValue = lambda *a, **k: self.fail("must not dump when declined")
e.dumpFoundTables({"testdb": ["users"]})
self.assertEqual(conf.dumper.tableValues, [])
def test_dump_found_column_yes_all(self):
set_dbms("MySQL")
e = self._entries()
emod.inject.getValue = lambda *a, **k: [["1"]]
dbs = {"testdb": {"users": {"id": "int"}}}
e.dumpFoundColumn(dbs, foundCols=None, colConsider='1')
self.assertTrue(conf.dumper.tableValues)
# The selection must dump the REQUESTED db/table mapping, not just
# "something": assert the dumped __infos__ maps to testdb.users.
infos = conf.dumper.tableValues[-1]["__infos__"]
self.assertEqual(infos["db"], "testdb")
self.assertEqual(infos["table"], "users")
# --------------------------------------------------------------------------- #
# Helpers/base from tests/test_generic_enum_more.py (inference branches)
# --------------------------------------------------------------------------- #
class _RecordingDumperInf(object):
def __init__(self):
self.tableValues = []
def dbTableValues(self, tableValues):
self.tableValues.append(tableValues)
class _TestEntriesInf(Entries):
def __init__(self):
Entries.__init__(self)
self.getColumnsResult = {}
self.getTablesResult = {}
def forceDbmsEnum(self):
pass
def getCurrentDb(self):
return "testdb"
def getColumns(self, onlyColNames=False, colTuple=None, bruteForce=None, dumpMode=False):
kb.data.cachedColumns = dict(self.getColumnsResult)
def getTables(self, bruteForce=None):
kb.data.cachedTables = dict(self.getTablesResult)
class _EntriesBase(unittest.TestCase):
_CONF_KEYS = ("db", "tbl", "col", "direct", "technique", "exclude", "search",
"disableHashing", "noKeyset", "keyset", "forcePivoting", "dumpWhere")
def setUp(self):
self._saved_conf = {k: conf.get(k) for k in self._CONF_KEYS}
self._saved_dumper = conf.get("dumper")
self._gv = emod.inject.getValue
self._cbe = emod.inject.checkBooleanExpression
self._readInput = emod.readInput
self._saved_has_is = kb.data.get("has_information_schema")
self._saved_cachedColumns = kb.data.get("cachedColumns")
self._saved_cachedTables = kb.data.get("cachedTables")
self._saved_dumpedTable = kb.data.get("dumpedTable")
self._saved_dumpKbInt = kb.get("dumpKeyboardInterrupt")
self._saved_permissionFlag = kb.get("permissionFlag")
self._saved_injection_data = kb.injection.data
set_dbms("MySQL")
conf.direct = False
conf.technique = None
conf.exclude = None
conf.search = False
conf.disableHashing = True
conf.noKeyset = True
conf.keyset = False
conf.forcePivoting = False
conf.dumpWhere = None
conf.dumper = _RecordingDumperInf()
kb.data.has_information_schema = True
kb.data.cachedColumns = {}
kb.data.cachedTables = {}
kb.data.dumpedTable = {}
kb.dumpKeyboardInterrupt = False
kb.permissionFlag = False
kb.injection.data = {PAYLOAD.TECHNIQUE.BOOLEAN: {"title": "AND boolean-based blind"}}
emod.readInput = lambda *a, **k: (k.get("default") if k.get("default") is not None else (a[1] if len(a) > 1 else None))
def tearDown(self):
for k, v in self._saved_conf.items():
conf[k] = v
conf.dumper = self._saved_dumper
emod.inject.getValue = self._gv
emod.inject.checkBooleanExpression = self._cbe
emod.readInput = self._readInput
kb.data.has_information_schema = self._saved_has_is
kb.data.cachedColumns = self._saved_cachedColumns
kb.data.cachedTables = self._saved_cachedTables
kb.data.dumpedTable = self._saved_dumpedTable
kb.dumpKeyboardInterrupt = self._saved_dumpKbInt
kb.permissionFlag = self._saved_permissionFlag
kb.injection.data = self._saved_injection_data
class TestEntriesInference(_EntriesBase):
def _entries(self, db="testdb", tbl="users", cols=("id", "name")):
e = _TestEntriesInf()
e.getColumnsResult = {db: {tbl: {c: "varchar" for c in cols}}}
return e
def test_dump_table_inference_column_pivot(self):
# Blind dump (conf.direct=False, BOOLEAN available): a row count, then one
# value per (index, column). Assert the per-column pivoted values match.
set_dbms("MySQL")
e = self._entries(cols=("id", "name"))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
# data[index][column] -> value. 2 rows, columns id/name.
data = {0: {"id": "1", "name": "alice"}, 1: {"id": "2", "name": "bob"}}
def gv(query, *a, **k):
if k.get("expected") == EXPECTED.INT:
return "2" # row count
# MySQL blind cell query: 'SELECT <col> FROM testdb.users ORDER BY ...
# LIMIT <index>,1'. The row index is the LIMIT offset; the column is the
# SELECT projection.
import re as _re
idx = int(_re.search(r"LIMIT\s+(\d+)\s*,\s*1", query).group(1))
proj = query.split(" FROM ", 1)[0]
col = "name" if "name" in proj else "id"
return data[idx][col]
emod.inject.getValue = gv
e.dumpTable()
dumped = conf.dumper.tableValues[-1]
self.assertEqual(dumped["__infos__"]["count"], 2)
self.assertEqual(list(dumped["id"]["values"]), ["1", "2"])
self.assertEqual(list(dumped["name"]["values"]), ["alice", "bob"])
def test_dump_table_inference_empty_table(self):
# A zero row count in the inference path yields empty per-column value
# lists and no dbTableValues emission (dumpedTable stays effectively empty).
set_dbms("MySQL")
e = self._entries(cols=("id",))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
emod.inject.getValue = lambda query, *a, **k: ("0" if k.get("expected") == EXPECTED.INT else self.fail("must not fetch cells for empty table"))
e.dumpTable()
# count 0 => empty entries => nothing dumped
self.assertEqual(conf.dumper.tableValues, [])
def test_dump_table_inference_count_failure_skips(self):
# A non-numeric count in the inference path => the table is skipped with a
# warning, no values dumped.
set_dbms("MySQL")
e = self._entries(cols=("id",))
conf.db = "testdb"
conf.tbl = "users"
conf.col = None
def gv(query, *a, **k):
if k.get("expected") == EXPECTED.INT:
return None # count failed
self.fail("must not fetch cells when count failed")
emod.inject.getValue = gv
e.dumpTable()
self.assertEqual(conf.dumper.tableValues, [])
if __name__ == "__main__":
unittest.main()