User Client

EncordUserClient

class encord.user_client.EncordUserClient(user_config, querier)

get_dataset

get_dataset(dataset_hash, dataset_access_settings=DatasetAccessSettings(fetch_client_metadata=False))

Parameters

  • dataset_hash (str): The Dataset ID

  • dataset_access_settings (DatasetAccessSettings): Set the dataset_access_settings if you would like to change the defaults.

Return type: Dataset

def get_dataset(
        self, dataset_hash: str, dataset_access_settings: DatasetAccessSettings = DEFAULT_DATASET_ACCESS_SETTINGS
    ) -> Dataset:
        """
        Get the Dataset class to access dataset fields and manipulate a dataset.

        According to the access note carried by this method, the dataset is only
        available to:

            * Dataset admin

            * Organization admin

        Args:
            dataset_hash: The Dataset ID
            dataset_access_settings: Set the dataset_access_settings if you would like to change the defaults.

        Returns:
            A ``Dataset`` wrapping a dataset-scoped client and the ORM record that client fetched.
        """
        # Scope the SSH configuration (and therefore the querier) to this one dataset.
        dataset_config = SshConfig(self.user_config, resource_type=TYPE_DATASET, resource_id=dataset_hash)
        dataset_client = EncordClientDataset(
            querier=Querier(dataset_config),
            config=dataset_config,
            dataset_access_settings=dataset_access_settings,
        )
        return Dataset(dataset_client, dataset_client.get_dataset())

get_project

get_project(project_hash)

Get the Project class to access Project fields and manipulate a Project.

You will only have access to this Project if you are one of the following

  • Project admin

  • Project team manager

  • Organization admin of the Project

Parameters

project_hash (str) – The Project ID

Return type:

Project

    def get_project(self, project_hash: str) -> Project:
        """
        Get the Project class to access project fields and manipulate a project.

        You will only have access to this project if you are one of the following

            * Project admin

            * Project team manager

            * Organization admin of the project

        Args:
            project_hash: The Project ID

        Returns:
            A ``Project`` built from the project-scoped client, the project ORM record
            (fetched without labels metadata) and the project's ontology.
        """
        project_config = SshConfig(self.user_config, resource_type=TYPE_PROJECT, resource_id=project_hash)
        project_querier = Querier(project_config)
        project_client = EncordClientProject(querier=project_querier, config=project_config)
        orm_project = project_client.get_project(include_labels_metadata=False)

        # The ontology is fetched through the *project* querier on purpose: within the
        # scope of a project there may only be read-only access to the ontology structure,
        # whereas get_ontology implies full access and could fail with a permission error.
        ontology_config = SshConfig(
            self.user_config, resource_type=TYPE_ONTOLOGY, resource_id=orm_project["ontology_hash"]
        )
        orm_ontology = project_querier.basic_getter(OrmOntology, ontology_config.resource_id)

        return Project(
            project_client,
            orm_project,
            Ontology(project_querier, ontology_config, orm_ontology),
            client_v2=self._api_client,
        )

get_ontology

get_ontology(ontology_hash)

Return type:

Ontology

def get_ontology(self, ontology_hash: str) -> Ontology:
        """
        Get the Ontology class for the given ontology.

        Args:
            ontology_hash: The Ontology ID

        Returns:
            An ``Ontology`` backed by an ontology-scoped querier and the fetched ORM record.
        """
        ontology_config = SshConfig(self.user_config, resource_type=TYPE_ONTOLOGY, resource_id=ontology_hash)
        ontology_querier = Querier(ontology_config)
        return Ontology(
            ontology_querier,
            ontology_config,
            ontology_querier.basic_getter(OrmOntology, ontology_hash),
        )

create_private_dataset DEPRECATED

DEPRECATED - Use create_dataset instead.

create_private_dataset(dataset_title, dataset_type, dataset_description=None)

Return type:

CreateDatasetResponse

def create_private_dataset(
        self,
        dataset_title: str,
        dataset_type: StorageLocation,
        dataset_description: Optional[str] = None,
    ) -> CreateDatasetResponse:
        """
        DEPRECATED - please use `create_dataset` instead.

        Forwards all three arguments, unchanged and in the same order, to
        `create_dataset` and returns its response.
        """
        response = self.create_dataset(dataset_title, dataset_type, dataset_description)
        return response

create_dataset

Creates a Dataset for use with Encord Annotate

create_dataset(dataset_title, dataset_type, dataset_description=None)

Parameters

  • dataset_title (str) – Title of Dataset.

  • dataset_type (StorageLocation) – StorageLocation type where data will be stored.

  • dataset_description (Optional[str]) – Optional description of the Dataset.

Return type:

CreateDatasetResponse

Returns:

CreateDatasetResponse

    def create_dataset(
        self,
        dataset_title: str,
        dataset_type: StorageLocation,
        dataset_description: Optional[str] = None,
    ) -> CreateDatasetResponse:
        """
        Create a dataset.

        Args:
            dataset_title:
                Title of dataset.
            dataset_type:
                StorageLocation type where data will be stored.
            dataset_description:
                Optional description of the dataset. It is only sent when it is
                truthy, so both ``None`` and ``""`` leave it out of the request.
        Returns:
            CreateDatasetResponse
        """
        payload = {"title": dataset_title, "type": dataset_type}
        if dataset_description:
            payload["description"] = dataset_description

        # uid=None: there is no existing dataset to address when creating a new one.
        response = self.querier.basic_setter(OrmDataset, uid=None, payload=payload)
        return CreateDatasetResponse.from_dict(response)

create_dataset_api_key

create_dataset_api_key(dataset_hash, api_key_title, dataset_scopes)

Return type:

DatasetAPIKey

    def create_dataset_api_key(
        self, dataset_hash: str, api_key_title: str, dataset_scopes: List[DatasetScope]
    ) -> DatasetAPIKey:
        """
        Create an API key for a dataset.

        Args:
            dataset_hash: The Dataset ID the key is created for.
            api_key_title: Title of the new API key.
            dataset_scopes: Scopes granted to the key; each scope's ``.value`` is sent.

        Returns:
            The ``DatasetAPIKey`` parsed from the server response.
        """
        scope_values = [scope.value for scope in dataset_scopes]
        payload = {
            "dataset_hash": dataset_hash,
            "title": api_key_title,
            "scopes": scope_values,
        }
        created = self.querier.basic_setter(DatasetAPIKey, uid=None, payload=payload)
        return DatasetAPIKey.from_dict(created)

get_dataset_api_keys

get_dataset_api_keys(dataset_hash)

Return type:

List[DatasetAPIKey]

    def get_dataset_api_keys(self, dataset_hash: str) -> List[DatasetAPIKey]:
        """
        Fetch the API keys of a dataset.

        Args:
            dataset_hash: The Dataset ID whose keys are requested.

        Returns:
            The result of the querier's ``get_multiple`` call for ``DatasetAPIKey``.
        """
        return self.querier.get_multiple(
            DatasetAPIKey,
            uid=None,
            payload={"dataset_hash": dataset_hash},
        )

get_or_create_dataset_api_key

get_or_create_dataset_api_key(dataset_hash)

Return type:

DatasetAPIKey

    def get_or_create_dataset_api_key(self, dataset_hash: str) -> DatasetAPIKey:
        """
        Issue a ``basic_put`` for the dataset's API key and return the parsed key.

        Args:
            dataset_hash: The Dataset ID.

        Returns:
            The ``DatasetAPIKey`` parsed from the server response.
        """
        put_result = self.querier.basic_put(
            DatasetAPIKey,
            uid=None,
            payload={"dataset_hash": dataset_hash},
        )
        return DatasetAPIKey.from_dict(put_result)

get_datasets

Lists all Datasets (if called with no arguments) or matching Datasets the user has access to.

get_datasets(title_eq=None, title_like=None, desc_eq=None, desc_like=None, created_before=None, created_after=None, edited_before=None, edited_after=None)

Parameters:

  • title_eq (Optional[str]) – optional exact title filter

  • title_like (Optional[str]) – optional fuzzy title filter; SQL syntax

  • desc_eq (Optional[str]) – optional exact description filter

  • desc_like (Optional[str]) – optional fuzzy description filter; SQL syntax

  • created_before (Union[str, datetime, None]) – optional creation date filter, ‘less’

  • created_after (Union[str, datetime, None]) – optional creation date filter, ‘greater’

  • edited_before (Union[str, datetime, None]) – optional last modification date filter, ‘less’

  • edited_after (Union[str, datetime, None]) – optional last modification date filter, ‘greater’

Return type:

List[Dict[str, Any]]

Returns:

List of (role, dataset) pairs for datasets matching filter conditions.

    def get_datasets(
        self,
        title_eq: Optional[str] = None,
        title_like: Optional[str] = None,
        desc_eq: Optional[str] = None,
        desc_like: Optional[str] = None,
        created_before: Optional[Union[str, datetime]] = None,
        created_after: Optional[Union[str, datetime]] = None,
        edited_before: Optional[Union[str, datetime]] = None,
        edited_after: Optional[Union[str, datetime]] = None,
    ) -> List[Dict[str, Any]]:
        """
        List either all (if called with no arguments) or matching datasets the user has access to.

        Args:
            title_eq: optional exact title filter
            title_like: optional fuzzy title filter; SQL syntax
            desc_eq: optional exact description filter
            desc_like: optional fuzzy description filter; SQL syntax
            created_before: optional creation date filter, 'less'
            created_after: optional creation date filter, 'greater'
            edited_before: optional last modification date filter, 'less'
            edited_after: optional last modification date filter, 'greater'

        Returns:
            A list of dicts, one per matching dataset, each with a ``"dataset"`` entry
            (``DatasetInfo``) and a ``"user_role"`` entry (``DatasetUserRole``).
        """
        # A hack to share the validation code without too much copy & paste: the
        # arguments are handed over as locals(). This must stay the first statement,
        # otherwise any local bound before it would leak into the mapping.
        properties_filter = self.__validate_filter(locals())
        rows = self.querier.get_multiple(DatasetWithUserRole, payload={"filter": properties_filter})

        def with_parsed_dates(raw):
            # Replaces the ISO timestamp strings in place and hands the same mapping back.
            raw["created_at"] = datetime_parser.isoparse(raw["created_at"])
            raw["last_edited_at"] = datetime_parser.isoparse(raw["last_edited_at"])
            return raw

        results = []
        for row in rows:
            info = DatasetInfo(**with_parsed_dates(row.dataset))
            results.append({"dataset": info, "user_role": DatasetUserRole(row.user_role)})
        return results

create_with_ssh_private_key

Creates an instance of EncordUserClient authenticated with a private SSH key. Accepts either the private key content or the path to the key file; each can be provided as a method parameter or through one of the following environment variables:

  • ENCORD_SSH_KEY: environment variable with the private key content

  • ENCORD_SSH_KEY_FILE: environment variable with the path to the key file

static create_with_ssh_private_key(ssh_private_key=None, password=None, requests_settings=RequestsSettings(max_retries=3, backoff_factor=1.5, connection_retries=3), ssh_private_key_path=None, **kwargs)

Parameters:

ssh_private_key (Optional[str]) – the private key content

ssh_private_key_path (Optional[str | Path]) – the path to the private key file

password (Optional[str]) – private key password

Returns:

EncordUserClient

    @staticmethod
    def create_with_ssh_private_key(
        ssh_private_key: Optional[str] = None,
        password: Optional[str] = None,
        requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
        ssh_private_key_path: Optional[str | Path] = None,
        **kwargs,
    ) -> EncordUserClient:
        """
        Creates an instance of EncordUserClient authenticated with a private SSH key.
        Accepts either the private key content or the path to the key file; each can be provided
        as a method parameter or through one of the following environment variables:

        * **ENCORD_SSH_KEY**: environment variable with the private key content
        * **ENCORD_SSH_KEY_FILE**: environment variable with the path to the key file

        When ``ssh_private_key_path`` is given, the file content replaces ``ssh_private_key``.
        The environment lookup (``get_env_ssh_key``) is only used when no key content is
        available after that step.

        Args:
            ssh_private_key: the private key content
            ssh_private_key_path: the path to the private key file
            password: private key password
            requests_settings: forwarded to ``UserConfig.from_ssh_private_key``
            **kwargs: forwarded to ``UserConfig.from_ssh_private_key``

        Returns:
            An ``EncordUserClient`` built from the resulting user configuration.
        """
        if ssh_private_key_path is not None:
            key_file = Path(ssh_private_key_path) if isinstance(ssh_private_key_path, str) else ssh_private_key_path
            ssh_private_key = key_file.read_text(encoding="ascii")

        # Fall back to the environment when neither argument produced key material.
        ssh_private_key = ssh_private_key or get_env_ssh_key()

        user_config = UserConfig.from_ssh_private_key(
            ssh_private_key,
            password,
            requests_settings=requests_settings,
            **kwargs,
        )
        return EncordUserClient(user_config, Querier(user_config))

get_projects

List either all (if called with no arguments) or matching projects the user has access to.

get_projects(title_eq=None, title_like=None, desc_eq=None, desc_like=None, created_before=None, created_after=None, edited_before=None, edited_after=None)

Parameters:

  • title_eq (Optional[str]) – optional exact title filter

  • title_like (Optional[str]) – optional fuzzy title filter; SQL syntax

  • desc_eq (Optional[str]) – optional exact description filter

  • desc_like (Optional[str]) – optional fuzzy description filter; SQL syntax

  • created_before (Union[str, datetime, None]) – optional creation date filter, ‘less’

  • created_after (Union[str, datetime, None]) – optional creation date filter, ‘greater’

  • edited_before (Union[str, datetime, None]) – optional last modification date filter, ‘less’

  • edited_after (Union[str, datetime, None]) – optional last modification date filter, ‘greater’

Return type:

List[Dict]

Returns:

List of (role, project) pairs for Projects matching the filter conditions.

    def get_project(self) -> OrmProject:
        """
        Return whatever ``get_project()`` on the wrapped ``self._client`` returns.

        This function is exposed for convenience. You are encouraged to use the property accessors instead.

        NOTE(review): this method takes no filter arguments and reads ``self._client``,
        so it does not look like the implementation of ``get_projects`` - confirm that
        this snippet belongs under that heading.
        """
        orm_project = self._client.get_project()
        return orm_project

create_project

Creates a new project and returns its uid (‘project_hash’).

create_project(project_title, dataset_hashes, project_description='', ontology_hash='', workflow_settings=<encord.orm.project.ManualReviewWorkflowSettings object>, workflow_template_hash=None)

Parameters:

  • project_title (str) – the title of the project

  • dataset_hashes (List[str]) – a list of the dataset uids that the project will use

  • project_description (str) – the optional description of the project

  • ontology_hash (str) – the uid of an ontology to be used. If omitted, a new empty ontology will be created

  • workflow_settings (Union[ManualReviewWorkflowSettings, BenchmarkQaWorkflowSettings]) – selects and configures the type of the quality control workflow to use. See encord.orm.project.ProjectWorkflowSettings for details. If omitted, ManualReviewWorkflowSettings is used.

  • workflow_template_hash (Optional[str]) – project will be created using a workflow based on the template provided.

Return type:

str

Returns:

The uid of the Project.

    def create_project(
        self,
        project_title: str,
        dataset_hashes: List[str],
        project_description: str = "",
        ontology_hash: str = "",
        workflow_settings: ProjectWorkflowSettings = ManualReviewWorkflowSettings(),
        workflow_template_hash: Optional[str] = None,
    ) -> str:
        """
        Creates a new project and returns its uid ('project_hash')

        Args:
            project_title: the title of the project
            dataset_hashes: a list of the dataset uids that the project will use
            project_description: the optional description of the project
            ontology_hash: the uid of an ontology to be used. If omitted, a new empty ontology will be created
            workflow_settings: selects and configures the type of the quality control workflow to use. See
                `encord.orm.project.ProjectWorkflowSettings` for details. If omitted,
                `~encord.orm.project.ManualReviewWorkflowSettings` is used.
            workflow_template_hash: project will be created using a workflow based on the template provided.
        Returns:
            the uid of the project.
        """
        # Anything that is not a benchmark-QA configuration is sent as manual QA.
        is_benchmark = isinstance(workflow_settings, BenchmarkQaWorkflowSettings)
        workflow_type = ProjectWorkflowType.BENCHMARK_QA if is_benchmark else ProjectWorkflowType.MANUAL_QA

        payload = {
            "title": project_title,
            "description": project_description,
            "dataset_hashes": dataset_hashes,
            "workflow_type": workflow_type.value,
        }
        if is_benchmark:
            payload["source_projects"] = workflow_settings.source_projects

        # Optional fields are only included when the caller actually supplied them.
        if ontology_hash and len(ontology_hash):
            payload["ontology_hash"] = ontology_hash
        if workflow_template_hash is not None:
            payload["workflow_template_id"] = workflow_template_hash

        return self.querier.basic_setter(OrmProject, uid=None, payload=payload)

create_project_api_key

Creates an API key for a Project.

create_project_api_key(project_hash, api_key_title, scopes)

Return type:

str

Returns:

ProjectAPIKey - The API key for a Project.

    def create_project_api_key(self, project_hash: str, api_key_title: str, scopes: List[APIKeyScopes]) -> str:
        """
        Create an API key for a project.

        Args:
            project_hash: The Project ID the key is created for.
            api_key_title: Title of the new API key.
            scopes: Scopes granted to the key; each scope's ``.value`` is sent.

        Returns:
            The created project API key.
        """
        scope_values = [scope.value for scope in scopes]
        return self.querier.basic_setter(
            ProjectAPIKey,
            uid=project_hash,
            payload={"title": api_key_title, "scopes": scope_values},
        )

get_project_api_keys

Gets a list of API keys for a specified Project.

get_project_api_keys(project_hash)

Return type:

List[ProjectAPIKey]

    def get_project_api_keys(self, project_hash: str) -> List[ProjectAPIKey]:
        """
        Fetch the API keys of a project.

        Args:
            project_hash: The Project ID whose keys are requested.

        Returns:
            The result of the querier's ``get_multiple`` call for ``ProjectAPIKey``.
        """
        project_keys = self.querier.get_multiple(ProjectAPIKey, uid=project_hash)
        return project_keys

get_or_create_project_api_key

Creates or gets a specified project's API key.

get_or_create_project_api_key(project_hash)

Return type:

str

    def get_or_create_project_api_key(self, project_hash: str) -> str:
        """
        Issue a ``basic_put`` (with an empty payload) for the project's API key.

        Args:
            project_hash: The Project ID.

        Returns:
            The value returned by the querier for that request.
        """
        api_key = self.querier.basic_put(ProjectAPIKey, uid=project_hash, payload={})
        return api_key

get_dataset_client DEPRECATED

DEPRECATED - Use get_dataset() instead.

get_dataset_client(dataset_hash, dataset_access_settings=DatasetAccessSettings(fetch_client_metadata=False), **kwargs)

Return type:

EncordClientDataset

    def get_dataset_client(
        self,
        dataset_hash: str,
        dataset_access_settings: DatasetAccessSettings = DEFAULT_DATASET_ACCESS_SETTINGS,
        **kwargs,
    ) -> EncordClientDataset:
        """
        DEPRECATED - prefer using :meth:`get_dataset()` instead.

        Obtains an API key for the dataset through `get_or_create_dataset_api_key`
        and initialises an ``EncordClientDataset`` with it.

        Args:
            dataset_hash: The Dataset ID.
            dataset_access_settings: Passed through to the dataset client.
            **kwargs: Accepted but not forwarded anywhere by this method.
        """
        api_key = self.get_or_create_dataset_api_key(dataset_hash).api_key
        return EncordClientDataset.initialise(
            dataset_hash,
            api_key,
            requests_settings=self.user_config.requests_settings,
            dataset_access_settings=dataset_access_settings,
        )

get_project_client DEPRECATED

DEPRECATED - Use get_project() instead.

get_project_client(project_hash, **kwargs)

Return type:

Union[EncordClientProject, EncordClientDataset]

    def get_project_client(self, project_hash: str, **kwargs) -> Union[EncordClientProject, EncordClientDataset]:
        """
        DEPRECATED - prefer using :meth:`get_project()` instead.

        Obtains an API key for the project through `get_or_create_project_api_key`
        and initialises an ``EncordClient`` with it.

        Args:
            project_hash: The Project ID.
            **kwargs: Forwarded to ``EncordClient.initialise``.
        """
        api_key: str = self.get_or_create_project_api_key(project_hash)
        request_settings = self.user_config.requests_settings
        return EncordClient.initialise(project_hash, api_key, requests_settings=request_settings, **kwargs)

create_project_from_cvat

Export your CVAT project with the “CVAT for images 1.1” option and use this function to import your images and annotations into encord.

ℹ️

Note

Ensure that the “Save images” checkbox is enabled when exporting from CVAT.

create_project_from_cvat(import_method, dataset_name, review_mode=ReviewMode.LABELLED, max_workers=None, *, transform_bounding_boxes_to_polygons=False)

Parameters:

  • import_method (LocalImport) – The chosen import method. See the ImportMethod class for details.

  • dataset_name (str) – The name of the dataset that will be created.

  • review_mode (ReviewMode) – Set how much interaction is needed from the labeler and from the reviewer for the CVAT labels. See the ReviewMode documentation for more details.

  • max_workers (Optional[int]) – DEPRECATED: This argument will be ignored

  • transform_bounding_boxes_to_polygons – All instances of CVAT bounding boxes will be converted to polygons in the final Encord project.

Return type:

Union[CvatImporterSuccess, CvatImporterError]

Returns:

CvatImporterSuccess if the project was successfully imported; CvatImporterError if the project could not be imported.

Raises:

ValueError – If the CVAT directory has an invalid format.

    def create_project_from_cvat(
        self,
        import_method: ImportMethod,
        dataset_name: str,
        review_mode: ReviewMode = ReviewMode.LABELLED,
        max_workers: Optional[int] = None,
        *,
        transform_bounding_boxes_to_polygons=False,
    ) -> Union[CvatImporterSuccess, CvatImporterError]:
        """
        Export your CVAT project with the "CVAT for images 1.1" option and use this function to import
        your images and annotations into encord. Ensure that the "Save images" checkbox is enabled
        when exporting from CVAT.

        The export directory must contain an ``images`` entry and an ``annotations.xml`` file.
        The images are uploaded first; the annotations are then sent base64-encoded together
        with the resulting dataset hash and the image title to image hash mapping.

        Args:
            import_method:
                The chosen import method. See the `ImportMethod` class for details.
                Only `LocalImport` is accepted.
            dataset_name:
                The name of the dataset that will be created.
            review_mode:
                Set how much interaction is needed from the labeler and from the reviewer for the CVAT labels.
                See the `ReviewMode` documentation for more details.
            max_workers:
                DEPRECATED: This argument will be ignored
            transform_bounding_boxes_to_polygons:
                All instances of CVAT bounding boxes will be converted to polygons in the final Encord project.

        Returns:
            CvatImporterSuccess: If the project was successfully imported.
            CvatImporterError: If the project could not be imported.

        Raises:
            ValueError:
                If the import method is not a local import, if the CVAT directory has an
                invalid format, or if the server response has neither a success nor an error entry.
        """
        if not isinstance(import_method, LocalImport):
            raise ValueError("Only local imports are currently supported ")

        export_root = Path(import_method.file_path)

        # The export must have an `images` entry directly under its root.
        images_dir = export_root / "images"
        root_entries = list(export_root.iterdir())
        if images_dir not in root_entries:
            raise ValueError("The expected directory 'images' was not found.")

        annotations_file = export_root / "annotations.xml"
        if not annotations_file.is_file():
            raise ValueError(f"The file `{annotations_file}` does not exist.")

        # The annotations travel inside the request payload, hence base64 text rather than bytes.
        annotations_base64 = base64.b64encode(annotations_file.read_bytes()).decode("utf-8")

        images_paths, used_base_path = self.__get_images_paths(annotations_base64, images_dir)

        log.info("Starting image upload.")
        dataset_hash, image_title_to_image_hash_map = self.__upload_cvat_images(
            images_paths, used_base_path, dataset_name
        )
        log.info("Image upload completed.")

        import_request = {
            "cvat": {"annotations_base64": annotations_base64},
            "dataset_hash": dataset_hash,
            "image_title_to_image_hash_map": image_title_to_image_hash_map,
            "review_mode": review_mode.value,
            "transform_bounding_boxes_to_polygons": transform_bounding_boxes_to_polygons,
        }

        log.info("Starting project import. This may take a few minutes.")
        server_ret = self.querier.basic_setter(ProjectImporter, uid=None, payload=import_request)

        # "success" is checked before "error", so it wins if both entries are present.
        if "success" in server_ret:
            outcome = server_ret["success"]
            return CvatImporterSuccess(
                project_hash=outcome["project_hash"],
                dataset_hash=dataset_hash,
                issues=Issues.from_dict(outcome["issues"]),
            )

        if "error" in server_ret:
            failure = server_ret["error"]
            return CvatImporterError(dataset_hash=dataset_hash, issues=Issues.from_dict(failure["issues"]))

        raise ValueError("The api server responded with an invalid payload.")

get_cloud_integrations

Gets all cloud integration information from the Encord platform.

get_cloud_integrations()

Return type:

List[CloudIntegration]

    def get_cloud_integrations(self) -> List[CloudIntegration]:
        """
        Fetch the cloud integrations.

        Returns:
            The result of the querier's ``get_multiple`` call for ``CloudIntegration``.
        """
        integrations = self.querier.get_multiple(CloudIntegration)
        return integrations

get_ontologies

Lists all (if called with no arguments) or matching Ontologies the user has access to.

get_ontologies(title_eq=None, title_like=None, desc_eq=None, desc_like=None, created_before=None, created_after=None, edited_before=None, edited_after=None)

Parameters:

  • title_eq (Optional[str]) – optional exact title filter

  • title_like (Optional[str]) – optional fuzzy title filter; SQL syntax

  • desc_eq (Optional[str]) – optional exact description filter

  • desc_like (Optional[str]) – optional fuzzy description filter; SQL syntax

  • created_before (Union[str, datetime, None]) – optional creation date filter, ‘less’

  • created_after (Union[str, datetime, None]) – optional creation date filter, ‘greater’

  • edited_before (Union[str, datetime, None]) – optional last modification date filter, ‘less’

  • edited_after (Union[str, datetime, None]) – optional last modification date filter, ‘greater’

Return type:

List[Dict]

Returns:

A list of (role, ontology) pairs for Ontologies matching the filter conditions.

    def get_ontologies(
        self,
        title_eq: Optional[str] = None,
        title_like: Optional[str] = None,
        desc_eq: Optional[str] = None,
        desc_like: Optional[str] = None,
        created_before: Optional[Union[str, datetime]] = None,
        created_after: Optional[Union[str, datetime]] = None,
        edited_before: Optional[Union[str, datetime]] = None,
        edited_after: Optional[Union[str, datetime]] = None,
    ) -> List[Dict]:
        """
        List either all (if called with no arguments) or matching ontologies the user has access to.

        Args:
            title_eq: optional exact title filter
            title_like: optional fuzzy title filter; SQL syntax
            desc_eq: optional exact description filter
            desc_like: optional fuzzy description filter; SQL syntax
            created_before: optional creation date filter, 'less'
            created_after: optional creation date filter, 'greater'
            edited_before: optional last modification date filter, 'less'
            edited_after: optional last modification date filter, 'greater'

        Returns:
            A list of dicts, one per matching ontology, each with an ``"ontology"`` entry
            (``Ontology``) and a ``"user_role"`` entry (``OntologyUserRole``).
        """
        # A hack to share the validation code without too much copy & paste: the
        # arguments are handed over as locals(). This must stay the first statement,
        # otherwise any local bound before it would leak into the mapping.
        properties_filter = self.__validate_filter(locals())
        rows = self.querier.get_multiple(OntologyWithUserRole, payload={"filter": properties_filter})

        def to_entry(row) -> Dict:
            # Every ontology gets its own config and querier, scoped to that ontology.
            orm_ontology = OrmOntology.from_dict(row.ontology)
            ontology_config = SshConfig(
                self.user_config, resource_type=TYPE_ONTOLOGY, resource_id=orm_ontology.ontology_hash
            )
            return {
                "ontology": Ontology(Querier(ontology_config), ontology_config, orm_ontology),
                "user_role": OntologyUserRole(row.user_role),
            }

        return [to_entry(row) for row in rows]

create_ontology

Creates an Ontology.

create_ontology(title, description='', structure=None)

Return type:

Ontology

    def create_ontology(
        self, title: str, description: str = "", structure: Optional[OntologyStructure] = None
    ) -> Ontology:
        """
        Create an ontology.

        Args:
            title: Title of the ontology.
            description: Description of the ontology.
            structure: Optional structure; its ``to_dict()`` output is sent as the
                ``"editor"`` field. A falsy value results in an empty dict being sent.

        Returns:
            An ``Ontology`` backed by a querier scoped to the newly created ontology.
        """
        editor = structure.to_dict() if structure else dict()
        payload = {"title": title, "description": description, "editor": editor}

        created = self.querier.basic_setter(OrmOntology, uid=None, payload=payload)
        orm_ontology = OrmOntology.from_dict(created)

        # Re-scope config and querier to the hash the server assigned.
        ontology_config = SshConfig(
            self.user_config, resource_type=TYPE_ONTOLOGY, resource_id=orm_ontology.ontology_hash
        )
        return Ontology(Querier(ontology_config), ontology_config, orm_ontology)

create_storage_folder

create_storage_folder(name, description, client_metadata, parent_folder)

Creates a new Storage folder.

Parameters:

  • name (str) - The name of the new folder.
  • description (Optional[str]) - An optional description for the new folder. Default is None.
  • client_metadata (Optional[Dict[str, Any]]) - Optional metadata to be included in the folder. Default is None.
  • parent_folder (Optional[Union[StorageFolder, UUID]]) - Optionally specify a parent folder the new folder is created in. Default is None.

Return type:
StorageFolder

Returns:
The created storage folder.


get_storage_folder

get_storage_folder(folder_uuid)

Retrieves the specified Storage folder.

Parameters:

  • folder_uuid (UUID) - The UUID (hash) of the storage folder to be retrieved.

Return type:
StorageFolder

Returns:
The specified storage folder.


get_storage_item

get_storage_item(item_uuid, sign_url)

Retrieves the specified item.

Parameters:

  • item_uuid (UUID) - The UUID (hash) of the storage item to be retrieved.

  • sign_url (bool) - If True, the signed URL for the storage item is retrieved. Default value is False

Return type:

StorageItem

Returns:

The Storage item


list_storage_folders

list_storage_folders(search, dataset_synced, order, desc, page_size)

Lists top-level Storage folders.

Parameters:

  • search (Optional[str]) - An optional search string to filter folders by name.

  • dataset_synced (Optional[bool]) - If True, folders that are mirrored by a Dataset are included. If omitted, False is passed.

  • order (FoldersSortBy) - Sort order for the folders. See the FoldersSortBy class for available options.

  • desc (bool) - If True, sorts Storage folders in descending order.

  • page_size (int) - The number of folders returned per page. Default value is 100.

Returns:

An iterable of StorageFolder objects.


find_storage_folders

find_storage_folders(search, dataset_synced, order, desc, page_size)

Recursively search for Storage folders, starting from the root (top) level.

Parameters:

  • search (Optional[str]) - An optional search string to filter folders by name.

  • dataset_synced (Optional[bool]) - If True, folders that are mirrored by a dataset are included. If omitted, False is passed.

  • order (FoldersSortBy) - Sort order for the folders. See the FoldersSortBy class for available options.

  • desc (bool) - If True, sorts Storage folders in descending order.

  • page_size (int) - The number of folders returned per page.

Return type:

Iterable[StorageFolder]

Returns:

An iterable of StorageFolder objects.


find_storage_items

find_storage_items(search, is_in_dataset, item_types, order, desc, get_signed_urls, page_size)

Recursively search for storage items, starting from the root (top) level.

Parameters:

  • search (Optional[str]) - An optional search string to filter folders by name.

  • is_in_dataset (Optional[bool]) - Filter items by whether they are linked to any Dataset. True selects only linked items. False selects only unlinked items. None includes all items, regardless of any links to Datasets. None is the default value.

  • item_types (Optional[List[StorageItemType]]) - Filter items by type.

  • order (FoldersSortBy) - Sort order for the folders. See the encord.storage.FoldersSortBy class for available options.

  • desc (bool) - If True, sorts Storage folders in descending order.

  • page_size (int) - The number of folders returned per page.

  • get_signed_urls - If True, signed URLs for all items are returned.

ℹ️

Note

At least one of search or item_types must be provided.

Return type:

Iterable[StorageItem]

Returns:

Iterable of items in the folder.


list_groups

Lists all groups in the Organization the user belongs to.

list_groups()

Return type:

Iterable[OrmGroup]

def list_groups(self) -> Iterable[OrmGroup]:
    """
    List the groups in the Organization the user belongs to.

    Sends a single GET request to ``user/current-organisation/groups`` and
    yields every entry of the returned page's ``results``. Because this is a
    generator, the request is only issued once iteration starts.

    Returns:
        An iterable over the groups contained in the response page.
    """
    groups_page = self._api_client.get(
        "user/current-organisation/groups",
        params=None,
        result_type=Page[OrmGroup],
    )
    yield from groups_page.results

deidentify_dicom_files

Removes DICOM tags from DICOM files in external storage. Given links to DICOM files pointing to AWS, GCP, AZURE or OTC, for example: [ "https://s3.region-code.amazonaws.com/bucket-name/dicom-file-input.dcm" ], this function executes deidentification on those files: it removes all DICOM tags (https://dicom.nema.org/medical/Dicom/2017e/output/chtml/part06/chapter_6.html) from the metadata except for:

  • x00080018 SOPInstanceUID

  • x00100010 PatientName

  • x00180050 SliceThickness

  • x00180088 SpacingBetweenSlices

  • x0020000d StudyInstanceUID

  • x0020000e SeriesInstanceUID

  • x00200032 ImagePositionPatient

  • x00200037 ImageOrientationPatient

  • x00280008 NumberOfFrames

  • x00281050 WindowCenter

  • x00281051 WindowWidth

  • x00520014 ALinePixelSpacing

deidentify_dicom_files(dicom_urls, integration_hash, redact_dicom_tags=True, redact_pixels_mode=DeidentifyRedactTextMode.REDACT_NO_TEXT, save_conditions=None, upload_dir=None)

Parameters:

  • self – Encord client object.

  • dicom_urls (List[str]) – a list of URLs to DICOM files, for example: [ "https://s3.region-code.amazonaws.com/bucket-name/dicom-file-input.dcm" ]

  • integration_hash (str) – integration_hash parameter of Encord platform external storage integration

  • redact_dicom_tags (bool) – Specifies if DICOM tag redaction should be enabled.

  • redact_pixels_mode (DeidentifyRedactTextMode) – Specifies which text redaction policy should be applied to pixel data.

  • save_conditions (Optional[List[Union[SaveDeidentifiedDicomConditionNotSubstr, SaveDeidentifiedDicomConditionIn]]]) – Specifies a list of conditions that all have to be true for DICOM deidentified file to be saved.

  • upload_dir (Optional[str]) – Specifies a directory that files are uploaded to. Defaults to None, in which case deidentified files are uploaded to the same directory as the source files.

Return type:

List[str]

Returns:

This function returns a list of links pointing to the deidentified DICOM files. The files are saved to the same bucket and in the same directory as the original files, with the prefix ( deid_{timestamp}_ ). Example output: [ "https://s3.region-code.amazonaws.com/bucket-name/deid_167294769118005312_dicom-file-input.dcm" ]

    def deidentify_dicom_files(
        self,
        dicom_urls: List[str],
        integration_hash: str,
        redact_dicom_tags: bool = True,
        redact_pixels_mode: DeidentifyRedactTextMode = DeidentifyRedactTextMode.REDACT_NO_TEXT,
        save_conditions: Optional[List[SaveDeidentifiedDicomCondition]] = None,
        upload_dir: Optional[str] = None,
    ) -> List[str]:
        """
        Remove DICOM tags from DICOM files kept in external storage.

        The files are addressed by links pointing to AWS, GCP, AZURE or OTC, for example
        `[ "https://s3.region-code.amazonaws.com/bucket-name/dicom-file-input.dcm" ]`.
        Deidentification strips every DICOM tag
        (https://dicom.nema.org/medical/Dicom/2017e/output/chtml/part06/chapter_6.html)
        from the metadata, with the exception of:

        * x00080018 SOPInstanceUID
        * x00100010 PatientName
        * x00180050 SliceThickness
        * x00180088 SpacingBetweenSlices
        * x0020000d StudyInstanceUID
        * x0020000e SeriesInstanceUID
        * x00200032 ImagePositionPatient
        * x00200037 ImageOrientationPatient
        * x00280008 NumberOfFrames
        * x00281050 WindowCenter
        * x00281051 WindowWidth
        * x00520014 ALinePixelSpacing

        Args:
            self: Encord client object.
            dicom_urls: list of URLs to DICOM files, e.g.
                `[ "https://s3.region-code.amazonaws.com/bucket-name/dicom-file-input.dcm" ]`
            integration_hash:
                integration_hash of the Encord platform external storage integration;
                sent as the ``uid`` of the request.
            redact_dicom_tags:
                Whether DICOM tag redaction should be enabled.
            redact_pixels_mode:
                Text redaction policy to apply to pixel data; its ``.value`` is sent.
            save_conditions:
                Conditions that all have to be true for a deidentified DICOM file to be saved.
                Each one is serialised with ``to_dict()``; ``None`` is sent as an empty list.
            upload_dir:
                Directory the files are uploaded to. When None (the default), deidentified
                files are uploaded to the same directory as the source files.
        Returns:
            List of links pointing to the deidentified DICOM files. They are saved to the
            same bucket and directory as the original files, with the prefix
            ( deid_{timestamp}_ ).
            Example output:
            `[ "https://s3.region-code.amazonaws.com/bucket-name/deid_167294769118005312_dicom-file-input.dcm" ]`

        """
        conditions = save_conditions if save_conditions else []
        request_payload = {
            "dicom_urls": dicom_urls,
            "redact_dicom_tags": redact_dicom_tags,
            "redact_pixels_mode": redact_pixels_mode.value,
            "save_conditions": [condition.to_dict() for condition in conditions],
            "upload_dir": upload_dir,
        }
        return self.querier.basic_setter(DicomDeidentifyTask, uid=integration_hash, payload=request_payload)

ListingFilter

Available properties_filter keys for get_projects() and get_datasets().

The values for _before and _after should be datetime objects.

class encord.user_client.ListingFilter(value)

TITLE_EQ = 'title_eq'
TITLE_LIKE = 'title_like'
DESC_EQ = 'desc_eq'
DESC_LIKE = 'desc_like'
CREATED_BEFORE = 'created_before'
CREATED_AFTER = 'created_after'
EDITED_BEFORE = 'edited_before'
EDITED_AFTER = 'edited_after'

class ListingFilter(Enum):
    """
    Available properties_filter keys for get_projects() and get_datasets().

    Each member's value is the string spelling of one filter key.
    The values supplied for the *_before* and *_after* filters should be datetime objects.
    """

    # Title filters: exact match / pattern match.
    TITLE_EQ = "title_eq"
    TITLE_LIKE = "title_like"
    # Description filters: exact match / pattern match.
    DESC_EQ = "desc_eq"
    DESC_LIKE = "desc_like"
    # Creation date bounds.
    CREATED_BEFORE = "created_before"
    CREATED_AFTER = "created_after"
    # Last modification date bounds.
    EDITED_BEFORE = "edited_before"
    EDITED_AFTER = "edited_after"

get_client_metadata_schema

Returns the schema for your custom metadata. Only one custom metadata schema can exist in Encord at a time.

Refer to the following topic for information on viewing your custom metadata schema: Verify your schema


    def get_client_metadata_schema(self) -> Optional[Dict[str, ClientMetadataSchemaTypes]]:
        """
        Return the schema for your custom metadata.

        Delegates to the ``get_client_metadata_schema`` function available at module
        level, handing it this client's API client.

        Returns:
            The value produced by that function, unchanged.
        """
        metadata_schema = get_client_metadata_schema(self._api_client)
        return metadata_schema

set_client_metadata_schema_from_dict

Specifies the structure for the custom metadata schema in Encord.

Metadata schemas support the following data types for your metadata:

  • NUMBER = "number"
  • STRING = "string"
  • BOOLEAN = "boolean"
  • DATETIME = "datetime"

ℹ️

Note

  • Encord supports ONE metadata schema for each Organisation. Performing multiple imports overwrites the current schema with the new schema.
  • NUMBER supports float values
  • Boolean supports "true" and "false"
  • DATETIME format is ISO 8601 date time (for example: yyyymmdd, yyyy-mm-dd, yyyy-mm-ddThh:mm:ss.msusps)

See Import custom metadata schema for information on performing the schema import.


    def set_client_metadata_schema_from_dict(self, json_dict: Dict[str, ClientMetadataSchemaTypes]):
        """
        Specify the structure of the custom metadata schema.

        Delegates to the ``set_client_metadata_schema_from_dict`` function available at
        module level, handing it this client's API client and ``json_dict``.

        Args:
            json_dict: mapping from metadata key to its schema type.
        """
        api_client = self._api_client
        set_client_metadata_schema_from_dict(api_client, json_dict)

Source

from __future__ import annotations

import base64
import logging
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

from dateutil import parser as datetime_parser

from encord.client import EncordClient, EncordClientDataset, EncordClientProject
from encord.configs import SshConfig, UserConfig, get_env_ssh_key
from encord.constants.string_constants import TYPE_DATASET, TYPE_ONTOLOGY, TYPE_PROJECT
from encord.dataset import Dataset
from encord.http.constants import DEFAULT_REQUESTS_SETTINGS, RequestsSettings
from encord.http.querier import Querier
from encord.http.utils import (
    CloudUploadSettings,
    upload_images_to_encord,
    upload_to_signed_url_list,
)
from encord.http.v2.api_client import ApiClient
from encord.objects import OntologyStructure
from encord.objects.common import (
    DeidentifyRedactTextMode,
    SaveDeidentifiedDicomCondition,
)
from encord.ontology import Ontology
from encord.orm.cloud_integration import CloudIntegration
from encord.orm.dataset import DEFAULT_DATASET_ACCESS_SETTINGS, CreateDatasetResponse
from encord.orm.dataset import Dataset as OrmDataset
from encord.orm.dataset import (
    DatasetAccessSettings,
    DatasetAPIKey,
    DatasetInfo,
    DatasetScope,
    DatasetUserRole,
    DicomDeidentifyTask,
    Images,
    StorageLocation,
)
from encord.orm.dataset_with_user_role import DatasetWithUserRole
from encord.orm.ontology import Ontology as OrmOntology
from encord.orm.project import (
    BenchmarkQaWorkflowSettings,
    CvatExportType,
    ManualReviewWorkflowSettings,
)
from encord.orm.project import Project as OrmProject
from encord.orm.project import (
    ProjectImporter,
    ProjectImporterCvatInfo,
    ProjectWorkflowSettings,
    ProjectWorkflowType,
    ReviewMode,
)
from encord.orm.project_api_key import ProjectAPIKey
from encord.orm.project_with_user_role import ProjectWithUserRole
from encord.project import Project
from encord.utilities.client_utilities import (
    APIKeyScopes,
    CvatImporterError,
    CvatImporterSuccess,
    ImportMethod,
    Issues,
    LocalImport,
)
from encord.utilities.ontology_user import OntologyUserRole, OntologyWithUserRole
from encord.utilities.project_user import ProjectUserRole

# Module-level logger named after this module; used for the CVAT import progress messages.
log = logging.getLogger(__name__)


class EncordUserClient:
    def __init__(self, user_config: UserConfig, querier: Querier):
        """
        Keep the user configuration and querier, and build the v2 API client.

        Args:
            user_config: user-level configuration; also passed to ``ApiClient``.
            querier: querier stored on the instance for this client's own requests.
        """
        self.user_config, self.querier = user_config, querier
        self._api_client = ApiClient(user_config)

    def get_dataset(
        self, dataset_hash: str, dataset_access_settings: DatasetAccessSettings = DEFAULT_DATASET_ACCESS_SETTINGS
    ) -> Dataset:
        """
        Get the Dataset class to access dataset fields and manipulate a dataset.

        You will only have access to this dataset if you are one of the following

            * Dataset admin

            * Organization admin

        Args:
            dataset_hash: The Dataset ID
            dataset_access_settings: Set the dataset_access_settings if you would like to change the defaults.

        Returns:
            A Dataset built from a dataset-scoped client and the ORM dataset that client fetches.
        """
        dataset_config = SshConfig(self.user_config, resource_type=TYPE_DATASET, resource_id=dataset_hash)
        dataset_client = EncordClientDataset(
            querier=Querier(dataset_config),
            config=dataset_config,
            dataset_access_settings=dataset_access_settings,
        )
        return Dataset(dataset_client, dataset_client.get_dataset())

    def get_project(self, project_hash: str) -> Project:
        """
        Get the Project class to access project fields and manipulate a project.

        You will only have access to this project if you are one of the following

            * Project admin

            * Project team manager

            * Organization admin of the project

        Args:
            project_hash: The Project ID

        Returns:
            A Project built from a project-scoped client, the ORM project (fetched without
            labels metadata) and the project's ontology.
        """
        project_config = SshConfig(self.user_config, resource_type=TYPE_PROJECT, resource_id=project_hash)
        project_querier = Querier(project_config)
        project_client = EncordClientProject(querier=project_querier, config=project_config)
        orm_project = project_client.get_project(include_labels_metadata=False)

        # The ontology is deliberately read through the *project* querier to avoid a
        # permission error: in the scope of the project there may be only read-only access
        # to the ontology structure, not the full access that get_ontology implies.
        ontology_config = SshConfig(
            self.user_config, resource_type=TYPE_ONTOLOGY, resource_id=orm_project["ontology_hash"]
        )
        orm_ontology = project_querier.basic_getter(OrmOntology, ontology_config.resource_id)
        ontology = Ontology(project_querier, ontology_config, orm_ontology)

        return Project(project_client, orm_project, ontology, client_v2=self._api_client)

    def get_ontology(self, ontology_hash: str) -> Ontology:
        """
        Get the Ontology object for the given ontology.

        Args:
            ontology_hash: the uid of the ontology

        Returns:
            An Ontology built from an ontology-scoped querier/config and the fetched ORM ontology.
        """
        ontology_config = SshConfig(self.user_config, resource_type=TYPE_ONTOLOGY, resource_id=ontology_hash)
        ontology_querier = Querier(ontology_config)
        return Ontology(
            ontology_querier,
            ontology_config,
            ontology_querier.basic_getter(OrmOntology, ontology_hash),
        )

    def create_private_dataset(
        self,
        dataset_title: str,
        dataset_type: StorageLocation,
        dataset_description: Optional[str] = None,
    ) -> CreateDatasetResponse:
        """
        DEPRECATED - please use `create_dataset` instead.

        Forwards all three arguments, unchanged, to `create_dataset` and returns its result.
        """
        response = self.create_dataset(dataset_title, dataset_type, dataset_description)
        return response

    def create_dataset(
        self,
        dataset_title: str,
        dataset_type: StorageLocation,
        dataset_description: Optional[str] = None,
    ) -> CreateDatasetResponse:
        """
        Create a new dataset.

        Args:
            dataset_title:
                Title of dataset.
            dataset_type:
                StorageLocation type where data will be stored.
            dataset_description:
                Optional description of the dataset. It is only included in the
                request when it is non-empty.
        Returns:
            CreateDatasetResponse parsed from the server's reply.
        """
        payload = {"title": dataset_title, "type": dataset_type}
        if dataset_description:
            payload["description"] = dataset_description

        return CreateDatasetResponse.from_dict(self.querier.basic_setter(OrmDataset, uid=None, payload=payload))

    def create_dataset_api_key(
        self, dataset_hash: str, api_key_title: str, dataset_scopes: List[DatasetScope]
    ) -> DatasetAPIKey:
        """
        Create an API key for a dataset.

        Args:
            dataset_hash: the uid of the dataset
            api_key_title: title given to the new API key
            dataset_scopes: scopes of the key; each scope's ``.value`` is sent

        Returns:
            DatasetAPIKey parsed from the server's reply.
        """
        scope_values = [scope.value for scope in dataset_scopes]
        created = self.querier.basic_setter(
            DatasetAPIKey,
            uid=None,
            payload={"dataset_hash": dataset_hash, "title": api_key_title, "scopes": scope_values},
        )
        return DatasetAPIKey.from_dict(created)

    def get_dataset_api_keys(self, dataset_hash: str) -> List[DatasetAPIKey]:
        """
        Fetch the API keys of a dataset.

        Args:
            dataset_hash: the uid of the dataset

        Returns:
            The result of the querier's ``get_multiple`` call for ``DatasetAPIKey``.
        """
        return self.querier.get_multiple(DatasetAPIKey, uid=None, payload={"dataset_hash": dataset_hash})

    def get_or_create_dataset_api_key(self, dataset_hash: str) -> DatasetAPIKey:
        """
        Get an API key for a dataset, issuing a PUT-style request for ``DatasetAPIKey``.

        Args:
            dataset_hash: the uid of the dataset

        Returns:
            DatasetAPIKey parsed from the server's reply.
        """
        reply = self.querier.basic_put(DatasetAPIKey, uid=None, payload={"dataset_hash": dataset_hash})
        return DatasetAPIKey.from_dict(reply)

    def get_datasets(
        self,
        title_eq: Optional[str] = None,
        title_like: Optional[str] = None,
        desc_eq: Optional[str] = None,
        desc_like: Optional[str] = None,
        created_before: Optional[Union[str, datetime]] = None,
        created_after: Optional[Union[str, datetime]] = None,
        edited_before: Optional[Union[str, datetime]] = None,
        edited_after: Optional[Union[str, datetime]] = None,
    ) -> List[Dict[str, Any]]:
        """
        List either all (if called with no arguments) or matching datasets the user has access to.

        Args:
            title_eq: optional exact title filter
            title_like: optional fuzzy title filter; SQL syntax
            desc_eq: optional exact description filter
            desc_like: optional fuzzy description filter; SQL syntax
            created_before: optional creation date filter, 'less'
            created_after: optional creation date filter, 'greater'
            edited_before: optional last modification date filter, 'less'
            edited_after: optional last modification date filter, 'greater'

        Returns:
            One dict per dataset matching the filter conditions, with the keys
            ``"dataset"`` (a DatasetInfo) and ``"user_role"`` (a DatasetUserRole).
            The ``created_at`` and ``last_edited_at`` fields are parsed from ISO strings.
        """
        # Shares the validation code between listing methods: locals() is captured first,
        # while it holds only `self` and the filter arguments.
        properties_filter = self.__validate_filter(locals())
        rows = self.querier.get_multiple(DatasetWithUserRole, payload={"filter": properties_filter})

        listing = []
        for row in rows:
            dataset_fields = row.dataset
            # The date fields are replaced in place by their parsed values.
            for date_key in ("created_at", "last_edited_at"):
                dataset_fields[date_key] = datetime_parser.isoparse(dataset_fields[date_key])
            listing.append({"dataset": DatasetInfo(**dataset_fields), "user_role": DatasetUserRole(row.user_role)})
        return listing

    @staticmethod
    def create_with_ssh_private_key(
        ssh_private_key: Optional[str] = None,
        password: Optional[str] = None,
        requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
        ssh_private_key_path: Optional[str | Path] = None,
        **kwargs,
    ) -> EncordUserClient:
        """
        Creates an instance of EncordUserClient authenticated with a private SSH key.

        The key can be given as its content or as a path to a key file, either through the
        method parameters or through the following environment variables:

        * **ENCORD_SSH_KEY**: environment variable with the private key content
        * **ENCORD_SSH_KEY_FILE**: environment variable with the path to the key file

        When ``ssh_private_key_path`` is given, the file content (read as ASCII) replaces
        ``ssh_private_key``. When no key content is available after that, the key is taken
        from ``get_env_ssh_key()``.

        Args:
            ssh_private_key: the private key content
            ssh_private_key_path: the path to the private key file
            password: private key password
            requests_settings: request settings passed on to the user configuration
            **kwargs: extra keyword arguments passed on to ``UserConfig.from_ssh_private_key``
        """
        if ssh_private_key_path is not None:
            key_file = Path(ssh_private_key_path) if isinstance(ssh_private_key_path, str) else ssh_private_key_path
            ssh_private_key = key_file.read_text(encoding="ascii")

        key_content = ssh_private_key or get_env_ssh_key()

        user_config = UserConfig.from_ssh_private_key(
            key_content, password, requests_settings=requests_settings, **kwargs
        )
        return EncordUserClient(user_config, Querier(user_config))

    def get_projects(
        self,
        title_eq: Optional[str] = None,
        title_like: Optional[str] = None,
        desc_eq: Optional[str] = None,
        desc_like: Optional[str] = None,
        created_before: Optional[Union[str, datetime]] = None,
        created_after: Optional[Union[str, datetime]] = None,
        edited_before: Optional[Union[str, datetime]] = None,
        edited_after: Optional[Union[str, datetime]] = None,
    ) -> List[Dict]:
        """
        List either all (if called with no arguments) or matching projects the user has access to.

        Args:
            title_eq: optional exact title filter
            title_like: optional fuzzy title filter; SQL syntax
            desc_eq: optional exact description filter
            desc_like: optional fuzzy description filter; SQL syntax
            created_before: optional creation date filter, 'less'
            created_after: optional creation date filter, 'greater'
            edited_before: optional last modification date filter, 'less'
            edited_after: optional last modification date filter, 'greater'

        Returns:
            One dict per project matching the filter conditions, with the keys
            ``"project"`` (an ORM Project) and ``"user_role"`` (a ProjectUserRole).
        """
        # Shares the validation code between listing methods: locals() is captured first,
        # while it holds only `self` and the filter arguments.
        properties_filter = self.__validate_filter(locals())
        rows = self.querier.get_multiple(ProjectWithUserRole, payload={"filter": properties_filter})
        return [
            {
                "project": OrmProject(row.project),
                "user_role": ProjectUserRole(row.user_role),
            }
            for row in rows
        ]

    def create_project(
        self,
        project_title: str,
        dataset_hashes: List[str],
        project_description: str = "",
        ontology_hash: str = "",
        workflow_settings: ProjectWorkflowSettings = ManualReviewWorkflowSettings(),
        workflow_template_hash: Optional[str] = None,
    ) -> str:
        """
        Creates a new project and returns its uid ('project_hash')

        Args:
            project_title: the title of the project
            dataset_hashes: a list of the dataset uids that the project will use
            project_description: the optional description of the project
            ontology_hash: the uid of an ontology to be used. If omitted, a new empty ontology will be created
            workflow_settings: selects and configures the type of the quality control workflow to use, See :class:`encord.orm.project.ProjectWorkflowSettings` for details. If omitted, :class:`~encord.orm.project.ManualReviewWorkflowSettings` is used.
            workflow_template_hash: project will be created using a workflow based on the template provided.
        Returns:
            the uid of the project.
        """
        # The manual QA workflow is the baseline; benchmark settings override it below.
        payload = {
            "title": project_title,
            "description": project_description,
            "dataset_hashes": dataset_hashes,
            "workflow_type": ProjectWorkflowType.MANUAL_QA.value,
        }

        if isinstance(workflow_settings, BenchmarkQaWorkflowSettings):
            payload.update(
                workflow_type=ProjectWorkflowType.BENCHMARK_QA.value,
                source_projects=workflow_settings.source_projects,
            )

        # Optional fields are only sent when they were actually provided.
        if ontology_hash:
            payload["ontology_hash"] = ontology_hash
        if workflow_template_hash is not None:
            payload["workflow_template_id"] = workflow_template_hash

        return self.querier.basic_setter(OrmProject, uid=None, payload=payload)

    def create_project_api_key(self, project_hash: str, api_key_title: str, scopes: List[APIKeyScopes]) -> str:
        """
        Create an API key for a project.

        Args:
            project_hash: the uid of the project
            api_key_title: title given to the new API key
            scopes: scopes of the key; each scope's ``.value`` is sent

        Returns:
            The created project API key.
        """
        scope_values = [scope.value for scope in scopes]
        return self.querier.basic_setter(
            ProjectAPIKey,
            uid=project_hash,
            payload={"title": api_key_title, "scopes": scope_values},
        )

    def get_project_api_keys(self, project_hash: str) -> List[ProjectAPIKey]:
        """
        Fetch the API keys of a project.

        Args:
            project_hash: the uid of the project

        Returns:
            The result of the querier's ``get_multiple`` call for ``ProjectAPIKey``.
        """
        project_keys = self.querier.get_multiple(ProjectAPIKey, uid=project_hash)
        return project_keys

    def get_or_create_project_api_key(self, project_hash: str) -> str:
        """
        Get an API key for a project, issuing a PUT-style request with an empty payload.

        Args:
            project_hash: the uid of the project

        Returns:
            The value returned by the querier's ``basic_put`` call for ``ProjectAPIKey``.
        """
        api_key = self.querier.basic_put(ProjectAPIKey, uid=project_hash, payload={})
        return api_key

    def get_dataset_client(
        self,
        dataset_hash: str,
        dataset_access_settings: DatasetAccessSettings = DEFAULT_DATASET_ACCESS_SETTINGS,
        **kwargs,
    ) -> EncordClientDataset:
        """
        DEPRECATED - prefer using :meth:`get_dataset()` instead.

        Obtains an API key for the dataset and initialises a dataset client with it,
        reusing this client's request settings.

        Note: ``**kwargs`` is accepted but is not used by this method.
        """
        api_key = self.get_or_create_dataset_api_key(dataset_hash).api_key
        return EncordClientDataset.initialise(
            dataset_hash,
            api_key,
            requests_settings=self.user_config.requests_settings,
            dataset_access_settings=dataset_access_settings,
        )

    def get_project_client(self, project_hash: str, **kwargs) -> Union[EncordClientProject, EncordClientDataset]:
        """
        DEPRECATED - prefer using :meth:`get_project()` instead.

        Obtains an API key for the project and initialises a client with it, reusing this
        client's request settings. Extra keyword arguments are forwarded to
        ``EncordClient.initialise``.
        """
        api_key = self.get_or_create_project_api_key(project_hash)
        request_settings = self.user_config.requests_settings
        return EncordClient.initialise(project_hash, api_key, requests_settings=request_settings, **kwargs)

    def create_project_from_cvat(
        self,
        import_method: ImportMethod,
        dataset_name: str,
        review_mode: ReviewMode = ReviewMode.LABELLED,
        max_workers: Optional[int] = None,
        *,
        transform_bounding_boxes_to_polygons=False,
    ) -> Union[CvatImporterSuccess, CvatImporterError]:
        """
        Import a CVAT project, with its images and annotations, into Encord.

        Export your CVAT project with the "CVAT for images 1.1" option and make sure the
        "Save images" checkbox is enabled during the export. The export directory is
        expected to contain an ``images`` entry and an ``annotations.xml`` file.

        Args:
            import_method:
                The chosen import method. See the `ImportMethod` class for details.
                Only `LocalImport` is accepted.
            dataset_name:
                The name of the dataset that will be created.
            review_mode:
                Set how much interaction is needed from the labeler and from the reviewer for the CVAT labels.
                    See the `ReviewMode` documentation for more details.
            max_workers:
                DEPRECATED: This argument will be ignored
            transform_bounding_boxes_to_polygons:
                All instances of CVAT bounding boxes will be converted to polygons in the final Encord project.

        Returns:
            CvatImporterSuccess: If the project was successfully imported.
            CvatImporterError: If the project could not be imported.

        Raises:
            ValueError:
                If the import method is not a local import, the CVAT directory has an
                invalid format, or the server replies with an unexpected payload.
        """
        if not isinstance(import_method, LocalImport):
            raise ValueError("Only local imports are currently supported ")

        export_root = Path(import_method.file_path)

        # The export must contain an entry named "images" directly below its root.
        images_dir = export_root.joinpath("images")
        if images_dir not in export_root.iterdir():
            raise ValueError("The expected directory 'images' was not found.")

        annotations_file = export_root.joinpath("annotations.xml")
        if not annotations_file.is_file():
            raise ValueError(f"The file `{annotations_file}` does not exist.")

        # The annotations file is shipped to the server as base64 text.
        annotations_base64 = base64.b64encode(annotations_file.read_bytes()).decode("utf-8")

        images_paths, used_base_path = self.__get_images_paths(annotations_base64, images_dir)

        log.info("Starting image upload.")
        dataset_hash, title_to_hash = self.__upload_cvat_images(images_paths, used_base_path, dataset_name)
        log.info("Image upload completed.")

        import_payload = {
            "cvat": {"annotations_base64": annotations_base64},
            "dataset_hash": dataset_hash,
            "image_title_to_image_hash_map": title_to_hash,
            "review_mode": review_mode.value,
            "transform_bounding_boxes_to_polygons": transform_bounding_boxes_to_polygons,
        }

        log.info("Starting project import. This may take a few minutes.")
        server_ret = self.querier.basic_setter(ProjectImporter, uid=None, payload=import_payload)

        if "success" in server_ret:
            outcome = server_ret["success"]
            return CvatImporterSuccess(
                project_hash=outcome["project_hash"],
                dataset_hash=dataset_hash,
                issues=Issues.from_dict(outcome["issues"]),
            )

        if "error" in server_ret:
            failure = server_ret["error"]
            return CvatImporterError(dataset_hash=dataset_hash, issues=Issues.from_dict(failure["issues"]))

        raise ValueError("The api server responded with an invalid payload.")

    def __get_images_paths(self, annotations_base64: str, images_directory_path: Path) -> Tuple[List[Path], Path]:
        """
        Work out which image files belong to a CVAT export.

        The annotations are sent to the server, which reports the export type. For a
        project export the images are the direct entries of ``images/default``; for a
        task export they are all files found recursively below ``images``.

        Args:
            annotations_base64: base64-encoded content of the annotations file
            images_directory_path: the ``images`` directory of the export

        Returns:
            The image paths and the base path they are to be made relative to.

        Raises:
            ValueError: if the server reports an error or an unknown export type, the
                ``default`` directory is missing, or no images are found.
        """
        project_info = self.querier.basic_setter(
            ProjectImporterCvatInfo, uid=None, payload={"annotations_base64": annotations_base64}
        )
        if "error" in project_info:
            raise ValueError(project_info["error"]["message"])

        export_type = project_info["success"]["export_type"]
        if export_type == CvatExportType.PROJECT.value:
            base_path = images_directory_path.joinpath("default")
            if base_path not in images_directory_path.iterdir():
                raise ValueError("The expected directory 'default' was not found.")

            # NOTE: it is possible that here we also need to use the __get_recursive_image_paths
            image_files = list(base_path.iterdir())
        elif export_type == CvatExportType.TASK.value:
            base_path = images_directory_path
            image_files = self.__get_recursive_image_paths(images_directory_path)
        else:
            raise ValueError(
                f"Received an unexpected response `{project_info}` from the server. Project import aborted."
            )

        if not image_files:
            raise ValueError("No images found in the provided data folder.")
        return image_files, base_path

    @staticmethod
    def __get_recursive_image_paths(images_directory_path: Path) -> List[Path]:
        """Recursively get all the images in all the sub folders."""
        ret = []
        for file in images_directory_path.glob("**/*"):
            if file.is_file():
                ret.append(file)
        return ret

    def __upload_cvat_images(
        self, images_paths: List[Path], used_base_path: Path, dataset_name: str
    ) -> Tuple[str, Dict[str, str]]:
        """
        Create a dataset and upload the CVAT images into it.

        This function does not create any image groups yet.

        Args:
            images_paths: paths of the image files to upload
            used_base_path: base path that image titles are made relative to
            dataset_name: name of the dataset to create

        Returns:
            * The created dataset_hash
            * A map from an image title to the image hash which is stored in the DB.

        Raises:
            RuntimeError: if the number of successful uploads differs from the number of images.
        """
        file_path_strings = [str(image_path) for image_path in images_paths]

        dataset_hash = self.create_dataset(dataset_name, StorageLocation.CORD_STORAGE).dataset_hash
        # Uploads go through the querier of the freshly created dataset.
        dataset_querier = self.get_dataset(dataset_hash)._client._querier

        uploads = upload_to_signed_url_list(
            file_path_strings, self.user_config, dataset_querier, Images, CloudUploadSettings()
        )
        if len(images_paths) != len(uploads):
            raise RuntimeError("Could not upload all the images successfully. Aborting CVAT upload.")

        upload_images_to_encord(uploads, dataset_querier)

        # Image titles are the file paths relative to the base path.
        title_to_hash = {
            str(image_path.relative_to(used_base_path)): upload.data_hash
            for image_path, upload in zip(images_paths, uploads)
        }
        return dataset_hash, title_to_hash

    def get_cloud_integrations(self) -> List[CloudIntegration]:
        """
        Fetch the cloud integrations.

        Returns:
            The result of the querier's ``get_multiple`` call for ``CloudIntegration``.
        """
        integrations = self.querier.get_multiple(CloudIntegration)
        return integrations

    def get_ontologies(
        self,
        title_eq: Optional[str] = None,
        title_like: Optional[str] = None,
        desc_eq: Optional[str] = None,
        desc_like: Optional[str] = None,
        created_before: Optional[Union[str, datetime]] = None,
        created_after: Optional[Union[str, datetime]] = None,
        edited_before: Optional[Union[str, datetime]] = None,
        edited_after: Optional[Union[str, datetime]] = None,
    ) -> List[Dict]:
        """
        List either all (if called with no arguments) or matching ontologies the user has access to.

        Args:
            title_eq: optional exact title filter
            title_like: optional fuzzy title filter; SQL syntax
            desc_eq: optional exact description filter
            desc_like: optional fuzzy description filter; SQL syntax
            created_before: optional creation date filter, 'less'
            created_after: optional creation date filter, 'greater'
            edited_before: optional last modification date filter, 'less'
            edited_after: optional last modification date filter, 'greater'

        Returns:
            One dict per ontology matching the filter conditions, with the keys
            ``"ontology"`` (an Ontology with its own querier and config) and
            ``"user_role"`` (an OntologyUserRole).
        """
        # Shares the validation code between listing methods: locals() is captured first,
        # while it holds only `self` and the filter arguments.
        properties_filter = self.__validate_filter(locals())
        rows = self.querier.get_multiple(OntologyWithUserRole, payload={"filter": properties_filter})

        def to_entry(row) -> Dict:
            # Each ontology gets a config and querier scoped to its own hash.
            orm_ontology = OrmOntology.from_dict(row.ontology)
            ontology_config = SshConfig(
                self.user_config, resource_type=TYPE_ONTOLOGY, resource_id=orm_ontology.ontology_hash
            )
            return {
                "ontology": Ontology(Querier(ontology_config), ontology_config, orm_ontology),
                "user_role": OntologyUserRole(row.user_role),
            }

        return [to_entry(row) for row in rows]

    def create_ontology(
        self, title: str, description: str = "", structure: Optional[OntologyStructure] = None
    ) -> Ontology:
        """
        Create a new ontology and return an :class:`Ontology` object for it.

        Args:
            title: title of the new ontology
            description: description of the new ontology; empty string by default
            structure: optional ontology structure; when not provided an empty dictionary is sent
                as the ``"editor"`` payload entry

        Returns:
            An :class:`Ontology` built from the record returned by the querier, with its own
            config and querier scoped to the new ontology hash.
        """
        if structure:
            editor = structure.to_dict()
        else:
            editor = {}

        payload = {"title": title, "description": description, "editor": editor}
        response = self.querier.basic_setter(OrmOntology, uid=None, payload=payload)
        created = OrmOntology.from_dict(response)

        # The returned Ontology talks to the server through a querier bound to its own hash.
        ontology_config = SshConfig(
            self.user_config, resource_type=TYPE_ONTOLOGY, resource_id=created.ontology_hash
        )
        ontology_querier = Querier(ontology_config)
        return Ontology(ontology_querier, ontology_config, created)

    def __validate_filter(self, properties_filter: Dict) -> Dict:
        """
        Normalise a raw filter mapping into a ``{filter name: value}`` dictionary.

        Keys may be :class:`ListingFilter` members or their string values. Entries whose value is
        ``None`` and entries with unrecognised keys are silently dropped. Values of the
        ``*_before`` / ``*_after`` filters are converted to ISO formatted strings; string input is
        first parsed with ``datetime_parser.isoparse``.

        Args:
            properties_filter: mapping of filter keys to filter values

        Returns:
            A new dictionary keyed by the string value of each accepted filter.

        Raises:
            ValueError: if ``properties_filter`` is not a dictionary, or if a date filter value is
                neither a string nor a datetime.
        """
        if not isinstance(properties_filter, dict):
            raise ValueError("Filter should be a dictionary")

        known_names = {member.value for member in ListingFilter}
        normalised: Dict = {}

        for key, value in properties_filter.items():
            if value is None:
                continue

            # Lenient on input: accept enum members as-is, translate known raw strings, skip the rest.
            if isinstance(key, str):
                if key not in known_names:
                    continue
                listing_filter = ListingFilter(key)
            elif isinstance(key, ListingFilter):
                listing_filter = key
            else:
                continue

            if listing_filter.value.endswith(("before", "after")):
                parsed = datetime_parser.isoparse(value) if isinstance(value, str) else value
                if not isinstance(parsed, datetime):
                    raise ValueError(f"Value for {listing_filter.name} filter should be a datetime")
                value = parsed.isoformat()

            normalised[listing_filter.value] = value

        return normalised

    def deidentify_dicom_files(
        self,
        dicom_urls: List[str],
        integration_hash: str,
        redact_dicom_tags: bool = True,
        redact_pixels_mode: DeidentifyRedactTextMode = DeidentifyRedactTextMode.REDACT_NO_TEXT,
        save_conditions: Optional[List[SaveDeidentifiedDicomCondition]] = None,
        upload_dir: Optional[str] = None,
    ) -> List[str]:
        """
        Deidentify DICOM files that live in external storage.

        The input is a list of links to DICOM files hosted on AWS, GCP, AZURE or OTC, for example:
        [ "https://s3.region-code.amazonaws.com/bucket-name/dicom-file-input.dcm" ]
        Deidentification strips every DICOM tag
        (https://dicom.nema.org/medical/Dicom/2017e/output/chtml/part06/chapter_6.html)
        from the metadata, with the exception of:

        * x00080018 SOPInstanceUID
        * x00100010 PatientName
        * x00180050 SliceThickness
        * x00180088 SpacingBetweenSlices
        * x0020000d StudyInstanceUID
        * x0020000e SeriesInstanceUID
        * x00200032 ImagePositionPatient
        * x00200037 ImageOrientationPatient
        * x00280008 NumberOfFrames
        * x00281050 WindowCenter
        * x00281051 WindowWidth
        * x00520014 ALinePixelSpacing

        Args:
            dicom_urls: a list of urls to DICOM files, e.g.
                `[ "https://s3.region-code.amazonaws.com/bucket-name/dicom-file-input.dcm" ]`
            integration_hash:
                integration_hash parameter of Encord platform external storage integration
            redact_dicom_tags:
                Whether DICOM tags redaction should be enabled.
            redact_pixels_mode:
                Which text redaction policy should be applied to pixel data.
            save_conditions:
                A list of conditions that must all hold for a deidentified DICOM file to be saved.
                ``None`` is sent as an empty list.
            upload_dir:
                Directory the files will be uploaded to. With the default of None, deidentified
                files are uploaded to the same directory as the source files.

        Returns:
            List of links pointing to the deidentified DICOM files. These are saved to the same
            bucket and the same directory as the original files, with the prefix
            ( deid_{timestamp}_ ).
            Example output:
            `[ "https://s3.region-code.amazonaws.com/bucket-name/deid_167294769118005312_dicom-file-input.dcm" ]`

        """
        request_payload = {
            "dicom_urls": dicom_urls,
            "redact_dicom_tags": redact_dicom_tags,
            "redact_pixels_mode": redact_pixels_mode.value,
            "save_conditions": [] if not save_conditions else [cond.to_dict() for cond in save_conditions],
            "upload_dir": upload_dir,
        }
        return self.querier.basic_setter(DicomDeidentifyTask, uid=integration_hash, payload=request_payload)


class ListingFilter(Enum):
    """
    Available properties_filter keys for get_projects() and get_datasets().

    The keyword arguments of get_ontologies() use the same names. Each member's value is the
    string key that ends up in the filter dictionary sent with the listing query.

    The values for *_before* and *_after* should be datetime objects; the filter validation in
    this file also accepts strings that ``datetime_parser.isoparse`` can parse.
    """

    # Title / description filters: *_EQ is an exact match, *_LIKE a fuzzy (SQL syntax) match.
    TITLE_EQ = "title_eq"
    TITLE_LIKE = "title_like"
    DESC_EQ = "desc_eq"
    DESC_LIKE = "desc_like"
    # Date filters on creation and last modification time.
    CREATED_BEFORE = "created_before"
    CREATED_AFTER = "created_after"
    EDITED_BEFORE = "edited_before"
    EDITED_AFTER = "edited_after"