Serverless Logs Analyzer - Overview

This project demonstrates how AWS services can be used to generate and analyze logs. It provides automated insight into AWS account activity through a serverless architecture that monitors, processes, and analyzes CloudTrail logs. The system captures AWS API calls and management events via CloudTrail, aggregates the daily log files for efficient processing, and generates statistical reports on resource usage patterns, security events, and operational activity. Built on Lambda functions, S3 storage, and EventBridge, the solution is cost-effective and scalable, requires no infrastructure management, and provides full visibility into account activity.

Github repository: https://github.com/samuelincoln1/serverless-logs-analyzer

To see the dashboard created with the processed logs, go to the dashboard page.

Architecture Diagram

The architecture diagram below shows the infrastructure, which includes:

  • AWS CloudTrail: used to track all actions performed in the AWS account.
  • AWS EventBridge: used to trigger the Lambda Aggregator function every 12 hours.
  • AWS Lambda: used to aggregate and analyze the logs and store the results in Amazon S3.
  • Amazon S3: used to store the raw logs and the processed results.
[Architecture diagram]

Diagram created with Lucidchart: https://www.lucidchart.com/pages/landing

Infrastructure Code

The infrastructure is implemented as a modular Terraform configuration that provisions the AWS components described below, organized into five main modules.

Modules

  • S3 Module: This module is responsible for creating the storage infrastructure with two S3 buckets. It configures the input bucket to receive CloudTrail logs and the output bucket for processed insights. Both buckets are secured with versioning enabled, AES256 encryption, and complete public access blocking to ensure data protection and compliance.
  • IAM Module: The Identity and Access Management (IAM) module manages security permissions for the Lambda functions. It creates a dedicated execution role with precise permissions for S3 operations (read, write, delete) on both buckets and CloudWatch logging capabilities, following the principle of least privilege for enhanced security.
  • Lambda Module: This module handles the serverless compute functions that process the logs. It deploys two Python 3.11 Lambda functions - the Analyzer for processing aggregated logs and generating insights, and the Aggregator for consolidating daily log files. The module also configures S3 event triggers and proper permissions for seamless integration.
  • CloudTrail Module: The CloudTrail module sets up comprehensive AWS account auditing and logging. It configures a multi-region trail that captures management events, global service events, and S3 data events, automatically delivering all logs to the input S3 bucket with appropriate bucket policies for secure log storage.
  • EventBridge Module: This module manages the automated scheduling system for log processing. It creates a CloudWatch Event Rule with a cron expression to trigger the Lambda Aggregator function every 12 hours, ensuring regular log consolidation and processing without manual intervention.

Inside each module there is a main.tf file that defines the resources, a variables.tf file that declares the variables, and, where needed, an outputs.tf file that defines the outputs.

S3 Module

This module is designed to create the storage infrastructure with two S3 buckets, one for input logs and one for output insights. The input bucket receives CloudTrail logs automatically, while the output bucket stores the processed JSON and CSV reports. Both buckets are configured with versioning enabled for change tracking, AES256 server-side encryption for data protection, and complete public access blocking to ensure security compliance.

Features

  • S3 Buckets: Creates two S3 buckets - an input bucket for receiving CloudTrail logs and an output bucket for storing processed insights and reports.
  • Versioning: Enables versioning on both buckets to maintain object history and provide rollback capabilities for data protection.
  • Encryption: Configures AES256 encryption by default on all objects stored in both buckets to ensure data security at rest.
  • Public Access Blocking: Implements comprehensive public access restrictions on both buckets, blocking all forms of public access including ACLs and bucket policies to prevent unauthorized access.

Resources

1. Buckets

  • Resource: aws_s3_bucket
    • bucket: The unique name identifier for the S3 bucket within AWS.
    • force_destroy: Whether to allow deletion of the bucket even when it contains objects.
    • tags: Assigns a name tag to the bucket for identification and resource management purposes.

s3/main.tf

resource "aws_s3_bucket" "serverless_log_analyzer_bucket" {
  bucket        = var.s3_input_bucket_name
  force_destroy = true

  tags = {
    Name = var.s3_input_bucket_tag
  }
}

resource "aws_s3_bucket" "serverless_log_analyzer_output" {
  bucket        = var.s3_output_bucket_name
  force_destroy = true

  tags = {
    Name = var.s3_output_bucket_tag
  }
}

2. Versioning

  • Resource: aws_s3_bucket_versioning
    • bucket: The S3 bucket to enable versioning for.
    • versioning_configuration: Configuration block for versioning settings.

s3/main.tf

resource "aws_s3_bucket_versioning" "serverless_log_analyzer_bucket_versioning" {
  bucket = aws_s3_bucket.serverless_log_analyzer_bucket.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_versioning" "serverless_log_analyzer_output_versioning" {
  bucket = aws_s3_bucket.serverless_log_analyzer_output.id
  versioning_configuration {
    status = "Enabled"
  }
}

3. Encryption

  • Resource: aws_s3_bucket_server_side_encryption_configuration
    • bucket: The S3 bucket to enable encryption for.
    • rule: Configuration block for encryption settings.
    • apply_server_side_encryption_by_default: Configuration block that sets the default server-side encryption applied to new objects.
    • sse_algorithm: The server-side encryption algorithm to use.

s3/main.tf

resource "aws_s3_bucket_server_side_encryption_configuration" "serverless_log_analyzer_bucket_encryption" {
  bucket = aws_s3_bucket.serverless_log_analyzer_bucket.id
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "serverless_log_analyzer_output_encryption" {
  bucket = aws_s3_bucket.serverless_log_analyzer_output.id
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

4. Public Access Blocking

  • Resource: aws_s3_bucket_public_access_block
    • bucket: The S3 bucket to enable public access block for.
    • block_public_acls: Whether to block public ACLs for this bucket.
    • block_public_policy: Whether to block public bucket policies for this bucket.
    • ignore_public_acls: Whether to ignore public ACLs for this bucket.
    • restrict_public_buckets: Whether to restrict public bucket policies for this bucket.

s3/main.tf

resource "aws_s3_bucket_public_access_block" "serverless_log_analyzer_bucket_block" {
  bucket                  = aws_s3_bucket.serverless_log_analyzer_bucket.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}


resource "aws_s3_bucket_public_access_block" "serverless_log_analyzer_output_block" {
  bucket                  = aws_s3_bucket.serverless_log_analyzer_output.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

IAM Module

This module is designed to create the security and access management infrastructure for the Lambda functions. It establishes a dedicated IAM role with precisely scoped permissions following the principle of least privilege. The role allows Lambda functions to interact with S3 buckets for log processing and CloudWatch for logging, while restricting access to only the necessary resources and actions required for the log analysis workflow.

Features

  • IAM Role: Creates a dedicated execution role for the Lambda functions, with a trust policy that allows only the Lambda service to assume the role.
  • IAM Policy: Attaches a policy to the role that grants the necessary permissions for the Lambda function to access the S3 bucket and CloudWatch logs.

Resources

1. IAM Role

  • Resource: aws_iam_role
    • name: The name of the IAM role.
    • assume_role_policy: The policy that allows the Lambda service to assume the role. It defines which services are allowed to assume the role, in this case, only the Lambda service.

iam/main.tf

resource "aws_iam_role" "lambda_exec_role" {
    name = var.lambda_exec_role_name

    assume_role_policy = jsonencode({
        Version = "2012-10-17"
        Statement = [
            {
                Action = "sts:AssumeRole"
                Effect = "Allow"
                Principal = {
                    Service = "lambda.amazonaws.com"
                }
               
            }
        ]
    })
}

2. IAM Policy

  • Resource: aws_iam_role_policy
    • name: The name of the IAM policy.
    • role: The IAM role to attach the policy to.
    • policy: The policy to attach to the IAM role. It grants the necessary permissions for the Lambda function to access the S3 bucket and CloudWatch logs.

iam/main.tf

resource "aws_iam_role_policy" "lambda_exec_policy" {
    name = var.lambda_exec_policy_name
    role = aws_iam_role.lambda_exec_role.id

    policy = jsonencode({
        Version = "2012-10-17"
        Statement = [
            { 
                Action = [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:DeleteObject"
                ]
                Effect = "Allow"
                Resource = [
                    var.s3_input_bucket_arn,
                    var.s3_output_bucket_arn,
                    "${var.s3_input_bucket_arn}/*",
                    "${var.s3_output_bucket_arn}/*"
                ]
                
            }, 
            {
                Action = [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ]
                Effect = "Allow"
                Resource = "*"
            }
        ]
    })
}

Lambda Module

This module is designed to create the serverless compute infrastructure for log processing and analysis. It establishes two specialized Lambda functions that handle different aspects of the log analysis workflow - one for aggregating daily logs and another for processing aggregated logs to generate insights. The module also configures the necessary triggers, permissions, and integrations to enable automatic execution based on S3 events and scheduled intervals.

Features

  • Lambda Aggregator Function: Deploys a Python 3.11 Lambda function that consolidates multiple daily log files into single aggregated files, designed to be triggered by EventBridge scheduling.
  • Lambda Analyzer Function: Creates a Python 3.11 Lambda function that processes aggregated CloudTrail logs and generates statistical insights, triggered automatically by S3 ObjectCreated events.
  • Lambda Permissions: Establishes proper permissions allowing S3 service to invoke the Lambda functions while maintaining security boundaries.
  • S3 Event Triggers: Configures S3 bucket notifications so that ObjectCreated events on the input bucket automatically invoke the Analyzer function.

Resources

1. Lambda Functions

  • Resource: aws_lambda_function
    • function_name: The name of the Lambda function.
    • role: The role that the Lambda function assumes when it executes, providing necessary permissions.
    • handler: The entry point for the Lambda function, specifying the file and function name to execute.
    • runtime: The runtime environment for the Lambda function.
    • filename: The path to the deployment package (ZIP file) containing the Lambda function code.
    • source_code_hash: The hash of the source code for the Lambda function.
    • timeout: The maximum execution time in seconds for the Lambda function.
    • environment: The environment variables for the Lambda function.
    • tags: The tags of the Lambda function.

lambda/main.tf

resource "aws_lambda_function" "log_aggregator" {
  function_name = var.lambda_function_name_aggregator
  role          = var.lambda_exec_role_arn
  handler       = var.lambda_function_handler_aggregator
  runtime       = "python3.11"
  filename         = "${path.module}/../../../lambda-aggregator.zip"
  source_code_hash = filebase64sha256("${path.module}/../../../lambda-aggregator.zip")
  timeout = 30

  environment {
    variables = {
      BUCKET_NAME = var.s3_input_bucket_name
    }
  }

  tags = {
    Project = "log-aggregator"
  }
}

resource "aws_lambda_function" "log_processor" {
  function_name = var.lambda_function_name
  role          = var.lambda_exec_role_arn
  handler       = var.lambda_function_handler
  runtime       = "python3.11"
  filename         = "${path.module}/../../../lambda-analyzer.zip"
  source_code_hash = filebase64sha256("${path.module}/../../../lambda-analyzer.zip")
  timeout = 30

  environment {
    variables = {
      BUCKET_NAME = var.s3_input_bucket_name
    }
  }

  tags = {
    Project = "log-processor"
  }
}

2. Lambda Permissions

  • Resource: aws_lambda_permission
    • statement_id: The unique identifier for the permission.
    • action: The action that the principal is allowed to perform, in this case lambda:InvokeFunction.
    • principal: The service or entity that is allowed to invoke the Lambda function.
    • function_name: The name of the Lambda function that is allowed to be invoked by the principal.
    • source_arn: The ARN of the resource that is allowed to invoke the Lambda function.

lambda/main.tf

resource "aws_lambda_permission" "allow_s3_trigger" {
  statement_id = "AllowExecutionFromS3"
  action = "lambda:InvokeFunction"
  principal = "s3.amazonaws.com"
  function_name = aws_lambda_function.log_processor.function_name
  source_arn = var.s3_input_bucket_arn
}

3. S3 Event Triggers

  • Resource: aws_s3_bucket_notification
    • bucket: The S3 bucket to configure the notification for.
    • depends_on: Ensures that the Lambda invoke permission exists before the S3 bucket notification is configured.
    • lambda_function: Configuration block specifying the Lambda function to invoke (lambda_function_arn) and the event types that trigger it (events).

lambda/main.tf

resource "aws_s3_bucket_notification" "log_upload_notification" {
  bucket = var.s3_input_bucket_bucket

  depends_on = [aws_lambda_permission.allow_s3_trigger]

  lambda_function {
    lambda_function_arn = aws_lambda_function.log_processor.arn
    events              = ["s3:ObjectCreated:*"]
  }
}

CloudTrail Module

This module is designed to create comprehensive AWS account auditing and logging infrastructure. It establishes a multi-region CloudTrail that captures all management events, global service events, and S3 data events across the entire AWS account. The module automatically delivers all captured logs to the designated S3 input bucket and configures the necessary bucket policies to enable secure log storage while maintaining proper access controls for the CloudTrail service.

Features

  • CloudTrail Configuration: Creates a multi-region trail that monitors AWS account activities across all regions, including global services like IAM, Route53, and CloudFront to provide complete visibility. The CloudTrail also captures all management plane operations such as resource creation, deletion, and configuration changes for comprehensive account auditing.
  • S3 Integration: Automatically delivers all captured logs to the input S3 bucket, and establishes secure S3 bucket policies that grant CloudTrail service the minimum necessary permissions to write logs and read bucket ACLs while maintaining security boundaries.

Resources

1. CloudTrail Configuration

  • Resource: aws_cloudtrail
    • name: The name of the CloudTrail.
    • s3_bucket_name: The name of the S3 bucket to store the CloudTrail logs.
    • is_multi_region_trail: Whether the trail captures events from all AWS regions or only the region where it is created.
    • include_global_service_events: Whether to include events from global AWS services like IAM and STS.
    • enable_logging: Whether the trail actively delivers events; logging can be paused without deleting the trail.
    • event_selector: Configuration block that defines which types of events the trail should capture.
    • depends_on: Ensures the CloudTrail is created only after the specified dependencies are established.

cloudtrail/main.tf

resource "aws_cloudtrail" "cloudtrail" {
    name = var.cloudtrail_name
    s3_bucket_name = var.s3_input_bucket_bucket
    is_multi_region_trail = var.is_multi_region_trail
    include_global_service_events = var.include_global_service_events
    enable_logging = var.enable_logging

    event_selector {
        include_management_events = var.include_management_events
        read_write_type = var.read_write_type
        data_resource {
            type = var.data_resource_type
            values = var.data_resource_values
        }
    }
    depends_on = [aws_s3_bucket_policy.logs_input]
}

2. S3 Bucket Policy

  • Resource: aws_s3_bucket_policy
    • bucket: The S3 bucket to configure the policy for.
    • policy: The JSON-encoded policy document that defines the permissions and access controls for the bucket, specifying who can perform what actions on the bucket and its objects.

cloudtrail/main.tf

resource "aws_s3_bucket_policy" "logs_input" {
  bucket = var.s3_input_bucket_bucket

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Sid       = "AWSCloudTrailWrite",
        Effect    = "Allow",
        Principal = {
          Service = "cloudtrail.amazonaws.com"
        },
        Action    = "s3:PutObject",
        Resource  = "${var.s3_input_bucket_arn}/AWSLogs/*", 
        Condition = {
          StringEquals = {
            "s3:x-amz-acl" = "bucket-owner-full-control"
          }
        }
      },
      {
        Sid       = "AWSCloudTrailRead",
        Effect    = "Allow",
        Principal = {
          Service = "cloudtrail.amazonaws.com"
        },
        Action    = "s3:GetBucketAcl",
        Resource  = "${var.s3_input_bucket_arn}"
      }
    ]
  })
}

EventBridge Module

This module is designed to create automated scheduling infrastructure for periodic log processing tasks. It establishes a CloudWatch Event Rule with cron-based scheduling that automatically triggers the Lambda Aggregator function every 12 hours to consolidate daily CloudTrail logs. The module configures the necessary event targets, permissions, and payload delivery to enable seamless integration between the scheduling service and the serverless compute functions.

Features

  • CloudWatch Event Rule: Creates an EventBridge rule with a cron-based schedule that automatically triggers the Lambda Aggregator function every 12 hours.
  • Lambda Target Configuration: Establishes the Lambda Aggregator function as the target for the scheduled events, with proper payload configuration including account ID for contextual processing.
  • EventBridge Permissions: Configures Lambda permissions to allow the EventBridge service to invoke the aggregator function while maintaining security boundaries and preventing unauthorized access.

Resources

1. CloudWatch Event Rule

  • Resource: aws_cloudwatch_event_rule
    • name: The unique name identifier for the EventBridge rule within the AWS account.
    • description: The description of the EventBridge rule.
    • schedule_expression: The cron or rate expression that defines when the rule should trigger, supporting both cron syntax and rate-based scheduling formats.

eventbridge/main.tf

resource "aws_cloudwatch_event_rule" "every_12_hours" {
  name        = var.eventbridge_name
  description = var.eventbridge_description
  schedule_expression = var.eventbridge_schedule_expression
}

2. Lambda Target Configuration

  • Resource: aws_cloudwatch_event_target
    • rule: The name of the EventBridge rule to which the target is attached.
    • target_id: The unique identifier for the target within the EventBridge rule.
    • arn: The Amazon Resource Name (ARN) of the Lambda function to be invoked by the EventBridge rule.
    • input: The JSON-encoded payload to be passed to the Lambda function.

eventbridge/main.tf

resource "aws_cloudwatch_event_target" "lambda_target" {
  rule      = aws_cloudwatch_event_rule.every_12_hours.name
  target_id = "logAggregator"
  arn       = var.lambda_function_arn

  input = jsonencode({
    account_id = var.account_id
  })
}

3. EventBridge Permissions

  • Resource: aws_lambda_permission
    • statement_id: The unique identifier for the permission.
    • action: The AWS Lambda action that is being allowed, in this case 'lambda:InvokeFunction' to permit function execution.
    • function_name: The name of the Lambda function that is allowed to be invoked by the EventBridge service.
    • principal: The service or entity that is allowed to invoke the Lambda function, in this case 'events.amazonaws.com' for EventBridge.
    • source_arn: The Amazon Resource Name (ARN) of the EventBridge rule that is allowed to invoke the Lambda function.

eventbridge/main.tf

resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = var.lambda_function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.every_12_hours.arn
}

Terraform Main File

The main.tf file serves as the central configuration for deploying and managing the infrastructure on AWS using Terraform. It orchestrates the setup of the modules and resources previously defined, ensuring that all components are correctly provisioned and interconnected to form a cohesive infrastructure environment.

main.tf

data "aws_caller_identity" "current" {}

terraform {
    backend "s3" {
        bucket = "samuellincoln-serverless-log-analyzer-state"
        key = "terraform.tfstate"
        region = "us-east-1"
        dynamodb_table = "serverless-log-analyzer-locks"
        encrypt = true
    }

    required_providers {
        aws = {
            source = "hashicorp/aws"
            version = "~> 5.0"
        }
    }
}

provider "aws" {
    region = "us-east-1"
}

module "s3" {
    source = "./modules/s3"
    s3_input_bucket_name = "samuellincoln-log-analyzer-input"
    s3_output_bucket_name = "samuellincoln-log-analyzer-output"
    s3_input_bucket_tag = "log-analyzer-input"
    s3_output_bucket_tag = "log-analyzer-output"
}

module "iam" {
    source = "./modules/iam"
    lambda_exec_role_name = "log-analyzer-role"
    lambda_exec_policy_name = "log-analyzer-policy"
    s3_input_bucket_arn = module.s3.s3_input_bucket_arn
    s3_output_bucket_arn = module.s3.s3_output_bucket_arn
}

module "lambda" {
    source = "./modules/lambda"
    lambda_function_name = "log-analyzer-function"
    lambda_function_handler = "handler.lambda_handler"
    lambda_exec_role_arn = module.iam.lambda_exec_role_arn
    s3_input_bucket_name = module.s3.s3_input_bucket_name
    s3_input_bucket_arn = module.s3.s3_input_bucket_arn
    s3_input_bucket_bucket = module.s3.s3_input_bucket_bucket
    lambda_function_name_aggregator = "log-aggregator-function"
    lambda_function_handler_aggregator = "handler.lambda_handler"
}   

module "cloudtrail" {
    source = "./modules/cloudtrail"
    cloudtrail_name = "log-analyzer-cloudtrail"
    s3_input_bucket_name = module.s3.s3_input_bucket_name
    s3_input_bucket_arn = module.s3.s3_input_bucket_arn
    s3_input_bucket_bucket = module.s3.s3_input_bucket_bucket
    is_multi_region_trail = true
    include_global_service_events = true
    enable_logging = true
    include_management_events = true
    read_write_type = "All"
    data_resource_type = "AWS::S3::Object"
    data_resource_values = ["${module.s3.s3_input_bucket_arn}/*"]
    account_id = data.aws_caller_identity.current.account_id    
   
}

module "eventbridge" {
    source = "./modules/eventbridge"
    eventbridge_name = "log-aggregator-eventbridge"
    eventbridge_description = "EventBridge rule to trigger the log aggregator lambda function every 12 hours"
    eventbridge_schedule_expression = "cron(0 0/12 * * ? *)"
    lambda_function_arn = module.lambda.lambda_function_aggregator_arn
    lambda_function_name = module.lambda.lambda_function_aggregator_name
    account_id = data.aws_caller_identity.current.account_id
}

Lambda Aggregator Function

The Lambda Aggregator is a Python 3.11 serverless function designed to consolidate multiple CloudTrail log files from a single day into one aggregated file. This function is triggered by EventBridge every 12 hours and performs log consolidation, compression, and cleanup operations to optimize storage and prepare data for analysis.

Purpose

  • Consolidate multiple daily CloudTrail log files into a single aggregated file.
  • Reduce the number of individual files for efficient processing.
  • Clean up the original log files to optimize storage costs.
  • Prepare data for analysis by the analyzer function.

Full Code:

lambda-aggregator/handler.py

import boto3
import gzip
import json
from datetime import datetime
from io import BytesIO

s3 = boto3.client('s3')

def lambda_handler(event, context):
    print("aggregator called")
    bucket_name = 'samuellincoln-log-analyzer-input'
    
    # Calculate current date
    current_date = datetime.utcnow()
    year = current_date.strftime("%Y")
    month = current_date.strftime("%m")
    day = current_date.strftime("%d")
    
    
    # Create dynamic prefix
    prefix = f'AWSLogs/{event["account_id"]}/CloudTrail/us-east-1/{year}/{month}/{day}/'
    print(f"dynamic prefix: {prefix}")
    
    aggregated_data = []

    # List objects in bucket with specified prefix
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    print(f"list_objects_v2 response: {response.get('Contents', [])}")
    for obj in response.get('Contents', []):
        print(f"analyzing object: {obj}")
        key = obj['Key']
        print(f"analyzing key: {key}")
        if key.endswith('.json.gz') and 'aggregated' not in key:
            # Read and decompress file
            obj_data = s3.get_object(Bucket=bucket_name, Key=key)
            with gzip.GzipFile(fileobj=BytesIO(obj_data['Body'].read())) as gz:
                log_content = json.load(gz)
                aggregated_data.extend(log_content['Records'])

    # Get current date and time to name the file
    now = datetime.utcnow()
    output_key = f"{now.strftime('%Y-%m-%d-%H:%M')}-aggregated.json.gz"
    print(f"will save to bucket with path: {prefix}{output_key}")

    # Compress and write aggregated file
    out_buffer = BytesIO()
    with gzip.GzipFile(fileobj=out_buffer, mode='w') as gz:
        gz.write(json.dumps({'Records': aggregated_data}).encode('utf-8'))

    # Save aggregated file to S3
    s3.put_object(Bucket=bucket_name, Key=f"{prefix}{output_key}", Body=out_buffer.getvalue())
    
    # Delete old log files
    for obj in response.get('Contents', []):
        key = obj['Key']
        print(f"will delete file: {key}")
        if key.endswith('.json.gz') and 'aggregated' not in key:
            s3.delete_object(Bucket=bucket_name, Key=key)

    print("Logs aggregated and old logs deleted successfully")
    return {
        'statusCode': 200,
        'body': json.dumps('Logs aggregated and old logs deleted successfully')
    }

Function Workflow

1. Trigger and Initialization

lambda-aggregator/handler.py

def lambda_handler(event, context):
    print("aggregator called")
    bucket_name = 'samuellincoln-log-analyzer-input'

The function is triggered by EventBridge every 12 hours and the target bucket is defined.
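
The EventBridge target configured in the EventBridge module passes the account ID as the event payload, so the handler can be exercised locally with a minimal, hand-built event (the account ID below is a placeholder):

# Hypothetical local invocation of the aggregator handler
from handler import lambda_handler

# EventBridge delivers the payload defined in aws_cloudwatch_event_target.input
event = {"account_id": "123456789012"}  # placeholder account ID
lambda_handler(event, None)  # context is not used by the handler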

2. Dynamic Path Construction

lambda-aggregator/handler.py

# Calculate current date
current_date = datetime.utcnow()
year = current_date.strftime("%Y")
month = current_date.strftime("%m")
day = current_date.strftime("%d")

# Create dynamic prefix
prefix = f'AWSLogs/{event["account_id"]}/CloudTrail/us-east-1/{year}/{month}/{day}/'
print(f"dynamic prefix: {prefix}")

Calculates the current date and creates a dynamic prefix for the log files in the format AWSLogs/[AccountID]/CloudTrail/us-east-1/YYYY/MM/DD/.

3. File Discovery and Filtering

lambda-aggregator/handler.py

# List objects in bucket with specified prefix
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
print(f"list_objects_v2 response: {response.get('Contents', [])}")
for obj in response.get('Contents', []):
    print(f"analyzing object: {obj}")
    key = obj['Key']
    print(f"analyzing key: {key}")
    if key.endswith('.json.gz') and 'aggregated' not in key:

Lists all objects in the current day's folder, filters for .json.gz files only, and excludes already aggregated files to prevent reprocessing.
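
As an illustration of the filter (the key names below are hypothetical), a raw CloudTrail delivery is selected while an earlier aggregate and non-gzip objects are skipped:

# Hypothetical object keys under the daily prefix
keys = [
    "AWSLogs/123456789012/CloudTrail/us-east-1/2025/07/15/123456789012_CloudTrail_us-east-1_20250715T1000Z_abc123.json.gz",
    "AWSLogs/123456789012/CloudTrail/us-east-1/2025/07/15/2025-07-15-02:00-aggregated.json.gz",
    "AWSLogs/123456789012/CloudTrail/us-east-1/2025/07/15/manifest.txt",
]
selected = [k for k in keys if k.endswith('.json.gz') and 'aggregated' not in k]
# Only the first key is selected: the raw delivery is aggregated, while the
# existing aggregate and the non-gzip object are ignored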

4. Data Aggregation

lambda-aggregator/handler.py

# Read and decompress file
obj_data = s3.get_object(Bucket=bucket_name, Key=key)
with gzip.GzipFile(fileobj=BytesIO(obj_data['Body'].read())) as gz:
    log_content = json.load(gz)
    aggregated_data.extend(log_content['Records'])

Downloads each individual log file, decompresses gzip content in memory, extracts Records array from each file, and consolidates all records into a single array.

5. Output File Creation

lambda-aggregator/handler.py

# Get current date and time to name the file
now = datetime.utcnow()
output_key = f"{now.strftime('%Y-%m-%d-%H:%M')}-aggregated.json.gz"
print(f"will save to bucket with path: {prefix}{output_key}")

# Compress and write aggregated file
out_buffer = BytesIO()
with gzip.GzipFile(fileobj=out_buffer, mode='w') as gz:
    gz.write(json.dumps({'Records': aggregated_data}).encode('utf-8'))

# Save aggregated file to S3
s3.put_object(Bucket=bucket_name, Key=f"{prefix}{output_key}", Body=out_buffer.getvalue())

Generates a timestamp-based filename, compresses aggregated data using gzip, and uploads the consolidated file to S3 in the format YYYY-MM-DD-HH:MM-aggregated.json.gz.

6. Cleanup Operations

lambda-aggregator/handler.py

# Delete old log files
for obj in response.get('Contents', []):
    key = obj['Key']
    print(f"will delete file: {key}")
    if key.endswith('.json.gz') and 'aggregated' not in key:
        s3.delete_object(Bucket=bucket_name, Key=key)

Deletes original individual log files while preserving the new aggregated file, optimizing storage costs and organization.

7. Results

The function completes successfully, consolidating logs and optimizing storage.

Before the aggregation, the files are as follows:

AWS S3

AWSLogs/123456789/CloudTrail/us-east-1/2025/07/15/
├── file1_20250715T1000Z.json.gz
├── file2_20250715T1100Z.json.gz
└── file3_20250715T1200Z.json.gz

After the aggregation, the files are as follows:

AWS S3

AWSLogs/123456789/CloudTrail/us-east-1/2025/07/15/
└── 2025-07-15-14:30-aggregated.json.gz

Lambda Analyzer Function

The Lambda Analyzer is a Python 3.11 serverless function designed to process aggregated CloudTrail log files and generate comprehensive statistical insights about AWS account activities. This function is triggered automatically by S3 ObjectCreated events when aggregated log files are uploaded, and it produces detailed reports in both JSON and CSV formats for further analysis.

Purpose

  • Process aggregated CloudTrail log files to extract meaningful insights.
  • Analyze AWS account activities and usage patterns.
  • Generate statistical reports on events, resources, regions, and user activities.
  • Export results in multiple formats (JSON and CSV) for dashboard integration.

Handler Code

lambda-analyzer/handler.py

import boto3
import json
import gzip
import os
from urllib.parse import unquote

from exporter import export_to_json, export_to_csv
from analyzer import process_logs
s3 = boto3.client("s3")

def lambda_handler(event, context):
    bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
    print(f"[+] Bucket name: {bucket_name}")
    key = event["Records"][0]["s3"]["object"]["key"]
    print(f"[+] Key: {key}")

    key = unquote(key)
    
    base_filename = os.path.splitext(key.split('/')[-1])[0]
    
    local_gz_path = f"/tmp/{key.split('/')[-1]}"
    print(f"[+] Local gz path: {local_gz_path}")
    
    print(f"[+] Downloading file from s3")
    s3.download_file(bucket_name, key, local_gz_path)
    print(f"[+] File downloaded from s3")
    
    local_json_path = local_gz_path.replace('.gz', '')
    print(f"[+] Local json path: {local_json_path}")
    
    print(f"[+] Unzipping file")
    with gzip.open(local_gz_path, 'rt') as gz_file:
        with open(local_json_path, 'w') as json_file:
            json_file.write(gz_file.read())
    print(f"[+] File unzipped")
    if os.path.exists(local_json_path):
        print(f"[+] JSON file created at {local_json_path}")
    else:
        print(f"[-] JSON file not found at {local_json_path}")
    
    if "aggregated" in base_filename:
        insights = process_logs(local_json_path)
        
        base_filename = base_filename.replace('-aggregated.json', '')
        json_path = f"{local_json_path.rsplit('/', 1)[0]}/{base_filename}-insights.json"
        csv_path = f"{local_json_path.rsplit('/', 1)[0]}/{base_filename}-insights.csv"
      
        export_to_json(insights, json_path)
        export_to_csv(insights, csv_path)
        
        directory = '/'.join(key.split('/')[:-1])
        
        new_key_json = f"{directory}/{base_filename}-insights.json"
        new_key_csv = f"{directory}/{base_filename}-insights.csv"

        print(f"[+] Uploading insights to output s3 in path {new_key_json} and {new_key_csv}")
        s3.upload_file(json_path, "samuellincoln-log-analyzer-output", new_key_json)
        s3.upload_file(csv_path, "samuellincoln-log-analyzer-output", new_key_csv)
        print(f"[+] Insights uploaded to s3")
    else:
        print(f"[-] File {base_filename} does not contain 'aggregated', skipping processing.")

Analyzer Code

lambda-analyzer/analyzer.py

import json
from collections import Counter, defaultdict

def process_logs(filepath):
    with open(filepath, "r") as file:
        logs = json.load(file)
        
    event_counter = Counter()
    resource_counter = Counter()
    account_counter = Counter()
    region_counter = Counter()
    source_ip_counter = Counter()
    event_type_counter = Counter()
    event_category_counter = Counter()
    
    for record in logs["Records"]:
        event_name = record["eventName"]
        event_counter[event_name] += 1
        
        for resource in record.get("resources", []): 
            resource_name = resource.get("type") 
            resource_counter[resource_name] += 1
            
        region = record.get("awsRegion", None)
        if region:
            region_counter[region] += 1
            
        source_ip = record.get("sourceIPAddress", None)
        if source_ip:
            source_ip_counter[source_ip] += 1
            
        event_type = record.get("eventType", None)
        if event_type:
            event_type_counter[event_type] += 1
            
        event_category = record.get("eventCategory", None)
        if event_category:
            event_category_counter[event_category] += 1
    
        account_type = record.get("userIdentity", {}).get("type", "Unknown")
        account_counter[account_type] += 1
                   
    return {
        "event_counts": dict(event_counter),      
        "resource_counts": dict(resource_counter),
        "account_counts": dict(account_counter),       
        "region_counts": dict(region_counter),       
        "source_ip_counts": dict(source_ip_counter),        
        "event_type_counts": dict(event_type_counter),
        "event_category_counts": dict(event_category_counter),
        "total_events": len(logs["Records"]),
    }

Export Code

lambda-analyzer/exporter.py

import json
import csv
import os

def export_to_json(logs, path):
    with open(path, "w") as f:
        json.dump(logs, f, indent=2)
    print(f"[+] Exported JSON logs to {path}")

def export_to_csv(logs, path):
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["event_name", "count"])
        for event_name, count in logs["event_counts"].items():
            writer.writerow([event_name, count])
    print(f"[+] Exported CSV logs to {path}")

Function Workflow

1. Trigger and S3 Event Processing

lambda-analyzer/handler.py

def lambda_handler(event, context):
    bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
    print(f"[+] Bucket name: {bucket_name}")
    key = event["Records"][0]["s3"]["object"]["key"]
    print(f"[+] Key: {key}")
    
    key = unquote(key)

The function is triggered automatically by S3 ObjectCreated events when new files are uploaded to the input bucket. It extracts bucket name and object key from the S3 event payload.
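
For reference, the fields the handler reads sit inside the standard S3 notification structure; a trimmed-down event looks roughly like this (bucket and key values are illustrative):

# Simplified S3 ObjectCreated event (illustrative values)
event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "samuellincoln-log-analyzer-input"},
                "object": {"key": "AWSLogs/123456789012/CloudTrail/us-east-1/2025/07/15/2025-07-15-14%3A30-aggregated.json.gz"},
            }
        }
    ]
}
# The handler reads event["Records"][0]["s3"]["bucket"]["name"] and ["object"]["key"],
# then URL-decodes the key with unquote() (e.g. %3A becomes ':').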

2. File Download and Path Setup

lambda-analyzer/handler.py

base_filename = os.path.splitext(key.split('/')[-1])[0]
local_gz_path = f"/tmp/{key.split('/')[-1]}"
print(f"[+] Local gz path: {local_gz_path}")

print(f"[+] Downloading file from s3")
s3.download_file(bucket_name, key, local_gz_path)
print(f"[+] File downloaded from s3")

Downloads the triggered file from S3 to the Lambda's /tmp directory for local processing, and extracts the base filename for future use.

3. File Decompression

lambda-analyzer/handler.py

local_json_path = local_gz_path.replace('.gz', '')
print(f"[+] Local json path: {local_json_path}")

print(f"[+] Unzipping file")
with gzip.open(local_gz_path, 'rt') as gz_file:
    with open(local_json_path, 'w') as json_file:
        json_file.write(gz_file.read())
print(f"[+] File unzipped")

Decompresses the gzipped log file to extract the JSON content, creating a readable JSON file in the Lambda's temporary storage.

4. Aggregated File Validation and Processing

lambda-analyzer/handler.py

if "aggregated" in base_filename:
    insights = process_logs(local_json_path)
    
    base_filename = base_filename.replace('-aggregated.json', '')
    json_path = f"{local_json_path.rsplit('/', 1)[0]}/{base_filename}-insights.json"
    csv_path = f"{local_json_path.rsplit('/', 1)[0]}/{base_filename}-insights.csv"

Validates that the file is an aggregated log file before processing. If valid, calls the analyzer module to process the logs and defines output paths for the insights files.

5. Data Analysis and Insights Generation

lambda-analyzer/analyzer.py

# From analyzer.py
def process_logs(filepath):
    with open(filepath, "r") as file:
        logs = json.load(file)
        
    event_counter = Counter()
    resource_counter = Counter()
    account_counter = Counter()
    region_counter = Counter()
    source_ip_counter = Counter()
    event_type_counter = Counter()
    event_category_counter = Counter()

Processes the JSON log file to extract and count various metrics including event names, resource types, regions, source IPs, event types, event categories, and user identity types.
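
As a small worked example (the records are simplified and hypothetical), two records would be tallied like this:

from collections import Counter

# Two simplified CloudTrail records (illustrative only)
records = [
    {"eventName": "AssumeRole", "awsRegion": "us-east-1",
     "eventType": "AwsApiCall", "userIdentity": {"type": "AssumedRole"}},
    {"eventName": "GetObject", "awsRegion": "us-east-1",
     "eventType": "AwsApiCall", "userIdentity": {"type": "IAMUser"}},
]

event_counter = Counter(r["eventName"] for r in records)
region_counter = Counter(r["awsRegion"] for r in records)
account_counter = Counter(r["userIdentity"]["type"] for r in records)

print(dict(event_counter))    # {'AssumeRole': 1, 'GetObject': 1}
print(dict(region_counter))   # {'us-east-1': 2}
print(dict(account_counter))  # {'AssumedRole': 1, 'IAMUser': 1}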

6. Data Export and Format Conversion

lambda-analyzer/exporter.py

export_to_json(insights, json_path)
export_to_csv(insights, csv_path)

# From exporter.py
def export_to_json(logs, path):
    with open(path, "w") as f:
        json.dump(logs, f, indent=2)

def export_to_csv(logs, path):
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["event_name", "count"])
        for event_name, count in logs["event_counts"].items():
            writer.writerow([event_name, count])

Exports the generated insights to both JSON (complete data structure) and CSV (event counts only) formats for different use cases and integrations.

7. Output Upload to S3

lambda-analyzer/handler.py

directory = '/'.join(key.split('/')[:-1])
new_key_json = f"{directory}/{base_filename}-insights.json"
new_key_csv = f"{directory}/{base_filename}-insights.csv"

print(f"[+] Uploading insights to output s3 in path {new_key_json} and {new_key_csv}")
s3.upload_file(json_path, "samuellincoln-log-analyzer-output", new_key_json)
s3.upload_file(csv_path, "samuellincoln-log-analyzer-output", new_key_csv)
print(f"[+] Insights uploaded to s3") 

Uploads the generated insights to the output S3 bucket in the specified paths, completing the analysis process.

8. Results

The output files are structured as follows:

2025-07-15-14:30-insights.json

{
  "event_counts": {"AssumeRole": 150, "GetObject": 89, ...},
  "resource_counts": {"AWS::S3::Object": 45, "AWS::IAM::Role": 12, ...},
  "account_counts": {"AssumedRole": 98, "IAMUser": 52, ...},
  "region_counts": {"us-east-1": 120, "us-west-2": 30, ...},
  "source_ip_counts": {"192.168.1.1": 25, "10.0.0.5": 18, ...},
  "event_type_counts": {"AwsApiCall": 180, "AwsServiceEvent": 20, ...},
  "event_category_counts": {"Management": 150, "Data": 50, ...},
  "total_events": 200
}
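
Based on the exporter code above, the companion CSV holds only the event counts; with the values from the JSON example it would contain (illustrative):

2025-07-15-14:30-insights.csv

event_name,count
AssumeRole,150
GetObject,89
...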

AWS S3


samuellincoln-log-analyzer-output/
└── AWSLogs/123456789/CloudTrail/us-east-1/2025/07/15/
    ├── 2025-07-15-14:30-insights.json
    └── 2025-07-15-14:30-insights.csv

Dashboard

The Dashboard is an interactive visualization interface that transforms the processed AWS account activity data from the Lambda Analyzer into intuitive charts and metrics. It gives stakeholders insight into AWS usage patterns, security events, and operational trends through visual representations of event frequencies, regional distribution, user activities, resource utilization, and source IP analysis.
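
The dashboard's own implementation is outside the scope of this document; as a rough sketch of the data access it relies on, any client with read access could load an insights file from the output bucket along these lines (the key below is illustrative):

import boto3
import json

s3 = boto3.client("s3")
bucket = "samuellincoln-log-analyzer-output"
key = "AWSLogs/123456789012/CloudTrail/us-east-1/2025/07/15/2025-07-15-14:30-insights.json"  # illustrative key

# Download and parse the insights JSON produced by the Lambda Analyzer
obj = s3.get_object(Bucket=bucket, Key=key)
insights = json.loads(obj["Body"].read())
print(insights["total_events"], insights["event_counts"])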

To see the dashboard created with the processed logs, go to the dashboard page.