Adding --report-json option

This commit is contained in:
Miroslav Štampar 2026-06-15 15:35:33 +02:00
parent 948d01d57a
commit 17e94c3409
13 changed files with 581 additions and 69 deletions

View file

@ -162,7 +162,7 @@ df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/
9e5e4d3d9acb767412259895a3ee75e1a5f42d0b9923f17605d771db384a6f60 extra/vulnserver/vulnserver.py
b8411d1035bb49b073476404e61e1be7f4c61e205057730e2f7880beadcd5f60 lib/controller/action.py
6da812281a69c8b7a5181c2f76374dc695e4727b2936042651bacbeda4e6bcc9 lib/controller/checks.py
c1881685bef8504ded32c51abed00ab51849008c84b74e8a66117e5f5041b3df lib/controller/controller.py
85146a0565467952a35cdd234031d8de01ef8f354c8676f6484b0bfb911c5347 lib/controller/controller.py
d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py
b36b085ff1b5797e375c1e2ca3b12c7ab4204f48acd1a1efb075cff8302d9750 lib/core/agent.py
@ -175,12 +175,12 @@ c03dc585f89642cfd81b087ac2723e3e1bb3bfa8c60e6f5fe58ef3b0113ebfe6 lib/core/data.
70fb2528e580b22564899595b0dff6b1bc257c6a99d2022ce3996a3d04e68e4e lib/core/decorators.py
147823c37596bd6a56d677697781f34b8d1d1671d5a2518fbc9468d623c6d07d lib/core/defaults.py
2f44a1bfe6f18aafe64147b99e69aa93cf438c0e7befe59f4e2aee9065c8b7b6 lib/core/dicts.py
8aee07fba24082ee6355a29d01842bc3657194148a7f9062079b5f0a85ec53e3 lib/core/dump.py
e4b23512625bc377c0e0924d8113c595452320d8c66014828da5d8258a77f55a lib/core/dump.py
23e33f0b457e2a7114c9171ba9b42e1751b71ee3f384bba7fad39e4490adb803 lib/core/enums.py
5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py
914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py
67ea32c993cbf23cdbd5170360c020ca33363b7c516ff3f8da4124ef7cb0254d lib/core/optiondict.py
885042ed021e60f1739e2a849e3405cc3a4c2a67a5a169a30399d1c53446460f lib/core/optiondict.py
3ff871fe8391952c3ec3bb528ba592a13926c80ca0b68fd322a317f69a651ef7 lib/core/option.py
ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch.py
49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py
@ -188,7 +188,7 @@ ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch
48797d6c34dd9bb8a53f7f3794c85f4288d82a9a1d6be7fcf317d388cb20d4b3 lib/core/replication.py
0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py
888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py
8eb10b15440aaa6ddc592e1b29199e9fa575df6b46335fcf7b7374c5f8f68480 lib/core/settings.py
1e2a5277293de9d3d1e65b401013baf1c4033162e580f6891ca6a2686e666894 lib/core/settings.py
cd5a66deee8963ba8e7e9af3dd36eb5e8127d4d68698811c29e789655f507f82 lib/core/shell.py
bcb5d8090d5e3e0ef2a586ba09ba80eef0c6d51feb0f611ed25299fbb254f725 lib/core/subprocessng.py
70ea3768f1b3062b22d20644df41c86238157ec80dd43da40545c620714273c6 lib/core/target.py
@ -199,7 +199,7 @@ b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unesc
2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py
54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py
4c56ad26ffb893d37813167de172b6c95c120588bfdc899f102977a2997b9bb9 lib/parse/cmdline.py
7bc8612fbd7ba390ab19f908c370c126ae66afa200bc7975800599ecbe029f0c lib/parse/cmdline.py
02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py
c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py
5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py
@ -230,18 +230,18 @@ f522436fbd14bdab090a1d305fcac0361800cb8e36c8cbcb47933298376a71e0 lib/takeover/r
0787f78e6bd9bb21d4267c95c4c99806711bb57c5518485c2e25f10fcf9c41fc lib/takeover/udf.py
23d73af417604dab460b74cdc230896153f018a6c00d144019491053640a172f lib/takeover/web.py
8cc1e226d4150fe8aa1a056e5d32d858ed6444d3d4e2af7fb4bc08f0bbe9d527 lib/takeover/xp_cmdshell.py
7b62bbb4d94f1271380a44142b407dc9eeed1d8b0319cdad57493dc1a12caff8 lib/techniques/blind/inference.py
09c3759b59bc111712f75b0b1762d195c0da0e0741dd76379546c429e8ed4457 lib/techniques/blind/inference.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/blind/__init__.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/dns/__init__.py
3df9839fb92a81d46b6194d7adacb43f391efb78b071783c132e8d596ecbfaf1 lib/techniques/dns/test.py
2934514a60cbcd48675053a73f785b4c7bfe606b51c34ae81a86818362ec4672 lib/techniques/dns/use.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/error/__init__.py
f552b6140d4069be6a44792a08f295da8adabc1c4bb6a5e100f222f87144ca9d lib/techniques/error/use.py
ee63b978154b0cb9a385fe51926ef6dc6f425b07f62b0d17208e82b4ac020f5c lib/techniques/error/use.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/__init__.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/union/__init__.py
30cae858e2a5a75b40854399f65ad074e6bb808d56d5ee66b94d4002dc6e101b lib/techniques/union/test.py
a8a795f29ec6fd66482926f04b054ed492a033982c3b7837c5d2ea32368acec0 lib/techniques/union/use.py
8720a744d46471fe46f5a67e16b2d4147339c6685fbf0fdf50f1a40e9a75c23a lib/utils/api.py
5b49f5bca4e35362fa7d83896e0769fdb01ad152f30059aafd8ce0f093400a3f lib/techniques/union/use.py
aeefb42ea0c68f72744bc1bfd7194ec1bc06480d8a7e23f4b8d3d23fbba2b014 lib/utils/api.py
442555ab85277aff7c9e0cf465ea5b0d28395c326f68363449b2d3941f4b6de2 lib/utils/brute.py
da5bcbcda3f667582adf5db8c1b5d511b469ac61b55d387cec66de35720ed718 lib/utils/crawler.py
a94958be0ec3e9d28d8171813a6a90655a9ad7e6aa33c661e8d8ebbfcf208dbb lib/utils/deps.py
@ -490,9 +490,9 @@ cedf45d33461bd7e5400d06611a63c8a4ffae1a4510030c5696b9d46ed6a9883 plugins/generi
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 plugins/__init__.py
5d72f0af46ff3c9e3fe80300e83cb78749132278e8db88915764a94d7130a04c README.md
46517f1444c202710e388873960130850ed092e17bd6f4dd5f2fedea3dbb8ffc sqlmapapi.py
e0607378f46f7664349552c628f25c4689569c788fd2364eef3075dd2cce127b sqlmapapi.yaml
f09d1b06901e7e02d0dbf4de607f6a4a9889acc322ae9353b98ea9101fb9548a sqlmapapi.yaml
627d90f1194335b800cbc9cc78db6697cf9e02e193a83598e0d4d0abb55b63b8 sqlmap.conf
65159b82795604069a2d14ccbd1f66e888a26b05db0401a1ddadb40c665c93dc sqlmap.py
d5128ba488b85080a18df85cc08b58f0baeac59494eb5ef43b9e34d66538f091 sqlmap.py
eb37a88357522fd7ad00d90cdc5da6b57442b4fec49366aadb2944c4fbf8b804 tamper/0eunion.py
a9785a4c111d6fee2e6d26466ba5efb3b229c00520b26e8024b041553b53efba tamper/apostrophemask.py
cf26bc8006519bd25ce06d347f72770cd75b61575cf65e5812274e8ab9392eb4 tamper/apostrophenullencode.py
@ -588,6 +588,7 @@ cde0bea1263ae857561f91ed2bd515e972b716743f017d31b1718a8546c72759 tests/test_pag
4bac34af2abddce003756d6776e89b2fda220bb7603ef3761f4f37ee29f9c369 tests/test_payload_marking.py
6bfc8201724078bd9d6d559916ef73c9ff97e19b0f2948f37e588a49b027795f tests/test_payloads_structure.py
5c95e7863190e440234f231864fb1219c35207132762858cc95181c57086bafc tests/test_replication.py
48bbe8403fbc52d16998b1af4fe2180d3637add0b14cd16dd71690113e96664f tests/test_report.py
cec98d72992c0799229a780fa7f0d7f3fb01ec2d708187ce0e4a05c8612f291b tests/test_safe2bin.py
a1c6cda1e5b483f61e6a4f8ddd0b06a15ddaa3fd2119bfb9dbd9cc970d7a751d tests/test_settings_regex.py
d3d991331096e16e5019de3d652e9fff92c09bd9f97c50b1c2c3ceb0ed49b17e tests/test_sqlparse.py

View file

@ -181,6 +181,12 @@ def _showInjections():
conf.dumper.string("", {"url": conf.url, "query": conf.parameters.get(PLACE.GET), "data": conf.parameters.get(PLACE.POST)}, content_type=CONTENT_TYPE.TARGET)
conf.dumper.string("", kb.injections, content_type=CONTENT_TYPE.TECHNIQUES)
else:
# --report-json: capture the same TARGET/TECHNIQUES structures the API emits, without
# printing them (the human-readable injection points are rendered just below)
if conf.reportJson:
conf.dumper._reportData({"url": conf.url, "query": conf.parameters.get(PLACE.GET), "data": conf.parameters.get(PLACE.POST)}, CONTENT_TYPE.TARGET)
conf.dumper._reportData(kb.injections, CONTENT_TYPE.TECHNIQUES)
data = "".join(set(_formatInjection(_) for _ in kb.injections)).rstrip("\n")
conf.dumper.string(header, data)

View file

@ -14,6 +14,7 @@ import threading
from lib.core.common import Backend
from lib.core.common import checkFile
from lib.core.common import clearColors
from lib.core.common import dataToDumpFile
from lib.core.common import dataToStdout
from lib.core.common import filterNone
@ -30,6 +31,7 @@ from lib.core.common import unsafeSQLIdentificatorNaming
from lib.core.compat import xrange
from lib.core.convert import getBytes
from lib.core.convert import getConsoleLength
from lib.core.convert import stdoutEncode
from lib.core.convert import getText
from lib.core.convert import getUnicode
from lib.core.convert import htmlEscape
@ -96,6 +98,19 @@ class Dump(object):
kb.dataOutputFlag = True
def _reportData(self, data, content_type):
"""
--report-json: capture a structured result exactly as the REST API would store it (the raw
value + COMPLETE status), independent of console/file rendering. No-op unless a report
collector is active - which is only ever the case for a CLI --report-json run, never under
--api - so this never double-captures alongside StdDbOut. A None content_type is resolved
via the kb.partRun fallback (e.g. the fingerprint line), mirroring the API exactly.
"""
if conf.get("reportCollector") is not None:
from lib.utils.api import _storeData, REPORT_TASKID
_storeData(conf.reportCollector, REPORT_TASKID, stdoutEncode(clearColors(data)), CONTENT_STATUS.COMPLETE, content_type)
def flush(self):
if self._outputFP:
try:
@ -116,9 +131,12 @@ class Dump(object):
raise SqlmapGenericException(errMsg)
def singleString(self, data, content_type=None):
self._reportData(data, content_type)
self._write(data, content_type=content_type)
def string(self, header, data, content_type=None, sort=True):
self._reportData(data, content_type)
if conf.api:
self._write(data, content_type=content_type)
@ -153,6 +171,8 @@ class Dump(object):
except:
pass
self._reportData(elements, content_type)
if conf.api:
self._write(elements, content_type=content_type)
@ -204,6 +224,8 @@ class Dump(object):
users = [_ for _ in userSettings.keys() if _ is not None]
users.sort(key=lambda _: _.lower() if hasattr(_, "lower") else _)
self._reportData(userSettings, content_type)
if conf.api:
self._write(userSettings, content_type=content_type)
@ -237,6 +259,8 @@ class Dump(object):
def dbTables(self, dbTables):
if isinstance(dbTables, dict) and len(dbTables) > 0:
self._reportData(dbTables, CONTENT_TYPE.TABLES)
if conf.api:
self._write(dbTables, content_type=CONTENT_TYPE.TABLES)
@ -279,6 +303,8 @@ class Dump(object):
def dbTableColumns(self, tableColumns, content_type=None):
if isinstance(tableColumns, dict) and len(tableColumns) > 0:
self._reportData(tableColumns, content_type)
if conf.api:
self._write(tableColumns, content_type=content_type)
@ -352,6 +378,8 @@ class Dump(object):
def dbTablesCount(self, dbTables):
if isinstance(dbTables, dict) and len(dbTables) > 0:
self._reportData(dbTables, CONTENT_TYPE.COUNT)
if conf.api:
self._write(dbTables, content_type=CONTENT_TYPE.COUNT)
@ -413,6 +441,8 @@ class Dump(object):
safeDb = re.sub(r"[^\w]", UNSAFE_DUMP_FILEPATH_REPLACEMENT, unsafeSQLIdentificatorNaming(db))
safeTable = re.sub(r"[^\w]", UNSAFE_DUMP_FILEPATH_REPLACEMENT, unsafeSQLIdentificatorNaming(table))
self._reportData(tableValues, CONTENT_TYPE.DUMP_TABLE)
if conf.api:
self._write(tableValues, content_type=CONTENT_TYPE.DUMP_TABLE)
@ -679,6 +709,8 @@ class Dump(object):
logger.warning(msg)
def dbColumns(self, dbColumnsDict, colConsider, dbs):
self._reportData(dbColumnsDict, CONTENT_TYPE.COLUMNS)
if conf.api:
self._write(dbColumnsDict, content_type=CONTENT_TYPE.COLUMNS)

View file

@ -235,6 +235,7 @@ optDict = {
"postprocess": "string",
"preprocess": "string",
"repair": "boolean",
"reportJson": "string",
"saveConfig": "string",
"scope": "string",
"skipHeuristics": "boolean",

View file

@ -20,7 +20,7 @@ from lib.core.enums import OS
from thirdparty import six
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.10.6.107"
VERSION = "1.10.6.108"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)
@ -843,6 +843,15 @@ LIMITED_ROWS_TEST_NUMBER = 15
# Default adapter to use for bottle server
RESTAPI_DEFAULT_ADAPTER = "wsgiref"
# REST API / scan-data contract version (semantic versioning), INDEPENDENT of the sqlmap version.
# Bump MAJOR for breaking changes (removed/renamed field, changed type, restructured response),
# MINOR for additive backward-compatible changes (new field/endpoint), PATCH for non-contract fixes.
# Exposed at GET /version (as "api_version"), in the --report-json "meta", and as the OpenAPI
# info.version (keep sqlmapapi.yaml in sync). Maintained by hand when the contract changes.
# 2.0.0: first explicitly-versioned contract; a MAJOR break from the old implicit shape
# (TECHNIQUES is now a named list, DUMP_TABLE restructured, internal fields dropped, type_name added).
RESTAPI_VERSION = "2.0.0"
# Default REST API server listen address
RESTAPI_DEFAULT_ADDRESS = "127.0.0.1"
@ -850,7 +859,7 @@ RESTAPI_DEFAULT_ADDRESS = "127.0.0.1"
RESTAPI_DEFAULT_PORT = 8775
# Unsupported options by REST API server
RESTAPI_UNSUPPORTED_OPTIONS = ("sqlShell", "wizard", "evalCode", "alert")
RESTAPI_UNSUPPORTED_OPTIONS = ("sqlShell", "wizard", "evalCode", "alert", "reportJson")
# Use "Supplementary Private Use Area-A"
INVALID_UNICODE_PRIVATE_AREA = False

View file

@ -727,6 +727,9 @@ def cmdLineParser(argv=None):
general.add_argument("--repair", dest="repair", action="store_true",
help="Redump entries having unknown character marker (%s)" % INFERENCE_UNKNOWN_CHAR)
general.add_argument("--report-json", dest="reportJson",
help="Store run results to a JSON file")
general.add_argument("--save", dest="saveConfig",
help="Save options to a configuration INI file")

View file

@ -127,10 +127,11 @@ def bisection(payload, expression, length=None, charsetType=None, firstChar=None
expression = match.group(2).strip()
try:
# Set kb.partRun in case "common prediction" feature (a.k.a. "good samaritan") is used or the engine is called from the API
# Set kb.partRun in case "common prediction" feature (a.k.a. "good samaritan") is used, or the
# engine is called from the API, or a JSON report is being collected (so enumeration output is tagged)
if conf.predictOutput:
kb.partRun = getPartRun()
elif conf.api:
elif conf.api or conf.reportJson:
kb.partRun = getPartRun(alias=False)
else:
kb.partRun = None

View file

@ -314,8 +314,8 @@ def errorUse(expression, dump=False):
_, _, _, _, _, expressionFieldsList, expressionFields, _ = agent.getFields(expression)
# Set kb.partRun in case the engine is called from the API
kb.partRun = getPartRun(alias=False) if conf.api else None
# Set kb.partRun in case the engine is called from the API or a JSON report is being collected
kb.partRun = getPartRun(alias=False) if (conf.api or conf.reportJson) else None
# We have to check if the SQL query might return multiple entries
# and in such case forge the SQL limiting the query output one

View file

@ -258,8 +258,8 @@ def unionUse(expression, unpack=True, dump=False):
_, _, _, _, _, expressionFieldsList, expressionFields, _ = agent.getFields(origExpr)
# Set kb.partRun in case the engine is called from the API
kb.partRun = getPartRun(alias=False) if conf.api else None
# Set kb.partRun in case the engine is called from the API or a JSON report is being collected
kb.partRun = getPartRun(alias=False) if (conf.api or conf.reportJson) else None
if expressionFieldsList and len(expressionFieldsList) > 1 and "ORDER BY" in expression.upper():
# Removed ORDER BY clause because UNION does not play well with it

View file

@ -44,7 +44,9 @@ from lib.core.defaults import _defaults
from lib.core.dicts import PART_RUN_CONTENT_TYPES
from lib.core.enums import AUTOCOMPLETE_TYPE
from lib.core.enums import CONTENT_STATUS
from lib.core.enums import CONTENT_TYPE
from lib.core.enums import MKSTEMP_PREFIX
from lib.core.enums import PAYLOAD
from lib.core.exception import SqlmapConnectionException
from lib.core.log import LOGGER_HANDLER
from lib.core.optiondict import optDict
@ -53,6 +55,7 @@ from lib.core.settings import RESTAPI_DEFAULT_ADAPTER
from lib.core.settings import RESTAPI_DEFAULT_ADDRESS
from lib.core.settings import RESTAPI_DEFAULT_PORT
from lib.core.settings import RESTAPI_UNSUPPORTED_OPTIONS
from lib.core.settings import RESTAPI_VERSION
from lib.core.settings import VERSION_STRING
from lib.core.shell import autoCompletion
from lib.core.subprocessng import Popen
@ -80,6 +83,195 @@ class DataStore(object):
RESTAPI_READONLY_OPTIONS = ("api", "taskid", "database")
# Reverse map CONTENT_TYPE int -> name (e.g. 2 -> "DBMS_FINGERPRINT"), for machine-readable reports
CONTENT_TYPE_NAMES = dict((v, k) for k, v in vars(CONTENT_TYPE).items() if not k.startswith("_") and isinstance(v, int))
# Task id used for the single-target CLI collector backing --report-json
REPORT_TASKID = 0
def _storeData(cursor, taskid, value, status=CONTENT_STATUS.IN_PROGRESS, content_type=None):
"""
Records a single (status, content_type, value) result row into an IPC-style 'data' table.
Shared by the REST API (via StdDbOut) and the CLI --report-json collector so both capture
results through identical logic (partial outputs are appended; a COMPLETE output replaces
its partials). Mirrors the API's per-content_type merge semantics.
"""
if content_type is None:
if kb.partRun is not None:
content_type = PART_RUN_CONTENT_TYPES.get(kb.partRun)
else:
# Ignore all non-relevant (untyped) messages
return
output = cursor.execute("SELECT id, status, value FROM data WHERE taskid = ? AND content_type = ?", (taskid, content_type))
# Delete partial output from the database if we have got a complete output
if status == CONTENT_STATUS.COMPLETE:
if len(output) > 0:
for index in xrange(len(output)):
cursor.execute("DELETE FROM data WHERE id = ?", (output[index][0],))
cursor.execute("INSERT INTO data VALUES(NULL, ?, ?, ?, ?)", (taskid, status, content_type, jsonize(value)))
if kb.partRun:
kb.partRun = None
elif status == CONTENT_STATUS.IN_PROGRESS:
if len(output) == 0:
cursor.execute("INSERT INTO data VALUES(NULL, ?, ?, ?, ?)", (taskid, status, content_type, jsonize(value)))
else:
new_value = "%s%s" % (dejsonize(output[0][2]), value)
cursor.execute("UPDATE data SET value = ? WHERE id = ?", (jsonize(new_value), output[0][0]))
# Internal detection/plumbing fields that are meaningless to API/report consumers and are stripped
# from the assembled output (the underlying kb/session structures keep them; only the output is cleaned)
INJECTION_INTERNAL_FIELDS = ("conf", "prefix", "suffix", "ptype", "clause") # detection/construction internals, irrelevant to a result consumer
TECHNIQUE_INTERNAL_FIELDS = ("matchRatio", "trueCode", "falseCode", "templatePayload", "where") # per-technique internals
def _cleanIdentifier(name):
"""
Strips SQL identifier quoting (`backticks`, "double quotes", [brackets]) in a DBMS-INDEPENDENT
way. Used instead of unsafeSQLIdentificatorNaming (which needs Backend.getIdentifiedDbms) so the
result is identical in the CLI and in the API server process - which has no Backend context
because the scan ran in a subprocess. Context-free => API and report stay in parity.
"""
if isinstance(name, six.string_types):
for ch in ("`", "\"", "[", "]"):
name = name.replace(ch, "")
return name
def _cleanIdentifiersDeep(value):
"""
Recursively unquotes every identifier in a metadata structure (dict keys and string leaves -
db/table/column names). Used for the schema-listing content types (TABLES/COLUMNS/SCHEMA/COUNT)
whose payload is entirely identifiers + types/counts (never user row data), so cleaning every
string is safe. NOT used for DUMP_TABLE, whose leaf values are real row data.
"""
if isinstance(value, dict):
return dict((_cleanIdentifier(k), _cleanIdentifiersDeep(v)) for k, v in value.items())
elif isinstance(value, (list, tuple)):
return [_cleanIdentifiersDeep(_) for _ in value]
elif isinstance(value, six.string_types):
return _cleanIdentifier(value)
return value
# Schema-listing content types: pure identifiers + types/counts, so identifier quoting is cleaned
# recursively for consistency with DUMP_TABLE (which is handled separately because it carries row data)
IDENTIFIER_KEYED_TYPES = (CONTENT_TYPE.TABLES, CONTENT_TYPE.COLUMNS, CONTENT_TYPE.SCHEMA, CONTENT_TYPE.COUNT)
def _sanitizeScanData(content_type, value):
"""
Reshapes an assembled result value into the clean, consumer-facing form used by BOTH the API
response and the --report-json file: internal detection/plumbing fields are dropped, the
per-technique map becomes a named list, and dumped-table identifiers are unquoted. Operates on
the dejsonized copy, so the live kb/session structures are never modified. Falls back to the raw
value on any surprise.
"""
try:
if content_type == CONTENT_TYPE.TECHNIQUES and isinstance(value, (list, tuple)):
cleaned = []
for injection in value:
if not isinstance(injection, dict):
cleaned.append(injection)
continue
injection = dict(injection)
for field in INJECTION_INTERNAL_FIELDS:
injection.pop(field, None)
techniques = injection.get("data")
if isinstance(techniques, dict):
# turn the {"1": {...}, "2": {...}} map (keyed by opaque technique ids) into an
# ordered list, each entry naming its technique (e.g. "boolean-based blind")
reduced = []
for stype in sorted(techniques, key=lambda _: int(_) if str(_).isdigit() else _):
details = techniques[stype]
if isinstance(details, dict):
details = dict(details)
for field in TECHNIQUE_INTERNAL_FIELDS:
details.pop(field, None)
key = int(stype) if str(stype).isdigit() else stype
entry = {"technique": PAYLOAD.SQLINJECTION.get(key, key)}
entry.update(details)
details = entry
reduced.append(details)
injection["data"] = reduced
cleaned.append(injection)
return cleaned
elif content_type == CONTENT_TYPE.DUMP_TABLE and isinstance(value, dict):
infos = value.get("__infos__") or {}
result = {"db": _cleanIdentifier(infos.get("db")), "table": _cleanIdentifier(infos.get("table")), "count": infos.get("count"), "columns": {}}
for column, cell in value.items():
if column == "__infos__":
continue
# clean the identifier, drop the per-column display 'length', keep just the values list
values = cell.get("values") if isinstance(cell, dict) else cell
if isinstance(values, (list, tuple)):
# sqlmap represents a DB NULL as a single space (DUMP_REPLACEMENTS); surface it as
# JSON null. An empty string "" is a genuine empty value and is left as-is.
values = [None if _ == " " else _ for _ in values]
result["columns"][_cleanIdentifier(column)] = values
return result
elif content_type in IDENTIFIER_KEYED_TYPES and isinstance(value, (dict, list, tuple)):
return _cleanIdentifiersDeep(value)
except Exception as ex:
logger.debug("failed to sanitize scan data (content type %s): %s" % (content_type, getSafeExString(ex)))
return value
def _assembleData(cursor, taskid):
"""
Assembles all stored results for a task into the canonical scan-data structure
{"success": True, "data": [{status, type, type_name, value}, ...], "error": [...]}.
Shared by the REST API endpoint /scan/<id>/data and the CLI --report-json writer so the two
produce identical output (the CLI report is this dict plus a 'meta' wrapper).
"""
json_data_message = list()
json_errors_message = list()
for status, content_type, value in cursor.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_data_message.append({"status": status, "type": content_type, "type_name": CONTENT_TYPE_NAMES.get(content_type), "value": _sanitizeScanData(content_type, dejsonize(value))})
for error, in cursor.execute("SELECT error FROM errors WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_errors_message.append(error)
return {"success": True, "data": json_data_message, "error": json_errors_message}
def setupReportCollector():
"""
Creates an in-memory IPC-style database used to collect results for a CLI --report-json run.
Reuses the same Database/schema the REST API uses so capture+assembly logic is shared.
"""
collector = Database(":memory:")
collector.connect("report")
collector.init()
return collector
def writeReportJson(collector, filepath):
"""
Writes the collected results to filepath as JSON, in the same shape as the REST API's
/scan/<id>/data response, wrapped with a small 'meta' block for standalone consumers.
"""
result = _assembleData(collector, REPORT_TASKID)
result["meta"] = {
"api_version": int(RESTAPI_VERSION.split(".")[0]), # MAJOR only - the part that matters for client compatibility
"sqlmap_version": VERSION_STRING,
"url": conf.get("url"),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
}
with openFile(filepath, "w+") as f:
f.write(getText(jsonize(result)))
# API objects
class Database(object):
filepath = None
@ -236,31 +428,7 @@ class StdDbOut(object):
def write(self, value, status=CONTENT_STATUS.IN_PROGRESS, content_type=None):
if self.messagetype == "stdout":
if content_type is None:
if kb.partRun is not None:
content_type = PART_RUN_CONTENT_TYPES.get(kb.partRun)
else:
# Ignore all non-relevant messages
return
output = conf.databaseCursor.execute("SELECT id, status, value FROM data WHERE taskid = ? AND content_type = ?", (self.taskid, content_type))
# Delete partial output from IPC database if we have got a complete output
if status == CONTENT_STATUS.COMPLETE:
if len(output) > 0:
for index in xrange(len(output)):
conf.databaseCursor.execute("DELETE FROM data WHERE id = ?", (output[index][0],))
conf.databaseCursor.execute("INSERT INTO data VALUES(NULL, ?, ?, ?, ?)", (self.taskid, status, content_type, jsonize(value)))
if kb.partRun:
kb.partRun = None
elif status == CONTENT_STATUS.IN_PROGRESS:
if len(output) == 0:
conf.databaseCursor.execute("INSERT INTO data VALUES(NULL, ?, ?, ?, ?)", (self.taskid, status, content_type, jsonize(value)))
else:
new_value = "%s%s" % (dejsonize(output[0][2]), value)
conf.databaseCursor.execute("UPDATE data SET value = ? WHERE id = ?", (jsonize(new_value), output[0][0]))
_storeData(conf.databaseCursor, self.taskid, value, status, content_type)
else:
conf.databaseCursor.execute("INSERT INTO errors VALUES(NULL, ?, ?)", (self.taskid, str(value) if value else ""))
@ -429,9 +597,13 @@ def task_list(token=None):
"""
tasks = {}
for key in DataStore.tasks:
for key in list(DataStore.tasks):
if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr:
tasks[key] = dejsonize(scan_status(key))["status"]
# NOTE: tolerate a task being deleted concurrently (scan_status would then return an
# error envelope without a "status" key); skip it rather than raising KeyError
status = dejsonize(scan_status(key)).get("status")
if status is not None:
tasks[key] = status
logger.debug("(%s) Listed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr))
return jsonize({"success": True, "tasks": tasks, "tasks_num": len(tasks)})
@ -606,23 +778,15 @@ def scan_data(taskid):
Retrieve the data of a scan
"""
json_data_message = list()
json_errors_message = list()
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_data()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
# Read all data from the IPC database for the taskid
for status, content_type, value in DataStore.current_db.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_data_message.append({"status": status, "type": content_type, "value": dejsonize(value)})
# Read all error messages from the IPC database
for error, in DataStore.current_db.execute("SELECT error FROM errors WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_errors_message.append(error)
# Read all data and error messages from the IPC database (shared assembler - same output as --report-json)
result = _assembleData(DataStore.current_db, taskid)
logger.debug("(%s) Retrieved scan data and error messages" % taskid)
return jsonize({"success": True, "data": json_data_message, "error": json_errors_message})
return jsonize(result)
# Functions to handle scans' logs
@get("/scan/<taskid>/log/<start>/<end>")
@ -702,7 +866,7 @@ def version(token=None):
"""
logger.debug("Fetched version (%s)" % ("admin" if is_admin(token) else request.remote_addr))
return jsonize({"success": True, "version": VERSION_STRING.split('/')[-1]})
return jsonize({"success": True, "version": VERSION_STRING.split('/')[-1], "api_version": int(RESTAPI_VERSION.split(".")[0])})
def server(host=RESTAPI_DEFAULT_ADDRESS, port=RESTAPI_DEFAULT_PORT, adapter=RESTAPI_DEFAULT_ADAPTER, username=None, password=None, database=None):
"""

View file

@ -176,6 +176,10 @@ def main():
init()
if conf.get("reportJson"):
from lib.utils.api import setupReportCollector
conf.reportCollector = setupReportCollector()
if not conf.updateAll:
# Postponed imports (faster start)
if conf.smokeTest:
@ -568,6 +572,21 @@ def main():
warnMsg = "your sqlmap version is outdated"
logger.warning(warnMsg)
# emit the JSON report BEFORE the closing banner, so it does not appear awkwardly after
# "[*] ending @ ..."
if conf.get("reportCollector") is not None:
try:
from lib.utils.api import writeReportJson
writeReportJson(conf.reportCollector, conf.reportJson)
logger.info("JSON report written to '%s'" % conf.reportJson)
except Exception as ex:
logger.error("unable to write JSON report to '%s' ('%s')" % (conf.reportJson, getSafeExString(ex)))
finally:
try:
conf.reportCollector.disconnect()
except Exception as ex:
logger.debug("problem occurred while closing the report collector ('%s')" % getSafeExString(ex))
if conf.get("showTime"):
dataToStdout("\n[*] ending @ %s\n\n" % time.strftime("%X /%Y-%m-%d/"), forceOutput=True)

View file

@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: sqlmap REST API
version: "1.0.0"
version: "2.0.0"
description: |
OpenAPI/Swagger specification for sqlmapapi.py, the sqlmap REST API server.
@ -48,11 +48,13 @@ paths:
get:
tags: [Version]
operationId: getVersion
summary: Fetch server version
description: Returns the sqlmap version string reported by the API server.
summary: Fetch server and API version
description: >-
Returns the sqlmap version string and the API contract version (api_version), which follows
semantic versioning independently of the sqlmap version so clients can check compatibility.
responses:
"200":
description: Server version returned.
description: Server and API version returned.
content:
application/json:
schema:
@ -62,6 +64,7 @@ paths:
value:
success: true
version: "1.10.6.51#dev"
api_version: 2
"401":
$ref: "#/components/responses/Unauthorized"
@ -459,8 +462,43 @@ paths:
success: true
data:
- status: 1
type: 0
value: []
type: 2
type_name: DBMS_FINGERPRINT
value: "back-end DBMS: MySQL >= 5.1"
- status: 1
type: 4
type_name: CURRENT_USER
value: "root@%"
- status: 1
type: 12
type_name: DBS
value: ["information_schema", "mysql", "testdb"]
- status: 1
type: 1
type_name: TECHNIQUES
value:
- place: GET
parameter: id
dbms: MySQL
dbms_version: [">= 5.1"]
os: null
notes: []
data:
- technique: "boolean-based blind"
title: "AND boolean-based blind - WHERE or HAVING clause"
payload: "id=1 AND 7997=7997"
vector: "AND [INFERENCE]"
comment: ""
- status: 1
type: 17
type_name: DUMP_TABLE
value:
db: testdb
table: users
count: 2
columns:
id: ["1", "2"]
name: ["admin", null]
error: []
"401":
$ref: "#/components/responses/Unauthorized"
@ -670,7 +708,7 @@ components:
VersionResponse:
type: object
required: [success, version]
required: [success, version, api_version]
properties:
success:
type: boolean
@ -679,6 +717,13 @@ components:
type: string
description: sqlmap version string without the `sqlmap/` prefix.
example: "1.10.6.51#dev"
api_version:
type: integer
description: >-
MAJOR API-contract version (integer), independent of the sqlmap version. Only the major
is exposed at runtime because only a major bump breaks clients; the full semantic version
is this document's info.version. Clients compare e.g. api_version == 2.
example: 2
additionalProperties: false
TaskNewResponse:
@ -811,16 +856,23 @@ components:
ScanDataItem:
type: object
required: [status, type, value]
required: [status, type, type_name, value]
properties:
status:
type: integer
description: Numeric content status stored by sqlmap.
description: Numeric content status (0 = in progress, 1 = complete).
example: 1
type:
type: integer
description: Numeric content type stored by sqlmap.
example: 0
example: 2
type_name:
type: string
nullable: true
description: >-
Human-readable name of the content type (e.g. "DBMS_FINGERPRINT", "CURRENT_USER",
"DBS", "TECHNIQUES", "DUMP_TABLE"). null for any unmapped type.
example: DBMS_FINGERPRINT
value:
anyOf:
- type: string
@ -832,7 +884,13 @@ components:
items: {}
- type: object
additionalProperties: true
description: JSON-decoded scan output value. Shape depends on the content type.
description: >-
JSON-decoded scan output value; its shape depends on the content type. Internal
plumbing is stripped: TECHNIQUES is a list of injection points whose "data" is a list of
techniques each named via a "technique" field (matchRatio/trueCode/falseCode/
templatePayload/where/conf are not exposed); DUMP_TABLE is
{db, table, count, columns: {column: [values]}} (the internal __infos__ wrapper and
per-column length are not exposed).
additionalProperties: true
ScanDataResponse:

218
tests/test_report.py Normal file
View file

@ -0,0 +1,218 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
JSON scan report collector/assembler (lib/utils/api.py), shared by the REST API
endpoint /scan/<id>/data and the CLI --report-json writer.
The whole point of the feature is that both produce the SAME structure, so these
tests pin the shared contract: the per-content_type merge (partial -> complete),
the assembled {success, data:[{status,type,type_name,value}], error} shape, the
partRun fallback for untyped output, and the meta-wrapped file written to disk.
A regression here is a divergence between the API and the report - the exact bug
this design exists to prevent.
"""
import json
import os
import sys
import tempfile
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap
bootstrap()
import lib.utils.api as api
from lib.core.data import conf, kb
from lib.core.enums import CONTENT_TYPE, CONTENT_STATUS
class _CollectorCase(unittest.TestCase):
def setUp(self):
self.c = api.setupReportCollector()
self._saved_partRun = kb.get("partRun")
def tearDown(self):
kb.partRun = self._saved_partRun
try:
self.c.disconnect()
except Exception:
pass
def _store(self, value, content_type, status=CONTENT_STATUS.COMPLETE):
api._storeData(self.c, api.REPORT_TASKID, value, status, content_type)
class TestAssembledShape(_CollectorCase):
def test_structure_and_typename(self):
self._store("MySQL >= 5.0.12", CONTENT_TYPE.DBMS_FINGERPRINT)
result = api._assembleData(self.c, api.REPORT_TASKID)
self.assertEqual(result["success"], True)
self.assertEqual(result["error"], [])
self.assertEqual(len(result["data"]), 1)
entry = result["data"][0]
self.assertEqual(sorted(entry.keys()), ["status", "type", "type_name", "value"])
self.assertEqual(entry["type"], CONTENT_TYPE.DBMS_FINGERPRINT)
self.assertEqual(entry["type_name"], "DBMS_FINGERPRINT") # int -> readable name
self.assertEqual(entry["value"], "MySQL >= 5.0.12")
def test_structured_values_preserved(self):
# dict / list / bool must survive as native JSON types (not stringified) - this is what
# makes the report machine-consumable, exactly like the API
self._store({"url": "http://h/?id=1", "data": None}, CONTENT_TYPE.TARGET)
self._store(["a", "b", "c"], CONTENT_TYPE.DBS)
self._store(True, CONTENT_TYPE.IS_DBA)
by_type = {d["type"]: d["value"] for d in api._assembleData(self.c, api.REPORT_TASKID)["data"]}
self.assertEqual(by_type[CONTENT_TYPE.TARGET], {"url": "http://h/?id=1", "data": None})
self.assertEqual(by_type[CONTENT_TYPE.DBS], ["a", "b", "c"])
self.assertIs(by_type[CONTENT_TYPE.IS_DBA], True)
class TestMergeSemantics(_CollectorCase):
def test_complete_replaces_partials(self):
# the API appends IN_PROGRESS chunks then a COMPLETE replaces them; final value is COMPLETE
self._store("roo", CONTENT_TYPE.CURRENT_USER, CONTENT_STATUS.IN_PROGRESS)
self._store("t@localhost", CONTENT_TYPE.CURRENT_USER, CONTENT_STATUS.COMPLETE)
data = api._assembleData(self.c, api.REPORT_TASKID)["data"]
self.assertEqual(len(data), 1) # one row, not two
self.assertEqual(data[0]["value"], "t@localhost")
self.assertEqual(data[0]["status"], CONTENT_STATUS.COMPLETE)
def test_inprogress_chunks_accumulate(self):
self._store("foo", CONTENT_TYPE.BANNER, CONTENT_STATUS.IN_PROGRESS)
self._store("bar", CONTENT_TYPE.BANNER, CONTENT_STATUS.IN_PROGRESS)
data = api._assembleData(self.c, api.REPORT_TASKID)["data"]
self.assertEqual(data[0]["value"], "foobar") # appended
class TestPartRunFallback(_CollectorCase):
def test_untyped_output_tagged_via_partrun(self):
# untyped output during a part-run (e.g. the fingerprint line) is tagged by kb.partRun -
# this is how DBMS_FINGERPRINT is captured with no explicit content_type
kb.partRun = "getFingerprint"
self._store("back-end DBMS: MySQL >= 5.1", None) # content_type=None
data = api._assembleData(self.c, api.REPORT_TASKID)["data"]
self.assertEqual(len(data), 1)
self.assertEqual(data[0]["type"], CONTENT_TYPE.DBMS_FINGERPRINT)
self.assertEqual(data[0]["value"], "back-end DBMS: MySQL >= 5.1")
def test_untyped_output_without_partrun_is_ignored(self):
kb.partRun = None
self._store("just a log line", None)
self.assertEqual(api._assembleData(self.c, api.REPORT_TASKID)["data"], [])
class TestSanitize(unittest.TestCase):
"""The shared assembler strips internal plumbing (matchRatio/trueCode/falseCode/templatePayload/
where/conf) from TECHNIQUES and restructures DUMP_TABLE (drop __infos__ wrapper + per-column
'length'), so neither the API nor the report leaks consumer-irrelevant internals. Deterministic
(no run variance), unlike the live API-vs-report comparison."""
def test_techniques_internals_stripped_and_named(self):
injection = {
"place": "GET", "parameter": "id", "ptype": 1, "dbms": "MySQL",
"conf": {"string": "x", "regexp": None}, # internal -> must be dropped
"data": {"1": {"title": "boolean", "payload": "id=1 AND 1=1", "vector": "AND [INFERENCE]",
"comment": "", "where": 1, "matchRatio": 0.74, "trueCode": 200,
"falseCode": 200, "templatePayload": None},
"6": {"title": "union", "payload": "id=1 UNION ...", "vector": "...", "comment": ""}},
}
injection["ptype"] = 1
injection["clause"] = [1, 8, 9]
injection["prefix"] = ""
injection["suffix"] = ""
original = json.loads(json.dumps(injection)) # deep copy to prove no mutation
out = api._sanitizeScanData(CONTENT_TYPE.TECHNIQUES, [injection])[0]
# detection/construction internals dropped
for field in ("conf", "ptype", "clause", "prefix", "suffix"):
self.assertNotIn(field, out)
# data is now an ordered LIST (not a map keyed by opaque ids), each entry named
self.assertIsInstance(out["data"], list)
self.assertEqual([t["technique"] for t in out["data"]], ["boolean-based blind", "UNION query"])
first = out["data"][0]
self.assertEqual(sorted(first.keys()), ["comment", "payload", "technique", "title", "vector"])
self.assertEqual(first["payload"], "id=1 AND 1=1") # consumer-relevant fields preserved
self.assertEqual(out["dbms"], "MySQL")
# input not mutated (operates on a copy - must not corrupt live kb.injections)
self.assertEqual(injection, original)
def test_dump_table_restructured_and_unquoted(self):
value = {
"__infos__": {"db": "`master`", "table": "users", "count": 3},
"id": {"length": 2, "values": ["1", "2", "3"]},
"`name`": {"length": 9, "values": ["alice", " ", ""]}, # backtick id; " " is a DB NULL, "" is empty
}
out = api._sanitizeScanData(CONTENT_TYPE.DUMP_TABLE, value)
self.assertEqual(sorted(out.keys()), ["columns", "count", "db", "table"])
self.assertNotIn("__infos__", out)
self.assertEqual(out["db"], "master") # quoting stripped (context-free)
self.assertEqual(out["table"], "users")
self.assertEqual(out["count"], 3)
# columns flattened to value lists (no 'length'), identifiers unquoted
self.assertEqual(out["columns"]["id"], ["1", "2", "3"])
self.assertNotIn("`name`", out["columns"])
# DB NULL (" ") -> JSON null; genuine empty string ("") preserved
self.assertEqual(out["columns"]["name"], ["alice", None, ""])
def test_schema_listing_identifiers_cleaned(self):
# TABLES/COLUMNS/SCHEMA/COUNT must have their identifiers unquoted too (consistency with
# DUMP_TABLE) - a regression here is the exact "X cleaned but Y not" inconsistency to avoid
tables = api._sanitizeScanData(CONTENT_TYPE.TABLES, {"`master`": ["users", "`order`"]})
self.assertEqual(tables, {"master": ["users", "order"]})
columns = api._sanitizeScanData(CONTENT_TYPE.COLUMNS,
{"`master`": {"users": {"id": "int", "`name`": "varchar(500)"}}})
self.assertEqual(columns, {"master": {"users": {"id": "int", "name": "varchar(500)"}}})
schema = api._sanitizeScanData(CONTENT_TYPE.SCHEMA, {"sys": {"w": {"`events`": "varchar(128)"}}})
self.assertEqual(schema, {"sys": {"w": {"events": "varchar(128)"}}})
count = api._sanitizeScanData(CONTENT_TYPE.COUNT, {"`master`": {"5": ["users"]}})
self.assertEqual(count, {"master": {"5": ["users"]}})
def test_identifier_unquoting_is_context_free(self):
# all DBMS quote styles handled without Backend context (so CLI and API server agree)
self.assertEqual(api._cleanIdentifier("`tbl`"), "tbl") # MySQL
self.assertEqual(api._cleanIdentifier('"tbl"'), "tbl") # PostgreSQL/Oracle
self.assertEqual(api._cleanIdentifier("[tbl]"), "tbl") # MSSQL
self.assertEqual(api._cleanIdentifier("plain"), "plain")
def test_other_types_pass_through(self):
# non-TECHNIQUES/DUMP_TABLE values are returned unchanged
self.assertEqual(api._sanitizeScanData(CONTENT_TYPE.CURRENT_USER, "root@%"), "root@%")
self.assertEqual(api._sanitizeScanData(CONTENT_TYPE.DBS, ["a", "b"]), ["a", "b"])
self.assertIs(api._sanitizeScanData(CONTENT_TYPE.IS_DBA, True), True)
class TestErrors(_CollectorCase):
def test_errors_captured(self):
self.c.execute("INSERT INTO errors VALUES(NULL, ?, ?)", (api.REPORT_TASKID, "something failed"))
result = api._assembleData(self.c, api.REPORT_TASKID)
self.assertEqual(result["error"], ["something failed"])
class TestWriteReportJson(_CollectorCase):
def test_file_is_valid_json_with_meta(self):
self._store("admin", CONTENT_TYPE.CURRENT_USER)
saved_url = conf.get("url")
conf.url = "http://target/?id=1"
fd, path = tempfile.mkstemp(suffix=".json")
os.close(fd)
try:
api.writeReportJson(self.c, path)
loaded = json.load(open(path))
# core shape == API /scan/<id>/data, plus a meta wrapper
self.assertEqual(sorted(loaded.keys()), ["data", "error", "meta", "success"])
self.assertEqual(loaded["data"][0]["value"], "admin")
self.assertEqual(loaded["data"][0]["type_name"], "CURRENT_USER")
self.assertEqual(loaded["meta"]["url"], "http://target/?id=1")
self.assertEqual(loaded["meta"]["api_version"], 2) # MAJOR-only integer, for compatibility checks
self.assertIn("sqlmap_version", loaded["meta"])
self.assertIn("timestamp", loaded["meta"])
finally:
conf.url = saved_url
os.remove(path)
if __name__ == "__main__":
unittest.main(verbosity=2)