"AppSec Series #4: Authentication and Authorization — OAuth 2.0, JWT Security and Session Management"

Authentication and authorization failures are a leading cause of account takeovers. Learn OAuth 2.0 flows, JWT attack patterns, session security, MFA implementation, and authorization models.

Authentication vs Authorization

Authentication = WHO are you?
  → Verify identity via password, certificate, biometric, token

Authorization = WHAT are you allowed to do?
  → Check permissions after identity is verified

Common bug: Checking one but not the other

Example:
GET /api/admin/users
  → Authentication: ✅ user is logged in (valid JWT)
  → Authorization: ❌ NOT CHECKED — any user can list all users
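
A framework-free sketch of the fix for the example above: the handler authenticates first (401 on failure) and only then authorizes (403 on failure). The `Request` type, the `SESSIONS` table, and the role names are hypothetical stand-ins, not part of any real framework.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical in-memory session table: token -> (user_id, role)
SESSIONS = {"tok-alice": (1, "admin"), "tok-bob": (2, "user")}


@dataclass
class Request:
    token: Optional[str]


def list_users(request: Request) -> tuple[int, str]:
    """Return (status_code, body) for GET /api/admin/users."""
    # Authentication: WHO is calling?
    identity = SESSIONS.get(request.token) if request.token else None
    if identity is None:
        return 401, "Not authenticated"

    # Authorization: is this caller ALLOWED to list users?
    _user_id, role = identity
    if role != "admin":
        return 403, "Forbidden"

    return 200, "alice, bob"
```

Keeping the two checks as separate steps makes it harder to ship an endpoint that performs only one of them.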

Password Authentication Security

# Complete secure password flow

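import bcrypt
import secrets
from datetime import datetime, timedelta
from typing import Optional

# User and db are assumed to be the application's SQLAlchemy model and session.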

class AuthService:

    def register(self, email: str, password: str) -> User:
        # 1. Check password strength
        self._validate_password_strength(password)

        # 2. Check if email already exists (but don't reveal it in error)
        # Return same response whether email exists or not (prevent enumeration)

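        # 3. Hash password — NEVER store plaintext
        # Note: bcrypt is limited to 72 bytes of input (longer passwords are
        # truncated or rejected, depending on the library version)
        password_hash = bcrypt.hashpw(
            password.encode('utf-8'),
            bcrypt.gensalt(rounds=12)  # cost 12: a few hundred ms per hash; benchmark on your hardware
        )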

        user = User(email=email, password_hash=password_hash)
        db.session.add(user)
        db.session.commit()
        return user

    # A well-formed bcrypt hash computed once when the class is defined.
    # checkpw() raises on a malformed hash, so the dummy must be a real one
    # for the timing defense to work.
    _DUMMY_HASH = bcrypt.hashpw(b'dummy-password-for-timing', bcrypt.gensalt(rounds=12))

    def authenticate(self, email: str, password: str) -> Optional[User]:
        user = User.query.filter_by(email=email).first()

        # ✅ ALWAYS run bcrypt even if user not found
        # Prevents timing attacks that reveal whether email exists
        check_hash = user.password_hash if user else self._DUMMY_HASH

        password_correct = bcrypt.checkpw(password.encode('utf-8'), check_hash)

        if not user or not password_correct:
            return None  # Same error for wrong email AND wrong password

        return user

    def _validate_password_strength(self, password: str):
        if len(password) < 12:
            raise ValueError("Password must be at least 12 characters")
        # Check against HaveIBeenPwned API (k-anonymity — privacy-safe)
        self._check_pwned_passwords(password)

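    def _check_pwned_passwords(self, password: str):
        import hashlib, requests
        # Only the first 5 hex characters of the SHA-1 hash leave the server
        sha1 = hashlib.sha1(password.encode('utf-8')).hexdigest().upper()
        prefix, suffix = sha1[:5], sha1[5:]
        response = requests.get(
            f"https://api.pwnedpasswords.com/range/{prefix}", timeout=5
        )
        response.raise_for_status()
        # Each response line has the form "HASH_SUFFIX:COUNT"
        leaked = {line.split(':')[0] for line in response.text.splitlines()}
        if suffix in leaked:
            raise ValueError("This password has appeared in data breaches. Choose another.")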
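
The k-anonymity lookup can be exercised offline. The sketch below splits the hash the same way and parses a range response by line; `split_sha1`, `breach_count`, and the sample response body (including its counts) are illustrative assumptions, not real API output.

```python
import hashlib


def split_sha1(password: str) -> tuple[str, str]:
    """Return (5-char prefix sent to the API, 35-char suffix kept locally)."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]


def breach_count(suffix: str, range_body: str) -> int:
    """Look up our suffix in a range response made of 'SUFFIX:COUNT' lines."""
    for line in range_body.splitlines():
        candidate, _, count = line.strip().partition(":")
        if candidate == suffix:
            return int(count)
    return 0


prefix, suffix = split_sha1("password")
# Illustrative response body; the first suffix and both counts are made up.
sample_body = f"0018A45C4D1DEF81644B54AB7F969B88D65:3\r\n{suffix}:42\r\n"
print(prefix, breach_count(suffix, sample_body))
```

Matching whole suffixes line by line avoids relying on a substring search over the raw response text.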

JWT — JSON Web Tokens

JWT is three Base64url-encoded parts separated by dots:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9   ← Header
.eyJ1c2VyX2lkIjoxMjMsInJvbGUiOiJ1c2VyIn0  ← Payload
.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c  ← Signature

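Decoded payload (the short sample above carries only user_id and role; iat and exp are the timestamps a real token adds):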

{
  "user_id": 123,
  "role": "user",
  "iat": 1704067200,
  "exp": 1704153600
}
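
The header and payload are only encoded, not encrypted, so anyone holding a token can read its claims. A minimal sketch using only the standard library (`b64url_decode` is a helper written for this example): decoding the sample token's middle segment yields just the user_id and role claims.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a Base64url segment, restoring the padding that JWTs strip off."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


token = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
    ".eyJ1c2VyX2lkIjoxMjMsInJvbGUiOiJ1c2VyIn0"
    ".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)
header_b64, payload_b64, _signature = token.split(".")
header = json.loads(b64url_decode(header_b64))
payload = json.loads(b64url_decode(payload_b64))
print(header)
print(payload)
```

This is why secrets such as passwords or API keys should never be placed in a JWT payload.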

JWT Attacks

Attack 1 — Unsigned Tokens (alg: none)

import jwt

# ❌ Vulnerable: signature verification is disabled, so ANY token is accepted,
# including unsigned ones that declare "alg": "none"
token_data = jwt.decode(token, options={"verify_signature": False})
# Attacker creates: {"alg":"none"}.{"user_id":1,"role":"admin"}.
# No signature needed — they become admin

# ✅ Always specify allowed algorithms
token_data = jwt.decode(
    token,
    secret_key,
    algorithms=["HS256"],  # ONLY allow this algorithm
    options={"require": ["exp", "iat", "sub"]}  # require these claims
)

Attack 2 — RS256 to HS256 Confusion

# When server uses RS256 (asymmetric):
# Public key is... public. Attacker takes the public key
# and uses it as the HMAC secret for HS256.
# If server accepts both HS256 and RS256, it verifies with public key → valid!

# ✅ Fix: only allow the algorithm you use
jwt.decode(token, public_key, algorithms=["RS256"])  # NOT ["RS256", "HS256"]
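
To make the confusion concrete, here is a standard-library sketch of both sides. `flawed_verify` imitates a verifier that lets the token header pick the algorithm, and `alg_is_allowed` is the pinning check that stops the attack. All function names and the placeholder PEM bytes are assumptions made for this illustration; real RSA verification is left out because it needs a crypto library.

```python
import base64
import hashlib
import hmac
import json

# Placeholder for the server's published RSA public key (not a real key).
PUBLIC_KEY_PEM = b"-----BEGIN PUBLIC KEY-----\nplaceholder\n-----END PUBLIC KEY-----\n"


def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def header_alg(token: str) -> str:
    return json.loads(b64url_decode(token.split(".")[0]))["alg"]


def forge_hs256(claims: dict, key: bytes) -> str:
    """Attacker side: sign arbitrary claims with HMAC-SHA256 under `key`."""
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url(json.dumps(claims).encode())
    signature = hmac.new(key, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{b64url(signature)}"


def flawed_verify(token: str, verification_key: bytes) -> bool:
    """Flawed: trusts the header's alg and reuses the RSA public key as an HMAC secret."""
    if header_alg(token) != "HS256":
        return False  # RS256 branch omitted in this sketch
    header, body, signature = token.split(".")
    expected = hmac.new(verification_key, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return hmac.compare_digest(expected, b64url_decode(signature))


def alg_is_allowed(token: str, allowed: tuple[str, ...] = ("RS256",)) -> bool:
    """Gate to run before any signature check: the server picks the algorithm."""
    return header_alg(token) in allowed


forged = forge_hs256({"sub": "1", "role": "admin"}, PUBLIC_KEY_PEM)
print(flawed_verify(forged, PUBLIC_KEY_PEM), alg_is_allowed(forged))
```

The forged token passes the flawed verifier because the attacker and the server compute the same HMAC over the same public bytes; pinning the algorithm rejects it before any key is used.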

Attack 3 — Weak Secret

# Crack HS256 JWT with weak secret
# Tool: hashcat
hashcat -a 0 -m 16500 eyJ...token .../wordlists/rockyou.txt

# If cracked, attacker can forge ANY token
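
The defense against cracking is a secret that no wordlist contains. A minimal sketch, assuming the secret is stored Base64url-encoded in an environment variable; `generate_jwt_secret` and `load_jwt_secret` are hypothetical helper names.

```python
import base64
import secrets

MIN_SECRET_BYTES = 32  # 256 bits, matching the HMAC-SHA256 output size


def generate_jwt_secret() -> str:
    """Return 32 random bytes, Base64url-encoded for storage in an env var."""
    return base64.urlsafe_b64encode(secrets.token_bytes(MIN_SECRET_BYTES)).decode()


def load_jwt_secret(encoded: str) -> bytes:
    """Decode the secret and refuse to start with one that is too short."""
    raw = base64.urlsafe_b64decode(encoded)
    if len(raw) < MIN_SECRET_BYTES:
        raise ValueError(f"JWT secret must be at least {MIN_SECRET_BYTES} bytes")
    return raw
```

A length check cannot prove a secret is random, so it only catches obvious mistakes; the secret should still come from a cryptographic generator such as `secrets`.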

Secure JWT Implementation

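import jwt
import secrets
import os
from datetime import datetime, timedelta, timezone

import redis

class AuthError(Exception):
    """Raised when a token is expired, revoked, or otherwise invalid."""

# Assumed setup: a Redis instance backs the revocation blocklist.
redis_client = redis.Redis()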

class JWTService:
    def __init__(self):
        # Secret must be cryptographically random and at least 256 bits
        self.secret = os.environ['JWT_SECRET']  # 32+ random bytes
        self.algorithm = 'HS256'
        self.access_token_ttl = timedelta(minutes=15)   # SHORT expiry
        self.refresh_token_ttl = timedelta(days=7)

    def create_access_token(self, user_id: int, role: str) -> str:
        now = datetime.now(timezone.utc)
        payload = {
            'sub': str(user_id),          # subject (user identifier)
            'role': role,
            'iat': now,                    # issued at
            'exp': now + self.access_token_ttl,  # expiry
            'jti': secrets.token_hex(16),  # unique ID (enables revocation)
            'type': 'access'               # prevent refresh token used as access
        }
        return jwt.encode(payload, self.secret, algorithm=self.algorithm)

    def verify_access_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(
                token,
                self.secret,
                algorithms=[self.algorithm],
                options={
                    "require": ["sub", "exp", "iat", "jti", "type"],
                    "verify_exp": True,
                    "verify_iat": True,
                }
            )
            # Raise AuthError here too, so callers handle one exception type
            if payload.get('type') != 'access':
                raise AuthError("Not an access token")

            # Check token revocation (if using a blocklist)
            if self._is_revoked(payload['jti']):
                raise AuthError("Token has been revoked")

            return payload
        except jwt.ExpiredSignatureError:
            raise AuthError("Token expired")
        except jwt.InvalidTokenError as e:
            raise AuthError(f"Invalid token: {e}")

    def _is_revoked(self, jti: str) -> bool:
        # Check Redis blocklist (exists() returns a count, so convert to bool)
        return bool(redis_client.exists(f"revoked_jti:{jti}"))

    def revoke_token(self, jti: str, ttl_seconds: int):
        # Add to blocklist until token would have expired naturally
        redis_client.setex(f"revoked_jti:{jti}", ttl_seconds, "1")
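
The class above sets `refresh_token_ttl` but does not show how refresh tokens are issued. One possible approach, which is a different technique from the JWT-based access tokens, is an opaque random token stored server-side as a hash and rotated on every use. The sketch below assumes an in-memory dict in place of a database; every name in it is hypothetical.

```python
import hashlib
import secrets
import time
from typing import Optional

REFRESH_TTL_SECONDS = 7 * 24 * 3600  # mirrors the 7-day refresh_token_ttl

# Hypothetical store: sha256(token) -> (user_id, expires_at). Use a database in practice.
_refresh_store: dict[str, tuple[int, float]] = {}


def _digest(token: str) -> str:
    return hashlib.sha256(token.encode()).hexdigest()


def issue_refresh_token(user_id: int, now: Optional[float] = None) -> str:
    """Create an opaque random token; only its hash is stored."""
    now = time.time() if now is None else now
    token = secrets.token_urlsafe(32)
    _refresh_store[_digest(token)] = (user_id, now + REFRESH_TTL_SECONDS)
    return token


def rotate_refresh_token(token: str, now: Optional[float] = None) -> Optional[tuple[int, str]]:
    """Consume a refresh token once; return (user_id, replacement) or None."""
    now = time.time() if now is None else now
    record = _refresh_store.pop(_digest(token), None)  # single use
    if record is None:
        return None  # unknown or already used
    user_id, expires_at = record
    if now >= expires_at:
        return None  # expired
    return user_id, issue_refresh_token(user_id, now)
```

Because each token works once, a stolen refresh token that has already been rotated is rejected, and storing only hashes means a database leak does not expose usable tokens.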

JWT Storage — Where to Store Tokens

Option 1: localStorage (JavaScript accessible)
  ✅ Simple
  ❌ Vulnerable to XSS — any script on page can steal it
  ❌ Do NOT use for sensitive applications

Option 2: sessionStorage (clears on tab close)
  ✅ Slightly better than localStorage
  ❌ Still XSS vulnerable

Option 3: HttpOnly cookie (recommended for web apps)
  ✅ Not readable by JavaScript, so XSS cannot steal the token (injected scripts can still send authenticated requests)
  ✅ Automatically sent with requests
  ⚠️ Requires CSRF protection (SameSite=Strict or CSRF token)

Option 4: Memory (JavaScript variable)
  ✅ Never persisted, so harder to steal (a script running in the page can still reach it)
  ✅ No CSRF risk
  ❌ Lost on page refresh
  → Used for SPAs with refresh token in HttpOnly cookie
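
# HttpOnly cookie approach
@app.route('/login', methods=['POST'])
def login():
    user = authenticate(request.json['email'], request.json['password'])
    if not user:
        return jsonify({'error': 'Invalid credentials'}), 401

    access_token = jwt_service.create_access_token(user.id, user.role)
    refresh_token = jwt_service.create_refresh_token(user.id)

    response = jsonify({'message': 'Logged in'})
    response.set_cookie(
        'access_token',
        access_token,
        httponly=True,      # not accessible via JavaScript
        secure=True,        # HTTPS only
        samesite='Strict',  # CSRF protection
        max_age=900         # 15 minutes
    )
    # The refresh token is only needed by the refresh endpoint, so scope its
    # cookie to that path ('/auth/refresh' is an assumed route name).
    response.set_cookie(
        'refresh_token',
        refresh_token,
        httponly=True,
        secure=True,
        samesite='Strict',
        path='/auth/refresh',
        max_age=7 * 24 * 3600  # 7 days, matching refresh_token_ttl
    )
    return response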
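
To see what those flags look like on the wire, the standard library can build the same `Set-Cookie` header without a web framework. This is a sketch; `build_access_cookie` is a name invented for the example.

```python
from http.cookies import SimpleCookie


def build_access_cookie(token: str) -> str:
    """Return the Set-Cookie header value for the access token."""
    cookie = SimpleCookie()
    cookie["access_token"] = token
    morsel = cookie["access_token"]
    morsel["httponly"] = True      # hidden from document.cookie
    morsel["secure"] = True        # sent over HTTPS only
    morsel["samesite"] = "Strict"  # not sent on cross-site requests
    morsel["max-age"] = 900        # 15 minutes
    morsel["path"] = "/"
    return morsel.OutputString()


print(build_access_cookie("eyJ.example.token"))
```

Checking the emitted header in a browser's network tab is a quick way to confirm that all three attributes actually reach the client.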

OAuth 2.0 — Understanding the Flows

OAuth 2.0 is an authorization framework — it lets users grant third-party apps access to their data without sharing passwords.

OAuth 2.0 Authorization Code Flow (most secure for web apps):

1. User clicks "Login with Google"
2. App redirects to Google with:
   GET https://accounts.google.com/o/oauth2/v2/auth
     ?client_id=YOUR_CLIENT_ID
     &redirect_uri=https://your-app.com/callback
     &response_type=code
     &scope=openid email profile      ← "openid" is required to receive an id_token (URL-encode the spaces)
     &state=RANDOM_CSRF_TOKEN         ← CRITICAL: prevents CSRF
     &code_challenge=BASE64URL(SHA256(verifier))  ← PKCE
     &code_challenge_method=S256

3. User logs in to Google, grants permissions
4. Google redirects back: /callback?code=AUTH_CODE&state=RANDOM_CSRF_TOKEN
5. App verifies state matches original (prevents CSRF)
6. App exchanges code for tokens:
   POST https://oauth2.googleapis.com/token
     code=AUTH_CODE
     client_id=YOUR_CLIENT_ID
     client_secret=YOUR_CLIENT_SECRET   ← server-side only, never in browser
     redirect_uri=https://your-app.com/callback
     grant_type=authorization_code
     code_verifier=ORIGINAL_VERIFIER    ← PKCE

7. Google returns: access_token, id_token, and (only if offline access was requested with access_type=offline) refresh_token
8. App validates id_token (signature, iss, aud, exp), creates session
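
Steps 2 and 6 depend on a PKCE verifier and challenge pair. A minimal standard-library sketch of generating them and assembling the authorization URL; the function names and the `https://auth.example.com/authorize` endpoint are placeholders, not any provider's real values.

```python
import base64
import hashlib
import secrets
from urllib.parse import urlencode


def make_code_verifier() -> str:
    """43-character URL-safe random string (RFC 7636 allows 43 to 128)."""
    return secrets.token_urlsafe(32)


def make_code_challenge(verifier: str) -> str:
    """S256 method: BASE64URL(SHA256(verifier)) without '=' padding."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode()


def build_authorization_url(endpoint: str, client_id: str, redirect_uri: str,
                            state: str, code_challenge: str) -> str:
    """Assemble the step 2 redirect with every parameter URL-encoded."""
    params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "openid email profile",
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
    return f"{endpoint}?{urlencode(params)}"


verifier = make_code_verifier()          # keep server-side until step 6
state = secrets.token_urlsafe(32)        # keep in the user's session
url = build_authorization_url(
    "https://auth.example.com/authorize",  # placeholder endpoint
    "YOUR_CLIENT_ID",
    "https://your-app.com/callback",
    state,
    make_code_challenge(verifier),
)
print(url)
```

Only the challenge travels through the browser; the verifier stays with the app until the token exchange, which is what makes an intercepted code useless on its own.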

Common OAuth Vulnerabilities

# ❌ Missing state parameter — OAuth CSRF
# Attacker tricks user into authorizing attacker's account
# User's session gets linked to attacker's identity

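# ✅ Always generate and validate state
import secrets
from flask import abort, redirect, request, session

def start_oauth():
    state = secrets.token_urlsafe(32)
    session['oauth_state'] = state  # bound to this user's session
    return redirect(f"https://provider.com/oauth?...&state={state}")

def oauth_callback():
    expected = session.pop('oauth_state', None)  # single use
    received = request.args.get('state')
    # Constant-time comparison; a missing value on either side is a failure
    if not expected or not received or not secrets.compare_digest(
        received.encode(), expected.encode()
    ):
        abort(400, "State mismatch — possible CSRF attack")
    # continue...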

# ❌ Lax redirect_uri validation
# Attacker crafts an authorization link with redirect_uri=https://evil.com
# If the provider accepts it, the OAuth code is sent to the attacker's server

# ✅ Validate redirect_uri by exact match against an allowlist of registered URIs
ALLOWED_REDIRECT_URIS = {'https://your-app.com/auth/callback'}
if redirect_uri not in ALLOWED_REDIRECT_URIS:
    abort(400, "Invalid redirect_uri")
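
Exact matching matters because looser checks are easy to bypass. The sketch below contrasts a prefix check with an exact allowlist lookup; `prefix_match`, `exact_match`, and the attacker URI are illustrative.

```python
REGISTERED_REDIRECT_URIS = frozenset({"https://your-app.com/auth/callback"})


def prefix_match(redirect_uri: str) -> bool:
    """Flawed: a prefix check accepts look-alike hosts and unexpected paths."""
    return redirect_uri.startswith("https://your-app.com")


def exact_match(redirect_uri: str) -> bool:
    """Safer: the URI must equal a registered value character for character."""
    return redirect_uri in REGISTERED_REDIRECT_URIS


# The attacker controls evil.example and picks a subdomain that fools the prefix check.
attacker_uri = "https://your-app.com.evil.example/auth/callback"
print(prefix_match(attacker_uri), exact_match(attacker_uri))
```

The same reasoning rules out substring and wildcard matching on the host.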

Authorization Models

Role-Based Access Control (RBAC)

from enum import Enum
from functools import wraps

class Role(str, Enum):
    VIEWER = 'viewer'
    EDITOR = 'editor'
    ADMIN  = 'admin'

ROLE_HIERARCHY = {
    Role.ADMIN:  {Role.ADMIN, Role.EDITOR, Role.VIEWER},
    Role.EDITOR: {Role.EDITOR, Role.VIEWER},
    Role.VIEWER: {Role.VIEWER},
}

def require_role(required_role: Role):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            user_role = Role(current_user.role)
            if required_role not in ROLE_HIERARCHY.get(user_role, set()):
                abort(403, "Insufficient permissions")
            return f(*args, **kwargs)
        return decorated
    return decorator

@app.route('/admin/users')
@login_required
@require_role(Role.ADMIN)
def list_users():
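    # Model objects are not JSON-serializable; assumes User has a to_dict() method
    return jsonify([user.to_dict() for user in User.query.all()])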

Attribute-Based Access Control (ABAC) — More Flexible

from datetime import datetime, timezone

class PolicyEngine:
    """Evaluate policies based on subject, action, resource, and environment"""

    def can(self, user, action: str, resource) -> bool:
        context = {
            'user': {'id': user.id, 'role': user.role, 'department': user.department},
            'action': action,
            'resource': {'owner_id': resource.owner_id, 'sensitivity': resource.sensitivity},
            'environment': {'time': datetime.now(timezone.utc).hour, 'ip': request.remote_addr}
        }
        return self._evaluate_policies(context)

    def _evaluate_policies(self, ctx: dict) -> bool:
        is_admin = ctx['user']['role'] == 'admin'

        # Deny rules run first so that no allow rule below can override them

        # Policy: highly sensitive documents require admin role regardless
        if ctx['resource']['sensitivity'] == 'top_secret' and not is_admin:
            return False

        # Policy: no access outside business hours (08:00 to 18:59 UTC) for non-admins
        hour = ctx['environment']['time']
        if not (8 <= hour <= 18) and not is_admin:
            return False

        # Policy: users can read their own documents
        if ctx['action'] == 'read' and ctx['resource']['owner_id'] == ctx['user']['id']:
            return True

        # Policy: admins can read any document
        if is_admin and ctx['action'] == 'read':
            return True

        return False  # Default deny

policy = PolicyEngine()

@app.route('/documents/<int:doc_id>')
@login_required
def get_document(doc_id):
    doc = Document.query.get_or_404(doc_id)
    if not policy.can(current_user, 'read', doc):
        abort(403)
    return jsonify(doc.to_dict())

Multi-Factor Authentication (MFA)

import pyotp
import qrcode
from io import BytesIO
import base64

class MFAService:

    def setup_totp(self, user: User) -> dict:
        """Generate TOTP secret and QR code for authenticator apps"""
        secret = pyotp.random_base32()  # random base32 secret (32 chars by default in recent pyotp releases)

        # Generate provisioning URI (for QR code)
        totp = pyotp.TOTP(secret)
        uri = totp.provisioning_uri(
            name=user.email,
            issuer_name='AIQEAcademy'
        )

        # Generate QR code
        qr = qrcode.make(uri)
        buffer = BytesIO()
        qr.save(buffer, format='PNG')
        qr_base64 = base64.b64encode(buffer.getvalue()).decode()

        # Store secret (encrypted) — don't activate until user verifies
        user.totp_secret_pending = encrypt(secret)
        db.session.commit()

        return {'qr_code': f"data:image/png;base64,{qr_base64}", 'secret': secret}

    def verify_and_activate_totp(self, user: User, code: str) -> bool:
        """User enters code from authenticator — verify and activate MFA"""
        secret = decrypt(user.totp_secret_pending)
        totp = pyotp.TOTP(secret)

        # valid_window=1 also accepts the previous and next 30-second code (tolerates clock drift)
        if totp.verify(code, valid_window=1):
            user.totp_secret = user.totp_secret_pending
            user.mfa_enabled = True
            user.totp_secret_pending = None
            db.session.commit()
            return True
        return False

    def verify_totp(self, user: User, code: str) -> bool:
        """Verify TOTP code during login"""
        if not user.mfa_enabled:
            return True  # MFA not set up

        secret = decrypt(user.totp_secret)
        totp = pyotp.TOTP(secret)
        return totp.verify(code, valid_window=1)
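
For readers who want to see what `totp.verify` computes, here is a standard-library sketch of the underlying algorithms (HOTP from RFC 4226, TOTP from RFC 6238). It takes the raw secret bytes, whereas pyotp takes the base32-encoded form; the function names are chosen for this example and are not pyotp's API.

```python
import hashlib
import hmac
import struct


def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """RFC 4226: HMAC-SHA1 over the 8-byte counter, then dynamic truncation."""
    mac = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = mac[-1] & 0x0F
    code = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def totp(secret: bytes, unix_time: float, step: int = 30, digits: int = 6) -> str:
    """RFC 6238: HOTP with counter = floor(unix_time / step)."""
    return hotp(secret, int(unix_time // step), digits)


def verify_totp(secret: bytes, code: str, unix_time: float,
                valid_window: int = 1, step: int = 30) -> bool:
    """Accept codes from `valid_window` steps either side of the current one."""
    counter = int(unix_time // step)
    for drift in range(-valid_window, valid_window + 1):
        if counter + drift < 0:
            continue
        expected = hotp(secret, counter + drift)
        if hmac.compare_digest(expected.encode(), code.encode()):
            return True
    return False


rfc_secret = b"12345678901234567890"  # test secret from the RFCs
print(totp(rfc_secret, 59))
```

A production verifier would also remember the last accepted counter per user, so that a code cannot be replayed within its validity window.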

Interview Questions

Q: What is the difference between HS256 and RS256 for JWT signing?

HS256 uses a shared secret (symmetric): both signing and verification use the same key. RS256 uses a key pair: the private key signs, the public key verifies. RS256 is preferred for distributed systems where multiple services need to verify tokens but only one should issue them. The public key can be distributed safely, because even if a verification key leaks, attackers still can't forge tokens. With HS256, every service that can verify a token can also forge one.

Q: What is PKCE and why is it used?

Proof Key for Code Exchange. The OAuth client generates a random code_verifier, hashes it to code_challenge, and sends the challenge with the auth request. When exchanging the code for tokens, it sends the original code_verifier. The auth server verifies the hash matches. This prevents authorization code interception attacks — even if an attacker intercepts the code, they can't exchange it without the verifier.

Q: Why should JWTs have short expiry times?

JWTs are stateless — you can't invalidate them once issued (without a blocklist). If an access token is stolen, the attacker can use it until it expires. A 15-minute expiry limits the damage window. Refresh tokens (longer-lived, stored securely) handle session continuity.


What's Next

In Part 5 we cover cryptography — hashing, symmetric and asymmetric encryption, TLS internals, key management, and common crypto mistakes that developers make.
