import json
import math
import os
import shutil
import tarfile
import zipfile
from io import BytesIO
from typing import List
import requests
import yaml
from PIL import Image, ImageDraw
from pycocotools.coco import COCO
from pydantic import BaseModel
from matrice.session import Session
class SplitMetricStruct(BaseModel):
"""This is a private class used internally to store split metrics.
Attributes
----------
splitType : str
Type of the dataset split (e.g., 'train', 'val', 'test').
metricName : str
Name of the evaluation metric (e.g., 'accuracy', 'precision').
metricValue : float
Value of the metric for the given split.
"""
"""This is a private class used internally."""
splitType: str
metricName: str
metricValue: float
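# A minimal usage sketch (assuming pydantic v2, whose model_validate/model_dump
# are what validate_metrics_structure() below relies on):
# SplitMetricStruct.model_validate(
#     {"splitType": "val", "metricName": "accuracy", "metricValue": 0.91}
# ).model_dump()
# -> {'splitType': 'val', 'metricName': 'accuracy', 'metricValue': 0.91}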
class dotdict(dict):
"""A dictionary subclass that provides dot notation access to attributes.
Attributes
----------
__getattr__ : function
Allows accessing dictionary keys as object attributes.
__setattr__ : function
Allows setting dictionary keys as object attributes.
__delattr__ : function
Allows deleting dictionary keys as object attributes.
"""
"""This is a private class used internally."""
"""dot.notation access to dictionary attributes"""
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
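# A quick illustration of the dot-notation behavior:
# cfg = dotdict({"arch": "resnet18"})
# cfg.arch           # 'resnet18'
# cfg.epochs = 10    # equivalent to cfg["epochs"] = 10
# cfg.missing        # None (dict.get semantics rather than AttributeError)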
class TestingActionTracker:
"""Handles logging, dataset preparation, and configuration management for model testing actions.
Parameters
----------
model_family_info_path : str
Path to the model family information file.
model_info_path : str
Path to the model information file.
config_path : str
Path to the action configuration file.
"""
"""This is a private class used internally."""
def __init__(self, model_family_info_path, model_info_path, config_path):
"""Initializes the TestingActionTracker class, loading model family info, model info, and configurations.
Parameters
----------
model_family_info_path : str
Path to the model family information JSON file.
model_info_path : str
Path to the model information JSON file.
config_path : str
Path to the action configuration file.
"""
self.logs = []
self.testing_logs_folder_path = "./testing_logs"
os.makedirs(self.testing_logs_folder_path, exist_ok=True)
self.model_family_info_path = model_family_info_path
self.model_info_path = model_info_path
self.config_path = config_path
session = Session()
self.rpc = session.rpc
self.load_model_family_info()
self.load_model_info()
self.load_action_config()
self.action_doc = self.mock_action_doc()
self.action_details = self.action_doc["actionDetails"]
self.checkpoint_path, self.pretrained = self.get_checkpoint_path()
self.prepare_dataset() # Download the dataset and prepare it for the action type in the specific format
def get_main_action_logs_path(self):
"""Determines the appropriate log file path based on the action type (train, export, eval).
Returns
-------
str
Path to the main log file for the current action.
"""
if "train" in self.config_path:
return os.path.join(self.testing_logs_folder_path, "train.json")
elif "export" in self.config_path:
return os.path.join(
self.testing_logs_folder_path,
os.path.basename(self.config_path).replace("-config", ""),
)
elif "eval" in self.config_path:
return os.path.join(self.testing_logs_folder_path, "eval.json")
def log_to_json(self, file_path, payload):
"""Logs data to a JSON file, appending the payload if the file exists.
Parameters
----------
file_path : str
Path to the JSON log file.
payload : dict
The data to log in the JSON file.
"""
try:
with open(file_path, "r") as file:
data = json.load(file)
except FileNotFoundError:
data = []
except json.JSONDecodeError:
data = []
data.append(payload)
with open(file_path, "w") as file:
json.dump(data, file, indent=4)
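# For instance, two successive calls with {"step": "a"} and {"step": "b"}
# leave the file containing the JSON array [{"step": "a"}, {"step": "b"}].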
def add_logs(self, step, status, description):
"""Adds a log entry for a specific step, including status and description.
Parameters
----------
step : str
The step or action being logged (e.g., 'load_model').
status : str
The status of the step (e.g., 'SUCCESS', 'ERROR').
description : str
A description or error message related to the step.
"""
self.logs.append({"step": step, "status": status, "description": description})
self.log_to_json(
self.get_main_action_logs_path(),
{"step": step, "status": status, "description": description},
)
def log_decorator(func):
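"""A decorator to log the execution status of a function."""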
def wrapper(self, *args, **kwargs):
try:
result = func(self, *args, **kwargs)
self.add_logs(func.__name__, "SUCCESS", "SUCCESS")
return result
except Exception as e:
print(f"ERROR occurred in: {func.__name__} : {str(e)}")
self.add_logs(func.__name__, "ERROR", str(e))
raise e
return wrapper
@log_decorator
def load_model_family_info(self):
"""Loads model family information from the specified file.
Returns
-------
dict
The loaded model family information.
"""
with open(self.model_family_info_path) as f:
self.model_family_info = json.load(f)
self.input_type = self.model_family_info["modelInputs"].lower()
self.output_type = self.model_family_info["modelOutputs"].lower()
self.models_family_name = self.model_family_info["modelFamily"]
@log_decorator
def load_model_info(self):
"""Loads model information from the specified file.
Returns
-------
dict
The loaded model information.
"""
with open(self.model_info_path) as f:
self.model_info = json.load(f)
self.model_key = self.model_info["modelKey"]
self.model_name = self.model_info["modelName"]
@log_decorator
def mock_action_doc(self):
"""Creates a mock action document with dataset and model details.
Returns
-------
dict
A mock document containing action and model information.
"""
api_url = f"/v1/system/get_dataset_url?inputType={self.input_type}&outputType={self.output_type}"
response = self.rpc.get(
path=api_url,
params={"inputType": self.input_type, "outputType": self.output_type},
)
if response and "data" in response:
mock_dataset = response["data"]
else:
raise ValueError("Invalid response from the API call")
action_details = {
"_idModel": "mocked_model_id",
"runtimeFramework": "Pytorch",
"datasetVersion": "v1.0",
"dataset_url": mock_dataset,
"project_type": self.output_type,
"input_type": self.input_type,
"output_type": self.output_type,
}
if self.action_type == "model_export":
# exportFormats is consumed by upload_checkpoint()/download_model();
# load_action_config() stashed it on the instance before this runs.
action_details["exportFormats"] = self.export_formats
# Store _idModel as an instance variable
self._idModel = action_details["_idModel"]
return {
"actionDetails": action_details,
"action": self.action_type,
"serviceName": "mocked_service_name",
"_idProject": "mocked_project_id",
}
@log_decorator
def get_checkpoint_path(self):
"""Finds and returns the path to the latest model checkpoint.
Returns
-------
tuple
Path to the checkpoint file (or None) and a boolean indicating whether one was found.
"""
checkpoint_dir = "./checkpoints"
# Ensure the checkpoints directory exists
if not os.path.exists(checkpoint_dir):
os.makedirs(checkpoint_dir)
print(f"Created checkpoint directory: {checkpoint_dir}")
return None, False # No checkpoints available
# List all files in the checkpoints directory
checkpoint_files = [f for f in os.listdir(checkpoint_dir) if f.endswith(".pt")]
if not checkpoint_files:
print("No checkpoint files found in the checkpoints directory.")
return None, False
# When multiple checkpoints exist, choose the most recently modified one,
# matching the "latest checkpoint" contract in the docstring
checkpoint_path = max(
(os.path.join(checkpoint_dir, f) for f in checkpoint_files),
key=os.path.getmtime,
)
print(f"Found checkpoint: {checkpoint_path}")
return checkpoint_path, True
@log_decorator
def load_action_config(self):
"""Loads action configuration based on the config path (train, export, eval).
Raises
------
Exception
If the config path is not valid or cannot be loaded.
"""
self.model_config = {}
if "train" in self.config_path and self.config_path.endswith("-config.json"):
self.action_type = "model_train"
with open(self.config_path, "r") as config_file:
self.config_file = json.load(config_file)
print(
f"Loaded train config for model {self.model_name}: {self.config_file}"
)
for config in self.config_file.get("actionConfig", []):
key_name = config.get("keyName")
default_value = config.get("defaultValue")
if key_name and default_value is not None:
self.model_config[key_name] = self.cast_value(
config.get("valueType"), default_value
)
print(f"Model config: {self.model_config}")
elif "export" in self.config_path and self.config_path.endswith("-config.json"):
self.action_type = "model_export"
with open(self.config_path, "r") as config_file:
self.config_file = json.load(config_file)
self.action_details["exportFormats"] = [self.config_file["exportFormat"]]
for config in self.config_file.get("actionConfig", []):
key_name = config.get("keyName")
default_value = config.get("defaultValue")
if key_name and default_value is not None:
self.model_config[key_name] = self.cast_value(
config.get("valueType"), default_value
)
print(f"Model config: {self.model_config}")
print(
f"Loaded export config for format {self.action_details['exportFormats']}"
)
elif "eval" in self.config_path:
self.action_type = "model_eval"
self.model_config["split_types"] = ["vel", "test"]
print(f"Model config: {self.model_config}")
else:
raise Exception(
"Couldn't load the action config. Make sure the config path is one of: train-config.json, export-<export_format>-config.json, or an eval config."
)
def cast_value(self, value_type, value):
"""Casts a value to its specified type (int, float, string, bool).
Parameters
----------
value_type : str
The type to cast the value to (e.g., 'int32', 'float32').
value : any
The value to be cast.
Returns
-------
any
The casted value.
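Examples
--------
Assuming ``tracker`` is an initialized instance:
>>> tracker.cast_value("int32", "5")
5
>>> tracker.cast_value("bool", "False")
False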
"""
if value_type == "int32":
return int(value)
elif value_type == "float32":
return float(value)
elif value_type == "string":
return str(value)
elif value_type == "bool":
return bool(value)
else:
return value
def update_status(self, stepCode, status, status_description):
"""Mocks the status update for a given step, adding it to logs.
Parameters
----------
stepCode : str
The code for the current step.
status : str
The current status (e.g., 'SUCCESS', 'ERROR').
status_description : str
Description or details about the step status.
"""
print(f"Mock update status: {stepCode}, {status}, {status_description}")
self.add_logs(stepCode, status, status_description)
@log_decorator
def upload_checkpoint(self, checkpoint_path, model_type="trained"):
"""Uploads a checkpoint to a remote location (mocked behavior).
Parameters
----------
checkpoint_path : str
Path to the checkpoint file to be uploaded.
model_type : str, optional
Type of model (default is 'trained').
"""
print(f"Mock upload checkpoint: {checkpoint_path}, {model_type}")
file_path, ext = os.path.splitext(checkpoint_path)
if model_type == "trained":
new_name = os.path.join(
self.testing_logs_folder_path, "model_" + model_type + ext
)
elif model_type == "exported":
new_name = os.path.join(
self.testing_logs_folder_path,
"model_" + self.action_details["exportFormats"][0] + model_type + ext,
)
shutil.move(checkpoint_path, new_name)
return True
@log_decorator
def download_model(self, model_path, model_type="trained", runtime_framework=""):
"""Downloads a model from a remote location (mocked behavior).
Parameters
----------
model_path : str
Path to download the model to.
model_type : str, optional
Type of model (default is 'trained').
runtime_framework : str, optional
Framework used for the model (default is '').
"""
print(f"Mock download model to: {model_path}, {model_type}")
file_path, ext = os.path.splitext(model_path)
if model_type == "trained":
local_model_file = [
path
for path in os.listdir(self.testing_logs_folder_path)
if path.endswith(f"{model_type}{ext}")
][0]
elif model_type == "exported":
local_model_file = [
path
for path in os.listdir(self.testing_logs_folder_path)
if path.endswith(f"{model_type}{ext}")
][0]
local_model_file = self.testing_logs_folder_path + "/" + local_model_file
print(f"Local model file: {local_model_file}")
# TODO: take the export format into consideration when resolving the saved model checkpoint path, using runtime_framework
with open(local_model_file, "rb") as src, open(model_path, "wb") as dest:
dest.write(src.read())
return True
@log_decorator
def get_job_params(self):
"""Generates and returns job parameters for model testing.
Returns
-------
dict
A dictionary containing dataset and model configuration parameters.
"""
dataset_path = "dataset"
model_config = dotdict(
{
"dataset_path": dataset_path,
"data": f"workspace/{dataset_path}/images",
"arch": self.model_key,
"pretrained": self.pretrained,
"model_key": self.model_key,
"model_name": self.model_name,
"checkpoint_path": self.checkpoint_path,
}
)
# Merge the two configs: job params take precedence; action-config keys are added only when not already present
self.model_config = dotdict(
{
**model_config,
**{k: v for k, v in self.model_config.items() if k not in model_config},
}
)
return self.model_config
@log_decorator
def add_index_to_category(self, indexToCat):
"""Adds an index-to-category mapping to the log files.
Parameters
----------
indexToCat : dict
Dictionary mapping category indexes to class names.
Returns
-------
dict
The index-to-category mapping.
"""
print(f"Mock add index to category: {indexToCat}")
file_path = os.path.join(
self.testing_logs_folder_path, "index_to_category.json"
)
with open(file_path, "w") as file:
json.dump(indexToCat, file, indent=4)
return indexToCat
@log_decorator
def get_index_to_category(self, is_exported=False):
"""Retrieves the index-to-category mapping from the log files.
Parameters
----------
is_exported : bool, optional
Indicates whether the model is exported (default is False).
Returns
-------
dict
The index-to-category mapping.
"""
file_path = os.path.join(
self.testing_logs_folder_path, "index_to_category.json"
)
with open(file_path, "r") as file:
return json.load(file)
@log_decorator
def log_epoch_results(self, epoch, epoch_result_list: List[SplitMetricStruct]):
"""Logs the results of an epoch during model training.
Parameters
----------
epoch : int
The current epoch number.
epoch_result_list : List[SplitMetricStruct]
List of metrics for the current epoch.
"""
epoch_result_list = self.validate_metrics_structure(epoch_result_list)
epoch_result_list = self.round_metrics(epoch_result_list)
model_log_payload = {
"epoch": epoch,
"epochDetails": epoch_result_list,
}
file_path = os.path.join(self.testing_logs_folder_path, "epochs_results.json")
self.log_to_json(file_path, model_log_payload)
@log_decorator
def save_evaluation_results(self, list_of_result_dicts: List[SplitMetricStruct]):
"""Saves evaluation results to the log files.
Parameters
----------
list_of_result_dicts : List[SplitMetricStruct]
List of evaluation metrics and results.
"""
list_of_result_dicts = self.validate_metrics_structure(list_of_result_dicts)
print(f"Mock save evaluation results: {list_of_result_dicts}")
file_path = os.path.join(
self.testing_logs_folder_path, "evaluation_results.json"
)
with open(file_path, "w") as file:
json.dump(list_of_result_dicts, file, indent=4)
def validate_metrics_structure(self, metrics_list: List[SplitMetricStruct]):
"""Validates the structure of a list of metrics.
Parameters
----------
metrics_list : List[SplitMetricStruct]
List of metrics to be validated.
Returns
-------
List[SplitMetricStruct]
The validated metrics.
"""
return [SplitMetricStruct.model_validate(x).model_dump() for x in metrics_list]
def round_metrics(self, epoch_result_list):
"""Rounds the metric values to four decimal places, replacing NaN or inf with 0.
Parameters
----------
epoch_result_list : List[dict]
List of metrics with values to be rounded.
Returns
-------
List[dict]
List of metrics with rounded values.
"""
for metric in epoch_result_list:
if (
metric["metricValue"] is None
or math.isinf(metric["metricValue"])
or math.isnan(metric["metricValue"])
):
metric["metricValue"] = 0
metric["metricValue"] = round(metric["metricValue"], 4)
if metric["metricValue"] == 0:
# Avoid logging an exact zero, which downstream tooling may treat as a missing value
metric["metricValue"] = 0.0001
return epoch_result_list
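# For example, float('nan') or 0 becomes 0.0001, and 0.123456 becomes 0.1235.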
@log_decorator
def prepare_dataset(self):
"""Prepares the dataset for training or evaluation by downloading and formatting it.
"""
dataset_images_dir = "workspace/dataset"
if os.path.exists(dataset_images_dir):
print(
f"Dataset directory {dataset_images_dir} already exists. Skipping download and preparation."
)
else:
dataset_url = self.action_details.get("dataset_url")
project_type = self.action_details.get("project_type")
input_type = self.action_details.get("input_type")
output_type = self.action_details.get("output_type")
print(
f"Preparing dataset from {dataset_url} for project type {project_type} with input type {input_type} and output type {output_type}"
)
dataset_dir = "workspace/dataset"
os.makedirs(dataset_dir, exist_ok=True)
self.download_and_extract_dataset(dataset_url, dataset_dir)
# Prepare the dataset according to the project type
if project_type == "classification":
self.prepare_classification_dataset(dataset_dir)
elif project_type == "detection":
if "yolo" in self.model_name.lower():
self.prepare_yolo_dataset(dataset_dir)
else:
self.prepare_detection_dataset(dataset_dir)
else:
print(f"Unsupported project type: {project_type}")
def download_and_extract_dataset(self, dataset_url, dataset_dir):
"""Downloads and extracts a dataset from a given URL.
Parameters
----------
dataset_url : str
The URL from which to download the dataset.
dataset_dir : str
The directory where the dataset should be extracted.
"""
# Extract the file name from the URL
file_name = os.path.basename(dataset_url)
local_file_path = os.path.join(dataset_dir, file_name)
try:
# Download the file
with requests.get(dataset_url, stream=True) as r:
r.raise_for_status()
print(f"Response status code: {r.status_code}")
print(f"Response headers: {r.headers}")
content_type = r.headers.get("Content-Type", "Unknown")
print(f"Content-Type: {content_type}")
# Save the file
with open(local_file_path, "wb") as f:
shutil.copyfileobj(r.raw, f)
print(f"File downloaded successfully from {dataset_url}")
print(f"Saved as: {local_file_path}")
# Extract the file based on its extension
if file_name.endswith(".zip"):
with zipfile.ZipFile(local_file_path, "r") as zip_ref:
zip_ref.extractall(dataset_dir)
print("Zip file extracted successfully")
elif file_name.endswith(".tar.gz") or file_name.endswith(".tgz"):
with tarfile.open(local_file_path, "r:gz") as tar:
tar.extractall(path=dataset_dir)
print("Tar.gz file extracted successfully")
else:
print(f"Unsupported file format: {file_name}")
return
# Remove the compressed file after extraction
os.remove(local_file_path)
print(f"Removed the compressed file: {local_file_path}")
except requests.exceptions.RequestException as e:
print(f"Error downloading dataset from {dataset_url}: {e}")
except (zipfile.BadZipFile, tarfile.TarError) as e:
print(f"Error extracting dataset from {local_file_path}: {e}")
def get_file_extension(self, content_type):
"""Returns the appropriate file extension based on content type.
Parameters
----------
content_type : str
The content type of the file.
Returns
-------
str
The file extension (e.g., '.zip', '.tar').
"""
content_type = content_type.lower()
if "zip" in content_type:
return ".zip"
elif "gzip" in content_type or "x-gzip" in content_type:
return ".gz"
elif "tar" in content_type:
return ".tar"
elif "octet-stream" in content_type:
return "" # Binary file, no specific extension
else:
return "" # Unknown type, no extension
def prepare_classification_dataset(self, dataset_dir):
"""Prepares a dataset for classification tasks.
Parameters
----------
dataset_dir : str
The directory where the dataset is located.
"""
print("Preparing classification dataset...")
# Locate the single extracted dataset directory (e.g., vehicle-c10-20)
sub_dirs = [
os.path.join(dataset_dir, d)
for d in os.listdir(dataset_dir)
if os.path.isdir(os.path.join(dataset_dir, d))
]
if len(sub_dirs) != 1:
raise ValueError("Expected a single subdirectory in the dataset directory")
vehicle_dir = sub_dirs[0]
print(f"Main sub-directory: {vehicle_dir}")
images_dir = os.path.join(dataset_dir, "images")
os.makedirs(images_dir, exist_ok=True)
print(f"Images directory: {images_dir}")
class_names = set()
split_info = {} # To keep track of which images belong to which split
# Iterate through train, val, and test splits
for split in ["train", "val", "test"]:
split_dir = os.path.join(vehicle_dir, split)
dst_split_dir = os.path.join(images_dir, split)
os.makedirs(dst_split_dir, exist_ok=True)
split_info[split] = {}
for class_name in os.listdir(split_dir):
class_dir = os.path.join(split_dir, class_name)
if os.path.isdir(class_dir):
class_names.add(class_name)
dst_class_dir = os.path.join(dst_split_dir, class_name)
os.makedirs(dst_class_dir, exist_ok=True)
# Copy images and keep track of which split they belong to
for img in os.listdir(class_dir):
src_path = os.path.join(class_dir, img)
dst_path = os.path.join(dst_class_dir, img)
shutil.copy2(src_path, dst_path)
if class_name not in split_info[split]:
split_info[split][class_name] = []
split_info[split][class_name].append(dst_path)
# Retrieve class names and count
self.num_classes = len(class_names)
self.class_names = list(class_names)
print(f"Number of classes: {self.num_classes}")
print(f"Class names: {self.class_names}")
# Save the split information as JSON for later use
with open(os.path.join(dataset_dir, "split_info.json"), "w") as f:
json.dump(split_info, f, indent=4)
def prepare_detection_dataset(self, dataset_dir):
"""Prepares a dataset for object detection tasks.
Parameters
----------
dataset_dir : str
The directory where the dataset is located.
"""
print("Preparing detection dataset...")
# Find the downloaded folder
contents = os.listdir(dataset_dir)
downloaded_dirs = [
d
for d in contents
if os.path.isdir(os.path.join(dataset_dir, d))
and d not in ("images", "annotations")
]
if not downloaded_dirs:
print("No suitable subdirectory found in the dataset directory.")
return
if len(downloaded_dirs) > 1:
print(
f"Multiple subdirectories found: {downloaded_dirs}. Using the first one."
)
downloaded_dir = os.path.join(dataset_dir, downloaded_dirs[0])
print(f"Found downloaded directory: {downloaded_dir}")
# Source paths
src_images_dir = os.path.join(downloaded_dir, "images")
src_annotations_dir = os.path.join(downloaded_dir, "annotations")
# Destination paths
dst_images_dir = os.path.join(dataset_dir, "images")
dst_annotations_dir = os.path.join(dataset_dir, "annotations")
# Move images folder
if os.path.exists(src_images_dir):
if os.path.exists(dst_images_dir):
shutil.rmtree(dst_images_dir)
shutil.move(src_images_dir, dst_images_dir)
print(f"Moved images folder to {dst_images_dir}")
else:
print("Images folder not found in the downloaded directory")
# Move annotations folder
if os.path.exists(src_annotations_dir):
if os.path.exists(dst_annotations_dir):
shutil.rmtree(dst_annotations_dir)
shutil.move(src_annotations_dir, dst_annotations_dir)
print(f"Moved annotations folder to {dst_annotations_dir}")
else:
print("Annotations folder not found in the downloaded directory")
# Remove the downloaded folder if it's empty
if os.path.exists(downloaded_dir) and not os.listdir(downloaded_dir):
os.rmdir(downloaded_dir)
print(f"Removed empty downloaded folder: {downloaded_dir}")
print("Dataset preparation completed.")
def convert_bbox_to_yolo(self, size, box):
"""Converts bounding box coordinates to YOLO format.
Parameters
----------
size : tuple
The width and height of the image.
box : list
Bounding box coordinates in COCO format [x_min, y_min, width, height].
Returns
-------
tuple
Bounding box in YOLO format (center-x, center-y, width, height), normalized to [0, 1].
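Examples
--------
For a 640x480 image, the COCO box ``[100, 120, 200, 160]``
(top-left x, top-left y, width, height) maps to roughly
``(0.3125, 0.4167, 0.3125, 0.3333)`` as normalized
(center-x, center-y, width, height).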
"""
dw = 1.0 / size[0]
dh = 1.0 / size[1]
x = (box[0] + box[2] / 2.0) * dw
y = (box[1] + box[3] / 2.0) * dh
w = box[2] * dw
h = box[3] * dh
return (x, y, w, h)
def create_data_yaml(self, dataset_dir, class_names):
"""Creates a data.yaml file for the YOLO model from the dataset.
Parameters
----------
dataset_dir : str
The directory where the dataset is located.
class_names : list
List of class names in the dataset.
"""
data_yaml = {
"path": dataset_dir,
"train": "images/train2017",
"val": "images/val2017",
"test": "images/test2017",
"names": class_names,
}
yaml_path = os.path.join(dataset_dir, "data.yaml")
with open(yaml_path, "w") as file:
yaml.dump(data_yaml, file, default_flow_style=False)
print(f"Created data.yaml file at {yaml_path}")
def prepare_yolo_dataset(self, dataset_dir):
"""Prepares the dataset for YOLO model training.
Parameters
----------
dataset_dir : str
The directory where the dataset is located.
"""
print("Preparing YOLO dataset...")
# Create the 'datasets' directory one level above the 'workspace' directory
root_dir = os.path.abspath(os.path.join(dataset_dir, os.pardir, os.pardir))
datasets_dir = os.path.join(root_dir, "datasets")
if not os.path.exists(datasets_dir):
os.makedirs(datasets_dir)
# New directory structure: datasets/workspace/dataset
# TODO: keep the directory as workspace/dataset by commenting out these lines
workspace_dir = os.path.basename(os.path.dirname(dataset_dir))
new_workspace_dir = os.path.join(datasets_dir, workspace_dir)
if not os.path.exists(new_workspace_dir):
os.makedirs(new_workspace_dir)
new_dataset_dir = os.path.join(new_workspace_dir, os.path.basename(dataset_dir))
if os.path.exists(new_dataset_dir):
shutil.rmtree(new_dataset_dir)
shutil.move(dataset_dir, new_dataset_dir)
dataset_dir = new_dataset_dir
# Find the downloaded folder
contents = os.listdir(dataset_dir)
downloaded_dirs = [
d
for d in contents
if os.path.isdir(os.path.join(dataset_dir, d))
and d not in ("images", "annotations")
]
if not downloaded_dirs:
print("No suitable subdirectory found in the dataset directory.")
return
if len(downloaded_dirs) > 1:
print(
f"Multiple subdirectories found: {downloaded_dirs}. Using the first one."
)
downloaded_dir = os.path.join(dataset_dir, downloaded_dirs[0])
print(f"Found downloaded directory: {downloaded_dir}")
# Source paths
src_images_dir = os.path.join(downloaded_dir, "images")
src_annotations_dir = os.path.join(downloaded_dir, "annotations")
# Destination paths
dst_images_dir = os.path.join(dataset_dir, "images")
dst_annotations_dir = os.path.join(dataset_dir, "annotations")
# Move images folder
if os.path.exists(src_images_dir):
if os.path.exists(dst_images_dir):
shutil.rmtree(dst_images_dir)
shutil.move(src_images_dir, dst_images_dir)
print(f"Moved images folder to {dst_images_dir}")
else:
print("Images folder not found in the downloaded directory")
# Move annotations folder
if os.path.exists(src_annotations_dir):
if os.path.exists(dst_annotations_dir):
shutil.rmtree(dst_annotations_dir)
shutil.move(src_annotations_dir, dst_annotations_dir)
print(f"Moved annotations folder to {dst_annotations_dir}")
else:
print("Annotations folder not found in the downloaded directory")
class_names = self.create_yolo_labels_from_mscoco_ann(
dataset_dir,
dst_images_dir,
dst_annotations_dir,
os.path.join(dst_annotations_dir, "instances_train2017.json"),
)
self.create_yolo_labels_from_mscoco_ann(
dataset_dir,
dst_images_dir,
dst_annotations_dir,
os.path.join(dst_annotations_dir, "instances_val2017.json"),
)
self.create_yolo_labels_from_mscoco_ann(
dataset_dir,
dst_images_dir,
dst_annotations_dir,
os.path.join(dst_annotations_dir, "instances_test2017.json"),
)
# Create the data.yaml file
self.create_data_yaml(dataset_dir, class_names)
# Remove the downloaded folder if it's empty
if os.path.exists(downloaded_dir) and not os.listdir(downloaded_dir):
os.rmdir(downloaded_dir)
print(f"Removed empty downloaded folder: {downloaded_dir}")
print("Dataset preparation completed.")
def create_yolo_labels_from_mscoco_ann(
self, dataset_dir, dst_images_dir, dst_annotations_dir, annotation_file
):
"""Creates YOLO labels from MSCOCO annotations.
Parameters
----------
dataset_dir : str
Directory where the dataset is stored.
dst_images_dir : str
Directory where images are stored.
dst_annotations_dir : str
Directory where annotations are stored.
annotation_file : str
Path to the MSCOCO annotation file.
Returns
-------
list
List of class names from the annotations.
"""
# Convert annotations to YOLO format
coco = COCO(annotation_file)
img_dir = dst_images_dir
ann_dir = os.path.join(dataset_dir, "labels")
if not os.path.exists(ann_dir):
os.makedirs(ann_dir)
# Subdirectories for labels
label_dirs = {
"train": os.path.join(ann_dir, "train2017"),
"val": os.path.join(ann_dir, "val2017"),
"test": os.path.join(ann_dir, "test2017"),
}
for dir_path in label_dirs.values():
if not os.path.exists(dir_path):
os.makedirs(dir_path)
# Get class names
categories = coco.loadCats(coco.getCatIds())
class_names = [category["name"] for category in categories]
for img_id in coco.getImgIds():
img_info = coco.loadImgs(img_id)[0]
img_filename = img_info["file_name"]
img_width = img_info["width"]
img_height = img_info["height"]
ann_ids = coco.getAnnIds(imgIds=img_id)
anns = coco.loadAnns(ann_ids)
if "train" in annotation_file:
label_path = os.path.join(
label_dirs["train"], img_filename.replace(".jpg", ".txt")
)
elif "val" in annotation_file:
label_path = os.path.join(
label_dirs["val"], img_filename.replace(".jpg", ".txt")
)
elif "test" in annotation_file:
label_path = os.path.join(
label_dirs["test"], img_filename.replace(".jpg", ".txt")
)
with open(label_path, "w") as f:
for ann in anns:
bbox = ann["bbox"]
yolo_bbox = self.convert_bbox_to_yolo((img_width, img_height), bbox)
category_id = ann["category_id"] - 1
f.write(f"{category_id} {' '.join(map(str, yolo_bbox))}\n")
if "train" in annotation_file:
return class_names
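# Each label file holds one line per object, e.g. "2 0.3125 0.4167 0.3125 0.3333",
# i.e. "<class_id> <cx> <cy> <w> <h>" with coordinates normalized by image size.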
@log_decorator
def get_model_train(self, is_exported=False):
"""Mock function to retrieve the model training document.
This mock version simulates the retrieval of the model training document without making actual API calls.
Parameters
----------
is_exported : bool, optional
If True, retrieves the model train document by export ID (default is False).
Returns
-------
dict
A mock model training document.
Raises
------
Exception
If there is an error in fetching the model training document.
"""
try:
# Mock URL handling based on whether the model is exported
if is_exported:
print(f"Mock fetching model train by export ID: {self._idModel}")
url = f"/v1/model/get_model_train_by_export_id?exportId={self._idModel}"
else:
print(f"Mock fetching model train by model ID: {self._idModel}")
url = "/v1/model/model_train/" + str(self._idModel)
# Mocked response (replace this with your own mock data)
model_train_doc = {
"model_id": self._idModel,
"training_status": "completed",
"training_accuracy": 0.95,
"model_exported": is_exported,
}
print(f"Mocked model training document: {model_train_doc}")
return model_train_doc
except Exception as e:
print(f"Exception in get_model_train: {str(e)}")
self.update_status("error", "error", "Failed to get mock model train")
raise e
class ModelDownloadMock:
"""Mock class for downloading models in the testing pipeline.
"""
def __init__(self):
"""Initializes the ModelDownloadMock class and sets up the testing logs folder path.
"""
self.testing_logs_folder_path = "./testing_logs"
def download_model(self, model_path, model_type="trained", runtime_framework=""):
"""Mock method to download a model file and copy it to the specified path.
Parameters
----------
model_path : str
Path where the model should be downloaded.
model_type : str, optional
Type of model to download ('trained' or 'exported'). Default is 'trained'.
runtime_framework : str, optional
Runtime framework used for the model (default is '').
Returns
-------
bool
Returns True after successfully copying the model file.
"""
print(f"Mock download model to: {model_path}, {model_type}")
file_path, ext = os.path.splitext(model_path)
if model_type == "trained":
local_model_file = [
path
for path in os.listdir(self.testing_logs_folder_path)
if path.endswith(f"{model_type}{ext}")
][0]
elif model_type == "exported":
local_model_file = [
path
for path in os.listdir(self.testing_logs_folder_path)
if path.endswith(f"{model_type}{ext}")
][0]
# TODO: adding the exportFormat into considration for loading the saved model checkpoint path, by including runtime_framework
with open(local_model_file, "rb") as src, open(model_path, "wb") as dest:
dest.write(src.read())
return True
class TestingMatriceDeploy:
"""Class to handle deployment and inference of models for testing purposes.
This class handles model downloading, logging, and running inference with a provided model.
Parameters
----------
load_model : function
Function to load a model during testing.
predict : function
Function to make predictions using the loaded model.
"""
def __init__(self, load_model, predict):
"""Initializes the TestingMatriceDeploy class, setting up logs and triggering inference.
Parameters
----------
load_model : function
Function that loads a model for inference.
predict : function
Function to perform prediction with the loaded model.
"""
self.logs = []
self.testing_logs_folder_path = "./testing_logs"
os.makedirs(self.testing_logs_folder_path, exist_ok=True)
self.main_action_logs_path = os.path.join(
self.testing_logs_folder_path, "deploy.json"
)
self.model_downloader = ModelDownloadMock()
self.load_model = load_model
self.predict = predict
self.model = None
self.inference(self.create_image_bytes())
def log_to_json(self, file_path, payload):
"""Logs data to a JSON file, appending the payload if the file exists.
Parameters
----------
file_path : str
Path to the JSON log file.
payload : dict
The data to log in the JSON file.
"""
try:
with open(file_path, "r") as file:
data = json.load(file)
except FileNotFoundError:
data = []
except json.JSONDecodeError:
data = []
data.append(payload)
with open(file_path, "w") as file:
json.dump(data, file, indent=4)
def add_logs(self, step, status, description):
"""Adds a log entry for a specific step, including status and description.
Parameters
----------
step : str
The step or action being logged (e.g., 'inference').
status : str
The status of the step (e.g., 'SUCCESS', 'ERROR').
description : str
A description or error message related to the step.
"""
self.logs.append({"step": step, "status": status, "description": description})
self.log_to_json(
self.main_action_logs_path,
{"step": step, "status": status, "description": description},
)
def log_decorator(func):
"""A decorator to log the execution status of a function."""
def wrapper(self, *args, **kwargs):
try:
result = func(self, *args, **kwargs)
self.add_logs(func.__name__, "SUCCESS", "SUCCESS")
return result
except Exception as e:
print(f"ERROR occurred in: {func.__name__} : {str(e)}")
self.add_logs(func.__name__, "ERROR", str(e))
raise e
return wrapper
@log_decorator
def load_predictor_model(self):
"""Loads the predictor model using the model downloader.
"""
self.model = self.load_model(self.model_downloader)
@log_decorator
def inference(self, image):
"""Runs inference on an image using the loaded model.
Parameters
----------
image : bytes
Image data in bytes to be used for inference.
Returns
-------
tuple
Inference results and a success flag.
"""
if self.model is None:
self.load_predictor_model()
results = self.predict(self.model, image)
return results, True
def create_image_bytes(self):
"""Creates a simple test image in memory as a byte stream.
Returns
-------
bytes
Image data in JPEG format.
"""
# Create a simple image with RGB mode and size 224x224
image = Image.new("RGB", (224, 224), color="blue")
draw = ImageDraw.Draw(image)
draw.text((50, 100), "Test", fill="white")
# Save the image to a BytesIO object
image_bytes_io = BytesIO()
image.save(image_bytes_io, format="JPEG")
image_bytes_io.seek(0)
return image_bytes_io.read()
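# A minimal usage sketch. The load_model/predict callables below are hypothetical
# stand-ins supplied by the model integrator, not part of this module:
#
# def load_model(model_downloader):
#     model_downloader.download_model("model.pt")  # copies from ./testing_logs
#     return torch.jit.load("model.pt")            # assuming a TorchScript model
#
# def predict(model, image_bytes):
#     tensor = preprocess(image_bytes)  # hypothetical preprocessing helper
#     return model(tensor)
#
# TestingMatriceDeploy(load_model, predict)  # runs one inference on a generated test image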