Analyzing CloudFront with AWS Elasticsearch Service

on Nov 18, 2018 in AWS

In my earlier posts, AWS Elasticsearch Service with Firehose Delivery Stream and Analyzing API Gateway Access Logs with AWS Elasticsearch Service, we saw how easy it is to set up an Elasticsearch cluster, ingest data from Firehose, and create dashboards in Kibana. This time we will ingest data from the AWS CloudFront CDN. CloudFront stores its access logs in an S3 bucket. We will simulate CloudFront by uploading a CloudFront log file to the bucket. An S3 Bucket Notification triggers a Lambda that parses the log file and puts the records on a Firehose Delivery Stream, which delivers them to AWS Elasticsearch Service. Let's get going!

CloudFront Log Format

CloudFront writes log lines to S3. Each log file starts with two lines that begin with a hash; they define the version and the schema (the field names) of the log file. An example is shown below.

CloudFront log:

#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields
2018-11-10  00:16:26    DEN50-C1    887 65.154.226.109  GET d313bttlbsfoy.cloudfront.net    /   403 -   Mozilla/4.0%2520(compatible;%2520MSIE%25208.0;%2520Windows%2520NT%25206.1;%2520WOW64;%2520Trident/4.0;%2520SLCC2;%2520.NET%2520CLR%25202.0.50727;%2520.NET%2520CLR%25203.5.30729;%2520.NET%2520CLR%25203.0.30729;%2520Media%2520Center%2520PC%25206.0;%2520.NET4.0C;%2520.NET4.0E;%2520InfoPath.2)  -   -   Error   OuhTpC1iLajIPb2jeR9_rUEFEyboP9LI2YECBzkUrbefHicnvFIM5A==    www.example.com http    479 0.001   -   -   -   Error   HTTP/1.1    -   -
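Because the #Fields line names every column and the data lines are tab-separated, the schema can also be read from the file itself instead of being hard-coded. The following is a minimal sketch of that idea, not the code used in the example project; the function names parse_fields_header and parse_log are made up for illustration, and the sample is shortened to four columns. Note that it keeps the header names as-is (for example cs(Host)), so the keys differ slightly from hand-picked names such as cs-host.

```python
from typing import Dict, Iterable, Iterator, List


def parse_fields_header(line: str) -> List[str]:
    """Return the field names from a '#Fields: ...' header line."""
    prefix = '#Fields:'
    if not line.startswith(prefix):
        raise ValueError('not a #Fields header')
    # Field names in the header are separated by single spaces
    return line[len(prefix):].split()


def parse_log(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per data line, keyed by the names from the header."""
    fields: List[str] = []
    for line in lines:
        line = line.rstrip('\n')
        if line.startswith('#Fields:'):
            fields = parse_fields_header(line)
        elif line and not line.startswith('#'):
            # Data lines are tab-separated; pair each value with its field name
            yield dict(zip(fields, line.split('\t')))


# Shortened, hypothetical sample with only the first four columns
sample = [
    '#Version: 1.0',
    '#Fields: date time x-edge-location sc-bytes',
    '2018-11-10\t00:16:26\tDEN50-C1\t887',
]
records = list(parse_log(sample))
```

A header-driven parser like this keeps working if CloudFront appends new columns to the log format, at the cost of field names that contain parentheses.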

The log lines are written in tab-separated format. The NotificationLambda parses each log line, creates a JSON message for it, and publishes the messages to Firehose in batches of at most 500. Firehose then delivers the messages to Elasticsearch Service to be indexed.
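The batch size of 500 matches the maximum number of records that Firehose accepts in a single PutRecordBatch call. The batching step on its own can be sketched as a small generator; the name batched is an assumption made for this illustration and is not part of the example project.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')


def batched(items: Iterable[T], size: int = 500) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items; no item is skipped."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# 1201 items end up in two full batches and one partial batch
sizes = [len(batch) for batch in batched(range(1201))]
```

Separating the batching from the parsing makes the boundary cases (exactly 500 lines, 501 lines, an empty file) easy to check in isolation.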

NotificationLambda:

import boto3
import json
import os

s3 = boto3.resource('s3')
firehose = boto3.client('firehose')

delivery_stream_name = os.environ['DELIVERY_STREAM_NAME']

def process_line(line: str):
    record = line.strip('\n').split('\t')
    return {
        'date': record[0],
        'time': record[1],
        'x-edge-location': record[2],
        'sc-bytes': record[3],
        'c-ip': record[4],
        'cs-method': record[5],
        'cs-host': record[6],
        'cs-uri-stem': record[7],
        'sc-status': record[8],
        'cs-referer': record[9],
        'cs-user-agent': record[10],
        'cs-uri-query': record[11],
        'cs-cookie': record[12],
        'x-edge-result-type': record[13],
        'x-edge-request-id': record[14],
        'x-host-header': record[15],
        'cs-protocol': record[16],
        'cs-bytes': record[17],
        'time-taken': record[18],
        'x-forwarded-for': record[19],
        'ssl-protocol': record[20],
        'ssl-cipher': record[21],
        'x-edge-response-result-type': record[22],
        'cs-protocol-version': record[23],
        'fle-status': record[24],
        'fle-encrypted-fields': record[25]
    }

def filter_lines(lines: [str]) -> [str]:
    return list(filter(lambda line: not line.startswith('#'), lines))

def publish(batch: [dict]) -> None:
    assert len(batch) <= 500
    data = []
    for log in batch:
        encoded = bytes(json.dumps(log), 'utf-8')
        data.append({'Data': encoded})

    print(f'Publishing to {delivery_stream_name} data: {data}')
    firehose.put_record_batch(DeliveryStreamName=delivery_stream_name, Records=data)

def handler(event, ctx):
    batch: [dict] = []
    # Only the first record of the notification is handled
    bucket_event = event['Records'][0]
    if bucket_event['eventName'] == 'ObjectCreated:Put':
        bucket = bucket_event['s3']['bucket']['name']
        key = bucket_event['s3']['object']['key']
        obj = s3.Object(bucket, key)
        for line in obj.get()['Body'].read().splitlines():
            text: str = line.decode('utf-8')
            # Skip empty lines and the '#Version' / '#Fields' header lines
            if text and not text.startswith('#'):
                batch.append(process_line(text))
                # Flush as soon as the batch is full, so no line is dropped
                if len(batch) == 500:
                    publish(batch)
                    batch = []

        # Publish whatever is left in the last, partially filled batch
        if batch:
            publish(batch)
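One thing the publish function above does not do is inspect the response. put_record_batch can succeed as a call while rejecting individual records; the response then reports a non-zero FailedPutCount, and the entries in RequestResponses that failed carry an ErrorCode. A hedged sketch of resending only the rejected records could look like this. The function name publish_with_retry and the max_attempts parameter are assumptions for illustration, and a production version would probably also wait between attempts.

```python
import json
from typing import Any, Dict, List


def publish_with_retry(client: Any, stream: str, batch: List[Dict[str, str]],
                       max_attempts: int = 3) -> int:
    """Send a batch and resend only the records that Firehose rejected.

    Returns the number of records still failing after the last attempt.
    """
    records = [{'Data': json.dumps(log).encode('utf-8')} for log in batch]
    for _ in range(max_attempts):
        response = client.put_record_batch(DeliveryStreamName=stream,
                                           Records=records)
        if response['FailedPutCount'] == 0:
            return 0
        # RequestResponses has the same order as the request;
        # rejected records are the ones that carry an ErrorCode
        records = [record
                   for record, result in zip(records, response['RequestResponses'])
                   if 'ErrorCode' in result]
    return len(records)
```

It would be called as publish_with_retry(firehose, delivery_stream_name, batch), and a non-zero return value could be logged or raised as an error.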

Example

The example project shows how to configure a project that creates an Elasticsearch cluster and ingests CloudFront logs. The example can be deployed with make merge-lambda && make deploy and removed with make delete.

Upload CloudFront log.

To upload an example CloudFront log to S3, you need the name of the log bucket. Type make info, find the field LogBucketName, and note the bucket name. To copy an example CloudFront log, type:

 aws s3 cp logs/cf-logs.txt s3://<name of bucket>
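The upload makes S3 send a notification event to the Lambda. The sketch below shows the part of that event the handler relies on and a slightly more defensive way to read it. Object keys in S3 events arrive URL-encoded, so they are decoded before use. Matching on the ObjectCreated: prefix is a design choice of this sketch: larger files uploaded in multiple parts are reported as ObjectCreated:CompleteMultipartUpload rather than ObjectCreated:Put. The helper name created_objects and the bucket name in the sample event are hypothetical.

```python
from typing import Any, Dict, Iterator, Tuple
from urllib.parse import unquote_plus


def created_objects(event: Dict[str, Any]) -> Iterator[Tuple[str, str]]:
    """Yield (bucket, key) for every object-created record in an S3 event."""
    for record in event.get('Records', []):
        if record.get('eventName', '').startswith('ObjectCreated:'):
            s3_info = record['s3']
            # Keys are URL-encoded in the event, e.g. a space arrives as '+'
            yield s3_info['bucket']['name'], unquote_plus(s3_info['object']['key'])


# Trimmed-down event with only the fields used here (bucket name is made up)
sample_event = {
    'Records': [{
        'eventName': 'ObjectCreated:Put',
        's3': {
            'bucket': {'name': 'example-log-bucket'},
            'object': {'key': 'cf-logs.txt'},
        },
    }]
}
objects = list(created_objects(sample_event))
```

An event dict like sample_event is also handy for invoking the handler locally or from the Lambda console's test feature, provided the bucket and key point at a real object.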

Kibana

Log into the ‘AWS Console’, go to the ‘Elasticsearch service dashboard’, and click on the Kibana URL. Once logged in, click on ‘discover’ and create a new index pattern with the name example-*. Click on ‘discover’ another time and you should see data. If not, upload the example CloudFront log to S3.
Try to create a visualization and dashboards with the access logs you have available. If you have read my previous blog post about AWS Elasticsearch Service with Firehose Delivery Stream, it shouldn’t be difficult.
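Visualizations that sum or average values need numeric fields, while the NotificationLambda sends every value as a string. Assuming the index relies on Elasticsearch's dynamic mapping rather than an explicit mapping (the example project may well define one), it can help to convert a few fields before publishing. The sketch below is one possible normalization step and not part of the example project; the function name normalize, the choice of fields, and the extra timestamp key are assumptions. It also undoes the double URL-encoding visible in the user agent of the sample log (%2520 becomes %20, which becomes a space).

```python
from typing import Any, Dict
from urllib.parse import unquote

# Fields that hold whole numbers in the CloudFront log
INT_FIELDS = ('sc-bytes', 'cs-bytes', 'sc-status')


def normalize(record: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of a parsed log record with typed, decoded values."""
    out: Dict[str, Any] = dict(record)
    for name in INT_FIELDS:
        # Missing values are logged as '-', which is left untouched
        if out.get(name, '-').isdigit():
            out[name] = int(out[name])
    if out.get('time-taken', '-') != '-':
        out['time-taken'] = float(out['time-taken'])
    if 'date' in out and 'time' in out:
        # CloudFront logs date and time in UTC; combine them into ISO 8601
        out['timestamp'] = f"{out['date']}T{out['time']}Z"
    if 'cs-user-agent' in out:
        # The sample value is URL-encoded twice, so decode twice
        out['cs-user-agent'] = unquote(unquote(out['cs-user-agent']))
    return out


example = normalize({
    'date': '2018-11-10',
    'time': '00:16:26',
    'sc-bytes': '887',
    'sc-status': '403',
    'cs-bytes': '479',
    'time-taken': '0.001',
    'cs-user-agent': 'Mozilla/4.0%2520(compatible;%2520MSIE%25208.0)',
})
```

With a combined timestamp field, the index pattern in Kibana can use it as the time filter field, so the time picker works on the moment of the request rather than the moment of ingestion.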

Conclusion

In this example we have deployed an Elasticsearch service, ingested CloudFront access logs, and created a dashboard. Elasticsearch is well suited to analyzing access logs and getting near real-time information about how the site or API behind CloudFront is performing. Dashboards aggregate the data and give insights that can be used to make changes to the platform.
