05 ,lambda - sqs 结合 : 监控的实现
程序员文章站
2024-01-28 09:39:58
...
1 ,java 代码 :
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Lambda + SQS demo: produces a single shaded (fat) jar for upload to AWS Lambda. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lefrcycle.lambda</groupId>
    <artifactId>lambda01_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- The sources contain non-ASCII comments; pin the encoding so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Pin the language level explicitly instead of relying on the compiler plugin's default. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- s3 -->
        <!--<dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.636</version>
        </dependency>-->

        <!-- sqs -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-sqs</artifactId>
            <version>1.11.636</version>
        </dependency>

        <!-- lambda -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-core</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-events</artifactId>
            <version>2.2.2</version>
        </dependency>

        <!-- jackson -->
        <!--<dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.7</version>
        </dependency>-->
    </dependencies>

    <build>
        <plugins>
            <!-- Bundles the project classes and all dependencies into one jar during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- SqsUtil :
package com.lifeCycle.Commons;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
import com.lifeCycle.entity.SqsResult;
import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
/**
 * Static helpers for one SQS FIFO queue: building a client, sending, receiving and
 * deleting messages, and converting a Lambda {@link SQSEvent} into a {@link SqsResult}.
 */
public class SqsUtil {

    /** URL of the FIFO queue that every helper in this class operates on. */
    private static final String QUEUE_URL =
            "https://sqs.cn-northwest-1.amazonaws.com.cn/281007668287/da.fifo";

    /** Classpath resource that holds the access key and the secret key. */
    private static final String CREDENTIALS_RESOURCE = "s3.properties";

    /** Message group assigned to every outgoing message; a FIFO queue requires one. */
    private static final String MESSAGE_GROUP_ID = "dataAnalysis";

    /**
     * Builds an SQS client for region {@code CN_NORTHWEST_1} using the static credentials
     * stored in the {@code s3.properties} classpath resource.
     *
     * <p>A new client is built, and the properties file re-read, on every call.
     *
     * @return a newly built SQS client
     * @throws IOException if the properties resource is missing or cannot be read
     */
    public static AmazonSQS getSqs() throws IOException {
        // 1. Load the configuration.
        final Properties properties = new Properties();
        final InputStream is = SqsUtil.class.getClassLoader().getResourceAsStream(CREDENTIALS_RESOURCE);
        if (is == null) {
            // getResourceAsStream signals "not found" with null; fail with a clear message
            // instead of a NullPointerException from Properties.load.
            throw new IOException("Classpath resource not found: " + CREDENTIALS_RESOURCE);
        }
        try {
            properties.load(is);
        } finally {
            // Always release the stream, even when loading fails.
            is.close();
        }

        // 2. Build the credentials.
        final AWSCredentials credentials = new BasicAWSCredentials(
                properties.getProperty("fs.s3a.access.key"),
                properties.getProperty("fs.s3a.secret.key"));

        // 3. Create the client with those credentials and the region.
        return AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withRegion(Regions.CN_NORTHWEST_1)
                .build();
    }

    /**
     * Sends one message to the queue.
     *
     * @param msg the message body
     * @return the message ID reported by SQS
     * @throws IOException if the client cannot be created
     */
    public static String sendMessage(String msg) throws IOException {
        final AmazonSQS sqs = getSqs();
        final SendMessageRequest request = new SendMessageRequest(QUEUE_URL, msg);

        // A random UUID keeps the deduplication ID unique per send. A millisecond timestamp
        // collides when two messages are sent within the same millisecond, and the queue
        // would then treat the second one as a duplicate.
        request.setMessageDeduplicationId(UUID.randomUUID().toString());

        // A FIFO queue requires a message group.
        request.setMessageGroupId(MESSAGE_GROUP_ID);

        final SendMessageResult result = sqs.sendMessage(request);
        return result.getMessageId();
    }

    /**
     * Receives from the queue and returns the first message of the response.
     *
     * <p>The message is not deleted; pass the returned handle to {@link #deleteMsg(String)}.
     *
     * @return the body and receipt handle of the first received message
     * @throws IOException if the client cannot be created
     * @throws IndexOutOfBoundsException if the receive call returned no message
     */
    public static SqsResult doMsg() throws IOException {
        final AmazonSQS sqs = getSqs();
        final List<Message> messages = sqs.receiveMessage(new ReceiveMessageRequest(QUEUE_URL)).getMessages();
        if (messages.isEmpty()) {
            // Same exception type as the unchecked get(0) used to raise, but with a message
            // that says what actually happened.
            throw new IndexOutOfBoundsException("No message received from " + QUEUE_URL);
        }
        final Message first = messages.get(0);
        return new SqsResult(first.getBody(), first.getReceiptHandle());
    }

    /**
     * Deletes a message from the queue.
     *
     * @param handle the receipt handle of the message to delete
     * @throws IOException if the client cannot be created
     */
    public static void deleteMsg(String handle) throws IOException {
        final AmazonSQS sqs = getSqs();
        sqs.deleteMessage(new DeleteMessageRequest(QUEUE_URL, handle));
    }

    /**
     * Converts the first record of a Lambda SQS event into a {@link SqsResult}.
     * Any further records in the event are not looked at.
     *
     * @param event the event delivered to the Lambda handler
     * @return the body and receipt handle of the first record, or {@code null} when the
     *         event carries no records
     */
    public static SqsResult transMsg(SQSEvent event) {
        final List<SQSEvent.SQSMessage> records = event.getRecords();
        if (records == null || records.isEmpty()) {
            return null;
        }
        final SQSEvent.SQSMessage first = records.get(0);
        return new SqsResult(first.getBody(), first.getReceiptHandle());
    }

    /**
     * Manual test: receives one message, prints it and deletes it.
     * Fails with {@link IndexOutOfBoundsException} when no message is received.
     */
    public static void main(String[] args) throws IOException {
        final SqsResult res = doMsg();
        System.out.println(res);
        deleteMsg(res.getHandle());
    }
}
- lambda :
package com.lifeCycle.service.lambda;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.lifeCycle.entity.SqsResult;
import com.lifeCycle.Commons.SqsUtil;
// lambda 接收 sqs 发送的消息
public class LambdaSqs implements RequestHandler<SQSEvent, String> {
public String handleRequest(SQSEvent event, Context context) {
final SqsResult res = SqsUtil.transMsg(event);
System.out.println(res);
return "456";
}
}
2 ,上传到 lambda :
3 ,保存 :
4 ,发消息 :
5 ,看结果 : 看看我们的打印结果
6 ,正确的标志 : 看到了我们打印的日志
7 ,自动删除,默认覆盖 :
- 两次上传 jar 包,是否会默认删除前面的 jar ,使用最新的 jar : 是的
- 用 sqs 触发 lambda ,消息处理后,是否会由 lambda 自动删除消息,不需要我们手动删除 : 是的
- 如果 lambda 处理消息的时候出现异常,消息还在不在 :在的
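- lambda 出现异常后,消息没有被处理掉,它还在,那么,它是以怎么样的状态存在的 : 消息处于"传输中 ( in flight )"状态,在可见性超时 ( visibility timeout ) 到期之前对其他消费者不可见;超时之后消息重新变为可见,并会再次触发 lambda 。如果队列配置了死信队列,接收次数超过 maxReceiveCount 后,消息会被转移到死信队列。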
上一篇: 「图像处理」Canny边缘检测原理解释
下一篇: js获取本周,本月