iT邦幫忙

2024 iThome 鐵人賽

DAY 19
0
Software Development

Django 2024: 從入門到SaaS實戰系列 第 22

Django REST framework: 掌握 JWT、CORS 和 Cookie 處理技巧

  • 分享至 

  • xImage
  •  

上一篇我們提到了到JSON Web Token(JWT)的流程以及如果要進行實踐的話可能需要考量的因素

我們就在這邊進行實作吧

程式碼:https://github.com/class83108/drf_demo/tree/jwt

今日重點:

  • 流程說明:說明整體實作的流程以及對應的技術細節
  • 實際流程:
    • 安裝與配置
    • 建立客戶端
    • 建立服務端
      • 自定義序列化器
      • 自定義認證JWT認證
      • 自定義相關視圖
      • 配置路由
  • 功能展示

流程說明

  • 建立客戶端:在5501端口上建立前端模擬跨域的場景

  • 有以下功能

    • 客戶端輸入用戶名發送POST請求到Django,Django根據用戶名發送one-time password(OTP)到用戶的信箱中
    • 客戶端輸入用戶名與OTP發送POST請求,Django根據用戶名與OTP確認沒問題後,返回Access Token與Refresh Token
    • Access Token儲存在localStorage,而Refresh Token存在cookie中
    • 客戶端向需要認證的視圖發出請求,並且在請求頭中附帶Access Token
    • 客戶端拿Refresh Token去刷新Access Token
  • 技術細節

    • 透過OTP的方式,避免在發送請求時傳遞敏感資訊
    • OTP儲存在Redis並且設置過期時間,除了速度更快之外也避免儲存壓力
    • 將Access Token設置較短的過期時間規避風險
    • 將Refresh Token儲存在相對安全的cookie中

實作流程

安裝所需套件與基礎配置

  • 安裝所需套件
poetry add djangorestframework-simplejwt
poetry add django-cors-headers
poetry add django-redis
  • django-cors-headers幫我們添加對應的CORS (Cross-Origin Resource Sharing, 跨來源資源共用)標頭,讓後端響應時包含允許前端網域的標頭而不被擋下

  • djangorestframework-simplejwt則讓我們能快速建立JWT相關認證邏輯

  • 如果要在Django中使用Redis需要額外安裝django-redis

  • 基礎配置

INSTALLED_APPS = [
    ...
    # DRF
    "rest_framework",
    "rest_framework.authtoken",
    "rest_framework_simplejwt",
    "corsheaders",
    # 自定義 app
    "note",
]

MIDDLEWARE = [
    "corsheaders.middleware.CorsMiddleware",  # 放在最前面
    ...
]

REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": [
        # "rest_framework_simplejwt.authentication.JWTAuthentication", # 這是預設的 JWT 驗證器
        "drf_demo.custom_authentication.CustomJWTAuthentication",  # 使用自定義的 JWT 驗證器
    ],
}

# 進行JWT的基礎設定
SIMPLE_JWT = {
    "AUTH_HEADER_TYPES": ("Bearer",),
    "ACCESS_TOKEN_LIFETIME": timedelta(minutes=60), # 這邊可以設置更短的時間
    "REFRESH_TOKEN_LIFETIME": timedelta(days=1),
    "ROTATE_REFRESH_TOKENS": False,
    "BLACKLIST_AFTER_ROTATION": True,
    # 用於認證令牌的類別
    "AUTH_TOKEN_CLASSES": ("rest_framework_simplejwt.tokens.AccessToken",),
    "AUTH_COOKIE": "access_token",  # 設定 cookie 名稱
    "TOKEN_OBTAIN_SERIALIZER": "drf_demo.serializers.CustomTokenObtainPairSerializer",
}

# CORS 設定 允許跨域請求
CORS_ALLOWED_ORIGINS = [
    "http://localhost:5501",
    "http://127.0.0.1:5501",
]

CORS_ALLOW_CREDENTIALS = True

# 配置Redis當作快取
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        },
    }
}

# SMTP Configuration
EMAIL_BACKEND = "django.core.mail.backends.smtp.EmailBackend"
EMAIL_HOST = "smtp.gmail.com"  # SMTP伺服器
EMAIL_PORT = 587  # TLS通訊埠號
EMAIL_USE_TLS = True  # 開啟TLS(傳輸層安全性)
EMAIL_HOST_USER = EMAIL_USER
EMAIL_HOST_PASSWORD = EMAIL_PASSWORD
  • DEFAULT_AUTHENTICATION_CLASSES中我們自定義的認證器可以等完成再補上
  • SIMPLE_JWT中設定了使用djangorestframework-simplejwt 的基本設定
    • TOKEN_OBTAIN_SERIALIZER也可以在序列化器中指定,可以等完成後再補上
  • 有關於Email的部分,需要先去gmail申請應用程式密碼,流程可以參考連結:Python寄送Gmail電子郵件實作教學

建立服務端

自定義序列化器

首先在跟目錄下建立serializers.py

from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
from django.contrib.auth import get_user_model
from rest_framework import serializers
from django.core.cache import cache

User = get_user_model()

class OTPRequestSerializer(serializers.Serializer):
    username = serializers.CharField()

class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
    username_field = "username"
    otp = serializers.CharField(max_length=6, write_only=True)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.fields["password"] = serializers.CharField(required=False, write_only=True)

    @classmethod
    def get_token(cls, user):
        token = super().get_token(user)
        token["username"] = user.username
        return token

    def validate(self, attrs):
        username = attrs.get("username")
        otp = attrs.get("otp")

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            raise serializers.ValidationError("Invalid username")

        # 驗證OTP
        stored_otp = cache.get(f"otp_{username}")
        if not stored_otp or stored_otp != otp:
            raise serializers.ValidationError("Invalid or expired OTP")

        # OTP驗證成功,刪除存儲的 OTP
        cache.delete(f"otp_{username}")

        refresh = self.get_token(user)
        data = {}
        data["refresh"] = str(refresh)
        data["access"] = str(refresh.access_token)

        return data

  • OTPRequestSerializer很單純就是驗證用戶名是不是字符串形式
  • CustomTokenObtainPairSerializer
    • 需要繼承TokenObtainPairSerializer 方便我們調用現成的方法
    • __ini__中先把password取消掉required屬性,因為我們沒有要透過密碼驗證,因為不想傳輸敏感資訊
    • 驗證就是拿到用戶名與傳過來的OTP後,去redis找有沒有對應的資料
    • 如果用戶有對應的資料,刪除存在redis的OTP,並且調用get_token方法拿到Access Token與Refresh Token,最後返回Token

自定義認證JWT認證

在根目錄下建立custom_authentication.py

import logging
from rest_framework_simplejwt.authentication import JWTAuthentication
from django.conf import settings

logger = logging.getLogger(__name__)

class CustomJWTAuthentication(JWTAuthentication):
    def authenticate(self, request):
        logger.debug(f"Headers: {request.headers}")
        
        header = self.get_header(request)
        if header is None:
            logger.debug("No Auth header found, checking cookies")
            raw_token = request.COOKIES.get(settings.SIMPLE_JWT['AUTH_COOKIE'])
        else:
            logger.debug("Auth header found")
            raw_token = self.get_raw_token(header)
        
        if raw_token is None:
            logger.debug("No token found")
            return None

        logger.debug("Token found, validating")
        try:
            validated_token = self.get_validated_token(raw_token)
            user = self.get_user(validated_token)
            logger.debug(f"Token validated, user: {user}")
            return user, validated_token
        except Exception as e:
            logger.error(f"Token validation error: {str(e)}")
            return None

雖然我們繼承了JWTAuthentication,但是其實我們寫的認證方法跟原本方法基本一致

都是先拿到請求頭後,確認裡面有沒有對應的Access Token,以及用戶有沒有註冊

但是我為了確認跨域有沒有成功等因素,我添加比較多debug的過程方便調適

自定義相關視圖

因為需要引入的套件較多,可以直接看github

我們需要配置幾個視圖:

  • OTPRequestView
    • 拿到用戶名後,由前面自定義OTPRequestSerializer 序列化器驗資料
    • 建立OTP並且將其存入Redis
    • 發送郵件到用戶的信箱,使客戶端拿到OTP
def generate_otp():
    return "".join(random.choices(string.digits, k=6))

class OTPRequestView(APIView):
    def post(self, request):
        serializer = OTPRequestSerializer(data=request.data)
        if serializer.is_valid():
            username = serializer.validated_data["username"]
            try:
                user = User.objects.get(username=username)
            except User.DoesNotExist:
                return Response(
                    {"error": "User not found."}, status=status.HTTP_404_NOT_FOUND
                )

            otp = generate_otp()

            # 將OTP存儲在redis中,設置5分鐘過期
            cache.set(f"otp_{username}", otp, timeout=300)

            # 發送OTP郵件
            send_mail(
                "Your OTP for Authentication",
                f"Your OTP is: {otp}. It will expire in 5 minutes.",
                settings.EMAIL_HOST_USER,
                [user.email],
                fail_silently=False,
            )

            return Response(
                {"message": "OTP sent successfully."}, status=status.HTTP_200_OK
            )
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
  • CustomTokenObtainPairView
    • 繼承TokenObtainPairView然後改寫POST方法
    • 定義好我們自定義的序列化器
    • 序列化器驗證資料後,我們將Access Token直接放入響應中
    • 而Refresh Token則是放入set_cookie,並且設定httponly=True
class CustomTokenObtainPairView(TokenObtainPairView):
    serializer_class = CustomTokenObtainPairSerializer

    def post(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)

        try:
            serializer.is_valid(raise_exception=True)
        except Exception as e:
            return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)

        access_token = serializer.validated_data["access"]
        refresh_token = serializer.validated_data["refresh"]

        response = Response(
            {"access": access_token, "message": "Authentication successful"},
            status=status.HTTP_200_OK,
        )

        try:
            response.set_cookie(
                "refresh_token",
                refresh_token,
                httponly=True,
                samesite="None",  # 對於跨域請求,使用 'None'
                secure=False,  # 在生產環境應該設為 True
                domain=None,  # 如果需要,指定域名
                max_age=3600 * 24 * 14,  # 14 days
            )
        except Exception as e:
            print(f"Error: {str(e)}")
        return response

  • CustomTokenRefreshView
    • 這裡我們就直接自定義一個刷新Token的視圖
    • 拿到cookie中的Refresh Token後使用RefreshToken 方法做驗證
    • 驗證成功的話,返回對應的Tokens
from rest_framework_simplejwt.tokens import RefreshToken

class CustomTokenRefreshView(APIView):
    def post(self, request):
        refresh_token = request.COOKIES.get("refresh_token")
        
        if not refresh_token:
            return Response(
                {"error": "No refresh token provided"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            token = RefreshToken(refresh_token)
            access_token = str(token.access_token)

            response = Response(
                {"access": access_token, "message": "Token refreshed successfully"}
            )

            
            new_refresh_token = str(token)
            response.set_cookie(
                "refresh_token",
                new_refresh_token,
                httponly=True,
                samesite="Lax",
                path="/",
                max_age=3600 * 24,  # 1天
            )

            return response

        except TokenError:
            return Response(
                {"error": "Invalid or expired token"},
                status=status.HTTP_401_UNAUTHORIZED,
            )
  • ProtectedView
    • 設置好對應的權限與我們自定義的JWT認證
    • 如果成功便返回對應的JSON
class ProtectedView(APIView):
    authentication_classes = [CustomJWTAuthentication]
    permission_classes = [IsAuthenticated]

    def get(self, request):
        print(f"User authenticated: {request.user.is_authenticated}")
        print(f"User: {request.user}")
        return Response(
            {
                "message": "You have accessed a protected view!",
                "user": (
                    request.user.username
                    if request.user.is_authenticated
                    else "Anonymous"
                ),
            }
        )

配置路由

from .views import (
    OTPRequestView,
    CustomTokenObtainPairView,
    ProtectedView,
    CustomTokenRefreshView,
    DebugView,
)

urlpatterns = [
    ...
    path("otp/", OTPRequestView.as_view(), name="otp_request"),
    path("token/", CustomTokenObtainPairView.as_view(), name="token_obtain_pair"),
    path("token/refresh/", CustomTokenRefreshView.as_view(), name="token_refresh"),
    path("protected/", ProtectedView.as_view(), name="protected_view"),
]

建立前端

  • 在專案資料夾下新增資料夾並且建立inde.html檔,以及對應的main.js

index.html

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>JWT Authentication Demo</title>
    <style>
      body {
        font-family: Arial, sans-serif;
        max-width: 600px;
        margin: 0 auto;
        padding: 20px;
      }
      input,
      button {
        margin: 10px 0;
        padding: 5px;
      }
      #output {
        margin-top: 20px;
        padding: 10px;
        border: 1px solid #ddd;
      }
    </style>
  </head>
  <body>
    <h1>JWT Authentication Demo</h1>
    <input type="text" id="username" placeholder="Enter username" />
    <input type="password" id="otp" placeholder="Enter OTP" />
    <button onclick="getOTP()">Get OTP</button>
    <button onclick="getToken()">Get Token</button>
    <button onclick="accessProtectedResource()">
      Access Protected Resource
    </button>
    <button onclick="refreshAccessToken()">Refresh Token</button>
    <button onclick="debugRequest()">Debug</button>
    <div id="output"></div>

    <script src="/drf_demo/demo_html/main.js"></script>
  </body>
</html>

https://ithelp.ithome.com.tw/upload/images/20241003/201618662r6AwSTEZ1.png

  • 在JS中設置變數
const API_BASE_URL = "http://localhost:8000";
const output = document.getElementById("output");
  • 我們分別有幾個按鈕處發不同方法,但是記得發送請求一定要包含credentials: "include" ,告訴瀏覽器在跨域請求中包含 cookies。默認情況下,跨域請求不會發送 cookies

  • getOTP方法對應OTPRequestView視圖

async function getOTP() {
  const username = document.getElementById("username").value;
  if (!username) {
    output.innerHTML = "Please enter username";
    return;
  }
  try {
    const response = await fetch(`${API_BASE_URL}/otp/`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ username: username }),
      credentials: "include",
    });

    const otpRespData = await response.json();

    output.innerHTML = "OTP send successfully";
  } catch (error) {
    output.innerHTML = `Error: ${error.message}`;
  }
}
  • getToken方法對應CustomTokenObtainPairView視圖
    • 拿到後端的響應後,將Access Token存到localStoreage
async function getToken() {
  const username = document.getElementById("username").value;
  const otp = document.getElementById("otp").value;

  if (!username || !otp) {
    output.innerHTML = "Please enter username and OTP";
    return;
  }

  try {
    const response = await fetch(`${API_BASE_URL}/token/`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ username: username, otp: otp }),
      credentials: "include",
    });

    const tokenRespData = await response.json();

    if (tokenRespData.access) {
      // Store the access token in localStorage
      localStorage.setItem("accessToken", tokenRespData.access);
      output.innerHTML =
        "Successfully got tokens. Access token stored in localStorage.";
    } else {
      output.innerHTML = "Access token not received from the server.";
    }

    // Note: The refresh token should be automatically stored as an HttpOnly cookie by the server
  } catch (error) {
    output.innerHTML = `Error: ${error.message}`;
  }
}

  • refreshAccessToken方法對應CustomTokenRefreshView視圖
    • 後端會檢查cookie中的Refresh Token是否合法
    • 拿到後端的響應後,將Access Token存到localStoreage
async function refreshAccessToken() {
  try {
    const response = await fetch(`${API_BASE_URL}/token/refresh/`, {
      method: "POST",
      credentials: "include", // 重要:允許發送和接收 cookies
      headers: {
        "Content-Type": "application/json",
        "X-Requested-With": "XMLHttpRequest",
      },
    });
    if (!response.ok) {
      throw new Error("Failed to refresh token");
    }
    const data = await response.json();
    if (data.access) {
      localStorage.setItem("accessToken", data.access);
       output.innerHTML = "Token refreshed successfully";
    } else {
      throw new Error("No access token received");
    }
  } catch (error) {
    console.error("Error refreshing token:", error);
    throw error;
  }
}
  • accessProtectedResource方法對應ProtectedView視圖
    • 先試著用localStorage的Access Token發起請求到我們有設置權限與認證限制的視圖
    • 如果失敗,調用refreshAccessToken更新Access Token然後再重新請求一次
async function accessProtectedResource() {
  async function fetchWithToken(token) {
    const response = await fetch(`${API_BASE_URL}/protected/`, {
      method: "GET",
      credentials: "include",
      headers: {
        Authorization: `Bearer ${token}`,
      },
    });
    if (!response.ok) {
      throw response;
    }
    return response.json();
  }

  try {
    // 首先嘗試使用存儲的 access token
    let accessToken = localStorage.getItem("accessToken");

    try {
      const data = await fetchWithToken(accessToken);
      output.innerHTML = `Protected Resource: ${JSON.stringify(data)}`;
    } catch (error) {
      if (error.status === 401) {
        // Token 可能已過期,嘗試刷新
        accessToken = await refreshAccessToken();
        // 使用新的 token 重試
        const data = await fetchWithToken(accessToken);
        output.innerHTML = `Protected Resource (after refresh): ${JSON.stringify(
          data
        )}`;
      } else {
        throw error;
      }
    }
  } catch (error) {
    console.error("Error accessing protected resource:", error);
    output.innerHTML = `Error: ${
      error.message || "Failed to access protected resource"
    }`;
  }
}

功能展示

既然程式碼都寫好了,就來試試看功能是否都沒有問題

  • 獲取OTP,點擊按鈕後過一段時間去看信箱就可以看到我們拿到OTP(這邊沒有做非同步所以會等比較久)

https://ithelp.ithome.com.tw/upload/images/20241003/20161866U4CcEZeJMu.png

Your OTP is: 936765. It will expire in 5 minutes.
  • 看看能不能拿到Tokens,可以看到localStoreage中確實有Access Token
  • 並且我們可以在控制台輸入document.cookie,確認不會被惡意腳本取得Refresh Token

https://ithelp.ithome.com.tw/upload/images/20241003/201618669xfNj9ZSX0.png

https://ithelp.ithome.com.tw/upload/images/20241003/20161866k1t5VH5fVI.png

  • 並且測試能調用受保護的視圖,也沒有問題,成功認證身份與權限

https://ithelp.ithome.com.tw/upload/images/20241003/20161866omJaLkRF6y.png


User authenticated: True
User: admin
  • 最後測試刷新Token也沒有問題

https://ithelp.ithome.com.tw/upload/images/20241003/20161866Xnc7eVCa7d.png

今日總結

我們根據上一篇對於JWT的探討,了解可能用哪些方式使用JWT是更適合的實踐方式

並且在這個章節中進行實現:

  1. 想取得Token同時避免輸入敏感訊息:改用OTP的方式實現
  2. Tokens都存在localStoreage容易受到腳本攻擊:將Refresh Token使用httponly cookie保護
  3. 因為只是在本地Demo因此沒有搭配HTTPS,生產環境務下務必要搭配使用

因為是每天日更沒有任何庫存,加上近期加班,每一天包含週末都是被寫文章還有寫demo佔滿

今天(應該說昨天)意外的遇到蠻多bug,的確我對於整個流程整握度還不夠高,導致最後還是中斷

了無法成功完賽,不過我還是會在10/14前把整個系列完成

有看到這裡的讀者,對我來說真的心懷感激XD 如果覺得這樣的內容跟其他網路上千篇一律介紹DRF JWT的文章有所不同,對你有啟發或是任何想法的話,給我一個LIKE就是最大的鼓勵!


上一篇
Django REST framework: JWT與TokenAuthentication的全面比較
下一篇
Django REST framework: 打造高效 API-流量限制、分頁與過濾
系列文
Django 2024: 從入門到SaaS實戰31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言