带你入门Go的消息队列NSQ
以前看到过nsq这个东西,也一直没去看。今天刚好有时间就搭建了下,简单尝试了下这个go语言下的消息队列nsq,我这里简要记录下。
其实,nsq国内用的是比较少的,我这里也是算了解这么个东西吧 ,稍微看下源码,学到东西而已。
nsq简介
nsq是一个基于go语言的分布式实时消息平台, 它具有分布式、去中心化的拓扑结构,支持无限水平扩展。无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。另外,nsq非常容易配置和部署, 且支持众多的消息协议。支持多种客户端,协议简单,如果有兴趣可参照协议自已实现一个也可以。
nsq的几个组件
- nsqd:一个负责接收、排队、转发消息到客户端的守护进程
- nsqlookupd:管理拓扑信息, 用于收集nsqd上报的topic和channel,并提供最终一致性的发现服务的守护进程
- nsqadmin:一套web用户界面,可实时查看集群的统计数据和执行相应的管理任务
- utilities:基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq
相关网址
nsq官网:nsq官网
github: github
nsq官方客户端:nsq官方客户端
nsq文档:nsq文档
nsq协议:nsq协议
nsq安装
nsq的安装方式有好几种,可以通过二进制、源码、docker、brew等方式安装
二进制安装,可以到 上面下载对应平台的release包,然后解压就行了。
如果是mac电脑,直接用brew安装
brew install nsq
如果是docker的安装,就参照下上面那个网址吧,按照步骤操作就行了,我没有用的这个安装方式。
我是用的源码的安装方式,因为二进制的那个放在s3上面,下起来好慢,于是直接把github的源代码下载来,这里也有一个好处就是可以看源码来跟踪学习。还方便些。
下载后的目录结构如下所示:
nsq 运行
如果用源码运行,而不是make后将可执行文件放到bin目录这种,那么下载后解决完所有的依赖包后,cd 进入到 nsqio/nsq/apps/nsqd目录后,可以执行 go run ./
或 go run main.go options.go
否则会报如下错误
nsqio/nsq/apps/nsqd/main.go:44:13: undefined: nsqdflagset nsqio/nsq/apps/nsqd/main.go:54:10: undefined: config
其实进入到apps目录执行,最终还是会到 nsqio/nsq/nsqd这个下面去执行业务处理代码的,apps这里仅仅是用go-srv这个包进行了一层服务包装而已,变成守护和一些入口参数等。
$ go run ./ [nsqd] 2020/03/22 00:55:27.597911 info: nsqd v1.2.1-alpha (built w/go1.11.2) [nsqd] 2020/03/22 00:55:27.597980 info: id: 809 [nsqd] 2020/03/22 00:55:27.598396 info: topic(test): created [nsqd] 2020/03/22 00:55:27.598449 info: topic(test): new channel(test) [nsqd] 2020/03/22 00:55:27.598535 info: topic(test): new channel(lc) [nsqd] 2020/03/22 00:55:27.598545 info: nsq: persisting topic/channel metadata to nsqd.dat [nsqd] 2020/03/22 00:55:27.599714 info: tcp: listening on [::]:4150 [nsqd] 2020/03/22 00:55:27.599806 info: http: listening on [::]:4151
看到上面的提示,表示启动成功了,它会分别开放tcp和http的端口,4150,4151可以通过配置或flag参数的方式更改, 同时它也支持tls/ssl.
http测试
启动nsqd后,可以用http来测试发送一条消息,可使用curl来操作。
$ curl -d '这是一条测试消息' 'http://127.0.0.1:4151/pub?topic=test&channel=lc' ok
nsq消息模式
我们知道消息一般有推和拉模式,nsq的消息模式为推的方式,这种模式可以保证消息的及时性,当有消息时可以及时推送出去。但是要根椐客户端的消耗能力和节奏去控制,nsq是通过更改rdy的值来实现的。当没有消息时为0, 服务端推送消息后,客户端比如调用 updaterdy()这个方法改成3, 那么服务端推送时,就会根椐这个值做流控了。
发送消息是通过连接的tcp发出去的,先发到topic下面,再转到channel下面,最后从通道 memorymsgchan 中取出msg,然后发出。
github.com/nsqio/nsq/nsqd/protocol_v2.go func (p *protocolv2) messagepump(client *clientv2, startedchan chan bool) { var err error var memorymsgchan chan *message var backendmsgchan <-chan []byte var subchannel *channel // note: `flusherchan` is used to bound message latency for // the pathological case of a channel on a low volume topic // with >1 clients having >1 rdy counts var flusherchan <-chan time.time var samplerate int32 subeventchan := client.subeventchan identifyeventchan := client.identifyeventchan outputbufferticker := time.newticker(client.outputbuffertimeout) heartbeatticker := time.newticker(client.heartbeatinterval) heartbeatchan := heartbeatticker.c msgtimeout := client.msgtimeout ... ... case msg := <-memorymsgchan: if samplerate > 0 && rand.int31n(100) > samplerate { continue } msg.attempts++ subchannel.startinflighttimeout(msg, client.id, msgtimeout) client.sendingmessage() err = p.sendmessage(client, msg) if err != nil { goto exit } flushed = false case <-client.exitchan: goto exit }
nsq还支持延时消息的发送,比如订单在30分钟未支付做无效处理等场景,延时使用的是heap包的优级先队列,实现了里面的一些方法。通过判断当前时间和延时时间做对比,然后从延时队列里面弹出消息再发送到channel中,后续流程和普通消息一样,我看网上有 人碰到过说延时消息会有并发问题,最后还用的redis的zset实现的,所以不确定这个延时的靠不靠谱,要求不高的倒是可以试试。
curl -d '这是一条延迟消息' 'http://127.0.0.1:4151/pub?topic=test&channel=lc&defer=3000' defer参数,单位:毫秒
nsq消费
消费消息时,channel类似于kafka里面的消费组的概念,比如同一个channel。那么只会被一个实例消费,不会多个实例都能消费到那条消息,所以可用于消息的负载均衡, 我看到网上有人有疑惑就是他指定topic,然后再用不同的channel去消费,说怎么能收到其它channel的消息,不能直接过滤消息,其实channel不是用来过滤的。
nsq发送的消息可以确保至少被一个消费者消费,它的消费级别为至少消费一次,为了确保消息消费,如果客户端超时、重新放入队列或重连等,重复消费是不可避免的,所以客户端业务流程一定要做消息的幂等处理。
客户端回复fin 或者 req 表示成功或者重发。如果客户端未能及时发送,则nsq将重复发送消息给该客户端。
另外,nsq不像 kafka,我们是能到消息的有序的,但nsq不行,客户端收到的消费为无序的。虽然每条消息有一个时间戳,但如果对顺序有要求的,那就要注意了。所以,nsq更适合处理数据量大但是彼此间没有顺序关系的消息。
nsq的go客户端
nsq是支持多种形式的客户端的,像http或客户端库来操作,而且官方其实还建议使用http的方式,http的方式,直接发get或post请求就行了。
这里go的话,可使用go-nsq这个库,地址为:go-nsq :
go get https://github.com/nsqio/go-nsq
发送消息
package main import ( "errors" "fmt" "github.com/nsqio/go-nsq" "time" ) func main() { var ( p1 *producer p2 *producer ) p1 = &producer{} p2 = &producer{} p1.producer,_ = initproducer("127.0.0.1:4150") p2.producer,_ = initproducer("127.0.0.1:4150") defer p1.producer.stop() defer p2.producer.stop() //p1.publish("test","hello!!!") p1.deferredpublish("test", 10 * time.second,"这是一条延迟消息?") fmt.println("done") } type producer struct { producer *nsq.producer } func(p *producer) publish(topic string,message string) (err error){ if message == "" { return errors.new("message is empty") } if err = p.producer.publish(topic,[]byte(message)); err != nil { fmt.println(err) return err } return nil } // 延迟消息 func(p *producer) deferredpublish(topic string,delay time.duration, message string) (err error){ if message == "" { return errors.new("message is empty") } if err = p.producer.deferredpublish(topic,delay, []byte(message)); err != nil { fmt.println(err) return err } return nil } func initproducer(addr string) (p *nsq.producer,err error){ var ( config *nsq.config ) config = nsq.newconfig() if p, err = nsq.newproducer(addr, config); err != nil { return nil, err } return p, nil }
消费消息
package main import ( "encoding/json" "fmt" "github.com/nsqio/go-nsq" ) //nsqio消费测试 type mytesthandler struct { q *nsq.consumer messagereceive int } func (h *mytesthandler) handlemessage(message *nsq.message) error { type data struct { } var ( data *data err error ) data = &data{} if err = json.unmarshal(message.body, data) ;err != nil { fmt.printf("id:%s, msg:%s \n", message.id, string(message.body)) err = nil } message.finish() return nil } func initconsuemr(topic string, channel string) { var ( config *nsq.config h *mytesthandler err error ) h = &mytesthandler{ } config = nsq.newconfig() if h.q, err = nsq.newconsumer(topic, channel, config); err != nil { fmt.println(err) return } h.q.addhandler(h) if err = h.q.connecttonsqd("127.0.0.1:4150"); err != nil { fmt.println(err) } //<-h.q.stopchan fmt.println("stop") return } func main() { initconsuemr("test","test") initconsuemr("test","lc") select{} }
总的来说,nsq的消费是有保障的,能保证消息的可靠性。可用多个 nsqd和nsqlookupd做分布式集群等,使用go的channel能够高并发消费,高吞吐量,而且,部署方面也简单。
不过,给我的感觉还是不如kafka和rocketmq这些专业的消息队列,不过在某些场景下还是够用的。这个就得根椐自已的情况去取舍了,毕竟,没有好的架构,只有合适的架构。
上一篇: 康熙时期有名的大臣,明珠也是个大贪官?
下一篇: defer 链如何被遍历