Source code for matrice.token_auth

"""Module for custom authentication"""
import json
import os
import sys
from datetime import datetime

import requests
from dateutil.parser import parse
from requests.auth import AuthBase

ENV = os.environ["ENV"]
ACCESS_KEY_URL = f"https://{ENV}.backend.app.matrice.ai/v1/user/validate_access_key"
AUTH_TOKEN_URL = f"https://{ENV}.backend.app.matrice.ai/v1/user/refresh"


[docs] class RefreshToken(AuthBase): """Implements a custom authentication scheme."""
[docs] def __init__(self, access_key, secret_key): self.bearer_token = None self.access_key = access_key self.secret_key = secret_key
def __call__(self, r): """Attach an API token to a custom auth header.""" self.set_bearer_token() r.headers["Authorization"] = self.bearer_token return r
[docs] def set_bearer_token(self): """Obtain a bearer token using the provided access key and secret key.""" # print("Setting bearer token...") payload_dict = { "accessKey": self.access_key, "secretKey": self.secret_key, } payload = json.dumps(payload_dict) headers = {"Content-Type": "text/plain"} try: response = requests.request( "GET", ACCESS_KEY_URL, headers=headers, data=payload, timeout=20 ) except Exception as e: # pylint: disable=W0718 print("Error while making request to the auth server") print(e) sys.exit(0) if response.status_code != 200: print("Error response from the auth server") print(response.text) sys.exit(0) res_dict = response.json() if res_dict["success"]: self.bearer_token = "Bearer " + res_dict["data"]["refreshToken"] else: print("The provided credentials are incorrect!!") sys.exit(0)
[docs] class AuthToken(AuthBase): """Implements a custom authentication scheme."""
[docs] def __init__(self, access_key, secret_key, refresh_token): self.bearer_token = None self.access_key = access_key self.secret_key = secret_key self.refresh_token = refresh_token self.expiry_time = datetime.utcnow()
def __call__(self, r): """Attach an API token to a custom auth header.""" self.set_bearer_token() r.headers["Authorization"] = self.bearer_token return r
[docs] def set_bearer_token(self): """Obtain an authentication bearer token using the provided refresh token.""" # print("Getting Auth bearer token...") headers = {"Content-Type": "application/json"} try: response = requests.request( "POST", AUTH_TOKEN_URL, headers=headers, auth=self.refresh_token, timeout=20, ) except Exception as e: # pylint: disable=W0718 print("Error while making request to the auth server") print(e) sys.exit(0) if response.status_code != 200: print("Error response from the auth server") print(response.text) sys.exit(0) res_dict = response.json() if res_dict["success"]: self.bearer_token = "Bearer " + res_dict["data"]["token"] self.expiry_time = parse(res_dict["data"]["expiresAt"]) else: print("The provided credentials are incorrect!!") sys.exit(0)