API Reference
FastAPIKeycloak
Instance to wrap the Keycloak API with FastAPI
Attributes:
Name | Type | Description |
---|---|---|
_admin_token | KeycloakToken | A KeycloakToken instance, containing the access token that is used for any admin-related request |
Examples:
app = FastAPI()
idp = FastAPIKeycloak(
server_url="https://auth.some-domain.com/auth",
client_id="some-test-client",
client_secret="some-secret",
admin_client_secret="some-admin-cli-secret",
realm="Test",
callback_uri="http://localhost:8081/callback"
)
idp.add_swagger_config(app)
admin_token
property
writable
Holds an AccessToken for the `admin-cli` client
Returns:
Type | Description |
---|---|
KeycloakToken |
A token, valid to perform admin actions |
Notes
- This might result in an infinite recursion if something unforeseen goes wrong
authorization_uri
cached
property
writable
The authorization endpoint URL
login_uri
cached
property
writable
The URL for users to log in to the realm. Also adds the client id and the callback.
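As a rough illustration of what such a login URL contains, the sketch below assembles one with the standard library. The `build_login_uri` helper and the exact set of query parameters (`response_type`, `client_id`, `redirect_uri`) are assumptions based on a typical OAuth2 authorization code request; the library may build the URL differently.

```python
from urllib.parse import parse_qs, urlencode, urlparse


def build_login_uri(authorization_uri: str, client_id: str, callback_uri: str) -> str:
    """Hypothetical helper: append the client id and callback to the authorization endpoint."""
    params = {
        "response_type": "code",  # assumed: authorization code flow
        "client_id": client_id,
        "redirect_uri": callback_uri,
    }
    return f"{authorization_uri}?{urlencode(params)}"


# Illustrative values taken from the constructor example
login_uri = build_login_uri(
    "https://auth.some-domain.com/auth/realms/Test/protocol/openid-connect/auth",
    "some-test-client",
    "http://localhost:8081/callback",
)
```

`urlencode` percent-encodes the callback, so the redirect URI survives as a single query parameter.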
logout_uri
cached
property
writable
The logout endpoint URL
open_id_configuration: dict
cached
property
writable
Returns Keycloak's OpenID Connect configuration
Returns:
Type | Description |
---|---|
dict |
Open ID Configuration |
providers_uri
cached
property
writable
The endpoint that returns all configured identity providers
public_key: str
cached
property
writable
Returns the Keycloak public key
Returns:
Type | Description |
---|---|
str |
Public key for JWT decoding |
realm_uri
cached
property
writable
The realms endpoint URL
roles_uri
cached
property
writable
The roles endpoint URL
token_uri
cached
property
writable
The token endpoint URL
user_auth_scheme: OAuth2PasswordBearer
cached
property
writable
Returns the auth scheme to register the endpoints with swagger
Returns:
Type | Description |
---|---|
OAuth2PasswordBearer |
Auth scheme for swagger |
users_uri
cached
property
writable
The users endpoint URL
__init__(self, server_url, client_id, client_secret, realm, admin_client_secret, callback_uri)
special
FastAPIKeycloak constructor
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url | str | The URL of the Keycloak server, with `/auth` suffix | required |
client_id | str | The id of the client used for users | required |
client_secret | str | The client secret | required |
realm | str | The realm (name) | required |
admin_client_secret | str | Secret for the `admin-cli` client | required |
callback_uri | str | Callback URL of the instance, used for auth flows. Must match at least one of Keycloak's `Valid Redirect URIs` and should point to an endpoint that uses the authorization_code flow. | required |
Source code in fastapi_keycloak/api.py
def __init__(self, server_url: str, client_id: str, client_secret: str, realm: str, admin_client_secret: str, callback_uri: str):
""" FastAPIKeycloak constructor
Args:
server_url (str): The URL of the Keycloak server, with `/auth` suffix
client_id (str): The id of the client used for users
client_secret (str): The client secret
realm (str): The realm (name)
admin_client_secret (str): Secret for the `admin-cli` client
callback_uri (str): Callback URL of the instance, used for auth flows. Must match at least one `Valid Redirect URIs` of Keycloak and should point to an endpoint
that utilizes the authorization_code flow.
"""
self.server_url = server_url
self.realm = realm
self.client_id = client_id
self.client_secret = client_secret
self.admin_client_secret = admin_client_secret
self.callback_uri = callback_uri
self._get_admin_token() # Requests an admin access token on startup
__repr__(self)
special
Debug representation
Source code in fastapi_keycloak/api.py
def __repr__(self):
""" Debug representation """
return f'{self.__str__()} <class {self.__class__} >'
__str__(self)
special
String representation
Source code in fastapi_keycloak/api.py
def __str__(self):
""" String representation """
return f'FastAPI Keycloak Integration'
add_swagger_config(self, app)
Adds the client id and secret securely to the Swagger UI. This enables Swagger UI users to perform actions that usually require the client credentials, without exposing them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app |
FastAPI |
Optional FastAPI app to add the config to swagger |
required |
Returns:
Type | Description |
---|---|
None |
Inplace method |
Source code in fastapi_keycloak/api.py
def add_swagger_config(self, app: FastAPI):
""" Adds the client id and secret securely to the swagger ui.
Enabling Swagger ui users to perform actions they usually need the client credentials, without exposing them.
Args:
app (FastAPI): Optional FastAPI app to add the config to swagger
Returns:
None: Inplace method
"""
app.swagger_ui_init_oauth = {
"usePkceWithAuthorizationCodeGrant": True,
"clientId": self.client_id,
"clientSecret": self.client_secret
}
add_user_roles(self, roles, user_id)
Adds roles to a specific user
Parameters:
Name | Type | Description | Default |
---|---|---|---|
roles |
List[str] |
Roles to add (name) |
required |
user_id |
str |
ID of the user the roles should be added to |
required |
Returns:
Type | Description |
---|---|
dict |
Proxied response payload |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error()
def add_user_roles(self, roles: List[str], user_id: str) -> dict:
""" Adds roles to a specific user
Args:
roles List[str]: Roles to add (name)
user_id str: ID of the user the roles should be added to
Returns:
dict: Proxied response payload
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
keycloak_roles = self.get_roles(roles)
return self._admin_request(
url=f'{self.users_uri}/{user_id}/role-mappings/realm',
data=[role.__dict__ for role in keycloak_roles],
method=HTTPMethod.POST
)
admin_uri(self, resource)
Returns an admin resource URL
Source code in fastapi_keycloak/api.py
def admin_uri(self, resource: str):
""" Returns a admin resource URL """
return f"{self._admin_uri}/{resource}"
change_password(self, user_id, new_password, temporary=False)
Changes a user's password.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
temporary |
bool |
If True, the password must be changed on the first login |
False |
user_id |
str |
The user ID of interest |
required |
new_password |
str |
The new password |
required |
Returns:
Type | Description |
---|---|
dict |
Proxied response payload |
Notes
- Possibly should be extended by an old password check
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error()
def change_password(self, user_id: str, new_password: str, temporary: bool = False) -> dict:
""" Exchanges a users password.
Args:
temporary (bool): If True, the password must be changed on the first login
user_id (str): The user ID of interest
new_password (str): The new password
Returns:
dict: Proxied response payload
Notes:
- Possibly should be extended by an old password check
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
credentials = {"temporary": temporary, "type": "password", "value": new_password}
return self._admin_request(url=f'{self.users_uri}/{user_id}/reset-password', data=credentials, method=HTTPMethod.PUT)
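The request built in the listing above can be sketched in isolation. The `build_password_reset` helper and the example `users_uri` value are hypothetical; only the payload keys and the `/reset-password` suffix come from the source.

```python
from typing import Tuple


def build_password_reset(
    users_uri: str, user_id: str, new_password: str, temporary: bool = False
) -> Tuple[str, dict]:
    """Hypothetical helper: return the URL and credential payload for a password reset."""
    credentials = {"temporary": temporary, "type": "password", "value": new_password}
    return f"{users_uri}/{user_id}/reset-password", credentials


# Illustrative values only
url, payload = build_password_reset(
    "https://auth.some-domain.com/auth/admin/realms/Test/users",
    "1234",
    "s3cret",
    temporary=True,
)
```

With `temporary=True` the user is asked to pick a new password on the next login, as described in the parameter table.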
create_role(self, role_name)
Create a role on the realm
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name |
str |
Name of the new role |
required |
Returns:
Type | Description |
---|---|
KeycloakRole |
If creation succeeded, else it will return the error |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakRole)
def create_role(self, role_name: str) -> KeycloakRole:
""" Create a role on the realm
Args:
role_name (str): Name of the new role
Returns:
KeycloakRole: If creation succeeded, else it will return the error
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
response = self._admin_request(url=self.roles_uri, data={'name': role_name}, method=HTTPMethod.POST)
if response.status_code == 201:
return self.get_roles(role_names=[role_name])[0]
else:
return response
create_user(self, first_name, last_name, username, email, password, enabled=True, initial_roles=None, send_email_verification=True)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
first_name | str | The first name of the new user | required |
last_name | str | The last name of the new user | required |
username | str | The username of the new user | required |
email | str | The email of the new user | required |
password | str | The password of the new user | required |
initial_roles | List[str] | The roles the user should possess. Defaults to `None` | None |
enabled | bool | True if the user account should be enabled. Defaults to `True` | True |
send_email_verification | bool | If true, the email verification will be added as a required action and the email triggered if the user was created successfully. Defaults to `True` | True |
Returns:
Type | Description |
---|---|
KeycloakUser |
If the creation succeeded |
Notes
- Also triggers the email verification email
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakUser)
def create_user(
self,
first_name: str,
last_name: str,
username: str,
email: str,
password: str,
enabled: bool = True,
initial_roles: List[str] = None,
send_email_verification: bool = True
) -> KeycloakUser:
"""
Args:
first_name (str): The first name of the new user
last_name (str): The last name of the new user
username (str): The username of the new user
email (str): The email of the new user
password (str): The password of the new user
initial_roles List[str]: The roles the user should posses. Defaults to `None`
enabled (bool): True if the user should be able to be used. Defaults to `True`
send_email_verification (bool): If true, the email verification will be added as an required
action and the email triggered - if the user was created successfully. Defaults to `True`
Returns:
KeycloakUser: If the creation succeeded
Notes:
- Also triggers the email verification email
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
data = {
"email": email,
"username": username,
"firstName": first_name,
"lastName": last_name,
"enabled": enabled,
"clientRoles": self.get_roles(initial_roles),
"credentials": [
{
"temporary": False,
"type": "password",
"value": password
}
],
"requiredActions": ["VERIFY_EMAIL" if send_email_verification else None]
}
response = self._admin_request(url=self.users_uri, data=data, method=HTTPMethod.POST)
if response.status_code == 201:
user = self.get_user(query=f'username={username}')
if send_email_verification:
self.send_email_verification(user.id)
return user
else:
return response
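One detail in the listing above: when `send_email_verification` is `False`, the expression for `requiredActions` evaluates to `[None]` rather than an empty list. How Keycloak treats a null entry is not stated here. A sketch that avoids sending it, using a hypothetical `build_required_actions` helper that is not part of the library, could look like this:

```python
from typing import List


def build_required_actions(send_email_verification: bool) -> List[str]:
    """Hypothetical helper: list only the required actions that actually apply."""
    actions: List[str] = []
    if send_email_verification:
        actions.append("VERIFY_EMAIL")
    return actions
```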
delete_role(self, role_name)
Deletes a role on the realm
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name | str | The role (name) to delete | required |
Returns:
Type | Description |
---|---|
dict |
Proxied response payload |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error()
def delete_role(self, role_name: str) -> dict:
""" Deletes a role on the realm
Args:
role_name (str): The role (name) to delete
Returns:
dict: Proxied response payload
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
return self._admin_request(url=f'{self.roles_uri}/{role_name}', method=HTTPMethod.DELETE)
delete_user(self, user_id)
Deletes a user
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
str |
The user ID of interest |
required |
Returns:
Type | Description |
---|---|
dict |
Proxied response payload |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error()
def delete_user(self, user_id: str) -> dict:
""" Deletes an user
Args:
user_id (str): The user ID of interest
Returns:
dict: Proxied response payload
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
return self._admin_request(url=f'{self.users_uri}/{user_id}', method=HTTPMethod.DELETE)
exchange_authorization_code(self, session_state, code)
Models the authorization code OAuth2 flow. Opening the URL provided by login_uri
will result in a callback to the configured callback URL.
The callback will also create a session_state and code query parameter that can be exchanged for an access token.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session_state |
str |
Salt to reduce the risk of successful attacks |
required |
code |
str |
The authorization code |
required |
Returns:
Type | Description |
---|---|
KeycloakToken |
If the exchange succeeds |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakToken)
def exchange_authorization_code(self, session_state: str, code: str) -> KeycloakToken:
""" Models the authorization code OAuth2 flow. Opening the URL provided by `login_uri` will result in a callback to the configured callback URL.
The callback will also create a session_state and code query parameter that can be exchanged for an access token.
Args:
session_state (str): Salt to reduce the risk of successful attacks
code (str): The authorization code
Returns:
KeycloakToken: If the exchange succeeds
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"code": code,
"session_state": session_state,
"grant_type": "authorization_code",
"redirect_uri": self.callback_uri
}
response = requests.post(url=self.token_uri, headers=headers, data=data)
return response
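The form body posted to the token endpoint in the listing above can be reproduced with the standard library. This is a minimal sketch; `build_code_exchange_body` is a hypothetical helper and all values are placeholders.

```python
from urllib.parse import parse_qs, urlencode


def build_code_exchange_body(
    client_id: str, client_secret: str, code: str, session_state: str, callback_uri: str
) -> str:
    """Hypothetical helper: encode the fields as application/x-www-form-urlencoded."""
    data = {
        "client_id": client_id,
        "client_secret": client_secret,
        "code": code,
        "session_state": session_state,
        "grant_type": "authorization_code",
        "redirect_uri": callback_uri,
    }
    return urlencode(data)


# Placeholder values for illustration
body = build_code_exchange_body(
    "some-test-client", "some-secret", "abc123", "state-xyz", "http://localhost:8081/callback"
)
```

The `redirect_uri` sent here has to be the same callback that was used when the login URL was opened, which is why the listing reuses `self.callback_uri`.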
get_all_roles(self)
Get all roles of the Keycloak realm
Returns:
Type | Description |
---|---|
List[KeycloakRole] |
All roles of the realm |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakRole, is_list=True)
def get_all_roles(self) -> List[KeycloakRole]:
""" Get all roles of the Keycloak realm
Returns:
List[KeycloakRole]: All roles of the realm
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
return self._admin_request(url=self.roles_uri, method=HTTPMethod.GET)
get_all_users(self)
Returns all users of the realm
Returns:
Type | Description |
---|---|
List[KeycloakUser] |
All Keycloak users of the realm |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakUser, is_list=True)
def get_all_users(self) -> List[KeycloakUser]:
""" Returns all users of the realm
Returns:
List[KeycloakUser]: All Keycloak users of the realm
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
response = self._admin_request(url=self.users_uri, method=HTTPMethod.GET)
return response
get_current_user(self, required_roles=None)
Returns the current user based on an access token in the HTTP-header. Optionally verifies roles are possessed by the user
Parameters:
Name | Type | Description | Default |
---|---|---|---|
required_roles |
List[str] |
List of role names required for this endpoint |
None |
Returns:
Type | Description |
---|---|
OIDCUser |
Decoded JWT content |
Exceptions:
Type | Description |
---|---|
ExpiredSignatureError | If the token is expired (exp < datetime.now()) |
JWTError | If decoding fails or the signature is invalid |
JWTClaimsError | If any claim is invalid |
HTTPException | If any required role is not contained within the roles of the user |
Source code in fastapi_keycloak/api.py
def get_current_user(self, required_roles: List[str] = None) -> OIDCUser:
""" Returns the current user based on an access token in the HTTP-header. Optionally verifies roles are possessed by the user
Args:
required_roles List[str]: List of role names required for this endpoint
Returns:
OIDCUser: Decoded JWT content
Raises:
ExpiredSignatureError: If the token is expired (exp < datetime.now())
JWTError: If decoding fails or the signature is invalid
JWTClaimsError: If any claim is invalid
HTTPException: If any required role is not contained within the roles of the user
"""
def current_user(token: OAuth2PasswordBearer = Depends(self.user_auth_scheme)) -> OIDCUser:
""" Decodes and verifies a JWT to get the current user
Args:
token OAuth2PasswordBearer: Access token in `Authorization` HTTP-header
Returns:
OIDCUser: Decoded JWT content
Raises:
ExpiredSignatureError: If the token is expired (exp < datetime.now())
JWTError: If decoding fails or the signature is invalid
JWTClaimsError: If any claim is invalid
HTTPException: If any required role is not contained within the roles of the user
"""
decoded_token = self._decode_token(token=token, audience="account")
user = OIDCUser.parse_obj(decoded_token)
if required_roles:
for role in required_roles:
if role not in user.roles:
raise HTTPException(status_code=403, detail=f'Role "{role}" is required to perform this action')
return user
return current_user
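`get_current_user` does not return a user directly. It returns an inner function that remembers `required_roles`, and that inner function is what gets used as a dependency. The closure and the role check can be sketched without FastAPI as follows; `Forbidden` and `require_roles` are stand-ins invented for this illustration.

```python
from typing import Callable, Iterable, List, Optional


class Forbidden(Exception):
    """Stand-in for fastapi.HTTPException with status code 403."""


def require_roles(required_roles: Optional[List[str]] = None) -> Callable[[Iterable[str]], None]:
    """Return a checker that captures the required roles, like the nested function above."""

    def check(user_roles: Iterable[str]) -> None:
        owned = set(user_roles)
        for role in required_roles or []:
            if role not in owned:
                raise Forbidden(f'Role "{role}" is required to perform this action')

    return check


check_admin = require_roles(["admin"])
check_admin(["admin", "user"])  # passes silently, every required role is present
```

Calling `require_roles()` without arguments yields a checker that accepts any user, which mirrors the `if required_roles:` guard in the listing.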
get_identity_providers(self)
Returns all configured identity Providers
Returns:
Type | Description |
---|---|
List[KeycloakIdentityProvider] |
All configured identity providers |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakIdentityProvider, is_list=True)
def get_identity_providers(self) -> List[KeycloakIdentityProvider]:
""" Returns all configured identity Providers
Returns:
List[KeycloakIdentityProvider]: All configured identity providers
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
return self._admin_request(url=self.providers_uri, method=HTTPMethod.GET).json()
get_roles(self, role_names)
Returns full entries of Roles based on role names
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_names |
List[str] |
Roles that should be looked up (names) |
required |
Returns:
Type | Description |
---|---|
List[KeycloakRole] |
Full entries stored at Keycloak. Or None if the list of requested roles is None |
Notes
- The Keycloak RestAPI will only identify RoleRepresentations that use name AND id which is the only reason for existence of this function
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakRole, is_list=True)
def get_roles(self, role_names: List[str]) -> List[KeycloakRole] or None:
""" Returns full entries of Roles based on role names
Args:
role_names List[str]: Roles that should be looked up (names)
Returns:
List[KeycloakRole]: Full entries stored at Keycloak. Or None if the list of requested roles is None
Notes:
- The Keycloak RestAPI will only identify RoleRepresentations that
use name AND id which is the only reason for existence of this function
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
if role_names is None:
return None
roles = self.get_all_roles()
return list(filter(lambda role: role.name in role_names, roles))
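The lookup above boils down to filtering the realm's roles by name. The sketch below shows that step on its own; the `Role` dataclass is a stand-in for `KeycloakRole`, reduced to the two fields mentioned in the note, and `select_roles` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Role:
    """Stand-in for KeycloakRole with only id and name."""
    id: str
    name: str


def select_roles(all_roles: List[Role], role_names: Optional[List[str]]) -> Optional[List[Role]]:
    """Return the full entries for the requested names, or None if no names were given."""
    if role_names is None:
        return None
    wanted = set(role_names)
    return [role for role in all_roles if role.name in wanted]


realm_roles = [Role("1", "admin"), Role("2", "user"), Role("3", "auditor")]
selected = select_roles(realm_roles, ["user", "admin", "missing"])
```

Names that do not exist on the realm are dropped silently, and the result follows the order of the realm's roles rather than the order of the request.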
get_user(self, user_id=None, query='')
Queries the keycloak API for a specific user either based on its ID or any native attribute
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id | str | The user ID of interest | None |
query | str | Query string, e.g. `email=testuser@codespecialist.com` or `username=codespecialist` | '' |
Returns:
Type | Description |
---|---|
KeycloakUser |
If the user was found |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakUser)
def get_user(self, user_id: str = None, query: str = "") -> KeycloakUser:
""" Queries the keycloak API for a specific user either based on its ID or any **native** attribute
Args:
user_id (str): The user ID of interest
query: Query string. e.g. `email=testuser@codespecialist.com` or `username=codespecialist`
Returns:
KeycloakUser: If the user was found
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
if user_id is None:
response = self._admin_request(url=f'{self.users_uri}?{query}', method=HTTPMethod.GET)
return KeycloakUser(**response.json()[0])
else:
response = self._admin_request(url=f'{self.users_uri}/{user_id}', method=HTTPMethod.GET)
return KeycloakUser(**response.json())
get_user_roles(self, user_id)
Gets all roles of a user
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
str |
ID of the user of interest |
required |
Returns:
Type | Description |
---|---|
List[KeycloakRole] |
All roles possessed by the user |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakRole, is_list=True)
def get_user_roles(self, user_id: str) -> List[KeycloakRole]:
""" Gets all roles of an user
Args:
user_id (str): ID of the user of interest
Returns:
List[KeycloakRole]: All roles possessed by the user
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
return self._admin_request(url=f'{self.users_uri}/{user_id}/role-mappings/realm', method=HTTPMethod.GET)
open_id(self, resource)
Returns an OpenID Connect resource URL
Source code in fastapi_keycloak/api.py
def open_id(self, resource: str):
""" Returns a openip connect resource URL """
return f"{self._open_id}/{resource}"
proxy(self, relative_path, method, additional_headers=None, payload=None)
Proxies a request to Keycloak and automatically adds the required Authorization header. Should not be exposed under any circumstances. Grants full API admin access.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
relative_path | str | The relative path of the request. Requests will be sent to: `[server_url]/[relative_path]` | required |
method | HTTPMethod | The HTTP-verb to be used | required |
additional_headers | dict | Optional headers besides the Authorization to add to the request | None |
payload | dict | Optional payload to send | None |
Returns:
Type | Description |
---|---|
Response |
Proxied response |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
def proxy(self, relative_path: str, method: HTTPMethod, additional_headers: dict = None, payload: dict = None) -> Response:
""" Proxies a request to Keycloak and automatically adds the required Authorization header. Should not be exposed under any circumstances. Grants full API admin access.
Args:
relative_path (str): The relative path of the request. Requests will be sent to: `[server_url]/[relative_path]`
method (HTTPMethod): The HTTP-verb to be used
additional_headers (dict): Optional headers besides the Authorization to add to the request
payload (dict): Optional payload to send
Returns:
Response: Proxied response
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
headers = {"Authorization": f"Bearer {self.admin_token}"}
if additional_headers is not None:
headers = {**headers, **additional_headers}
return requests.request(
method=method.name,
url=f'{self.server_url}{relative_path}',
data=json.dumps(payload),
headers=headers
)
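To make the header merging and URL construction in the listing above easier to follow, here is a sketch that builds the request arguments without sending anything. `build_proxy_request` is a hypothetical helper and the token value is a placeholder.

```python
import json
from typing import Optional


def build_proxy_request(
    server_url: str,
    relative_path: str,
    admin_token: str,
    additional_headers: Optional[dict] = None,
    payload: Optional[dict] = None,
) -> dict:
    """Hypothetical helper: assemble URL, headers and body the way the listing does."""
    headers = {"Authorization": f"Bearer {admin_token}"}
    if additional_headers is not None:
        # Later keys win, so an additional header can override Authorization
        headers = {**headers, **additional_headers}
    return {
        "url": f"{server_url}{relative_path}",  # no separator is inserted
        "headers": headers,
        "data": json.dumps(payload),
    }


request = build_proxy_request(
    "https://auth.some-domain.com/auth",
    "/admin/realms/Test/users",
    "placeholder-token",
    additional_headers={"Content-Type": "application/json"},
)
```

Because the URL is a plain concatenation, `relative_path` needs its own leading slash. With no payload, `json.dumps(None)` produces the string `null` as the request body.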
remove_user_roles(self, roles, user_id)
Removes roles from a specific user
Parameters:
Name | Type | Description | Default |
---|---|---|---|
roles |
List[str] |
Roles to remove (name) |
required |
user_id |
str |
ID of the user the roles should be removed from |
required |
Returns:
Type | Description |
---|---|
dict |
Proxied response payload |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error()
def remove_user_roles(self, roles: List[str], user_id: str) -> dict:
""" Removes roles from a specific user
Args:
roles List[str]: Roles to remove (name)
user_id str: ID of the user the roles should be removed from
Returns:
dict: Proxied response payload
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
keycloak_roles = self.get_roles(roles)
return self._admin_request(
url=f'{self.users_uri}/{user_id}/role-mappings/realm',
data=[role.__dict__ for role in keycloak_roles],
method=HTTPMethod.DELETE
)
send_email_verification(self, user_id)
Sends the email to verify the email address
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
str |
The user ID of interest |
required |
Returns:
Type | Description |
---|---|
dict |
Proxied response payload |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Source code in fastapi_keycloak/api.py
@result_or_error()
def send_email_verification(self, user_id: str) -> dict:
""" Sends the email to verify the email address
Args:
user_id (str): The user ID of interest
Returns:
dict: Proxied response payload
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
return self._admin_request(url=f'{self.users_uri}/{user_id}/send-verify-email', method=HTTPMethod.PUT)
token_is_valid(self, token, audience=None)
Validates an access token, optionally also its audience
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
The token to be verified |
required |
audience |
str |
Optional audience. Will be checked if provided |
None |
Returns:
Type | Description |
---|---|
bool |
True if the token is valid |
Source code in fastapi_keycloak/api.py
def token_is_valid(self, token: str, audience: str = None) -> bool:
""" Validates an access token, optionally also its audience
Args:
token (str): The token to be verified
audience (str): Optional audience. Will be checked if provided
Returns:
bool: True if the token is valid
"""
try:
self._decode_token(token=token, audience=audience)
return True
except (ExpiredSignatureError, JWTError, JWTClaimsError):
return False
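The pattern above turns a decoder that raises into a boolean check. The sketch below shows the same shape with a simplified stand-in: `check_claims` only inspects the `exp` and `aud` claims of an already decoded dictionary and performs no signature verification, so it is not a substitute for the JWT decoding the library performs. All names are hypothetical, and `aud` is assumed to be a list.

```python
import time
from typing import Optional


class TokenError(Exception):
    """Stand-in for the JWT errors caught in the listing above."""


def check_claims(claims: dict, audience: Optional[str] = None, now: Optional[float] = None) -> dict:
    """Raise TokenError if the claims are expired or the audience does not match."""
    now = time.time() if now is None else now
    if claims.get("exp", 0) <= now:
        raise TokenError("token is expired")
    if audience is not None and audience not in claims.get("aud", []):
        raise TokenError("audience mismatch")
    return claims


def claims_are_valid(claims: dict, audience: Optional[str] = None, now: Optional[float] = None) -> bool:
    """Boolean wrapper: True if check_claims passes, False if it raises."""
    try:
        check_claims(claims, audience=audience, now=now)
        return True
    except TokenError:
        return False


claims = {"exp": 2_000, "aud": ["account"]}
```

The audience is only checked when one is passed, matching the optional `audience` parameter described in the table.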
update_user(self, user)
Updates a user. Requires the whole object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
KeycloakUser |
The (new) user object |
required |
Returns:
Type | Description |
---|---|
KeycloakUser |
The updated user |
Exceptions:
Type | Description |
---|---|
KeycloakError |
If the resulting response is not a successful HTTP-Code (>299) |
Notes
- You may alter any aspect of the user object, including the requiredActions for instance. There is no explicit function for updating those, as it is a user update in essence
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakUser)
def update_user(self, user: KeycloakUser):
""" Updates a user. Requires the whole object.
Args:
user (KeycloakUser): The (new) user object
Returns:
KeycloakUser: The updated user
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
Notes:
- You may alter any aspect of the user object, also the requiredActions for instance. There is not explicit function for updating those as it is a user update in
essence
"""
response = self._admin_request(url=f'{self.users_uri}/{user.id}', data=user.__dict__, method=HTTPMethod.PUT)
if response.status_code == 204: # Update successful
return self.get_user(user_id=user.id)
return response
user_login(self, username, password)
Models the password OAuth2 flow. Exchanges username and password for an access token. Will raise detailed errors if login fails due to requiredActions
Parameters:
Name | Type | Description | Default |
---|---|---|---|
username |
str |
Username used for login |
required |
password |
str |
Password of the user |
required |
Returns:
Type | Description |
---|---|
KeycloakToken |
If the exchange succeeds |
Exceptions:
Type | Description |
---|---|
HTTPException | If the credentials did not match any user |
MandatoryActionException | If the login is not possible due to mandatory actions |
KeycloakError | If the resulting response is not a successful HTTP-Code (>299, != 400, != 401) |
UpdateUserLocaleException | If the credentials were correct but the user has requiredActions of which the first one is to update their locale |
ConfigureTOTPException | If the credentials were correct but the user has requiredActions of which the first one is to configure TOTP |
VerifyEmailException | If the credentials were correct but the user has requiredActions of which the first one is to verify their email |
UpdatePasswordException | If the credentials were correct but the user has requiredActions of which the first one is to update their password |
UpdateProfileException | If the credentials were correct but the user has requiredActions of which the first one is to update their profile |
Notes
- To avoid calling this multiple times, you may want to check all requiredActions of the user if it fails due to a (sub)instance of a MandatoryActionException
Source code in fastapi_keycloak/api.py
@result_or_error(response_model=KeycloakToken)
def user_login(self, username: str, password: str) -> KeycloakToken:
""" Models the password OAuth2 flow. Exchanges username and password for an access token. Will raise detailed errors if login fails due to requiredActions
Args:
username (str): Username used for login
password (str): Password of the user
Returns:
KeycloakToken: If the exchange succeeds
Raises:
HTTPException: If the credentials did not match any user
MandatoryActionException: If the login is not possible due to mandatory actions
KeycloakError: If the resulting response is not a successful HTTP-Code (>299, != 400, != 401)
UpdateUserLocaleException: If the credentials were correct but the user has requiredActions of which the first one is to update their locale
ConfigureTOTPException: If the credentials were correct but the user has requiredActions of which the first one is to configure TOTP
VerifyEmailException: If the credentials were correct but the user has requiredActions of which the first one is to verify their email
UpdatePasswordException: If the credentials were correct but the user has requiredActions of which the first one is to update their password
UpdateProfileException: If the credentials were correct but the user has requiredActions of which the first one is to update their profile
Notes:
- To avoid calling this multiple times, you may want to check all requiredActions of the user if it fails due to a (sub)instance of a MandatoryActionException
"""
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"username": username,
"password": password,
"grant_type": "password"
}
response = requests.post(url=self.token_uri, headers=headers, data=data)
if response.status_code == 401:
raise HTTPException(status_code=401, detail="Invalid user credentials")
if response.status_code == 400:
user: KeycloakUser = self.get_user(query=f'username={username}')
if len(user.requiredActions) > 0:
reason = user.requiredActions[0]
exception = {
"update_user_locale": UpdateUserLocaleException(),
"CONFIGURE_TOTP": ConfigureTOTPException(),
"VERIFY_EMAIL": VerifyEmailException(),
"UPDATE_PASSWORD": UpdatePasswordException(),
"UPDATE_PROFILE": UpdateProfileException(),
}.get(
reason, # Try to return the matching exception
# On custom or unknown actions return a MandatoryActionException by default
MandatoryActionException(detail=f"This user can't login until the following action has been resolved: {reason}")
)
raise exception
return response
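The exception lookup at the end of the method can be tried in isolation. The following is a minimal sketch, not the library's code: the exception classes are local stand-ins that subclass plain `Exception` (the documented ones derive from `HTTPException`), only two actions are mapped, and `pick_exception` and `ACTION_EXCEPTIONS` are hypothetical names.

```python
class MandatoryActionException(Exception):
    """Stand-in for the documented base class (assumption: plain Exception here)."""

    def __init__(self, detail: str = "") -> None:
        super().__init__(detail)
        self.detail = detail


class VerifyEmailException(MandatoryActionException):
    """Stand-in for the VERIFY_EMAIL case."""


class UpdatePasswordException(MandatoryActionException):
    """Stand-in for the UPDATE_PASSWORD case."""


# Map action names to exception classes (hypothetical table, two entries only).
ACTION_EXCEPTIONS = {
    "VERIFY_EMAIL": VerifyEmailException,
    "UPDATE_PASSWORD": UpdatePasswordException,
}


def pick_exception(required_actions):
    """Hypothetical helper: build the exception for the first required action."""
    reason = required_actions[0]
    exception_class = ACTION_EXCEPTIONS.get(reason)
    if exception_class is None:
        # Custom or unknown actions fall back to the generic base exception.
        return MandatoryActionException(
            detail=f"This user can't login until the following action has been resolved: {reason}"
        )
    return exception_class()
```

Only the first entry of the list decides which exception is built, which is why the docstring suggests inspecting all requiredActions of the user after a failure.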
KeycloakError
ConfigureTOTPException (MandatoryActionException)
Thrown if the exchange of username and password for an access token fails due to the CONFIGURE_TOTP requiredAction
KeycloakError (Exception)
Thrown if any response from Keycloak does not match our expectation
Attributes:
Name | Type | Description |
---|---|---|
status_code | int | The status code of the response received |
reason | str | The reason why the request failed |
MandatoryActionException (HTTPException)
Thrown if the exchange of username and password for an access token fails due to a requiredAction
UpdatePasswordException (MandatoryActionException)
Thrown if the exchange of username and password for an access token fails due to the UPDATE_PASSWORD requiredAction
UpdateProfileException (MandatoryActionException)
Thrown if the exchange of username and password for an access token fails due to the UPDATE_PROFILE requiredAction
UpdateUserLocaleException (MandatoryActionException)
Thrown if the exchange of username and password for an access token fails due to the update_user_locale requiredAction
VerifyEmailException (MandatoryActionException)
Thrown if the exchange of username and password for an access token fails due to the VERIFY_EMAIL requiredAction
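Because each specific exception listed here derives from MandatoryActionException, a caller can handle the cases it cares about first and fall back to the base class for the rest. The sketch below illustrates that ordering with local stand-in classes (plain `Exception` subclasses, an assumption made to keep the example self-contained); `describe_login_failure` and `raising` are hypothetical helpers.

```python
class MandatoryActionException(Exception):
    """Stand-in for the documented base class."""


class ConfigureTOTPException(MandatoryActionException):
    """Stand-in for the CONFIGURE_TOTP case."""


class UpdatePasswordException(MandatoryActionException):
    """Stand-in for the UPDATE_PASSWORD case."""


def describe_login_failure(attempt_login):
    """Hypothetical helper: run a login callable and name what blocked it."""
    try:
        attempt_login()
    except ConfigureTOTPException:
        # The most specific handler has to come first.
        return "configure TOTP"
    except MandatoryActionException:
        # Every other subclass ends up here.
        return "other mandatory action"
    return "ok"


def raising(exc):
    """Build a zero-argument callable that raises exc, standing in for a failed login."""
    def attempt():
        raise exc
    return attempt
```

For example, `describe_login_failure(raising(UpdatePasswordException()))` takes the second branch, since no handler names that subclass directly.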
Models
HTTPMethod (Enum)
Represents the basic HTTP verbs
Values
- GET: get
- POST: post
- DELETE: delete
- PUT: put
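As an illustration of the values listed above, an enum with the same members can be written and looked up by value as follows. This is a stand-in definition, not an import from the library, and `method_name` is a hypothetical helper.

```python
from enum import Enum


class HTTPMethod(Enum):
    """Stand-in mirroring the documented members and values."""
    GET = "get"
    POST = "post"
    DELETE = "delete"
    PUT = "put"


def method_name(method: HTTPMethod) -> str:
    """Hypothetical helper: the verb in upper case, as written in an HTTP request line."""
    return method.value.upper()


# Members can be looked up by their lower-case value.
post = HTTPMethod("post")
```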
KeycloakIdentityProvider (BaseModel)
pydantic-model
Keycloak representation of an identity provider
Attributes:
Name | Type | Description |
---|---|---|
alias | str | |
internalId | str | |
providerId | str | |
enabled | bool | |
updateProfileFirstLoginMode | str | |
trustEmail | bool | |
storeToken | bool | |
addReadTokenRoleOnCreate | bool | |
authenticateByDefault | bool | |
linkOnly | bool | |
firstBrokerLoginFlowAlias | str | |
config | dict | |
Notes
Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for details. This is a mere proxy object.
KeycloakRole (BaseModel)
pydantic-model
Keycloak representation of a role
Attributes:
Name | Type | Description |
---|---|---|
id | str | |
name | str | |
composite | bool | |
clientRole | bool | |
containerId | str | |
Notes
Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for details. This is a mere proxy object.
KeycloakToken (BaseModel)
pydantic-model
Keycloak representation of a token object
Attributes:
Name | Type | Description |
---|---|---|
access_token | str | An access token |
__str__(self)
special
String representation of KeycloakToken
Source code in fastapi_keycloak/model.py
def __str__(self):
    """ String representation of KeycloakToken """
    return f'Bearer {self.access_token}'
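Since the string form of a token is `Bearer <access_token>`, it can be used directly as the value of an `Authorization` header. A minimal sketch, assuming a dataclass stand-in for the pydantic model; `auth_headers` is a hypothetical helper, not part of the library.

```python
from dataclasses import dataclass


@dataclass
class KeycloakToken:
    """Stand-in for the pydantic model (assumption: same single field)."""
    access_token: str

    def __str__(self) -> str:
        return f"Bearer {self.access_token}"


def auth_headers(token: KeycloakToken) -> dict:
    """Hypothetical helper: build an Authorization header from the token."""
    return {"Authorization": str(token)}
```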
KeycloakUser (BaseModel)
pydantic-model
Represents a user object of Keycloak.
Attributes:
Name | Type | Description |
---|---|---|
id | str | |
createdTimestamp | int | |
username | str | |
enabled | bool | |
totp | bool | |
emailVerified | bool | |
firstName | Optional[str] | |
lastName | Optional[str] | |
email | Optional[str] | |
disableableCredentialTypes | List[str] | |
requiredActions | List[str] | |
notBefore | int | |
access | dict | |
Notes
Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for details. This is a mere proxy object.
OIDCUser (BaseModel)
pydantic-model
Represents a user object of Keycloak, parsed from an access token
Attributes:
Name | Type | Description |
---|---|---|
sub | str | |
iat | int | |
exp | int | |
scope | str | |
email_verified | bool | |
name | Optional[str] | |
given_name | Optional[str] | |
family_name | Optional[str] | |
email | Optional[str] | |
realm_access | dict | |
Notes
Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for details. This is a mere proxy object.
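The `iat` and `exp` attributes are the standard JWT "issued at" and "expiration time" claims, expressed in seconds since the Unix epoch. The sketch below shows how such values could be checked; both functions are hypothetical helpers written for this illustration and are not provided by the library.

```python
import time
from typing import Optional


def is_expired(exp: int, now: Optional[float] = None, leeway: int = 0) -> bool:
    """Hypothetical helper: True once the current time reaches exp (plus leeway)."""
    current = time.time() if now is None else now
    return current >= exp + leeway


def lifetime_seconds(iat: int, exp: int) -> int:
    """Hypothetical helper: how long the token was issued for."""
    return exp - iat
```

Passing `now` explicitly keeps the check deterministic in tests; the optional `leeway` tolerates small clock differences between servers.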
roles: List[str]
property
readonly
Returns the roles of the user
Returns:
Type | Description |
---|---|
List[str] | If the realm access dict contains roles |
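Keycloak access tokens list realm roles under the `roles` key of the `realm_access` claim. The following sketch reads them from a plain dict. It is an illustration only: this section does not state what the property does when the key is missing, so returning an empty list is an assumption, and both function names are hypothetical.

```python
from typing import List


def roles_from_realm_access(realm_access: dict) -> List[str]:
    """Hypothetical helper: the realm roles, or an empty list if none are present."""
    return list(realm_access.get("roles", []))


def has_role(realm_access: dict, role: str) -> bool:
    """Hypothetical helper: check for a single role by name."""
    return role in roles_from_realm_access(realm_access)
```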
__str__(self)
special
String representation of an OIDCUser
Source code in fastapi_keycloak/model.py
def __str__(self) -> str:
    """ String representation of an OIDCUser """
    return self.email
UsernamePassword (BaseModel)
pydantic-model
Represents a request body that contains username and password
Attributes:
Name | Type | Description |
---|---|---|
username | str | Username |
password | str | Password, masked by swagger |