
"AppSec Series #8: AWS Security Deep Dive — KMS, CloudTrail, GuardDuty and Security Hub"

AWS ships native security services that together form a layered defence stack. This part covers KMS encryption, CloudTrail audit logging, GuardDuty threat detection, WAF configuration, and Security Hub findings.

Series Navigation

Part 7: Cloud Security Fundamentals

Part 9: Container Security — Docker and Kubernetes Hardening


AWS Security Services Overview

AWS Security Stack (in order of implementation priority):

1. IAM              → Who can do what
2. CloudTrail       → Who did what and when (audit log)
3. Config           → Are resources configured correctly?
4. Security Hub     → Centralised findings from all security services
5. GuardDuty        → Threat detection (ML-based)
6. Macie            → Find PII/sensitive data in S3
7. KMS              → Encryption key management
8. Secrets Manager  → Store application secrets
9. WAF              → Block malicious web requests
10. Shield          → DDoS protection

AWS KMS — Key Management Service

KMS manages cryptographic keys so that the key material of a KMS key never leaves the service: your applications call the KMS API instead of handling the raw key themselves.

import boto3
import base64

kms = boto3.client('kms', region_name='us-east-1')

# ── Create a KMS key ─────────────────────────────────────────────────────────
# (Usually done via Terraform, not code)
response = kms.create_key(
    Description='App database encryption key',
    KeyUsage='ENCRYPT_DECRYPT',
    KeySpec='SYMMETRIC_DEFAULT',
    Tags=[{'TagKey': 'Environment', 'TagValue': 'production'}]
)
key_id = response['KeyMetadata']['KeyId']

# ── Encrypt data directly (up to 4KB) ───────────────────────────────────────
plaintext = b"Sensitive user PII"
encrypted = kms.encrypt(
    KeyId=key_id,
    Plaintext=plaintext,
    EncryptionContext={         # Additional authenticated data
        'Purpose': 'user-pii', # Context must match on decrypt
        'UserId': '123'
    }
)
ciphertext = encrypted['CiphertextBlob']

# ── Decrypt ──────────────────────────────────────────────────────────────────
decrypted = kms.decrypt(
    CiphertextBlob=ciphertext,
    EncryptionContext={'Purpose': 'user-pii', 'UserId': '123'}  # must match
)
plaintext = decrypted['Plaintext']

# ── Envelope encryption (for large data) ────────────────────────────────────
# Generate a data key — use it to encrypt your data, KMS encrypts the key
data_key_response = kms.generate_data_key(
    KeyId=key_id,
    KeySpec='AES_256',
    EncryptionContext={'Purpose': 'bulk-data'}
)
plaintext_data_key = data_key_response['Plaintext']    # use to encrypt, then delete from memory
encrypted_data_key = data_key_response['CiphertextBlob']  # store with encrypted data

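# Encrypt your data with the plaintext key (using AES-256-GCM)
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

large_data = b"example payload; in practice this is the data too large for kms.encrypt"
nonce = os.urandom(12)  # 96-bit nonce; never reuse a nonce with the same data key
aesgcm = AESGCM(plaintext_data_key)
encrypted_data = aesgcm.encrypt(nonce, large_data, None)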

# Store: encrypted_data_key + nonce + encrypted_data
# Delete plaintext_data_key from memory

# To decrypt: call KMS to decrypt the data key, then decrypt your data
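
The "store" and "to decrypt" steps above can be sketched as follows. This is a minimal illustration, not a standard format: the 2-byte length prefix and the helper names `pack_envelope`, `unpack_envelope`, and `decrypt_envelope` are assumptions made for this example. The KMS client is passed in, and the `EncryptionContext` must match the one used in `generate_data_key`.

```python
import struct

NONCE_LEN = 12  # matches the 12-byte nonce generated with os.urandom(12)


def pack_envelope(encrypted_data_key: bytes, nonce: bytes, encrypted_data: bytes) -> bytes:
    """Hypothetical layout: 2-byte big-endian key length | encrypted key | nonce | ciphertext."""
    if len(nonce) != NONCE_LEN:
        raise ValueError("nonce must be 12 bytes")
    header = struct.pack(">H", len(encrypted_data_key))
    return header + encrypted_data_key + nonce + encrypted_data


def unpack_envelope(blob: bytes) -> tuple[bytes, bytes, bytes]:
    """Split a packed blob back into (encrypted_data_key, nonce, encrypted_data)."""
    (key_len,) = struct.unpack_from(">H", blob, 0)
    key_end = 2 + key_len
    nonce_end = key_end + NONCE_LEN
    if len(blob) < nonce_end:
        raise ValueError("blob is too short to contain a key and nonce")
    return blob[2:key_end], blob[key_end:nonce_end], blob[nonce_end:]


def decrypt_envelope(kms_client, blob: bytes, encryption_context: dict) -> bytes:
    """Ask KMS to decrypt the data key, then decrypt the payload locally."""
    # Imported here so the packing helpers work without the cryptography package.
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM

    encrypted_data_key, nonce, encrypted_data = unpack_envelope(blob)
    response = kms_client.decrypt(
        CiphertextBlob=encrypted_data_key,
        EncryptionContext=encryption_context,  # must match generate_data_key
    )
    return AESGCM(response["Plaintext"]).decrypt(nonce, encrypted_data, None)
```

With the variables from the code above, usage would look like `blob = pack_envelope(encrypted_data_key, nonce, encrypted_data)` followed later by `decrypt_envelope(kms, blob, {'Purpose': 'bulk-data'})`.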

# Terraform KMS key with key rotation
resource "aws_kms_key" "app_key" {
  description             = "Application encryption key"
  enable_key_rotation     = true          # rotate annually
  deletion_window_in_days = 30

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "Enable IAM User Permissions"
        Effect = "Allow"
        Principal = { AWS = "arn:aws:iam::${var.account_id}:root" }
        Action   = "kms:*"
        Resource = "*"
      },
      {
        Sid    = "Allow App Role to Use Key"
        Effect = "Allow"
        Principal = { AWS = aws_iam_role.app_role.arn }
        Action   = ["kms:Decrypt", "kms:GenerateDataKey"]
        Resource = "*"
      }
    ]
  })
}
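
Rotation is easy to enable and easy to forget on keys created outside Terraform. The sketch below checks a list of key IDs using the KMS `get_key_rotation_status` call. The helper name `keys_without_rotation` is an assumption for this example, and it is assumed that every ID refers to a symmetric customer managed key (other key types may not support the call).

```python
def keys_without_rotation(kms_client, key_ids):
    """Return the key IDs whose automatic rotation is disabled."""
    unrotated = []
    for key_id in key_ids:
        status = kms_client.get_key_rotation_status(KeyId=key_id)
        if not status["KeyRotationEnabled"]:
            unrotated.append(key_id)
    return unrotated
```

For example, `keys_without_rotation(boto3.client('kms'), [key_id])` should return an empty list for a key created with `enable_key_rotation = true`.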

CloudTrail — Every API Call Logged

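CloudTrail records AWS API activity: who made the call, from where, and what they changed. A trail captures management events by default; data events such as S3 object reads and Lambda invocations have to be enabled explicitly.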
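
To make the who/where/what concrete, here is a small sketch that pulls those fields out of a CloudTrail record. CloudTrail log files are JSON documents with a top-level `Records` array; the sample record below is made up for illustration, and `summarize_record` is a hypothetical helper.

```python
import json


def summarize_record(record: dict) -> dict:
    """Reduce one CloudTrail record to when / who / from / what."""
    identity = record.get("userIdentity", {})
    return {
        "when": record.get("eventTime"),
        "who": identity.get("arn") or identity.get("type"),
        "from": record.get("sourceIPAddress"),
        "what": f'{record.get("eventSource")}:{record.get("eventName")}',
        "error": record.get("errorCode"),  # present only when the call failed
    }


# Illustrative, made-up log file content using CloudTrail's field names.
sample_log_file = json.dumps({
    "Records": [{
        "eventTime": "2024-01-15T10:30:00Z",
        "eventSource": "s3.amazonaws.com",
        "eventName": "PutBucketPolicy",
        "sourceIPAddress": "203.0.113.10",
        "userIdentity": {"type": "IAMUser", "arn": "arn:aws:iam::123456789012:user/alice"},
    }]
})

for record in json.loads(sample_log_file)["Records"]:
    print(summarize_record(record))
```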

# Enable CloudTrail (should be always on)
aws cloudtrail create-trail \
  --name organization-trail \
  --s3-bucket-name cloudtrail-logs-bucket \
  --include-global-service-events \
  --is-multi-region-trail \
  --enable-log-file-validation   # detect tampering with log files

aws cloudtrail start-logging --name organization-trail

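# Query CloudTrail with Athena (for historical analysis)
# First, create Athena table pointing to S3 bucket

-- Athena query: Find all failed console logins
-- (in the table definition AWS documents for CloudTrail, responseElements
--  is a JSON string, so the field is extracted with json_extract_scalar)
SELECT
  eventTime,
  sourceIPAddress,
  userIdentity.userName,
  json_extract_scalar(responseElements, '$.ConsoleLogin') AS consoleLogin
FROM cloudtrail_logs
WHERE eventName = 'ConsoleLogin'
  AND json_extract_scalar(responseElements, '$.ConsoleLogin') = 'Failure'
  AND eventTime > '2024-01-01'
ORDER BY eventTime DESC;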

-- Find all S3 bucket policy changes
SELECT eventTime, userIdentity.arn, requestParameters
FROM cloudtrail_logs
WHERE eventName IN ('PutBucketPolicy', 'DeleteBucketPolicy', 'PutBucketAcl')
ORDER BY eventTime DESC;

-- Find root account usage (should be near zero)
SELECT eventTime, eventName, sourceIPAddress
FROM cloudtrail_logs
WHERE userIdentity.type = 'Root'
ORDER BY eventTime DESC;

-- Find IAM changes
SELECT eventTime, eventName, userIdentity.arn, requestParameters
FROM cloudtrail_logs
WHERE eventSource = 'iam.amazonaws.com'
  AND eventName IN ('CreateUser', 'AttachUserPolicy', 'CreateAccessKey',
                    'PutUserPolicy', 'AddUserToGroup')
ORDER BY eventTime DESC;
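
Queries like these can also be run on a schedule rather than by hand. The sketch below wraps the Athena `start_query_execution`, `get_query_execution`, and `get_query_results` calls. The helper name `run_athena_query`, the database name, and the S3 output location are assumptions you would replace, and only the first page of results is read.

```python
import time


def run_athena_query(athena, sql, database, output_location, poll_seconds=1.0):
    """Run a query, wait for it to finish, and return rows as dicts (first page only)."""
    query_id = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Poll until Athena reports a terminal state.
    while True:
        execution = athena.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {query_id} ended in state {state}")

    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    if not rows:
        return []
    # For SELECT queries the first row of the first page holds the column names.
    header = [col.get("VarCharValue") for col in rows[0]["Data"]]
    return [
        dict(zip(header, (col.get("VarCharValue") for col in row["Data"])))
        for row in rows[1:]
    ]
```

A call might look like `run_athena_query(boto3.client('athena'), sql, 'default', 's3://my-athena-results/')`, where the bucket name is a placeholder.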

GuardDuty — ML-Based Threat Detection

GuardDuty analyses VPC Flow Logs, DNS logs, and CloudTrail to detect threats:

# Enable GuardDuty
aws guardduty create-detector \
  --enable \
  --finding-publishing-frequency FIFTEEN_MINUTES

DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)

# List high-severity findings
aws guardduty list-findings \
  --detector-id $DETECTOR_ID \
  --finding-criteria '{"Criterion":{"severity":{"Gte":7}}}'

# Get finding details
aws guardduty get-findings \
  --detector-id $DETECTOR_ID \
  --finding-ids FINDING_ID

Common GuardDuty finding types:

Threat findings:
├── UnauthorizedAccess:EC2/TorIPCaller     ← Tor exit node accessing EC2
├── UnauthorizedAccess:EC2/SSHBruteForce   ← SSH brute force attempt
├── Backdoor:EC2/C&CActivity.B             ← EC2 communicating with C2 server
├── CryptoCurrency:EC2/BitcoinTool.B       ← crypto mining detected
├── CredentialAccess:IAMUser/AnomalousBehavior ← unusual API call patterns
├── Exfiltration:S3/AnomalousBehavior      ← unusual S3 data access
└── Policy:S3/BucketBlockPublicAccessDisabled ← S3 Block Public Access turned off
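
Finding type names follow the pattern `ThreatPurpose:ResourceTypeAffected/ThreatFamilyName.DetectionMechanism!Artifact`, where the last two parts are optional. Splitting them apart is handy for routing findings to different handlers. The parser below is a minimal sketch; `FindingType` and `parse_finding_type` are names invented for this example.

```python
from typing import NamedTuple, Optional


class FindingType(NamedTuple):
    threat_purpose: str
    resource_type: str
    threat_family: str
    detection_mechanism: Optional[str]
    artifact: Optional[str]


def parse_finding_type(finding_type: str) -> FindingType:
    """Split a GuardDuty finding type string into its named parts."""
    purpose, _, rest = finding_type.partition(":")
    resource, _, rest = rest.partition("/")
    rest, _, artifact = rest.partition("!")
    family, _, mechanism = rest.partition(".")
    if not (purpose and resource and family):
        raise ValueError(f"not a GuardDuty finding type: {finding_type!r}")
    return FindingType(purpose, resource, family, mechanism or None, artifact or None)


print(parse_finding_type("Backdoor:EC2/C&CActivity.B!DNS"))
print(parse_finding_type("UnauthorizedAccess:EC2/SSHBruteForce"))
```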

# Lambda to auto-remediate GuardDuty findings
import boto3
import json

ec2 = boto3.client('ec2')
sns = boto3.client('sns')

def lambda_handler(event, context):
    finding = event['detail']
    finding_type = finding['type']
    severity = finding['severity']

    # Auto-isolate compromised EC2 instance for high-severity findings (8 or above)
    if severity >= 8 and 'EC2' in finding_type:
        instance_id = finding['resource']['instanceDetails']['instanceId']

        # Remove from all security groups, attach quarantine SG
        # (placeholder: replace with the ID of a security group that has no rules)
        quarantine_sg = 'sg-quarantine-no-traffic'
        ec2.modify_instance_attribute(
            InstanceId=instance_id,
            Groups=[quarantine_sg]
        )

        # Create EBS snapshot for forensics
        instance = ec2.describe_instances(InstanceIds=[instance_id])
        for volume in instance['Reservations'][0]['Instances'][0]['BlockDeviceMappings']:
            ec2.create_snapshot(
                VolumeId=volume['Ebs']['VolumeId'],
                Description=f"Forensic snapshot - GuardDuty finding {finding['id']}"
            )

        # Alert security team
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789012:security-alerts',  # placeholder 12-digit account ID
            Subject=f"CRITICAL: EC2 Instance Isolated - {finding_type}",
            Message=json.dumps({
                'finding': finding,
                'action_taken': f'Instance {instance_id} isolated to quarantine SG',
                'forensic_snapshot': 'created'
            })
        )
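
The handler above reads `event['detail']`, which is the shape of a GuardDuty finding delivered through EventBridge. A rule along the following lines could route only high-severity findings to the Lambda. This is a sketch under assumptions: the rule name in the comment and the helper `guardduty_event_pattern` are made up for illustration.

```python
import json


def guardduty_event_pattern(min_severity: float = 8) -> dict:
    """EventBridge pattern matching GuardDuty findings at or above min_severity."""
    return {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {"severity": [{"numeric": [">=", min_severity]}]},
    }


pattern_json = json.dumps(guardduty_event_pattern())
print(pattern_json)

# Registering the rule would then look roughly like this (not executed here):
#   events = boto3.client('events')
#   events.put_rule(Name='guardduty-high-severity',   # hypothetical rule name
#                   EventPattern=pattern_json, State='ENABLED')
# followed by events.put_targets(...) pointing at the Lambda's ARN.
```

Filtering in the rule keeps the Lambda from being invoked for low-severity findings at all; the `severity >= 8` check in the handler then acts as a second guard.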

AWS WAF — Web Application Firewall

# Terraform WAF configuration
resource "aws_wafv2_web_acl" "app_waf" {
  name  = "app-waf"
  scope = "REGIONAL"

  default_action { allow {} }

  # AWS Managed Rules — Common vulnerabilities
  rule {
    name     = "AWSManagedRulesCommonRuleSet"
    priority = 1
    override_action { none {} }
    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesCommonRuleSet"
        vendor_name = "AWS"
      }
    }
    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "CommonRuleSet"
      sampled_requests_enabled   = true
    }
  }

  # Rate limiting — block IPs making too many requests
  rule {
    name     = "RateLimitRule"
    priority = 2
    action { block {} }
    statement {
      rate_based_statement {
        limit              = 2000  # requests per 5 minutes per IP
        aggregate_key_type = "IP"
      }
    }
    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "RateLimit"
      sampled_requests_enabled   = true
    }
  }

  # Block specific countries (if needed for compliance)
  rule {
    name     = "GeoBlockRule"
    priority = 3
    action { block {} }
    statement {
      geo_match_statement {
        country_codes = ["KP", "CU", "IR", "SY"]  # OFAC sanctioned countries
      }
    }
    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "GeoBlock"
      sampled_requests_enabled   = true
    }
  }

  visibility_config {
    cloudwatch_metrics_enabled = true
    metric_name                = "app-waf"
    sampled_requests_enabled   = true
  }
}

Security Hub — Centralised Findings

import boto3

securityhub = boto3.client('securityhub')

# Enable Security Hub
securityhub.enable_security_hub(
    EnableDefaultStandards=True  # enables CIS AWS Foundations Benchmark, AWS Foundational Security Best Practices
)

# List active CRITICAL findings that FAILED a compliance check
paginator = securityhub.get_paginator('get_findings')
for page in paginator.paginate(
    Filters={
        'ComplianceStatus': [{'Value': 'FAILED', 'Comparison': 'EQUALS'}],
        'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}],
        'SeverityLabel': [{'Value': 'CRITICAL', 'Comparison': 'EQUALS'}]
    }
):
    for finding in page['Findings']:
        print(f"[{finding['Severity']['Label']}] {finding['Title']}")
        print(f"  Resource: {finding['Resources'][0]['Id']}")
        print(f"  Remediation: {finding.get('Remediation', {}).get('Recommendation', {}).get('Text', 'N/A')}")
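
Printing findings one by one gets noisy quickly. A small roll-up over the same finding dictionaries is often easier to act on. The sketch below assumes findings in the format returned by `get_findings` (with `Severity.Label`, `Title`, and `Resources[].Id`); `summarize_findings` and the sample data are hypothetical.

```python
from collections import Counter, defaultdict


def summarize_findings(findings):
    """Count findings per severity label and group finding titles per resource ID."""
    by_severity = Counter(f["Severity"]["Label"] for f in findings)
    by_resource = defaultdict(list)
    for f in findings:
        for resource in f.get("Resources", []):
            by_resource[resource["Id"]].append(f["Title"])
    return by_severity, dict(by_resource)


# Made-up findings for illustration only.
sample_findings = [
    {"Severity": {"Label": "CRITICAL"}, "Title": "Root account has no MFA",
     "Resources": [{"Id": "AWS::::Account:123456789012"}]},
    {"Severity": {"Label": "CRITICAL"}, "Title": "Root access key exists",
     "Resources": [{"Id": "AWS::::Account:123456789012"}]},
    {"Severity": {"Label": "HIGH"}, "Title": "S3 bucket allows public read",
     "Resources": [{"Id": "arn:aws:s3:::example-bucket"}]},
]

severity_counts, titles_by_resource = summarize_findings(sample_findings)
print(dict(severity_counts))
for resource_id, titles in titles_by_resource.items():
    print(resource_id, "->", titles)
```

In the loop above, the same function could be fed `page['Findings']` for each page, with the counters merged across pages.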

What's Next

In Part 9 we secure Docker containers and Kubernetes clusters — image hardening, non-root containers, network policies, RBAC, secret management, and runtime security with Falco.
