← Blog

"AppSec Series #6: API Security — REST, GraphQL and gRPC Attack Surfaces"

APIs are the new perimeter. Learn REST API security misconfigs, GraphQL introspection and injection attacks, gRPC security, mass assignment, and how to build a secure API from scratch.

reading now
views
comments

Series Navigation

Part 5: Cryptography for AppSec Engineers

Part 7: Cloud Security Fundamentals


OWASP API Security Top 10

APIs have their own threat list — the OWASP API Security Top 10:

API1  Broken Object Level Authorization    (BOLA / IDOR)
API2  Broken Authentication
API3  Broken Object Property Level Auth   (Mass Assignment)
API4  Unrestricted Resource Consumption   (No rate limits)
API5  Broken Function Level Authorization (User calls admin endpoints)
API6  Unrestricted Access to Sensitive Business Flows
API7  Server Side Request Forgery
API8  Security Misconfiguration
API9  Improper Inventory Management       (Shadow/zombie APIs)
API10 Unsafe Consumption of APIs          (Trusting 3rd party APIs blindly)

REST API Security

Mass Assignment (API3)

Automatically binding request body to model without filtering:

# ❌ Mass assignment vulnerability
@app.route('/api/users/me', methods=['PUT'])
@login_required
def update_profile():
    user = current_user
    # Directly update all fields from request — attacker sends:
    # {"name": "Alice", "role": "admin", "is_verified": true}
    for key, value in request.json.items():
        setattr(user, key, value)
    db.session.commit()
    return jsonify(user.to_dict())

# ✅ Explicit allowlist of updatable fields
UPDATABLE_FIELDS = {'name', 'bio', 'avatar_url', 'phone'}

@app.route('/api/users/me', methods=['PUT'])
@login_required
def update_profile():
    user = current_user
    update_data = {k: v for k, v in request.json.items()
                   if k in UPDATABLE_FIELDS}
    for key, value in update_data.items():
        setattr(user, key, value)
    db.session.commit()
    return jsonify(user.to_dict())

Rate Limiting and Throttling (API4)

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import redis

# Different limits for different endpoint sensitivity
limiter = Limiter(
    app,
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
    default_limits=["200 per day", "50 per hour"]
)

# Strict limit on sensitive endpoints
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute; 20 per hour")
def login(): ...

@app.route('/api/auth/forgot-password', methods=['POST'])
@limiter.limit("3 per hour")
def forgot_password(): ...

# Per-user rate limiting (authenticated)
def get_user_id():
    return str(current_user.id) if current_user.is_authenticated else get_remote_address()

user_limiter = Limiter(app, key_func=get_user_id)

@app.route('/api/export')
@user_limiter.limit("10 per day")
def export_data(): ...

Excessive Data Exposure

# ❌ Returns all fields including sensitive ones
class UserSchema(Schema):
    class Meta:
        model = User
        fields = '__all__'  # returns password_hash, internal_notes, etc.

# ✅ Explicit field projection
class UserPublicSchema(Schema):
    id = fields.Int()
    username = fields.Str()
    email = fields.Email()
    created_at = fields.DateTime()
    # Deliberately excludes: password_hash, admin_notes, internal_id

class UserPrivateSchema(UserPublicSchema):
    # Additional fields only the owner sees
    phone = fields.Str()
    address = fields.Str()

@app.route('/api/users/<int:user_id>')
def get_user(user_id):
    user = User.query.get_or_404(user_id)
    if current_user.id == user_id:
        return UserPrivateSchema().dump(user)  # owner sees more
    return UserPublicSchema().dump(user)        # others see less

API Key Security

import secrets
import hashlib

class APIKeyService:
    def generate_key(self, user_id: int) -> tuple[str, str]:
        """Returns (raw_key_for_user, key_id_for_display)"""
        raw_key = f"aiqe_{secrets.token_urlsafe(32)}"  # prefix helps identify in logs
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()

        api_key = APIKey(
            key_hash=key_hash,
            key_prefix=raw_key[:12],  # store prefix for display (aiqe_abc123)
            user_id=user_id,
            created_at=datetime.utcnow()
        )
        db.session.add(api_key)
        db.session.commit()

        # Return raw key once — user must store it. We only store hash.
        return raw_key, api_key.id

    def verify_key(self, raw_key: str) -> Optional[APIKey]:
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        return APIKey.query.filter_by(key_hash=key_hash, revoked=False).first()

# Middleware
@app.before_request
def authenticate_api_key():
    api_key_header = request.headers.get('X-API-Key') or \
                     request.headers.get('Authorization', '').removeprefix('Bearer ')

    if api_key_header:
        key = api_key_service.verify_key(api_key_header)
        if key:
            g.current_user = key.user
            key.last_used_at = datetime.utcnow()
            db.session.commit()

GraphQL Security

GraphQL presents unique attack surfaces:

Introspection Attacks

# Attacker queries schema to map the entire API
{
  __schema {
    types {
      name
      fields {
        name
        type { name kind }
      }
    }
  }
}
# Response: complete API map including admin mutations
# Disable introspection in production
from graphene import Schema
from graphql import build_ast_schema

class SecureSchema(Schema):
    def execute(self, *args, **kwargs):
        # Block introspection queries in production
        if not app.config.get('GRAPHQL_ALLOW_INTROSPECTION', False):
            source = kwargs.get('source', '') or (args[0] if args else '')
            if '__schema' in str(source) or '__type' in str(source):
                raise GraphQLError("Introspection is disabled")
        return super().execute(*args, **kwargs)

Depth and Complexity Attacks

# Resource exhaustion via deeply nested queries
{
  user(id: 1) {
    friends {
      friends {
        friends {
          friends {
            friends { id name email }  # exponential DB queries
          }
        }
      }
    }
  }
}
from graphql_server import GraphQLView

# Limit query depth and complexity
MAX_QUERY_DEPTH = 5
MAX_QUERY_COMPLEXITY = 100

def depth_limit_validator(max_depth):
    def validator(validation_context):
        # Count nesting depth and reject if exceeds limit
        ...
    return validator

app.add_url_rule('/graphql', view_func=GraphQLView.as_view(
    'graphql',
    schema=schema,
    validation_rules=[depth_limit_validator(MAX_QUERY_DEPTH)]
))

GraphQL Injection

# ❌ String interpolation in GraphQL — injection via variables
query = f"""
  {{ user(email: "{email}") {{ id name }} }}
"""
# email = '") { id } users { email password' → data leakage

# ✅ Always use GraphQL variables
query = """
  query GetUser($email: String!) {
    user(email: $email) { id name }
  }
"""
result = schema.execute(query, variable_values={"email": email})

Authorization in Resolvers

import graphene
from graphene_django import DjangoObjectType

class UserType(DjangoObjectType):
    class Meta:
        model = User
        # Exclude sensitive fields
        exclude = ('password', 'admin_notes', 'internal_id')

class Query(graphene.ObjectType):
    user = graphene.Field(UserType, id=graphene.ID(required=True))
    all_users = graphene.List(UserType)

    def resolve_user(self, info, id):
        # Authorization check in resolver
        if not info.context.user.is_authenticated:
            raise GraphQLError("Authentication required")

        user = User.objects.get(pk=id)

        # Users can only see themselves (unless admin)
        if user.id != info.context.user.id and not info.context.user.is_admin:
            raise GraphQLError("Not authorized")

        return user

    def resolve_all_users(self, info):
        # Admin only
        if not info.context.user.is_admin:
            raise GraphQLError("Admin access required")
        return User.objects.all()

gRPC Security

// user.proto
syntax = "proto3";

service UserService {
  rpc GetUser (GetUserRequest) returns (UserResponse);
  rpc UpdateUser (UpdateUserRequest) returns (UserResponse);
}

message GetUserRequest {
  string user_id = 1;
}

message UserResponse {
  string id = 1;
  string username = 2;
  string email = 3;
  // Note: password_hash NOT included in response type
}
import grpc
from grpc import ServicerContext

class UserServiceServicer:

    def GetUser(self, request: GetUserRequest, context: ServicerContext) -> UserResponse:
        # Authentication via metadata
        metadata = dict(context.invocation_metadata())
        token = metadata.get('authorization', '').removeprefix('bearer ')

        if not token:
            context.abort(grpc.StatusCode.UNAUTHENTICATED, "Token required")
            return UserResponse()

        try:
            claims = jwt_service.verify_access_token(token)
        except AuthError as e:
            context.abort(grpc.StatusCode.UNAUTHENTICATED, str(e))
            return UserResponse()

        # Authorization
        if request.user_id != claims['sub'] and claims.get('role') != 'admin':
            context.abort(grpc.StatusCode.PERMISSION_DENIED, "Access denied")
            return UserResponse()

        user = User.query.get(request.user_id)
        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, "User not found")
            return UserResponse()

        return UserResponse(id=str(user.id), username=user.username, email=user.email)

# gRPC server with TLS
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
with open('server.key', 'rb') as f: private_key = f.read()
with open('server.crt', 'rb') as f: certificate_chain = f.read()

server_credentials = grpc.ssl_server_credentials([(private_key, certificate_chain)])
server.add_secure_port('[::]:50051', server_credentials)

API Security Testing Checklist

# 1. Endpoint discovery
# Find undocumented endpoints (shadow APIs)
ffuf -u https://api.example.com/FUZZ -w /usr/share/wordlists/api-wordlist.txt

# 2. Authentication bypass tests
curl -H "Authorization: Bearer invalid_token" https://api.example.com/users/me
curl https://api.example.com/users/me  # no auth header at all

# 3. IDOR testing (change IDs)
# Log in as user A, get token
# Try accessing user B's resources with user A's token
curl -H "Authorization: Bearer USER_A_TOKEN" https://api.example.com/users/USER_B_ID/profile

# 4. Mass assignment test
curl -X PUT https://api.example.com/users/me \
  -H "Content-Type: application/json" \
  -d '{"name":"Alice","role":"admin","is_verified":true}'

# 5. Rate limiting test
for i in {1..100}; do
  curl -X POST https://api.example.com/auth/login \
    -d '{"email":"test@test.com","password":"wrong"}' &
done
# Should get 429 after 5-10 attempts

# 6. SSRF test
curl https://api.example.com/fetch?url=http://169.254.169.254/latest/meta-data/

# 7. Automated API scanning
docker run -v $(pwd):/zap/wrk owasp/zap2docker-stable \
  zap-api-scan.py -t https://api.example.com/swagger.json \
  -f openapi -r api-report.html

What's Next

In Part 7 we move to the cloud — AWS shared responsibility model, IAM fundamentals, the most common cloud misconfigurations, and how to think about cloud-native attack surfaces.

Discussion

Loading...

Leave a Comment

All comments are reviewed before appearing. No links please.

0 / 1000