60 秒系统安全认证明战

点击python编程从入门到实践置顶 公众号重磅 python入门资料,第一时间送达python

 

 

读完须要 18分钟算法

速读仅需 6 分钟数据库

/ python 生产实战 60 秒系统安全认证明战 /编程

上节主要讲解了目前主流的认证规范/协议以及对 JWT 进行了深刻的研究和分析并在最后给出了在生产环境中如何去生成一个有效的 Token,基于 Python 语言那在生产环境中是如何进行有效的安全认证的呢?上节咱们也基于 JWT 的 Token 的认证过程进行了登录认证、请求认证的理论分析以及用图示的方式给出了数据的流向,本节咱们再带你们从代码层面走一次流程,一方面加深你们对上节理论部分的理解,另外一方面也是给你们在作工程的过程当中提供一套"模版"快速应用在项目中后端

1api

准备工具安全

1.1微信

密码安全闭包

为了数据安全,咱们利用 PassLib 对入库的用户密码进行加密处理,推荐的加密算法是"Bcrypt"。咱们须要安装依赖包:并发

pip install passlibpip install bcrypt

简单的介绍一下这两个库:
1.passlib 是 python2&3 的密码散列库,它提供超过 30 种密码散列算法的跨平台实现,以及做为管理现有密码哈希的框架。它被设计成有用的 对于范围普遍的任务,从验证/etc/shadow 中找到的散列到 为多用户应用程序提供全强度密码哈希。2.bcrypt 模块是一个用于在 Python 中生成强哈希值的库。
注意:若对于以上两个库的详细内容比较感兴趣的小伙伴能够自行到 python 官网查看相关资料,咱们本节的核心是作整个流程的实现这个具体的库内容就先不作详细介绍了。

2

用户登录流程

用户经过终端发送 username 和 password 到后端。后端收到数据后,进行一下操做:1.用户信息校验:查询当前系统是否存在该用户,以及密码是否正确
2.若是用户存在,则生成 JWT token 并返回,JWT payload 中能够携带自定义数据





























# -*- encoding: utf-8 -*-from datetime import datetime, timedeltafrom typing import Optionalfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormimport jwtfrom pydantic import BaseModelfrom passlib.context import CryptContext# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 25# 模拟数据库数据fake_users_db = { "hiashiniu": { "username": "hiashiniu", "full_name": "HaiShiNiu", "email": "haishiniu@bbs.com", "hashed_password": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")app = FastAPI()# 校验密码def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)# 密码哈希def get_password_hash(password): # pwd_context.hash('123456789') # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC' return pwd_context.hash(password)# 模拟从数据库读取用户信息def get_user(db, username: str): user_dict = db.get(username,None) if user_dict: return UserInDB(**user_dict)# 用户信息校验:username和password分别校验def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) print(user) if not user: return False if not verify_password(password, user.hashed_password): return False return user# 生成token,带有过时时间def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() # 有一个实效性 默认的时间25分钟 if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先校验用户信息 print(form_data.password) print(form_data.username) user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成并返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}

 

3

请求数据流

终端获取到 token 信息后,必须在后续请求的 Authorization 头信息中带有 Bearer token,才能被容许访问。咱们添加一个校验函数,对请求的合法性进行校验,读取 token 内容解析并进行验证。





# -*- encoding: utf-8 -*-async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except jwt.PyJWTError as ex: print(ex) raise credentials_exception user = get_user(fake_users_db, username=username) if user is None: raise credentials_exception return user@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_user)): return current_user

 

4

效果展现

本小节咱们整合上述代码进行一下效果展现


































# -*- encoding: utf-8 -*-from datetime import datetime, timedeltafrom typing import Optionalfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormimport jwtfrom pydantic import BaseModelfrom passlib.context import CryptContext# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 25# 模拟数据库数据fake_users_db = { "hiashiniu": { "username": "hiashiniu", "full_name": "HaiShiNiu", "email": "haishiniu@bbs.com", "hashed_password": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")app = FastAPI()# 校验密码def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)# 密码哈希def get_password_hash(password): # pwd_context.hash('123456789') # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC' return pwd_context.hash(password)# 模拟从数据库读取用户信息def get_user(db, username: str): user_dict = db.get(username,None) if user_dict: return UserInDB(**user_dict)# 用户信息校验:username和password分别校验def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user# 生成token,带有过时时间def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() # 有一个实效性 默认的时间25分钟 if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先校验用户信息 print(form_data.password) print(form_data.username) user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成并返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except jwt.PyJWTError as ex: print(ex) raise credentials_exception user = get_user(fake_users_db, username=username) if user is None: raise credentials_exception return user@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_user)): return current_user

1.使用命令启动项目:

uvicorn main:app --reload 

2.使用 Postman 模拟登录获取验证 Token

3.使用 Postman 模拟携带 Token 获取正常逻辑信息

5

 

   

总结

本节核心:从代码层面实战了 登录认证、请求认证的数据流转

 

原创不易,只愿能帮助那些须要这些内容的同行或刚入行的小伙伴,你的每次 点赞分享 都是我继续创做下去的动力,我但愿能在推广 python 技术的道路上尽我一份力量,欢迎在评论区向我提问,我都会一一解答,记得一键三连支持一下哦!

 

加入python学习交流微信群,请后台回复「入群

 

 

往期推荐

python生产实战 python 闭包之庖丁解牛篇

大型fastapi项目实战 靠 python 中间件解决方案涨薪了

大型fastapi项目实战 高并发请求神器之aiohttp(下)

大型fastapi项目实战 高并发请求神器之aiohttp(上) [建议收藏]

 

 

本文分享自微信公众号 - python编程军火库(PythonCoder1024)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索