Series Navigation
← Part 3: OWASP Top 10 Deep Dive
→ Part 5: Cryptography for AppSec Engineers
Authentication vs Authorization
Authentication = WHO are you?
→ Verify identity via password, certificate, biometric, token
Authorization = WHAT are you allowed to do?
→ Check permissions after identity is verified
Common bug: Checking one but not the other
Example:
GET /api/admin/users
→ Authentication: ✅ user is logged in (valid JWT)
→ Authorization: ❌ NOT CHECKED — any user can list all users
Password Authentication Security
# Complete secure password flow
import bcrypt
import secrets
from datetime import datetime, timedelta
class AuthService:
def register(self, email: str, password: str) -> User:
# 1. Check password strength
self._validate_password_strength(password)
# 2. Check if email already exists (but don't reveal it in error)
# Return same response whether email exists or not (prevent enumeration)
# 3. Hash password — NEVER store plaintext
password_hash = bcrypt.hashpw(
password.encode('utf-8'),
bcrypt.gensalt(rounds=12) # 12 rounds = ~300ms on modern hardware
)
user = User(email=email, password_hash=password_hash)
db.session.add(user)
db.session.commit()
return user
def authenticate(self, email: str, password: str) -> Optional[User]:
user = User.query.filter_by(email=email).first()
# ✅ ALWAYS run bcrypt even if user not found
# Prevents timing attacks that reveal whether email exists
dummy_hash = b'$2b$12$invalid_hash_for_timing'
check_hash = user.password_hash if user else dummy_hash
password_correct = bcrypt.checkpw(password.encode('utf-8'), check_hash)
if not user or not password_correct:
return None # Same error for wrong email AND wrong password
return user
def _validate_password_strength(self, password: str):
if len(password) < 12:
raise ValueError("Password must be at least 12 characters")
# Check against HaveIBeenPwned API (k-anonymity — privacy-safe)
self._check_pwned_passwords(password)
def _check_pwned_passwords(self, password: str):
import hashlib, requests
sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
prefix, suffix = sha1[:5], sha1[5:]
response = requests.get(f"https://api.pwnedpasswords.com/range/{prefix}")
if suffix in response.text:
raise ValueError("This password has appeared in data breaches. Choose another.")
JWT — JSON Web Tokens
JWT is three Base64url-encoded parts separated by dots:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9 ← Header
.eyJ1c2VyX2lkIjoxMjMsInJvbGUiOiJ1c2VyIn0 ← Payload
.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c ← Signature
Decoded payload:
{
"user_id": 123,
"role": "user",
"iat": 1704067200,
"exp": 1704153600
}
JWT Attacks
Attack 1 — Algorithm Confusion (alg:none)
import jwt
# ❌ Vulnerable — accepts 'none' algorithm (no signature)
token_data = jwt.decode(token, options={"verify_signature": False})
# Attacker creates: {"alg":"none"}.{"user_id":1,"role":"admin"}.
# No signature needed — they become admin
# ✅ Always specify allowed algorithms
token_data = jwt.decode(
token,
secret_key,
algorithms=["HS256"], # ONLY allow this algorithm
options={"require": ["exp", "iat", "sub"]} # require these claims
)
Attack 2 — RS256 to HS256 Confusion
# When server uses RS256 (asymmetric):
# Public key is... public. Attacker takes the public key
# and uses it as the HMAC secret for HS256.
# If server accepts both HS256 and RS256, it verifies with public key → valid!
# ✅ Fix: only allow the algorithm you use
jwt.decode(token, public_key, algorithms=["RS256"]) # NOT ["RS256", "HS256"]
Attack 3 — Weak Secret
# Crack HS256 JWT with weak secret
# Tool: hashcat
hashcat -a 0 -m 16500 eyJ...token .../wordlists/rockyou.txt
# If cracked, attacker can forge ANY token
Secure JWT Implementation
import jwt
import secrets
import os
from datetime import datetime, timedelta, timezone
class JWTService:
def __init__(self):
# Secret must be cryptographically random and at least 256 bits
self.secret = os.environ['JWT_SECRET'] # 32+ random bytes
self.algorithm = 'HS256'
self.access_token_ttl = timedelta(minutes=15) # SHORT expiry
self.refresh_token_ttl = timedelta(days=7)
def create_access_token(self, user_id: int, role: str) -> str:
now = datetime.now(timezone.utc)
payload = {
'sub': str(user_id), # subject (user identifier)
'role': role,
'iat': now, # issued at
'exp': now + self.access_token_ttl, # expiry
'jti': secrets.token_hex(16), # unique ID (enables revocation)
'type': 'access' # prevent refresh token used as access
}
return jwt.encode(payload, self.secret, algorithm=self.algorithm)
def verify_access_token(self, token: str) -> dict:
try:
payload = jwt.decode(
token,
self.secret,
algorithms=[self.algorithm],
options={
"require": ["sub", "exp", "iat", "jti", "type"],
"verify_exp": True,
"verify_iat": True,
}
)
if payload.get('type') != 'access':
raise ValueError("Not an access token")
# Check token revocation (if using a blocklist)
if self._is_revoked(payload['jti']):
raise ValueError("Token has been revoked")
return payload
except jwt.ExpiredSignatureError:
raise AuthError("Token expired")
except jwt.InvalidTokenError as e:
raise AuthError(f"Invalid token: {e}")
def _is_revoked(self, jti: str) -> bool:
# Check Redis blocklist
return redis_client.exists(f"revoked_jti:{jti}")
def revoke_token(self, jti: str, ttl_seconds: int):
# Add to blocklist until token would have expired naturally
redis_client.setex(f"revoked_jti:{jti}", ttl_seconds, "1")
JWT Storage — Where to Store Tokens
Option 1: localStorage (JavaScript accessible)
✅ Simple
❌ Vulnerable to XSS — any script on page can steal it
❌ Do NOT use for sensitive applications
Option 2: sessionStorage (clears on tab close)
✅ Slightly better than localStorage
❌ Still XSS vulnerable
Option 3: HttpOnly cookie (recommended for web apps)
✅ Not accessible via JavaScript — XSS safe
✅ Automatically sent with requests
⚠️ Requires CSRF protection (SameSite=Strict or CSRF token)
Option 4: Memory (JavaScript variable)
✅ XSS resistant
✅ No CSRF risk
❌ Lost on page refresh
→ Used for SPAs with refresh token in HttpOnly cookie
# HttpOnly cookie approach
@app.route('/login', methods=['POST'])
def login():
user = authenticate(request.json['email'], request.json['password'])
if not user:
return jsonify({'error': 'Invalid credentials'}), 401
access_token = jwt_service.create_access_token(user.id, user.role)
refresh_token = jwt_service.create_refresh_token(user.id)
response = jsonify({'message': 'Logged in'})
response.set_cookie(
'access_token',
access_token,
httponly=True, # not accessible via JavaScript
secure=True, # HTTPS only
samesite='Strict', # CSRF protection
max_age=900 # 15 minutes
)
return response
OAuth 2.0 — Understanding the Flows
OAuth 2.0 is an authorization framework — it lets users grant third-party apps access to their data without sharing passwords.
OAuth 2.0 Authorization Code Flow (most secure for web apps):
1. User clicks "Login with Google"
2. App redirects to Google with:
GET https://accounts.google.com/oauth/authorize
?client_id=YOUR_CLIENT_ID
&redirect_uri=https://your-app.com/callback
&response_type=code
&scope=email profile
&state=RANDOM_CSRF_TOKEN ← CRITICAL: prevents CSRF
&code_challenge=BASE64URL(SHA256(verifier)) ← PKCE
&code_challenge_method=S256
3. User logs in to Google, grants permissions
4. Google redirects back: /callback?code=AUTH_CODE&state=RANDOM_CSRF_TOKEN
5. App verifies state matches original (prevents CSRF)
6. App exchanges code for tokens:
POST https://oauth2.googleapis.com/token
code=AUTH_CODE
client_id=YOUR_CLIENT_ID
client_secret=YOUR_CLIENT_SECRET ← server-side only, never in browser
redirect_uri=https://your-app.com/callback
grant_type=authorization_code
code_verifier=ORIGINAL_VERIFIER ← PKCE
7. Google returns: access_token, id_token, refresh_token
8. App validates id_token, creates session
Common OAuth Vulnerabilities
# ❌ Missing state parameter — OAuth CSRF
# Attacker tricks user into authorizing attacker's account
# User's session gets linked to attacker's identity
# ✅ Always generate and validate state
import secrets
def start_oauth():
state = secrets.token_urlsafe(32)
session['oauth_state'] = state # store server-side
return redirect(f"https://provider.com/oauth?...&state={state}")
def oauth_callback():
if request.args.get('state') != session.get('oauth_state'):
abort(400, "State mismatch — possible CSRF attack")
# continue...
# ❌ Open redirect in redirect_uri
# Attacker registers redirect_uri=https://evil.com
# OAuth code sent to attacker's server
# ✅ Validate redirect_uri against whitelist
ALLOWED_REDIRECT_URIS = {'https://your-app.com/auth/callback'}
if redirect_uri not in ALLOWED_REDIRECT_URIS:
abort(400, "Invalid redirect_uri")
Authorization Models
Role-Based Access Control (RBAC)
from enum import Enum
from functools import wraps
class Role(str, Enum):
VIEWER = 'viewer'
EDITOR = 'editor'
ADMIN = 'admin'
ROLE_HIERARCHY = {
Role.ADMIN: {Role.ADMIN, Role.EDITOR, Role.VIEWER},
Role.EDITOR: {Role.EDITOR, Role.VIEWER},
Role.VIEWER: {Role.VIEWER},
}
def require_role(required_role: Role):
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
user_role = Role(current_user.role)
if required_role not in ROLE_HIERARCHY.get(user_role, set()):
abort(403, "Insufficient permissions")
return f(*args, **kwargs)
return decorated
return decorator
@app.route('/admin/users')
@login_required
@require_role(Role.ADMIN)
def list_users():
return jsonify(User.query.all())
Attribute-Based Access Control (ABAC) — More Flexible
class PolicyEngine:
"""Evaluate policies based on subject, action, resource, and environment"""
def can(self, user, action: str, resource) -> bool:
context = {
'user': {'id': user.id, 'role': user.role, 'department': user.department},
'action': action,
'resource': {'owner_id': resource.owner_id, 'sensitivity': resource.sensitivity},
'environment': {'time': datetime.utcnow().hour, 'ip': request.remote_addr}
}
return self._evaluate_policies(context)
def _evaluate_policies(self, ctx: dict) -> bool:
# Policy: users can read their own documents
if ctx['action'] == 'read' and ctx['resource']['owner_id'] == ctx['user']['id']:
return True
# Policy: admins can read any document
if ctx['user']['role'] == 'admin' and ctx['action'] == 'read':
return True
# Policy: highly sensitive documents require admin role regardless
if ctx['resource']['sensitivity'] == 'top_secret' and ctx['user']['role'] != 'admin':
return False
# Policy: no access outside business hours for non-admins
hour = ctx['environment']['time']
if not (8 <= hour <= 18) and ctx['user']['role'] != 'admin':
return False
return False # Default deny
policy = PolicyEngine()
@app.route('/documents/<int:doc_id>')
@login_required
def get_document(doc_id):
doc = Document.query.get_or_404(doc_id)
if not policy.can(current_user, 'read', doc):
abort(403)
return jsonify(doc.to_dict())
Multi-Factor Authentication (MFA)
import pyotp
import qrcode
from io import BytesIO
import base64
class MFAService:
def setup_totp(self, user: User) -> dict:
"""Generate TOTP secret and QR code for authenticator apps"""
secret = pyotp.random_base32() # 16-char base32 secret
# Generate provisioning URI (for QR code)
totp = pyotp.TOTP(secret)
uri = totp.provisioning_uri(
name=user.email,
issuer_name='AIQEAcademy'
)
# Generate QR code
qr = qrcode.make(uri)
buffer = BytesIO()
qr.save(buffer, format='PNG')
qr_base64 = base64.b64encode(buffer.getvalue()).decode()
# Store secret (encrypted) — don't activate until user verifies
user.totp_secret_pending = encrypt(secret)
db.session.commit()
return {'qr_code': f"data:image/png;base64,{qr_base64}", 'secret': secret}
def verify_and_activate_totp(self, user: User, code: str) -> bool:
"""User enters code from authenticator — verify and activate MFA"""
secret = decrypt(user.totp_secret_pending)
totp = pyotp.TOTP(secret)
# valid_window=1 allows 30 seconds of clock drift
if totp.verify(code, valid_window=1):
user.totp_secret = user.totp_secret_pending
user.mfa_enabled = True
user.totp_secret_pending = None
db.session.commit()
return True
return False
def verify_totp(self, user: User, code: str) -> bool:
"""Verify TOTP code during login"""
if not user.mfa_enabled:
return True # MFA not set up
secret = decrypt(user.totp_secret)
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=1)
Interview Questions
Q: What is the difference between HS256 and RS256 for JWT signing?
HS256 uses a shared secret (symmetric) — both signing and verification use the same key. RS256 uses a key pair — the private key signs, the public key verifies. RS256 is preferred for distributed systems where multiple services need to verify tokens but only one should issue them. The public key can be distributed safely; if the verification key leaked, attackers still can't forge tokens.
Q: What is PKCE and why is it used?
Proof Key for Code Exchange. The OAuth client generates a random code_verifier, hashes it to code_challenge, and sends the challenge with the auth request. When exchanging the code for tokens, it sends the original code_verifier. The auth server verifies the hash matches. This prevents authorization code interception attacks — even if an attacker intercepts the code, they can't exchange it without the verifier.
Q: Why should JWTs have short expiry times?
JWTs are stateless — you can't invalidate them once issued (without a blocklist). If an access token is stolen, the attacker can use it until it expires. A 15-minute expiry limits the damage window. Refresh tokens (longer-lived, stored securely) handle session continuity.
What's Next
In Part 5 we cover cryptography — hashing, symmetric and asymmetric encryption, TLS internals, key management, and common crypto mistakes that developers make.
Discussion
Loading...Leave a Comment
All comments are reviewed before appearing. No links please.