|
1 | 1 | """Basic JSON Web Token implementation.""" |
2 | 2 | import json |
3 | | -from json import JSONDecodeError |
4 | 3 | import logging |
5 | 4 | import time |
| 5 | +import uuid |
| 6 | +from json import JSONDecodeError |
6 | 7 | from typing import Dict |
7 | 8 | from typing import List |
8 | 9 | from typing import MutableMapping |
9 | 10 | from typing import Optional |
10 | | -import uuid |
11 | 11 |
|
12 | 12 | from .exception import HeaderError |
13 | 13 | from .exception import VerificationError |
@@ -81,24 +81,24 @@ class JWT: |
81 | 81 | """The basic JSON Web Token class.""" |
82 | 82 |
|
83 | 83 | def __init__( |
84 | | - self, |
85 | | - key_jar=None, |
86 | | - iss: str="", |
87 | | - lifetime: int = 0, |
88 | | - sign: bool = True, |
89 | | - sign_alg: str = "RS256", |
90 | | - encrypt: bool = False, |
91 | | - enc_enc: str = "A128GCM", |
92 | | - enc_alg: str = "RSA-OAEP-256", |
93 | | - msg_cls: Optional[MutableMapping] = None, |
94 | | - iss2msg_cls: Dict[str, str] = None, |
95 | | - skew: int = 15, |
96 | | - allowed_sign_algs: List[str] = None, |
97 | | - allowed_enc_algs: List[str] = None, |
98 | | - allowed_enc_encs: List[str] = None, |
99 | | - allowed_max_lifetime: int = None, |
100 | | - zip: str = "", |
101 | | - typ2msg_cls: Dict = None |
| 84 | + self, |
| 85 | + key_jar=None, |
| 86 | + iss: str = "", |
| 87 | + lifetime: int = 0, |
| 88 | + sign: bool = True, |
| 89 | + sign_alg: str = "RS256", |
| 90 | + encrypt: bool = False, |
| 91 | + enc_enc: str = "A128GCM", |
| 92 | + enc_alg: str = "RSA-OAEP-256", |
| 93 | + msg_cls: Optional[MutableMapping] = None, |
| 94 | + iss2msg_cls: Dict[str, str] = None, |
| 95 | + skew: int = 15, |
| 96 | + allowed_sign_algs: List[str] = None, |
| 97 | + allowed_enc_algs: List[str] = None, |
| 98 | + allowed_enc_encs: List[str] = None, |
| 99 | + allowed_max_lifetime: int = None, |
| 100 | + zip: str = "", |
| 101 | + typ2msg_cls: Dict = None, |
102 | 102 | ): |
103 | 103 | self.key_jar = key_jar # KeyJar instance |
104 | 104 | self.iss = iss # My identifier |
@@ -216,15 +216,15 @@ def message(self, signing_key, **kwargs): |
216 | 216 | return json.dumps(kwargs) |
217 | 217 |
|
218 | 218 | def pack( |
219 | | - self, |
220 | | - payload: Optional[dict] = None, |
221 | | - kid: Optional[str] = "", |
222 | | - issuer_id: Optional[str] = "", |
223 | | - recv: Optional[str] = "", |
224 | | - aud: Optional[str] = None, |
225 | | - iat: Optional[int] = None, |
226 | | - jws_headers: Dict[str, str] = None, |
227 | | - **kwargs |
| 219 | + self, |
| 220 | + payload: Optional[dict] = None, |
| 221 | + kid: Optional[str] = "", |
| 222 | + issuer_id: Optional[str] = "", |
| 223 | + recv: Optional[str] = "", |
| 224 | + aud: Optional[str] = None, |
| 225 | + iat: Optional[int] = None, |
| 226 | + jws_headers: Dict[str, str] = None, |
| 227 | + **kwargs |
228 | 228 | ) -> str: |
229 | 229 | """ |
230 | 230 |
|
@@ -399,8 +399,8 @@ def unpack(self, token, timestamp=None): |
399 | 399 | # try to find an issuer specific message class |
400 | 400 | if "iss" in _info: |
401 | 401 | _msg_cls = self.iss2msg_cls.get(_info["iss"]) |
402 | | - if not _msg_cls and _jws_header and 'typ' in _jws_header: |
403 | | - _msg_cls = self.typ2msg_cls.get(_jws_header['typ']) |
| 402 | + if not _msg_cls and _jws_header and "typ" in _jws_header: |
| 403 | + _msg_cls = self.typ2msg_cls.get(_jws_header["typ"]) |
404 | 404 |
|
405 | 405 | timestamp = timestamp or utc_time_sans_frac() |
406 | 406 |
|
|
0 commit comments