Lambda CloudWatch Logs Subscription

19 Nov, 2018

AWS CloudWatch Logs (CW) is a service that, among other things, stores log files from AWS services in a central location. CW supports subscriptions that deliver log events from CloudWatch Logs to other services, such as an Amazon Kinesis stream, an Amazon Kinesis Data Firehose stream, or AWS Lambda, for custom processing, analysis, or loading into other systems. To begin subscribing to log events, first create the receiving resource, such as a Lambda function, to which the events will be delivered. A subscription filter defines the filter pattern that selects which log events are delivered to your AWS resource, as well as the destination for matching log events. In this blog we will set up a Lambda subscription on a CloudWatch log group.

Architecture

We will create the following application. A CloudWatch Events rule invokes TriggerFunction on a schedule. TriggerFunction writes a log message to CloudWatch Logs. CloudWatchSubscriptionLambda is subscribed to the log group of TriggerFunction. When there is a new log entry, CloudWatchSubscriptionLambda is invoked with the log events and writes the received event to its own log.
Scheduled Events


To invoke a Lambda on a schedule we need the following configuration:

  LambdaBasicExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
        - Effect: Allow
          Principal:
            Service: lambda.amazonaws.com
          Action: sts:AssumeRole
          Condition: {}
      Path: /
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  TriggerFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Runtime: python3.6
      Role: !GetAtt 'LambdaBasicExecutionRole.Arn'
      MemorySize: 128
      Timeout: 30
      Code:
        ZipFile: |-
          def handler(event, ctx):
              print(event)

  TriggerFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${TriggerFunction}'
      RetentionInDays: 30

  CloudWatchEventsRule:
    Type: AWS::Events::Rule
    Properties:
      ScheduleExpression: rate(1 minute)
      State: ENABLED
      Targets:
      - Arn: !GetAtt TriggerFunction.Arn
        Id: scheduled-event

  InvokeTriggerFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt TriggerFunction.Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt CloudWatchEventsRule.Arn

CloudWatch Subscription

For the CloudWatch Lambda subscription we need the following configuration:

  CloudWatchSubscriptionLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Runtime: python3.6
      Role: !GetAtt 'LambdaBasicExecutionRole.Arn'
      MemorySize: 128
      Timeout: 30
      Code:
        ZipFile: |-
          def handler(event, ctx):
              print(event)

  CloudWatchSubscriptionLambdaLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${CloudWatchSubscriptionLambda}'
      RetentionInDays: 30

  CloudWatchLogSubscription:
    Type: AWS::Logs::SubscriptionFilter
    DependsOn: CloudWatchSubscriptionFunctionPermission
    Properties:
      DestinationArn: !GetAtt CloudWatchSubscriptionLambda.Arn
      FilterPattern: ''
      LogGroupName: !Ref TriggerFunctionLogGroup

  CloudWatchSubscriptionFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt CloudWatchSubscriptionLambda.Arn
      Action: 'lambda:InvokeFunction'
      Principal: !Sub 'logs.${AWS::Region}.amazonaws.com'
      SourceArn: !GetAtt TriggerFunctionLogGroup.Arn

Raw Log Event

The log event received by CloudWatchSubscriptionLambda has the following format. The log data is compressed with gzip and then base64 encoded.

{"awslogs": {"data": "H4sIAAAAAAAAAIVRy27bMBD8FYPoMYSWFJ+6qa1tBIlbwFKbQ2oUlEQpBGRJpaS6RZB/z7oPoLdiL8vhYnZ25pmc/Ty7zpc/J08y8j4v86+HbVHk+y25IeNl8BFhxbgwqeVCaUC4H7t9HNcJfxJ3mZPenavGJRXiFN/U925eQj17F+sn2obon8bZUzfRMoau83G3DvUSxoGyWzgc3vLy0/HD3fE3c7FE785IzYGZhLGE2eTxzX1ebovyJHiTShCqraARjXOWs1ZVDUArGu10ihTzWs11DNOVfxf6xceZZI/k/+IC7S7U/3Dnqfe07se1ubgFR95d24dre4/q/mGnD9zsgB0+3u13lpx+qd9+98NyXfhMQoNHpEIA56A5CCxpjbESpJESLbUcuGFKM6YYgAapU2VNiqU5HrIEzGZBOSRjUnCFCfDUGnbzNzOkL8r8WG6O/tuKo7dNtmGmNRWTmvqaA2XMG1q1WlJoK6vrSijg9eYzeoL6s80fW78M5OX08grvTTW4DAIAAA=="}}
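To experiment with the decoding step locally, without deploying anything, a payload of the same shape can be produced by reversing the encoding. The following is a minimal sketch under that assumption; the helper name encode_awslogs_event and the sample payload are hypothetical and not part of the example project. Because every gzip stream starts with the same header bytes, the encoded data always begins with "H4sI", just like the raw event above.

```python
import base64
import gzip
import json


def encode_awslogs_event(payload: dict) -> dict:
    """Wrap a decoded payload in the envelope a log subscription delivers."""
    raw = json.dumps(payload).encode("utf-8")
    # Compress first, then base64 encode, mirroring the delivered format.
    compressed = gzip.compress(raw)
    return {"awslogs": {"data": base64.b64encode(compressed).decode("ascii")}}


# Hypothetical sample payload, trimmed to a few illustrative fields.
sample = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "/aws/lambda/example",
    "logEvents": [
        {"id": "1", "timestamp": 1542649103395, "message": "hello\n"},
    ],
}

event = encode_awslogs_event(sample)
```

The resulting event can be passed straight to a handler that expects the awslogs envelope.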

Processing Log Lines

The subscribed Lambda decodes the raw log event as follows:

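import gzip
import json
from base64 import b64decode

def decompress(data: bytes) -> bytes:
    # The payload is gzip-compressed before it is base64 encoded.
    return gzip.decompress(data)

def decode_record(data: dict) -> dict:
    # Undo the base64 encoding, then the compression, then parse the JSON.
    x = decompress(b64decode(data['data']))
    return json.loads(x.decode('utf-8'))

def decode_event(event: dict) -> dict:
    # CloudWatch Logs wraps the payload in an 'awslogs' key.
    return decode_record(event['awslogs'])

def handler(event, ctx) -> None:
    # Write the decoded event to this function's own log group.
    print(json.dumps(decode_event(event)))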

After processing, the decoded event looks like this:

{
    "messageType": "DATA_MESSAGE",
    "owner": "612483924670",
    "logGroup": "/aws/lambda/blog-aws-elasticsearch-firehose-ap-TriggerFunction-1I0MMB2TURNKR",
    "logStream": "2018/11/19/[$LATEST]42d35046fb0d4daa921f6bd00f4d7a73",
    "subscriptionFilters": [
        "blog-aws-elasticsearch-firehose-api-gw-example-cloudwatch-CloudWatchLogSubscription-W28F01MOKGF9"
    ],
    "logEvents": [
        {
            "id": "34402224585553617062605662326569493632889831387406598144",
            "timestamp": 1542649103395,
            "message": "{'version': '0', 'id': '8d73f860-d1d1-d867-bb38-0093a2e3790d', 'detail-type': 'Scheduled Event', 'source': 'aws.events', 'account': '612483924670', 'time': '2018-11-19T17:37:38Z', 'region': 'eu-west-1', 'resources': ['arn:aws:events:eu-west-1:612483924670:rule/blog-aws-elasticsearch-fireho-CloudWatchEventsRule-127RVH0C949OJ'], 'detail': {}}\n"
        },
        {
            "id": "34402224585575917807804192949711029351162479748912578561",
            "timestamp": 1542649103396,
            "message": "END RequestId: e9a9b88a-ec21-11e8-bc46-871711881058\n"
        },
        {
            "id": "34402224585575917807804192949711029351162479748912578562",
            "timestamp": 1542649103396,
            "message": "REPORT RequestId: e9a9b88a-ec21-11e8-bc46-871711881058\tDuration: 1.19 ms\tBilled Duration: 100 ms \tMemory Size: 128 MB\tMax Memory Used: 21 MB\t\n"
        }
    ]
}
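Once decoded, the individual entries in logEvents can be processed further. The sketch below is one possible approach, not part of the example project: the function name summarize is hypothetical, and it assumes that lines starting with START, END or REPORT are Lambda runtime bookkeeping that can be skipped. Note that TriggerFunction uses print(event), so the message is a Python dict repr with single quotes rather than JSON; ast.literal_eval is used here to turn it back into a dict, with a fallback to the plain string.

```python
import ast
from datetime import datetime, timezone


def summarize(decoded: dict) -> list:
    """Return (UTC time, parsed message) pairs for the application log lines."""
    rows = []
    for log_event in decoded["logEvents"]:
        message = log_event["message"].rstrip("\n")
        # Assumption: these prefixes mark lines written by the Lambda runtime.
        if message.startswith(("START ", "END ", "REPORT ")):
            continue
        # The timestamp is in milliseconds since the Unix epoch.
        when = datetime.fromtimestamp(log_event["timestamp"] / 1000, tz=timezone.utc)
        try:
            # print(event) wrote a Python dict repr, which is not valid JSON.
            parsed = ast.literal_eval(message)
        except (ValueError, SyntaxError):
            # Not a Python literal, so keep the raw text.
            parsed = message
        rows.append((when, parsed))
    return rows
```

Logging with json.dumps(event) in TriggerFunction instead of print(event) would make the message plain JSON and remove the need for ast.literal_eval.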

Example

The example project shows the complete setup. It can be deployed with make create and removed with make delete.

Example with Private Network

To find out whether CloudWatch Logs still works with Lambdas that are attached to a private subnet, I created an example project that deploys a VPC and places the Lambdas in the private subnet. I'm pleased to say that CloudWatch can still receive logs from such a Lambda. Moreover, CloudWatch can still trigger the Lambda that is subscribed to a log group, even when that Lambda has only private network connectivity.

Conclusion

AWS CloudWatch Logs supports real-time log processing by offering subscriptions. Supported destinations are Kinesis, Kinesis Data Firehose and Lambda. In this blog we set up a Lambda as a log subscription that decodes the log event and writes the output to CloudWatch Logs. We also configured a Lambda that is triggered on a schedule using CloudWatch Events.
