diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 58aeb75b425..18afa00b406 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -47,7 +47,10 @@ jobs: run: python -B -m unittest discover -s tests -p "test_*.py" - name: Smoke test - run: python sqlmap.py --smoke + run: python sqlmap.py --smoke-test - name: Vuln test - run: python sqlmap.py --vuln + run: python sqlmap.py --vuln-test + + - name: API test + run: python sqlmap.py --api-test diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt index 03b0a1934ae..55391e1c09c 100644 --- a/data/txt/sha256sums.txt +++ b/data/txt/sha256sums.txt @@ -162,12 +162,12 @@ df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/ 9e5e4d3d9acb767412259895a3ee75e1a5f42d0b9923f17605d771db384a6f60 extra/vulnserver/vulnserver.py b8411d1035bb49b073476404e61e1be7f4c61e205057730e2f7880beadcd5f60 lib/controller/action.py 6da812281a69c8b7a5181c2f76374dc695e4727b2936042651bacbeda4e6bcc9 lib/controller/checks.py -c1881685bef8504ded32c51abed00ab51849008c84b74e8a66117e5f5041b3df lib/controller/controller.py +6068e48ec6337a6955ca6c9ca4479bf6dabaf963f28b459d9c52cee3910f3cda lib/controller/controller.py d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py b36b085ff1b5797e375c1e2ca3b12c7ab4204f48acd1a1efb075cff8302d9750 lib/core/agent.py ca3e5ce56cb1cae0a8e815425ab6810068004bffe8861d1037c7c87c0ae02477 lib/core/bigarray.py -2e5ee80b24bd6dd961b64357e745012145a44d52c49a525d8f5f5e893a8ccb8d lib/core/common.py +1452ffc42657bea207583173de9829dddf4afd9b159c785284e43878de492afb lib/core/common.py 8f1272487e1adfcc8c755a2f56f0c6d21eac5e685a73a9a159482f9dc9142bc5 lib/core/compat.py 742bce10b97034966021ec60c7ac294db4af4fe7893613d63172a02c29f009f8 lib/core/convert.py c03dc585f89642cfd81b087ac2723e3e1bb3bfa8c60e6f5fe58ef3b0113ebfe6 lib/core/data.py @@ 
-175,12 +175,12 @@ c03dc585f89642cfd81b087ac2723e3e1bb3bfa8c60e6f5fe58ef3b0113ebfe6 lib/core/data. 70fb2528e580b22564899595b0dff6b1bc257c6a99d2022ce3996a3d04e68e4e lib/core/decorators.py 147823c37596bd6a56d677697781f34b8d1d1671d5a2518fbc9468d623c6d07d lib/core/defaults.py 2f44a1bfe6f18aafe64147b99e69aa93cf438c0e7befe59f4e2aee9065c8b7b6 lib/core/dicts.py -8aee07fba24082ee6355a29d01842bc3657194148a7f9062079b5f0a85ec53e3 lib/core/dump.py -23e33f0b457e2a7114c9171ba9b42e1751b71ee3f384bba7fad39e4490adb803 lib/core/enums.py +2592b0fd38c272c0b0d49878f4449437eb8ba8ff7536bb39b2ac9a2511010f7c lib/core/dump.py +6b9932d9c789a0e2ac28a493fb7914f49100a1c91de989bcdb20df9d40648522 lib/core/enums.py 5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py 914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py -67ea32c993cbf23cdbd5170360c020ca33363b7c516ff3f8da4124ef7cb0254d lib/core/optiondict.py +3ec59b5eb336d9808d28496f1cbbad716b4a0e276b5399023142826e460e3fd2 lib/core/optiondict.py 3ff871fe8391952c3ec3bb528ba592a13926c80ca0b68fd322a317f69a651ef7 lib/core/option.py ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch.py 49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py @@ -188,18 +188,18 @@ ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch 48797d6c34dd9bb8a53f7f3794c85f4288d82a9a1d6be7fcf317d388cb20d4b3 lib/core/replication.py 0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py 888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py -8eb10b15440aaa6ddc592e1b29199e9fa575df6b46335fcf7b7374c5f8f68480 lib/core/settings.py +ef64975437d734f34f15026d9fec87eb147999912c187985a2c83c9bb3ffb08e lib/core/settings.py cd5a66deee8963ba8e7e9af3dd36eb5e8127d4d68698811c29e789655f507f82 lib/core/shell.py 
bcb5d8090d5e3e0ef2a586ba09ba80eef0c6d51feb0f611ed25299fbb254f725 lib/core/subprocessng.py 70ea3768f1b3062b22d20644df41c86238157ec80dd43da40545c620714273c6 lib/core/target.py -8bbc9312147ee8ca719860bc7ad472eac25230e4d46976fbb405efe43fe15ef6 lib/core/testing.py +daf2ad65fcea430b6272e3c538022c9871fdc3aba78f71669130fb0bc954c78e lib/core/testing.py e3e653364d08d04d7492aa40a2bd29c6a28f4d78fecdd6c10f21f6cb28b98b4c lib/core/threads.py b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unescaper.py 53e396902cb2546eaa09e77073fcba8be8827ee9ce055cfc899e81b0e6ad4d6d lib/core/update.py 2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py 54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py -4c56ad26ffb893d37813167de172b6c95c120588bfdc899f102977a2997b9bb9 lib/parse/cmdline.py +053079fe796dfce09cf94ac6f094043f2dfa393b5631387fadb4f735cf1ac6a4 lib/parse/cmdline.py 02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py 5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py @@ -230,18 +230,18 @@ f522436fbd14bdab090a1d305fcac0361800cb8e36c8cbcb47933298376a71e0 lib/takeover/r 0787f78e6bd9bb21d4267c95c4c99806711bb57c5518485c2e25f10fcf9c41fc lib/takeover/udf.py 23d73af417604dab460b74cdc230896153f018a6c00d144019491053640a172f lib/takeover/web.py 8cc1e226d4150fe8aa1a056e5d32d858ed6444d3d4e2af7fb4bc08f0bbe9d527 lib/takeover/xp_cmdshell.py -7b62bbb4d94f1271380a44142b407dc9eeed1d8b0319cdad57493dc1a12caff8 lib/techniques/blind/inference.py +09c3759b59bc111712f75b0b1762d195c0da0e0741dd76379546c429e8ed4457 lib/techniques/blind/inference.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/blind/__init__.py 
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/dns/__init__.py 3df9839fb92a81d46b6194d7adacb43f391efb78b071783c132e8d596ecbfaf1 lib/techniques/dns/test.py 2934514a60cbcd48675053a73f785b4c7bfe606b51c34ae81a86818362ec4672 lib/techniques/dns/use.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/error/__init__.py -f552b6140d4069be6a44792a08f295da8adabc1c4bb6a5e100f222f87144ca9d lib/techniques/error/use.py +ee63b978154b0cb9a385fe51926ef6dc6f425b07f62b0d17208e82b4ac020f5c lib/techniques/error/use.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/__init__.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/union/__init__.py 30cae858e2a5a75b40854399f65ad074e6bb808d56d5ee66b94d4002dc6e101b lib/techniques/union/test.py -a8a795f29ec6fd66482926f04b054ed492a033982c3b7837c5d2ea32368acec0 lib/techniques/union/use.py -8720a744d46471fe46f5a67e16b2d4147339c6685fbf0fdf50f1a40e9a75c23a lib/utils/api.py +5b49f5bca4e35362fa7d83896e0769fdb01ad152f30059aafd8ce0f093400a3f lib/techniques/union/use.py +aeefb42ea0c68f72744bc1bfd7194ec1bc06480d8a7e23f4b8d3d23fbba2b014 lib/utils/api.py 442555ab85277aff7c9e0cf465ea5b0d28395c326f68363449b2d3941f4b6de2 lib/utils/brute.py da5bcbcda3f667582adf5db8c1b5d511b469ac61b55d387cec66de35720ed718 lib/utils/crawler.py a94958be0ec3e9d28d8171813a6a90655a9ad7e6aa33c661e8d8ebbfcf208dbb lib/utils/deps.py @@ -490,9 +490,9 @@ cedf45d33461bd7e5400d06611a63c8a4ffae1a4510030c5696b9d46ed6a9883 plugins/generi 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 plugins/__init__.py 5d72f0af46ff3c9e3fe80300e83cb78749132278e8db88915764a94d7130a04c README.md 46517f1444c202710e388873960130850ed092e17bd6f4dd5f2fedea3dbb8ffc sqlmapapi.py -e0607378f46f7664349552c628f25c4689569c788fd2364eef3075dd2cce127b sqlmapapi.yaml +f09d1b06901e7e02d0dbf4de607f6a4a9889acc322ae9353b98ea9101fb9548a sqlmapapi.yaml 
627d90f1194335b800cbc9cc78db6697cf9e02e193a83598e0d4d0abb55b63b8 sqlmap.conf -65159b82795604069a2d14ccbd1f66e888a26b05db0401a1ddadb40c665c93dc sqlmap.py +f8974aac701639b54ca34b0e11803c836e5cb1e1c5a6eaf275315949b6487310 sqlmap.py eb37a88357522fd7ad00d90cdc5da6b57442b4fec49366aadb2944c4fbf8b804 tamper/0eunion.py a9785a4c111d6fee2e6d26466ba5efb3b229c00520b26e8024b041553b53efba tamper/apostrophemask.py cf26bc8006519bd25ce06d347f72770cd75b61575cf65e5812274e8ab9392eb4 tamper/apostrophenullencode.py @@ -577,6 +577,7 @@ a48c411fea864e6bcd6a1c7e1a35094b8cda8d15088fd9e7b0270542ae20daa9 tests/test_com 3804eb2d730220360f9dc07d5994eb64e9f65acf3b0d8648df8df2a2177ba8fd tests/test_decodepage.py e40a49cfa73c45b3c3c6d1d1d00738861e270cb7a07b28f5a5356f9c7c800cf2 tests/test_dialect.py 993a2d4d87c4fbaf261663b069629acc95ee4405aa0c42cf5a8f39649fdb0fff tests/test_dicts.py +9cd5841349bc4db818658d12184929a96f7f279eff1f53ad18a54dbefbd6b276 tests/test_dump_jsonl.py 2bbe4b01f79992cfa8884651fc0a28dbd0e3abb0cbea9eb7eadf1f98ca3c3420 tests/test_encoding.py bb6991260a994fcbe79e05febaa34affd5631d02299fbc626820addd5f6ea4f4 tests/test_error_engine.py 8105de9978fe286a29f6b635a58db1e9998d86e8dded54d7efdfb9d52a121094 tests/test_hashdb.py @@ -584,10 +585,13 @@ c04e8358fb6df45f69f2f26435c971acde280535bf304e84d30cf2681158c6a7 tests/test_has 205e84827461101a78b2cffaa3de49795a1214e92276fc7fd40f3456657062b9 tests/test_identifiers_output.py 5372270b7ed82b62f273c2e9bd1f7ecd8605371e66cd0ad70663762cb08d42f1 tests/test_inference_engine.py caa06fed7323b2bb6d0f2443ce343de94f75bf8ad012c055d5e07741d908ebad tests/test_misc.py +57fa9713a3186020be8bcc3f06399e92bf9ce82ec6d3413c76babe19606bb698 tests/test_openapi_drift.py cde0bea1263ae857561f91ed2bd515e972b716743f017d31b1718a8546c72759 tests/test_pagecontent.py 4bac34af2abddce003756d6776e89b2fda220bb7603ef3761f4f37ee29f9c369 tests/test_payload_marking.py 6bfc8201724078bd9d6d559916ef73c9ff97e19b0f2948f37e588a49b027795f tests/test_payloads_structure.py 
+5dc46919f971f89a3073118ec00bf420cc9cecf0b072b2f896df2f860e87adec tests/test_property.py 5c95e7863190e440234f231864fb1219c35207132762858cc95181c57086bafc tests/test_replication.py +67a5241aeebc20eb1c20cfc490422a59af5179040824e5731bd785db2e6bf750 tests/test_report.py cec98d72992c0799229a780fa7f0d7f3fb01ec2d708187ce0e4a05c8612f291b tests/test_safe2bin.py a1c6cda1e5b483f61e6a4f8ddd0b06a15ddaa3fd2119bfb9dbd9cc970d7a751d tests/test_settings_regex.py d3d991331096e16e5019de3d652e9fff92c09bd9f97c50b1c2c3ceb0ed49b17e tests/test_sqlparse.py @@ -596,8 +600,9 @@ f3a628db8a3e05baee580c02132e95b164695e4b3ee1785707e3ea148702449a tests/test_tam b3e13febe9e0ff6f97334f2868655bfdbaa18755e464a6dc4c6d424f513bad02 tests/test_targeturl.py 639851dc68f62b559b200b09c308e64e453f414969940005bac75dc0ab07a6b6 tests/test_texthelpers.py 708b3c040f8b677a84020dd6f7c4242f77260b3c6d2697fe8189e1881b0e1365 tests/test_union_engine.py +48b0ae4abe0fdde8ce4975c5cbf4c3514a2815021cb2e3a490a189bea5edfe78 tests/test_unpickle_security.py 4b646f513c6da1e33200184ed6eabe0aa345eb2e2a19598dc123e191168591bf tests/test_urls.py -4f095ebda1b9bddde082ed464e863400cf23e9bf26f081948706213b35069195 tests/_testutils.py +23ffd75b5aec33066e6d6aad01ab2c9c1b12ee20c1a0990f8f1be81f1ad16161 tests/_testutils.py 2364db35025a53ea4e5a0a80c034997642785f7e6d1566d0d0f1db959fe3c82e tests/test_utils.py 81bb6d7449f224fa337734ae361c1a340bf9a51768a854d6a1a6e718ed1263ca tests/test_wordlist.py 55eaefc664bd8598329d535370612351ec8443c52465f0a37172ea46a97c458a thirdparty/ansistrm/ansistrm.py diff --git a/lib/controller/controller.py b/lib/controller/controller.py index 69d515f125b..ff64a81bd34 100644 --- a/lib/controller/controller.py +++ b/lib/controller/controller.py @@ -70,6 +70,7 @@ from lib.core.settings import DEFAULT_GET_POST_DELIMITER from lib.core.settings import EMPTY_FORM_FIELDS_REGEX from lib.core.settings import GOOGLE_ANALYTICS_COOKIE_REGEX +from lib.core.settings import HASHDB_STALE_DAYS from lib.core.settings import HOST_ALIASES from 
lib.core.settings import IGNORE_PARAMETERS from lib.core.settings import LOW_TEXT_PERCENT @@ -181,9 +182,29 @@ def _showInjections(): conf.dumper.string("", {"url": conf.url, "query": conf.parameters.get(PLACE.GET), "data": conf.parameters.get(PLACE.POST)}, content_type=CONTENT_TYPE.TARGET) conf.dumper.string("", kb.injections, content_type=CONTENT_TYPE.TECHNIQUES) else: + # --report-json: capture the same TARGET/TECHNIQUES structures the API emits, without + # printing them (the human-readable injection points are rendered just below) + if conf.reportJson: + conf.dumper._reportData({"url": conf.url, "query": conf.parameters.get(PLACE.GET), "data": conf.parameters.get(PLACE.POST)}, CONTENT_TYPE.TARGET) + conf.dumper._reportData(kb.injections, CONTENT_TYPE.TECHNIQUES) + data = "".join(set(_formatInjection(_) for _ in kb.injections)).rstrip("\n") conf.dumper.string(header, data) + # when results were resumed (no test requests this run), nudge if the session file is stale - + # this is the common "why is it showing old/unexpected results?" confusion + if kb.testQueryCount == 0 and not conf.freshQueries: + try: + days = int((time.time() - os.path.getmtime(conf.hashDBFile)) / (24 * 3600)) + except (OSError, IOError, TypeError): + days = 0 + + if days >= HASHDB_STALE_DAYS: + warnMsg = "results above were resumed from a session file last updated %d days ago, " % days + warnMsg += "so they may be stale. 
Rerun with '--flush-session' to retest " + warnMsg += "or '--fresh-queries' to ignore cached query results" + logger.warning(warnMsg) + if conf.tamper: warnMsg = "changes made by tampering scripts are not " warnMsg += "included in shown payload content(s)" diff --git a/lib/core/common.py b/lib/core/common.py index 87b0f986328..b1b205ddfd0 100644 --- a/lib/core/common.py +++ b/lib/core/common.py @@ -1839,7 +1839,7 @@ def escapeJsonValue(value): retVal = "" for char in value: - if char < ' ' or char == '"': + if char < ' ' or char in ('"', '\\'): # Note: backslash must be escaped too, otherwise a '\' in the value corrupts the surrounding JSON string retVal += json.dumps(char)[1:-1] else: retVal += char @@ -3703,8 +3703,8 @@ def unArrayizeValue(value): if isListLike(value): if not value: value = None - elif len(value) == 1 and not isListLike(value[0]): - value = value[0] + elif len(value) == 1 and not isListLike(next(iter(value))): # Note: next(iter(...)) not value[0] - a set/OrderedSet is list-like but not subscriptable + value = next(iter(value)) else: value = [_ for _ in flattenValue(value) if _ is not None] value = value[0] if len(value) > 0 else None diff --git a/lib/core/dump.py b/lib/core/dump.py index d55291e5129..ebc7d0cd041 100644 --- a/lib/core/dump.py +++ b/lib/core/dump.py @@ -6,6 +6,7 @@ """ import hashlib +import json import os import re import shutil @@ -14,6 +15,7 @@ from lib.core.common import Backend from lib.core.common import checkFile +from lib.core.common import clearColors from lib.core.common import dataToDumpFile from lib.core.common import dataToStdout from lib.core.common import filterNone @@ -30,6 +32,7 @@ from lib.core.compat import xrange from lib.core.convert import getBytes from lib.core.convert import getConsoleLength +from lib.core.convert import stdoutEncode from lib.core.convert import getText from lib.core.convert import getUnicode from lib.core.convert import htmlEscape @@ -59,6 +62,7 @@ from lib.utils.safe2bin import 
safechardecode from thirdparty import six from thirdparty.magic import magic +from thirdparty.odict import OrderedDict class Dump(object): """ @@ -96,6 +100,19 @@ def _write(self, data, newline=True, console=True, content_type=None): kb.dataOutputFlag = True + def _reportData(self, data, content_type): + """ + --report-json: capture a structured result exactly as the REST API would store it (the raw + value + COMPLETE status), independent of console/file rendering. No-op unless a report + collector is active - which is only ever the case for a CLI --report-json run, never under + --api - so this never double-captures alongside StdDbOut. A None content_type is resolved + via the kb.partRun fallback (e.g. the fingerprint line), mirroring the API exactly. + """ + + if conf.get("reportCollector") is not None: + from lib.utils.api import _storeData, REPORT_TASKID + _storeData(conf.reportCollector, REPORT_TASKID, stdoutEncode(clearColors(data)), CONTENT_STATUS.COMPLETE, content_type) + def flush(self): if self._outputFP: try: @@ -116,9 +133,12 @@ def setOutputFile(self): raise SqlmapGenericException(errMsg) def singleString(self, data, content_type=None): + self._reportData(data, content_type) self._write(data, content_type=content_type) def string(self, header, data, content_type=None, sort=True): + self._reportData(data, content_type) + if conf.api: self._write(data, content_type=content_type) @@ -153,6 +173,8 @@ def lister(self, header, elements, content_type=None, sort=True): except: pass + self._reportData(elements, content_type) + if conf.api: self._write(elements, content_type=content_type) @@ -204,6 +226,8 @@ def userSettings(self, header, userSettings, subHeader, content_type=None): users = [_ for _ in userSettings.keys() if _ is not None] users.sort(key=lambda _: _.lower() if hasattr(_, "lower") else _) + self._reportData(userSettings, content_type) + if conf.api: self._write(userSettings, content_type=content_type) @@ -237,6 +261,8 @@ def dbs(self, dbs): def 
dbTables(self, dbTables): if isinstance(dbTables, dict) and len(dbTables) > 0: + self._reportData(dbTables, CONTENT_TYPE.TABLES) + if conf.api: self._write(dbTables, content_type=CONTENT_TYPE.TABLES) @@ -279,6 +305,8 @@ def dbTables(self, dbTables): def dbTableColumns(self, tableColumns, content_type=None): if isinstance(tableColumns, dict) and len(tableColumns) > 0: + self._reportData(tableColumns, content_type) + if conf.api: self._write(tableColumns, content_type=content_type) @@ -352,6 +380,8 @@ def dbTableColumns(self, tableColumns, content_type=None): def dbTablesCount(self, dbTables): if isinstance(dbTables, dict) and len(dbTables) > 0: + self._reportData(dbTables, CONTENT_TYPE.COUNT) + if conf.api: self._write(dbTables, content_type=CONTENT_TYPE.COUNT) @@ -413,6 +443,8 @@ def dbTableValues(self, tableValues): safeDb = re.sub(r"[^\w]", UNSAFE_DUMP_FILEPATH_REPLACEMENT, unsafeSQLIdentificatorNaming(db)) safeTable = re.sub(r"[^\w]", UNSAFE_DUMP_FILEPATH_REPLACEMENT, unsafeSQLIdentificatorNaming(table)) + self._reportData(tableValues, CONTENT_TYPE.DUMP_TABLE) + if conf.api: self._write(tableValues, content_type=CONTENT_TYPE.DUMP_TABLE) @@ -431,7 +463,7 @@ def dbTableValues(self, tableValues): if conf.dumpFormat == DUMP_FORMAT.SQLITE: replication = Replication(os.path.join(conf.dumpPath, "%s.sqlite3" % safeDb)) - elif conf.dumpFormat in (DUMP_FORMAT.CSV, DUMP_FORMAT.HTML): + elif conf.dumpFormat in (DUMP_FORMAT.CSV, DUMP_FORMAT.HTML, DUMP_FORMAT.JSONL): if not os.path.isdir(dumpDbPath): try: os.makedirs(dumpDbPath) @@ -594,6 +626,7 @@ def dbTableValues(self, tableValues): console = (i >= count - TRIM_STDOUT_DUMP_SIZE) field = 1 values = [] + record = OrderedDict() if i == 0 and count > TRIM_STDOUT_DUMP_SIZE: self._write(" ...") @@ -644,6 +677,11 @@ def dbTableValues(self, tableValues): dataToDumpFile(dumpFP, "%s%s" % (safeCSValue(value), conf.csvDel)) elif conf.dumpFormat == DUMP_FORMAT.HTML: dataToDumpFile(dumpFP, "%s" % 
getUnicode(htmlEscape(value).encode("ascii", "xmlcharrefreplace"))) + elif conf.dumpFormat == DUMP_FORMAT.JSONL: + if len(info["values"]) <= i or info["values"][i] is None or info["values"][i] == " ": # NULL + record[unsafeSQLIdentificatorNaming(column)] = None + else: + record[unsafeSQLIdentificatorNaming(column)] = getUnicode(info["values"][i]) field += 1 @@ -656,6 +694,8 @@ def dbTableValues(self, tableValues): dataToDumpFile(dumpFP, "\n") elif conf.dumpFormat == DUMP_FORMAT.HTML: dataToDumpFile(dumpFP, "\n") + elif conf.dumpFormat == DUMP_FORMAT.JSONL: + dataToDumpFile(dumpFP, "%s\n" % getUnicode(json.dumps(record, ensure_ascii=False))) self._write("|", console=console) @@ -665,10 +705,10 @@ def dbTableValues(self, tableValues): rtable.endTransaction() logger.info("table '%s.%s' dumped to SQLITE database '%s'" % (db, table, replication.dbpath)) - elif conf.dumpFormat in (DUMP_FORMAT.CSV, DUMP_FORMAT.HTML): + elif conf.dumpFormat in (DUMP_FORMAT.CSV, DUMP_FORMAT.HTML, DUMP_FORMAT.JSONL): if conf.dumpFormat == DUMP_FORMAT.HTML: dataToDumpFile(dumpFP, "\n\n\n\n") - else: + elif conf.dumpFormat == DUMP_FORMAT.CSV: dataToDumpFile(dumpFP, "\n") dumpFP.close() @@ -679,6 +719,8 @@ def dbTableValues(self, tableValues): logger.warning(msg) def dbColumns(self, dbColumnsDict, colConsider, dbs): + self._reportData(dbColumnsDict, CONTENT_TYPE.COLUMNS) + if conf.api: self._write(dbColumnsDict, content_type=CONTENT_TYPE.COLUMNS) diff --git a/lib/core/enums.py b/lib/core/enums.py index 2e1881f19be..137be5d0293 100644 --- a/lib/core/enums.py +++ b/lib/core/enums.py @@ -238,6 +238,7 @@ class DUMP_FORMAT(object): CSV = "CSV" HTML = "HTML" SQLITE = "SQLITE" + JSONL = "JSONL" class HTTP_HEADER(object): ACCEPT = "Accept" diff --git a/lib/core/optiondict.py b/lib/core/optiondict.py index 44b4ca8f560..c7e8c97177b 100644 --- a/lib/core/optiondict.py +++ b/lib/core/optiondict.py @@ -235,6 +235,7 @@ "postprocess": "string", "preprocess": "string", "repair": "boolean", + "reportJson": 
"string", "saveConfig": "string", "scope": "string", "skipHeuristics": "boolean", @@ -272,6 +273,7 @@ "forceDns": "boolean", "murphyRate": "integer", "smokeTest": "boolean", + "apiTest": "boolean", }, "API": { diff --git a/lib/core/settings.py b/lib/core/settings.py index 5fa1a8f153c..e76d5180a14 100644 --- a/lib/core/settings.py +++ b/lib/core/settings.py @@ -20,7 +20,7 @@ from thirdparty import six # sqlmap version (...) -VERSION = "1.10.6.107" +VERSION = "1.10.6.115" TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable" TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34} VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE) @@ -717,6 +717,9 @@ # Restricted PAT token for automated crash reporting (last rotation: 2026-04-24) GITHUB_REPORT_PAT_TOKEN = "0EZh0n8npcacTH4oBcdKKWvfZLcdGWx0N5XFHD2xYaQDOkmI9LWaeDvZRZUMDz8l96RDH3+LVsbwGE5zUtaau0kld9VXG20fVbYES3ooFpNv+U9J5OTnaT2OlZcYzk4w5veT+GiHV5cuCngOJ6QgL1+qRpZDX1gzFecXbm2sNfQ2SGjT5McQe1mtxMTN7WsS1fQfPH+RhMUgbnwXJ5YG6EsBNZWOyk0C16QnekrVtuQpK0/ZVvU560uQhoMsP1/FBguBwJe" +# Age (in days) past which a resumed session file is considered stale (triggers a one-time nudge) +HASHDB_STALE_DAYS = 7 + # Flush HashDB threshold number of cached items HASHDB_FLUSH_THRESHOLD_ITEMS = 200 @@ -843,6 +846,15 @@ # Default adapter to use for bottle server RESTAPI_DEFAULT_ADAPTER = "wsgiref" +# REST API / scan-data contract version (semantic versioning), INDEPENDENT of the sqlmap version. +# Bump MAJOR for breaking changes (removed/renamed field, changed type, restructured response), +# MINOR for additive backward-compatible changes (new field/endpoint), PATCH for non-contract fixes. +# Exposed at GET /version (as "api_version"), in the --report-json "meta", and as the OpenAPI +# info.version (keep sqlmapapi.yaml in sync). Maintained by hand when the contract changes. 
+# 2.0.0: first explicitly-versioned contract; a MAJOR break from the old implicit shape +# (TECHNIQUES is now a named list, DUMP_TABLE restructured, internal fields dropped, type_name added). +RESTAPI_VERSION = "2.0.0" + # Default REST API server listen address RESTAPI_DEFAULT_ADDRESS = "127.0.0.1" @@ -850,7 +862,7 @@ RESTAPI_DEFAULT_PORT = 8775 # Unsupported options by REST API server -RESTAPI_UNSUPPORTED_OPTIONS = ("sqlShell", "wizard", "evalCode", "alert") +RESTAPI_UNSUPPORTED_OPTIONS = ("sqlShell", "wizard", "evalCode", "alert", "reportJson") # Use "Supplementary Private Use Area-A" INVALID_UNICODE_PRIVATE_AREA = False diff --git a/lib/core/testing.py b/lib/core/testing.py index bcb773fa7f2..8493f2cf579 100644 --- a/lib/core/testing.py +++ b/lib/core/testing.py @@ -6,12 +6,14 @@ """ import doctest +import json import logging import os import random import re import socket import sqlite3 +import subprocess import sys import tempfile import threading @@ -20,17 +22,22 @@ from extra.vulnserver import vulnserver from lib.core.common import clearConsoleLine from lib.core.common import dataToStdout +from lib.core.common import getSafeExString from lib.core.common import randomInt from lib.core.common import randomStr from lib.core.common import shellExec from lib.core.compat import round +from lib.core.compat import xrange from lib.core.convert import encodeBase64 +from lib.core.convert import getBytes +from lib.core.convert import getText from lib.core.data import kb from lib.core.data import logger from lib.core.data import paths from lib.core.data import queries from lib.core.patch import unisonRandom from lib.core.settings import IS_WIN +from lib.core.settings import RESTAPI_VERSION def vulnTest(): """ @@ -224,6 +231,156 @@ def _thread(): return retVal +def apiTest(): + """ + Runs a basic live test of the REST API: launches the server in a separate process + ('sqlmapapi.py -s') and drives the control-plane endpoints with an HTTP client - a real + server + client 
round-trip, without launching an actual scan. A separate process (rather + than an in-process thread) isolates the single-threaded server from the client's GIL and + from sqlmap's global HTTP machinery, which otherwise makes the round-trip flaky. + """ + + retVal = True + + # pick a free port the same way vulnTest() does + while True: + address, port = "127.0.0.1", random.randint(10000, 65535) + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if s.connect_ex((address, port)): + break + else: + time.sleep(1) + finally: + s.close() + + username, password = "test", "test" + apipath = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "sqlmapapi.py")) + + try: + devnull = subprocess.DEVNULL + except AttributeError: + devnull = open(os.devnull, "wb") + + process = subprocess.Popen([sys.executable, apipath, "-s", "-H", address, "-p", str(port), "--username", username, "--password", password], stdout=devnull, stderr=devnull) + + base = "http://%s:%d" % (address, port) + + def _call(path, data=None, authorize=True): + # NOTE: a raw socket is used deliberately instead of urllib/http.client. The host sqlmap + # process installs a global keep-alive opener and patches http.client, which makes a + # library client flaky against the single-threaded server; a hand-rolled HTTP/1.0 request + # (Connection: close, read to EOF) is hermetic and immune to all of that. 
+ method = "POST" if data is not None else "GET" + lines = ["%s %s HTTP/1.0" % (method, path), "Host: %s:%d" % (address, port)] + if authorize: + lines.append("Authorization: Basic %s" % encodeBase64("%s:%s" % (username, password), binary=False)) + body = getBytes(json.dumps(data)) if data is not None else b"" + if data is not None: + lines.append("Content-Type: application/json") + lines.append("Content-Length: %d" % len(body)) + lines.append("Connection: close") + request = getBytes("\r\n".join(lines) + "\r\n\r\n") + body + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(10) + try: + s.connect((address, port)) + s.sendall(request) + raw = b"" + while True: + chunk = s.recv(8192) + if not chunk: + break + raw += chunk + except Exception as ex: + logger.debug("API test: request to '%s' failed (%s)" % (path, getSafeExString(ex))) + return None, None + finally: + s.close() + + head, _, payload = raw.partition(b"\r\n\r\n") + try: + code = int(head.split(b"\r\n")[0].split(b" ")[1]) + except (IndexError, ValueError): + return None, None + try: + return code, json.loads(getText(payload)) + except ValueError: + return code, None + + try: + # wait for the server process to come up (or die trying) + for _ in xrange(200): + if process.poll() is not None: + logger.error("API test: server process exited prematurely (address: '%s')" % base) + return False + code, data = _call("/version") + if code == 200 and data and data.get("success"): + break + time.sleep(0.1) + else: + logger.error("API test: server did not come up (address: '%s')" % base) + return False + + logger.info("REST API server running at '%s'..." 
% base) + + results = [] + + def _check(name, condition): + results.append((name, bool(condition))) + if not condition: + logger.error("API test: check '%s' FAILED" % name) + + # GET /version - success envelope + MAJOR-only integer api_version + code, data = _call("/version") + _check("version", code == 200 and data and data.get("success") is True and data.get("api_version") == int(RESTAPI_VERSION.split(".")[0]) and data.get("version")) + + # the auth hook must reject an unauthenticated request + code, _ = _call("/version", authorize=False) + _check("auth-401", code == 401) + + # GET /task/new - mint a task + code, data = _call("/task/new") + taskid = data.get("taskid") if data else None + _check("task-new", code == 200 and data and data.get("success") and taskid) + + # POST /option/<taskid>/set then read it back via /get and /list (JSON round-trip + IPC) + code, data = _call("/option/%s/set" % taskid, {"flushSession": True}) + _check("option-set", code == 200 and data and data.get("success")) + + code, data = _call("/option/%s/get" % taskid, ["flushSession"]) + _check("option-get", data and data.get("success") and (data.get("options") or {}).get("flushSession") is True) + + code, data = _call("/option/%s/list" % taskid) + _check("option-list", data and data.get("success") and isinstance(data.get("options"), dict)) + + # GET /admin/list - the IP-bound listing (our client is the task's creator) must see it + code, data = _call("/admin/list") + _check("admin-list", data and data.get("success") and taskid in (data.get("tasks") or {})) + + # a bogus task ID must produce a failure envelope (not a crash) + code, data = _call("/option/%s/list" % "nonexistent") + _check("invalid-task", data is not None and data.get("success") is False) + + # GET /task/<taskid>/delete - tear the task down + code, data = _call("/task/%s/delete" % taskid) + _check("task-delete", data and data.get("success")) + + if all(ok for _, ok in results): + logger.info("API test final result: PASSED") + else: + retVal 
= False + logger.error("API test final result: FAILED (%s)" % ", ".join(name for name, ok in results if not ok)) + finally: + try: + process.terminate() + process.wait() + except Exception: + pass + + return retVal + def smokeTest(): """ Runs the basic smoke testing of a program diff --git a/lib/parse/cmdline.py b/lib/parse/cmdline.py index cf200380630..6482356043f 100644 --- a/lib/parse/cmdline.py +++ b/lib/parse/cmdline.py @@ -686,7 +686,7 @@ def cmdLineParser(argv=None): help="Store dumped data to a custom file") general.add_argument("--dump-format", dest="dumpFormat", - help="Format of dumped data (CSV (default), HTML or SQLITE)") + help="Dump data format (CSV (default), HTML, SQLITE, JSONL)") general.add_argument("--encoding", dest="encoding", help="Character encoding used for data retrieval (e.g. GBK)") @@ -727,6 +727,9 @@ def cmdLineParser(argv=None): general.add_argument("--repair", dest="repair", action="store_true", help="Redump entries having unknown character marker (%s)" % INFERENCE_UNKNOWN_CHAR) + general.add_argument("--report-json", dest="reportJson", + help="Store run results to a JSON file") + general.add_argument("--save", dest="saveConfig", help="Save options to a configuration INI file") @@ -872,6 +875,9 @@ def cmdLineParser(argv=None): parser.add_argument("--vuln-test", dest="vulnTest", action="store_true", help=SUPPRESS) + parser.add_argument("--api-test", dest="apiTest", action="store_true", + help=SUPPRESS) + parser.add_argument("--disable-json", dest="disableJson", action="store_true", help=SUPPRESS) @@ -1126,7 +1132,7 @@ def _format_action_invocation(self, action): else: args.stdinPipe = None - if not any((args.direct, args.url, args.logFile, args.bulkFile, args.googleDork, args.configFile, args.requestFile, args.updateAll, args.smokeTest, args.vulnTest, args.wizard, args.dependencies, args.purge, args.listTampers, args.hashFile, args.stdinPipe)): + if not any((args.direct, args.url, args.logFile, args.bulkFile, args.googleDork, 
args.configFile, args.requestFile, args.updateAll, args.smokeTest, args.vulnTest, args.apiTest, args.wizard, args.dependencies, args.purge, args.listTampers, args.hashFile, args.stdinPipe)): errMsg = "missing a mandatory option (-d, -u, -l, -m, -r, -g, -c, --wizard, --shell, --update, --purge, --list-tampers or --dependencies). " errMsg += "Use -h for basic and -hh for advanced help\n" parser.error(errMsg) diff --git a/lib/techniques/blind/inference.py b/lib/techniques/blind/inference.py index 1758d98089f..faf0a9383ea 100644 --- a/lib/techniques/blind/inference.py +++ b/lib/techniques/blind/inference.py @@ -127,10 +127,11 @@ def bisection(payload, expression, length=None, charsetType=None, firstChar=None expression = match.group(2).strip() try: - # Set kb.partRun in case "common prediction" feature (a.k.a. "good samaritan") is used or the engine is called from the API + # Set kb.partRun in case "common prediction" feature (a.k.a. "good samaritan") is used, or the + # engine is called from the API, or a JSON report is being collected (so enumeration output is tagged) if conf.predictOutput: kb.partRun = getPartRun() - elif conf.api: + elif conf.api or conf.reportJson: kb.partRun = getPartRun(alias=False) else: kb.partRun = None diff --git a/lib/techniques/error/use.py b/lib/techniques/error/use.py index a9ae8bac007..2eb38c1c46e 100644 --- a/lib/techniques/error/use.py +++ b/lib/techniques/error/use.py @@ -314,8 +314,8 @@ def errorUse(expression, dump=False): _, _, _, _, _, expressionFieldsList, expressionFields, _ = agent.getFields(expression) - # Set kb.partRun in case the engine is called from the API - kb.partRun = getPartRun(alias=False) if conf.api else None + # Set kb.partRun in case the engine is called from the API or a JSON report is being collected + kb.partRun = getPartRun(alias=False) if (conf.api or conf.reportJson) else None # We have to check if the SQL query might return multiple entries # and in such case forge the SQL limiting the query output one 
diff --git a/lib/techniques/union/use.py b/lib/techniques/union/use.py index 3802b463575..59ce5de670c 100644 --- a/lib/techniques/union/use.py +++ b/lib/techniques/union/use.py @@ -258,8 +258,8 @@ def unionUse(expression, unpack=True, dump=False): _, _, _, _, _, expressionFieldsList, expressionFields, _ = agent.getFields(origExpr) - # Set kb.partRun in case the engine is called from the API - kb.partRun = getPartRun(alias=False) if conf.api else None + # Set kb.partRun in case the engine is called from the API or a JSON report is being collected + kb.partRun = getPartRun(alias=False) if (conf.api or conf.reportJson) else None if expressionFieldsList and len(expressionFieldsList) > 1 and "ORDER BY" in expression.upper(): # Removed ORDER BY clause because UNION does not play well with it diff --git a/lib/utils/api.py b/lib/utils/api.py index 4a4559635de..90d0c0b9e3c 100644 --- a/lib/utils/api.py +++ b/lib/utils/api.py @@ -44,7 +44,9 @@ from lib.core.dicts import PART_RUN_CONTENT_TYPES from lib.core.enums import AUTOCOMPLETE_TYPE from lib.core.enums import CONTENT_STATUS +from lib.core.enums import CONTENT_TYPE from lib.core.enums import MKSTEMP_PREFIX +from lib.core.enums import PAYLOAD from lib.core.exception import SqlmapConnectionException from lib.core.log import LOGGER_HANDLER from lib.core.optiondict import optDict @@ -53,6 +55,7 @@ from lib.core.settings import RESTAPI_DEFAULT_ADDRESS from lib.core.settings import RESTAPI_DEFAULT_PORT from lib.core.settings import RESTAPI_UNSUPPORTED_OPTIONS +from lib.core.settings import RESTAPI_VERSION from lib.core.settings import VERSION_STRING from lib.core.shell import autoCompletion from lib.core.subprocessng import Popen @@ -80,6 +83,195 @@ class DataStore(object): RESTAPI_READONLY_OPTIONS = ("api", "taskid", "database") +# Reverse map CONTENT_TYPE int -> name (e.g. 
2 -> "DBMS_FINGERPRINT"), for machine-readable reports +CONTENT_TYPE_NAMES = dict((v, k) for k, v in vars(CONTENT_TYPE).items() if not k.startswith("_") and isinstance(v, int)) + +# Task id used for the single-target CLI collector backing --report-json +REPORT_TASKID = 0 + +def _storeData(cursor, taskid, value, status=CONTENT_STATUS.IN_PROGRESS, content_type=None): + """ + Records a single (status, content_type, value) result row into an IPC-style 'data' table. + + Shared by the REST API (via StdDbOut) and the CLI --report-json collector so both capture + results through identical logic (partial outputs are appended; a COMPLETE output replaces + its partials). Mirrors the API's per-content_type merge semantics. + """ + + if content_type is None: + if kb.partRun is not None: + content_type = PART_RUN_CONTENT_TYPES.get(kb.partRun) + else: + # Ignore all non-relevant (untyped) messages + return + + output = cursor.execute("SELECT id, status, value FROM data WHERE taskid = ? AND content_type = ?", (taskid, content_type)) + + # Delete partial output from the database if we have got a complete output + if status == CONTENT_STATUS.COMPLETE: + if len(output) > 0: + for index in xrange(len(output)): + cursor.execute("DELETE FROM data WHERE id = ?", (output[index][0],)) + + cursor.execute("INSERT INTO data VALUES(NULL, ?, ?, ?, ?)", (taskid, status, content_type, jsonize(value))) + if kb.partRun: + kb.partRun = None + + elif status == CONTENT_STATUS.IN_PROGRESS: + if len(output) == 0: + cursor.execute("INSERT INTO data VALUES(NULL, ?, ?, ?, ?)", (taskid, status, content_type, jsonize(value))) + else: + new_value = "%s%s" % (dejsonize(output[0][2]), value) + cursor.execute("UPDATE data SET value = ? 
WHERE id = ?", (jsonize(new_value), output[0][0])) + +# Internal detection/plumbing fields that are meaningless to API/report consumers and are stripped +# from the assembled output (the underlying kb/session structures keep them; only the output is cleaned) +INJECTION_INTERNAL_FIELDS = ("conf", "prefix", "suffix", "ptype", "clause") # detection/construction internals, irrelevant to a result consumer +TECHNIQUE_INTERNAL_FIELDS = ("matchRatio", "trueCode", "falseCode", "templatePayload", "where") # per-technique internals + +def _cleanIdentifier(name): + """ + Strips SQL identifier quoting (`backticks`, "double quotes", [brackets]) in a DBMS-INDEPENDENT + way. Used instead of unsafeSQLIdentificatorNaming (which needs Backend.getIdentifiedDbms) so the + result is identical in the CLI and in the API server process - which has no Backend context + because the scan ran in a subprocess. Context-free => API and report stay in parity. + """ + + if isinstance(name, six.string_types): + for ch in ("`", "\"", "[", "]"): + name = name.replace(ch, "") + return name + +def _cleanIdentifiersDeep(value): + """ + Recursively unquotes every identifier in a metadata structure (dict keys and string leaves - + db/table/column names). Used for the schema-listing content types (TABLES/COLUMNS/SCHEMA/COUNT) + whose payload is entirely identifiers + types/counts (never user row data), so cleaning every + string is safe. NOT used for DUMP_TABLE, whose leaf values are real row data. 
+ """ + + if isinstance(value, dict): + return dict((_cleanIdentifier(k), _cleanIdentifiersDeep(v)) for k, v in value.items()) + elif isinstance(value, (list, tuple)): + return [_cleanIdentifiersDeep(_) for _ in value] + elif isinstance(value, six.string_types): + return _cleanIdentifier(value) + return value + +# Schema-listing content types: pure identifiers + types/counts, so identifier quoting is cleaned +# recursively for consistency with DUMP_TABLE (which is handled separately because it carries row data) +IDENTIFIER_KEYED_TYPES = (CONTENT_TYPE.TABLES, CONTENT_TYPE.COLUMNS, CONTENT_TYPE.SCHEMA, CONTENT_TYPE.COUNT) + +def _sanitizeScanData(content_type, value): + """ + Reshapes an assembled result value into the clean, consumer-facing form used by BOTH the API + response and the --report-json file: internal detection/plumbing fields are dropped, the + per-technique map becomes a named list, and dumped-table identifiers are unquoted. Operates on + the dejsonized copy, so the live kb/session structures are never modified. Falls back to the raw + value on any surprise. + """ + + try: + if content_type == CONTENT_TYPE.TECHNIQUES and isinstance(value, (list, tuple)): + cleaned = [] + for injection in value: + if not isinstance(injection, dict): + cleaned.append(injection) + continue + injection = dict(injection) + for field in INJECTION_INTERNAL_FIELDS: + injection.pop(field, None) + techniques = injection.get("data") + if isinstance(techniques, dict): + # turn the {"1": {...}, "2": {...}} map (keyed by opaque technique ids) into an + # ordered list, each entry naming its technique (e.g. 
"boolean-based blind") + reduced = [] + for stype in sorted(techniques, key=lambda _: int(_) if str(_).isdigit() else _): + details = techniques[stype] + if isinstance(details, dict): + details = dict(details) + for field in TECHNIQUE_INTERNAL_FIELDS: + details.pop(field, None) + key = int(stype) if str(stype).isdigit() else stype + entry = {"technique": PAYLOAD.SQLINJECTION.get(key, key)} + entry.update(details) + details = entry + reduced.append(details) + injection["data"] = reduced + cleaned.append(injection) + return cleaned + + elif content_type == CONTENT_TYPE.DUMP_TABLE and isinstance(value, dict): + infos = value.get("__infos__") or {} + result = {"db": _cleanIdentifier(infos.get("db")), "table": _cleanIdentifier(infos.get("table")), "count": infos.get("count"), "columns": {}} + for column, cell in value.items(): + if column == "__infos__": + continue + # clean the identifier, drop the per-column display 'length', keep just the values list + values = cell.get("values") if isinstance(cell, dict) else cell + if isinstance(values, (list, tuple)): + # sqlmap represents a DB NULL as a single space (DUMP_REPLACEMENTS); surface it as + # JSON null. An empty string "" is a genuine empty value and is left as-is. + values = [None if _ == " " else _ for _ in values] + result["columns"][_cleanIdentifier(column)] = values + return result + + elif content_type in IDENTIFIER_KEYED_TYPES and isinstance(value, (dict, list, tuple)): + return _cleanIdentifiersDeep(value) + + except Exception as ex: + logger.debug("failed to sanitize scan data (content type %s): %s" % (content_type, getSafeExString(ex))) + + return value + +def _assembleData(cursor, taskid): + """ + Assembles all stored results for a task into the canonical scan-data structure + {"success": True, "data": [{status, type, type_name, value}, ...], "error": [...]}. 
+ + Shared by the REST API endpoint /scan//data and the CLI --report-json writer so the two + produce identical output (the CLI report is this dict plus a 'meta' wrapper). + """ + + json_data_message = list() + json_errors_message = list() + + for status, content_type, value in cursor.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)): + json_data_message.append({"status": status, "type": content_type, "type_name": CONTENT_TYPE_NAMES.get(content_type), "value": _sanitizeScanData(content_type, dejsonize(value))}) + + for error, in cursor.execute("SELECT error FROM errors WHERE taskid = ? ORDER BY id ASC", (taskid,)): + json_errors_message.append(error) + + return {"success": True, "data": json_data_message, "error": json_errors_message} + +def setupReportCollector(): + """ + Creates an in-memory IPC-style database used to collect results for a CLI --report-json run. + Reuses the same Database/schema the REST API uses so capture+assembly logic is shared. + """ + + collector = Database(":memory:") + collector.connect("report") + collector.init() + return collector + +def writeReportJson(collector, filepath): + """ + Writes the collected results to filepath as JSON, in the same shape as the REST API's + /scan//data response, wrapped with a small 'meta' block for standalone consumers. 
+ """ + + result = _assembleData(collector, REPORT_TASKID) + result["meta"] = { + "api_version": int(RESTAPI_VERSION.split(".")[0]), # MAJOR only - the part that matters for client compatibility + "sqlmap_version": VERSION_STRING, + "url": conf.get("url"), + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + } + + with openFile(filepath, "w+") as f: + f.write(getText(jsonize(result))) + # API objects class Database(object): filepath = None @@ -236,31 +428,7 @@ def __init__(self, taskid, messagetype="stdout"): def write(self, value, status=CONTENT_STATUS.IN_PROGRESS, content_type=None): if self.messagetype == "stdout": - if content_type is None: - if kb.partRun is not None: - content_type = PART_RUN_CONTENT_TYPES.get(kb.partRun) - else: - # Ignore all non-relevant messages - return - - output = conf.databaseCursor.execute("SELECT id, status, value FROM data WHERE taskid = ? AND content_type = ?", (self.taskid, content_type)) - - # Delete partial output from IPC database if we have got a complete output - if status == CONTENT_STATUS.COMPLETE: - if len(output) > 0: - for index in xrange(len(output)): - conf.databaseCursor.execute("DELETE FROM data WHERE id = ?", (output[index][0],)) - - conf.databaseCursor.execute("INSERT INTO data VALUES(NULL, ?, ?, ?, ?)", (self.taskid, status, content_type, jsonize(value))) - if kb.partRun: - kb.partRun = None - - elif status == CONTENT_STATUS.IN_PROGRESS: - if len(output) == 0: - conf.databaseCursor.execute("INSERT INTO data VALUES(NULL, ?, ?, ?, ?)", (self.taskid, status, content_type, jsonize(value))) - else: - new_value = "%s%s" % (dejsonize(output[0][2]), value) - conf.databaseCursor.execute("UPDATE data SET value = ? 
WHERE id = ?", (jsonize(new_value), output[0][0])) + _storeData(conf.databaseCursor, self.taskid, value, status, content_type) else: conf.databaseCursor.execute("INSERT INTO errors VALUES(NULL, ?, ?)", (self.taskid, str(value) if value else "")) @@ -429,9 +597,13 @@ def task_list(token=None): """ tasks = {} - for key in DataStore.tasks: + for key in list(DataStore.tasks): if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr: - tasks[key] = dejsonize(scan_status(key))["status"] + # NOTE: tolerate a task being deleted concurrently (scan_status would then return an + # error envelope without a "status" key); skip it rather than raising KeyError + status = dejsonize(scan_status(key)).get("status") + if status is not None: + tasks[key] = status logger.debug("(%s) Listed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr)) return jsonize({"success": True, "tasks": tasks, "tasks_num": len(tasks)}) @@ -606,23 +778,15 @@ def scan_data(taskid): Retrieve the data of a scan """ - json_data_message = list() - json_errors_message = list() - if taskid not in DataStore.tasks: logger.warning("[%s] Invalid task ID provided to scan_data()" % taskid) return jsonize({"success": False, "message": "Invalid task ID"}) - # Read all data from the IPC database for the taskid - for status, content_type, value in DataStore.current_db.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)): - json_data_message.append({"status": status, "type": content_type, "value": dejsonize(value)}) - - # Read all error messages from the IPC database - for error, in DataStore.current_db.execute("SELECT error FROM errors WHERE taskid = ? 
ORDER BY id ASC", (taskid,)): - json_errors_message.append(error) + # Read all data and error messages from the IPC database (shared assembler - same output as --report-json) + result = _assembleData(DataStore.current_db, taskid) logger.debug("(%s) Retrieved scan data and error messages" % taskid) - return jsonize({"success": True, "data": json_data_message, "error": json_errors_message}) + return jsonize(result) # Functions to handle scans' logs @get("/scan//log//") @@ -702,7 +866,7 @@ def version(token=None): """ logger.debug("Fetched version (%s)" % ("admin" if is_admin(token) else request.remote_addr)) - return jsonize({"success": True, "version": VERSION_STRING.split('/')[-1]}) + return jsonize({"success": True, "version": VERSION_STRING.split('/')[-1], "api_version": int(RESTAPI_VERSION.split(".")[0])}) def server(host=RESTAPI_DEFAULT_ADDRESS, port=RESTAPI_DEFAULT_PORT, adapter=RESTAPI_DEFAULT_ADAPTER, username=None, password=None, database=None): """ diff --git a/sqlmap.py b/sqlmap.py index 7ed61e529c6..19987565651 100755 --- a/sqlmap.py +++ b/sqlmap.py @@ -176,6 +176,10 @@ def main(): init() + if conf.get("reportJson"): + from lib.utils.api import setupReportCollector + conf.reportCollector = setupReportCollector() + if not conf.updateAll: # Postponed imports (faster start) if conf.smokeTest: @@ -184,6 +188,9 @@ def main(): elif conf.vulnTest: from lib.core.testing import vulnTest os._exitcode = 1 - (vulnTest() or 0) + elif conf.apiTest: + from lib.core.testing import apiTest + os._exitcode = 1 - (apiTest() or 0) else: from lib.controller.controller import start if conf.profile: @@ -568,6 +575,21 @@ def main(): warnMsg = "your sqlmap version is outdated" logger.warning(warnMsg) + # emit the JSON report BEFORE the closing banner, so it does not appear awkwardly after + # "[*] ending @ ..." 
+ if conf.get("reportCollector") is not None: + try: + from lib.utils.api import writeReportJson + writeReportJson(conf.reportCollector, conf.reportJson) + logger.info("JSON report written to '%s'" % conf.reportJson) + except Exception as ex: + logger.error("unable to write JSON report to '%s' ('%s')" % (conf.reportJson, getSafeExString(ex))) + finally: + try: + conf.reportCollector.disconnect() + except Exception as ex: + logger.debug("problem occurred while closing the report collector ('%s')" % getSafeExString(ex)) + if conf.get("showTime"): dataToStdout("\n[*] ending @ %s\n\n" % time.strftime("%X /%Y-%m-%d/"), forceOutput=True) @@ -581,7 +603,7 @@ def main(): except OSError: pass - if any((conf.vulnTest, conf.smokeTest)) or not filterNone(filepath for filepath in glob.glob(os.path.join(tempDir, '*')) if not any(filepath.endswith(_) for _ in (".lock", ".exe", ".so", '_'))): # ignore junk files + if any((conf.vulnTest, conf.smokeTest, conf.apiTest)) or not filterNone(filepath for filepath in glob.glob(os.path.join(tempDir, '*')) if not any(filepath.endswith(_) for _ in (".lock", ".exe", ".so", '_'))): # ignore junk files try: shutil.rmtree(tempDir, ignore_errors=True) except OSError: diff --git a/sqlmapapi.yaml b/sqlmapapi.yaml index a5829d7a466..28e273875e3 100644 --- a/sqlmapapi.yaml +++ b/sqlmapapi.yaml @@ -1,7 +1,7 @@ openapi: 3.0.3 info: title: sqlmap REST API - version: "1.0.0" + version: "2.0.0" description: | OpenAPI/Swagger specification for sqlmapapi.py, the sqlmap REST API server. @@ -48,11 +48,13 @@ paths: get: tags: [Version] operationId: getVersion - summary: Fetch server version - description: Returns the sqlmap version string reported by the API server. + summary: Fetch server and API version + description: >- + Returns the sqlmap version string and the API contract version (api_version), which follows + semantic versioning independently of the sqlmap version so clients can check compatibility. 
responses: "200": - description: Server version returned. + description: Server and API version returned. content: application/json: schema: @@ -62,6 +64,7 @@ paths: value: success: true version: "1.10.6.51#dev" + api_version: 2 "401": $ref: "#/components/responses/Unauthorized" @@ -459,8 +462,43 @@ paths: success: true data: - status: 1 - type: 0 - value: [] + type: 2 + type_name: DBMS_FINGERPRINT + value: "back-end DBMS: MySQL >= 5.1" + - status: 1 + type: 4 + type_name: CURRENT_USER + value: "root@%" + - status: 1 + type: 12 + type_name: DBS + value: ["information_schema", "mysql", "testdb"] + - status: 1 + type: 1 + type_name: TECHNIQUES + value: + - place: GET + parameter: id + dbms: MySQL + dbms_version: [">= 5.1"] + os: null + notes: [] + data: + - technique: "boolean-based blind" + title: "AND boolean-based blind - WHERE or HAVING clause" + payload: "id=1 AND 7997=7997" + vector: "AND [INFERENCE]" + comment: "" + - status: 1 + type: 17 + type_name: DUMP_TABLE + value: + db: testdb + table: users + count: 2 + columns: + id: ["1", "2"] + name: ["admin", null] error: [] "401": $ref: "#/components/responses/Unauthorized" @@ -670,7 +708,7 @@ components: VersionResponse: type: object - required: [success, version] + required: [success, version, api_version] properties: success: type: boolean @@ -679,6 +717,13 @@ components: type: string description: sqlmap version string without the `sqlmap/` prefix. example: "1.10.6.51#dev" + api_version: + type: integer + description: >- + MAJOR API-contract version (integer), independent of the sqlmap version. Only the major + is exposed at runtime because only a major bump breaks clients; the full semantic version + is this document's info.version. Clients compare e.g. api_version == 2. 
+ example: 2 additionalProperties: false TaskNewResponse: @@ -811,16 +856,23 @@ components: ScanDataItem: type: object - required: [status, type, value] + required: [status, type, type_name, value] properties: status: type: integer - description: Numeric content status stored by sqlmap. + description: Numeric content status (0 = in progress, 1 = complete). example: 1 type: type: integer description: Numeric content type stored by sqlmap. - example: 0 + example: 2 + type_name: + type: string + nullable: true + description: >- + Human-readable name of the content type (e.g. "DBMS_FINGERPRINT", "CURRENT_USER", + "DBS", "TECHNIQUES", "DUMP_TABLE"). null for any unmapped type. + example: DBMS_FINGERPRINT value: anyOf: - type: string @@ -832,7 +884,13 @@ components: items: {} - type: object additionalProperties: true - description: JSON-decoded scan output value. Shape depends on the content type. + description: >- + JSON-decoded scan output value; its shape depends on the content type. Internal + plumbing is stripped: TECHNIQUES is a list of injection points whose "data" is a list of + techniques each named via a "technique" field (matchRatio/trueCode/falseCode/ + templatePayload/where/conf are not exposed); DUMP_TABLE is + {db, table, count, columns: {column: [values]}} (the internal __infos__ wrapper and + per-column length are not exposed). additionalProperties: true ScanDataResponse: diff --git a/tests/_testutils.py b/tests/_testutils.py index 1858e9d857e..7ec9a4e3b4f 100644 --- a/tests/_testutils.py +++ b/tests/_testutils.py @@ -87,3 +87,66 @@ def set_dbms(name): from lib.core.data import kb kb.stickyDBMS = False Backend.forceDbms(name) + + +# --- property/fuzz testing harness (shared so individual test files don't each reinvent it) --- + +_PROPERTY_BASE = 0x51A1 + + +class Rng(object): + """Deterministic, cross-version-identical PRNG (a pure-integer LCG, no global state). 
+ + sqlmap runs on Python 2.7 and 3.x, whose stdlib `random` yield DIFFERENT sequences + for the same seed - and `random.Random` instance methods are not unified by + patch.unisonRandom() (which only patches the module-level random.choice/randint/ + sample/seed). Property tests need inputs that are byte-for-byte identical on every + interpreter so a CI-only failure reproduces everywhere; integer math is identical + across versions, so this LCG (same constants as unisonRandom) guarantees it by + construction. Draw ONLY through these methods - never random.random()/shuffle()/etc. + """ + + def __init__(self, seed): + self.x = seed & 0xFFFFFF + + def _next(self): + self.x = (1140671485 * self.x + 128201163) % (2 ** 24) + return self.x + + def randint(self, a, b): + return a + self._next() % (b - a + 1) + + def choice(self, seq): + return seq[self.randint(0, len(seq) - 1)] + + def sample(self, seq, k): + # Note: with replacement (matches unisonRandom's _sample); fine for input generation + return [self.choice(seq) for _ in range(k)] + + def blob(self, n): + return bytes(bytearray(self.randint(0, 255) for _ in range(n))) + + +def _label_offset(label): + # stable across versions/runs (unlike hash(), which varies with PYTHONHASHSEED): just sum bytes + return sum(bytearray((label or "").encode("utf-8"))) * 7919 + + +def for_all(testcase, generator, prop, n=400, label=""): + """Property runner: draw `n` cases from generator(rng) and assert prop(case) holds. + + `prop` passes by returning True/None, fails by returning False or raising. On any + failure the EXACT offending input and its case index are reported; the same input + is reproducible (and identical on every interpreter) via Rng(seed_for(label, i)). 
+ """ + base = _PROPERTY_BASE + _label_offset(label) + for i in range(n): + case = generator(Rng(base + i)) + try: + ok = prop(case) + except Exception as ex: + testcase.fail("%s: raised %r on input %r (case %d)" % (label or "property", ex, case, i)) + return + if ok is False: + testcase.fail("%s: property does not hold on input %r (case %d)" % (label or "property", case, i)) + return diff --git a/tests/test_dump_jsonl.py b/tests/test_dump_jsonl.py new file mode 100644 index 00000000000..9dc5cac8a2b --- /dev/null +++ b/tests/test_dump_jsonl.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +JSONL output of the per-table dumper (Dump.dbTableValues in lib/core/dump.py). + +--dump-format=JSONL writes one self-describing JSON object per row to a +/dump//.jsonl file, streaming-safe (one independent line per +row, no surrounding array/header/footer). These tests pin the contract that an +automated consumer relies on: column order preserved (so it matches the CSV +column order and is reproducible on Python 2's unordered dict), the DB-NULL +marker (" ") mapped to JSON null exactly like --report-json, the empty string +left intact (NOT collapsed to null), and a strict one-object-per-line layout. 
+""" + +import io +import json +import os +import shutil +import sys +import tempfile +import unittest + +from collections import OrderedDict + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap +bootstrap() + +from lib.core.common import Backend +from lib.core.data import conf, kb +from lib.core.dump import Dump +from lib.core.enums import DUMP_FORMAT + + +class _JsonlDumpCase(unittest.TestCase): + def setUp(self): + self._saved = dict((k, conf.get(k)) for k in ("dumpFormat", "dumpPath", "dumpFile", "col", "api", "reportCollector", "limitStart", "limitStop", "csvDel", "forceDbms", "dbms")) + self._savedKb = dict((k, kb.get(k)) for k in ("forcedDbms", "dbms")) + # A DBMS leaked from an earlier test (e.g. one that uppercases identifiers) would change + # both the on-disk filename and the JSON keys, so pin a neutral, case-preserving back-end. + conf.forceDbms = conf.dbms = None + kb.dbms = None + Backend.forceDbms("MySQL") + self.tmp = tempfile.mkdtemp(prefix="sqlmap-jsonl-test") + conf.dumpFormat = DUMP_FORMAT.JSONL + conf.dumpPath = self.tmp + conf.dumpFile = None + conf.col = None + conf.api = False + conf.reportCollector = None + conf.limitStart = conf.limitStop = None + conf.csvDel = "," + self.d = Dump() + self.d._write = lambda *a, **k: None # silence the console table + + def tearDown(self): + for k, v in self._saved.items(): + conf[k] = v + for k, v in self._savedKb.items(): + kb[k] = v + shutil.rmtree(self.tmp, ignore_errors=True) + + def _dump(self, table_values): + self.d.dbTableValues(table_values) + db = table_values["__infos__"]["db"] or "All" + path = os.path.join(self.tmp, db, "%s.jsonl" % table_values["__infos__"]["table"]) + # sqlmap writes the dump file as UTF-8; read it the same way (not the platform default, + # which is cp1252 on Windows CI and would mojibake multibyte values) + with io.open(path, encoding="utf-8") as f: + content = f.read() + return content + + def _rows(self, content): + return 
[json.loads(line) for line in content.splitlines() if line.strip()] + + +class TestJsonlContract(_JsonlDumpCase): + def test_one_object_per_row(self): + content = self._dump({ + "__infos__": {"count": 2, "db": "testdb", "table": "users"}, + "id": {"length": 2, "values": ["1", "2"]}, + "name": {"length": 6, "values": ["luther", "fluffy"]}, + }) + # exactly N non-empty lines, each terminated by a newline, each a standalone object + lines = content.splitlines() + self.assertEqual(len(lines), 2) + self.assertTrue(content.endswith("\n")) + rows = self._rows(content) + self.assertEqual(rows[0], {"id": "1", "name": "luther"}) + self.assertEqual(rows[1], {"id": "2", "name": "fluffy"}) + + def test_no_header_or_footer(self): + # unlike CSV (header row) / HTML (doc scaffold), JSONL must be pure data lines + content = self._dump({ + "__infos__": {"count": 1, "db": "testdb", "table": "t"}, + "id": {"length": 2, "values": ["1"]}, + }) + lines = [l for l in content.splitlines() if l.strip()] + self.assertEqual(len(lines), 1) + self.assertEqual(json.loads(lines[0]), {"id": "1"}) + + def test_db_null_becomes_json_null(self): + # sqlmap stores a DB NULL as a single space (" "); the machine format must emit JSON null, + # consistent with --report-json. An empty string is a real value and must stay "". 
+ content = self._dump({ + "__infos__": {"count": 1, "db": "testdb", "table": "t"}, + "a": {"length": 1, "values": [" "]}, # DB NULL marker + "b": {"length": 1, "values": [""]}, # genuine empty string + "c": {"length": 1, "values": ["x"]}, + }) + row = self._rows(content)[0] + self.assertIsNone(row["a"]) + self.assertEqual(row["b"], "") + self.assertEqual(row["c"], "x") + + def test_missing_value_is_null(self): + # a column whose values list is short for this row index must serialize as null, not crash + content = self._dump({ + "__infos__": {"count": 2, "db": "testdb", "table": "t"}, + "id": {"length": 2, "values": ["1", "2"]}, + "lagging": {"length": 4, "values": ["only-one"]}, # missing index 1 + }) + rows = self._rows(content) + self.assertEqual(rows[0], {"id": "1", "lagging": "only-one"}) + self.assertEqual(rows[1], {"id": "2", "lagging": None}) + + def test_column_order_matches_csv(self): + # The serialized byte stream must keep the (priority-sorted) column order so output is + # reproducible - even on Python 2 where a plain dict would not - and that order must be + # the SAME one CSV uses. Build the input as an OrderedDict so the expectation is fixed, + # then dump the identical data as both JSONL and CSV and compare the column sequences. 
+ def table(): + tv = OrderedDict() + tv["__infos__"] = {"count": 1, "db": "testdb", "table": "t"} + tv["zebra"] = {"length": 1, "values": ["1"]} + tv["alpha"] = {"length": 1, "values": ["2"]} + tv["middle"] = {"length": 1, "values": ["3"]} + return tv + + jsonl_line = [l for l in self._dump(table()).splitlines() if l.strip()][0] + jsonl_order = [k for k, _ in json.loads(jsonl_line, object_pairs_hook=lambda p: p)] + + conf.dumpFormat = DUMP_FORMAT.CSV + csv_path = os.path.join(self.tmp, "testdb", "t.csv") + if os.path.exists(csv_path): + os.remove(csv_path) + self.d.dbTableValues(table()) + with io.open(csv_path, encoding="utf-8") as f: + csv_header = f.read().splitlines()[0] + csv_order = [c.strip() for c in csv_header.split(conf.csvDel)] + + self.assertEqual(jsonl_order, csv_order) + + def test_unicode_value_not_escaped(self): + # ensure_ascii=False keeps multibyte data readable; it must round-trip through json.loads + content = self._dump({ + "__infos__": {"count": 1, "db": "testdb", "table": "t"}, + "name": {"length": 6, "values": [u"\u0107evap"]}, + }) + self.assertEqual(self._rows(content)[0]["name"], u"\u0107evap") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_openapi_drift.py b/tests/test_openapi_drift.py new file mode 100644 index 00000000000..b38fd16eb37 --- /dev/null +++ b/tests/test_openapi_drift.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +Contract test: the OpenAPI spec (sqlmapapi.yaml) must stay in lock-step with the +REST API actually served by lib/utils/api.py. The spec is hand-maintained, so it +is the exact thing that silently drifts when an endpoint is added/renamed/retyped. 
+ +This walks the live Bottle route table (every @get/@post registers at import time) +and the spec's `paths:` block, and asserts the (method, path) sets are identical +in BOTH directions - no undocumented route, no phantom spec entry - plus that the +spec's advertised version matches the runtime RESTAPI_VERSION. + +PyYAML is not bundled (and the suite is stdlib-only / no pip), so the spec is read +with a tiny indentation-aware scanner that only needs the paths + info.version. +""" + +import os +import re +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap +bootstrap() + +import lib.utils.api # noqa: F401 (importing registers every route on Bottle's default app) +from lib.core.settings import RESTAPI_VERSION +from thirdparty.bottle.bottle import default_app + +ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SPEC = os.path.join(ROOT, "sqlmapapi.yaml") + +# Bottle-only routes that are not part of the documented public contract +INTERNAL_RULES = ("/error/401",) + +HTTP_METHODS = ("get", "post", "put", "delete", "patch", "head", "options") + + +def _normalize_rule(rule): + # Bottle '' / '' -> OpenAPI '{taskid}' / '{filename}' + return re.sub(r"<([^:>]+)(?::[^>]+)?>", r"{\1}", rule) + + +def _app_pairs(): + pairs = set() + for route in default_app().routes: + rule = _normalize_rule(route.rule) + if rule in INTERNAL_RULES: + continue + pairs.add((route.method.lower(), rule)) + return pairs + + +def _spec_paths_and_version(text): + """Returns (set of (method, path), info.version) from the YAML text.""" + pairs = set() + version = None + section = None + current_path = None + + for line in text.splitlines(): + if not line.strip() or line.lstrip().startswith("#"): + continue + + top = re.match(r"^(\S[^:]*):", line) # a column-0 key starts a new top-level section + if top: + section = top.group(1) + current_path = None + continue + + if section == "info": + m = re.match(r"^ 
version:\s*(.+?)\s*$", line) + if m: + version = m.group(1).strip().strip('"').strip("'") + elif section == "paths": + m = re.match(r"^ (/\S*):\s*$", line) # 2-space path key + if m: + current_path = m.group(1) + continue + m = re.match(r"^ (\w+):\s*$", line) # 4-space method key + if m and current_path and m.group(1).lower() in HTTP_METHODS: + pairs.add((m.group(1).lower(), current_path)) + + return pairs, version + + +class TestOpenAPIDrift(unittest.TestCase): + def setUp(self): + with open(SPEC) as f: + self.spec_pairs, self.spec_version = _spec_paths_and_version(f.read()) + self.app_pairs = _app_pairs() + + def test_parsers_found_something(self): + # guard against a silently-empty parse making the equality checks vacuously pass + self.assertTrue(len(self.app_pairs) >= 15, self.app_pairs) + self.assertEqual(len(self.spec_pairs), len(self.app_pairs)) + + def test_no_undocumented_endpoint(self): + missing = self.app_pairs - self.spec_pairs + self.assertEqual(missing, set(), "served but absent from sqlmapapi.yaml: %s" % sorted(missing)) + + def test_no_phantom_spec_entry(self): + extra = self.spec_pairs - self.app_pairs + self.assertEqual(extra, set(), "in sqlmapapi.yaml but not served: %s" % sorted(extra)) + + def test_version_matches_runtime(self): + self.assertEqual(self.spec_version, RESTAPI_VERSION, "sqlmapapi.yaml version '%s' != RESTAPI_VERSION '%s'" % (self.spec_version, RESTAPI_VERSION)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_property.py b/tests/test_property.py new file mode 100644 index 00000000000..cc1b00e3a1a --- /dev/null +++ b/tests/test_property.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +Property/fuzz tests for the pure parsers and transforms. 
Where the other test +files pin specific examples, these assert INVARIANTS over hundreds of randomized +(but deterministic, cross-version-identical - see _testutils.Rng) inputs, which is +the cheap net for the edge-bug class that example tests miss (commas inside quoted +literals / nested parens, NUL / 0xff / astral code points in codecs, etc.). + +Property families: + - codec/serializer pairs round-trip: decode(encode(x)) == x + - structure transforms preserve their contract (flat/de-arrayized/permutation) + - string transforms hold their stated invariant (ASCII-only, no newlines, ...) + - random helpers respect length / alphabet / range bounds + - splitFields/zeroDepthSearch partition faithfully and never cut inside a group + - a batch of transforms never raise on arbitrary input + +On failure _testutils.for_all prints the exact offending input + its case index so +it reproduces on any interpreter. +""" + +import os +import string +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap, for_all, set_dbms +bootstrap() + +from extra.cloak.cloak import cloak, decloak +from lib.core.common import (escapeJsonValue, filterStringValue, flattenValue, isListLike, normalizeUnicode, + prioritySortColumns, randomInt, randomRange, randomStr, safeSQLIdentificatorNaming, + sanitizeStr, splitFields, unArrayizeValue, unsafeSQLIdentificatorNaming, urldecode, + urlencode, zeroDepthSearch) +from lib.core.convert import (base64pickle, base64unpickle, decodeBase64, decodeHex, dejsonize, encodeBase64, + encodeHex, getBytes, getConsoleLength, getOrds, getText, htmlEscape, htmlUnescape, + jsonize, stdoutEncode) +from lib.core.data import kb +from lib.utils.safe2bin import safecharencode + + +# --- input strategies (draw ONLY through rng: randint / choice / sample / blob) --- + +# deliberately loaded with structural metacharacters + tricky code points +_TEXT = [u"a", u"Z", u"7", u" ", u",", u"'", u'"', u"(", u")", 
u"\\", u";", + u"\n", u"\t", u"\x00", u"\x7f", u"\xe9", u"\u0107", u"\u4e2d", u"\U0001F600", u" FROM "] + + +def gen_text(rng): + return u"".join(rng.choice(_TEXT) for _ in range(rng.randint(0, 24))) + + +def gen_ascii(rng): + return u"".join(rng.choice(string.printable) for _ in range(rng.randint(0, 20))) + + +def gen_blob(rng): + return rng.blob(rng.randint(0, 32)) + + +def gen_json(rng): + # JSON-safe only: tuples become lists and non-str keys are coerced, so exclude them here + if rng.randint(0, 4) == 0: + return [gen_json(rng) for _ in range(rng.randint(0, 3))] + if rng.randint(0, 4) == 0: + return dict((u"k%d" % j, gen_json(rng)) for j in range(rng.randint(0, 3))) + return rng.choice([0, 1, -1, 2 ** 31, 1.5, -0.25, True, False, None, u"", u"x", u"\u0107", u'a"b,c']) + + +def gen_pickle(rng): + kind = rng.randint(0, 9) + if kind < 5: + return rng.choice([0, -7, 2 ** 40, 3.5, True, False, None, u"\u0107x", b"\x00\xff", u""]) + if kind < 7: + return [gen_pickle(rng) for _ in range(rng.randint(0, 3))] + if kind < 8: + return tuple(gen_pickle(rng) for _ in range(rng.randint(0, 3))) + if kind < 9: + return set(rng.choice([1, 2, 3, u"a", u"b"]) for _ in range(rng.randint(0, 3))) + return dict((u"k%d" % j, gen_pickle(rng)) for j in range(rng.randint(0, 2))) + + +def gen_columns(rng): + return [rng.choice([u"id", u"userid", u"name", u"password", u"a", u"created_id", u"x_id_y", u"data"]) + for _ in range(rng.randint(0, 6))] + + +def gen_ident(rng): + # clean (round-trippable) identifier names: letters/digits/underscore, optional dot/space + chars = string.ascii_letters + string.digits + u"_" + name = u"".join(rng.choice(chars) for _ in range(rng.randint(1, 10))) + if rng.randint(0, 3) == 0: + name += rng.choice([u".col", u" alias", u"_2"]) + return name + + +# well-formed field lists: balanced parens, properly closed/escaped quotes +_TOKENS = [u"foo", u"bar", u"id", u"a b", u"1", u"*", u"max(a)", u"COALESCE(a, b, c)", u"func(x, y)"] +_QUOTED = [u"a,b", u"x, y", u"f(1, 
2)", u"o''k", u"plain", u""] + + +def gen_sql_fields(rng): + parts = [] + for _ in range(rng.randint(1, 5)): + t = rng.randint(0, 9) + if t < 5: + parts.append(rng.choice(_TOKENS)) + elif t < 8: + q = rng.choice([u"'", u'"']) + parts.append(q + rng.choice(_QUOTED) + q) + else: + parts.append(u"g(%s, %s)" % (rng.choice(_TOKENS), rng.choice(_TOKENS))) + return u", ".join(parts) + + +class TestCodecRoundTrips(unittest.TestCase): + def test_base64(self): + for_all(self, gen_blob, lambda b: decodeBase64(encodeBase64(b)) == b, label="base64") + + def test_hex(self): + for_all(self, gen_blob, lambda b: decodeHex(encodeHex(b)) == b, label="hex") + + def test_getbytes_gettext(self): + # unsafe=False -> plain UTF-8 (no \xNN escape interpretation), so it is a clean round-trip + for_all(self, gen_text, lambda s: getText(getBytes(s, unsafe=False)) == s, label="bytes-text") + + def test_json(self): + for_all(self, gen_json, lambda v: dejsonize(jsonize(v)) == v, label="json") + + def test_pickle(self): + for_all(self, gen_pickle, lambda v: base64unpickle(base64pickle(v)) == v, label="pickle") + + def test_html_escape(self): + for_all(self, gen_text, lambda s: htmlUnescape(htmlEscape(s)) == s, label="html") + + def test_cloak(self): + for_all(self, gen_blob, lambda b: decloak(data=cloak(data=b)) == b, label="cloak") + + +class TestStructureTransforms(unittest.TestCase): + def test_unarrayize_never_listlike(self): + # the whole point of unArrayizeValue is that the result is a scalar, never a list/tuple + # (gen_pickle includes sets - they used to crash here; see test_unarrayize_set regression) + for_all(self, gen_pickle, lambda v: not isListLike(unArrayizeValue(v)), label="unarrayize") + + def test_flatten_is_flat(self): + for_all(self, gen_pickle, lambda v: all(not isListLike(x) for x in flattenValue([v])), label="flatten") + + def test_unarrayize_set(self): + # regression: a 1-element set is list-like but not subscriptable; unArrayizeValue must + # de-arrayize it rather than 
crash on value[0] + self.assertEqual(unArrayizeValue(set(["x"])), "x") + self.assertEqual(unArrayizeValue(set()), None) + self.assertEqual(unArrayizeValue(["1"]), "1") # ordinary fast-path still works + + def test_prioritysort_is_permutation(self): + # sorting must not invent/drop columns, and must be idempotent + def prop(cols): + out = prioritySortColumns(cols) + return sorted(out) == sorted(cols) and prioritySortColumns(out) == out + for_all(self, gen_columns, prop, label="prioritysort") + + +class TestStringTransforms(unittest.TestCase): + def test_normalize_unicode_is_ascii(self): + for_all(self, gen_text, lambda s: all(ord(c) < 128 for c in normalizeUnicode(s)), label="normalize-ascii") + + def test_sanitizestr_strips_newlines(self): + for_all(self, gen_text, lambda s: "\n" not in sanitizeStr(s) and "\r" not in sanitizeStr(s), label="sanitizestr") + + def test_filterstringvalue_charset(self): + allowed = set("0123456789abcdef") + for_all(self, gen_text, lambda s: set(filterStringValue(s, r"[0-9a-f]")) <= allowed, label="filterstring") + + def test_escapejson_no_control_char(self): + # control chars and bare quotes must be escaped away (output is JSON-string-body safe re: those) + for_all(self, gen_text, lambda s: all(c >= " " for c in escapeJsonValue(s)), label="escapejson-invariant") + + def test_escapejson_json_roundtrip(self): + # escapeJsonValue(s) embedded in a JSON string must parse back to s - for ALL text, + # including backslash (the F1 fix; this used to fail on '\') + import json + for_all(self, gen_text, lambda s: json.loads(u'"%s"' % escapeJsonValue(s)) == s, label="escapejson-roundtrip") + + def test_escapejson_backslash(self): + # regression for F1: backslash is now escaped, so the round-trip holds + import json + self.assertEqual(json.loads(u'"%s"' % escapeJsonValue(u"a\\b")), u"a\\b") + + def test_getords_length(self): + for_all(self, gen_text, lambda s: len(getOrds(s)) == len(s) and all(isinstance(o, int) for o in getOrds(s)), 
label="getords") + + def test_consolelength_ascii(self): + for_all(self, gen_ascii, lambda s: getConsoleLength(s) == len(s), label="consolelength") + + +class TestRandomHelpers(unittest.TestCase): + def test_randomstr_length_and_alphabet(self): + for_all(self, lambda r: r.randint(0, 16), + lambda n: len(randomStr(n)) == n and set(randomStr(n)) <= set(string.ascii_letters), label="randomstr") + + def test_randomstr_lowercase(self): + for_all(self, lambda r: r.randint(0, 16), + lambda n: set(randomStr(n, lowercase=True)) <= set(string.ascii_lowercase), label="randomstr-lower") + + def test_randomint_digits(self): + for_all(self, lambda r: r.randint(1, 8), lambda n: len(str(randomInt(n))) == n, label="randomint") + + def test_randomrange_bounds(self): + def prop(_): + a = _[0] + b = _[0] + _[1] + return a <= randomRange(a, b) <= b + for_all(self, lambda r: (r.randint(-50, 50), r.randint(0, 100)), prop, label="randomrange") + + +class TestSplitterInvariants(unittest.TestCase): + def test_reconstruction(self): + # pure partition identity: rejoining the 0-depth split must reproduce the (space-normalized) input + for_all(self, gen_text, lambda s: u",".join(splitFields(s)) == s.replace(", ", ","), label="split-reconstruct-text") + for_all(self, gen_sql_fields, lambda s: u",".join(splitFields(s)) == s.replace(", ", ","), label="split-reconstruct-sql") + + def test_never_cuts_inside_parens(self): + # on well-formed input no field may carry unbalanced parens (i.e. 
a split never lands inside a group) + for_all(self, gen_sql_fields, lambda s: all(f.count(u"(") == f.count(u")") for f in splitFields(s)), label="split-balanced") + + def test_zerodepth_indices_are_real_commas(self): + def prop(s): + idx = zeroDepthSearch(s, ",") + return all(s[i] == u"," for i in idx) and idx == sorted(idx) and len(set(idx)) == len(idx) + for_all(self, gen_text, prop, label="zerodepth-commas-text") + for_all(self, gen_sql_fields, prop, label="zerodepth-commas-sql") + + +class TestIdentifierRoundTrip(unittest.TestCase): + def setUp(self): + self._saved = kb.get("forcedDbms") + set_dbms("MySQL") # identifier quoting is DBMS-specific; pin a case-preserving back-end + + def tearDown(self): + kb.forcedDbms = self._saved + + def test_safe_unsafe_roundtrip(self): + for_all(self, gen_ident, lambda n: unsafeSQLIdentificatorNaming(safeSQLIdentificatorNaming(n)) == n, label="identifier") + + +class TestRobustness(unittest.TestCase): + # total functions: must never raise on arbitrary text (return value unconstrained) + def test_urlencode_urldecode(self): + for_all(self, gen_text, lambda s: (urlencode(s), urldecode(s)) and True, label="urlcodec") + + def test_safecharencode(self): + for_all(self, gen_text, lambda s: safecharencode(s) is not None or s == u"", label="safecharencode") + + def test_stdoutencode(self): + for_all(self, gen_text, lambda s: stdoutEncode(s) is not None or s == u"", label="stdoutencode") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_report.py b/tests/test_report.py new file mode 100644 index 00000000000..63c4fd7e06a --- /dev/null +++ b/tests/test_report.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +JSON scan report collector/assembler (lib/utils/api.py), shared by the REST API +endpoint /scan//data and the CLI --report-json writer. 
+ +The whole point of the feature is that both produce the SAME structure, so these +tests pin the shared contract: the per-content_type merge (partial -> complete), +the assembled {success, data:[{status,type,type_name,value}], error} shape, the +partRun fallback for untyped output, and the meta-wrapped file written to disk. +A regression here is a divergence between the API and the report - the exact bug +this design exists to prevent. +""" + +import io +import json +import os +import sys +import tempfile +import unittest + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap +bootstrap() + +import lib.utils.api as api +from lib.core.data import conf, kb +from lib.core.enums import CONTENT_TYPE, CONTENT_STATUS + + +class _CollectorCase(unittest.TestCase): + def setUp(self): + self.c = api.setupReportCollector() + self._saved_partRun = kb.get("partRun") + + def tearDown(self): + kb.partRun = self._saved_partRun + try: + self.c.disconnect() + except Exception: + pass + + def _store(self, value, content_type, status=CONTENT_STATUS.COMPLETE): + api._storeData(self.c, api.REPORT_TASKID, value, status, content_type) + + +class TestAssembledShape(_CollectorCase): + def test_structure_and_typename(self): + self._store("MySQL >= 5.0.12", CONTENT_TYPE.DBMS_FINGERPRINT) + result = api._assembleData(self.c, api.REPORT_TASKID) + self.assertEqual(result["success"], True) + self.assertEqual(result["error"], []) + self.assertEqual(len(result["data"]), 1) + entry = result["data"][0] + self.assertEqual(sorted(entry.keys()), ["status", "type", "type_name", "value"]) + self.assertEqual(entry["type"], CONTENT_TYPE.DBMS_FINGERPRINT) + self.assertEqual(entry["type_name"], "DBMS_FINGERPRINT") # int -> readable name + self.assertEqual(entry["value"], "MySQL >= 5.0.12") + + def test_structured_values_preserved(self): + # dict / list / bool must survive as native JSON types (not stringified) - this is what + # makes the report 
machine-consumable, exactly like the API + self._store({"url": "http://h/?id=1", "data": None}, CONTENT_TYPE.TARGET) + self._store(["a", "b", "c"], CONTENT_TYPE.DBS) + self._store(True, CONTENT_TYPE.IS_DBA) + by_type = {d["type"]: d["value"] for d in api._assembleData(self.c, api.REPORT_TASKID)["data"]} + self.assertEqual(by_type[CONTENT_TYPE.TARGET], {"url": "http://h/?id=1", "data": None}) + self.assertEqual(by_type[CONTENT_TYPE.DBS], ["a", "b", "c"]) + self.assertIs(by_type[CONTENT_TYPE.IS_DBA], True) + + +class TestMergeSemantics(_CollectorCase): + def test_complete_replaces_partials(self): + # the API appends IN_PROGRESS chunks then a COMPLETE replaces them; final value is COMPLETE + self._store("roo", CONTENT_TYPE.CURRENT_USER, CONTENT_STATUS.IN_PROGRESS) + self._store("t@localhost", CONTENT_TYPE.CURRENT_USER, CONTENT_STATUS.COMPLETE) + data = api._assembleData(self.c, api.REPORT_TASKID)["data"] + self.assertEqual(len(data), 1) # one row, not two + self.assertEqual(data[0]["value"], "t@localhost") + self.assertEqual(data[0]["status"], CONTENT_STATUS.COMPLETE) + + def test_inprogress_chunks_accumulate(self): + self._store("foo", CONTENT_TYPE.BANNER, CONTENT_STATUS.IN_PROGRESS) + self._store("bar", CONTENT_TYPE.BANNER, CONTENT_STATUS.IN_PROGRESS) + data = api._assembleData(self.c, api.REPORT_TASKID)["data"] + self.assertEqual(data[0]["value"], "foobar") # appended + + +class TestPartRunFallback(_CollectorCase): + def test_untyped_output_tagged_via_partrun(self): + # untyped output during a part-run (e.g. 
the fingerprint line) is tagged by kb.partRun - + # this is how DBMS_FINGERPRINT is captured with no explicit content_type + kb.partRun = "getFingerprint" + self._store("back-end DBMS: MySQL >= 5.1", None) # content_type=None + data = api._assembleData(self.c, api.REPORT_TASKID)["data"] + self.assertEqual(len(data), 1) + self.assertEqual(data[0]["type"], CONTENT_TYPE.DBMS_FINGERPRINT) + self.assertEqual(data[0]["value"], "back-end DBMS: MySQL >= 5.1") + + def test_untyped_output_without_partrun_is_ignored(self): + kb.partRun = None + self._store("just a log line", None) + self.assertEqual(api._assembleData(self.c, api.REPORT_TASKID)["data"], []) + + +class TestSanitize(unittest.TestCase): + """The shared assembler strips internal plumbing (matchRatio/trueCode/falseCode/templatePayload/ + where/conf) from TECHNIQUES and restructures DUMP_TABLE (drop __infos__ wrapper + per-column + 'length'), so neither the API nor the report leaks consumer-irrelevant internals. Deterministic + (no run variance), unlike the live API-vs-report comparison.""" + + def test_techniques_internals_stripped_and_named(self): + injection = { + "place": "GET", "parameter": "id", "ptype": 1, "dbms": "MySQL", + "conf": {"string": "x", "regexp": None}, # internal -> must be dropped + "data": {"1": {"title": "boolean", "payload": "id=1 AND 1=1", "vector": "AND [INFERENCE]", + "comment": "", "where": 1, "matchRatio": 0.74, "trueCode": 200, + "falseCode": 200, "templatePayload": None}, + "6": {"title": "union", "payload": "id=1 UNION ...", "vector": "...", "comment": ""}}, + } + injection["ptype"] = 1 + injection["clause"] = [1, 8, 9] + injection["prefix"] = "" + injection["suffix"] = "" + original = json.loads(json.dumps(injection)) # deep copy to prove no mutation + out = api._sanitizeScanData(CONTENT_TYPE.TECHNIQUES, [injection])[0] + # detection/construction internals dropped + for field in ("conf", "ptype", "clause", "prefix", "suffix"): + self.assertNotIn(field, out) + # data is now an ordered 
LIST (not a map keyed by opaque ids), each entry named + self.assertIsInstance(out["data"], list) + self.assertEqual([t["technique"] for t in out["data"]], ["boolean-based blind", "UNION query"]) + first = out["data"][0] + self.assertEqual(sorted(first.keys()), ["comment", "payload", "technique", "title", "vector"]) + self.assertEqual(first["payload"], "id=1 AND 1=1") # consumer-relevant fields preserved + self.assertEqual(out["dbms"], "MySQL") + # input not mutated (operates on a copy - must not corrupt live kb.injections) + self.assertEqual(injection, original) + + def test_dump_table_restructured_and_unquoted(self): + value = { + "__infos__": {"db": "`master`", "table": "users", "count": 3}, + "id": {"length": 2, "values": ["1", "2", "3"]}, + "`name`": {"length": 9, "values": ["alice", " ", ""]}, # backtick id; " " is a DB NULL, "" is empty + } + out = api._sanitizeScanData(CONTENT_TYPE.DUMP_TABLE, value) + self.assertEqual(sorted(out.keys()), ["columns", "count", "db", "table"]) + self.assertNotIn("__infos__", out) + self.assertEqual(out["db"], "master") # quoting stripped (context-free) + self.assertEqual(out["table"], "users") + self.assertEqual(out["count"], 3) + # columns flattened to value lists (no 'length'), identifiers unquoted + self.assertEqual(out["columns"]["id"], ["1", "2", "3"]) + self.assertNotIn("`name`", out["columns"]) + # DB NULL (" ") -> JSON null; genuine empty string ("") preserved + self.assertEqual(out["columns"]["name"], ["alice", None, ""]) + + def test_schema_listing_identifiers_cleaned(self): + # TABLES/COLUMNS/SCHEMA/COUNT must have their identifiers unquoted too (consistency with + # DUMP_TABLE) - a regression here is the exact "X cleaned but Y not" inconsistency to avoid + tables = api._sanitizeScanData(CONTENT_TYPE.TABLES, {"`master`": ["users", "`order`"]}) + self.assertEqual(tables, {"master": ["users", "order"]}) + columns = api._sanitizeScanData(CONTENT_TYPE.COLUMNS, + {"`master`": {"users": {"id": "int", "`name`": 
"varchar(500)"}}}) + self.assertEqual(columns, {"master": {"users": {"id": "int", "name": "varchar(500)"}}}) + schema = api._sanitizeScanData(CONTENT_TYPE.SCHEMA, {"sys": {"w": {"`events`": "varchar(128)"}}}) + self.assertEqual(schema, {"sys": {"w": {"events": "varchar(128)"}}}) + count = api._sanitizeScanData(CONTENT_TYPE.COUNT, {"`master`": {"5": ["users"]}}) + self.assertEqual(count, {"master": {"5": ["users"]}}) + + def test_identifier_unquoting_is_context_free(self): + # all DBMS quote styles handled without Backend context (so CLI and API server agree) + self.assertEqual(api._cleanIdentifier("`tbl`"), "tbl") # MySQL + self.assertEqual(api._cleanIdentifier('"tbl"'), "tbl") # PostgreSQL/Oracle + self.assertEqual(api._cleanIdentifier("[tbl]"), "tbl") # MSSQL + self.assertEqual(api._cleanIdentifier("plain"), "plain") + + def test_other_types_pass_through(self): + # non-TECHNIQUES/DUMP_TABLE values are returned unchanged + self.assertEqual(api._sanitizeScanData(CONTENT_TYPE.CURRENT_USER, "root@%"), "root@%") + self.assertEqual(api._sanitizeScanData(CONTENT_TYPE.DBS, ["a", "b"]), ["a", "b"]) + self.assertIs(api._sanitizeScanData(CONTENT_TYPE.IS_DBA, True), True) + + +class TestErrors(_CollectorCase): + def test_errors_captured(self): + self.c.execute("INSERT INTO errors VALUES(NULL, ?, ?)", (api.REPORT_TASKID, "something failed")) + result = api._assembleData(self.c, api.REPORT_TASKID) + self.assertEqual(result["error"], ["something failed"]) + + +class TestWriteReportJson(_CollectorCase): + def test_file_is_valid_json_with_meta(self): + self._store("admin", CONTENT_TYPE.CURRENT_USER) + saved_url = conf.get("url") + conf.url = "http://target/?id=1" + fd, path = tempfile.mkstemp(suffix=".json") + os.close(fd) + try: + api.writeReportJson(self.c, path) + with io.open(path, encoding="utf-8") as f: # explicit UTF-8 + closed handle (no ResourceWarning, no cp1252 on Windows) + loaded = json.load(f) + # core shape == API /scan//data, plus a meta wrapper + 
self.assertEqual(sorted(loaded.keys()), ["data", "error", "meta", "success"]) + self.assertEqual(loaded["data"][0]["value"], "admin") + self.assertEqual(loaded["data"][0]["type_name"], "CURRENT_USER") + self.assertEqual(loaded["meta"]["url"], "http://target/?id=1") + self.assertEqual(loaded["meta"]["api_version"], 2) # MAJOR-only integer, for compatibility checks + self.assertIn("sqlmap_version", loaded["meta"]) + self.assertIn("timestamp", loaded["meta"]) + finally: + conf.url = saved_url + os.remove(path) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/test_unpickle_security.py b/tests/test_unpickle_security.py new file mode 100644 index 00000000000..a3cf63a2e7b --- /dev/null +++ b/tests/test_unpickle_security.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +Locks the RestrictedUnpickler security control (lib/core/patch.py, installed over +pickle.loads by dirtyPatches()). sqlmap deserializes pickled blobs out of its own +session DB / cache, so the unpickler is an ALLOWLIST: only safe builtin data types +and sqlmap's own (lib/plugins/thirdparty) classes may be reconstructed. + +Two directions, both of which must keep holding: + - LEGIT round-trips sqlmap actually relies on (AttribDict, BigArray, nested + builtins, and - the easy-to-regress one - bytes under PICKLE_PROTOCOL=2, which + emits a _codecs.encode global) must survive base64pickle -> base64unpickle. + - MALICIOUS / exotic globals (eval, os.system, subprocess.Popen, importlib, + operator.attrgetter, and even the non-whitelisted _codecs.lookup) must be + REJECTED at find_class time, before the object is ever built. + +A regression in either direction is a security or a data-loss bug, hence the test. 
+""" + +import os +import pickle +import subprocess +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap +bootstrap() # installs dirtyPatches(), i.e. the RestrictedUnpickler over pickle.loads + +from lib.core.bigarray import BigArray +from lib.core.convert import base64pickle, base64unpickle, encodeBase64 +from lib.core.datatype import AttribDict +from lib.core.settings import PICKLE_PROTOCOL + + +class _EvilReduce(object): + """On unpickling, __reduce__ asks the loader to resolve (and would call) an arbitrary global.""" + def __init__(self, func, args): + self._func = func + self._args = args + + def __reduce__(self): + return (self._func, self._args) + + +def _payload(func, *args): + # built with the REAL pickler (only pickle.loads is restricted, not dumps); base64 to mirror + # exactly what base64unpickle() consumes from sqlmap's session store + return encodeBase64(pickle.dumps(_EvilReduce(func, args), PICKLE_PROTOCOL), binary=False) + + +class TestUnpicklerIsInstalled(unittest.TestCase): + def test_patch_active(self): + # if this is False the whole allowlist is bypassed and the negative tests would pass vacuously + self.assertTrue(getattr(pickle, "_patched", False)) + + +class TestLegitRoundTrips(unittest.TestCase): + def _roundtrip(self, value): + return base64unpickle(base64pickle(value)) + + def test_nested_builtins(self): + value = {"a": [1, 2.5, True, None, complex(1, 2)], "b": (u"x", b"y"), "c": {3, 4}, "d": frozenset([5])} + self.assertEqual(self._roundtrip(value), value) + + def test_bytes_protocol2(self): + # protocol-2 pickling of bytes on Python 3 emits a _codecs.encode global; this is the + # exact case the allowlist explicitly permits, and the one most likely to silently break + for value in (b"", b"\x00\x01\x02binary\xff", bytearray(b"abc")): + self.assertEqual(self._roundtrip(value), value) + + def test_attribdict(self): + value = AttribDict() + value.foo = "bar" + 
value.nested = {"k": [1, 2]} + restored = self._roundtrip(value) + self.assertIsInstance(restored, AttribDict) + self.assertEqual(restored.foo, "bar") + self.assertEqual(restored.nested, {"k": [1, 2]}) + + def test_bigarray(self): + restored = self._roundtrip(BigArray([1, 2, 3])) + self.assertIsInstance(restored, BigArray) + self.assertEqual(list(restored), [1, 2, 3]) + + +class TestMaliciousRejected(unittest.TestCase): + def _assert_blocked(self, payload): + # find_class() raises ValueError; base64unpickle only swallows TypeError, so it propagates + self.assertRaises(ValueError, base64unpickle, payload) + + def test_dangerous_builtins(self): + # builtins are allowed ONLY for the safe data-type subset; callables must be refused + for func in (eval, getattr, __import__): + self._assert_blocked(_payload(func, "1+1") if func is eval else _payload(func, "x")) + + def test_os_system(self): + self._assert_blocked(_payload(os.system, "echo pwned")) + + def test_subprocess_popen(self): + self._assert_blocked(_payload(subprocess.Popen, "echo pwned")) + + def test_importlib(self): + import importlib + self._assert_blocked(_payload(importlib.import_module, "os")) + + def test_operator_attrgetter(self): + import operator + self._assert_blocked(_payload(operator.attrgetter, "system")) + + def test_codecs_lookup_not_whitelisted(self): + # only _codecs.encode is allowed (for the bytes round-trip); every other _codecs name stays blocked + import codecs + self._assert_blocked(_payload(codecs.lookup, "utf-8")) + + +if __name__ == "__main__": + unittest.main()