欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python如何编写类似nmap的扫描工具

程序员文章站 2022-08-07 18:01:17
本文主要是利用scapy包编写了一个简易扫描工具,支持arp、icmp、tcp、udp发现扫描,支持tcp syn、udp端口扫描,如下:usage: python scan.py <-p pi...

本文主要是利用scapy包编写了一个简易扫描工具,支持arp、icmp、tcp、udp发现扫描,支持tcp syn、udp端口扫描,如下:

usage: python scan.py <-p ping扫描类型> <-s 端口发现类型> [-t target] [--port ports]

简单扫描工具,可以进行存活扫描及端口扫描.
存活扫描包括:arp扫描、icmp扫描、tcp扫描、udp扫描.
端口扫描包括:tcp syn扫描、tcp ack扫描、tcp fin扫描.

optional arguments:
 -h, --help     show this help message and exit
 -v, --version   show program's version number and exit

target group:
 用于设置ip、port参数

 --target target  target为ip或ip段,如192.168.1.1,192.168.1.x,或1
           92.168.1.1-254
 --port port    port为待扫描的端口,如21,80,...或21-80

ping group:
 用于开启存活扫描相关选项

 -p         开启存活扫描
 --arp       启动arp扫描
 --icmp       启动icmp扫描
 --tcp       启动tcp扫描
 --udp       启动udp扫描

port scan group:
 用于开启端口扫描相关选项

 -s         开启端口扫描
 --syn       开启syn扫描
 --ack       开启ack扫描
 --fin       开启fin扫描
 --uport      开启udp端口扫描

utils group:
 用于开启扫描过程中的一些实用选项

 --timeout timeout 设置发包超时时间,默认0.5秒
 --retry retry   设置发包重试次数,默认不重试

以上做为说明,祝好运!

一、发现扫描

1.首先进行arp扫描

python scan.py -p --target 192.168.1.1-254 --arp
[+]ip: 192.168.1.1 => mac: 14:75:90:xx:xx:xx
[+]ip: 192.168.1.111 => mac: c6:36:55:xx:xx:xx
[+]总共耗时9.84091806412秒.

通过retry参数增加发包尝试次数,如下:

python scan.py -p --target 192.168.1.1-254 --arp --retry 2
[+]ip: 192.168.1.1 => mac: 14:75:90:xx:xx:xx
[+]ip: 192.168.1.111 => mac: c6:36:55:xx:xx:xx
[+]ip: 192.168.1.102 => mac: 58:1f:28:xx:xx:xx
[+]ip: 192.168.1.114 => mac: 6c:8d:c1:xx:xx:xx
[+]ip: 192.168.1.103 => mac: 84:38:38:xx:xx:xx
[+]总共耗时20.429942131秒.

2.使用icmp扫描,若没有指定任何扫描类型参数,默认会启用icmp扫描,如下:

python scan.py -p --target 192.168.1.1-254
[+]没有指定任何ping扫描方式,默认选择icmp扫描
[+]ip:主机192.168.1.1 echo-reply.
[+]ip:主机192.168.1.111 echo-reply.
[+]总共耗时10.7177450657秒.

通过timeout参数,设置较长的超时,可以防止网络状况不好造成的丢包,如下:

python scan.py -p --target 192.168.1.1-254 --timeout 2
[+]没有指定任何ping扫描方式,默认选择icmp扫描
[+]ip:主机192.168.1.1 echo-reply.
[+]ip:主机192.168.1.111 echo-reply.
[+]ip:主机192.168.1.114 echo-reply.
[+]总共耗时10.7566649914秒.

3.使用tcp扫描

python scan.py -p --target 192.168.1.100-120 --tcp --timeout 1
[+]请稍等,时间较长!
[!]扫描... 192.168.1.100
[!]扫描... 192.168.1.101
[!]扫描... 192.168.1.102
[!]扫描... 192.168.1.103
[!]扫描... 192.168.1.104
[!]扫描... 192.168.1.105
[!]扫描... 192.168.1.106
[!]扫描... 192.168.1.107
[!]扫描... 192.168.1.108
[!]扫描... 192.168.1.109
[!]扫描... 192.168.1.110
[!]扫描... 192.168.1.111
[!]扫描... 192.168.1.112
[!]扫描... 192.168.1.113
[!]扫描... 192.168.1.114
[!]扫描... 192.168.1.115
[!]扫描... 192.168.1.116
[!]扫描... 192.168.1.117
[!]扫描... 192.168.1.118
[!]扫描... 192.168.1.119
[!]扫描... 192.168.1.120
[+]正在处理扫描信息.
====================
[+]主机 192.168.1.102 在线.
[+]主机 192.168.1.103 在线.
[+]主机 192.168.1.111 在线.
[+]主机 192.168.1.114 在线.
[+]总共耗时16.4359779358秒.

4.使用udp扫描

python scan.py -p --target 192.168.1.100-120 --udp --retry 3
[+]请稍等,时间较长!
[!]扫描... 192.168.1.100
[!]扫描... 192.168.1.101
[!]扫描... 192.168.1.102
[!]扫描... 192.168.1.103
[!]扫描... 192.168.1.104
[!]扫描... 192.168.1.105
[!]扫描... 192.168.1.106
[!]扫描... 192.168.1.107
[!]扫描... 192.168.1.108
[!]扫描... 192.168.1.109
[!]扫描... 192.168.1.110
[!]扫描... 192.168.1.111
[!]扫描... 192.168.1.112
[!]扫描... 192.168.1.113
[!]扫描... 192.168.1.114
[!]扫描... 192.168.1.115
[!]扫描... 192.168.1.116
[!]扫描... 192.168.1.117
[!]扫描... 192.168.1.118
[!]扫描... 192.168.1.119
[!]扫描... 192.168.1.120
[+]正在处理扫描信息.


====================
[+]主机 192.168.1.102 在线.
[+]主机 192.168.1.103 在线.
[+]主机 192.168.1.111 在线.
[+]主机 192.168.1.114 在线.
[+]总共耗时33.5198891163秒.

二、端口扫描

1、tcp syn端口扫描,不设置端口参数,则默认扫描1-1024端口

python scan.py --target 192.168.1.110-115 -s --syn
[+]没有指定任何扫描端口,默认扫描1-1024
[!]扫描... 192.168.1.110
[!]扫描... 192.168.1.111
[!]扫描... 192.168.1.112
[!]扫描... 192.168.1.113
[!]扫描... 192.168.1.114
[!]扫描... 192.168.1.115
[+]正在处理扫描信息.


====================
[+]主机 192.168.1.111 开放的tcp端口有:[80]
[+]总共耗时165.125555992秒.

扫描指定端口:

python scan.py --target 192.168.1.1-254 -s --syn --port 80 --timeout 1
[!]扫描... 192.168.1.1
[!]扫描... 192.168.1.2
[!]扫描... 192.168.1.3
[!]扫描... 192.168.1.4
...
[!]扫描... 192.168.1.253
[!]扫描... 192.168.1.254
[+]正在处理扫描信息.


====================
[+]主机 192.168.1.111 开放的tcp端口有:[80]
[+]主机 192.168.1.1 开放的tcp端口有:[80]
[+]总共耗时9.72222185135秒.

2、扫描udp端口

python scan.py --target 192.168.1.1 -s --uport --timeout 1
[+]没有指定任何扫描端口,默认扫描1-1024
[!]扫描... 192.168.1.1
[+]正在处理扫描信息.
====================
[+]主机 192.168.1.1 开放的udp端口有:[520]
[+]总共耗时27.4742250443秒.

也可同时进行发现扫描与端口扫描,如下:

python scan.py --target 192.168.1.1-254 -p --arp -s --syn --port 80 --timeout 1 --retry 2
[+]ip: 192.168.1.1 => mac: 14:75:90:xx:xx:xx
[+]ip: 192.168.1.102 => mac: 58:1f:28:xx:xx:xx
[+]ip: 192.168.1.114 => mac: 6c:8d:c1:xx:xx:xx
[+]ip: 192.168.1.103 => mac: 84:38:38:xx:xx:xx
[+]ip: 192.168.1.101 => mac: 5c:f7:e6:xx:xx:xx
[!]扫描... 192.168.1.1
[!]扫描... 192.168.1.2
...
[!]扫描... 192.168.1.253
[!]扫描... 192.168.1.254
[+]正在处理扫描信息.


====================
[+]主机 192.168.1.1 开放的tcp端口有:[80]
[+]主机 192.168.1.111 开放的tcp端口有:[80]
[+]总共耗时45.2775988579秒.

ok,最后附上源码:

import argparse
import re
import time
import threading
from scapy.all import *

import logging
logging.getlogger('scapy.runtime').setlevel(logging.error)


class discovery_scan(object):
  '''
  说明:用于发现扫描
  '''

  def __init__(self,args,timeout=0.5,retry=0):
    self.targets = parse_target(args)
    self.timeout = timeout
    self.retry = retry

  def arp_scan(self,pdst):
    #arp发现扫描
    ans = sr1(arp(pdst=pdst),timeout=self.timeout,retry=self.retry,verbose=false)
    if ans:
      if ans[arp].op == 2:  #操作码为2是is-at,是arp响应
        print '[+]ip: %s => mac: %s' % (pdst,ans[arp].hwsrc)

  def icmp_scan(self,dst):
    #icmp发现扫描
    ans = sr1(ip(dst=dst)/icmp(),timeout=self.timeout,retry=self.retry,verbose=false)
    if ans:
      if ans[icmp].type == 0: #icmp type为0表示是icmp echo-reply
        print '[+]ip:主机%s echo-reply.' % dst

  tcp_info = {}
  def tcp_scan(self,dst,port):
    #tcp syn,发送tcp syn包,有响应表示端口开放
    ans,unans = sr(ip(dst=dst)/tcp(sport=randshort(),dport=port,flags='s'),
            timeout=self.timeout,retry=self.retry,verbose=false)
    if ans.res:
      if ans.res[0][0][ip].dst not in discovery_scan.tcp_info:
        discovery_scan.tcp_info[ans.res[0][0][ip].dst] = true

  udp_info = {}
  def udp_scan(self,dst,port):
    #udp,发送udp包,有响应表示端口开放
    ans,uans = sr(ip(dst=dst)/udp(sport=randshort(),dport=port),
           timeout=self.timeout,retry=self.retry,verbose=false)
    if ans.res:
      if ans.res[0][0][ip].dst not in discovery_scan.udp_info:
        discovery_scan.udp_info[ans.res[0][0][ip].dst] = true

class port_scan(object):
  '''
  说明:用于进行端口扫描,判断端口是否开放
  '''
  def __init__(self,args,timeout=0.5,retry=0):
    self.targets = parse_target(args)
    self.timeout = timeout
    self.retry = retry

  syn_port_dict = {}
  def syn_port_scan(self,dst,port):
    #tcp syn端口扫描,若syn包返回携带syn、ack(即tcp.flags=18)标志的包,则表明此端口打开。
    ans,uans = sr(ip(dst=dst)/tcp(sport=randshort(),dport=port,flags='s'),
           timeout=self.timeout,retry=self.retry,verbose=false)
    if ans:
      first_respons_pkt = ans.res[0][1]
      if first_respons_pkt[tcp] and first_respons_pkt[tcp].flags == 18:
        if first_respons_pkt[ip].src not in port_scan.syn_port_dict:
          port_scan.syn_port_dict[first_respons_pkt[ip].src] = [first_respons_pkt[tcp].sport]
        else:
          port_scan.syn_port_dict[first_respons_pkt[ip].src].append(first_respons_pkt[tcp].sport)

  udp_port_dict = {}
  def udp_port_scan(self,dst,port):
    #udp端口扫描,若udp端口返回icmp port-unreachable,则表示端口打开。(排除某些主机对任何udp端口的探测都响应为icmp port-unrechable)
    ans,uans = sr(ip(dst=dst)/udp(sport=randshort(),dport=port),
           timeout=self.timeout, retry=self.retry, verbose=false)
    if ans.res and ans.res[0][1].haslayer(udperror):
      first_respons_pkt = ans.res[0][1]
      if first_respons_pkt[ip].src not in port_scan.udp_port_dict:
        port_scan.udp_port_dict[first_respons_pkt[ip].src] = [first_respons_pkt[udperror].dport]
      else:
        port_scan.udp_port_dict[first_respons_pkt[ip].src].append(first_respons_pkt[udperror].dport)

def parse_opt():
  '''
  @说明:通过argparse模块解析程序传入的参数
  @return:args
  '''
  usage = 'python %(prog)s <-p ping扫描类型> <-s 端口发现类型> [-t target] [--port ports]'
  description = '简单扫描工具,可以进行存活扫描及端口扫描.\n' \
         '存活扫描包括:arp扫描、icmp扫描、tcp扫描、udp扫描.\n' \
         '端口扫描包括:tcp syn扫描、tcp ack扫描、tcp fin扫描.'
  epilog = '以上做为说明,祝好运!'
  parser = argparse.argumentparser(usage=usage,description=description,epilog=epilog,version='v1.0')

  target_group = parser.add_argument_group('target group',description='用于设置ip、port参数')
  target_group.add_argument('--target',dest='target',action='store',
               help='target为ip或ip段,如192.168.1.1,192.168.1.x,或192.168.1.1-254')
  target_group.add_argument('--port',dest='port',action='store',
               help='port为待扫描的端口,如21,80,...或21-80')

  ping_group = parser.add_argument_group('ping group',description='用于开启存活扫描相关选项')
  ping_group.add_argument('-p',dest='ping',action='store_true',help='开启存活扫描')
  ping_group.add_argument('--arp',dest='arp',action='store_true',help='启动arp扫描')
  ping_group.add_argument('--icmp',dest='icmp',action='store_true',help='启动icmp扫描')
  ping_group.add_argument('--tcp',dest='tcp',action='store_true',help='启动tcp扫描')
  ping_group.add_argument('--udp',dest='udp',action='store_true',help='启动udp扫描')

  port_scan_group = parser.add_argument_group('port scan group',description='用于开启端口扫描相关选项')
  port_scan_group.add_argument('-s',dest='scan',action='store_true',help='开启端口扫描')
  port_scan_group.add_argument('--syn',dest='syn',action='store_true',help='开启syn扫描')
  port_scan_group.add_argument('--ack',dest='ack',action='store_true',help='开启ack扫描')
  port_scan_group.add_argument('--fin',dest='fin',action='store_true',help='开启fin扫描')
  port_scan_group.add_argument('--uport', dest='uport', action='store_true', help='开启udp端口扫描')

  utils_group = parser.add_argument_group('utils group',description='用于开启扫描过程中的一些实用选项')
  utils_group.add_argument('--timeout',dest='timeout',action='store',type=float,help='设置发包超时时间,默认0.5秒')
  utils_group.add_argument('--retry',dest='retry',action='store',type=int,help='设置发包重试次数,默认不重试')

  args = parser.parse_args()
  if not args.ping and not args.scan:
    print '[-]必须通过-p/-s选项开启一种扫描'
    print '\n'
    parser.print_help()
    exit(1)
  elif not args.target:
    print '[-]必须通过--target选项指定扫描的对象'
    print '\n'
    parser.print_help()
    exit(1)
  if args.ping:
    if not args.arp and not args.icmp and not args.tcp and not args.udp:
      args.icmp = true #若没有指定任何ping扫描方式,则默认选择icmp扫描
      print '[+]没有指定任何ping扫描方式,默认选择icmp扫描'
  if args.scan:
    if not args.syn and not args.ack and not args.fin and not args.uport:
      args.syn = true  #若没有指定任何端口扫描方式,则默认选择syn扫描
      print '[+]没有指定任何端口扫描方式,默认选择syn扫描'
    if not args.port:
      args.port = '1-1024'  #若没有指定任何扫描端口,则默认扫描1-1024
      print '[+]没有指定任何扫描端口,默认扫描1-1024'

  return args

def parse_target(args):
  '''
  @说明:用于解析如'192.168.1.1,192.168.1.x,...或192.168.1.1-254'格式的ip为单独的ip,用于解析如'21,80,...或21-80'格式的端口为单独的端口
  @param: args,一个namespace对象
  @return: (ip_list,port_list)
  '''
  pattern1 = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'
  pattern2 = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}-\d{1,3}$'
  pattern3 = r'\d{1,5}$'
  pattern4 = r'\d{1,5}-\d{1,5}$'
  ip_list,port_list = none,none
  if args.target:
    if re.search(pattern1,args.target):
      ip_list = args.target.split(',')
    elif re.match(pattern2,args.target):
      _split = args.target.split('-')
      first_ip = _split[0]
      ip_split = first_ip.split('.')
      ipdot4 = range(int(ip_split[3]), int(_split[1]) + 1)
      ip_list = [ip_split[0] + '.' + ip_split[1] + '.' + ip_split[2] + '.' + str(p) for p in ipdot4]
    else:
      print '[-]target格式输入有误,请查看帮助!'
      exit(1)
  if args.port:
    if re.match(pattern4,args.port):
      _split = args.port.split('-')
      port_list = range(int(_split[0]),int(_split[1])+1)
    elif re.search(pattern3,args.port):
      port_list = args.port.split(',')
    else:
      print '[-]port格式输入有误,请查看帮助!'
      exit(1)
  return ip_list,port_list


def main():
  '''
  @说明:扫描的主程序,首先根据条件创建ping扫描或端口扫描对象,然后调用相关的扫描方法进行扫描。
  '''
  args = parse_opt()
  if args.ping: #是否启动ping扫描
    if not args.timeout and not args.retry:
      obj_ping = discovery_scan(args)
    elif args.timeout and not args.retry:
      obj_ping = discovery_scan(args,timeout=args.timeout)
    elif not args.timeout and args.retry:
      obj_ping = discovery_scan(args,retry=args.retry)
    else:
      obj_ping = discovery_scan(args,args.timeout,args.retry)
    ip_list = obj_ping.targets[0]
    if ip_list:
      #arp扫描
      if args.arp:
        for pdst in ip_list:
          t = threading.thread(target=obj_ping.arp_scan,args=(pdst,))
          t.start()
        while threading.activecount() != 1:   #避免线程还没有运行完就提前输出不全的结果
          time.sleep(1)
      #icmp扫描
      elif args.icmp:
        for dst in ip_list:
          t = threading.thread(target=obj_ping.icmp_scan,args=(dst,))
          t.start()
        while threading.activecount() != 1:   #避免线程还没有运行完就提前输出不全的结果
          time.sleep(1)
      #tcp扫描
      elif args.tcp:
        port_list = [80,443,21,22,23,25,53,135,139,137,445,1158,1433,1521,3306,3389,7001,8000,8080,9090]
        print '[+]请稍等,时间较长!'
        for dst in ip_list:
          print '[!]扫描...',dst
          for port in port_list:
            t = threading.thread(target=obj_ping.tcp_scan,args=(dst,port))
            t.start()

        print '[+]正在处理扫描信息.'
        while threading.activecount() != 1:   #避免线程还没有运行完就提前输出不全的结果
          time.sleep(1)

        if not obj_ping.tcp_info:
          print '\n'
          print '=' * 20
          print '[+]未发现在线主机.'
        else:
          print '\n'
          print '=' * 20
          for ip_a in sorted(obj_ping.tcp_info.keys()):
            print '[+]主机 %s 在线.' % ip_a
      #udp扫描
      elif args.udp:
        port_list = [7,9.13,15,37,53,67,68,69,135,137,138,139,445,520]
        print '[+]请稍等,时间较长!'
        for dst in ip_list:
          print '[!]扫描...',dst
          for port in port_list:
            t = threading.thread(target=obj_ping.udp_scan,args=(dst,port))
            t.start()

        print '[+]正在处理扫描信息.'
        while threading.activecount() != 1:   #避免线程还没有运行完就提前输出不全的结果
          time.sleep(1)

        if not obj_ping.udp_info:
          print '\n'
          print '=' * 20
          print '[+]未发现在线主机.'
        else:
          print '\n'
          print '=' * 20
          for ip_a in sorted(obj_ping.udp_info.keys()):
            print '[+]主机 %s 在线.' % ip_a
  if args.scan:  #是否启动端口扫描
    if not args.timeout and not args.retry:
      obj_port = port_scan(args)
    elif args.timeout and not args.retry:
      obj_port = port_scan(args,timeout=args.timeout)
    elif not args.timeout and args.retry:
      obj_port = port_scan(args,retry=args.retry)
    else:
      obj_port = port_scan(args,args.timeout,args.retry)

    ip_list,port_list = obj_port.targets
    if ip_list and port_list:
      if args.syn:
        for dst in ip_list:
          print '[!]扫描...',dst
          for port in port_list:
            t = threading.thread(target=obj_port.syn_port_scan,args=(dst,int(port)))
            t.start()

        print '[+]正在处理扫描信息.'
        while threading.activecount() != 1:   #避免线程还没有运行完就提前输出不全的结果
          time.sleep(1)

        if not obj_port.syn_port_dict:
          print '\n'
          print '=' * 20
          print '[+]未发现开放tcp端口.'
        else:
          print '\n'
          print '=' * 20
          for k,v in obj_port.syn_port_dict.items():
            print '[+]主机 %s 开放的tcp端口有:%s' % (k,str(v))
      elif args.ack:
        pass  #基本不能使用
      elif args.fin:
        pass  #基本不能使用
      elif args.uport:
        for dst in ip_list:
          print '[!]扫描...',dst
          for port in port_list:
            t = threading.thread(target=obj_port.udp_port_scan,args=(dst,int(port)))
            t.start()

        print '[+]正在处理扫描信息.'
        while threading.activecount() != 1:   #避免线程还没有运行完就提前输出不全的结果
          time.sleep(1)

        if not obj_port.udp_port_dict:
          print '\n'
          print '=' * 20
          print '[+]未发现开放udp端口.'
        else:
          print '\n'
          print '=' * 20
          for k,v in obj_port.udp_port_dict.items():
            print '[+]主机 %s 开放的udp端口有:%s' % (k,str(v))

if __name__ == '__main__':
  try:
    start_time = time.time()
    main()
    stop_time = time.time()
    print '[+]总共耗时'+str(stop_time-start_time)+'秒.'
  except exception,e:
    print '[-]执行出错,具体错误见下面信息.'
    print e

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。