Amazon Kinesis Firehose 2019 (1): Firehose Buffer to S3

程序员文章站 (Programmer Article Station), 2022-03-30 19:22:52

References used to create the project
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html
https://github.com/danielsan/firehose-nodejs-example
https://forum.serverless.com/t/creating-a-kinesis-firehose-stream-in-serverless-yaml-with-iamrolelambdaexecution-role/2366
https://github.com/otofu-square/serverless-kinesis-firehose/blob/master/serverless.yml
https://github.com/mikepatrick/kinesis-log-aggregator-demo
https://github.com/phodal/serverless/blob/02a067088cb37d294074b334777a4ca4175f737c/firehose/handler.js

Examples
http://serverless.ink/#serverless-%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90kinesis-firehose-%E6%8C%81%E4%B9%85%E5%8C%96%E6%95%B0%E6%8D%AE%E5%88%B0-s3
http://serverless.ink/#serverless-kinesis-firehose-%E4%BB%A3%E7%A0%81
http://serverless.ink/#%E5%AE%89%E8%A3%85%E5%8F%8A%E6%B5%8B%E8%AF%95

Pass the region to the constructor when creating the client
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property
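
A minimal sketch of that step, assuming the region arrives through a `REGION` environment variable. The helper name `firehoseOptions` and its error message are hypothetical and not part of the SDK; the helper only builds the options object that would be handed to the constructor.

```typescript
// Hypothetical helper: builds the options object for `new AWS.Firehose(options)`.
// The environment map is passed in so the function can be checked without AWS access.
type Env = Record<string, string | undefined>;

function firehoseOptions(env: Env): { region: string } {
  const region = env.REGION;
  if (!region) {
    // Fail early with a clear message instead of at request time.
    throw new Error('REGION is not set');
  }
  return { region };
}

// In a Lambda this could be used as:
//   const firehose = new AWS.Firehose(firehoseOptions(process.env));
```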

More examples here
https://github.com/phodal/serverless

Nothing special in the package.json dependencies
  "devDependencies": {
    "@types/aws-lambda": "^8.10.31",
    "@types/node": "^8.0.57",
    "@typescript-eslint/eslint-plugin": "^2.0.0",
    "@typescript-eslint/parser": "^2.0.0",
    "aws-sdk": "^2.518.0",
    "eslint": "^6.2.2",
    "eslint-config-prettier": "^6.1.0",
    "eslint-plugin-prettier": "^3.1.0",
    "prettier": "^1.18.2",
    "serverless": "^1.50.1",
    "serverless-webpack": "^4.4.0",
    "ts-loader": "^2.3.7",
    "tslint": "^5.19.0",
    "typescript": "^3.5.3",
    "webpack": "^3.12.0"
  }

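Use serverless.yml to create the resources. The snippet reads PERSIST_S3_BUCKET_NAME and DELIVERY_STREAM_NAME from provider.environment, a section that is not shown here.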
plugins:
  - serverless-webpack
custom:
  stage: ${opt:stage, 'stage'}
  regionByStage:
    int: us-west-1
    stage: us-west-1
    prod: us-west-2
  resource_region: ${self:custom.regionByStage.${self:custom.stage}}
  deploy_region: ${opt:region, self:custom.regionByStage.${self:custom.stage}}
  eventBusArn: ${cf:eventBus-${opt:stage, self:provider.stage}.SNSTopic}
  datawarehouses3: 'datawarehouse-${self:custom.stage}-events'
  persists3bucketname: 'datawarehouse-${self:custom.stage}-events'

functions:
  postEventHandler:
    handler: src/eventHandler.postEvents
   
resources:
  Resources:
    FirehoseToS3Role:
      Type: AWS::IAM::Role
      Properties:
        RoleName: FirehoseToS3Role
        AssumeRolePolicyDocument:
          Statement:
          - Effect: Allow
            Principal:
              Service:
              - firehose.amazonaws.com
            Action:
            - sts:AssumeRole
        Policies:
        - PolicyName: FirehoseToS3Policy
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action:
                - s3:AbortMultipartUpload
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:PutObject
                Resource: '*'
    ServerlessKinesisFirehoseBucket:
      Type: AWS::S3::Bucket
      DeletionPolicy: Retain
      Properties:
        BucketName: ${self:provider.environment.PERSIST_S3_BUCKET_NAME}
    ServerlessKinesisFirehose:
      Type: AWS::KinesisFirehose::DeliveryStream
      Properties:
        DeliveryStreamName: ${self:provider.environment.DELIVERY_STREAM_NAME}
        S3DestinationConfiguration:
          BucketARN:
            Fn::Join:
            - ''
            - - 'arn:aws:s3:::'
              - Ref: ServerlessKinesisFirehoseBucket
          BufferingHints:
            IntervalInSeconds: "60"
            SizeInMBs: "1"
          CompressionFormat: "UNCOMPRESSED"
          Prefix: "raw/"
          RoleARN: { Fn::GetAtt: [ FirehoseToS3Role, Arn ] }
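
With the `BufferingHints` above, Firehose delivers a batch to S3 once about 1 MB has accumulated or 60 seconds have passed, whichever comes first. The sketch below only models that rule to make it concrete. `shouldFlush` and its types are hypothetical, treating 1 MB as 1024 * 1024 bytes is an assumption, and none of this is how the service itself is implemented.

```typescript
interface BufferingHints {
  intervalInSeconds: number;
  sizeInMBs: number;
}

interface BufferState {
  bufferedBytes: number;      // bytes accumulated since the last delivery
  secondsSinceFlush: number;  // time elapsed since the last delivery
}

// Returns true when either hint is reached (size OR interval).
function shouldFlush(hints: BufferingHints, state: BufferState): boolean {
  // Assumption of this sketch: 1 MB is counted as 1024 * 1024 bytes.
  const sizeReached = state.bufferedBytes >= hints.sizeInMBs * 1024 * 1024;
  const intervalReached = state.secondsSinceFlush >= hints.intervalInSeconds;
  return sizeReached || intervalReached;
}

// Same values as in the serverless.yml above.
const hints: BufferingHints = { intervalInSeconds: 60, sizeInMBs: 1 };

const smallAndRecent = shouldFlush(hints, { bufferedBytes: 2048, secondsSinceFlush: 10 });
const smallButOld = shouldFlush(hints, { bufferedBytes: 2048, secondsSinceFlush: 60 });
const largeAndRecent = shouldFlush(hints, { bufferedBytes: 1024 * 1024, secondsSinceFlush: 5 });
```

For a low-traffic endpoint the interval will usually be reached first, so objects can be expected under `raw/` roughly a minute after the first POST.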

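The util.ts that sends a string to Firehose is as simple as this:
import * as AWS from 'aws-sdk';

// Both values are read from Lambda environment variables.
const region = process.env.REGION;
const deliveryStreamName = process.env.DELIVERY_STREAM_NAME;

const firehose = new AWS.Firehose({ region });

// Sends one string to the delivery stream as a single Firehose record.
export const sendFirehose = async (bodyMsg: string) => {
    if (!deliveryStreamName) {
        throw new Error('DELIVERY_STREAM_NAME is not set');
    }
    const params = {
        DeliveryStreamName: deliveryStreamName,
        Record: {
            // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
            Data: Buffer.from(bodyMsg)
        }
    };
    return firehose.putRecord(params).promise();
};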
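
When many events arrive at once, `putRecordBatch` can replace one `putRecord` call per event. The sketch below covers only the preparation step, under two assumptions: a newline is appended to each message (Firehose concatenates records as they are, so without a delimiter the S3 object becomes one long line), and batches are capped at 500 records, the documented per-call limit for PutRecordBatch. The helper names `toRecords` and `chunk` are hypothetical.

```typescript
// Shape of one entry in the Records array of a putRecordBatch request.
interface FirehoseRecord {
  Data: string;
}

// Documented upper bound on records per PutRecordBatch call.
const MAX_RECORDS_PER_BATCH = 500;

// Appends a newline to every message that does not already end with one.
function toRecords(messages: string[]): FirehoseRecord[] {
  return messages.map((msg) => ({
    Data: msg.slice(-1) === '\n' ? msg : msg + '\n',
  }));
}

// Splits a list into consecutive slices of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (size <= 0) {
    throw new Error('size must be positive');
  }
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Each batch could then be sent with:
//   await firehose.putRecordBatch({
//     DeliveryStreamName: deliveryStreamName,
//     Records: batch,
//   }).promise();
// and the response's FailedPutCount checked for partial failures.
```

This sketch does not enforce the per-call size limit, so very large messages would need an additional size check before sending.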

Handler to receive the POST events
import { Handler, APIGatewayEvent } from 'aws-lambda';
import { sendFirehose } from './util';

export const postEvents: Handler = async (event: APIGatewayEvent) => {
  console.log('event received:' + JSON.stringify(event));
  // event.body is already a string (or null), so JSON.stringify adds an outer layer of quotes.
  await sendFirehose(JSON.stringify(event.body));
  // An async handler returns its response; a thrown error fails the invocation.
  return { statusCode: 200, body: 'Successful POST' };
};
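
Because `event.body` from API Gateway is a string or `null`, passing it through `JSON.stringify` stores a quoted, escaped string in S3. One possible alternative, sketched here under the assumption that callers POST JSON, is to normalise the body into a single newline-terminated line before sending it. The helper name `toLine` is hypothetical.

```typescript
// Hypothetical helper: turns an API Gateway body into one line for Firehose.
// Returns null when there is nothing worth delivering.
function toLine(body: string | null): string | null {
  if (body === null || body.trim() === '') {
    return null;
  }
  try {
    // Re-serialising valid JSON drops embedded newlines and extra whitespace.
    return JSON.stringify(JSON.parse(body)) + '\n';
  } catch (e) {
    // Not JSON: store the raw text as a JSON string so it still fits on one line.
    return JSON.stringify(body) + '\n';
  }
}

// In the handler this could be used as:
//   const line = toLine(event.body);
//   if (line !== null) { await sendFirehose(line); }
```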


References:
https://*.com/questions/55714834/push-from-lambda-to-s3-or-push-from-lambda-to-kinesis-firehose-to-s3
https://fivetran.com/docs/files/aws-kinesis
https://github.com/SumoLogic/sumologic-aws-lambda/tree/master/kinesisfirehose-processor
https://towardsdatascience.com/delivering-real-time-streaming-data-to-amazon-s3-using-amazon-kinesis-data-firehose-2cda5c4d1efe
https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
https://aws.amazon.com/kinesis/data-firehose/faqs/
https://github.com/alexcasalboni/serverless-data-pipeline-sam

Node.js documentation
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html
Older example
https://github.com/otofu-square/serverless-kinesis-firehose