@@ -130,3 +130,161 @@ def get_raw_user_info(self, token: str):
130
130
131
131
def _transform_user_info (self , raw_info : dict ) -> OAuthUserInfo :
132
132
return OAuthUserInfo (id = str (raw_info ["sub" ]), name = "" , email = raw_info ["email" ])
133
+
134
+
135
+ class DingTalkOAuth (OAuth ):
136
+ """DingTalk OAuth implementation"""
137
+
138
+ _AUTH_URL = "https://login.dingtalk.com/oauth2/auth"
139
+ _TOKEN_URL = "https://api.dingtalk.com/v1.0/oauth2/userAccessToken"
140
+ _USER_INFO_URL = "https://api.dingtalk.com/v1.0/contact/users/me"
141
+
142
+ def get_authorization_url (self , invite_token : str | None = None ):
143
+ params = {
144
+ "client_id" : self .client_id ,
145
+ "redirect_uri" : self .redirect_uri ,
146
+ "scope" : "openid" ,
147
+ "response_type" : "code" ,
148
+ "prompt" : "consent" ,
149
+ }
150
+ if invite_token :
151
+ params ["state" ] = invite_token
152
+ return f"{ self ._AUTH_URL } ?{ urllib .parse .urlencode (params )} "
153
+
154
+ def get_access_token (self , code : str ):
155
+ headers = {"Content-Type" : "application/json" , "Accept" : "application/json" }
156
+ data = {
157
+ "clientId" : self .client_id ,
158
+ "clientSecret" : self .client_secret ,
159
+ "code" : code ,
160
+ "grantType" : "authorization_code" ,
161
+ }
162
+ response = httpx .post (self ._TOKEN_URL , json = data , headers = headers )
163
+ response_json = response .json ()
164
+ access_token = response_json .get ("accessToken" )
165
+
166
+ if not access_token :
167
+ raise ValueError (f"Error in DingTalk OAuth: { response_json } " )
168
+
169
+ return access_token
170
+
171
+ def get_raw_user_info (self , token : str ):
172
+ headers = {"x-acs-dingtalk-access-token" : token }
173
+ response = httpx .get (self ._USER_INFO_URL , headers = headers )
174
+ response .raise_for_status ()
175
+ return response .json ()
176
+
177
+ def _transform_user_info (self , raw_info : dict ) -> OAuthUserInfo :
178
+ user_id = raw_info .get ("unionId" , "" )
179
+ name = raw_info .get ("nick" , "" )
180
+ email = raw_info .get ("email" , f"{ user_id } @dingtalk.com" )
181
+ return OAuthUserInfo (id = user_id , name = name , email = email )
182
+
183
+
184
+ class MicrosoftOAuth (OAuth ):
185
+ """Microsoft OAuth implementation"""
186
+
187
+ _AUTH_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"
188
+ _TOKEN_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
189
+ _USER_INFO_URL = "https://graph.microsoft.com/v1.0/me"
190
+
191
+ def get_authorization_url (self , invite_token : str | None = None ):
192
+ params = {
193
+ "client_id" : self .client_id ,
194
+ "redirect_uri" : self .redirect_uri ,
195
+ "response_type" : "code" ,
196
+ "scope" : "user.read" ,
197
+ "response_mode" : "query" ,
198
+ }
199
+ if invite_token :
200
+ params ["state" ] = invite_token
201
+ return f"{ self ._AUTH_URL } ?{ urllib .parse .urlencode (params )} "
202
+
203
+ def get_access_token (self , code : str ):
204
+ data = {
205
+ "client_id" : self .client_id ,
206
+ "client_secret" : self .client_secret ,
207
+ "code" : code ,
208
+ "grant_type" : "authorization_code" ,
209
+ "redirect_uri" : self .redirect_uri ,
210
+ }
211
+ headers = {"Accept" : "application/json" }
212
+ response = httpx .post (self ._TOKEN_URL , data = data , headers = headers )
213
+ response_json = response .json ()
214
+ access_token = response_json .get ("access_token" )
215
+
216
+ if not access_token :
217
+ raise ValueError (f"Error in Microsoft OAuth: { response_json } " )
218
+
219
+ return access_token
220
+
221
+ def get_raw_user_info (self , token : str ):
222
+ headers = {"Authorization" : f"Bearer { token } " }
223
+ response = httpx .get (self ._USER_INFO_URL , headers = headers )
224
+ response .raise_for_status ()
225
+ return response .json ()
226
+
227
+ def _transform_user_info (self , raw_info : dict ) -> OAuthUserInfo :
228
+ user_id = str (raw_info .get ("id" , "" ))
229
+ name = raw_info .get ("displayName" , "" )
230
+ email = raw_info .get ("mail" ) or raw_info .get ("userPrincipalName" , f"{ user_id } @microsoft.com" )
231
+ return OAuthUserInfo (id = user_id , name = name , email = email )
232
+
233
+
234
+ class CanvasOAuth (OAuth ):
235
+ """Canvas LMS OAuth implementation"""
236
+
237
+ def __init__ (self , client_id : str , client_secret : str , redirect_uri : str , install_url : str ):
238
+ super ().__init__ (client_id , client_secret , redirect_uri )
239
+ self .install_url = install_url .rstrip ("/" )
240
+ self ._AUTH_URL = f"{ self .install_url } /login/oauth2/auth"
241
+ self ._TOKEN_URL = f"{ self .install_url } /login/oauth2/token"
242
+ self ._user_cache = None # Cache user info from token response
243
+
244
+ def get_authorization_url (self , invite_token : str | None = None ):
245
+ params = {
246
+ "client_id" : self .client_id ,
247
+ "redirect_uri" : self .redirect_uri ,
248
+ "response_type" : "code" ,
249
+ }
250
+ if invite_token :
251
+ params ["state" ] = invite_token
252
+ return f"{ self ._AUTH_URL } ?{ urllib .parse .urlencode (params )} "
253
+
254
+ def get_access_token (self , code : str ):
255
+ data = {
256
+ "client_id" : self .client_id ,
257
+ "client_secret" : self .client_secret ,
258
+ "code" : code ,
259
+ "grant_type" : "authorization_code" ,
260
+ "redirect_uri" : self .redirect_uri ,
261
+ }
262
+ headers = {"Accept" : "application/json" }
263
+ response = httpx .post (self ._TOKEN_URL , data = data , headers = headers )
264
+ response_json = response .json ()
265
+
266
+ # Canvas returns user info in the token response
267
+ # Example: {"access_token": "...", "user": {"id": 42, "name": "Jimi Hendrix"}, ...}
268
+ user_info = response_json .get ("user" , {})
269
+ if user_info :
270
+ self ._user_cache = user_info
271
+
272
+ access_token = response_json .get ("access_token" )
273
+ if not access_token :
274
+ raise ValueError (f"Error in Canvas OAuth: { response_json } " )
275
+
276
+ return access_token
277
+
278
+ def get_raw_user_info (self , token : str ):
279
+ # Canvas returns user info in the token response, which we cached
280
+ if self ._user_cache :
281
+ return self ._user_cache
282
+
283
+ raise ValueError ("No user info available." )
284
+
285
+ def _transform_user_info (self , raw_info : dict ) -> OAuthUserInfo :
286
+ # Canvas uses 'id' as the primary identifier
287
+ user_id = str (raw_info .get ("id" , "" ))
288
+ name = raw_info .get ("name" , "" )
289
+ email = raw_info .get ("email" , f"{ user_id } @canvas.local" )
290
+ return OAuthUserInfo (id = user_id , name = name , email = email )
0 commit comments