python爬取微信公众号文章的方法
程序员文章站
2022-04-09 12:43:04
最近在学习python3网络爬虫开发实践(崔庆才 著)刚好也学习到他使用代理爬取公众号文章这里,但是照着他的代码写,出现了一些问题。在这里我用到了这本书的前面讲的一些内容进...
最近在学习python3网络爬虫开发实践(崔庆才 著)刚好也学习到他使用代理爬取公众号文章这里,但是照着他的代码写,出现了一些问题。在这里我用到了这本书的前面讲的一些内容进行了完善。(作者写这个代码已经是半年前的事了,但腾讯的网站在这半年前进行了更新)
下面我直接上代码:
timeout = 20 from requests import request, session, preparedrequest import requests from selenium import webdriver from selenium.common.exceptions import nosuchelementexception from bs4 import beautifulsoup as bs import pymysql # 要爬取的内容 keyword = '美女图片' options = webdriver.chromeoptions() # 设置中文 options.add_argument('lang=zh_cn.utf-8') # 更换头部 options.add_argument( 'user-agent="mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/69.0.3497.100 safari/537.36"') browser = webdriver.chrome(chrome_options=options) redis_host = '192.168.1.248' redis_port = 6379 redis_password = '*****' redis_key = 'requests' proxy_pool_url = 'http://127.0.0.1:8080/random' max_failed_time = 5 mysql_host = 'localhost' mysql_port = 3306 mysql_user = 'moxiao' mysql_password = '******' class mysqlconn(): def __init__(self, host=mysql_host, username=mysql_user, password=mysql_password, port=mysql_port): """ mysql 初始化 :param host: :param username: :param password: :param port: """ try: self.db = pymysql.connection(host=host, user=username, password=password, database='weixin_data', port=port) self.cursor = self.db.cursor() except pymysql.mysqlerror as e: print(e.args) def insert(self, table, data): keys = ', '.join(data.keys()) values = ', '.join(['%s'] * len(data)) sql = 'insert into %s (%s) values (%s)' % (table, keys, values) try: self.cursor.execute(sql, tuple(data.values())) self.db.commit() except pymysql.mysqlerror as e: print(e.args) self.db.rollback() class weixinrequest(request): def __init__(self, url, callback, method="get", headers=none, need_proxy=false, fail_time=0, timeout=timeout): super(weixinrequest, self).__init__(url=url, method=method, headers=headers) self.callback = callback self.need_proxy = need_proxy self.fail_time = fail_time self.timeout = timeout def prepare(self): p = preparedrequest() p.prepare( method=self.method, url=self.url, headers=self.headers, ) return p class weixinresponse(): def __init__(self, text): self.text = text def set_status_code(self, status_code): self.status_code = status_code import pickle from redis import strictredis class redisqueue(): def __init__(self): """ 初始化redis """ self.db = strictredis(host=redis_host, port=redis_port, password=redis_password, db=3) def add(self, request): """ 向队列添加序列化后的request :param request:请求对象 :return:添加结果 """ if isinstance(request, weixinrequest): return self.db.rpush(redis_key, pickle.dumps(request)) return false def pop(self): """ 取出下一个request并反序列化 :return: request 或者 none """ if self.db.llen(redis_key): return pickle.loads(self.db.lpop(redis_key)) return false def empty(self): return self.db.llen(redis_key) == 0 def del_all(self): return self.db.delete(redis_key) def get_proxy(self): """ 从代理池获取代理ip :return: """ try: response = requests.get(proxy_pool_url) if response.status_code == 200: print('get proxy', response.text) return response.text except requests.connectionerror: return none from urllib.parse import urlencode from requests import readtimeout, connectionerror from pyquery import pyquery as pq vald_statues = [200] class spider(): base_url = 'http://weixin.sogou.com/weixin?' # 这里的page可以修改,即第几页,我本来想获取所有的个数再除以10 这样就能爬完了,但是我只是测试所以这里并没有做 # 但如果需要做可以加到schedule方法的while循环内的最下面 即self.params['page']+=1 params = {'type': 2, 's_from': 'input', 'query': keyword, 'page': 1, 'ie': 'utf8', '_sug_': 'n', '_sug_type_': ''} headers = {'host': 'weixin.sogou.com', 'connection': 'keep-alive', 'cache-control': 'max-age=0', 'upgrade-insecure-requests': '1', 'user-agent': 'mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/69.0.3497.100 safari/537.36', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'accept-encoding': 'gzip, deflate', 'accept-language': 'zh-cn,zh;q=0.9', 'referer': 'http: // weixin.sogou.com /', 'cookie': '你的cookie'} # todo 不可能把我的给你撒 session = session() queue = redisqueue() queue.del_all() mysql = mysqlconn() def start(self): """ 初始化工作 :return: """ # 全局更新headers # 如果你试过用这个方法修改headers,那么就知道这个在这里好像没什么用,我在这里浪费了至少两个小时 self.session.headers.update(self.headers) start_url = self.base_url + urlencode(self.params) # 这里我将need_proxy=false设为了false 即并没有使用代理 ps:我也就是测试一下 # 真正修改了headers是在这里 weixin_request = weixinrequest(url=start_url, callback=self.parse_index, headers=self.headers, need_proxy=false) # 调度第一个请求 self.queue.add(weixin_request) def schedule(self): """ 调度请求 :return: """ while not self.queue.empty(): weixin_request = self.queue.pop() callback = weixin_request.callback print('schedule', weixin_request.url) response = self.request(weixin_request) if response and response.status_code in vald_statues: results = list(callback(response)) if results: for result in results: print('new result', result) if isinstance(result, weixinrequest): # 将新的文章详情的url也加入队列 self.queue.add(result) if isinstance(result, dict): # 储存到mysql self.mysql.insert('articles', result) else: self.error(weixin_request) else: self.error(weixin_request) def request(self, weixin_request): """ 执行请求 :param weixin_request:请求 :return: 响应 """ if not 'http://mp.weixin.qq.com/s?src' in weixin_request.url: try: if weixin_request.need_proxy: proxy = self.queue.get_proxy() if proxy: proxies = { 'http': 'http://' + proxy, 'https': 'https://' + proxy } return self.session.send(weixin_request.prepare(), timeout=weixin_request.timeout, allow_redirects=false, proxies=proxies) return self.session.send(weixin_request.prepare(), timeout=weixin_request.timeout, allow_redirects=false) except (connectionerror, readtimeout) as e: print(e.args) return false else: print('-' * 20) browser.get(weixin_request.url) try: browser.find_element_by_class_name('rich_media_area_primary_inner') wr = weixinresponse(browser.page_source) wr.set_status_code(200) return wr except nosuchelementexception: wr = weixinresponse('') wr.set_status_code(403) return wr def parse_index(self, response): """ 解析索引页 :param response: 响应 :return: 新的响应 """ doc = pq(response.text) items = doc('.news-box .news-list li .txt-box h3 a').items() for item in items: url = item.attr('href') weixin_request = weixinrequest(url=url, callback=self.parse_detail) yield weixin_request def parse_detail(self, response): """ 解析详情页 :param response: 响应 :return: 微信公众号文章 """ doc = pq(response.text) profile_inner = doc('.profile_inner') data = { 'title': doc('.rich_media_title').text(), 'content': doc('.rich_media_content').text(), 'date': doc('#publish_time').text(), # 'nickname':doc('#js_profile_qrcode > div > strong').text(), 'nickname': profile_inner.find('.profile_nickname').text(), 'wechat': [ns for ns in profile_inner.find('.profile_meta').find('.profile_meta_value').items()][ 0].text() } # 储存图片 print('#' * 30) soup = bs(response.text) wn = soup.find_all('img') for img in wn: if img.has_attr('_width') and img.has_attr('data-src'): print(img.attrs['data-src']) yield data def error(self, weixin_request): """ 错误处理 :param weixin_request:请求 :return: """ weixin_request.fail_time = weixin_request.fail_time + 1 print('request failed', weixin_request.fail_time, 'times', weixin_request.url) if weixin_request.fail_time < max_failed_time: self.queue.add(weixin_request) def run(self): self.start() self.schedule() if __name__ == '__main__': spider = spider() spider.run()
2018-10-6更新:
今天测试之后使用了cookie并不能登录这个网站了,也许是腾讯使用了新的安全验证,具体也无从得知,但使用浏览器访问没有问题
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。