用Scala打造精悍爬虫(一)游记篇
分享一下前段学习Scala做的一个爬虫程序。
【关于爬虫】
接触爬虫的时间并不长,发现python在这个领域有很大的份额。虽然也用过python,但是始终觉得动态语言做这种“严谨“工作还是不如Java,当然更没法和Scala比。
总结一下爬虫的主要困难:
痛点1:网断,大量爬取时,各种超时错是司空见惯,需要有良好的重试机制防止被打断。
痛点2:验证码,一般大网站都有反爬机制,当一定时间访问过多,就会跳转到验证码页面(携程就有)甚至禁止访问。另外,做模拟登陆的时候这个更是是绕不开的坎,真正的爬虫噩梦。详见: 知乎上一篇《为什么有些验证码看起来很容易但是没人做自动识别的?》 黄凯迪的文章。
痛点3:速度瓶颈,一般爬取数据都是百万级甚至更多,为了获得好的速度,多线程是必不可少的,单机不能满足需求就要分布式。但是这个又会增加上面两个问题的解决难度。
【关于反爬虫】
为什么聊这个?当然是知己知彼百战不殆。
网上看到一篇,还正好是携程出的,名字挺牛气。《关于反爬虫,看这一篇就够了》
【项目简述】
本篇程序用Scala+Jsoup 实现一个携程游记的爬虫,单机角度解决上面的问题。
先简要分析下携程游记,http://you.ctrip.com/travels/,作为国内数一数二的旅游类平台,携程主要通过收购小网站的方式壮大其游记规模,已经到了巨无霸级别,这次主要爬取游记目录规模 100万篇左右。由于数量过多,按照携程自己做的标签分类进行过滤,“精华”,“美图”,“典藏”,“实用”四类作为抓取对象。
【那些包?】
全部是标准库
import java.io.File import java.io.PrintWriter import java.text.SimpleDateFormat import java.util.Date import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.ConcurrentHashMap import org.jsoup.Jsoup import org.jsoup.nodes.Document import scala.collection.JavaConversions._ import scala.collection.parallel.ForkJoinTaskSupport import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.forkjoin.ForkJoinPool import scala.util.Failure import scala.util.Success import scala.util.Try
废话不多说,上程序,第一部分是纯网页分析的东东,用携程自身的地区分类索引做遍历,最大分页号发现页面上给的是错的,就花了几分钟调查了一下手写(关于爬取的页面分析,网上有很多,也是爬虫基本功,这里就不详述。):
// 携程游记一览Url,可变部分(1:地区 2:最大分页号(每页9篇游记)) val Url = "http://you.ctrip.com/travels/%s/s3-p%d.html" val ctripMap = Map( "国内" -> ("china110000", 42398), "亚洲" -> ("asia120001", 50071), "欧洲" -> ("europe120002", 2987), "大洋洲" -> ("oceania120003", 866), "非洲" -> ("africa120006", 463), "南美洲" -> ("southamerica120005", 115), "北美洲" -> ("northamerica120004", 1273), "南极洲" -> ("antarctica120481", 11) ) // 解析单页的游记,过滤出“精华”,“美图”,“典藏”,“实用”四种类型游记 def parseDoc(doc: Document) = { var allCnt, objCnt = 0 for (e <- doc.select("a.journal-item.cf")) { var tn = "" if (!e.select("span.pic-tagico-1").isEmpty()) tn += "精" if (!e.select("span.pic-tagico-2").isEmpty()) tn += "美" if (!e.select("span.pic-tagico-3").isEmpty()) tn += "实" if (!e.select("span.pic-tagico-4").isEmpty()) tn += "典" if (tn != "") { // 只保留符合条件的数据 map.put(e.attr("href"),e.attr("href") + "\t" //Url + e.select("dt.ellipsis").html + "\t" //标题 + tn + "\t" //类型名(精|美|实|典) + e.select("dd.item-user").html.replaceAll("\n", "") + "\t" //作者+发表时间 + e.select("dd.item-short").html + "\t" //摘要 + e.select("span.tips_a").html + "\t" //天数+旅游时间+花费+同伴关系 + e.select("span.tips_b").html + "\t" //tips_b + e.select("i.numview").html + "\t" //点击数 + e.select("i.want").html + "\t" //点赞数 + e.select("i.numreply").html); //回复数 objCnt += 1 } allCnt += 1 } (allCnt,objCnt) }
下面利用Try语法及递归调用,解决各种异常重试问题,注意携程有时侦测到单机IP大量访问会强制跳转到验证码页面,好在只维持一段时间,这里简化处理,休眠后再试。怎么看都比Java的trycatch 漂亮多了不是?
def sleep(i: Long) = Thread.sleep(i) val aiAll, aiCnt, aiFail: AtomicInteger = new AtomicInteger(0) val map = new java.util.concurrent.ConcurrentHashMap[String,String]() // 利用递归实现自动重试(重试100次,每次休眠30秒) def promiseGetUrl(times: Int=100, delay: Long=30000)(z: String, i: Int): Unit = { Try(Jsoup.connect(Url.format(z,i)).get()) match { case Failure(e) => if (times != 0) { println(e.getMessage); aiFail.addAndGet(1); sleep(delay); promiseGetUrl(times - 1, delay)(z,i) }else throw e case Success(d) => val (all, obj) = parseDoc(d); if (all ==0) {sleep(delay); promiseGetUrl(times - 1, delay)(z,i) }//携程跳转验证码走这里! aiAll.addAndGet(all); aiCnt.addAndGet(obj); } }
相比其他语言Scala中递归算是很常见的了,Scala还可以通过@tailrec注解确保对尾递归实施优化,当然本例并不适合。其实对于限定次数来说,大多数时候没有必要担心内存压力。
第三步,解决加速问题,使用Scala并发集合的线程池,用起来感觉像在上外挂,非常简洁。
// 并发集合多线程执行 def concurrentCrawler(zone: String, maxPage: Int, threadNum: Int) = { val loopPar = (1 to maxPage).par loopPar.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(threadNum)) // 设置并发线程数 loopPar.foreach(promiseGetUrl()(zone, _)) // 利用并发集合多线程同步抓取 output(zone) }
最后,输出结果,值得注意的是,线程池不宜设置过大,过大会导致网站反爬跳转高发反而拖慢速度,需要在不同时段尝试,我的机器上测试出白天网络比较慢30就可以了,晚上可调高一些。
// 获取当前日期 (简单机能用Java就Ok) def getNowDate():String={ new SimpleDateFormat("yyyyMMdd").format( new Date() ) } // 爬取内容写入文件 def output(zone: String) = { val writer = new PrintWriter(new File(getNowDate()+"_"+zone++".txt")) for ((_,value) <- map) writer.println(value) writer.close() } val Thread_Num = 30 //指定并发执行线程数 val t1 = System.currentTimeMillis //全体抓取 ctripMap.foreach{ m => concurrentCrawler(m._2._1, m._2._2, map, Thread_Num) map = new ConcurrentHashMap[String,String](); } //个别抓取 //val tup = ctripMap("欧洲"); concurrentCrawler(tup._1, tup._2, Thread_Num) val t2 = System.currentTimeMillis println(s"抓取数:$aiCnt 重试数:$aiFail 耗时(秒):"+(t2-t1)/1000)
到此,一个无需监控的爬虫完工,实测它可以抵御任何网络异常超时以及携程的屏蔽,不间断(休眠时间除外)执行到完成所有任务。
【执行结果】
下面是晚上8点左右,开50线程的执行结果
***********************************************************
已分析游记数:883656 好游记:26018
抓取数:26018 重试数:28 耗时(秒):4541
***********************************************************
1小时15分搞定,平均抓取速度 21.6个url/秒,考虑到单机无人值守无漏抓保证,这个速度还是比较满意了。
同时赞一下携程网给力的服务器。
【引申话题】
本篇只是爬虫一个简单小例子,还属于“屌丝”级别。在真实环境实现一个强大的爬虫要做到大规模实时要求,分布式是必不可少的,另外规避验证码这类反爬手段不能再用“傻等”的方式了,代理ip池成为必备,当然这个也是需要付出一些成本。还有webspec,selenium,PhantomJs等等神兵利器,更有高大上的图像识别技术搞定验证码,这里不一一介绍了,有兴趣的同学可以接着了解。个人感觉作为Scala的杀手级应用Akka,在实现一个分布式爬虫方面也是大有可为。
【广告时间】
如果你对Java八股一样的语法倦了,对python那样动态语言又没有安全感,就来试试Scala吧。
推荐阅读