欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

05 ,lambda - sqs 结合 : 监控的实现

程序员文章站 2024-01-28 09:39:58
...

1 ,java 代码 :

  1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Lambda + SQS sample: produces a single shaded (fat) jar
     that can be uploaded to AWS Lambda. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lefrcycle.lambda</groupId>
    <artifactId>lambda01_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Pin the Java level and the source encoding explicitly so the build does not
         depend on maven-compiler-plugin defaults or on the platform charset
         (the Java sources may contain non-ASCII text). -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- s3 -->
        <!--<dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.636</version>
        </dependency>-->
        <!-- sqs -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-sqs</artifactId>
            <version>1.11.636</version>
        </dependency>
        <!-- lambda -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-core</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-events</artifactId>
            <version>2.2.2</version>
        </dependency>
        <!-- jackson -->
        <!--<dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.7</version>
        </dependency>-->

    </dependencies>

    <build>
        <plugins>
            <!-- Bundle the project classes and all dependencies into one jar at package time. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  2. SqsUtil :
package com.lifeCycle.Commons;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
import com.lifeCycle.entity.SqsResult;

import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.List;
import java.util.Properties;

/**
 * Static helpers around one SQS FIFO queue: build a client, send, receive and delete
 * messages, and convert a Lambda {@link SQSEvent} into an {@link SqsResult}.
 */
public class SqsUtil {

    /** URL of the FIFO queue every helper in this class talks to. */
    private static final String QUEUE_URL =
            "https://sqs.cn-northwest-1.amazonaws.com.cn/281007668287/da.fifo";

    /** Classpath resource that holds the access key and the secret key. */
    private static final String CREDENTIALS_RESOURCE = "s3.properties";

    /** Message group used for every send; a FIFO queue requires a group id. */
    private static final String MESSAGE_GROUP_ID = "dataAnalysis";

    /**
     * Builds an SQS client for region {@code CN_NORTHWEST_1} using the static credentials
     * stored under {@code fs.s3a.access.key} / {@code fs.s3a.secret.key} in
     * {@code s3.properties} on the classpath. A new client is built on every call.
     *
     * @return a configured SQS client
     * @throws IOException if the properties resource is missing or cannot be read
     */
    public static AmazonSQS getSqs() throws IOException {
        //  1. Load the configuration.
        final Properties properties = new Properties();
        final InputStream is = SqsUtil.class.getClassLoader().getResourceAsStream(CREDENTIALS_RESOURCE);
        if (is == null) {
            // getResourceAsStream signals "not found" with null; fail with a clear message
            // instead of a NullPointerException inside Properties.load.
            throw new IOException("Classpath resource not found: " + CREDENTIALS_RESOURCE);
        }
        // Plain try/finally keeps this compilable at any Java source level.
        try {
            properties.load(is);
        } finally {
            is.close();
        }
        //  2. Build the credentials.
        final AWSCredentials credentials = new BasicAWSCredentials(
                properties.getProperty("fs.s3a.access.key"),
                properties.getProperty("fs.s3a.secret.key"));
        //  3. Create the client: credentials + region.
        return AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withRegion(Regions.CN_NORTHWEST_1)
                .build();
    }

    /**
     * Sends {@code msg} to the queue in the {@code dataAnalysis} message group.
     *
     * @param msg message body
     * @return the message id reported by SQS
     * @throws IOException if the client cannot be created
     */
    public static String sendMessage(String msg) throws IOException {
        final AmazonSQS sqs = getSqs();
        final SendMessageRequest sendMessageRequest = new SendMessageRequest(QUEUE_URL, msg);
        // The deduplication id must be unique per send. A millisecond timestamp is not:
        // two sends within the same millisecond would share an id and the second one
        // would be treated as a duplicate. A random UUID avoids that.
        sendMessageRequest.setMessageDeduplicationId(java.util.UUID.randomUUID().toString());
        // A FIFO queue requires a message group id.
        sendMessageRequest.setMessageGroupId(MESSAGE_GROUP_ID);
        final SendMessageResult res = sqs.sendMessage(sendMessageRequest);
        return res.getMessageId();
    }

    /**
     * Receives from the queue and returns the first message as body + receipt handle.
     * The message is not deleted; pass the handle to {@link #deleteMsg(String)} for that.
     *
     * @return the first received message, or {@code null} when the receive call returned
     *         no messages (same "no message" convention as {@link #transMsg(SQSEvent)})
     * @throws IOException if the client cannot be created
     */
    public static SqsResult doMsg() throws IOException {
        final AmazonSQS sqs = getSqs();
        final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(QUEUE_URL);
        final List<Message> list = sqs.receiveMessage(receiveMessageRequest).getMessages();
        // An empty receive is a normal outcome, not an error.
        if (list == null || list.isEmpty()) {
            return null;
        }
        final Message msg = list.get(0);
        return new SqsResult(msg.getBody(), msg.getReceiptHandle());
    }

    /**
     * Deletes the message identified by a receipt handle.
     *
     * @param handle receipt handle obtained when the message was received
     * @throws IOException if the client cannot be created
     */
    public static void deleteMsg(String handle) throws IOException {
        final AmazonSQS sqs = getSqs();
        sqs.deleteMessage(new DeleteMessageRequest(QUEUE_URL, handle));
    }

    /**
     * Converts the first record of a Lambda SQS event into an {@link SqsResult}.
     *
     * @param event the event delivered to the Lambda handler
     * @return body + receipt handle of the first record, or {@code null} when the event
     *         carries no records
     */
    public static SqsResult transMsg(SQSEvent event) {
        final List<SQSEvent.SQSMessage> list = event.getRecords();
        if (list == null || list.isEmpty()) {
            return null;
        }
        // NOTE(review): only the first record is converted; any further records in the
        // same event are ignored here - confirm the trigger's batch size is 1.
        final SQSEvent.SQSMessage msg = list.get(0);
        return new SqsResult(msg.getBody(), msg.getReceiptHandle());
    }

    /**
     * Manual check: receive one message, print it, then delete it.
     *
     * @param args unused
     * @throws IOException if the client cannot be created
     */
    public static void main(String[] args) throws IOException {
        final SqsResult res = doMsg();
        if (res == null) {
            System.out.println("No message available");
            return;
        }
        System.out.println(res);
        deleteMsg(res.getHandle());
    }
}
  3. lambda :
package com.lifeCycle.service.lambda;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.lifeCycle.entity.SqsResult;
import com.lifeCycle.Commons.SqsUtil;

/**
 * Lambda handler that receives the messages SQS delivers to the function.
 */
public class LambdaSqs implements RequestHandler<SQSEvent, String> {

    /**
     * Converts the incoming event with {@link SqsUtil#transMsg(SQSEvent)} and prints the
     * result to standard output.
     *
     * @param event   the SQS event passed in by the Lambda runtime
     * @param context the invocation context (not used)
     * @return always the literal {@code "456"}
     */
    public String handleRequest(SQSEvent event, Context context) {
        // transMsg yields null for an event without records; println then prints "null".
        System.out.println(SqsUtil.transMsg(event));
        return "456";
    }
}

2 ,上传到 lambda :

05 ,lambda - sqs 结合 : 监控的实现

3 ,保存 :

4 ,发消息 :

5 ,看结果 : 看看我们的打印结果

05 ,lambda - sqs 结合 : 监控的实现
05 ,lambda - sqs 结合 : 监控的实现

6 ,正确的标志 : 看到了我们打印的日志

05 ,lambda - sqs 结合 : 监控的实现

7 ,自动删除,默认覆盖 :

  1. 两次上传 jar 包,是否会默认删除前面的 jar ,使用最新的 jar : 是的
  2. 用 sqs 触发 lambda ,消息处理后,是否会由 lambda 自动删除消息,不需要我们手动删除 : 是的
  3. 如果 lambda 处理消息的时候出现异常,消息还在不在 :在的
  4. lambda 出现异常后,消息没有被处理掉,他还在,那么,他是以怎么样的状态存在的 :
相关标签: aws_lambda_sqs_sns