sqlmap/tests/test_http2.py
2026-07-01 15:19:30 +02:00

283 lines
11 KiB
Python

#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
Unit coverage for the PURE (network-free) parts of the native HTTP/2 client in
lib/request/http2.py: the RFC 7540 frame codec, the RFC 7541 HPACK integer /
Huffman / string primitives, the HPACK Decoder/Encoder (static + dynamic table),
and the urllib-compatible H2Response wrapper.
Nothing here opens a socket or negotiates TLS - only the deterministic codecs and
the response adapter are exercised. Known vectors are the canonical RFC 7541
examples; everything else is a round-trip / invariant check.
stdlib unittest only (no pytest / no pip); works on Python 2.7 and 3.x.
"""
import binascii
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap
bootstrap()
from lib.request.http2 import (
Decoder,
Encoder,
H2Response,
REDIRECT_CODES,
STATIC_LEN,
STATIC_TABLE,
DATA,
HEADERS,
FLAG_END_HEADERS,
FLAG_END_STREAM,
decode_frame_header,
decode_integer,
decode_string,
encode_frame,
encode_integer,
encode_string,
huffman_decode,
huffman_encode,
)
def _b(*ints):
# build a bytes object from ints (identical on Python 2 and 3)
return bytes(bytearray(ints))
class TestFrameCodec(unittest.TestCase):
def test_roundtrip(self):
header = encode_frame(HEADERS, FLAG_END_HEADERS, 1, b"abc")[:9]
self.assertEqual(decode_frame_header(header), (3, HEADERS, FLAG_END_HEADERS, 1))
def test_payload_is_appended_verbatim(self):
frame = encode_frame(DATA, 0, 1, b"hello")
self.assertEqual(frame[9:], b"hello")
def test_reserved_stream_bit_is_masked(self):
# the high (reserved) bit of the 31-bit stream id must be dropped on both ends
header = encode_frame(DATA, 0, 0x80000001, b"")[:9]
self.assertEqual(decode_frame_header(header), (0, DATA, 0, 1))
def test_zero_length_payload(self):
header = encode_frame(DATA, FLAG_END_STREAM, 1, b"")[:9]
length, _, flags, _ = decode_frame_header(header)
self.assertEqual(length, 0)
self.assertEqual(flags, FLAG_END_STREAM)
def test_oversized_payload_rejected(self):
with self.assertRaises(ValueError):
encode_frame(DATA, 0, 1, b"x" * (0xFFFFFF + 1))
def test_bad_header_length_rejected(self):
with self.assertRaises(ValueError):
decode_frame_header(b"123")
class TestIntegerCoding(unittest.TestCase):
def test_rfc_c11_small(self):
# RFC 7541 C.1.1: 10 with a 5-bit prefix fits in the prefix
self.assertEqual(list(encode_integer(10, 5)), [10])
def test_rfc_c12_multibyte(self):
# RFC 7541 C.1.2: 1337 with a 5-bit prefix
self.assertEqual(list(encode_integer(1337, 5)), [31, 154, 10])
self.assertEqual(decode_integer(bytearray([31, 154, 10]), 0, 5), (1337, 3))
def test_rfc_c13_full_byte_prefix(self):
# RFC 7541 C.1.3: 42 starting from a full (8-bit prefix at an octet boundary)
self.assertEqual(list(encode_integer(42, 8)), [42])
def test_roundtrip_across_prefixes(self):
for prefix in (4, 5, 6, 7, 8):
for value in (0, 1, 2, 30, 31, 32, 127, 128, 255, 256, 16384, 1000000):
encoded = bytearray(encode_integer(value, prefix))
decoded, pos = decode_integer(encoded, 0, prefix)
self.assertEqual(decoded, value)
self.assertEqual(pos, len(encoded))
def test_first_byte_bits_preserved(self):
# a caller-supplied opcode in the high bits must survive a small value
self.assertEqual(bytearray(encode_integer(5, 7, 0x80))[0], 0x80 | 5)
class TestHuffman(unittest.TestCase):
def test_known_vector_www_example_com(self):
# RFC 7541 C.4.1
self.assertEqual(binascii.hexlify(huffman_encode(b"www.example.com")), b"f1e3c2e5f23a6ba0ab90f4ff")
def test_empty(self):
self.assertEqual(huffman_encode(b""), b"")
self.assertEqual(huffman_decode(b""), b"")
def test_roundtrip(self):
for sample in (b"a", b"hello world", b"/index.html?a=1&b=2",
b"GET", b"application/json", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789",
bytes(bytearray(range(256)))):
self.assertEqual(huffman_decode(huffman_encode(sample)), sample)
def test_shrinks_typical_text(self):
sample = b"www.example.com"
self.assertLess(len(huffman_encode(sample)), len(sample))
def test_padding_too_long_rejected(self):
# 0xfe walks eight 1-bits into a long (unterminated) code -> more than a byte of padding
with self.assertRaises(ValueError):
huffman_decode(_b(0xFE))
class TestStringCoding(unittest.TestCase):
def test_huffman_branch_roundtrip(self):
encoded = encode_string(b"custom-value")
self.assertTrue(bytearray(encoded)[0] & 0x80) # huffman flag set for compressible text
self.assertEqual(decode_string(bytearray(encoded), 0), (b"custom-value", len(encoded)))
def test_literal_branch_when_huffman_would_not_shrink(self):
encoded = encode_string(_b(0xFF))
self.assertFalse(bytearray(encoded)[0] & 0x80) # falls back to a literal string
self.assertEqual(decode_string(bytearray(encoded), 0), (_b(0xFF), len(encoded)))
def test_disable_huffman(self):
encoded = encode_string(b"abc", huffman=False)
self.assertFalse(bytearray(encoded)[0] & 0x80)
self.assertEqual(decode_string(bytearray(encoded), 0), (b"abc", len(encoded)))
class TestHpackDecoder(unittest.TestCase):
def test_indexed_static_entries(self):
# 0x82/0x86/0x84 -> static indices 2, 6, 4
self.assertEqual(
Decoder().decode(_b(0x82, 0x86, 0x84)),
[(b":method", b"GET"), (b":scheme", b"http"), (b":path", b"/")],
)
def test_static_lookup_bounds(self):
d = Decoder()
self.assertEqual(d._get(1), (b":authority", b""))
self.assertEqual(d._get(2), (b":method", b"GET"))
self.assertEqual(d._get(STATIC_LEN), STATIC_TABLE[-1])
def test_index_zero_rejected(self):
with self.assertRaises(ValueError):
Decoder()._get(0)
def test_index_out_of_range_rejected(self):
with self.assertRaises(ValueError):
Decoder()._get(STATIC_LEN + 1) # no dynamic entries yet
def test_literal_incremental_indexing_populates_dynamic_table(self):
# 0x40 = literal with incremental indexing, new name
block = bytearray([0x40]) + encode_string(b"custom-key") + encode_string(b"custom-value")
d = Decoder()
self.assertEqual(d.decode(bytes(block)), [(b"custom-key", b"custom-value")])
# entry is now addressable at the first dynamic index (STATIC_LEN + 1)
self.assertEqual(d._get(STATIC_LEN + 1), (b"custom-key", b"custom-value"))
self.assertEqual(d._size, 32 + len(b"custom-key") + len(b"custom-value"))
def test_literal_without_indexing_does_not_touch_dynamic_table(self):
block = bytearray([0x00]) + encode_string(b"k") + encode_string(b"v")
d = Decoder()
self.assertEqual(d.decode(bytes(block)), [(b"k", b"v")])
self.assertEqual(d.dynamic, [])
def test_dynamic_table_eviction(self):
d = Decoder(max_size=40) # each 2+2 byte entry costs 32+2+2 = 36
d._add(b"aa", b"bb")
self.assertEqual(len(d.dynamic), 1)
d._add(b"cc", b"dd") # 72 > 40 -> oldest evicted
self.assertEqual(d.dynamic, [(b"cc", b"dd")])
self.assertEqual(d._size, 36)
def test_dynamic_size_update_clears(self):
d = Decoder()
d._add(b"x", b"y")
d.decode(_b(0x20)) # 0x20 = dynamic table size update to 0
self.assertEqual(d.max_size, 0)
self.assertEqual(d.dynamic, [])
class TestHpackEncoderRoundTrip(unittest.TestCase):
def test_roundtrip_through_decoder(self):
headers = [
(b":method", b"GET"),
(b":scheme", b"https"),
(b":path", b"/a/b?c=d"),
(b":authority", b"example.com"),
(b"user-agent", b"sqlmap"),
(b"accept", b""), # empty value
(b"x-custom", b"\x00\x01\xff"), # non-ASCII value
]
self.assertEqual(Decoder().decode(Encoder().encode(headers)), headers)
def test_encoder_output_is_bytes(self):
self.assertIsInstance(Encoder().encode([(b"a", b"b")]), bytes)
class TestH2Response(unittest.TestCase):
def _make(self, status=200, headers=None, body=b"body"):
headers = headers if headers is not None else [(b":status", b"200"), (b"content-type", b"text/html")]
return H2Response("https://target/x", status, headers, body)
def test_basic_fields(self):
r = self._make()
self.assertEqual(r.code, 200)
self.assertEqual(r.status, 200)
self.assertEqual(r.msg, "OK")
self.assertEqual(r.http_version, "HTTP/2.0")
self.assertEqual(r.geturl(), "https://target/x")
def test_unknown_status_message(self):
self.assertEqual(self._make(status=799).msg, "")
def test_pseudo_headers_stripped(self):
r = self._make()
self.assertNotIn(":status", r.info())
self.assertEqual(r.info().get("content-type"), "text/html")
def test_read_full_then_empty(self):
r = self._make(body=b"hello")
self.assertEqual(r.read(), b"hello")
self.assertEqual(r.read(), b"") # offset exhausted
def test_read_in_chunks(self):
r = self._make(body=b"abcdef")
self.assertEqual(r.read(2), b"ab")
self.assertEqual(r.read(3), b"cde")
self.assertEqual(r.read(10), b"f") # asking past the end returns the remainder
self.assertEqual(r.read(10), b"")
def test_str_header_names_accepted(self):
# headers may arrive already decoded to str (not only bytes)
r = H2Response("https://t/", 200, [("content-type", "application/json")], b"{}")
self.assertEqual(r.info().get("content-type"), "application/json")
def test_mimetools_style_headers_list(self):
# patchHeaders() relies on a '.headers' list of "Name: value\r\n" lines being present
r = self._make()
self.assertTrue(hasattr(r.info(), "headers"))
self.assertIn("content-type: text/html\r\n", r.info().headers)
def test_close_is_noop(self):
self.assertIsNone(self._make().close())
class TestConstants(unittest.TestCase):
def test_redirect_codes(self):
for code in (301, 302, 303, 307, 308):
self.assertIn(code, REDIRECT_CODES)
self.assertNotIn(200, REDIRECT_CODES)
def test_static_table_length(self):
self.assertEqual(STATIC_LEN, len(STATIC_TABLE))
self.assertEqual(STATIC_LEN, 61) # RFC 7541 Appendix A
if __name__ == "__main__":
unittest.main(verbosity=2)