Python实现封装获取VirusTotal扫描结果
程序员文章站
2023-12-12 08:43:52
本文实例讲述了用Python封装获取VirusTotal扫描结果的方法。分享给大家供大家参考。具体方法如下:
import simplejson
impor...
本文实例讲述了用Python封装获取VirusTotal扫描结果的方法。分享给大家供大家参考。具体方法如下:
# VirusTotal report fetcher with a small SQLite cache.
# The original listing was Python 2 only; this version runs on 2.7 and 3.x.
import contextlib
import logging
import os
import sys

try:
    import simplejson as json_lib  # preferred when installed, as in the original
except ImportError:
    import json as json_lib  # stdlib fallback exposing the same loads() call

try:  # Python 2 module layout
    from urllib import urlencode
    from urllib2 import Request, urlopen
except ImportError:  # Python 3 module layout
    from urllib.parse import urlencode
    from urllib.request import Request, urlopen

try:
    import sqlite3
except ImportError:
    sys.stderr.write("error: unable to locate python sqlite3 module. "
                     "please verify your installation. exiting...\n")
    sys.exit(-1)

# Sample hash from the original listing, which noted that virustotal.com had
# no report for it. (An earlier, immediately overwritten assignment was dropped.)
md5 = "12fa5fb74201d9b6a14f63fbf9a81ff6"
# Placeholder: replace with your own VirusTotal API key.
apikey = "xxxxxxxxxxxxxxxxxx"


class virustotaldatabase(object):
    """SQLite-backed store of per-engine detection names, one row per MD5.

    The lowercase class name is kept exactly as in the original listing so
    existing imports keep working.
    """

    # Engine columns of the ``virustotal`` table, in table order.
    _ENGINE_COLUMNS = ("kaspersky", "mcafee", "symantec", "norman", "avast",
                       "nod32", "bitdefender", "microsoft", "rising", "panda")
    # Full column order of a ``select *`` row.
    _ROW_COLUMNS = ("id", "md5") + _ENGINE_COLUMNS

    def __init__(self, db_file):
        """Open *db_file*, creating the file and its table first if needed.

        Connection failures are printed, not raised; the instance is then
        left without a cursor and every query method returns ``None``.
        """
        log = logging.getLogger("database.init")
        self.__dbfile = db_file
        self._conn = None
        self._cursor = None

        # Create the schema only when the database file does not exist yet.
        if not os.path.exists(self.__dbfile):
            if self._generate():
                print("generated database \"%s\" which didn't"
                      " exist before." % self.__dbfile)
            else:
                print("unable to generate database")

        try:
            self._conn = sqlite3.connect(self.__dbfile)
            self._cursor = self._conn.cursor()
        except sqlite3.Error as why:
            print("unable to connect to database \"%s\": %s."
                  % (self.__dbfile, why))
        else:
            # Only report success when the connection really succeeded.
            log.debug("connected to sqlite database \"%s\".", self.__dbfile)

    def _generate(self):
        """Create the database file and the ``virustotal`` table.

        Returns ``True`` on success, ``False`` when the file already exists
        or its parent directory cannot be created.
        """
        if os.path.exists(self.__dbfile):
            return False

        db_dir = os.path.dirname(self.__dbfile)
        # dirname() is "" for a bare file name, and makedirs("") would raise.
        if db_dir and not os.path.exists(db_dir):
            try:
                os.makedirs(db_dir)
            except (IOError, os.error) as why:
                print("something went wrong while creating database "
                      "directory \"%s\": %s" % (db_dir, why))
                return False

        conn = sqlite3.connect(self.__dbfile)
        try:
            cursor = conn.cursor()
            cursor.execute("create table virustotal (\n"
                           " id integer primary key,\n"
                           " md5 text not null,\n"
                           " kaspersky text default null,\n"
                           " mcafee text default null,\n"
                           " symantec text default null,\n"
                           " norman text default null,\n"
                           " avast text default null,\n"
                           " nod32 text default null,\n"
                           " bitdefender text default null,\n"
                           " microsoft text default null,\n"
                           " rising text default null,\n"
                           " panda text default null\n"
                           ");")
            conn.commit()
        finally:
            # This connection is only used for setup; do not leak it.
            conn.close()
        print("create db:%s sucess" % self.__dbfile)
        return True

    def _get_task_dict(self, row):
        """Map a ``select *`` row to a dict keyed by column name.

        Returns ``None`` when *row* is not a sequence or has fewer columns
        than the table defines.
        """
        try:
            values = tuple(row)
        except TypeError:
            return None
        if len(values) < len(self._ROW_COLUMNS):
            return None
        return dict(zip(self._ROW_COLUMNS, values))

    def add_sample(self, md5, virus_dict):
        """Insert or update the row for *md5* with the given detections.

        *virus_dict* maps engine name to detection name. An exact lowercase
        key (the column name) wins; otherwise engine names are matched
        case-insensitively. Engines missing from the dict are stored as NULL,
        also on update.

        Returns the row id, or ``None`` when there is no cursor, *md5* is
        empty, or the database reports an operational error.
        """
        if not self._cursor or not md5:
            return None

        virus_dict = virus_dict or {}
        # NOTE(review): VirusTotal reports appear to use capitalised engine
        # names (e.g. "Kaspersky"), which lowercase-only lookups would never
        # match -- confirm against a live response. Names that differ by more
        # than case still have to be mapped by the caller.
        lowered = {}
        for name, verdict in virus_dict.items():
            lowered.setdefault(str(name).lower(), verdict)
        detections = tuple(virus_dict.get(column, lowered.get(column))
                           for column in self._ENGINE_COLUMNS)

        self._conn.text_factory = str

        try:
            self._cursor.execute("select id from virustotal where md5 = ?;",
                                 (md5,))
            sample_row = self._cursor.fetchone()
        except sqlite3.OperationalError as why:
            print("sqlite3 error:%s\n" % str(why))
            return None

        # Column names are class constants, so formatting them into the SQL
        # text is safe; all values still go through "?" placeholders.
        if sample_row:
            task_id = sample_row[0]
            assignments = ", ".join("%s=?" % column
                                    for column in self._ENGINE_COLUMNS)
            try:
                self._cursor.execute(
                    "update virustotal set %s where id = ?;" % assignments,
                    detections + (task_id,))
                self._conn.commit()
            except sqlite3.OperationalError as why:
                print("unable to update database: %s." % why)
                return None
        else:
            # The sample is not in the database yet.
            placeholders = ", ".join(["?"] * (len(self._ENGINE_COLUMNS) + 1))
            try:
                self._cursor.execute(
                    "insert into virustotal (md5, %s) values (%s);"
                    % (", ".join(self._ENGINE_COLUMNS), placeholders),
                    (md5,) + detections)
                self._conn.commit()
                task_id = self._cursor.lastrowid
            except sqlite3.OperationalError as why:
                print("why %s" % str(why))
                return None

        print("add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id)))
        return task_id

    def get_sample(self):
        """Return the stored row with the lowest id as a dict.

        Returns ``None`` when the table is empty, there is no cursor, or the
        query fails.
        """
        log = logging.getLogger("database.gettask")

        if not self._cursor:
            log.error("unable to acquire cursor.")
            return None

        try:
            # The original also ordered by "added_on", a column this table
            # does not have, so the query always failed.
            self._cursor.execute("select * from virustotal "
                                 "order by id limit 1;")
            sample_row = self._cursor.fetchone()
        except sqlite3.OperationalError as why:
            log.error("unable to query database: %s.", why)
            return None

        return self._get_task_dict(sample_row) if sample_row else None

    def search_md5(self, md5):
        """Look up the row stored for a 32-character *md5*.

        Returns the row as a dict, ``{}`` when nothing matches, and ``None``
        when there is no cursor, *md5* is not 32 characters long, or the
        query fails. If several rows share the hash, the one with the lowest
        id is returned, as in the original loop.
        """
        if not self._cursor:
            return None
        if not md5 or len(md5) != 32:
            return None

        try:
            self._cursor.execute("select * from virustotal "
                                 "where md5 = ? "
                                 "order by id desc;", (md5,))
            rows = self._cursor.fetchall()
        except sqlite3.OperationalError:
            return None

        return self._get_task_dict(rows[-1]) if rows else {}

    def close(self):
        """Close the connection; query methods return ``None`` afterwards."""
        if self._conn is not None:
            self._conn.close()
        self._conn = None
        self._cursor = None


class virustotal(object):
    """Fetch the VirusTotal report for one MD5 and store it in the database.

    The lowercase class name is kept exactly as in the original listing.
    """

    _REPORT_URL = "https://www.virustotal.com/vtapi/v2/file/report"

    def __init__(self, md5):
        """Fetch the report for *md5* right away (performs a network request)."""
        self._virus_dict = {}
        self._md5 = md5
        self._db_file = r"./db/virustotal.db"
        # The original discarded this result, leaving _virus_dict always empty.
        self._virus_dict = self.get_report_dict()

    def __repr__(self):
        return str(self._virus_dict)

    def repr(self):
        """Return the detections as a string (kept for existing callers)."""
        return self.__repr__()

    def submit_md5(self, file_path):
        """Upload the file at *file_path* for scanning and print the reply.

        Despite the name, the argument is a file path, not a hash.
        """
        # postfile is not part of this listing; presumably a project-local
        # multipart helper -- imported lazily exactly as the original did.
        import postfile

        file_name = os.path.basename(file_path)
        host = "www.virustotal.com"
        selector = "https://www.virustotal.com/vtapi/v2/file/scan"
        fields = [("apikey", apikey)]
        # Context manager so the handle is closed even if read() fails.
        with open(file_path, "rb") as handle:
            file_to_send = handle.read()
        files = [("file", file_name, file_to_send)]
        reply = postfile.post_multipart(host, selector, fields, files)
        print(reply)

    @staticmethod
    def _extract_detections(response_dict):
        """Reduce a decoded report to ``{engine: detection name}``.

        Only engines whose entry has a truthy ``detected`` flag are kept.
        Returns ``{}`` when ``response_code`` is missing or falsy.
        """
        if not response_dict.get("response_code"):
            return {}
        scans = response_dict.get("scans") or {}
        return {engine: scan.get("result")
                for engine, scan in scans.items()
                if scan.get("detected")}

    def get_report_dict(self):
        """Request the report for this MD5 and return its detections.

        Network and JSON decoding errors propagate to the caller.
        """
        parameters = {"resource": self._md5, "apikey": apikey}
        # urlopen() needs bytes for POST data on Python 3.
        data = urlencode(parameters).encode("ascii")
        req = Request(self._REPORT_URL, data)
        with contextlib.closing(urlopen(req)) as response:
            payload = response.read()
        response_dict = json_lib.loads(payload.decode("utf-8"))
        return self._extract_detections(response_dict)

    def write_to_db(self):
        """Store the report fetched at construction time; return the row id.

        Uses the cached report instead of issuing a second API request.
        """
        db = virustotaldatabase(self._db_file)
        try:
            return db.add_sample(self._md5, self._virus_dict)
        finally:
            db.close()
使用方法如下:
# Usage example: read one MD5 per line from the input file, then fetch each
# report and store it in the database.
config = {'input': "inputmd5s"}

# The context manager closes the file even if reading fails.
with open(config['input'], "r") as fp:
    # Keep only non-blank lines, stripped of surrounding whitespace. This
    # replaces the ifilter/imap/string.strip chain, whose imports were missing.
    md5s = [line.strip() for line in fp if line.strip()]
print("md5s %s" % (md5s,))

from getvirustotalinfo import virustotal

# Fetch the scan result for each hash and write it to the database.
for md5 in md5s:
    virus_total = virustotal(md5)
    virus_total.write_to_db()
希望本文所述对大家的Python程序设计有所帮助。