The Data Segment – Composing your data lake

06 Dec, 2018

When designing your data lake, there are a lot of aspects that guide the design. In this blog we’ll look at how we can compose a data lake from a very small component called a ‘data segment’. Let’s take a look!

The Data Segment

The data segment is a small component that is a building block for data lakes.
The data segment provides streaming ingestion, streaming transformation, streaming aggregation and data persistence capabilities. It also provides buffering, data encryption, access control, backup & recovery and message replay capabilities.
The most important aspect of the data segment is that it is highly composable. Let’s meet the data segment.
A Data Segment consists of the following three elements:

  • A pub-sub channel,
  • A wire-tap channel,
  • Data persistence
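
To make the three elements concrete, here is a minimal in-memory sketch in Go. It is an illustration only and not the AWS implementation described later in this post: the names Segment, Subscribe, Publish and Replay are hypothetical, a list of handler functions stands in for the pub-sub channel, and a slice stands in for durable storage.

```go
package main

import "fmt"

// Segment is a hypothetical in-memory model of a data segment.
type Segment struct {
    subscribers []func(msg string) // pub-sub channel: every subscriber sees every message
    store       []string           // data persistence: stand-in for durable storage
}

// Subscribe registers a handler on the pub-sub channel.
func (s *Segment) Subscribe(handler func(msg string)) {
    s.subscribers = append(s.subscribers, handler)
}

// Publish copies the message to the store (the wire-tap) and then
// fans it out to all subscribers, who are unaware of the copy.
func (s *Segment) Publish(msg string) {
    s.store = append(s.store, msg)
    for _, handler := range s.subscribers {
        handler(msg)
    }
}

// Replay re-delivers all persisted messages to a handler, in original order.
func (s *Segment) Replay(handler func(msg string)) {
    for _, msg := range s.store {
        handler(msg)
    }
}

func main() {
    seg := &Segment{}
    seg.Subscribe(func(msg string) { fmt.Println("subscriber A:", msg) })
    seg.Subscribe(func(msg string) { fmt.Println("subscriber B:", msg) })
    seg.Publish("hello")
    seg.Replay(func(msg string) { fmt.Println("replayed:", msg) })
}
```

Because the wire-tap sits on the publish path, persistence and replay come for free for every message, without publishers or subscribers having to do anything extra.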

Data Layer

Data segments are grouped into a Data Layer. Each data segment is responsible for a data domain.

Data Lake

A Data Lake is a grouping of data layers and can get arbitrarily wide or high. Each Data Layer is fully isolated from other Data Layers.

Connecting Data Segments

The Data Lake acts as a platform for Data Segments, and provides infrastructure, message routing, discovery, and connectivity capabilities to Data Segments. Data segments are wired together depending on the message routing requirements.
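
One way to picture this platform role is as a registry that looks up a segment by layer and data domain and forwards messages to it. The sketch below is hypothetical: Lake, Register, Route and ErrNoRoute are illustrative names, and a real platform would route over the network rather than through a function call.

```go
package main

import (
    "errors"
    "fmt"
)

// Segment is a stand-in for a data segment; it only records what it receives.
type Segment struct {
    Domain   string
    Received []string
}

// Lake groups segments into layers: layer name -> data domain -> segment.
type Lake struct {
    layers map[string]map[string]*Segment
}

// ErrNoRoute is returned when no segment is registered for a layer and domain.
var ErrNoRoute = errors.New("no segment registered for layer and domain")

// Register adds a segment to a layer so that it can be discovered later.
func (l *Lake) Register(layer string, seg *Segment) {
    if l.layers == nil {
        l.layers = make(map[string]map[string]*Segment)
    }
    if l.layers[layer] == nil {
        l.layers[layer] = make(map[string]*Segment)
    }
    l.layers[layer][seg.Domain] = seg
}

// Route delivers a message to the segment responsible for the domain within
// the given layer. Layers are isolated: there is no cross-layer lookup.
func (l *Lake) Route(layer, domain, msg string) error {
    seg, ok := l.layers[layer][domain]
    if !ok {
        return ErrNoRoute
    }
    seg.Received = append(seg.Received, msg)
    return nil
}

func main() {
    lake := &Lake{}
    orders := &Segment{Domain: "orders"}
    lake.Register("raw", orders)

    fmt.Println(lake.Route("raw", "orders", "order-1"))     // routed to the orders segment
    fmt.Println(lake.Route("curated", "orders", "order-2")) // no such segment in this layer
}
```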

Component Integration

Components like publishers and subscribers integrate with the Data Lake. The Data Lake is responsible for routing each message to the appropriate Data Segment.

Cloud Service Provider

Cloud service providers offer services that are a good fit for implementing Data Lakes. For example, AWS provides messaging and persistence services that can be used to implement Data Segments and Data Lakes.

AWS Technical Mapping

The Data Lake can be implemented in AWS with the following technical mapping:
| Data Segment Component | Amazon Service (preferred) | Alternative |
| --- | --- | --- |
| pub-sub channel | Amazon Kinesis Data Streams | Amazon SNS, Amazon SQS |
| wire-tap channel | Amazon Kinesis Data Firehose | AWS Lambda, Amazon EMR, AWS Fargate |
| data persistence | Amazon S3 | Amazon EFS |

CloudFormation Implementation

The Data Segment can be implemented as the CloudFormation template shown below, which describes the desired state of its resources. The data segment consists of an Amazon Kinesis data stream for the pub-sub channel, an Amazon Kinesis Data Firehose delivery stream for the wire-tap channel and an Amazon S3 bucket for data persistence.

AWSTemplateFormatVersion: '2010-09-09'
Description: blog-datasegment

Parameters:
  DataSegmentName:
    Type: String
    Default: example-datasegment

Resources:
  DataSegmentRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition: {}
      Path: /
      Policies:
        - PolicyName: Allow
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action:
                  - s3:*
                  - kms:*
                  - kinesis:*
                  - logs:*
                  - lambda:*
                Resource:
                  - '*'

  DataSegmentBucket:
    Type: AWS::S3::Bucket

  Deliverystream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub ${DataSegmentName}-firehose
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt KinesisStream.Arn
        RoleARN: !GetAtt DataSegmentRole.Arn
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt DataSegmentBucket.Arn
        RoleARN: !GetAtt DataSegmentRole.Arn
        Prefix: ''
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 1
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub ${DataSegmentName}-s3-firehose
          LogStreamName: !Sub ${DataSegmentName}-s3-firehose
        CompressionFormat: UNCOMPRESSED

  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub ${DataSegmentName}-stream
      ShardCount: 1

  CloudWatchLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub ${DataSegmentName}-s3-firehose
      RetentionInDays: 30
  CloudWatchLogStream:
    Type: AWS::Logs::LogStream
    DependsOn:
      - CloudWatchLogGroup
    Properties:
      LogGroupName: !Sub ${DataSegmentName}-s3-firehose
      LogStreamName: !Sub ${DataSegmentName}-s3-firehose

Outputs:
  KinesisStreamName:
    Description: The name of the KinesisStream
    Value: !Ref KinesisStream
  KinesisStreamArn:
    Description: The ARN of the KinesisStream
    Value: !GetAtt KinesisStream.Arn
  DeliveryStreamName:
    Description: The name of the Deliverystream
    Value: !Ref Deliverystream
  DeliveryStreamArn:
    Description: The ARN of the Deliverystream
    Value: !GetAtt Deliverystream.Arn
  BucketName:
    Description: The name of the DataSegmentBucket
    Value: !Ref DataSegmentBucket
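
The BufferingHints in the template tell Firehose to deliver a batch to S3 once 60 seconds have passed or 1 MB has accumulated, whichever comes first. The sketch below illustrates that flush rule in plain Go. It says nothing about how Firehose works internally: the Buffer type and its methods are hypothetical, and the current time is passed in explicitly to keep the behaviour easy to follow.

```go
package main

import (
    "fmt"
    "time"
)

// Buffer is a hypothetical batch buffer that flushes on size or age.
type Buffer struct {
    MaxBytes int
    MaxAge   time.Duration

    records [][]byte
    size    int
    started time.Time // arrival time of the first record in the current batch
}

// Add appends a record and returns the batch if a threshold was reached, else nil.
func (b *Buffer) Add(now time.Time, rec []byte) [][]byte {
    if len(b.records) == 0 {
        b.started = now
    }
    b.records = append(b.records, rec)
    b.size += len(rec)
    return b.Tick(now)
}

// Tick returns the current batch when it is large enough or old enough.
func (b *Buffer) Tick(now time.Time) [][]byte {
    if len(b.records) == 0 {
        return nil
    }
    if b.size < b.MaxBytes && now.Sub(b.started) < b.MaxAge {
        return nil
    }
    batch := b.records
    b.records, b.size = nil, 0
    return batch
}

func main() {
    // Same thresholds as the template: 1 MB or 60 seconds.
    buf := &Buffer{MaxBytes: 1 << 20, MaxAge: 60 * time.Second}
    start := time.Now()

    early := buf.Add(start, []byte("a"))           // below both thresholds
    late := buf.Tick(start.Add(61 * time.Second)) // age threshold reached
    fmt.Println("flushed on add:", len(early), "flushed after 61s:", len(late))
}
```

With the small values used in the template, data shows up in the bucket about a minute after publishing at the latest, at the cost of many small objects.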

A Publisher

The publisher is easy to implement with Go:

package main

import (
    "github.com/binxio/datasegment/common"
    "os"
)

func main() {
    streamName := os.Getenv("KINESIS_STREAM_NAME")
    if streamName == "" {
        panic("KINESIS_STREAM_NAME not set")
    }
    partitionKey := "1"
    sess := common.GetSession()
    common.PutRecords(streamName, partitionKey, common.GetKinesis(sess))
}
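
Note that the publisher uses a constant partition key, so every record lands on the same shard. With the single shard from the template that makes no difference, but it matters once a stream has more shards. Kinesis picks the shard by interpreting the MD5 hash of the partition key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch below illustrates this with the standard library only. It assumes the shards split the hash key space evenly, which need not hold after resharding, and shardFor is a hypothetical helper, not part of the AWS SDK.

```go
package main

import (
    "crypto/md5"
    "fmt"
    "math/big"
)

// shardFor returns the index of the shard that a partition key maps to,
// assuming shardCount shards that split the 128-bit hash key space evenly.
func shardFor(partitionKey string, shardCount int64) int64 {
    sum := md5.Sum([]byte(partitionKey))
    hash := new(big.Int).SetBytes(sum[:]) // MD5 digest as an unsigned 128-bit integer

    keySpace := new(big.Int).Lsh(big.NewInt(1), 128) // 2^128
    width := new(big.Int).Div(keySpace, big.NewInt(shardCount))

    idx := new(big.Int).Div(hash, width).Int64()
    if idx >= shardCount {
        idx = shardCount - 1 // the last shard absorbs the rounding remainder
    }
    return idx
}

func main() {
    for _, key := range []string{"1", "customer-42", "customer-43"} {
        fmt.Printf("key %q -> shard %d of 4\n", key, shardFor(key, 4))
    }
}
```

Choosing a partition key with many distinct values, such as a customer or device id, spreads the load over the shards while keeping records with the same key in order on one shard.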

A Subscriber

A subscriber is easy to implement with Go:

package main

import (
    "github.com/binxio/datasegment/common"
    "os"
)

func main() {
    streamName := os.Getenv("KINESIS_STREAM_NAME")
    if streamName == "" {
        panic("KINESIS_STREAM_NAME not set")
    }
    shardId := "0" // shard ids start from 0
    sess := common.GetSession()
    common.GetRecords(streamName, shardId, common.GetKinesis(sess))
}

Common

The publisher and subscriber share some common logic:

package common

import (
    "encoding/hex"
    "encoding/json"
    "fmt"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/kinesis"
    "log"
)

type Person struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

func GetSession() *session.Session {
    sess, err := session.NewSession()
    if err != nil {
        panic(err)
    }
    return sess
}

func GetKinesis(sess *session.Session) *kinesis.Kinesis {
    return kinesis.New(sess, aws.NewConfig().WithRegion("eu-west-1"))
}

func CreatePerson(name string, age int) Person {
    return Person{Name: name, Age: age}
}

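// SerializePerson encodes a person as JSON followed by a newline, so that
// records stay separated when they are concatenated downstream.
func SerializePerson(person Person) []byte {
    data, err := json.Marshal(person)
    if err != nil {
        panic(err)
    }
    return append(data, '\n')
}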

func CreateRecord(streamName string, partitionKey string, data []byte) *kinesis.PutRecordInput {
    return &kinesis.PutRecordInput{
        PartitionKey: aws.String(partitionKey),
        StreamName:   aws.String(streamName),
        Data:         data,
    }
}

func PutRecords(streamName string, partitionKey string, svc *kinesis.Kinesis) {
    for i := 0; i < 100000; i++ {
        person := CreatePerson(fmt.Sprintf("dennis %d", i), i)
        data := SerializePerson(person)
        record := CreateRecord(streamName, partitionKey, data)
        res, err := svc.PutRecord(record)
        if err != nil {
            panic(err)
        }
        log.Println("Publishing:", string(data), hex.EncodeToString(data), res)
    }
}

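// ProcessRecords logs the payload of every record in the batch.
func ProcessRecords(records []*kinesis.Record) {
    for _, record := range records {
        log.Println(string(record.Data))
    }
}

// GetNextRecords polls the shard until its iterator runs out, which only
// happens once the shard has been closed. It uses a loop rather than
// recursion because Go does not eliminate tail calls, so a recursive poll
// would grow the stack indefinitely.
// Note: there is no pause between polls; a long-running subscriber should
// wait between calls to stay within the per-shard read limits.
func GetNextRecords(it *string, svc *kinesis.Kinesis) {
    for it != nil {
        records, err := svc.GetRecords(&kinesis.GetRecordsInput{
            ShardIterator: it,
        })
        if err != nil {
            panic(err)
        }
        ProcessRecords(records.Records)
        it = records.NextShardIterator
    }
}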


func GetRecords(streamName string, shardId string, svc *kinesis.Kinesis) {
    it, err := svc.GetShardIterator(&kinesis.GetShardIteratorInput{
        StreamName:        aws.String(streamName),
        ShardId:           aws.String(shardId),
        ShardIteratorType: aws.String("TRIM_HORIZON"),
    })
    if err != nil {
        panic(err)
    }
    GetNextRecords(it.ShardIterator, svc)
}
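
SerializePerson appends a newline to every record. That matters for the wire-tap: Firehose concatenates the records it delivers to S3 without adding a separator, so the newline is what keeps the objects in the bucket readable as one JSON document per line. A minimal sketch of reading such an object back follows. DecodePeople is a hypothetical helper, the Person type is repeated to keep the example self-contained, and the input is an in-memory byte slice rather than an S3 download.

```go
package main

import (
    "bufio"
    "bytes"
    "encoding/json"
    "fmt"
)

// Person mirrors the type used by the publisher.
type Person struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

// DecodePeople parses newline-delimited JSON and returns the decoded records.
func DecodePeople(object []byte) ([]Person, error) {
    var people []Person
    scanner := bufio.NewScanner(bytes.NewReader(object))
    for scanner.Scan() {
        line := scanner.Bytes()
        if len(line) == 0 {
            continue // skip blank lines
        }
        var p Person
        if err := json.Unmarshal(line, &p); err != nil {
            return nil, err
        }
        people = append(people, p)
    }
    return people, scanner.Err()
}

func main() {
    // Two records as they would appear back to back in a delivered object.
    object := []byte("{\"name\":\"dennis 0\",\"age\":0}\n{\"name\":\"dennis 1\",\"age\":1}\n")
    people, err := DecodePeople(object)
    if err != nil {
        panic(err)
    }
    for _, p := range people {
        fmt.Println(p.Name, p.Age)
    }
}
```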

Example

The example project shows an implementation of a Data Segment in AWS. The example can be created with ‘make create’ and deleted with ‘make delete’. To publish messages, type ‘make publish’; to launch one or more subscribers, type ‘make subscribe’.

Conclusion

The Data Segment is a primitive for creating Data Lakes. The abstraction helps in several ways: determining the boundaries of data domains, grouping data domains into layers, and isolating layers from each other. Data Segments provide real-time capabilities to the Data Lake by means of pub-sub messaging channels.
Data Segments make real-time stream processing possible by means of a backplane that the Data Lake provides. Data Segments plug into the backplane, receiving data from and providing data to the Data Lake.
Data Segments can be added and removed arbitrarily, which allows for a dynamically sized Data Lake. The real-time aspect provided by pub-sub makes the architecture well suited to designing real-time stateful applications such as data platforms.
