欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Golang 的类Scrapy爬虫实现

程序员文章站 2022-05-06 18:47:27
...

核心部分

N0.1 Request/Response

type Response struct {
    Url  string
    Body string
    Meta *map[string]string
}

type Request struct {
    Url   string
    Parse func(response *Response)
    Meta  *map[string]string
}

请求与响应的封装结构体,对于 ResponseUrl 为请求时的 UrlBody 为此 Url 的网页源码, Meta 为上一层传过来的数据(之所以用指针是因为指针省内存);

对于 Request 结构, Url 是需要请求的链接, Parse 是该 Url 对应的解析函数, Meta 是传给 Parse 函数参数 response 的数据。

No.2 请求池/数据池

var RequestChan = make(chan *Request, 100)        // 请求池
var DataChan = make(chan *map[string]string, 100) // 数据池

请求池 RequestChan 用于存放等待完成的请求,数据池 DataChan 用于存放还未处理的数据。

No.3 细节部分

var wg sync.WaitGroup                             //定义一个同步等待的组
var swg sync.WaitGroup                            // 等待数据处理结束
var Save = func(data *map[string]string) {        // 保存数据的函数
    fmt.Println(data)
}
var Download = func(url string) (content string, statusCode int) { // 网页源码下载函数

    defer func() { fmt.Printf("get<%d>: %s\n", statusCode, url) }()
    resp, err1 := http.Get(url)
    if err1 != nil {
        statusCode = -100
        return
    }
    defer resp.Body.Close()
    data, err2 := ioutil.ReadAll(resp.Body)
    if err2 != nil {
        statusCode = -200
        return
    }
    statusCode = resp.StatusCode
    content = string(data)
    return
}

func doRequest(request *Request) {
    defer wg.Done()
    html, sta := Download(request.Url)
    if sta != 200 {
        return
    }
    request.Parse(&Response{request.Url, html, request.Meta})
}

func save() {
    defer swg.Done()
    for {
        data := <-DataChan
        if data == nil {
            break
        }
        Save(data)
    }
}

对于请求与数据的处理,我们需要全部完成,且并发数量要限制在一定范围内,因此我们定义了 wgswg 两个协程组, Save 变量是一个函数,之所以定义成一个变量,是因为 Save 是提供给用户操作的,例如用户可能将数据保存在文件、数据库、云盘等等地方,我们只提供一个默认保存方案,即向控制台输出数据。同理 Download 变量也是一个函数,用于处理下载问题。doRequest 函数不提供给用户操作,因此其首字母小写,其作用是处理请求 Requestsave 函数是我们的数据处理函数,从数据池中取出数据,然后调用用户定义的 Save 方法处理数据。

NO.4 调度器Scheduler(核心)

func Scheduler(threads int) {
    go save()
    swg.Add(1)
    for {
        for i := 0; i < threads; i++ { // 每次开启这么多线程
            req := <-RequestChan
            if req == nil {
                DataChan <- nil
                return
            }
            go doRequest(req)
            wg.Add(1)
        }
        wg.Wait()
    }
    swg.Wait() // 必须在这里等待,不然数据有可能还没处理完就挂掉了
}

调度器的作用是调度请求,首先,调度器先开启数据处理线程 save 然后再依次开启 threads 个线程处理请求,当前一批 request 请求完了后,再从请求池里取出 threadsrequest 进行请求,当所有请求完毕后,将数据池的队尾写入 nil 提示所有 request 都处理完毕了, 如果数据也处理完毕,则可结束数据处理线程,此时调度结束,控制权交给用户的 main 函数。

NO.5 完整代码

package myspider

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
)

type Response struct {
    Url  string
    Body string
    Meta *map[string]string
}

type Request struct {
    Url   string
    Parse func(response *Response)
    Meta  *map[string]string
}

var RequestChan = make(chan *Request, 100)        // 请求池
var DataChan = make(chan *map[string]string, 100) // 数据池
var wg sync.WaitGroup                             //定义一个同步等待的组
var swg sync.WaitGroup                            // 等待数据处理结束
var Save = func(data *map[string]string) {        // 保存数据的函数
    fmt.Println(data)
}
var Download = func(url string) (content string, statusCode int) { // 网页源码下载函数

    defer func() { fmt.Printf("get<%d>: %s\n", statusCode, url) }()
    resp, err1 := http.Get(url)
    if err1 != nil {
        statusCode = -100
        return
    }
    defer resp.Body.Close()
    data, err2 := ioutil.ReadAll(resp.Body)
    if err2 != nil {
        statusCode = -200
        return
    }
    statusCode = resp.StatusCode
    content = string(data)
    return
}

func doRequest(request *Request) {
    defer wg.Done()
    html, sta := Download(request.Url)
    if sta != 200 {
        return
    }
    request.Parse(&Response{request.Url, html, request.Meta})
}

func save() {
    defer swg.Done()
    for {
        data := <-DataChan
        if data == nil {
            break
        }
        Save(data)
    }
}

func Scheduler(threads int) {
    go save()
    swg.Add(1)
    for {
        for i := 0; i < threads; i++ { // 每次开启这么多线程
            req := <-RequestChan
            if req == nil {
                DataChan <- nil
                return
            }
            go doRequest(req)
            wg.Add(1)
        }
        wg.Wait()
    }
    swg.Wait() // 必须在这里等待,不然数据有可能还没处理完就挂掉了
}

示例

NO.1 古诗词网(纵向结构)

package main

import (
    . "./myspider"
    "regexp"
)

/**
aspx">雨打梨花深闭门,忘了青春,误了青春。</a><span style=" color:#65645F; margin-top:-7px; float:left; margin-left:5px; margin-right:10px;">____</span><a style=" float:left;" target="_blank" href="/view_72645.aspx">唐寅《一剪梅·雨打梨花深闭门》</a>
</div>
*/
var textItem = regexp.MustCompile(`aspx">(.*?)<.*?aspx">(.*?)</a>`)

// <a style="width:60px;" href="Default.aspx?p=2&amp;c=&amp;t=">下一页</a>
var nextReg = regexp.MustCompile(`href="(.*?)">下一页</a>`)
var lll = 0

func parse(response *Response) {
    nt := nextReg.FindStringSubmatch(response.Body)
    if len(nt) == 2 && lll < 2 { // 有下一页
        lll++
        RequestChan <- &Request{"http://so.gushiwen.org/mingju/" + nt[1], parse, nil}
    } else { // 最后一页,设置标志
        defer func() { RequestChan <- nil }()
    }
    res := textItem.FindAllStringSubmatch(response.Body, 100)
    for _, it := range res {
        if len(it) == 3 {
            data := make(map[string]string)
            data["text"] = it[1]
            data["title"] = it[2]
            DataChan <- &data
        }
    }

}

func main() {
    RequestChan <- &Request{"http://so.gushiwen.org/mingju/", parse, nil}
    Scheduler(4)
}

执行过程:首先生成第一个请求,扔到请求池,然后开始同时处理4个请求的调度过程。

NO.3 8edy电影网(纵横结构)

// httpwww.8edy.tvkh
package main

import (
    . "./myspider"
    "fmt"
    "os"
    "regexp"
)

// <a href="/movie/28293/" target="_blank">独立日2</a>
var detReg = regexp.MustCompile(`"(/movie/[0-9]*?/)" target="_blank">`)

// <a href="/kh/p2/">下一页</a>
var nt = regexp.MustCompile(`"(/kh/p[0-9]*?)">下一页`)

var cur = 0
var max = 2

func parse1(response *Response) {
    cur++
    for _, link := range detReg.FindAllStringSubmatch(response.Body, 40) { // 具体电影情况
        RequestChan <- &Request{"http://www.8edy.tv" + link[1], parse2, nil}
    }
    next := nt.FindStringSubmatch(response.Body)
    if len(next) > 0 && cur < max { // 下一页链接
        RequestChan <- &Request{"http://www.8edy.tv" + next[1], parse1, nil}
    } else { // 最后一页
        RequestChan <- nil
    }
}

/*
<div class="nrjb_tlt"><h1 class="lvzi">末日危城</h1> <br>类型:动作电影 科幻电影 <br>
            年份:2008<br>
            地区:欧美<br>
            语言:  <br>
            格式:MP4 / 3GP <br>
            导演:<a href=" /do/s?wd=乌维·鲍尔" target="_blank">乌维·鲍尔</a><br>
          <div class="zhuyan"><span class="mvdf">主演:</span><p class="nrnmd"> 杰森·斯坦森 雷·利奥塔 克莱尔·弗兰妮</p></div></div>
*/
var titleReg = regexp.MustCompile(`<h1 class="lvzi">(.*?)</h1>`)
var ftypeReg = regexp.MustCompile(`类型:(.*?)<br/>`)
var yearReg = regexp.MustCompile(`年份:(.*?)<br/>`)
var areaReg = regexp.MustCompile(`地区:(.*?)<br/>`)
var formReg = regexp.MustCompile(`格式:(.*?)<br/>`)

func parse2(response *Response) {
    html := response.Body
    data := make(map[string]string)
    data["title"] = titleReg.FindStringSubmatch(html)[1]
    data["type"] = ftypeReg.FindStringSubmatch(html)[1]
    data["year"] = yearReg.FindStringSubmatch(html)[1]
    data["area"] = areaReg.FindStringSubmatch(html)[1]
    data["form"] = formReg.FindStringSubmatch(html)[1]
    DataChan <- &data
}

func main() {
    file, err := os.Create("8edy.csv")
    defer file.Close()
    if err != nil {
        fmt.Println(err)
        return
    }
    file.WriteString(fmt.Sprintf("%s,%s,%s,%s,%s\n", "标题", "类型", "年份", "地区", "格式"))
    Save = func(data *map[string]string) {
        file.WriteString(fmt.Sprintf("%s,%s,%s,%s,%s\n", (*data)["title"], (*data)["type"], (*data)["year"], (*data)["area"], (*data)["form"]))
    }
    RequestChan <- &Request{"http://www.8edy.tv/kh/", parse1, nil}
    Scheduler(10)
}

过程与上面的一样,这个程序多一个自定义数据存储过程。

更多更新在这里:https://github.com/ChenL1994/GoScrapy