时间:2022-08-06 19:24:01 | 来源:网站运营
时间:2022-08-06 19:24:01 来源:网站运营
Author: 廖长江/etc/ssh/sshd_config
加入 GatewayPorts yes
ssh -R 0.0.0.0:80:localhost:8080 user@server_host
code
def get_wx_authorize_url(appid : str, state: str = None): if state is None: state = "".join([random.choice(string.ascii_letters + string.digits) for _ in range(20)]) redirect_url = 'your callback url' # 回调链接,在这里面进行用户信息入库的操作 response_type = 'code' scope = 'snsapi_userinfo' wx_url = f"https://open.weixin.qq.com/connect/oauth2/authorize?appid={appid}&redirect_uri={redirect_url}&response_type={response_type}&scope={scope}&state={state}#wechat_redirect" return wx_url
code
换取 access_token
和 openid
def request_access_token(appid : str, secret : str, code: str): secret = settings.WX_SECRET api = f"https://api.weixin.qq.com/sns/oauth2/access_token?appid={appid}&secret={secret}&code={code}&grant_type=authorization_code" r = requests.get(api) return r.json()
access_token
换取 用户信息def request_userinfo(access_token: str, openid: str): api = f"https://api.weixin.qq.com/sns/userinfo?access_token={access_token}&openid={openid}&lang=zh_CN" r = requests.get(api) return r.json()
ISO-8859-1
,需要转换成 utf-8
。def convert_string_encoding(s: str, from_encoding: str, to_encoding: str) -> str: """先根据 from_encoding 转换成bytes,然后在 decode 为 to_encoding 的字符串 """ return bytes(s, encoding=from_encoding).decode(to_encoding)nickname = convert_string_encoding(resp['nickname'], 'ISO-8859-1', 'utf-8')
state
为 key
。from app.models import WXUser, RedirectUrlfrom utils import get_wx_authorize_url, get_random_stringfrom django.shortcuts import redirectdef login_required(func): def wrapper(request, *args, **kwargs): openid = request.openid try: user = WXUser.objects.get(openid=openid) request.wxuser = user except WXUser.DoesNotExist: state = get_random_string() redirect_url = get_wx_authorize_url(state=state) # 存储跳转链接 try: r = RedirectUrl.objects.get(state=state) except RedirectUrl.DoesNotExist: r = RedirectUrl() r.state = state origin_url = request.get_raw_uri() r.url = origin_url r.save() return redirect(redirect_url) return func(request, *args, **kwargs) return wrapper
然后在我们设置的回调接口(会带上 code
和 state
)里面,就可以通过 state
从数据库里获取原链接。class RedirectUrl(BaseModel): state = models.TextField(unique=True) url = models.TextField()
session
,那可以讲另一个故事了,这里不赘述了。HTTP_AUTHORIZATION
(请求头中的 Authorization
字段)或者 key 为jwttoken
的 cookie
中抽取出 jwt token
,从中解析出 openid
,添加到 request
变量中,之后就可以在后续的 views里面通过 request.openid
直接获取 openid
了。def jwt_decode(token: Union[str, bytes]) -> tuple: """ :param token : 可以是 bytes 也可以是 str,如果是 str,会先 encode 转成 bytes :return: 第一个参数为 payload,第二个参数为异常类型 """ if isinstance(token, str): token = token.encode() secret = settings.JWT_SECRET try: return jwt.decode(token, secret, algorithms=["HS256"]), None except Exception as e: # 统一捕捉异常: # jwt.exceptions.DecodeError # jwt.exceptions.InvalidSignatureError # jwt.exceptions.ExpiredSignatureError return None, eclass JWTAuthMiddleware(object): """ 小程序认证中间件 """ def __init__(self, get_response=None): self.get_response = get_response def __call__(self, request, *args, **kws): token = self.get_authorization_header(request) payload, error = jwt_decode(token) if not error: openid = payload['openid'] request.openid = openid else: request.openid = None response = self.get_response(request, *args, **kws) return response def get_authorization_header(self, request): """ 从 AUTHORIZATION 请求头或者cookie 中获取 jwt code cookie 的 jwt code 的 key 为 jwtcode :param request: :return: rawtoken """ auth_header = request.META.get('HTTP_AUTHORIZATION', '') cookie = request.COOKIES rawtoken = None if auth_header != "": try: rawtoken = auth_header.split(" ")[1] except IndexError as e: pass if 'jwttoken' in cookie: rawtoken = cookie['jwttoken'] return rawtoken
欢迎关注个人微信公众号:全栈不存在的。个人博客:lcj.im
关键词:授权,登陆,实战