欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

GCP: Pub/Sub的使用

程序员文章站 2022-07-01 15:02:50
...

1、简介

Pub/Sub 是一种全托管式实时消息传递服务,可让您在独立的应用之间发送和接收消息,它是一个PAAS服务。

2、概览

  • 主题(Topic):相当于一个消息的中转站,发布者发布消息后,消息存储在主题中。
  • 发布者(Publisher):发布消息的应用
  • 订阅者(Subscriber):接收消息者

如下图,首先对主题创建了两个订阅者(subscriber1, subscriber2),发布者(publisher)向主题中发布一条消息(Hello,World!), 接着,这两个订阅者都收到了此消息(Hello,World!)。

GCP: Pub/Sub的使用

3、订阅消息

(1)订阅是如何工作的

  • 多个订阅者订阅一个主题的情况,如果在一个主题中发布一条消息,那所有的订阅者都会收到此消息。如果想要多个订阅者分工处理不同的内容,可以在消息中加自定义特性 (Attribute),在订阅者逻辑中可以根据此特性只处理当前订阅者感兴趣的消息。
  • 只有经过确认过的消息`message.ack()`,才不会再被传送,如果您在确认时限之前未确认消息,Pub/Sub 会重新发送该消息。因此,Pub/Sub 可能会发送重复的消息,如果订阅者处理消息发生异常,且消息未被确认,那Pub/Sub 会重新发送该消息。
  • 确认过的消息,会在Pub/Sub被删除。
  • 在给定订阅者创建之前发布的消息通常不会针对此订阅进行传送。因此,如果某主题没有订阅,则发布到该主题的消息将不会传送给任何订阅者。

(2)至少传送一次

通常,Pub/Sub 会按照消息发布的顺序将每条消息传送一次,但有时可能并不按顺序传送消息,或者会将消息传送多次。 一般来说,如果要实施多次传送,订阅者需要在处理消息时遵循幂等原则。您可以使用 Cloud Dataflow PubsubIO 将 Pub/Sub 消息流只处理一次。PubsubIO 会根据自定义消息标识符或由 Pub/Sub 分配的消息标识符来删除重复的消息。所以,处理消息的逻辑必须是幂等的(所谓幂等,通俗点说,就是函数执行一次,和执行数次,产生的结/效果是一样的)。

(3)对消息排序

通常情况下,pub/sub不完全像队列一样严格地保证消息先进先出,因为保证消息顺序会对吞吐量产生严重限制,pub/sub仅保证第一次传送消息时是按顺序进行的,后序的消息不一定是按顺序排列的,所有消息都允许随时尝试重新传送,这样允许一次向订阅者发送多条消息。

如果想使消息有顺序的话,可以在自定义特性 (Attribute)中加时间戳或***。如果主题有10条这样的消息0,1,2,3,4,5 那收到消息顺序,下面几种情况都可能会发生。

  • 0,1,5,4,2,3  # 第一条消息始终是0,顺序不能保证
  • 0,1,2,4,2,3,0  # 0 也可能被发送了2次
  • 0,1,1,2,4,3,5 ,6  # 后序的消息可能被发送多次

4、订阅者接收消息的两种方式

订阅者可以有两种方式拿到消息,一是设置public endpoint,让pub/sub被动地推送消息给你,二是主要向pub/sub发送拉取(pull)请求。

4.1 推送传送

(1)关于推送

Pub/Sub 将根据收到成功响应的速率来动态调整推送请求的速率, 推送订阅受一组配额和资源限制的约束,系统会自动调整推送传送的速率,以最大限度地提高传送速率,同时不会使推送端点过载,它是通过一套算法来控制的。

(2)Cloud Function和App Engine通常都是使用推送传送的,也是遵循至少传送一次的原则。

(3)对于推送订阅,Pub/Sub 不会发送否定确认(有时称为 NACK)。如果 Webhook 未返回成功代码,则 Pub/Sub 会重试传送,直到消息在订阅的消息保留期限过后失效为止。

(4)推送不能一次处理多条消息,没法使用批处理,但拉取和发布消息可以。

4.2 拉取传送

(1)异步拉取(用得最多,实时处理消息那种)

可以在应用中使用长时间运行的消息侦听器接收消息,并且一次确认一条消息,不建议使用cron job,效率不高。

使用异步拉取不需要应用阻止新消息,从而在应用中实现更高的吞吐量。

如果订阅者客户端处理和确认消息的速度可能比 Pub/Sub 将消息发送到客户端的速度要慢,可考虑使用订阅者的流控制功能来控制订阅者接收消息的速率。https://cloud.google.com/pubsub/docs/pull

(2)同步拉取

在某些情况下,异步拉取并不非常适合您的应用。例如,应用逻辑可能依赖轮询模式来检索消息,或者需要对客户端在任何给定时间检索的消息数量进行精确限制。为了支持此类应用,该服务支持同步拉取方法,用于拉取和确认固定数量的消息,但这会带来一些消息传送的延迟。

如下是例子的代码

from google.cloud import pubsub_v1


def sub_data():
    project_id = 'cong-proj'
    # topic_name = 'hello_topic'
    subscription_name = 'sub_one'
    subscription1 = pubsub_v1.SubscriberClient()
    # topic_path = subscription1.topic_path(project_id, topic_name)
    subscription_path = subscription1.subscription_path(project_id, subscription_name)

    def callback(message):
        print("Received message: {}".format(message))
        message.ack()

    streaming_pull_future = subscription1.subscribe(subscription_path, callback=callback)
    print("Listening for messages on {}..\n".format(subscription_path))
    try:
        streaming_pull_future.result()
    except:  # noqa
        streaming_pull_future.cancel()

5、发布消息

5.1 批处理以平衡延迟和吞吐量

消息可以根据请求大小(以字节为单位)、消息数量和时间分批。

batch_settings = pubsub_v1.types.BatchSettings(

    max_bytes=1024, max_latency=1  # One kilobyte  # One second

)

publisher = pubsub_v1.PublisherClient(batch_settings)

5.2 重试请求

如果发布失败,系统会自动重试,但无法保证能够重试的错误除外。

publisher = pubsub_v1.PublisherClient(client_config=retry_settings)

如下是例子的代码

def publish_data():
    project_id = 'cong-proj'
    topic_name = 'hello_topic'

    # batch_settings = pubsub_v1.types.BatchSettings(max_messages=10, max_latency=30)

    publisher = pubsub_v1.PublisherClient() # batch_settings 
    # publisher.from_service_account_file("""C:\CongStudy\pubsub-demo\cong-pubsub.json""")
    topic_path = publisher.topic_path(project_id, topic_name)

    data = "Message number 1".encode('utf-8')
    future = publisher.publish(topic_path, data)
    print(future.result())



 def get_message(message):
    # 消息实际上是以Base64的字符串的形式存于主题中,如要在订阅器中使用消息,可以使用如下代码
    pubsub_message = base64.b64decode(message).decode('utf-8')


    # hell,world base64加密后为 aGVsbCx3b3JsZA==, 它就会存于主题中
    # print(base64.b64encode('hell,world'.encode('utf-8')).decode('utf-8')) # aGVsbCx3b3JsZA==

    # aGVsbCx3b3JsZA == 解密后为 hell,world
    # print(base64.b64decode('aGVsbCx3b3JsZA==').decode('utf-8')) # hell,world
    


 

6、重放和完全清除消息

  • 还原至某一时间戳
  • 还原至快照

7、参考链接

 
 
 
 
相关标签: GCP