Series Navigation
← Part 7: Cloud Security Fundamentals
→ Part 9: Container Security — Docker and Kubernetes Hardening
AWS Security Services Overview
AWS Security Stack (in order of implementation priority):
1. IAM → Who can do what
2. CloudTrail → Who did what and when (audit log)
3. Config → Are resources configured correctly?
4. Security Hub → Centralised findings from all security services
5. GuardDuty → Threat detection (ML-based)
6. Macie → Find PII/sensitive data in S3
7. KMS → Encryption key management
8. Secrets Manager → Store application secrets
9. WAF → Block malicious web requests
10. Shield → DDoS protection
AWS KMS — Key Management Service
KMS manages cryptographic keys so your applications never handle raw key material.
import boto3
import base64
kms = boto3.client('kms', region_name='us-east-1')
# ── Create a KMS key ─────────────────────────────────────────────────────────
# (Usually done via Terraform, not code)
response = kms.create_key(
Description='App database encryption key',
KeyUsage='ENCRYPT_DECRYPT',
KeySpec='SYMMETRIC_DEFAULT',
Tags=[{'TagKey': 'Environment', 'TagValue': 'production'}]
)
key_id = response['KeyMetadata']['KeyId']
# ── Encrypt data directly (up to 4KB) ───────────────────────────────────────
plaintext = b"Sensitive user PII"
encrypted = kms.encrypt(
KeyId=key_id,
Plaintext=plaintext,
EncryptionContext={ # Additional authenticated data
'Purpose': 'user-pii', # Context must match on decrypt
'UserId': '123'
}
)
ciphertext = encrypted['CiphertextBlob']
# ── Decrypt ──────────────────────────────────────────────────────────────────
decrypted = kms.decrypt(
CiphertextBlob=ciphertext,
EncryptionContext={'Purpose': 'user-pii', 'UserId': '123'} # must match
)
plaintext = decrypted['Plaintext']
# ── Envelope encryption (for large data) ────────────────────────────────────
# Generate a data key — use it to encrypt your data, KMS encrypts the key
data_key_response = kms.generate_data_key(
KeyId=key_id,
KeySpec='AES_256',
EncryptionContext={'Purpose': 'bulk-data'}
)
plaintext_data_key = data_key_response['Plaintext'] # use to encrypt, then delete from memory
encrypted_data_key = data_key_response['CiphertextBlob'] # store with encrypted data
# Encrypt your data with the plaintext key (using AES-256-GCM)
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
nonce = os.urandom(12)
aesgcm = AESGCM(plaintext_data_key)
encrypted_data = aesgcm.encrypt(nonce, large_data, None)
# Store: encrypted_data_key + nonce + encrypted_data
# Delete plaintext_data_key from memory
# To decrypt: call KMS to decrypt the data key, then decrypt your data
# Terraform KMS key with key rotation
resource "aws_kms_key" "app_key" {
description = "Application encryption key"
enable_key_rotation = true # rotate annually
deletion_window_in_days = 30
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "Enable IAM User Permissions"
Effect = "Allow"
Principal = { AWS = "arn:aws:iam::${var.account_id}:root" }
Action = "kms:*"
Resource = "*"
},
{
Sid = "Allow App Role to Use Key"
Effect = "Allow"
Principal = { AWS = aws_iam_role.app_role.arn }
Action = ["kms:Decrypt", "kms:GenerateDataKey"]
Resource = "*"
}
]
})
}
CloudTrail — Every API Call Logged
CloudTrail records every AWS API call — who made it, from where, what they changed.
# Enable CloudTrail (should be always on)
aws cloudtrail create-trail \
--name organization-trail \
--s3-bucket-name cloudtrail-logs-bucket \
--include-global-service-events \
--is-multi-region-trail \
--enable-log-file-validation # detect tampering with log files
aws cloudtrail start-logging --name organization-trail
# Query CloudTrail with Athena (for historical analysis)
# First, create Athena table pointing to S3 bucket
-- Athena query: Find all failed console logins
SELECT
eventTime,
sourceIPAddress,
userIdentity.userName,
responseElements.ConsoleLogin
FROM cloudtrail_logs
WHERE eventName = 'ConsoleLogin'
AND responseElements.ConsoleLogin = 'Failure'
AND eventTime > '2024-01-01'
ORDER BY eventTime DESC;
-- Find all S3 bucket policy changes
SELECT eventTime, userIdentity.arn, requestParameters
FROM cloudtrail_logs
WHERE eventName IN ('PutBucketPolicy', 'DeleteBucketPolicy', 'PutBucketAcl')
ORDER BY eventTime DESC;
-- Find root account usage (should be near zero)
SELECT eventTime, eventName, sourceIPAddress
FROM cloudtrail_logs
WHERE userIdentity.type = 'Root'
ORDER BY eventTime DESC;
-- Find IAM changes
SELECT eventTime, eventName, userIdentity.arn, requestParameters
FROM cloudtrail_logs
WHERE eventSource = 'iam.amazonaws.com'
AND eventName IN ('CreateUser', 'AttachUserPolicy', 'CreateAccessKey',
'PutUserPolicy', 'AddUserToGroup')
ORDER BY eventTime DESC;
GuardDuty — ML-Based Threat Detection
GuardDuty analyses VPC Flow Logs, DNS logs, and CloudTrail to detect threats:
# Enable GuardDuty
aws guardduty create-detector \
--enable \
--finding-publishing-frequency FIFTEEN_MINUTES
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)
# List high-severity findings
aws guardduty list-findings \
--detector-id $DETECTOR_ID \
--finding-criteria '{"Criterion":{"severity":{"Gte":7}}}'
# Get finding details
aws guardduty get-findings \
--detector-id $DETECTOR_ID \
--finding-ids FINDING_ID
Common GuardDuty finding types:
Threat findings:
├── UnauthorizedAccess:EC2/TorIPCaller ← TOR exit node accessing EC2
├── UnauthorizedAccess:EC2/SSHBruteForce ← SSH brute force attempt
├── Backdoor:EC2/C&CActivity ← EC2 communicating with C2 server
├── CryptoCurrency:EC2/BitcoinTool ← crypto mining detected
├── CredentialAccess:IAMUser/AnomalousBehavior ← unusual API call patterns
├── Exfiltration:S3/ObjectRead ← unusual S3 data access
└── Policy:S3/BucketBlockPublicAccessDisabled ← public access re-enabled
# Lambda to auto-remediate GuardDuty findings
import boto3
import json
ec2 = boto3.client('ec2')
sns = boto3.client('sns')
def lambda_handler(event, context):
finding = event['detail']
finding_type = finding['type']
severity = finding['severity']
# Auto-isolate compromised EC2 instance for critical findings
if severity >= 8 and 'EC2' in finding_type:
instance_id = finding['resource']['instanceDetails']['instanceId']
# Remove from all security groups, attach quarantine SG
quarantine_sg = 'sg-quarantine-no-traffic'
ec2.modify_instance_attribute(
InstanceId=instance_id,
Groups=[quarantine_sg]
)
# Create EBS snapshot for forensics
instance = ec2.describe_instances(InstanceIds=[instance_id])
for volume in instance['Reservations'][0]['Instances'][0]['BlockDeviceMappings']:
ec2.create_snapshot(
VolumeId=volume['Ebs']['VolumeId'],
Description=f"Forensic snapshot - GuardDuty finding {finding['id']}"
)
# Alert security team
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789:security-alerts',
Subject=f"CRITICAL: EC2 Instance Isolated - {finding_type}",
Message=json.dumps({
'finding': finding,
'action_taken': f'Instance {instance_id} isolated to quarantine SG',
'forensic_snapshot': 'created'
})
)
AWS WAF — Web Application Firewall
# Terraform WAF configuration
resource "aws_wafv2_web_acl" "app_waf" {
name = "app-waf"
scope = "REGIONAL"
default_action { allow {} }
# AWS Managed Rules — Common vulnerabilities
rule {
name = "AWSManagedRulesCommonRuleSet"
priority = 1
override_action { none {} }
statement {
managed_rule_group_statement {
name = "AWSManagedRulesCommonRuleSet"
vendor_name = "AWS"
}
}
visibility_config {
cloudwatch_metrics_enabled = true
metric_name = "CommonRuleSet"
sampled_requests_enabled = true
}
}
# Rate limiting — block IPs making too many requests
rule {
name = "RateLimitRule"
priority = 2
action { block {} }
statement {
rate_based_statement {
limit = 2000 # requests per 5 minutes per IP
aggregate_key_type = "IP"
}
}
visibility_config {
cloudwatch_metrics_enabled = true
metric_name = "RateLimit"
sampled_requests_enabled = true
}
}
# Block specific countries (if needed for compliance)
rule {
name = "GeoBlockRule"
priority = 3
action { block {} }
statement {
geo_match_statement {
country_codes = ["KP", "CU", "IR", "SY"] # OFAC sanctioned countries
}
}
visibility_config {
cloudwatch_metrics_enabled = true
metric_name = "GeoBlock"
sampled_requests_enabled = true
}
}
visibility_config {
cloudwatch_metrics_enabled = true
metric_name = "app-waf"
sampled_requests_enabled = true
}
}
Security Hub — Centralised Findings
import boto3
securityhub = boto3.client('securityhub')
# Enable Security Hub
securityhub.enable_security_hub(
EnableDefaultStandards=True # enables CIS AWS Foundations, AWS Security Best Practices
)
# List all FAILED findings
paginator = securityhub.get_paginator('get_findings')
for page in paginator.paginate(
Filters={
'ComplianceStatus': [{'Value': 'FAILED', 'Comparison': 'EQUALS'}],
'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}],
'SeverityLabel': [{'Value': 'CRITICAL', 'Comparison': 'EQUALS'}]
}
):
for finding in page['Findings']:
print(f"[{finding['Severity']['Label']}] {finding['Title']}")
print(f" Resource: {finding['Resources'][0]['Id']}")
print(f" Remediation: {finding.get('Remediation', {}).get('Recommendation', {}).get('Text', 'N/A')}")
What's Next
In Part 9 we secure Docker containers and Kubernetes clusters — image hardening, non-root containers, network policies, RBAC, secret management, and runtime security with Falco.
Discussion
Loading...Leave a Comment
All comments are reviewed before appearing. No links please.