欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python实现多线程网页下载器

程序员文章站 2022-04-09 15:06:07
本文为大家分享了python实现的一个多线程网页下载器,供大家参考,具体内容如下 这是一个有着真实需求的实现,我的用途是拿它来通过 HTTP 方式向服务器提交游戏数据。把...

本文为大家分享了python实现的一个多线程网页下载器,供大家参考,具体内容如下

这是一个有着真实需求的实现,我的用途是拿它来通过 HTTP 方式向服务器提交游戏数据。把它放上来也是想大家帮忙挑刺,找找 bug,让它工作得更好。

keywords:python,http,multi-threads,thread,threading,httplib,urllib,urllib2,Queue,http pool,httppool

废话少说,上源码:

# -*- coding:utf-8 -*- 
import urllib, httplib 
import thread 
import time 
from Queue import Queue, Empty, Full 
HEADERS = {"Content-type": "application/x-www-form-urlencoded", 
            'Accept-Language':'zh-cn', 
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 6.0;Windows NT 5.0)', 
            "Accept": "text/plain"} 
UNEXPECTED_ERROR = -1 
POST = 'POST' 
GET = 'GET' 
def base_log(msg): 
  print msg 
def base_fail_op(task, status, log): 
  log('fail op. task = %s, status = %d'%(str(task), status)) 
def get_remote_data(tasks, results, fail_op = base_fail_op, log = base_log): 
  while True: 
    task = tasks.get() 
    try: 
      tid = task['id'] 
      hpt = task['conn_args'] # hpt <= host:port, timeout 
    except KeyError, e: 
      log(str(e)) 
      continue 
    log('thread_%s doing task %d'%(thread.get_ident(), tid)) 
    #log('hpt = ' + str(hpt)) 
    conn = httplib.HTTPConnection(**hpt) 
       
    try: 
      params = task['params'] 
    except KeyError, e: 
      params = {} 
    params = urllib.urlencode(params) 
    #log('params = ' + params) 
     
    try: 
      method = task['method'] 
    except KeyError: 
      method = 'GET' 
    #log('method = ' + method) 
     
    try: 
      url = task['url'] 
    except KeyError: 
      url = '/' 
    #log('url = ' + url) 
     
    headers = HEADERS 
    try: 
      tmp = task['headers'] 
    except KeyError, e: 
      tmp = {} 
    headers.update(tmp) 
    #log('headers = ' + str(headers)) 
    headers['Content-Length'] = len(params) 
     
    try: 
      if method == POST: 
        conn.request(method, url, params, headers) 
      else: 
        conn.request(method, url + params) 
      response = conn.getresponse() 
    except Exception, e: 
      log('request failed. method = %s, url = %s, params = %s headers = %s'%( 
            method, url, params, headers)) 
      log(str(e)) 
      fail_op(task, UNEXPECTED_ERROR, log) 
      continue 
       
    if response.status != httplib.OK: 
      fail_op(task, response.status, log) 
      continue 
       
    data = response.read() 
    results.put((tid, data), True) 
     
class HttpPool(object): 
  def __init__(self, threads_count, fail_op, log): 
    self._tasks = Queue() 
    self._results = Queue() 
     
    for i in xrange(threads_count): 
      thread.start_new_thread(get_remote_data,  
                              (self._tasks, self._results, fail_op, log)) 
       
  def add_task(self, tid, host, url, params, headers = {}, method = 'GET', timeout = None): 
    task = { 
      'id' : tid, 
      'conn_args' : {'host' : host} if timeout is None else {'host' : host, 'timeout' : timeout}, 
      'headers' : headers, 
      'url' : url, 
      'params' : params, 
      'method' : method, 
      } 
    try: 
      self._tasks.put_nowait(task) 
    except Full: 
      return False 
    return True 
     
  def get_results(self): 
    results = [] 
    while True: 
      try: 
        res = self._results.get_nowait() 
      except Empty: 
        break 
      results.append(res) 
    return results 
     
def test_google(task_count, threads_count): 
  hp = HttpPool(threads_count, base_fail_op, base_log) 
  for i in xrange(task_count): 
    if hp.add_task(i, 
        'www.google.cn', 
        '/search?', 
        {'q' : 'lai'}, 
#        method = 'POST' 
        ): 
      print 'add task successed.' 
       
  while True: 
    results = hp.get_results() 
    if not results: 
      time.sleep(1.0 * random.random()) 
    for i in results: 
      print i[0], len(i[1]) 
#      print unicode(i[1], 'gb18030') 
       
if __name__ == '__main__': 
  import sys, random 
  task_count, threads_count = int(sys.argv[1]), int(sys.argv[2]) 
  test_google(task_count, threads_count)

 有兴趣想尝试运行的朋友,可以把它保存为 xxxx.py,然后执行 python xxxx.py 10 4,其中 10 表示向 google.cn 请求 10 次查询,4 表示由 4 条线程来执行这些任务。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。