欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

17、go语言:分布式爬虫

程序员文章站 2022-05-19 14:15:01
...

1、分布式系统简介:

多个节点:
容错性
可扩展性(性能)
固有分布性

消息传递:
节点具有私有存储
易于开发
可扩展性(功能)
对比:并行计算

完成特定的需求:

消息传递的方法:
REST(动作、URL)
RPC(序列化传输、远端序列化后调用)
中间件(可以存储消息、一对多、消息队列)

一般消息传递的方法:
对外:REST
模块内部:RPC
模块之间:中间件,REST

分布式架构VS为微服务架构
分布式:指导节点之间如何通信
微服务:鼓励按业务划分模块
微服务架构通过分布式架构来实现

多层架构VS微服务架构
微服务架构具有更多的“服务”
微服务通常需要配合自动化测试,部署,服务发现等
目前我们倾向于微服务架构

2、分布式爬虫架构:

并发版爬虫的架构:
17、go语言:分布式爬虫

目前的问题:
限流问题
去重问题
数据存储问题(固有分布式)

解决限流问题:
单节点能够承受的流量有限–> 将worker放到不同的节点
17、go语言:分布式爬虫
每个机器都可以起很多worker,任务不一定分发到本机的worker

解决去重问题:
单节点能承受的去重数据量有限
无法保存之前去重结果
基于Key-Value Store(如Redis)进行分布式去重
17、go语言:分布式爬虫
每来一个请求都要去连接一次去重服务,容易被卡住,所以需要把去重的工作交给worker,worker卡住
没关系,可以起很多个goroutine的worker

解决存储问题:
存储部分的结构,技术栈和爬虫部分区别很大
进一步优化需要特殊的ElasticSearch技术背景
固有分布式
将ItemSaver单独做一个存储服务
17、go语言:分布式爬虫
本课程架构:

17、go语言:分布式爬虫
实现的关键是从channel到分布式
goroutine -> channel ->goroutine

RPC同步调用
17、go语言:分布式爬虫

RPC:
jsonrpc
grpc(使用protobuf)
Thrift
17、go语言:分布式爬虫
使用*协议:
docker/libchan
NATS streaming
gocircuit
根据自己需求

通过消息中间件:

17、go语言:分布式爬虫

3、jsonrpc的使用:

//--
package rpcdemo

//Service.Method
type DemoService struct{}

type Args struct {
	A,B int
}

//RPC框架要求要有两个参数,一个是参数,一个是结果
func (DemoService) Div(args Args ,result *float64) error {
	if args.B == 0 {
		return errors.New("division by zero")
	}
	
	*result = float64(args.A) /float64(args.B)
	return nil
}

//--
func main(){
	rpc.Register(recdemo.DemoService{})
	listener,err := net.Listen("tcp",":1234")
	if err != nil {
		panic(err)
	}
	
	for {
		conn,err := listener.Accept()
		if err != nil {
			log.Printf("accept error : %v",err)
			continue
		}
		go jsonrpc.ServeConn(conn)
	}
}

//--
//模拟发送
func main(){
	conn,err := net.Dial("tcp",":1234")
	if err != nil {
		panic(err)
	}
	var result float64
	client := jsonrpc.NewClient(conn)
	client.Call("DemoService.Div",rpcdemo.Args{10,3},&result)
	fmt.Println(result,err)
}

4、ItemServer服务:

//--
package rpcsupport

func ServeRpc(host string,service interface{}) error {
	rpc.Register(service)
	listener,err := net.Listen("tcp",host)
	if err != nil {
		return err
	}
	
	for {
		conn,err := listener.Accept()
		if err != nil {
			log.Printf("accept error : %v",err)
			continue
		}
		go jsonrpc.ServeConn(conn)
	}
	return nil
}

func NewClieny(host string})(*rpc.Client,error) {
	conn,err := net.Dial("tcp",host)
	if err != nil {
		return nil,err
	}
	return jsonrpc.NewClient(conn),nil
}

//--
package persist
type ItemSaverService struct{
	Client *elastic.Client
	Index string
}

func (s *ItemSaverService) Save(item engine.Item,result *string) error {
	err := persist.Save(s.Client,s.Index,item)
	if err != nil {
		*result = 'ok'
	}
	return err
}

//---
func main(){
//client,err := elastic.NewClient(elastic.SetSniff(false))
//if err != nil {
//panic(err)
//}
//rpcsupport.ServeRpc(":1234",persist.ItemSaverService{
//Client : client,
//Index : "dating_profile",
//})
 serveRpc(":1234",'dating_profile')
}

func serveRpc(host,index string) error {
	client,err := elastic.NewClient(elastic.SetSniff(false))
	if err != nil {
		return  err
	}
	return &rpcsupport.ServeRpc(host,persist.ItemSaverService{
				Client : client,
				Index : index,
				})
}

//测试
func TestItemSaver(t *testing.T) {
	const host = ":1234"
	//start ItemSaverServer
	go serveRpc(host,'test1')
	time.sleep(Time.Second)
	//start ItemSaverClient
	client,err := rpcsupport.NewClient(host)
	if err != nil {
		panic(err)
	}
	//Call save
	item := engine.Item {
				Url : 'xxxx',
				Type : "zhenai",
				Payload : model.Profile {
					Age :24,
					Name : "安静的雪"
					}
				}
	
	result := ""
	err = client.Call("ItemSaverService.Save",item,&result)
	if err != nil || result != ok {
		t.Errorf("result: %s;err : %s",result,err)
	}
}

5、整合ItemServer服务:

func ItemSaver(host string) (chan engine.Item,err) {
	client,err :=rpcsupport.NewClient(host)
	if err != nil {
		return nil,err
	}
	out := make(chan engine.Item)
	go func() {
		itemCount := 0
		for {
				item := <- out
				log.Printf("Item Saver : got item " + "#%d: %v",itemCount,item)
				itemCount++
				
				//Call Rpc
				result := ""
				err := client.Call("ItemSaverService.Save",item,&result)
				if err != nil {
					log.Print("Item Saver: error " + "saving item %v : %v",item,err)
				}
			
			}
		}()
	return out
}

//重新写main函数
func main(){
	//itemChan,err := client.ItemSaver(":1234")
	itemChan,err := client.ItemSaver(fmt.Sprintf(":%d",config.ItemSaverPort))
	if err != nil {
		panic(err)
	}
	e := engine.ConcurrentEngine{
			Scheduler: &scheduler.SimplerScheduler{},
			Workercount : 10,
			ItemChan : itemchan,
			}
	e.Run(engine.Request{
			Url : "http://www.zhenai.com/zhenghun",
			ParseFunc : parser.ParseCityList,
		})
}

//--
package config

const (
	ItemSaverPort = 1234
	ElasticIndex = "datiing_profile"
)

6、解析器的序列化:

解析器原先的定义为函数
需要处理函数的序列化和反序列化

需要一种机制,将函数进行序列化和反序列化的转换

//--
package engine

//type SerializedParser struct {
//Name string
//Args interface{}
//}

// {"ParseCityList",nil}  ,  {"ProfileParser",userName}

type Parse interface {
	Parse(contents []byte,url string) ParseResult
	Serialize() (name string,args interface{})
}

type Request struct {
	Url string
	Parser Parser
}

type NilParser struct {}

func (NilParser) Parse(_ []byte,_ string) ParseResult {
	return ParseResult{}
}

func (NilParser) Serialize() (name string,args interface{}){
	return "NilParser",nil
}

type FuncParser struct {
	parser ParserFunc
	Name string
}

//用工厂方法来建FuncParser
func  NewFuncParser(p ParserFunc,name string) *FuncParser {
	return FuncParser {
		parser : p,
		name : name,
		}
}

func (f *FuncParser) Parse(contents []byte,url string) ParseResult {
	return f.parser(contents,url)
}

func (f *FuncParser) Serialize() (name string,args interface{}){
	return f.name,nil
}

//profileParser的代码在此处省略

7、实现爬虫服务:

type SerializedParser struct {
	Name string
	Args interface{}
}

type Request struct {
	Url string
	Parser SerializedParser
}

type ParseResult struct {
	Items []engine.Item
	Requests []Request
}

func SerializeRequest(r engine.Request) Request {
	name,args := r.Parser.Serialize()
	return Request{
			Url : r.Url,
			Parser : SerializedParser{
				Name : name,
				Args: args,
			},
		}
}

func SerializeResult(r engine.PareResult) ParseResult {
	result := ParseResult{
				Items : r.Items,
				}
	for _,req := range r.Requests {
		result.Requests = append(result.Requests,SerializeRequest(req))
	}
	return result
}

func DeserializeRequest(r Request) engine.Request{
	return engine.Request{
				Url : r.Url,
				Parser : deserializeParser(r.Parser)
			}
}

func DeserializeResult(r ParseResult) engine.ParseResult {
	result := engine.Parseresult{
					Items : r.Items,
				}
	for _,req := range r.Requests {
		engineReq,err := DeserializeRequest(req)
		if err!= nil {
			log.Printf("error deserializing request: %v",err)
			continue
		}
		result.Requests = append(result.Requests,engineReq)
	}
	return result
}

func deserializeParser(p SerializedParser) (engine.Parser,error{
	switch p.Name {
		case config.PareseCityList:
			return  engine.NewFuncParser(
						parser.ParseCityList,
						config.ParseCityList),nil
		case config.ParseProfile:
			if userName,ok := p.Args.(string);ok {
				return parser.NewProfileParser(
				userName),nil
			}else{
				return nil,fmt.Errorf("invalid args : %v",p.Args)
			}
		
		default:
			return nil,errors.New("unknown parser name")
			//其他case省略
	}
}
//--
package worker
type CrawlService struct{}

func (CrawlService) Process (req Request,result *ParseResult) error {
	engineReq,err := DeserializeRequest(req)
	if err != nil {
		return err
	}
	engine.Result,err := engine.Worker(engineReq)
	if err !=nil {
		return err
	}
	*result = SerializeResult(engineResult)
	return nil
}

//---
func main(){
	log.Fatal(rpcsupporrt.ServeRpc(
		fmt.Sprintf(":%d",config.WorkerPort0),
		worker.CrawlService{}))
}

//testing

func TestCrawlService(t *testing.T){
	const host = ":9000"
	go rpcsupport.ServeRpc(
		host,worker.CrawlService{})
	time.Sleep(time.Second)
	
	client,err := rpcsupport.NewClient(host)
	if err != nil {
		panic(err)
	}
	req := worker.Request{
				Url : "http://album.zhenai.com/u/108906739",
				Parser : worker.SerializedParser{
				Name : config.ParseProfile,
				Args: "安静的雪",
			},
		}
	var result worker.ParseResult
	err = client.Call(crawl.CrawlServiceRpc,req,&result)
	if err != nil {
		t.Error(err)
	}else{
	fmt.Println(result)
	}
}

8、完整分布式爬虫的运行:

//改造worker

type Processor func(Request) (ParseResult,error)

//--
package client

func CreateProcessor() (engine.Processor,error) {
	client,err := rpcsupport.Newclient(fmt.Sprintf(":%d",config.WorkerPort0))
	if err != nil {
		return nil,err
	}
	return func(req engine.Request)(engine.ParseResult,error){
		sReq := worker.SerializeRequest(req)
		var sResult worker.ParseResult
		err := client.Call(config.CrawlServiceRpc,sReq,&sResult)
		if err != nil {
			return engine.ParseResult{},err
		}
		return worker.DeserializeResult(sResult),nil
		},nil
}

9、使用连接池链接爬虫集群:

//通过chan解决加锁的问题
func CreateProcessor(clientChan chan *rpc.Client) engine.Processor {

	return func(req engine.Request)(engine.ParseResult,error){
			sReq := worker.SerializeRequest(req)
			var sResult worker.ParseResult
			client := <- clientChan
			err := client.Call(config.CrawlServiceRpc,sReq,&sResult)
			if err != nil {
				return engine.ParseResult{}
			}
			return worker.DeserializeResult(sResult),nil
		}
}

func createClientPool (host []string) chan *rpc.Client {
	var clients []*rpc.Client
	for _,h := range hosts {
		client,err := rpcsupport.NewClient(h)
		if err == nil {
			clients = append(clients,client)
			log.Printf("connecting to %s",h)
		}else{
			log.Printf("error connecting to %s : %v",h,err)
		}
	}
	
	out := make(chan *rpc.Client)
	go func() {
		for {
			for _,client := range clients {
				out <- client
			}
		}
	}()
	return out
}
--main函数
//使用flag标准库
//命令行参数
//go run worker.go --help
var port = flag.Int("port",0,"the port for me to listen on")

func main(){
	flag.Parse()
	if *port == 0 {
		fmt.Println("must specify a port")
		return 
	}
	log.Printf("Listening on %s",host)
	log.Fatal(rpcsupport.ServeRpc(fmt.Sprintf(":%d",port),worker.CrawlService{}))
}

//--itemSaver的main函数

var (
	itemSaverHost = flag.String("itemsaver_host","","itemsaver host")
	workerHosts = flag.String("worker_hosts","","worker hosts (comma separated)") //逗号分隔
)

func main(){
	flag.Parse()
	itemChan,err := itemsaver.ItemSaver(*itemSaverHost)
	if err != nil {
		panic(err)
	}
	pool := createClientPool(strings.Split(*workerHosts,","))
	processor := worker.CreateProcessor(pool)
	
	e := engine.ConcurrentEngine{
				Scheduler : &scheduler.QueuedScheduler{},
				WorkerCount : 100,
				ItemChan : itemChan,
				RequestProcessor : processor
			}
}

// go run worker.go --port=9000

//go run crawler_distributed/main.go --itemsaver_host=":1234" --worker_hosts=":9000,:9001,:9002,:9003"

//浏览器
//go run crawler/frontend/starter.go

10、实战项目总结:

//详情请观看视频

11、进一步的工作:

爬取更多的网站,使用css选择器/xpath分析数据
对抗反爬技术/遵循robots协议
模拟登录,爬取动态网页
分布式去重
//—
优化elasticSearch查询质量(安装插件)
优化用户查询体验
爬取照片,动态展示
大数据,AI

“Elastic Stack从入门到实践” 幕客课程

使用脚本进行部署
使用docker + k8s进行部署
集成服务发现框架如consul(go语言)
用LogStash汇总和分析日志

相关标签: golang