Series Navigation
← Part 5: Cryptography for AppSec Engineers
→ Part 7: Cloud Security Fundamentals
OWASP API Security Top 10
APIs have their own threat list — the OWASP API Security Top 10:
API1 Broken Object Level Authorization (BOLA / IDOR)
API2 Broken Authentication
API3 Broken Object Property Level Authorization (Mass Assignment, Excessive Data Exposure)
API4 Unrestricted Resource Consumption (No rate limits)
API5 Broken Function Level Authorization (User calls admin endpoints)
API6 Unrestricted Access to Sensitive Business Flows
API7 Server Side Request Forgery
API8 Security Misconfiguration
API9 Improper Inventory Management (Shadow/zombie APIs)
API10 Unsafe Consumption of APIs (Trusting 3rd party APIs blindly)
REST API Security
Mass Assignment (API3)
Mass assignment happens when the request body is bound to a model automatically, without filtering which fields the caller is allowed to set:
# ❌ Mass assignment vulnerability
@app.route('/api/users/me', methods=['PUT'])
@login_required
def update_profile():
    """Update the logged-in user's profile from the JSON request body.

    WARNING: intentionally vulnerable example (mass assignment). Every key in
    the body is written onto the user object with no filtering at all.
    """
    user = current_user
    payload = request.json
    # Nothing restricts which attributes get written — an attacker sends:
    # {"name": "Alice", "role": "admin", "is_verified": true}
    for field, new_value in payload.items():
        setattr(user, field, new_value)
    db.session.commit()
    return jsonify(user.to_dict())
# ✅ Explicit allowlist of updatable fields
UPDATABLE_FIELDS = {'name', 'bio', 'avatar_url', 'phone'}
@app.route('/api/users/me', methods=['PUT'])
@login_required
def update_profile():
    """Update the logged-in user's profile from an allowlisted set of fields.

    Only keys listed in ``UPDATABLE_FIELDS`` are copied onto the user; every
    other key in the request body is ignored.

    Returns:
        The updated user as JSON, or a 400 response when the body is missing,
        is not valid JSON, or is not a JSON object.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so all malformed input is handled by the single check below.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON list, string, number or null has no .items() to iterate.
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    user = current_user
    for key, value in payload.items():
        if key in UPDATABLE_FIELDS:
            setattr(user, key, value)
    db.session.commit()
    return jsonify(user.to_dict())
Rate Limiting and Throttling (API4)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import redis
# Different limits for different endpoint sensitivity
# Application-wide limiter keyed by client address, with counters in Redis.
# Everything is passed by keyword: Flask-Limiter 3.x takes `key_func` as its
# first positional parameter, so `Limiter(app, key_func=...)` fails there,
# while the keyword form is accepted by both 2.x and 3.x.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    storage_uri="redis://localhost:6379",
    default_limits=["200 per day", "50 per hour"],
)
# Strict limit on sensitive endpoints
# Login endpoint (handler body elided in this example).
# Overrides the limiter's defaults with "5 per minute; 20 per hour".
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute; 20 per hour")
def login(): ...
# Password-reset request endpoint (handler body elided in this example).
# Limited to "3 per hour", the tightest limit among the routes shown here.
@app.route('/api/auth/forgot-password', methods=['POST'])
@limiter.limit("3 per hour")
def forgot_password(): ...
# Per-user rate limiting (authenticated)
def get_user_id():
    """Return the rate-limit key for the current request.

    Authenticated requests are keyed by the user's id (as a string);
    anonymous requests fall back to the client's remote address.
    """
    if current_user.is_authenticated:
        return str(current_user.id)
    return get_remote_address()


# Keyword arguments only: Flask-Limiter 3.x takes `key_func` as its first
# positional parameter, so `Limiter(app, key_func=...)` fails there.
# `storage_uri` points at the same Redis instance as the address-keyed
# limiter, so per-user counters use that store rather than the default one.
user_limiter = Limiter(
    key_func=get_user_id,
    app=app,
    storage_uri="redis://localhost:6379",
)


# Export endpoint (handler body elided): at most 10 calls per day per key.
@app.route('/api/export')
@user_limiter.limit("10 per day")
def export_data(): ...
Excessive Data Exposure (API3)
# ❌ Returns all fields including sensitive ones
# Anti-pattern kept for illustration: the schema is bound to the User model
# and asks for every field, so nothing sensitive is filtered out.
class UserSchema(Schema):
    class Meta:
        model = User
        fields = '__all__' # returns password_hash, internal_notes, etc.
# ✅ Explicit field projection
class UserPublicSchema(Schema):
    """Schema listing the user fields that may be returned to any caller.

    Fields are declared one by one, so an attribute that is not listed here
    is not part of this schema's output.
    """

    id = fields.Int()
    username = fields.Str()
    email = fields.Email()
    created_at = fields.DateTime()
    # Deliberately excludes: password_hash, admin_notes, internal_id
class UserPrivateSchema(UserPublicSchema):
    """Public schema plus the extra fields returned only to the account owner."""

    # Additional fields only the owner sees
    phone = fields.Str()
    address = fields.Str()
@app.route('/api/users/<int:user_id>')
def get_user(user_id):
    """Return a user's profile, with extra fields when the caller owns it.

    Args:
        user_id: Primary key of the user to look up (from the URL).

    Returns:
        The ``UserPrivateSchema`` dump when the authenticated caller is the
        requested user, otherwise the ``UserPublicSchema`` dump. Responds
        with 404 when no such user exists.
    """
    user = User.query.get_or_404(user_id)
    # This route is not behind @login_required, so the caller may be
    # anonymous. Check is_authenticated before touching `.id`; an anonymous
    # caller simply gets the public view.
    is_owner = current_user.is_authenticated and current_user.id == user_id
    schema = UserPrivateSchema() if is_owner else UserPublicSchema()
    return schema.dump(user)
API Key Security
import secrets
import hashlib
class APIKeyService:
    """Issues API keys and resolves presented keys via their SHA-256 digest.

    Only the digest and a short display prefix are persisted; the raw key is
    handed back to the caller exactly once, at creation time.
    """

    @staticmethod
    def _digest(raw_key: str) -> str:
        """Return the hex SHA-256 digest of the raw key."""
        return hashlib.sha256(raw_key.encode()).hexdigest()

    def generate_key(self, user_id: int) -> tuple[str, str]:
        """Returns (raw_key_for_user, key_id_for_display)"""
        token = secrets.token_urlsafe(32)
        raw_key = f"aiqe_{token}"  # prefix helps identify in logs
        record = APIKey(
            key_hash=self._digest(raw_key),
            key_prefix=raw_key[:12],  # store prefix for display (aiqe_abc123)
            user_id=user_id,
            created_at=datetime.utcnow(),
        )
        db.session.add(record)
        db.session.commit()
        # The raw key leaves the service here and is never stored.
        return raw_key, record.id

    def verify_key(self, raw_key: str) -> Optional[APIKey]:
        """Return the non-revoked key record matching raw_key, or None."""
        candidates = APIKey.query.filter_by(
            key_hash=self._digest(raw_key),
            revoked=False,
        )
        return candidates.first()
# Middleware
@app.before_request
def authenticate_api_key():
    """Attach the key's owner to ``g`` when the request carries a valid API key.

    The key is read from ``X-API-Key`` first, then from ``Authorization``
    with a leading ``Bearer `` stripped. Requests without a key, or with one
    that does not verify, pass through untouched.
    """
    headers = request.headers
    presented = headers.get('X-API-Key')
    if not presented:
        presented = headers.get('Authorization', '').removeprefix('Bearer ')
    if not presented:
        return

    key = api_key_service.verify_key(presented)
    if not key:
        return

    g.current_user = key.user
    # Record the time of use on the key and persist it.
    key.last_used_at = datetime.utcnow()
    db.session.commit()
GraphQL Security
GraphQL presents unique attack surfaces:
Introspection Attacks
# Attacker queries schema to map the entire API
# Anonymous introspection query: lists every type in the schema together
# with each field's name and the name/kind of that field's type.
{
  __schema {
    types {
      name
      fields {
        name
        type { name kind }
      }
    }
  }
}
# Response: complete API map including admin mutations
# Disable introspection in production
from graphene import Schema
from graphql import build_ast_schema
class SecureSchema(Schema):
    """Schema that rejects introspection queries unless explicitly enabled.

    Introspection is allowed only when the app config sets
    ``GRAPHQL_ALLOW_INTROSPECTION`` to a truthy value; it defaults to off.
    """

    def execute(self, *args, **kwargs):
        """Execute a query, refusing it first if it uses introspection.

        The query text is taken from the ``source`` keyword argument or,
        failing that, from the first positional argument.

        Raises:
            GraphQLError: If introspection is disabled and the query text
                references the ``__schema`` or ``__type`` field.
        """
        import re  # local import so the block does not rely on file-level imports

        # Block introspection queries in production
        if not app.config.get('GRAPHQL_ALLOW_INTROSPECTION', False):
            source = kwargs.get('source', '') or (args[0] if args else '')
            # Match whole names only. A plain substring test for '__type'
            # also matches the ordinary `__typename` meta-field and would
            # reject normal client queries that include it.
            # NOTE(review): this is still a textual check, so it also fires
            # on these names inside string literals or comments.
            if re.search(r'\b__(?:schema|type)\b', str(source)):
                raise GraphQLError("Introspection is disabled")
        return super().execute(*args, **kwargs)
Depth and Complexity Attacks
# Resource exhaustion via deeply nested queries
# One small request that nests the `friends` relation five levels deep.
{
  user(id: 1) {
    friends {
      friends {
        friends {
          friends {
            friends { id name email } # exponential DB queries
          }
        }
      }
    }
  }
}
from graphql_server import GraphQLView
# Limit query depth and complexity
MAX_QUERY_DEPTH = 5
MAX_QUERY_COMPLEXITY = 100
def depth_limit_validator(max_depth):
    """Build a validator meant to reject queries nested deeper than max_depth.

    The depth-counting logic is elided in this example, so the callable
    returned here is a placeholder: it accepts a validation context, does
    nothing with it, and returns None.
    """
    def check_depth(validation_context):
        # Count nesting depth and reject if exceeds limit
        ...

    return check_depth
# Register the GraphQL endpoint at /graphql with the depth validator attached.
# NOTE(review): confirm whether passing `validation_rules` replaces the
# library's default rule set or is applied in addition to it.
app.add_url_rule('/graphql', view_func=GraphQLView.as_view(
    'graphql',
    schema=schema,
    validation_rules=[depth_limit_validator(MAX_QUERY_DEPTH)]
))
GraphQL Injection
# ❌ String interpolation in GraphQL — injection via variables
# Anti-pattern kept for illustration: `email` is spliced straight into the
# query text, so quotes or braces inside it change the query's structure.
query = f"""
{{ user(email: "{email}") {{ id name }} }}
"""
# email = '") { id } users { email password' → data leakage
# ✅ Always use GraphQL variables
# The query text is a constant; the e-mail travels separately as the `$email`
# variable via `variable_values`, so it is never parsed as query syntax.
query = """
query GetUser($email: String!) {
user(email: $email) { id name }
}
"""
result = schema.execute(query, variable_values={"email": email})
Authorization in Resolvers
import graphene
from graphene_django import DjangoObjectType
class UserType(DjangoObjectType):
    """GraphQL object type bound to the ``User`` model via ``Meta.model``."""

    class Meta:
        model = User
        # Exclude sensitive fields
        exclude = ('password', 'admin_notes', 'internal_id')
class Query(graphene.ObjectType):
    """Root query type; each resolver enforces its own authorization."""

    user = graphene.Field(UserType, id=graphene.ID(required=True))
    all_users = graphene.List(UserType)

    def resolve_user(self, info, id):
        """Return the requested user if the caller may see it.

        Callers may fetch only their own record unless they are admins.

        Raises:
            GraphQLError: If the caller is unauthenticated, is not allowed to
                see the requested id, or the id matches no user.
        """
        viewer = info.context.user
        if not viewer.is_authenticated:
            raise GraphQLError("Authentication required")

        # Authorize BEFORE hitting the database: a non-admin then gets the
        # same "Not authorized" answer whether or not the other id exists,
        # so the endpoint cannot be used to enumerate valid ids.
        is_admin = getattr(viewer, 'is_admin', False)
        if str(id) != str(viewer.id) and not is_admin:
            raise GraphQLError("Not authorized")

        try:
            return User.objects.get(pk=id)
        except (User.DoesNotExist, ValueError):
            # ValueError covers ids that cannot be coerced to the pk type.
            raise GraphQLError("User not found") from None

    def resolve_all_users(self, info):
        """Return every user; admin only.

        Raises:
            GraphQLError: If the caller is unauthenticated or not an admin.
        """
        viewer = info.context.user
        # An anonymous user object may not define `is_admin` at all, so check
        # authentication first and read the flag with a safe default.
        if not viewer.is_authenticated or not getattr(viewer, 'is_admin', False):
            raise GraphQLError("Admin access required")
        return User.objects.all()
gRPC Security
// user.proto
syntax = "proto3";

// User lookup and update RPCs.
// NOTE(review): UpdateUserRequest is referenced below but no message with
// that name appears in this excerpt — confirm it is defined elsewhere.
service UserService {
  rpc GetUser (GetUserRequest) returns (UserResponse);
  rpc UpdateUser (UpdateUserRequest) returns (UserResponse);
}

// Identifies the user to fetch.
message GetUserRequest {
  string user_id = 1;
}

// Fields returned to callers for a user.
message UserResponse {
  string id = 1;
  string username = 2;
  string email = 3;
  // Note: password_hash NOT included in response type
}
import grpc
from grpc import ServicerContext
class UserServiceServicer:
    """gRPC servicer for UserService that authenticates and authorizes each call."""

    def GetUser(self, request: GetUserRequest, context: ServicerContext) -> UserResponse:
        """Return the requested user if the caller's token allows it.

        The access token is read from the ``authorization`` metadata entry.
        A caller may fetch only the user named by the token's ``sub`` claim,
        unless the token's ``role`` claim is ``admin``.

        Aborts the RPC with UNAUTHENTICATED (missing or invalid token),
        PERMISSION_DENIED (not the owner and not admin) or NOT_FOUND.
        """
        # Authentication via metadata
        metadata = dict(context.invocation_metadata())
        header = metadata.get('authorization', '')
        # Strip the scheme case-insensitively: clients conventionally send
        # "Bearer <token>", which a lowercase-only prefix strip leaves intact
        # and therefore never verifies. A value without a bearer scheme is
        # still passed through unchanged, as before.
        scheme, separator, credentials = header.partition(' ')
        if separator and scheme.lower() == 'bearer':
            token = credentials.strip()
        else:
            token = header
        if not token:
            context.abort(grpc.StatusCode.UNAUTHENTICATED, "Token required")
            return UserResponse()

        try:
            claims = jwt_service.verify_access_token(token)
        except AuthError as e:
            context.abort(grpc.StatusCode.UNAUTHENTICATED, str(e))
            return UserResponse()

        # Authorization
        # `user_id` is a string in the proto, so compare as strings, and treat
        # a token without `sub` as "not the owner" rather than raising KeyError.
        subject = claims.get('sub')
        is_owner = subject is not None and str(subject) == request.user_id
        if not is_owner and claims.get('role') != 'admin':
            context.abort(grpc.StatusCode.PERMISSION_DENIED, "Access denied")
            return UserResponse()

        user = User.query.get(request.user_id)
        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, "User not found")
            return UserResponse()

        return UserResponse(id=str(user.id), username=user.username, email=user.email)
# gRPC server with TLS
# Create the server on a 10-thread worker pool.
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# Load the TLS private key and certificate chain as raw bytes.
with open('server.key', 'rb') as key_file:
    private_key = key_file.read()
with open('server.crt', 'rb') as cert_file:
    certificate_chain = cert_file.read()

# Listen on port 50051 using those credentials.
server_credentials = grpc.ssl_server_credentials([(private_key, certificate_chain)])
server.add_secure_port('[::]:50051', server_credentials)
API Security Testing Checklist
# 1. Endpoint discovery
# Find undocumented endpoints (shadow APIs)
ffuf -u https://api.example.com/FUZZ -w /usr/share/wordlists/api-wordlist.txt

# 2. Authentication bypass tests
curl -H "Authorization: Bearer invalid_token" https://api.example.com/users/me
curl https://api.example.com/users/me # no auth header at all

# 3. IDOR testing (change IDs)
# Log in as user A, get token
# Try accessing user B's resources with user A's token
curl -H "Authorization: Bearer USER_A_TOKEN" https://api.example.com/users/USER_B_ID/profile

# 4. Mass assignment test
# Send it as an authenticated user; without a token the request is stopped
# at the auth check and never reaches the field-binding code under test.
curl -X PUT https://api.example.com/users/me \
  -H "Authorization: Bearer USER_A_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name":"Alice","role":"admin","is_verified":true}'

# 5. Rate limiting test
# Print only the HTTP status of each attempt so the switch to 429 is visible;
# the JSON content type makes the server parse the body as a login attempt.
for i in {1..100}; do
  curl -s -o /dev/null -w '%{http_code}\n' \
    -X POST https://api.example.com/auth/login \
    -H "Content-Type: application/json" \
    -d '{"email":"test@test.com","password":"wrong"}' &
done
wait # let all background requests finish before reading the results
# Should get 429 after 5-10 attempts

# 6. SSRF test
# Quoted so the shell does not treat `?` in the URL as a glob character.
curl 'https://api.example.com/fetch?url=http://169.254.169.254/latest/meta-data/'

# 7. Automated API scanning
# "$(pwd)" is quoted so paths containing spaces work. The image name is the
# current ZAP stable image; owasp/zap2docker-stable is no longer published.
docker run -v "$(pwd)":/zap/wrk/:rw -t ghcr.io/zaproxy/zaproxy:stable \
  zap-api-scan.py -t https://api.example.com/swagger.json \
  -f openapi -r api-report.html
What's Next
In Part 7 we move to the cloud — AWS shared responsibility model, IAM fundamentals, the most common cloud misconfigurations, and how to think about cloud-native attack surfaces.
Discussion
Loading...Leave a Comment
All comments are reviewed before appearing. No links please.