AMAZON Kinesis Firehose 2019 (1) Firehose Buffer to S3
2022-03-30 19:22:52
Some final references to create the project
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html
https://github.com/danielsan/firehose-nodejs-example
https://forum.serverless.com/t/creating-a-kinesis-firehose-stream-in-serverless-yaml-with-iamrolelambdaexecution-role/2366
https://github.com/otofu-square/serverless-kinesis-firehose/blob/master/serverless.yml
https://github.com/mikepatrick/kinesis-log-aggregator-demo
https://github.com/phodal/serverless/blob/02a067088cb37d294074b334777a4ca4175f737c/firehose/handler.js
Examples
http://serverless.ink/#serverless-%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90kinesis-firehose-%E6%8C%81%E4%B9%85%E5%8C%96%E6%95%B0%E6%8D%AE%E5%88%B0-s3
http://serverless.ink/#serverless-kinesis-firehose-%E4%BB%A3%E7%A0%81
http://serverless.ink/#%E5%AE%89%E8%A3%85%E5%8F%8A%E6%B5%8B%E8%AF%95
Pass the region to the client constructor
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property
More examples here
https://github.com/phodal/serverless
Nothing special in the package.json dependencies:
"devDependencies": {
  "@types/aws-lambda": "^8.10.31",
  "@types/node": "^8.0.57",
  "@typescript-eslint/eslint-plugin": "^2.0.0",
  "@typescript-eslint/parser": "^2.0.0",
  "aws-sdk": "^2.518.0",
  "eslint": "^6.2.2",
  "eslint-config-prettier": "^6.1.0",
  "eslint-plugin-prettier": "^3.1.0",
  "prettier": "^1.18.2",
  "serverless": "^1.50.1",
  "serverless-webpack": "^4.4.0",
  "ts-loader": "^2.3.7",
  "tslint": "^5.19.0",
  "typescript": "^3.5.3",
  "webpack": "^3.12.0"
}
Use serverless.yml to create the resources (the IAM role, the S3 bucket, and the Firehose delivery stream):
plugins:
  - serverless-webpack
custom:
  stage: ${opt:stage, 'stage'}
  regionByStage:
    int: us-west-1
    stage: us-west-1
    prod: us-west-2
  resource_region: ${self:custom.regionByStage.${self:custom.stage}}
  deploy_region: ${opt:region, self:custom.regionByStage.${self:custom.stage}}
  eventBusArn: ${cf:eventBus-${opt:stage, self:provider.stage}.SNSTopic}
  datawarehouses3: 'datawarehouse-${self:custom.stage}-events'
  persists3bucketname: 'datawarehouse-${self:custom.stage}-events'
functions:
  postEventHandler:
    handler: src/eventHandler.postEvents
resources:
  Resources:
    FirehoseToS3Role:
      Type: AWS::IAM::Role
      Properties:
        RoleName: FirehoseToS3Role
        AssumeRolePolicyDocument:
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - firehose.amazonaws.com
              Action:
                - sts:AssumeRole
        Policies:
          - PolicyName: FirehoseToS3Policy
            PolicyDocument:
              Statement:
                - Effect: Allow
                  Action:
                    - s3:AbortMultipartUpload
                    - s3:GetBucketLocation
                    - s3:GetObject
                    - s3:ListBucket
                    - s3:ListBucketMultipartUploads
                    - s3:PutObject
                  Resource: '*'
    ServerlessKinesisFirehoseBucket:
      Type: AWS::S3::Bucket
      DeletionPolicy: Retain
      Properties:
        BucketName: ${self:provider.environment.PERSIST_S3_BUCKET_NAME}
    ServerlessKinesisFirehose:
      Type: AWS::KinesisFirehose::DeliveryStream
      Properties:
        DeliveryStreamName: ${self:provider.environment.DELIVERY_STREAM_NAME}
        S3DestinationConfiguration:
          BucketARN:
            Fn::Join:
              - ''
              - - 'arn:aws:s3:::'
                - Ref: ServerlessKinesisFirehoseBucket
          BufferingHints:
            IntervalInSeconds: "60"
            SizeInMBs: "1"
          CompressionFormat: "UNCOMPRESSED"
          Prefix: "raw/"
          RoleARN: { Fn::GetAtt: [ FirehoseToS3Role, Arn ] }
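The BufferingHints in the configuration above mean that Firehose delivers a buffered object to S3 once either threshold is reached, whichever comes first. The sketch below only models that rule for illustration; the names shouldFlush and BufferingHints are hypothetical and not part of any AWS SDK, and the real service treats these values as hints rather than strict guarantees.

```typescript
// Hypothetical model of the buffering rule; not an AWS API.
interface BufferingHints {
  intervalInSeconds: number;
  sizeInMBs: number;
}

const MB = 1024 * 1024;

// True when either the size threshold or the time threshold has been reached.
function shouldFlush(
  bufferedBytes: number,
  elapsedSeconds: number,
  hints: BufferingHints
): boolean {
  return (
    bufferedBytes >= hints.sizeInMBs * MB ||
    elapsedSeconds >= hints.intervalInSeconds
  );
}

// Same values as the serverless.yml above: 60 seconds or 1 MB.
const hints: BufferingHints = { intervalInSeconds: 60, sizeInMBs: 1 };

console.log(shouldFlush(200 * 1024, 10, hints)); // false: neither threshold reached
console.log(shouldFlush(200 * 1024, 60, hints)); // true: interval reached
console.log(shouldFlush(2 * MB, 5, hints));      // true: size reached
```

With a low-traffic endpoint, this suggests that objects under the raw/ prefix will usually appear about a minute after the first buffered event.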
The util.ts that sends a string to Firehose is straightforward:
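import * as AWS from 'aws-sdk';

// REGION and DELIVERY_STREAM_NAME are read from the Lambda environment
const region = process.env.REGION;
// Fall back to '' so the type is string; the SDK rejects an empty name at call time
const deliveryStreamName = process.env.DELIVERY_STREAM_NAME || '';
const firehose = new AWS.Firehose({ region });

// Send one string to the delivery stream as a single record
export const sendFirehose = async (bodyMsg: string) => {
  const params = {
    DeliveryStreamName: deliveryStreamName,
    Record: {
      // Buffer.from replaces the deprecated new Buffer(...) constructor
      Data: Buffer.from(bodyMsg)
    }
  };
  return firehose.putRecord(params).promise();
};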
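Firehose concatenates records as-is when it writes an S3 object, so records sent this way end up back to back with no separator. A minimal sketch of one way to handle that, assuming newline-delimited output is wanted and that several events may be sent together: append a newline to each message and group messages into chunks of at most 500, the per-call record limit of PutRecordBatch. The helper names withDelimiter, chunk, and toBatches are hypothetical and not part of the original util.ts.

```typescript
// Hypothetical helpers; names are illustrative only.
const MAX_BATCH_SIZE = 500; // PutRecordBatch accepts at most 500 records per call

// Append a newline so each record lands on its own line in the S3 object.
function withDelimiter(message: string): string {
  return message.slice(-1) === '\n' ? message : message + '\n';
}

// Split a list into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Build one group of newline-terminated payloads per PutRecordBatch call.
function toBatches(messages: string[]): string[][] {
  return chunk(messages.map(withDelimiter), MAX_BATCH_SIZE);
}
```

Each batch could then be passed to firehose.putRecordBatch with Records built as batch.map(data => ({ Data: Buffer.from(data) })). The response carries a FailedPutCount, so a caller would likely want to retry the failed entries.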
Handler to receive the POST events
import { Handler, APIGatewayEvent, APIGatewayProxyResult } from 'aws-lambda';
import { sendFirehose } from './util';

// An async handler returns its response instead of mixing in the callback argument
export const postEvents: Handler<APIGatewayEvent, APIGatewayProxyResult> = async (event) => {
  console.log('event received:' + JSON.stringify(event));
  try {
    // event.body is already a string, so this stores it as a JSON-quoted string
    await sendFirehose(JSON.stringify(event.body));
    return { statusCode: 200, body: 'Successful POST' };
  } catch (err) {
    // Rethrow so the invocation fails, as callBack(err) did
    console.error('failed to send to Firehose', err);
    throw err;
  }
};
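The handler above forwards whatever arrives, including a missing body. A minimal sketch of a guard that could run before sendFirehose, assuming producers are expected to POST JSON (the original does not say so): it returns a 400 response for an empty or unparsable body and null otherwise. The names validateBody, BodyCarrier, and ProxyResponse are hypothetical; BodyCarrier stands in for the fields of APIGatewayEvent that are used here.

```typescript
// Minimal stand-ins for the aws-lambda types, so the sketch is self-contained.
interface BodyCarrier {
  body: string | null;
}

interface ProxyResponse {
  statusCode: number;
  body: string;
}

// Hypothetical guard: an error response when the body is unusable, otherwise null.
function validateBody(event: BodyCarrier): ProxyResponse | null {
  if (event.body === null || event.body.trim() === '') {
    return { statusCode: 400, body: 'Request body is required' };
  }
  try {
    JSON.parse(event.body); // assumption: events are posted as JSON
  } catch (err) {
    return { statusCode: 400, body: 'Request body must be valid JSON' };
  }
  return null;
}
```

In the handler, this would be called first, returning the 400 response early when the result is not null.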
References:
https://stackoverflow.com/questions/55714834/push-from-lambda-to-s3-or-push-from-lambda-to-kinesis-firehose-to-s3
https://fivetran.com/docs/files/aws-kinesis
https://github.com/SumoLogic/sumologic-aws-lambda/tree/master/kinesisfirehose-processor
https://towardsdatascience.com/delivering-real-time-streaming-data-to-amazon-s3-using-amazon-kinesis-data-firehose-2cda5c4d1efe
https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
https://aws.amazon.com/kinesis/data-firehose/faqs/
https://github.com/alexcasalboni/serverless-data-pipeline-sam
NodeJS Document
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html
Old Example
https://github.com/otofu-square/serverless-kinesis-firehose