欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python实现SQL注入检测插件实例代码

程序员文章站 2022-10-15 08:19:03
扫描器需要实现的功能思维导图 爬虫编写思路 首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用 python 的se...

扫描器需要实现的功能思维导图

Python实现SQL注入检测插件实例代码

爬虫编写思路

首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用 python 的set()就可以解决,大概流程是:

  • 输入 url
  • 下载解析出 url
  • url 去重,判断是否为本站
  • 加入到待爬列表
  • 重复循环

sql 判断思路

  • 通过在 url 后面加上and %d=%d或者or not (%d>%d)
  • %d后面的数字是随机可变的
  • 然后搜索网页中特殊关键词,比如:

mysql 中是 sql syntax.*mysql
microsoft sql server 是 warning.*mssql_
microsoft access 是 microsoft access driver
oracle 是 oracle error
ibm db2 是 db2 sql error
sqlite 是 sqlite.exception
...

通过这些关键词就可以判断出所用的数据库

  • 还需要判断一下 waf 之类的东西,有这种东西就直接停止。简单的方法就是用特定的 url 访问,如果出现了像ip banned,fierwall之类的关键词,可以判断出是waf。具体的正则表达式是(?i)(\a|\b)ip\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)
  • 开发准备展开目录

请安装这些库

pip install requests
pip install beautifulsoup4

实验环境是 linux,创建一个code目录,在其中创建一个work文件夹,将其作为工作目录

目录结构

/w8ay.py  // 项目启动主文件
/lib/core // 核心文件存放目录
/lib/core/config.py // 配置文件
/script   // 插件存放
/exp      // exp和poc存放

步骤

sql 检测脚本编写

dbms_errors = {
  'mysql': (r"sql syntax.*mysql", r"warning.*mysql_.*", r"valid mysql result", r"mysqlclient\."),
  "postgresql": (r"postgresql.*error", r"warning.*\wpg_.*", r"valid postgresql result", r"npgsql\."),
  "microsoft sql server": (r"driver.* sql[\-\_\ ]*server", r"ole db.* sql server", r"(\w|\a)sql server.*driver", r"warning.*mssql_.*", r"(\w|\a)sql server.*[0-9a-fa-f]{8}", r"(?s)exception.*\wsystem\.data\.sqlclient\.", r"(?s)exception.*\wroadhouse\.cms\."),
  "microsoft access": (r"microsoft access driver", r"jet database engine", r"access database engine"),
  "oracle": (r"\bora-[0-9][0-9][0-9][0-9]", r"oracle error", r"oracle.*driver", r"warning.*\woci_.*", r"warning.*\wora_.*"),
  "ibm db2": (r"cli driver.*db2", r"db2 sql error", r"\bdb2_\w+\("),
  "sqlite": (r"sqlite/jdbcdriver", r"sqlite.exception", r"system.data.sqlite.sqliteexception", r"warning.*sqlite_.*", r"warning.*sqlite3::", r"\[sqlite_error\]"),
  "sybase": (r"(?i)warning.*sybase.*", r"sybase message", r"sybase.*server message.*"),
}

通过正则表达式就可以判断出是哪个数据库了

for (dbms, regex) in ((dbms, regex) for dbms in dbms_errors for regex in dbms_errors[dbms]):
  if (re.search(regex,_content)):
    return true

下面是我们测试语句的payload

boolean_tests = (" and %d=%d", " or not (%d=%d)")

用报错语句返回正确的内容和错误的内容进行对比

for test_payload in boolean_tests:
  # right page
  randint = random.randint(1, 255)
  _url = url + test_payload % (randint, randint)
  content["true"] = downloader.get(_url)
  _url = url + test_payload % (randint, randint + 1)
  content["false"] = downloader.get(_url)
  if content["origin"] == content["true"] != content["false"]:
    return "sql found: %" % url

这句

content["origin"] == content["true"] != content["false"]

意思就是当原始网页等于正确的网页不等于错误的网页内容时,就可以判定这个地址存在注入漏洞

完整代码:

import re, random
from lib.core import download
def sqlcheck(url):
  if (not url.find("?")): # pseudo-static page
    return false;
  downloader = download.downloader()
  boolean_tests = (" and %d=%d", " or not (%d=%d)")
  dbms_errors = {
    # regular expressions used for dbms recognition based on error message response
    "mysql": (r"sql syntax.*mysql", r"warning.*mysql_.*", r"valid mysql result", r"mysqlclient\."),
    "postgresql": (r"postgresql.*error", r"warning.*\wpg_.*", r"valid postgresql result", r"npgsql\."),
    "microsoft sql server": (r"driver.* sql[\-\_\ ]*server", r"ole db.* sql server", r"(\w|\a)sql server.*driver", r"warning.*mssql_.*", r"(\w|\a)sql server.*[0-9a-fa-f]{8}", r"(?s)exception.*\wsystem\.data\.sqlclient\.", r"(?s)exception.*\wroadhouse\.cms\."),
    "microsoft access": (r"microsoft access driver", r"jet database engine", r"access database engine"),
    "oracle": (r"\bora-[0-9][0-9][0-9][0-9]", r"oracle error", r"oracle.*driver", r"warning.*\woci_.*", r"warning.*\wora_.*"),
    "ibm db2": (r"cli driver.*db2", r"db2 sql error", r"\bdb2_\w+\("),
    "sqlite": (r"sqlite/jdbcdriver", r"sqlite.exception", r"system.data.sqlite.sqliteexception", r"warning.*sqlite_.*", r"warning.*sqlite3::", r"\[sqlite_error\]"),
    "sybase": (r"(?i)warning.*sybase.*", r"sybase message", r"sybase.*server message.*"),
  }
  _url = url + "%29%28%22%27"
  _content = downloader.get(_url)
  for (dbms, regex) in ((dbms, regex) for dbms in dbms_errors for regex in dbms_errors[dbms]):
    if (re.search(regex,_content)):
      return true
  content = {}
  content['origin'] = downloader.get(_url)
  for test_payload in boolean_tests:
    # right page
    randint = random.randint(1, 255)
    _url = url + test_payload % (randint, randint)
    content["true"] = downloader.get(_url)
    _url = url + test_payload % (randint, randint + 1)
    content["false"] = downloader.get(_url)
    if content["origin"] == content["true"] != content["false"]:
      return "sql found: %" % url

将这个文件命名为sqlcheck.py,放在/script目录中。代码的第 4 行作用是查找 url 是否包含?,如果不包含,比方说伪静态页面,可能不太好注入,因此需要过滤掉

爬虫的编写

爬虫的思路上面讲过了,先完成 url 的管理,我们单独将它作为一个类,文件保存在/lib/core/urlmanager.py

#-*- coding:utf-8 -*-

class urlmanager(object):
  def __init__(self):
    self.new_urls = set()
    self.old_urls = set()
    
  def add_new_url(self, url):
    if url is none:
      return
    if url not in self.new_urls and url not in self.old_urls:
      self.new_urls.add(url)
   
  def add_new_urls(self, urls):
    if urls is none or len(urls) == 0:
      return
    for url in urls:
      self.add_new_url(url)
    
  def has_new_url(self):
    return len(self.new_urls) != 0
   
  def get_new_url(self):
    new_url = self.new_urls.pop()
    self.old_urls.add(new_url)
    return new_url

为了方便,我们也将下载功能单独作为一个类使用,文件保存在lib/core/downloader.py

#-*- coding:utf-8 -*-
import requests

class downloader(object):
  def get(self, url):
    r = requests.get(url, timeout = 10)
    if r.status_code != 200:
      return none
    _str = r.text
    return _str
  
  def post(self, url, data):
    r = requests.post(url, data)
    _str = r.text
    return _str
  
  def download(self, url, htmls):
    if url is none:
      return none
    _str = {}
    _str["url"] = url
    try:
      r = requests.get(url, timeout = 10)
      if r.status_code != 200:
        return none
      _str["html"] = r.text
    except exception as e:
      return none
    htmls.append(_str)

特别说明,因为我们要写的爬虫是多线程的,所以类中有个download方法是专门为多线程下载专用的

在lib/core/spider.py中编写爬虫

#-*- coding:utf-8 -*-

from lib.core import downloader, urlmanager
import threading
from urllib import parse
from urllib.parse import urljoin
from bs4 import beautifulsoup

class spidermain(object):
  def __init__(self, root, threadnum):
    self.urls = urlmanager.urlmanager()
    self.download = downloader.downloader()
    self.root = root
    self.threadnum = threadnum
  
  def _judge(self, domain, url):
    if (url.find(domain) != -1):
      return true
    return false
  
  def _parse(self, page_url, content):
    if content is none:
      return
    soup = beautifulsoup(content, 'html.parser')
    _news = self._get_new_urls(page_url, soup)
    return _news
    
  def _get_new_urls(self, page_url, soup):
    new_urls = set()
    links = soup.find_all('a')
    for link in links:
      new_url = link.get('href')
      new_full_url = urljoin(page_url, new_url)
      if (self._judge(self.root, new_full_url)):
        new_urls.add(new_full_url)
    return new_urls
    
  def craw(self):
    self.urls.add_new_url(self.root)
    while self.urls.has_new_url():
      _content = []
      th = []
      for i in list(range(self.threadnum)):
        if self.urls.has_new_url() is false:
          break
        new_url = self.urls.get_new_url()
        
        ## sql check
        try:
          if (sqlcheck.sqlcheck(new_url)):
            print("url:%s sqlcheck is valueable" % new_url)
        except:
          pass
            
        print("craw:" + new_url)
        t = threading.thread(target = self.download.download, args = (new_url, _content))
        t.start()
        th.append(t)
      for t in th:
        t.join()
      for _str in _content:
        if _str is none:
          continue
        new_urls = self._parse(new_url, _str["html"])
        self.urls.add_new_urls(new_urls)

爬虫通过调用craw()方法传入一个网址进行爬行,然后采用多线程的方法下载待爬行的网站,下载之后的源码用_parse方法调用beautifulsoup进行解析,之后将解析出的 url 列表丢入 url 管理器,这样循环,最后只要爬完了网页,爬虫就会停止

threading库可以自定义需要开启的线程数,线程开启后,每个线程会得到一个 url 进行下载,然后线程会阻塞,阻塞完毕后线程放行

爬虫和 sql 检查的结合

在lib/core/spider.py文件引用一下from script import sqlcheck,在craw()方法中,取出新的 url 地方调用一下

##sql check
try:
  if(sqlcheck.sqlcheck(new_url)):
    print("url:%s sqlcheck is valueable"%new_url)
except:
  pass

用try检测可能出现的异常,绕过它,在文件w8ay.py中进行测试

#-*- coding:utf-8 -*-
'''
name: w8ayscan
author: mathor
copyright (c) 2019
'''
import sys
from lib.core.spider import spidermain
def main():
  root = "https://wmathor.com"
  threadnum = 50
  w8 = spidermain(root, threadnum)
  w8.craw()
 
if __name__ == "__main__":
  main()

很重要的一点!为了使得lib和script文件夹中的.py文件可以可以被认作是模块,请在lib、lib/core和script文件夹中创建__init__.py文件,文件中什么都不需要写

总结

sql 注入检测通过一些payload使页面出错,判断原始网页,正确网页,错误网页即可检测出是否存在 sql 注入漏洞
通过匹配出 sql 报错出来的信息,可以正则判断所用的数据库

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。