Python: HTTP文件下载压力测试工具
程序员文章站
2022-07-13 18:05:43
...
import urllib.request import os import threading import datetime import time class urlDownloadThread(threading.Thread): def __init__(self, threadName, url, targetFolder, targetFileName): threading.Thread.__init__(self) self.threadName = threadName self.url = url self.targetFolder = targetFolder self.targetFileName = targetFileName def run(self): print ("开始线程:",self.name); # 获得锁,成功获得锁定后返回 True # 可选的timeout参数不填时将一直阻塞直到获得锁定 # 否则超时后将返回 False # threadLock.acquire() printLog(self.threadName, self.url) startTime = datetime.datetime.now() f = urllib.request.urlopen(self.url) data = f.read() with open(os.path.join(self.targetFolder, self.targetFolder+self.targetFileName),"wb") as f: f.write(data) finishTime = datetime.datetime.now() spendTime = finishTime-startTime printLog(self.threadName, spendTime) # 释放锁 # threadLock.release() def __del__(self): print (self.name, "线程结束!") def printLog(title, strLog): print ("[%s][%s] %s" % (time.ctime(time.time()), title, strLog)) #url = 'https://firebasestorage.googleapis.com/v0/b/xac-download-site.appspot.com/o/applications%2Fcom.xacusa.xacpaymentbridge%2F1.0.20200331%2Fs_RP10V0331payment.apk?alt=media&token=dbc9f07f-810a-4a7c-a632-3ea6dd93449f' #targetFolder = 'E:\\Download_Test\\' #targetFileName = 'xacpaymentbridge' #targetAttrName = '.apk' url = 'https://firebasestorage.googleapis.com/v0/b/pzbp-tms-test-site.appspot.com/o/sppls%2Fsppl_ki853dbt3p01%2F1127_pzbp_os.zip?alt=media&token=214eefa5-57b3-47e1-a137-1cb790df3a3e' targetFolder = 'E:\\Download_Test\\' targetFileName = '2F1127_pzbp_os' targetAttrName = '.zip' counter = 2 threadLock = threading.Lock() threads = [] while counter: #time.sleep(0.1) # 创建新线程 singleThread = urlDownloadThread("Thread-"+str(counter), url, targetFolder, targetFileName + str(counter) + targetAttrName) # 开启新线程 singleThread.start() # 添加线程到线程列表 threads.append(singleThread) counter -= 1 # 等待所有线程完成 for t in threads: t.join() #TODO: 计算并统计相关测试数据 print ("主进程结束!")