Golang 的类Scrapy爬虫实现
核心部分
N0.1 Request/Response
type Response struct {
Url string
Body string
Meta *map[string]string
}
type Request struct {
Url string
Parse func(response *Response)
Meta *map[string]string
}
请求与响应的封装结构体,对于 Response
, Url
为请求时的 Url
, Body
为此 Url
的网页源码, Meta
为上一层传过来的数据(之所以用指针是因为指针省内存);
对于 Request
结构, Url
是需要请求的链接, Parse
是该 Url
对应的解析函数, Meta
是传给 Parse
函数参数 response
的数据。
No.2 请求池/数据池
var RequestChan = make(chan *Request, 100) // 请求池
var DataChan = make(chan *map[string]string, 100) // 数据池
请求池 RequestChan
用于存放等待完成的请求,数据池 DataChan
用于存放还未处理的数据。
No.3 细节部分
var wg sync.WaitGroup //定义一个同步等待的组
var swg sync.WaitGroup // 等待数据处理结束
var Save = func(data *map[string]string) { // 保存数据的函数
fmt.Println(data)
}
var Download = func(url string) (content string, statusCode int) { // 网页源码下载函数
defer func() { fmt.Printf("get<%d>: %s\n", statusCode, url) }()
resp, err1 := http.Get(url)
if err1 != nil {
statusCode = -100
return
}
defer resp.Body.Close()
data, err2 := ioutil.ReadAll(resp.Body)
if err2 != nil {
statusCode = -200
return
}
statusCode = resp.StatusCode
content = string(data)
return
}
func doRequest(request *Request) {
defer wg.Done()
html, sta := Download(request.Url)
if sta != 200 {
return
}
request.Parse(&Response{request.Url, html, request.Meta})
}
func save() {
defer swg.Done()
for {
data := <-DataChan
if data == nil {
break
}
Save(data)
}
}
对于请求与数据的处理,我们需要全部完成,且并发数量要限制在一定范围内,因此我们定义了 wg
与 swg
两个协程组, Save
变量是一个函数,之所以定义成一个变量,是因为 Save
是提供给用户操作的,例如用户可能将数据保存在文件、数据库、云盘等等地方,我们只提供一个默认保存方案,即向控制台输出数据。同理 Download
变量也是一个函数,用于处理下载问题。doRequest
函数不提供给用户操作,因此其首字母小写,其作用是处理请求 Request
, save
函数是我们的数据处理函数,从数据池中取出数据,然后调用用户定义的 Save
方法处理数据。
NO.4 调度器Scheduler(核心)
func Scheduler(threads int) {
go save()
swg.Add(1)
for {
for i := 0; i < threads; i++ { // 每次开启这么多线程
req := <-RequestChan
if req == nil {
DataChan <- nil
return
}
go doRequest(req)
wg.Add(1)
}
wg.Wait()
}
swg.Wait() // 必须在这里等待,不然数据有可能还没处理完就挂掉了
}
调度器的作用是调度请求,首先,调度器先开启数据处理线程 save
然后再依次开启 threads
个线程处理请求,当前一批 request
请求完了后,再从请求池里取出 threads
个 request
进行请求,当所有请求完毕后,将数据池的队尾写入 nil
提示所有 request
都处理完毕了, 如果数据也处理完毕,则可结束数据处理线程,此时调度结束,控制权交给用户的 main
函数。
NO.5 完整代码
package myspider
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
)
type Response struct {
Url string
Body string
Meta *map[string]string
}
type Request struct {
Url string
Parse func(response *Response)
Meta *map[string]string
}
var RequestChan = make(chan *Request, 100) // 请求池
var DataChan = make(chan *map[string]string, 100) // 数据池
var wg sync.WaitGroup //定义一个同步等待的组
var swg sync.WaitGroup // 等待数据处理结束
var Save = func(data *map[string]string) { // 保存数据的函数
fmt.Println(data)
}
var Download = func(url string) (content string, statusCode int) { // 网页源码下载函数
defer func() { fmt.Printf("get<%d>: %s\n", statusCode, url) }()
resp, err1 := http.Get(url)
if err1 != nil {
statusCode = -100
return
}
defer resp.Body.Close()
data, err2 := ioutil.ReadAll(resp.Body)
if err2 != nil {
statusCode = -200
return
}
statusCode = resp.StatusCode
content = string(data)
return
}
func doRequest(request *Request) {
defer wg.Done()
html, sta := Download(request.Url)
if sta != 200 {
return
}
request.Parse(&Response{request.Url, html, request.Meta})
}
func save() {
defer swg.Done()
for {
data := <-DataChan
if data == nil {
break
}
Save(data)
}
}
func Scheduler(threads int) {
go save()
swg.Add(1)
for {
for i := 0; i < threads; i++ { // 每次开启这么多线程
req := <-RequestChan
if req == nil {
DataChan <- nil
return
}
go doRequest(req)
wg.Add(1)
}
wg.Wait()
}
swg.Wait() // 必须在这里等待,不然数据有可能还没处理完就挂掉了
}
示例
NO.1 古诗词网(纵向结构)
package main
import (
. "./myspider"
"regexp"
)
/**
aspx">雨打梨花深闭门,忘了青春,误了青春。</a><span style=" color:#65645F; margin-top:-7px; float:left; margin-left:5px; margin-right:10px;">____</span><a style=" float:left;" target="_blank" href="/view_72645.aspx">唐寅《一剪梅·雨打梨花深闭门》</a>
</div>
*/
var textItem = regexp.MustCompile(`aspx">(.*?)<.*?aspx">(.*?)</a>`)
// <a style="width:60px;" href="Default.aspx?p=2&c=&t=">下一页</a>
var nextReg = regexp.MustCompile(`href="(.*?)">下一页</a>`)
var lll = 0
func parse(response *Response) {
nt := nextReg.FindStringSubmatch(response.Body)
if len(nt) == 2 && lll < 2 { // 有下一页
lll++
RequestChan <- &Request{"http://so.gushiwen.org/mingju/" + nt[1], parse, nil}
} else { // 最后一页,设置标志
defer func() { RequestChan <- nil }()
}
res := textItem.FindAllStringSubmatch(response.Body, 100)
for _, it := range res {
if len(it) == 3 {
data := make(map[string]string)
data["text"] = it[1]
data["title"] = it[2]
DataChan <- &data
}
}
}
func main() {
RequestChan <- &Request{"http://so.gushiwen.org/mingju/", parse, nil}
Scheduler(4)
}
执行过程:首先生成第一个请求,扔到请求池,然后开始同时处理4个请求的调度过程。
NO.3 8edy电影网(纵横结构)
// httpwww.8edy.tvkh
package main
import (
. "./myspider"
"fmt"
"os"
"regexp"
)
// <a href="/movie/28293/" target="_blank">独立日2</a>
var detReg = regexp.MustCompile(`"(/movie/[0-9]*?/)" target="_blank">`)
// <a href="/kh/p2/">下一页</a>
var nt = regexp.MustCompile(`"(/kh/p[0-9]*?)">下一页`)
var cur = 0
var max = 2
func parse1(response *Response) {
cur++
for _, link := range detReg.FindAllStringSubmatch(response.Body, 40) { // 具体电影情况
RequestChan <- &Request{"http://www.8edy.tv" + link[1], parse2, nil}
}
next := nt.FindStringSubmatch(response.Body)
if len(next) > 0 && cur < max { // 下一页链接
RequestChan <- &Request{"http://www.8edy.tv" + next[1], parse1, nil}
} else { // 最后一页
RequestChan <- nil
}
}
/*
<div class="nrjb_tlt"><h1 class="lvzi">末日危城</h1> <br>类型:动作电影 科幻电影 <br>
年份:2008<br>
地区:欧美<br>
语言: <br>
格式:MP4 / 3GP <br>
导演:<a href=" /do/s?wd=乌维·鲍尔" target="_blank">乌维·鲍尔</a><br>
<div class="zhuyan"><span class="mvdf">主演:</span><p class="nrnmd"> 杰森·斯坦森 雷·利奥塔 克莱尔·弗兰妮</p></div></div>
*/
var titleReg = regexp.MustCompile(`<h1 class="lvzi">(.*?)</h1>`)
var ftypeReg = regexp.MustCompile(`类型:(.*?)<br/>`)
var yearReg = regexp.MustCompile(`年份:(.*?)<br/>`)
var areaReg = regexp.MustCompile(`地区:(.*?)<br/>`)
var formReg = regexp.MustCompile(`格式:(.*?)<br/>`)
func parse2(response *Response) {
html := response.Body
data := make(map[string]string)
data["title"] = titleReg.FindStringSubmatch(html)[1]
data["type"] = ftypeReg.FindStringSubmatch(html)[1]
data["year"] = yearReg.FindStringSubmatch(html)[1]
data["area"] = areaReg.FindStringSubmatch(html)[1]
data["form"] = formReg.FindStringSubmatch(html)[1]
DataChan <- &data
}
func main() {
file, err := os.Create("8edy.csv")
defer file.Close()
if err != nil {
fmt.Println(err)
return
}
file.WriteString(fmt.Sprintf("%s,%s,%s,%s,%s\n", "标题", "类型", "年份", "地区", "格式"))
Save = func(data *map[string]string) {
file.WriteString(fmt.Sprintf("%s,%s,%s,%s,%s\n", (*data)["title"], (*data)["type"], (*data)["year"], (*data)["area"], (*data)["form"]))
}
RequestChan <- &Request{"http://www.8edy.tv/kh/", parse1, nil}
Scheduler(10)
}
过程与上面的一样,这个程序多一个自定义数据存储过程。