17、go语言:分布式爬虫
1、分布式系统简介:
多个节点:
容错性
可扩展性(性能)
固有分布性
消息传递:
节点具有私有存储
易于开发
可扩展性(功能)
对比:并行计算
完成特定的需求:
消息传递的方法:
REST(动作、URL)
RPC(序列化传输、远端序列化后调用)
中间件(可以存储消息、一对多、消息队列)
一般消息传递的方法:
对外:REST
模块内部:RPC
模块之间:中间件,REST
分布式架构VS为微服务架构
分布式:指导节点之间如何通信
微服务:鼓励按业务划分模块
微服务架构通过分布式架构来实现
多层架构VS微服务架构
微服务架构具有更多的“服务”
微服务通常需要配合自动化测试,部署,服务发现等
目前我们倾向于微服务架构
2、分布式爬虫架构:
并发版爬虫的架构:
目前的问题:
限流问题
去重问题
数据存储问题(固有分布式)
解决限流问题:
单节点能够承受的流量有限–> 将worker放到不同的节点
每个机器都可以起很多worker,任务不一定分发到本机的worker
解决去重问题:
单节点能承受的去重数据量有限
无法保存之前去重结果
基于Key-Value Store(如Redis)进行分布式去重
每来一个请求都要去连接一次去重服务,容易被卡住,所以需要把去重的工作交给worker,worker卡住
没关系,可以起很多个goroutine的worker
解决存储问题:
存储部分的结构,技术栈和爬虫部分区别很大
进一步优化需要特殊的ElasticSearch技术背景
固有分布式
将ItemSaver单独做一个存储服务
本课程架构:
实现的关键是从channel到分布式
goroutine -> channel ->goroutine
RPC同步调用
RPC:
jsonrpc
grpc(使用protobuf)
Thrift
使用*协议:
docker/libchan
NATS streaming
gocircuit
根据自己需求
通过消息中间件:
3、jsonrpc的使用:
//--
package rpcdemo
//Service.Method
type DemoService struct{}
type Args struct {
A,B int
}
//RPC框架要求要有两个参数,一个是参数,一个是结果
func (DemoService) Div(args Args ,result *float64) error {
if args.B == 0 {
return errors.New("division by zero")
}
*result = float64(args.A) /float64(args.B)
return nil
}
//--
func main(){
rpc.Register(recdemo.DemoService{})
listener,err := net.Listen("tcp",":1234")
if err != nil {
panic(err)
}
for {
conn,err := listener.Accept()
if err != nil {
log.Printf("accept error : %v",err)
continue
}
go jsonrpc.ServeConn(conn)
}
}
//--
//模拟发送
func main(){
conn,err := net.Dial("tcp",":1234")
if err != nil {
panic(err)
}
var result float64
client := jsonrpc.NewClient(conn)
client.Call("DemoService.Div",rpcdemo.Args{10,3},&result)
fmt.Println(result,err)
}
4、ItemServer服务:
//--
package rpcsupport
func ServeRpc(host string,service interface{}) error {
rpc.Register(service)
listener,err := net.Listen("tcp",host)
if err != nil {
return err
}
for {
conn,err := listener.Accept()
if err != nil {
log.Printf("accept error : %v",err)
continue
}
go jsonrpc.ServeConn(conn)
}
return nil
}
func NewClieny(host string})(*rpc.Client,error) {
conn,err := net.Dial("tcp",host)
if err != nil {
return nil,err
}
return jsonrpc.NewClient(conn),nil
}
//--
package persist
type ItemSaverService struct{
Client *elastic.Client
Index string
}
func (s *ItemSaverService) Save(item engine.Item,result *string) error {
err := persist.Save(s.Client,s.Index,item)
if err != nil {
*result = 'ok'
}
return err
}
//---
func main(){
//client,err := elastic.NewClient(elastic.SetSniff(false))
//if err != nil {
//panic(err)
//}
//rpcsupport.ServeRpc(":1234",persist.ItemSaverService{
//Client : client,
//Index : "dating_profile",
//})
serveRpc(":1234",'dating_profile')
}
func serveRpc(host,index string) error {
client,err := elastic.NewClient(elastic.SetSniff(false))
if err != nil {
return err
}
return &rpcsupport.ServeRpc(host,persist.ItemSaverService{
Client : client,
Index : index,
})
}
//测试
func TestItemSaver(t *testing.T) {
const host = ":1234"
//start ItemSaverServer
go serveRpc(host,'test1')
time.sleep(Time.Second)
//start ItemSaverClient
client,err := rpcsupport.NewClient(host)
if err != nil {
panic(err)
}
//Call save
item := engine.Item {
Url : 'xxxx',
Type : "zhenai",
Payload : model.Profile {
Age :24,
Name : "安静的雪"
}
}
result := ""
err = client.Call("ItemSaverService.Save",item,&result)
if err != nil || result != ok {
t.Errorf("result: %s;err : %s",result,err)
}
}
5、整合ItemServer服务:
func ItemSaver(host string) (chan engine.Item,err) {
client,err :=rpcsupport.NewClient(host)
if err != nil {
return nil,err
}
out := make(chan engine.Item)
go func() {
itemCount := 0
for {
item := <- out
log.Printf("Item Saver : got item " + "#%d: %v",itemCount,item)
itemCount++
//Call Rpc
result := ""
err := client.Call("ItemSaverService.Save",item,&result)
if err != nil {
log.Print("Item Saver: error " + "saving item %v : %v",item,err)
}
}
}()
return out
}
//重新写main函数
func main(){
//itemChan,err := client.ItemSaver(":1234")
itemChan,err := client.ItemSaver(fmt.Sprintf(":%d",config.ItemSaverPort))
if err != nil {
panic(err)
}
e := engine.ConcurrentEngine{
Scheduler: &scheduler.SimplerScheduler{},
Workercount : 10,
ItemChan : itemchan,
}
e.Run(engine.Request{
Url : "http://www.zhenai.com/zhenghun",
ParseFunc : parser.ParseCityList,
})
}
//--
package config
const (
ItemSaverPort = 1234
ElasticIndex = "datiing_profile"
)
6、解析器的序列化:
解析器原先的定义为函数
需要处理函数的序列化和反序列化
需要一种机制,将函数进行序列化和反序列化的转换
//--
package engine
//type SerializedParser struct {
//Name string
//Args interface{}
//}
// {"ParseCityList",nil} , {"ProfileParser",userName}
type Parse interface {
Parse(contents []byte,url string) ParseResult
Serialize() (name string,args interface{})
}
type Request struct {
Url string
Parser Parser
}
type NilParser struct {}
func (NilParser) Parse(_ []byte,_ string) ParseResult {
return ParseResult{}
}
func (NilParser) Serialize() (name string,args interface{}){
return "NilParser",nil
}
type FuncParser struct {
parser ParserFunc
Name string
}
//用工厂方法来建FuncParser
func NewFuncParser(p ParserFunc,name string) *FuncParser {
return FuncParser {
parser : p,
name : name,
}
}
func (f *FuncParser) Parse(contents []byte,url string) ParseResult {
return f.parser(contents,url)
}
func (f *FuncParser) Serialize() (name string,args interface{}){
return f.name,nil
}
//profileParser的代码在此处省略
7、实现爬虫服务:
type SerializedParser struct {
Name string
Args interface{}
}
type Request struct {
Url string
Parser SerializedParser
}
type ParseResult struct {
Items []engine.Item
Requests []Request
}
func SerializeRequest(r engine.Request) Request {
name,args := r.Parser.Serialize()
return Request{
Url : r.Url,
Parser : SerializedParser{
Name : name,
Args: args,
},
}
}
func SerializeResult(r engine.PareResult) ParseResult {
result := ParseResult{
Items : r.Items,
}
for _,req := range r.Requests {
result.Requests = append(result.Requests,SerializeRequest(req))
}
return result
}
func DeserializeRequest(r Request) engine.Request{
return engine.Request{
Url : r.Url,
Parser : deserializeParser(r.Parser)
}
}
func DeserializeResult(r ParseResult) engine.ParseResult {
result := engine.Parseresult{
Items : r.Items,
}
for _,req := range r.Requests {
engineReq,err := DeserializeRequest(req)
if err!= nil {
log.Printf("error deserializing request: %v",err)
continue
}
result.Requests = append(result.Requests,engineReq)
}
return result
}
func deserializeParser(p SerializedParser) (engine.Parser,error) {
switch p.Name {
case config.PareseCityList:
return engine.NewFuncParser(
parser.ParseCityList,
config.ParseCityList),nil
case config.ParseProfile:
if userName,ok := p.Args.(string);ok {
return parser.NewProfileParser(
userName),nil
}else{
return nil,fmt.Errorf("invalid args : %v",p.Args)
}
default:
return nil,errors.New("unknown parser name")
//其他case省略
}
}
//--
package worker
type CrawlService struct{}
func (CrawlService) Process (req Request,result *ParseResult) error {
engineReq,err := DeserializeRequest(req)
if err != nil {
return err
}
engine.Result,err := engine.Worker(engineReq)
if err !=nil {
return err
}
*result = SerializeResult(engineResult)
return nil
}
//---
func main(){
log.Fatal(rpcsupporrt.ServeRpc(
fmt.Sprintf(":%d",config.WorkerPort0),
worker.CrawlService{}))
}
–
//testing
func TestCrawlService(t *testing.T){
const host = ":9000"
go rpcsupport.ServeRpc(
host,worker.CrawlService{})
time.Sleep(time.Second)
client,err := rpcsupport.NewClient(host)
if err != nil {
panic(err)
}
req := worker.Request{
Url : "http://album.zhenai.com/u/108906739",
Parser : worker.SerializedParser{
Name : config.ParseProfile,
Args: "安静的雪",
},
}
var result worker.ParseResult
err = client.Call(crawl.CrawlServiceRpc,req,&result)
if err != nil {
t.Error(err)
}else{
fmt.Println(result)
}
}
8、完整分布式爬虫的运行:
//改造worker
type Processor func(Request) (ParseResult,error)
//--
package client
func CreateProcessor() (engine.Processor,error) {
client,err := rpcsupport.Newclient(fmt.Sprintf(":%d",config.WorkerPort0))
if err != nil {
return nil,err
}
return func(req engine.Request)(engine.ParseResult,error){
sReq := worker.SerializeRequest(req)
var sResult worker.ParseResult
err := client.Call(config.CrawlServiceRpc,sReq,&sResult)
if err != nil {
return engine.ParseResult{},err
}
return worker.DeserializeResult(sResult),nil
},nil
}
9、使用连接池链接爬虫集群:
//通过chan解决加锁的问题
func CreateProcessor(clientChan chan *rpc.Client) engine.Processor {
return func(req engine.Request)(engine.ParseResult,error){
sReq := worker.SerializeRequest(req)
var sResult worker.ParseResult
client := <- clientChan
err := client.Call(config.CrawlServiceRpc,sReq,&sResult)
if err != nil {
return engine.ParseResult{}
}
return worker.DeserializeResult(sResult),nil
}
}
func createClientPool (host []string) chan *rpc.Client {
var clients []*rpc.Client
for _,h := range hosts {
client,err := rpcsupport.NewClient(h)
if err == nil {
clients = append(clients,client)
log.Printf("connecting to %s",h)
}else{
log.Printf("error connecting to %s : %v",h,err)
}
}
out := make(chan *rpc.Client)
go func() {
for {
for _,client := range clients {
out <- client
}
}
}()
return out
}
--main函数
//使用flag标准库
//命令行参数
//go run worker.go --help
var port = flag.Int("port",0,"the port for me to listen on")
func main(){
flag.Parse()
if *port == 0 {
fmt.Println("must specify a port")
return
}
log.Printf("Listening on %s",host)
log.Fatal(rpcsupport.ServeRpc(fmt.Sprintf(":%d",port),worker.CrawlService{}))
}
//--itemSaver的main函数
var (
itemSaverHost = flag.String("itemsaver_host","","itemsaver host")
workerHosts = flag.String("worker_hosts","","worker hosts (comma separated)") //逗号分隔
)
func main(){
flag.Parse()
itemChan,err := itemsaver.ItemSaver(*itemSaverHost)
if err != nil {
panic(err)
}
pool := createClientPool(strings.Split(*workerHosts,","))
processor := worker.CreateProcessor(pool)
e := engine.ConcurrentEngine{
Scheduler : &scheduler.QueuedScheduler{},
WorkerCount : 100,
ItemChan : itemChan,
RequestProcessor : processor
}
}
// go run worker.go --port=9000
//go run crawler_distributed/main.go --itemsaver_host=":1234" --worker_hosts=":9000,:9001,:9002,:9003"
//浏览器
//go run crawler/frontend/starter.go
10、实战项目总结:
//详情请观看视频
11、进一步的工作:
爬取更多的网站,使用css选择器/xpath分析数据
对抗反爬技术/遵循robots协议
模拟登录,爬取动态网页
分布式去重
//—
优化elasticSearch查询质量(安装插件)
优化用户查询体验
爬取照片,动态展示
大数据,AI
“Elastic Stack从入门到实践” 幕客课程
使用脚本进行部署
使用docker + k8s进行部署
集成服务发现框架如consul(go语言)
用LogStash汇总和分析日志
上一篇: 同学们好