本文详细介绍了JWT(JSON Web Token)的工作原理、组成部分、生成和验证过程,以及在不同后端框架中的应用和安全性注意事项。
JWT简介什么是JWT
JWT(JSON Web Token)是一种开放标准(RFC 7519),它定义了一种紧凑且自包含的方式,用于在各方之间安全地传输信息。JWT主要应用于身份验证和信息交换场景,其设计初衷是提供一种能够在客户端和服务端之间安全地传输信息的方法,且这种方式不需要依赖于服务器端的会话机制。JWT由三个部分组成:头部(Header)、负载(Payload)和签名(Signature)。
JWT的优势在于它是一种无状态的认证机制,这意味着服务器不需要存储任何会话信息。JWT的使用场景包括但不限于:
- 身份验证:用户登录后,服务器生成一个JWT并返回给客户端,客户端在后续的请求中携带这个JWT。
- 信息交换:JWT可以携带用户信息,适用于需要跨域请求的前端应用。
- 单点登录:不同应用之间可以共享JWT,实现单点登录。
- 授权:JWT可以携带权限信息,用于控制用户访问特定资源的权限。
例如,在一个电子商务网站中,用户登录后,服务器会生成一个JWT并返回给客户端。客户端在后续的请求中,如查看购物车或下订单时,会携带这个JWT来验证用户的身份。这样不仅简化了会话管理,还提高了系统的可伸缩性。
JWT的工作原理
JWT的工作原理可以分为三个步骤:生成、传输和验证。
- 生成:客户端发送用户名和密码到服务器,服务器验证用户信息后生成一个JWT。
- 传输:服务器将生成的JWT返回给客户端,客户端在后续的请求中将JWT附加在HTTP请求的头部中,格式如下:
Authorization: Bearer <token>
- 验证:服务器收到JWT后,通过验证密钥和签名来确认JWT的有效性。如果JWT有效,服务器将处理客户端的请求。
Header: 头部结构
JWT的头部包含两部分信息:令牌的类型(typ
)和所使用的签名算法(alg
)。头部通常采用JSON格式,然后通过Base64编码进行转换。
示例的头部结构如下:
{
"typ": "JWT",
"alg": "HS256"
}
Payload: 负载结构
负载部分包含令牌的声明(claims)。这些声明可以分为三类:注册声明(registered claims)、公共声明(public claims)和私有声明(private claims)。注册声明是JWT标准中定义的声明,如iss
(颁发者)、sub
(主题)、aud
(受众)、exp
(过期时间)。
示例的负载结构如下:
{
"sub": "1234567890",
"name": "John Doe",
"iat": 1516239022
}
Signature: 签名生成
签名部分是通过将头部和负载部分使用指定的算法进行编码,并加上一个密钥(通常是服务器端的密钥)进行哈希运算得到的。常用的哈希算法有HS256(HMAC SHA-256)、RS256(RSA SHA-256)等。
生成签名的过程如下:
- 将头部和负载进行Base64编码。
- 将编码后的头部和负载连接起来,中间用点号(
.
)分隔。 - 使用密钥和指定的哈希算法计算签名。
示例的签名生成代码如下:
import base64
import json
import hmac
import hashlib
def generate_signature(header, payload, secret):
encoded_header = base64.urlsafe_b64encode(json.dumps(header).encode('utf-8')).rstrip(b'=')
encoded_payload = base64.urlsafe_b64encode(json.dumps(payload).encode('utf-8')).rstrip(b'=')
segment = encoded_header + b"." + encoded_payload
signature = hmac.new(secret, segment, hashlib.sha256).digest()
return base64.urlsafe_b64encode(signature).rstrip(b'=')
# 示例
header = {"typ": "JWT", "alg": "HS256"}
payload = {"sub": "1234567890", "name": "John Doe", "iat": 1516239022}
secret = b"secret-key"
print(generate_signature(header, payload, secret))
实现JWT认证的步骤
生成JWT
生成JWT的过程可以分为以下几个步骤:
- 创建头部。
- 创建负载。
- 生成签名。
- 将头部、负载和签名用点号(
.
)连接起来。
示例的生成JWT代码如下:
import base64
import json
import hmac
import hashlib
def generate_jwt(header, payload, secret):
encoded_header = base64.urlsafe_b64encode(json.dumps(header).encode('utf-8')).rstrip(b'=')
encoded_payload = base64.urlsafe_b64encode(json.dumps(payload).encode('utf-8')).rstrip(b'=')
segment = encoded_header + b"." + encoded_payload
signature = hmac.new(secret, segment, hashlib.sha256).digest()
encoded_signature = base64.urlsafe_b64encode(signature).rstrip(b'=')
jwt = encoded_header + b"." + encoded_payload + b"." + encoded_signature
return jwt.decode('utf-8')
# 示例
header = {"typ": "JWT", "alg": "HS256"}
payload = {"sub": "1234567890", "name": "John Doe", "iat": 1516239022}
secret = b"secret-key"
print(generate_jwt(header, payload, secret))
验证JWT
验证JWT的过程也分为几个步骤:
- 解析JWT,分别提取头部、负载和签名。
- 计算签名。
- 比较计算后的签名和JWT中的签名是否一致。
示例的验证JWT代码如下:
def verify_jwt(jwt, secret):
parts = jwt.split('.')
encoded_header = parts[0]
encoded_payload = parts[1]
encoded_signature = parts[2]
header = json.loads(base64.urlsafe_b64decode(encoded_header + b'=='))
payload = json.loads(base64.urlsafe_b64decode(encoded_payload + b'=='))
segment = base64.urlsafe_b64decode(encoded_header + b"." + encoded_payload)
signature = base64.urlsafe_b64decode(encoded_signature + b'==')
calculated_signature = hmac.new(secret, segment, hashlib.sha256).digest()
return hmac.compare_digest(signature, calculated_signature)
# 示例
jwt = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.iJr9oPQfV8yW1W6r9jw2tJcE09wE2Uz6BnJ1MaBc80s'
secret = b"secret-key"
print(verify_jwt(jwt, secret))
解析JWT
解析JWT主要是将JWT字符串分割为头部、负载和签名部分,并对它们进行Base64解码。
示例的解析JWT代码如下:
def parse_jwt(jwt):
parts = jwt.split('.')
encoded_header = parts[0]
encoded_payload = parts[1]
encoded_signature = parts[2]
header = json.loads(base64.urlsafe_b64decode(encoded_header + b'=='))
payload = json.loads(base64.urlsafe_b64decode(encoded_payload + b'=='))
signature = base64.urlsafe_b64decode(encoded_signature + b'==')
return header, payload, signature
# 示例
jwt = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.iJr9oPQfV8yW1W6r9jw2tJcE09wE2Uz6BnJ1MaBc80s'
header, payload, signature = parse_jwt(jwt)
print("Header:", header)
print("Payload:", payload)
print("Signature:", signature)
JWT在不同后端框架中的应用
在Node.js中的使用
Node.js中的JWT实现可以通过jsonwebtoken
库来完成。
安装jsonwebtoken
首先,需要通过npm安装jsonwebtoken
库,执行以下命令:
npm install jsonwebtoken
生成JWT
示例代码如下:
const jwt = require('jsonwebtoken');
const secret = 'mysecretkey';
const payload = {
sub: '1234567890',
name: 'John Doe',
iat: Math.floor(Date.now() / 1000)
};
const token = jwt.sign(payload, secret, { algorithm: 'HS256' });
console.log(token);
验证JWT
示例代码如下:
const jwt = require('jsonwebtoken');
const secret = 'mysecretkey';
const token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.iJr9oPQfV8yW1W6r9jw2tJcE09wE2Uz6BnJ1MaBc80s';
try {
const decoded = jwt.verify(token, secret);
console.log(decoded);
} catch (err) {
console.log(err.message);
}
在Spring Boot中的使用
Spring Boot可以通过spring-boot-starter-security
和spring-boot-starter-oauth2-jwt
依赖来实现JWT认证。
添加依赖
在pom.xml
中添加如下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-jwt</artifactId>
</dependency>
配置JWT认证
创建一个WebSecurityConfig
类,并配置JWT认证。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;
@Configuration
@EnableWebSecurity
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http.cors().and().csrf().disable()
.authorizeRequests()
.antMatchers("/api/auth/**").permitAll()
.anyRequest().authenticated()
.and()
.sessionManagement()
.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
.and()
.addFilterBefore(new JwtAuthenticationFilter(), UsernamePasswordAuthenticationFilter.class);
}
}
创建JWT认证过滤器
创建一个JwtAuthenticationFilter
类,用于从请求中提取JWT并验证。
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.web.authentication.WebAuthenticationDetailsSource;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@Component
public class JwtAuthenticationFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
final String authorizationHeader = request.getHeader("Authorization");
String username = null;
String jwt = null;
if (authorizationHeader != null && authorizationHeader.startsWith("Bearer ")) {
jwt = authorizationHeader.substring(7);
username = jwtService.extractUsername(jwt);
}
if (username != null && SecurityContextHolder.getContext().getAuthentication() == null) {
UserDetails userDetails = userDetailsService.loadUserByUsername(username);
if (jwtService.validateToken(jwt, userDetails)) {
UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(
userDetails, null, userDetails.getAuthorities()
);
authentication.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
SecurityContextHolder.getContext().setAuthentication(authentication);
}
}
filterChain.doFilter(request, response);
}
}
创建JWT服务
创建一个JwtService
类,用于生成和验证JWT。
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Service
public class JwtService {
private static final String SECRET = "mysecretkey";
private static final long EXPIRATION = 3600000;
public String generateToken(UserDetails userDetails) {
Map<String, Object> claims = new HashMap<>();
return doGenerateToken(claims, userDetails.getUsername());
}
private String doGenerateToken(Map<String, Object> claims, String subject) {
return Jwts.builder()
.setClaims(claims)
.setSubject(subject)
.setIssuedAt(new Date(System.currentTimeMillis()))
.setExpiration(new Date(System.currentTimeMillis() + EXPIRATION))
.signWith(SignatureAlgorithm.HS512, SECRET)
.compact();
}
public Boolean validateToken(String token, UserDetails userDetails) {
final String username = extractUsername(token);
return (username.equals(userDetails.getUsername()) && !isTokenExpired(token));
}
private Claims extractClaims(String token) {
return Jwts.parser().setSigningKey(SECRET).parseClaimsJws(token).getBody();
}
public String extractUsername(String token) {
return extractClaims(token).getSubject();
}
private Boolean isTokenExpired(String token) {
final Date expiration = extractClaims(token).getExpiration();
return expiration.before(new Date());
}
}
在Django中的使用
在Django中可以使用Django-rest-framework-jwt
库来实现JWT认证。
安装
首先,通过pip安装Django-rest-framework-jwt
库,执行以下命令:
pip install djangorestframework-jwt
配置JWT认证
在settings.py
中添加JWT认证配置。
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication',
),
}
JWT_AUTH = {
'JWT_SECRET_KEY': 'mysecretkey',
'JWT_ALGORITHM': 'HS256',
'JWT_ALLOW_REFRESH': True,
'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=3600),
}
创建JWT视图
创建一个JWT视图来处理JWT的生成和刷新。
from rest_framework_jwt.views import obtain_jwt_token, refresh_jwt_token, verify_jwt_token
from django.urls import path
urlpatterns = [
path('api-token-auth/', obtain_jwt_token),
path('api-token-refresh/', refresh_jwt_token),
path('api-token-verify/', verify_jwt_token),
]
JWT的安全性注意事项
密钥的安全管理
JWT的安全性很大程度上依赖于密钥的安全管理。密钥应该存储在安全的地方,如环境变量或加密存储中,避免硬编码在代码中。服务器启动时从安全的存储中加载密钥。
示例:加载密钥
import os
secret_key = os.getenv('JWT_SECRET_KEY')
签名算法的选择
不同的签名算法有不同的安全性。常用的算法有HS256(HMAC with SHA-256)和RS256(RSA with SHA-256)。HS256适合对称密钥的情况,而RS256适合公钥加密的情况。
示例:选择签名算法
import jwt
header = {"typ": "JWT", "alg": "HS256"}
payload = {"sub": "1234567890", "name": "John Doe", "iat": 1516239022}
secret = 'mysecretkey'
token = jwt.encode(payload, secret, algorithm='HS256')
过期时间和刷新机制
JWT的有效期通常是有限的,过期后需要重新获取新的JWT。为了简化过期的处理,可以提供一个刷新机制,即在JWT过期后,客户端可以使用过期的JWT来获取一个新的JWT。
示例:刷新JWT
import jwt
old_token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.iJr9oPQfV8yW1W6r9jw2tJcE09wE2Uz6BnJ1MaBc80s'
refresh_token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJqdGkiOiJkZGZkMmYzYiJ9.iJr9oPQfV8yW1W6r9jw2tJcE09wE2Uz6BnJ1MaBc80s'
secret = 'mysecretkey'
try:
decoded = jwt.decode(old_token, secret, algorithms=['HS256'])
print(decoded)
new_token = jwt.encode(decoded, secret, algorithm='HS256')
print(new_token)
except jwt.ExpiredSignatureError:
print('Token expired, use refresh token')
new_token = jwt.encode(jwt.decode(refresh_token, secret, algorithms=['HS256']), secret, algorithm='HS256')
print(new_token)
实战演练
创建一个简单的JWT认证系统
创建一个简单的JWT认证系统需要完成以下几个步骤:
- 用户注册:用户注册时生成用户信息,并将用户信息保存在数据库中。
- 用户登录:用户登录时验证用户信息,生成JWT并返回给客户端。
- 访问受保护的资源:客户端在访问受保护资源时携带JWT进行请求,服务器验证JWT后返回资源。
示例:用户注册
import jwt
import datetime
users = {}
def register(username, password):
if username in users:
return "User already exists"
users[username] = password
return "Registration successful"
# 示例注册
register('john', 'password')
示例:用户登录
def login(username, password):
if username in users and users[username] == password:
payload = {
'sub': username,
'iat': datetime.datetime.utcnow(),
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, 'mysecretkey', algorithm='HS256')
return token
return "Invalid credentials"
# 示例登录
token = login('john', 'password')
print(token)
示例:访问受保护资源
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
class ProtectedResourceView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
return Response({"message": "Welcome, authenticated user!"})
# 示例访问受保护资源
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
class AuthenticatedResourceView(APIView):
authentication_classes = [JSONWebTokenAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request):
return Response({"message": "Welcome, authenticated user!"})
测试JWT的生成和验证
测试JWT的生成和验证可以通过编写单元测试来完成。确保生成的JWT格式正确,并且能够正确验证。
示例:测试JWT生成和验证
import unittest
import jwt
import datetime
class TestJwt(unittest.TestCase):
def test_jwt_generation(self):
payload = {
'sub': '1234567890',
'name': 'John Doe',
'iat': datetime.datetime.utcnow()
}
token = jwt.encode(payload, 'mysecretkey', algorithm='HS256')
self.assertTrue(type(token) == bytes)
def test_jwt_verification(self):
token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.iJr9oPQfV8yW1W6r9jw2tJcE09wE2Uz6BnJ1MaBc80s'
try:
decoded = jwt.decode(token, 'mysecretkey', algorithms=['HS256'])
self.assertTrue(decoded)
except jwt.ExpiredSignatureError:
self.fail("Token expired")
except jwt.InvalidTokenError:
self.fail("Invalid token")
if __name__ == '__main__':
unittest.main()
处理JWT的常见问题
在使用JWT的过程中可能会遇到一些常见问题,如Token过期、Token被篡改等。需要确保JWT的生成和验证过程安全可靠。
示例:处理过期Token
在验证JWT时,如果Token过期,需要重新生成一个新的Token。
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
from rest_framework.exceptions import AuthenticationFailed
class CustomJWTAuthentication(JSONWebTokenAuthentication):
def authenticate_credentials(self, payload):
try:
if payload['exp'] < datetime.datetime.utcnow():
raise AuthenticationFailed('Token expired')
return payload
except jwt.ExpiredSignatureError:
raise AuthenticationFailed('Token expired')
except jwt.InvalidTokenError:
raise AuthenticationFailed('Invalid token')
# 示例使用自定义认证
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
class ProtectedResourceView(APIView):
authentication_classes = [CustomJWTAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request):
return Response({"message": "Welcome, authenticated user!"})
示例:处理被篡改的Token
确保JWT的签名算法安全。如果Token被篡改,签名验证会失败。
import jwt
def validate_jwt(token):
try:
decoded = jwt.decode(token, 'mysecretkey', algorithms=['HS256'])
return decoded
except jwt.InvalidTokenError:
return None
# 示例验证Token
token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.iJr9oPQfV8yW1W6r9jw2tJcE09wE2Uz6BnJ1MaBc80s'
decoded = validate_jwt(token)
print(decoded)
共同學(xué)習(xí),寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章