# Source code for matrice.projects

"""Module for interacting with backend API to manage projects."""

import sys

from matrice.action import Action
from matrice.annotation import Annotation
from matrice.dataset import Dataset
from matrice.deployment import Deployment
from matrice.experiment import Experiment
from matrice.exported_model import ExportedModel
from matrice.models import Model
from matrice.utils import handle_response
from collections import OrderedDict
from datetime import datetime, timedelta
from matrice.dataset import get_dataset_size , upload_file
from matrice.model_store import fetch_supported_runtimes_metrics




class Projects:
    """
    A class for handling project-related operations using the backend API.

    Attributes
    ----------
    session : Session
        The session object used for API interactions.
    account_number : str
        The account number associated with the session.
    project_name : str
        The name of the project.
    project_id : str
        The ID of the project (initialized in the constructor).
    project_input : str
        The input type for the project (initialized in the constructor).
    output_type : str
        The output type for the project (initialized in the constructor).

    Parameters
    ----------
    session : Session
        The session object used for API interactions.
    project_name : str
        The name of the project.
    """
[docs] def __init__(self, session, project_name=None, project_id=None): """ Initialize a Projects object with project details. Parameters ---------- session : Session The session object used for API interactions. project_name : str The name of the project. """ assert project_name is not None or project_id is not None self.session = session self.account_number = session.account_number self.last_refresh_time = datetime.now() self.project_name = project_name self.project_id = project_id self.rpc = session.rpc if project_name: project_info, error, message = self._get_project_by_name() else: project_info, error, message = self._get_a_project_by_id() if error: print(f"Error fetching project info: {message}") else: if(project_info["isDisabled"]==False): self.status = "enabled" else: self.status = "disabled" self.project_id = project_info["_id"] self.project_name = project_info["name"] self.project_input = project_info["inputType"] self.output_type = project_info["outputType"] self.created_at = project_info["createdAt"] self.updated_at = project_info["updatedAt"]
[docs] def refresh(self): """ Refresh the instance by reinstantiating it with the previous values. """ # Check if two minutes have passed since the last refresh if datetime.now() - self.last_refresh_time < timedelta(minutes=2): raise Exception("Refresh can only be called after two minutes since the last refresh.") # Prepare the parameters needed for reinitialization init_params = { 'session': self.session, 'project_name': self.project_name, 'project_id': self.project_id, } # Reinitialize the instance self.__init__(**init_params) # Update the last refresh time self.last_refresh_time = datetime.now()
def _get_service_and_action_ids(self, resp, error, message): """ Extract service and action IDs from the response. Parameters ---------- resp : dict The response dictionary from the API. error : str An error message if extraction fails. Returns ------- tuple A tuple containing: - The service ID if extraction is successful, or None if it fails. - The action ID if extraction is successful, or None if it fails. - An error message if extraction fails, or None if successful. Example ------- >>> resp = {"data": {"service_id": "123", "action_id": "456"}} >>> service_id, action_id, error = project._get_service_and_action_ids(resp, None) >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Service ID: {service_id}, Action ID: {action_id}") """ if error: print(message, error) return None, None data = resp return data["_id"], data["_idAction"] def __job_cost_estimate(self, data): pass # TODO def _get_project_by_name(self): """ Fetch project details by project name. Returns ------- tuple A tuple containing: - The response dictionary from the API. - An error message if the response indicates an error, or None if successful. - A status message describing the result of the operation. Example ------- >>> result, error, message = project._get_project_by_name() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Status: {message}") """ path = f"/v1/project/get_project_by_name?name={self.project_name}" resp = self.rpc.get(path=path) return handle_response( resp, "Project details Fetched Successfully", "Could not fetch project details", ) def _get_a_project_by_id(self): """ Fetch project information by project ID. Returns ------- tuple A tuple containing: - The response dictionary from the API. - An error message if the response indicates an error, or None if successful. - A status message describing the result of the operation. 
Example ------- >>> result, error, message = project._get_a_project_by_id() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Status: {message}") """ path = f"/v1/project/{self.project_id}" resp = self.rpc.get(path=path) return handle_response( resp, f"Project info fetched for project with id {self.project_id}", f"Could not fetch project info for project with id {self.project_id}", )
[docs] def get_service_action_logs(self, service_id ,service_name): """ Fetch action logs for a specific service. Parameters ---------- service_id : str The ID of the service for which to fetch action logs. service_name : str The name of the service for which to fetch action logs. Returns ------- tuple A tuple containing: - The response dictionary from the API. - An error message if the response indicates an error, or None if successful. - A status message describing the result of the operation. Example ------- >>> result, error, message = project.get_service_action_logs("service123") >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Status: {message}") """ # User can fetch service id using the get method of respective # services, eg - to get logs of dataset use get_dataset method assert service_id or service_name , "Service ID or name is required to fetch action logs" path = f"/v1/project/service/{service_id}/logs?projectId={self.project_id}" resp = self.rpc.get(path=path) return handle_response( resp, "Action logs fected succesfully", "Could not fetch action logs" )
[docs] def get_latest_action_record(self, service_id): """ Fetch the latest action logs for a specific service ID. Parameters ---------- service_id : str The ID of the service for which to fetch the latest action logs. Returns ------- tuple A tuple containing: - The response dictionary from the API. - An error message if the response indicates an error, or None if successful. - A status message describing the result of the operation. Example ------- >>> result, error, message = project.get_latest_action_record("service123") >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Status: {message}") """ path = f"/v1/project/get_latest_action_record/{service_id}" resp = self.rpc.get(path=path) return handle_response( resp, "Action logs fected succesfully", "Could not fetch action logs" )
# POST REQUESTS def _create_dataset( self, dataset_name, source, cloud_provider, dataset_type, input_type, source_url="", file_path="", credential_alias="", bucket_alias="", compute_alias="", dataset_description="", version_description="", source_credential_alias="", bucket_alias_service_provider="auto", ): """ Create a new dataset. Parameters ---------- dataset_name : str The name of the dataset. source : str The source of the dataset. cloud_provider : str The cloud provider for the dataset. dataset_type : str The type of the dataset. input_type : str The input type for the dataset. source_url : str, optional The URL of the source (default is an empty string). file_path : str, optional The path to the file if the source is local (default is an empty string). credential_alias : str, optional The credential alias for accessing the dataset (default is an empty string). bucket_alias : str, optional The bucket alias for the dataset (default is an empty string). compute_alias : str, optional The compute alias (default is an empty string). dataset_description : str, optional A description of the dataset (default is an empty string). version_description : str, optional A description of the dataset version (default is an empty string). source_credential_alias : str, optional The source credential alias (default is an empty string). bucket_alias_service_provider : str, optional The bucket alias service provider (default is "auto"). Returns ------- tuple A tuple containing: - A `Dataset` object for the created dataset. - An `Action` object related to the dataset creation process. 
Example ------- >>> dataset, action = project.create_dataset("MyDataset", "local", "", "image", "image", file_path="data.csv") >>> if action: >>> print(f"Dataset created: {dataset}") >>> else: >>> print(f"Error: {dataset}") """ if source == "lu": response = upload_file(self.session , file_path) if response["success"]: source_url = response["data"] else: return response["data"], "error in uploading file", response["message"] dataset_size, err, msg = get_dataset_size(self.session , source_url , self.project_id) if err: dataset_size = 0 path = f"/v1/dataset?projectId={self.project_id}" headers = {"Content-Type": "application/json"} body = { "name": dataset_name, "isUnlabeled": False, "source": source, "sourceUrl": source_url, "cloudProvider": cloud_provider, "isCreateNew": True, "oldDatasetVersion": None, "newDatasetVersion": "v1.0", "datasetDescription": dataset_description, "description": dataset_description, "newVersionDescription": version_description, "isPublic": False, "computeAlias": compute_alias, "datasetSize": dataset_size, "bucketAliasServiceProvider": bucket_alias_service_provider, "_idProject": self.project_id, "type": dataset_type, "sourceCredentialAlias": source_credential_alias, "credentialAlias": credential_alias, "bucketAlias": bucket_alias, "inputType": input_type, } resp = self.rpc.post(path=path, headers=headers, payload=body) resp, error, message = handle_response( resp, "Dataset creation in progress", "An error occurred while trying to create new dataset", ) service_id, action_id = self._get_service_and_action_ids(resp, error, message) return Dataset(self.session, service_id), Action(self.session, action_id)
[docs] def import_local_dataset( self, dataset_name, file_path, dataset_type, source="lu", dataset_description="", version_description="", input_type="image", credential_alias="", bucket_alias="", compute_alias="", source_credential_alias="", bucket_alias_service_provider="auto", ): """ Upload a local dataset. Parameters ---------- dataset_name : str The name of the dataset. file_path : str The path to the local file. dataset_type : str The type of the dataset. source : str, optional The source of the dataset (default is "lu"). dataset_description : str, optional A description of the dataset (default is an empty string). version_description : str, optional A description of the dataset version (default is an empty string). input_type : str, optional The input type for the dataset (default is "image"). credential_alias : str, optional The credential alias for accessing the dataset (default is an empty string). bucket_alias : str, optional The bucket alias for the dataset (default is an empty string). compute_alias : str, optional The compute alias (default is an empty string). source_credential_alias : str, optional The source credential alias (default is an empty string). bucket_alias_service_provider : str, optional The bucket alias service provider (default is "auto"). Returns ------- tuple A tuple containing: - A `Dataset` object for the created dataset. - An `Action` object related to the dataset upload process. 
Example ------- >>> dataset, action = project.upload_local_dataset("MyLocalDataset", "path/to/data.csv", "image") >>> if action: >>> print(f"Dataset uploaded: {dataset}") >>> else: >>> print(f"Error: {dataset}") """ return self._create_dataset( dataset_name=dataset_name, source=source, cloud_provider="", dataset_type=dataset_type, input_type=input_type, file_path=file_path, dataset_description=dataset_description, version_description=version_description, credential_alias=credential_alias, bucket_alias=bucket_alias, compute_alias=compute_alias, source_credential_alias=source_credential_alias, bucket_alias_service_provider=bucket_alias_service_provider, )
[docs] def import_cloud_dataset( self, dataset_name, source, source_url, cloud_provider, dataset_type, input_type="image", dataset_description="", version_description="", credential_alias="", bucket_alias="", compute_alias="", source_credential_alias="", bucket_alias_service_provider="auto", ): """ Upload a cloud dataset. Parameters ---------- dataset_name : str The name of the dataset. source : str The source of the dataset. source_url : str The URL of the source. cloud_provider : str The cloud provider for the dataset. dataset_type : str The type of the dataset. input_type : str, optional The input type for the dataset (default is "image"). dataset_description : str, optional A description of the dataset (default is an empty string). version_description : str, optional A description of the dataset version (default is an empty string). credential_alias : str, optional The credential alias for accessing the dataset (default is an empty string). bucket_alias : str, optional The bucket alias for the dataset (default is an empty string). compute_alias : str, optional The compute alias (default is an empty string). source_credential_alias : str, optional The source credential alias (default is an empty string). bucket_alias_service_provider : str, optional The bucket alias service provider (default is "auto"). Returns ------- tuple A tuple containing: - A `Dataset` object for the created dataset. - An `Action` object related to the dataset upload process. 
Example ------- >>> dataset, action = project.upload_cloud_dataset("MyCloudDataset", "cloud_source", "http://source.url", "AWS", "image") >>> if action: >>> print(f"Dataset uploaded: {dataset}") >>> else: >>> print(f"Error: {dataset}") """ return self._create_dataset( dataset_name=dataset_name, source=source, cloud_provider=cloud_provider, dataset_type=dataset_type, input_type=input_type, source_url=source_url, dataset_description=dataset_description, version_description=version_description, credential_alias=credential_alias, bucket_alias=bucket_alias, compute_alias=compute_alias, source_credential_alias=source_credential_alias, bucket_alias_service_provider=bucket_alias_service_provider, )
[docs] def create_annotation( self, project_type, ann_title, dataset_id, dataset_version, labels, only_unlabeled, is_ML_assisted, labellers, reviewers, guidelines, ): """ Create a new annotation for a dataset. Parameters ---------- project_type : str The type of the project for which the annotation is being created. ann_title : str The title of the annotation. dataset_id : str The ID of the dataset to annotate. dataset_version : str The version of the dataset. labels : list The list of labels for the annotation. only_unlabeled : bool Whether to annotate only unlabeled data. is_ML_assisted : bool Whether the annotation is ML-assisted. labellers : list The list of labellers for the annotation. reviewers : list The list of reviewers for the annotation. guidelines : str The guidelines for the annotation. Returns ------- tuple A tuple containing: - An `Annotation` object for the created annotation. - An `Action` object related to the annotation creation process. Example ------- >>> annotation, action = project.create_annotation("object_detection", "MyAnnotation", "dataset123", "v1.0", ["label1", "label2"], True, False, [{"email": "user-email", "name": "username", "percentageWork": '100'}],[{"email": "user-email", "name": "username", "percentageWork": '100'}], "Follow these guidelines") >>> if action: >>> print(f"Annotation created: {annotation}") >>> else: >>> print(f"Error: {annotation}") """ # Validate labellers and reviewers validated_labellers = self._validate_labellers_and_reviewers(labellers) validated_reviewers = self._validate_labellers_and_reviewers(reviewers) # Prepare the payload for the annotation creation path = f"/v1/annotations?projectId={self.project_id}&projectType={project_type}" payload = { "title": ann_title, "_idDataset": dataset_id, "datasetVersion": dataset_version, "labels": labels, "onlyUnlabeled": only_unlabeled, "isMLAssisted": is_ML_assisted, "labellers": validated_labellers, "reviewers": validated_reviewers, "guidelines": guidelines, "type": 
project_type, "modelType": "", "modelId": "", } headers = {"Content-Type": "application/json"} resp = self.rpc.post(path=path, headers=headers, payload=payload) resp, error, message = handle_response( resp, "Annotation creation in progress", "An error occurred while trying to create new annotation", ) annotation_id = resp['_id'] service_id, action_id = self._get_service_and_action_ids(resp, error, message) return Annotation(self.session, annotation_id, ann_title), Action( self.session, action_id )
[docs] def create_experiment( self, name, dataset_id, target_run_time, dataset_version, primary_metric, matrice_compute=True, models_trained=[], performance_trade_off=-1, ): """ Create a new experiment for model training. Parameters ---------- name : str The name of the experiment. dataset_id : str The ID of the dataset to be used in the experiment. target_run_time : str The target runtime for the experiment. dataset_version : str The version of the dataset. primary_metric : str The primary metric to evaluate the experiment. matrice_compute : bool, optional Flag to indicate whether to use matrix compute (default is True). models_trained : list, optional List of models that have been trained in the experiment (default is an empty list). performance_trade_off : float, optional The performance trade-off for the experiment (default is -1). Returns ------- tuple A tuple containing: - An `Experiment` object for the created experiment. - An error message if the response indicates an error, or None if successful. - A status message describing the result of the operation. Example ------- >>> experiment, error, message = project.create_experiment("Experiment1", "dataset123", "runtimeA", "v1.0", "accuracy") >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Experiment created: {experiment}") """ dataset = Dataset(self.session, dataset_id=dataset_id) project_info, _, _ = self._get_a_project_by_id() if project_info is None: print("No project found.") return None, "No project found.", None dataset_info, _, _ = dataset.get_processed_versions() print(dataset_info) if dataset_info is None: print("No datasets found") return None, "No datasets found", None dataset_version_ready = False for data_info in dataset_info: if dataset_id == data_info["_id"]: if dataset_version in data_info["processedVersions"]: dataset_version_ready = True break if not dataset_version_ready: print( "Dataset or Dataset version does not exist. Cannot use this dataset version to create a model." 
) return None, "Dataset or Dataset version does not exist.", None model_inputs = [project_info["inputType"]] model_outputs = [project_info["outputType"]] # ## TODO: update it to fetch all the supported runtimes across the platform for the task type # runtime_metrics, _ , _ = fetch_supported_runtimes_metrics( # self.session,self.project_id, model_inputs, model_outputs # ) # print(runtime_metrics) # if runtime_metrics is None: # print("No primary metric and target runtime found.") # return None, "No primary metric and target runtime found.", None # if target_run_time not in runtime_metrics[0]["supportedRuntimes"]: # print("Target runtime provided does not exist.") # return None, "Target runtime provided does not exist.", None # if primary_metric not in runtime_metrics[0]["supportedMetrics"]: # print("Primary metric not available in the existing runtime Metrics.") # return ( # None, # "Primary metric not available in the existing runtime Metrics.", # None, # ) path = f"/v1/model/create_experiment?projectId={self.project_id}" headers = {"Content-Type": "application/json"} if matrice_compute == False: model_payload = { "experimentName": name, "_idProject": self.project_id, "matriceCompute": matrice_compute, } else: model_payload = { "experimentName": name, "_idDataset": dataset_id, "_idProject": self.project_id, "modelInputs": [project_info["inputType"]], "modelOutputs": [project_info["outputType"]], "targetRuntime": [target_run_time], "datasetVersion": dataset_version, "performanceTradeoff": performance_trade_off, "primaryMetric": primary_metric, "modelsTrained": models_trained, "matriceCompute": matrice_compute, "baseModelStoragePath": "", "storageCloudCredentials": [], } resp = self.rpc.post(path=path, headers=headers, payload=model_payload) resp, error, message = handle_response( resp, "Experiment creation success", "An error occurred while trying to create new experiment", ) if error: print(message, error) return None, None data = resp exp_id = data["_id"] exp_name 
= data["experimentName"] return Experiment(self.session, exp_id, exp_name)
[docs] def create_model_export( self, model_train_id, export_formats, model_config, is_gpu_required=False ): """ Add export configurations to a trained model. Parameters ---------- model_train_id : str The ID of the trained model. export_formats : list The list of formats to export the model. model_config : dict The configuration settings for the model export. is_gpu_required : bool, optional Flag to indicate if GPU is required for the export (default is False). Returns ------- tuple A tuple containing: - An `InferenceOptimization` object related to the model export. - An `Action` object related to the export process. Example ------- >>> inference_opt, action = project.add_model_export("model123", ["format1", "format2"], {"configKey": "configValue"}, is_gpu_required=True) >>> if action: >>> print(f"Model export added: {inference_opt}") >>> else: >>> print(f"Error: {inference_opt}") """ # Ensure export_formats is a list if not isinstance(export_formats, list): export_formats = [export_formats] M = Model(self.session, model_train_id) if M.created_at == "0001-01-01T00:00:00Z": print("No model exists with the given model train id") sys.exit(0) path = ( f"/v1/model/{model_train_id}/add_model_export?projectId={self.project_id}" ) headers = {"Content-Type": "application/json"} model_payload = { "_idProject": self.project_id, "_idModelTrain": model_train_id, "modelName": M.model_name, "modelInputs": M.model_inputs, "_idModelInfo": M.model_info_id, "modelOutputs": M.model_outputs, "exportFormats": export_formats, "_idDataset": M.dataset_id, "datasetVersion": M.dataset_version, "gpuRequired": is_gpu_required, "actionConfig": M.action_config, "modelConfig": model_config, } resp = self.rpc.post(path=path, headers=headers, payload=model_payload) resp, error, message = handle_response( resp, "Model Export added successfully", "An error occurred while adding model export", ) service_id, action_id = self._get_service_and_action_ids(resp, error, message) return 
ExportedModel(self.session, service_id), Action(self.session, action_id)
[docs] def create_deployment( self, deployment_name, model_id, gpu_required=True, auto_scale=False, auto_shutdown=True, shutdown_threshold=5, compute_alias="", model_type="trained", runtime_framework="Pytorch", ): """ Create a deployment for a model. Parameters ---------- deployment_name : str The name of the deployment. model_id : str The ID of the model to be deployed. gpu_required : bool, optional Flag to indicate if GPU is required for the deployment (default is True). auto_scale : bool, optional Flag to indicate if auto-scaling is enabled (default is False). auto_shutdown : bool, optional Flag to indicate if auto-shutdown is enabled (default is True). shutdown_threshold : int, optional The threshold for auto-shutdown (default is 5). compute_alias : str, optional The alias for the compute (default is an empty string). Returns ------- tuple A tuple containing: - A `Deployment` object for the created deployment. - An `Action` object related to the deployment process. Example ------- >>> deployment, action = project.create_deployment("Deployment1", "model123", auto_scale=True) >>> if action: >>> print(f"Deployment created: {deployment}") >>> else: >>> print(f"Error: {deployment}") """ if model_type == "trained": model = Model(self.session, model_id=model_id) elif model_type == "exported": model = ExportedModel(self.session, model_export_id=model_id) runtime_framework = model.export_format body = { "deploymentName": deployment_name, "_idModel": model_id, "runtimeFramework": runtime_framework, "deploymentType": "regular", "modelType": model_type, "modelInput": model.model_inputs[0], "modelOutput": model.model_outputs[0], "autoShutdown": auto_shutdown, "autoScale": auto_scale, "gpuRequired": gpu_required, "shutdownThreshold": shutdown_threshold, "computeAlias": compute_alias, } headers = {"Content-Type": "application/json"} path = f"/v1/deployment?projectId={self.project_id}" resp = self.rpc.post(path=path, headers=headers, payload=body) resp, error, message = 
handle_response( resp, "Deployment created successfully.", "An error occurred while trying to create deployment.", ) service_id, action_id = self._get_service_and_action_ids(resp, error, message) return Deployment(self.session, service_id), Action(self.session, action_id)
[docs] def delete(self): """ Delete a project by project ID. Returns ------- tuple A tuple containing: - A success message if the project is deleted successfully. - An error message if the deletion fails. Example ------- >>> success_message, error = project.delete() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(success_message) """ _, error, _ = self._get_a_project_by_id() if error: print("Project is not found") sys.exit(1) path = f"/v1/project/delete_project/{self.project_id}" resp = self.rpc.delete(path=path) return handle_response( resp, "Project deleted successfully", "An error occurred while trying to delete project", )
## TODO: update to use enabled field
[docs] def change_status(self, enable=True): """ Enables or disable a project. It is set to enable by default. Parameters ---------- type : str The type of action to perform: "enable" or "disable". Returns ------- tuple A tuple containing: - A success message if the project is enabled or disabled successfully. - An error message if the action fails. Example ------- >>> success_message, error = project.change_status(enable=True) >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(success_message) """ if enable==True: type = "enable" else: type = "disable" _, error, _ = self._get_a_project_by_id() if error: print("Project is not found") sys.exit(1) path = f"/v1/project/enable-disable-project/{type}/{self.project_id}" resp = self.rpc.put(path=path) return handle_response( resp, f"Project {self.project_id} {type}d successfully", f"Could not {type} project {self.project_id}", )
[docs] def get_actions_logs(self, action_id): """ Fetch action logs for a specific action. Parameters ---------- action_id : str The ID of the action for which logs are to be fetched. Returns ------- tuple A tuple containing: - The action logs if the request is successful. - An error message if the request fails. Example ------- >>> logs, error = project.get_actions_logs_for_action("action123") >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Action logs: {logs}") """ path = f"/v1/project/action_logs_from_record_id/{action_id}" resp = self.rpc.get(path=path) return handle_response( resp, "Action logs fected succesfully", "Could not fetch action logs", )
[docs] def list_collaborators(self): """ List all collaborators associated with the current project along with the permissions. This function retrieves a list of all collaborators for the specified project ID. Returns ------- tuple A tuple containing three elements: - API response (dict): The raw response from the API. - error_message (str or None): Error message if an error occurred, None otherwise. - status_message (str): A status message indicating success or failure. Example ------- >>> resp, err, msg =project.list_collaborators() >>> if err: >>> print(f"Error: {err}") >>> else: >>> print(f"Collaborators : {resp}") """ path = f"/v1/user/project/{self.project_id}/collaborators" resp = self.rpc.get(path=path) return handle_response(resp, "Collaborators fetched successfully", "Could not fetch collaborators")
[docs] def invite_user_to_project( self, email, permissions ): """ Invite a user to the current project with specific permissions. This function sends an invitation to a user, identified by their email address, to join the specified project. The user will be assigned the provided permissions for different project services. Args: email (str): The email address of the user to invite. permissions (dict): A dictionary specifying the permissions for various project services. Returns ------- tuple A tuple containing three elements: - API response (dict): The raw response from the API. - error_message (str or None): Error message if an error occurred, None otherwise. - status_message (str): A status message indicating success or failure. Example ------- >>> email = "ashray.gupta@matrice.ai" >>> permissions = { ... 'datasetsService': { ... 'read': True, ... 'write': True, ... 'admin': True ... }, ... 'annotationService': { ... 'read': True, ... 'write': False, ... 'admin': False ... }, ... 'modelsService': { ... 'read': True, ... 'write': False, ... 'admin': False ... }, ... 'inferenceService': { ... 'read': True, ... 'write': False, ... 'admin': False ... }, ... 'deploymentService': { ... 'read': True, ... 'write': True, ... 'admin': False ... }, ... 'byomService': { ... 'read': True, ... 'write': False, ... 'admin': False ... } ... } >>> resp, err, msg = project.invite_user_to_project(email, permissions) >>> if err: ... print(f"Error: {err}") ... else: ... print("User invited successfully") """ path = f"/v1/user/project/invite" headers = {"Content-Type": "application/json"} body = { "_idProject": self.project_id, "email": email, "projectName": self.project_name, "permissions": permissions, } resp = self.rpc.post(path=path, headers=headers, payload=body) return handle_response( resp, "User invited to the project successfully", "Could not invite user to the project", )
[docs] def update_permissions(self, collaborator_id, permissions): """ Update the permissions for a collaborator in the current project. This function updates the permissions for a specified collaborator in the current project. Args: collaborator_id (str): The ID of the collaborator whose permissions are to be updated. permissions (list): A list containing the updated permissions for various project services. Returns ------- tuple A tuple containing three elements: - API response (dict): The raw response from the API. - error_message (str or None): Error message if an error occurred, None otherwise. - status_message (str): A status message indicating success or failure. Example ------- >>> collaborator_id = "12345" >>> permissions = [ ... "v1.0", ... True, # isProjectAdmin ... {"read": True, "write": True, "admin": False}, # datasetsService ... {"read": True, "write": False, "admin": False}, # modelsService ... {"read": True, "write": False, "admin": False}, # annotationService ... {"read": True, "write": False, "admin": False}, # byomService ... {"read": True, "write": True, "admin": False}, # deploymentService ... {"read": True, "write": False, "admin": False}, # inferenceService ... ] >>> resp, err, msg = project.update_permissions(collaborator_id, permissions) >>> if err: ... print(f"Error: {err}") ... else: ... 
print("Permissions updated successfully") """ path = f"/v1/user/project/{self.project_id}/collaborators/{collaborator_id}?projectId={self.project_id}" headers = {"Content-Type": "application/json"} body = { "version": permissions[0], "isProjectAdmin": permissions[1], "datasetsService": permissions[2], "modelsService": permissions[3], "annotationService": permissions[4], "byomService": permissions[5], "deploymentService": permissions[6], "inferenceService": permissions[7], } resp = self.rpc.put(path=path, headers=headers, payload=body) return handle_response(resp, "Collaborator permissions updated successfully", "Could not update collaborator permissions")
[docs] def list_deployments(self , page_size = 10, page_number = 0): """ List all deployments inside the project. Returns ------- tuple A tuple containing: - A list of deployments if the request is successful. - An error message if the request fails. Example ------- >>> deployments, error = project.list_deployments() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Deployments: {deployments}") """ print('project_id',self.project_id) path = f"/v1/deployment/list_deployments/v2?projectId={self.project_id}&pageSize={page_size}&pageNumber={page_number}" resp = self.rpc.get(path=path) print('path',path) data, error, message = handle_response( resp, "Deployment list fetched successfully", "An error occurred while trying to fetch deployment list.", ) if error: return {}, error, message if data is None: return {} , "" , "No Deployments , create one." items = data.get("items", []) deployments = {item["deploymentName"]: Deployment(self.session, deployment_id=item["_id"]) for item in items} return deployments, None, message
[docs] def list_datasets(self, status="total", page_size = 10, page_number = 0): """ List all datasets in the project. Returns ------- tuple A tuple containing: - A list of datasets if the request is successful. - An error message if the request fails. Example ------- >>> datasets, error = project.list_datasets() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Datasets: {datasets}") """ path = f"/v1/dataset/v2?projectId={self.project_id}&pageSize={page_size}&pageNumber={page_number}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Dataset list fetched successfully", "Could not fetch dataset list" ) if error: return {}, error if data is None: return {} , "" , "No Datasets , create one." items = data.get("items", []) datasets = {item["name"]: Dataset(self.session, dataset_id=item["_id"]) for item in items} return datasets, None
[docs] def list_annotations(self, page_size = 10, page_number = 0): """ List all annotations in the project. Returns ------- tuple A tuple containing: - A list of annotations if the request is successful. - An error message if the request fails. Example ------- >>> annotations, error = project.list_annotations() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Annotations: {annotations}") """ path = f"/v1/annotations/v2?projectId={self.project_id}&pageSize={page_size}&pageNumber={page_number}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Annotations fetched successfully", "Could not fetch annotations" ) if error: return {}, error if data is None: return {} , "" , "No annotations , create one." items = data.get("items", []) annotations = {item["title"]: Annotation(self.session, annotation_id=item["_id"]) for item in items} return annotations, None
[docs] def list_experiments(self, page_size=10, page_number=0): """ List all experiments in the project. Returns ------- tuple A tuple containing: - A list of experiments if the request is successful. - An error message if the request fails. """ path = f"/v1/model/get_experiments?projectId={self.project_id}&pageSize={page_size}&pageNumber={page_number}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Experiments summary fetched successfully", "An error occurred while trying to fetch experiments summary.", ) if error: return {}, error if data is None: return {} , "No Experiments , create one." items = data.get("items", []) experiments_list = [] # Process each item individually for item in items: experiment_id = item["_id"] experiment_name = item["experimentName"] # Create an Experiment instance E = Experiment(session=self.session, experiment_id=experiment_id) # Append the tuple (experiment_name, E) to the list experiments_list.append((experiment_name, E)) return experiments_list, None
[docs] def list_model_train(self, page_size = 10, page_number = 0): """ List model training sessions in the project with pagination. Returns ------- tuple A tuple containing: - A paginated list of model training sessions if the request is successful. - An error message if the request fails. Example ------- >>> model_train_sessions, error = project.list_model_train() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Model training sessions: {model_train_sessions}") """ path = f"/v1/model/model_train?projectId={self.project_id}&pageSize={page_size}&pageNumber={page_number}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Model train list fetched successfully", "Could not fetch models train list", ) if error: return {}, error if data is None: return {} , "No models trained , create one." # Extract items directly from the nested data field items = data.get("items", []) models = {item["modelTrainName"]: Model(self.session, model_id=item["_id"]) for item in items} return models, None
[docs] def list_exported_models(self , page_size = 10, page_number = 0): """ List all exported models in the project. Returns ------- tuple A tuple containing: - A list of exported models if the request is successful. - An error message if the request fails. Example ------- >>> exported_models, error = project.list_exported_models() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Exported models: {exported_models}") """ path = f"/v1/model/get_model_exports/v2?projectId={self.project_id}&pageSize={page_size}&pageNumber={page_number}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Model train list fetched successfully", "Could not fetch models train list", ) if error: return {}, error if data is None: return {} , "No models exported , create one." items = data.get("items", []) exported_models = {item["modelExportName"]: ExportedModel(self.session, model_export_id=item["_id"]) for item in items} return exported_models, None
[docs] def list_drift_monitorings(self, page_size = 10, page_number = 0): """ Fetch a list of all drift monitorings. Returns ------- tuple A tuple containing three elements: - API response (dict): The raw response from the API. - error_message (str or None): Error message if an error occurred, None otherwise. - status_message (str): A status message indicating success or failure. Example ------- >>> resp, err, msg = projects.list_drift_monitorings() >>> if err: >>> print(f"Error: {err}") >>> else: >>> print(f"Drift Monitoring detail : {resp}") """ print(self.project_id) path = f"/v1/deployment/list_drift_monitorings?pageSize={page_size}&pageNumber={page_number}&projectId={self.project_id}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Model train list fetched successfully", "Could not fetch models train list", ) if error: return {}, error if data is None: return {} , "No drift monitorings done , create one." # Extract items directly from the nested data field items = data.get("items", []) deployments = {} for item in items: if "deploymentName" in item and "_id" in item: deployments[item["deploymentName"]] = Deployment(self.session, deployment_id=item["_idDeployment"]) else: raise ValueError(f"Missing required parameters for Deployment initialization in item: {item}") return deployments, None
[docs] def get_exported_models(self): """ Fetch all model exports for the project. Returns ------- tuple A tuple containing: - The model export data if the request is successful. - An error message if the request fails. Example ------- >>> model_exports, error = project.get_model_exports() >>> if error: >>> print(f"Error: {error}") >>> else: >>> print(f"Model exports: {model_exports}") """ path = f"/v1/model/get_model_exports?projectId={self.project_id}" resp = self.rpc.get(path=path) return handle_response( resp, "Model exports fetched successfully", "Could not fetch model exports" )
# Instances Getters
[docs] def get_dataset(self, dataset_id=None, dataset_name=""): """ Get a Dataset instance. Parameters ---------- dataset_id : str, optional The ID of the dataset. dataset_name : str, optional The name of the dataset. Returns ------- Dataset A Dataset instance with the specified ID and/or name. Example ------- >>> dataset = project.get_dataset(dataset_id="dataset123") >>> print(dataset) """ return Dataset(self.session, dataset_id, dataset_name)
[docs] def get_annotation(self, dataset_id=None, annotation_id=None, annotation_name=""): """ Get an Annotation instance. Parameters ---------- dataset_id : str, optional The ID of the dataset associated with the annotation. annotation_id : str, optional The ID of the annotation. annotation_name : str, optional The name of the annotation. Returns ------- Annotation An Annotation instance with the specified dataset ID, annotation ID, and/or name. Example ------- >>> annotation = project.get_annotation(annotation_id="annotation123") >>> print(annotation) """ return Annotation(self.session, dataset_id, annotation_id, annotation_name)
[docs] def get_experiment(self, experiment_id=None, experiment_name=""): """ Get an Experiment instance. Parameters ---------- experiment_id : str, optional The ID of the experiment. experiment_name : str, optional The name of the experiment. Returns ------- Experiment An Experiment instance with the specified ID and/or name. Example ------- >>> experiment = project.get_experiment(experiment_id="experiment123") >>> print(experiment) """ return Experiment(self.session, experiment_id, experiment_name)
[docs] def get_model(self, model_id=None, model_name=""): """ Get a Model instance. Parameters ---------- model_id : str, optional The ID of the model. model_name : str, optional The name of the model. Returns ------- Model A Model instance with the specified ID and/or name. Example ------- >>> model = project.get_model(model_id="model123") >>> print(model) """ return Model(self.session, model_id, model_name)
[docs] def get_exported_model(self, model_export_id=None, model_export_name=""): """ Get an InferenceOptimization instance. Parameters ---------- model_export_id : str, optional The ID of the model export. model_export_name : str, optional The name of the model export. Returns ------- InferenceOptimization An InferenceOptimization instance with the specified ID and/or name. Example ------- >>> inference_optimization = project.get_inference_optimization(model_export_id="export123") >>> print(inference_optimization) """ return ExportedModel(self.session, model_export_id, model_export_name)
[docs] def get_deployment(self, deployment_id=None, deployment_name=""): """ Get a Deployment instance. Parameters ---------- deployment_id : str, optional The ID of the deployment. deployment_name : str, optional The name of the deployment. Returns ------- Deployment A Deployment instance with the specified ID and/or name. Example ------- >>> deployment = project.get_deployment(deployment_id="deployment123") >>> print(deployment) """ return Deployment(self.session, deployment_id, deployment_name)
[docs] def get_dataset_status_summary(self): """ Get the dataset status summary for the project. Returns ------- OrderedDict An ordered dictionary with dataset status and their counts. Example ------- >>> dataset_status = project.get_dataset_status_summary() >>> print(dataset_status) """ path = f"/v1/dataset/get_dataset_status?projectId={self.project_id}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Successfully fetched dataset status summary", "An error occurred while fetching dataset status summary", ) if error: return OrderedDict(), error dataset_status_summary = OrderedDict(data.get("data", {})) return dataset_status_summary, None
[docs] def get_annotations_status_summary(self): """ Get the annotations status summary for the project. Returns ------- OrderedDict An ordered dictionary with annotations status and their counts. Example ------- >>> annotations_status = project.get_annotations_status_summary() >>> print(annotations_status) """ path = f"/v1/annotations/summary?projectId={self.project_id}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Successfully fetched annotations status summary", "An error occurred while fetching annotations status summary", ) if error: return OrderedDict(), error annotations_status_summary = OrderedDict(data.get("data", {})) return annotations_status_summary, None
[docs] def get_model_status_summary(self): """ Get the model status summary for the project. Returns ------- OrderedDict An ordered dictionary with model status and their counts. Example ------- >>> model_status = project.get_model_status_summary() >>> print(model_status) """ path = f"/v1/model/summary?projectId={self.project_id}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Successfully fetched model status summary", "An error occurred while fetching model status summary", ) if error: return OrderedDict(), error model_status_summary = OrderedDict(data.get("data", {}).get("modelCountByStatus", {})) model_status_summary["total"] = data.get("data", {}).get("total", 0) return model_status_summary, None
[docs] def get_model_export_status_summary(self): """ Get the model export status summary for the project. Returns ------- OrderedDict An ordered dictionary with model export status and their counts. Example ------- >>> model_export_status = project.get_model_export_status_summary() >>> print(model_export_status) """ path = f"/v1/model/summaryExported?projectId={self.project_id}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Successfully fetched model export status summary", "An error occurred while fetching model export status summary", ) if error: return OrderedDict(), error model_export_status_summary = OrderedDict(data.get("data", {}).get("modelCountByStatus", {})) model_export_status_summary["total"] = data.get("data", {}).get("total", 0) return model_export_status_summary, None
[docs] def get_deployment_status_summary(self): """ Get the deployment status summary for the project. Returns ------- OrderedDict An ordered dictionary with deployment status and their counts. Example ------- >>> deployment_status = project.get_deployment_status_summary() >>> print(deployment_status) """ path = f"/v1/deployment/summary?projectId={self.project_id}" resp = self.rpc.get(path=path) data, error, message = handle_response( resp, "Successfully fetched deployment status summary", "An error occurred while fetching deployment status summary", ) if error: return OrderedDict(), error deployment_status_summary = OrderedDict(data.get("data", {})) return deployment_status_summary, None
def _validate_labellers_and_reviewers(self, users_list): """ Validate labellers and reviewers by ensuring they are collaborators and extracting their user IDs. Parameters ---------- users_list : list A list of dictionaries, each containing 'email', 'name', and 'percentageWork'. Returns ------- list A list of dictionaries containing '_idUser', 'name', and 'percentageWork'. Raises ------ ValueError If a user is not added as a collaborator to the project. """ # List the collaborators for the project collaborators, _, _ = self.list_collaborators() # Create a mapping of collaborator emails and names to their user IDs collaborator_map = { (collaborator.get("email"), collaborator.get("userName")): collaborator.get("_idUser") for collaborator in collaborators } # Validate each user in the input list validated_users = [] for user in users_list: email = user.get("email") name = user.get("name") percentage_work = user.get("percentageWork") # Check if the user is a collaborator user_id = collaborator_map.get((email, name)) if not user_id: raise ValueError(f"The user with name '{name}' is not added as a collaborator to the project.") # Append the validated user information validated_users.append({ "_idUser": user_id, "name": name, "percentageWork": int(percentage_work) }) return validated_users