Source code for matrice.compute

import sys
from matrice.utils import handle_response
from datetime import datetime, timedelta


[docs] class ComputeInstance: """ Represents a single compute instance and allows performing operations on the instance such as update, delete, and refresh. Attributes: Alias : str Status : str Price_Hour : float Machine_EFF : float Service_Provider : str Launched_At : str Launch_Duration : int Shutdown_Threshold : int GPU_Type : str GPU_Memory : int CPU : str Cores : int Memory_GB : int Storage_GB : int Storage_Type : str """
[docs] def __init__(self, session, alias): """ Initialize the ComputeInstance object by fetching the compute instance details from the server based on the provided alias. Parameters ---------- session : object The session object containing account and RPC information. alias : str The alias of the compute instance to fetch details for. """ # Save session details self.session = session self.rpc = session.rpc self.account_number = session.account_number # Store alias and initialize the object by fetching details self.alias = alias self.last_refresh_time = datetime.now() # Fetch the details from the server using alias self._fetch_instance_details()
def _fetch_instance_details(self): """ Internal method to fetch and populate instance details from the server using the alias. Raises ------ ValueError If the instance details cannot be fetched. """ path = f"/v1/scaling/get_instance_by_alias/{self.account_number}/{self.alias}"#TODO: Need API here resp = self.rpc.get(path=path) # Handle the response and populate attributes data, error, message = handle_response( resp, f"Successfully fetched instance details for alias: {self.alias}", f"Could not fetch instance details for alias: {self.alias}" ) if error: raise ValueError(f"Error fetching instance details: {message}") data=data[0] # Populate instance attributes using the fetched data self.status = data.get("status") self.price_hour = data.get("price") self.machine_eff = data.get("machineEfficiency") self.service_provider = data.get("serviceProvider") self.launched_at = data.get("launchedAt") self.launch_duration = data.get("launchDuration") self.shutdown_threshold = data.get("shutdownThreshold") self.gpu_type = data.get("gpuType") self.gpu_memory = data.get("gpuMemory") self.cpu = data.get("cpuType") self.cpu_cores = data.get("cpuCores") self.memory_mb = data.get("memory") self.storage_gb = data.get("storageSize") self.storage_type = data.get("storage") self.lease_type = data.get("leaseType") @staticmethod def _from_json(data,session): """ Convert JSON data to an instance of ComputeInstance. Parameters ---------- data : dict JSON data representing a compute instance. session : object The session object containing account and RPC information. Returns ------- ComputeInstance A ComputeInstance object initialized with the data. """ return ComputeInstance( alias=data.get("alias"), session=session ) def refresh(self): """ Refresh the instance by reinstantiating it with the previous values. """ # Check if two minutes have passed since the last refresh if datetime.now() - self.last_refresh_time < timedelta(minutes=2): raise Exception("Refresh can only be called after two minutes since the last refresh.") # Capture the current state state = self.__dict__.copy() # Reinitialize the instance init_params = { 'session': self.session, 'alias': self.alias, } self.__init__(**init_params) # Update the last refresh time self.last_refresh_time = datetime.now()
[docs] def stop(self): """ Stop an on-demand compute instance. Returns ------- dict or None Server response indicating the result of the stop request, or None if an error occurred. """ path = f"/v1/scaling/stop_account_compute/{self.account_number}/{self.alias}" resp = self.rpc.put(path=path) # Handle the response data, error, message = handle_response( resp, f"Successfully stopped on-demand instance: {self.alias}", f"An error occurred while stopping the instance: {self.alias}" ) return data if not error else None
[docs] def delete(self): """ Update the compute instance attributes if it is not a dedicated instance. Returns ------- dict or None Server response indicating the result of the update request, or None if update is not allowed or fails. """ if self.lease_type == "dedicated": print("delete for dedicated instance is not an allowed operation") return path = f"/v1/scaling/delete_account_compute/{self.account_number}/{self.alias}" resp = self.rpc.delete(path=path) return handle_response( resp, f"Successfully deleted the given compute", "Error deleting the given compute", )
[docs] def update(self): """ Static method to update the compute instance attributes. Returns ------- dict or None Server response indicating the result of the update request, or None if update is not allowed or fails. """ if self.lease_type == "dedicated": print("Update for dedicated instance is not an allowed operation") return path = f"/v1/scaling/update_account_compute" headers = {"Content-Type": "application/json"} body = { "accountNumber": self.account_number, "computeAlias": self.alias, "launchDuration": self.launch_duration, "shutDownTime": self.shutdown_threshold, } resp = self.rpc.put(path=path, headers=headers, payload=body) return handle_response( resp, f"Successfully updated the given compute", "Error updating the given compute", )
from datetime import datetime, timedelta
[docs] @staticmethod def refresh(obj): """ Static method to refresh the compute instance. """ # Check if two minutes have passed since the last refresh if datetime.now() - obj.last_refresh_time < timedelta(minutes=2): raise Exception("Refresh can only be called after two minutes since the last refresh.") # Capture the current state state = obj.__dict__.copy() # Reinitialize the instance obj.__init__(**state) # Update the last refresh time obj.last_refresh_time = datetime.now()
[docs] class ComputeType: """ Initialize a ComputeType instance with the provided attributes. Parameters ---------- session : object The session object containing account and RPC information. instance_type : str The type of compute instance. price_hour : float Hourly price of the instance. service_provider : str Service provider offering the instance. machine_eff : float Efficiency rating of the machine. compute_eff : float Efficiency rating of the compute. gpu_type : str Type of GPU in the instance. gpu_memory : int GPU memory in GB. cpu : str CPU type in the instance. cores : int Number of CPU cores. memory_mb : int Memory size in MB. storage_gb : int Storage size in GB. storage_type : str Type of storage in the instance. """
[docs] def __init__(self, session, instance_type, price_hour, service_provider, machine_eff, compute_eff, gpu_type, gpu_memory, cpu, cores, memory_mb, storage_gb, storage_type): self.instance_type = instance_type self.price_hour = price_hour self.service_provider = service_provider self.machine_eff = machine_eff self.compute_eff = compute_eff self.gpu_type = gpu_type self.gpu_memory = gpu_memory self.cpu = cpu self.cores = cores self.memory_mb = memory_mb self.storage_gb = storage_gb self.storage_type = storage_type self.rpc=session.rpc self.account_number=session.account_number
@staticmethod def _from_json(data, session): """ Convert JSON data to an instance of ComputeType. Parameters ---------- data : dict JSON data representing a compute type. session : object The session object containing account and RPC information. Returns ------- ComputeType A ComputeType object initialized with the data. """ return ComputeType( session=session, instance_type=data.get("instanceType"), price_hour=data.get("pricePerHour"), service_provider=data.get("serviceProvider"), machine_eff=data.get("machineEfficiency"), compute_eff=data.get("computeEfficiency"), gpu_type=data.get("gpu"), gpu_memory=data.get("gpuMemory"), cpu=data.get("cpu"), cores=data.get("cores"), memory_mb=data.get("memory"), storage_gb=data.get("storageSize"), storage_type=data.get("storage"), )
[docs] def list_instance_types(session, providers=None, gpu_types=None, price_range=None, page_size=10, page_num=0): """ List all available compute types on the platform with optional filters. Parameters ---------- session : object The session object containing account and RPC information. providers : list, optional List of service providers to filter instances. gpu_types : list, optional List of GPU types to filter instances. price_range : tuple, optional A tuple containing min and max price to filter instances by price range. page_size : int, optional The number of instances to return per page. page_num : int, optional The page number for pagination. Returns ------- dict or None Dictionary of `ComputeType` objects indexed by `instanceType`, or None if no data is available. """ path = "/v1/scaling/get_all_instances_type" # Prepare query parameters params = { "page_size": page_size, "page_num": page_num } if providers: params["providers"] = ",".join(providers) if gpu_types: params["gpu_types"] = ",".join(gpu_types) if price_range: min_price, max_price = price_range params["min_price"] = min_price params["max_price"] = max_price # Make the RPC call with query parameters resp = session.rpc.get(path=path, params=params) # Handle the response and convert to ComputeType objects data = handle_response( resp, "Instance list fetched successfully", "Could not fetch instance list" ) if data: return {instance['instanceType']:ComputeType._from_json(instance, session) for instance in data[0]} return None
[docs] def list_account_compute(session, status="all"): """ List all compute instances associated with an account, with an optional status filter. Parameters ---------- session : object The session object containing account and RPC information. status : str, optional Status filter for instances (e.g., 'all', 'active', 'terminated'). Returns ------- dict or None Dictionary of `ComputeInstance` objects indexed by `alias`, or None if no data is available. """ path = f"/v1/scaling/get_all_account_compute/{session.account_number}" # Make the RPC call with the status filter resp = session.rpc.get(path=path) # Handle the response and convert to ComputeInstance objects data = handle_response( resp, f"Instance list fetched successfully for account: {session.account_number}", f"Could not fetch instance list for account: {session.account_number}", ) if data: return {instance['alias']: ComputeInstance._from_json(instance, session) for instance in data[0]} return None
from collections import OrderedDict
[docs] def get_compute_status_summary(session, lease_type="on-demand"): """ Get a summary of compute statuses for the current account based on the lease type. Parameters ---------- session : object The session object containing account and RPC information. lease_type : str, optional The lease type of computes (e.g., 'dedicated', 'shared'). Default is 'on-demand'. Returns ------- OrderedDict An ordered dictionary with compute statuses and their counts. """ # API endpoint for compute status summary path = f"/v1/scaling/get_all_account_compute/{session.account_number}/{lease_type}" # Make the RPC call resp = session.rpc.get(path=path) # Handle the response and extract the relevant data data, error, message = handle_response( resp, "Successfully fetched compute status summary", "An error occurred while fetching compute status summary" ) if error: return OrderedDict(), error # Extract status summary from the response compute_status_summary = OrderedDict(data.get("data", {}).get("computeCountByStatus", {})) # Add the total number of computes to the summary compute_status_summary["total"] = data.get("data", {}).get("total", 0) return compute_status_summary, None
[docs] def add_on_demand_instance(session,alias, compute_type,service_provider, launch_duration_hours, shutdown_thres_minutes): """ Add an on-demand instance. Parameters ---------- session : object The session object containing account and RPC information. alias : str Alias for the new compute instance. compute_type : str Type of compute instance to launch. service_provider : str Service provider offering the instance. launch_duration_hours : int Duration in hours for the compute instance. shutdown_thres_minutes : int Shutdown threshold in minutes for automatic shutdown. Returns ------- dict or None Server response indicating the result of the add request, or None if an error occurred. """ path = "/v1/scaling/add_account_compute" payload = { "accountNumber": session.account_number, "alias": alias, "launchDuration": launch_duration_hours, "shutDownTime": shutdown_thres_minutes, "serviceProvider":service_provider, "instanceType":compute_type, "shutDownTime":shutdown_thres_minutes, "leaseType":"on-demand", "launchDuration":launch_duration_hours } resp = session.rpc.post(path=path, json=payload) # Handle the response data, error, message = handle_response( resp, "Successfully added on-demand instance", "An error occurred while adding on-demand instance" ) return data if not error else None