Python实现SQL注入检测插件实例代码
扫描器需要实现的功能思维导图
爬虫编写思路
首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用 python 的set()就可以解决,大概流程是:
- 输入 url
- 下载解析出 url
- url 去重,判断是否为本站
- 加入到待爬列表
- 重复循环
sql 判断思路
- 通过在 url 后面加上and %d=%d或者or not (%d>%d)
- %d后面的数字是随机可变的
- 然后搜索网页中特殊关键词,比如:
mysql 中是 sql syntax.*mysql
microsoft sql server 是 warning.*mssql_
microsoft access 是 microsoft access driver
oracle 是 oracle error
ibm db2 是 db2 sql error
sqlite 是 sqlite.exception
...
通过这些关键词就可以判断出所用的数据库
- 还需要判断一下 waf 之类的东西,有这种东西就直接停止。简单的方法就是用特定的 url 访问,如果出现了像ip banned,fierwall之类的关键词,可以判断出是waf。具体的正则表达式是(?i)(\a|\b)ip\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)
- 开发准备展开目录
请安装这些库
pip install requests pip install beautifulsoup4
实验环境是 linux,创建一个code目录,在其中创建一个work文件夹,将其作为工作目录
目录结构
/w8ay.py // 项目启动主文件
/lib/core // 核心文件存放目录
/lib/core/config.py // 配置文件
/script // 插件存放
/exp // exp和poc存放
步骤
sql 检测脚本编写
dbms_errors = { 'mysql': (r"sql syntax.*mysql", r"warning.*mysql_.*", r"valid mysql result", r"mysqlclient\."), "postgresql": (r"postgresql.*error", r"warning.*\wpg_.*", r"valid postgresql result", r"npgsql\."), "microsoft sql server": (r"driver.* sql[\-\_\ ]*server", r"ole db.* sql server", r"(\w|\a)sql server.*driver", r"warning.*mssql_.*", r"(\w|\a)sql server.*[0-9a-fa-f]{8}", r"(?s)exception.*\wsystem\.data\.sqlclient\.", r"(?s)exception.*\wroadhouse\.cms\."), "microsoft access": (r"microsoft access driver", r"jet database engine", r"access database engine"), "oracle": (r"\bora-[0-9][0-9][0-9][0-9]", r"oracle error", r"oracle.*driver", r"warning.*\woci_.*", r"warning.*\wora_.*"), "ibm db2": (r"cli driver.*db2", r"db2 sql error", r"\bdb2_\w+\("), "sqlite": (r"sqlite/jdbcdriver", r"sqlite.exception", r"system.data.sqlite.sqliteexception", r"warning.*sqlite_.*", r"warning.*sqlite3::", r"\[sqlite_error\]"), "sybase": (r"(?i)warning.*sybase.*", r"sybase message", r"sybase.*server message.*"), }
通过正则表达式就可以判断出是哪个数据库了
for (dbms, regex) in ((dbms, regex) for dbms in dbms_errors for regex in dbms_errors[dbms]): if (re.search(regex,_content)): return true
下面是我们测试语句的payload
boolean_tests = (" and %d=%d", " or not (%d=%d)")
用报错语句返回正确的内容和错误的内容进行对比
for test_payload in boolean_tests: # right page randint = random.randint(1, 255) _url = url + test_payload % (randint, randint) content["true"] = downloader.get(_url) _url = url + test_payload % (randint, randint + 1) content["false"] = downloader.get(_url) if content["origin"] == content["true"] != content["false"]: return "sql found: %" % url
这句
content["origin"] == content["true"] != content["false"]
意思就是当原始网页等于正确的网页不等于错误的网页内容时,就可以判定这个地址存在注入漏洞
完整代码:
import re, random from lib.core import download def sqlcheck(url): if (not url.find("?")): # pseudo-static page return false; downloader = download.downloader() boolean_tests = (" and %d=%d", " or not (%d=%d)") dbms_errors = { # regular expressions used for dbms recognition based on error message response "mysql": (r"sql syntax.*mysql", r"warning.*mysql_.*", r"valid mysql result", r"mysqlclient\."), "postgresql": (r"postgresql.*error", r"warning.*\wpg_.*", r"valid postgresql result", r"npgsql\."), "microsoft sql server": (r"driver.* sql[\-\_\ ]*server", r"ole db.* sql server", r"(\w|\a)sql server.*driver", r"warning.*mssql_.*", r"(\w|\a)sql server.*[0-9a-fa-f]{8}", r"(?s)exception.*\wsystem\.data\.sqlclient\.", r"(?s)exception.*\wroadhouse\.cms\."), "microsoft access": (r"microsoft access driver", r"jet database engine", r"access database engine"), "oracle": (r"\bora-[0-9][0-9][0-9][0-9]", r"oracle error", r"oracle.*driver", r"warning.*\woci_.*", r"warning.*\wora_.*"), "ibm db2": (r"cli driver.*db2", r"db2 sql error", r"\bdb2_\w+\("), "sqlite": (r"sqlite/jdbcdriver", r"sqlite.exception", r"system.data.sqlite.sqliteexception", r"warning.*sqlite_.*", r"warning.*sqlite3::", r"\[sqlite_error\]"), "sybase": (r"(?i)warning.*sybase.*", r"sybase message", r"sybase.*server message.*"), } _url = url + "%29%28%22%27" _content = downloader.get(_url) for (dbms, regex) in ((dbms, regex) for dbms in dbms_errors for regex in dbms_errors[dbms]): if (re.search(regex,_content)): return true content = {} content['origin'] = downloader.get(_url) for test_payload in boolean_tests: # right page randint = random.randint(1, 255) _url = url + test_payload % (randint, randint) content["true"] = downloader.get(_url) _url = url + test_payload % (randint, randint + 1) content["false"] = downloader.get(_url) if content["origin"] == content["true"] != content["false"]: return "sql found: %" % url
将这个文件命名为sqlcheck.py,放在/script目录中。代码的第 4 行作用是查找 url 是否包含?,如果不包含,比方说伪静态页面,可能不太好注入,因此需要过滤掉
爬虫的编写
爬虫的思路上面讲过了,先完成 url 的管理,我们单独将它作为一个类,文件保存在/lib/core/urlmanager.py
#-*- coding:utf-8 -*- class urlmanager(object): def __init__(self): self.new_urls = set() self.old_urls = set() def add_new_url(self, url): if url is none: return if url not in self.new_urls and url not in self.old_urls: self.new_urls.add(url) def add_new_urls(self, urls): if urls is none or len(urls) == 0: return for url in urls: self.add_new_url(url) def has_new_url(self): return len(self.new_urls) != 0 def get_new_url(self): new_url = self.new_urls.pop() self.old_urls.add(new_url) return new_url
为了方便,我们也将下载功能单独作为一个类使用,文件保存在lib/core/downloader.py
#-*- coding:utf-8 -*- import requests class downloader(object): def get(self, url): r = requests.get(url, timeout = 10) if r.status_code != 200: return none _str = r.text return _str def post(self, url, data): r = requests.post(url, data) _str = r.text return _str def download(self, url, htmls): if url is none: return none _str = {} _str["url"] = url try: r = requests.get(url, timeout = 10) if r.status_code != 200: return none _str["html"] = r.text except exception as e: return none htmls.append(_str)
特别说明,因为我们要写的爬虫是多线程的,所以类中有个download方法是专门为多线程下载专用的
在lib/core/spider.py中编写爬虫
#-*- coding:utf-8 -*- from lib.core import downloader, urlmanager import threading from urllib import parse from urllib.parse import urljoin from bs4 import beautifulsoup class spidermain(object): def __init__(self, root, threadnum): self.urls = urlmanager.urlmanager() self.download = downloader.downloader() self.root = root self.threadnum = threadnum def _judge(self, domain, url): if (url.find(domain) != -1): return true return false def _parse(self, page_url, content): if content is none: return soup = beautifulsoup(content, 'html.parser') _news = self._get_new_urls(page_url, soup) return _news def _get_new_urls(self, page_url, soup): new_urls = set() links = soup.find_all('a') for link in links: new_url = link.get('href') new_full_url = urljoin(page_url, new_url) if (self._judge(self.root, new_full_url)): new_urls.add(new_full_url) return new_urls def craw(self): self.urls.add_new_url(self.root) while self.urls.has_new_url(): _content = [] th = [] for i in list(range(self.threadnum)): if self.urls.has_new_url() is false: break new_url = self.urls.get_new_url() ## sql check try: if (sqlcheck.sqlcheck(new_url)): print("url:%s sqlcheck is valueable" % new_url) except: pass print("craw:" + new_url) t = threading.thread(target = self.download.download, args = (new_url, _content)) t.start() th.append(t) for t in th: t.join() for _str in _content: if _str is none: continue new_urls = self._parse(new_url, _str["html"]) self.urls.add_new_urls(new_urls)
爬虫通过调用craw()方法传入一个网址进行爬行,然后采用多线程的方法下载待爬行的网站,下载之后的源码用_parse方法调用beautifulsoup进行解析,之后将解析出的 url 列表丢入 url 管理器,这样循环,最后只要爬完了网页,爬虫就会停止
threading库可以自定义需要开启的线程数,线程开启后,每个线程会得到一个 url 进行下载,然后线程会阻塞,阻塞完毕后线程放行
爬虫和 sql 检查的结合
在lib/core/spider.py文件引用一下from script import sqlcheck,在craw()方法中,取出新的 url 地方调用一下
##sql check try: if(sqlcheck.sqlcheck(new_url)): print("url:%s sqlcheck is valueable"%new_url) except: pass
用try检测可能出现的异常,绕过它,在文件w8ay.py中进行测试
#-*- coding:utf-8 -*- ''' name: w8ayscan author: mathor copyright (c) 2019 ''' import sys from lib.core.spider import spidermain def main(): root = "https://wmathor.com" threadnum = 50 w8 = spidermain(root, threadnum) w8.craw() if __name__ == "__main__": main()
很重要的一点!为了使得lib和script文件夹中的.py文件可以可以被认作是模块,请在lib、lib/core和script文件夹中创建__init__.py文件,文件中什么都不需要写
总结
sql 注入检测通过一些payload使页面出错,判断原始网页,正确网页,错误网页即可检测出是否存在 sql 注入漏洞
通过匹配出 sql 报错出来的信息,可以正则判断所用的数据库
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。
上一篇: oracle数据库冷备份的方法