BaseConfig
class encord.configs.BaseConfig(endpoint, requests_settings=RequestsSettings(max_retries=3, backoff_factor=1.5, connection_retries=3, connect_timeout=180, read_timeout=180, write_timeout=180))
class BaseConfig(ABC):
def __init__(self, endpoint: str, requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS):
self.read_timeout: int = requests_settings.read_timeout
self.write_timeout: int = requests_settings.write_timeout
self.connect_timeout: int = requests_settings.connect_timeout
self.endpoint: str = endpoint
self.requests_settings = requests_settings
@abstractmethod
def define_headers(self, data: str) -> Dict:
pass
define_headers
abstract define_headers(data)
Return type:
Dict
@abstractmethod
def define_headers(self, data: str) -> Dict:
pass
UserConfig
class encord.configs.UserConfig(private_key, domain='https://api.encord.com', requests_settings=RequestsSettings(max_retries=3, backoff_factor=1.5, connection_retries=3, connect_timeout=180, read_timeout=180, write_timeout=180))
class UserConfig(BaseConfig):
def __init__(
self,
private_key: Ed25519PrivateKey,
domain: str = ENCORD_DOMAIN,
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
):
self.private_key: Ed25519PrivateKey = private_key
self.public_key: Ed25519PublicKey = private_key.public_key()
self.public_key_hex: str = self.public_key.public_bytes(Encoding.Raw, PublicFormat.Raw).hex()
self.domain = domain
endpoint = domain + ENCORD_PUBLIC_USER_PATH
super().__init__(endpoint, requests_settings=requests_settings)
def define_headers(self, data: str) -> Dict:
signature = _get_signature(data, self.private_key)
return {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"Content-Type": "application/json",
"Authorization": _get_ssh_authorization_header(self.public_key_hex, signature),
}
@staticmethod
def from_ssh_private_key(
ssh_private_key: str,
password: Optional[str] = "",
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
**kwargs,
):
"""
Instantiate a UserConfig object by the content of a private ssh key.
Args:
ssh_private_key: The content of a private key file.
password: The password for the private key file.
requests_settings: The requests settings for all outgoing network requests.
Returns:
UserConfig.
Raises:
ValueError: If the provided key content is not of the correct format.
"""
key_bytes = ssh_private_key.encode()
password_bytes = password.encode() if password else None
private_key = load_ssh_private_key(key_bytes, password_bytes)
if isinstance(private_key, Ed25519PrivateKey):
return UserConfig(private_key, requests_settings=requests_settings, **kwargs)
else:
raise ValueError(f"Provided key [{ssh_private_key}] is not an Ed25519 private key")
define_headers
define_headers(data)
Return type:
Dict
def define_headers(self, data: str) -> Dict:
signature = _get_signature(data, self.private_key)
return {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"Content-Type": "application/json",
"Authorization": _get_ssh_authorization_header(self.public_key_hex, signature),
}
from_ssh_private_key
Instantiate a UserConfig object by the content of a private ssh key.
static from_ssh_private_key(ssh_private_key, password='', requests_settings=RequestsSettings(max_retries=3, backoff_factor=1.5, connection_retries=3, connect_timeout=180, read_timeout=180, write_timeout=180), **kwargs)
Parameters:
- ssh_private_key (str) – The content of a private key file.
- password (Optional[str]) – The password for the private key file.
- requests_settings (RequestsSettings) – The requests settings for all outgoing network requests.
Returns:
Raises:
ValueError – If the provided key content is not of the correct format.
@staticmethod
def from_ssh_private_key(
ssh_private_key: str,
password: Optional[str] = "",
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
**kwargs,
):
"""
Instantiate a UserConfig object by the content of a private ssh key.
Args:
ssh_private_key: The content of a private key file.
password: The password for the private key file.
requests_settings: The requests settings for all outgoing network requests.
Returns:
UserConfig.
Raises:
ValueError: If the provided key content is not of the correct format.
"""
key_bytes = ssh_private_key.encode()
password_bytes = password.encode() if password else None
private_key = load_ssh_private_key(key_bytes, password_bytes)
if isinstance(private_key, Ed25519PrivateKey):
return UserConfig(private_key, requests_settings=requests_settings, **kwargs)
else:
raise ValueError(f"Provided key [{ssh_private_key}] is not an Ed25519 private key")
Config
Config defining endpoint, project id, API key, and timeouts.
class encord.configs.Config(resource_id=None, web_file_path='/public', domain=None, websocket_endpoint='wss://message-api.cord.tech/websocket', requests_settings=RequestsSettings(max_retries=3, backoff_factor=1.5, connection_retries=3, connect_timeout=180, read_timeout=180, write_timeout=180))
class Config(BaseConfig):
"""
Config defining endpoint, project id, API key, and timeouts.
"""
def __init__(
self,
resource_id: Optional[str] = None,
web_file_path: str = ENCORD_PUBLIC_PATH,
domain: Optional[str] = None,
websocket_endpoint: str = WEBSOCKET_ENDPOINT,
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
):
if resource_id is None:
resource_id = get_env_resource_id()
self.resource_id = resource_id
self.websocket_endpoint = websocket_endpoint
if domain is None:
raise RuntimeError("`domain` must be specified")
self.domain = domain
endpoint = domain + web_file_path
super().__init__(endpoint, requests_settings=requests_settings)
logger.info("Initialising Encord client with endpoint: %s and resource_id: %s", endpoint, resource_id)
@abstractmethod
def get_websocket_url(self):
raise NotImplementedError("The specialised config needs to implement this method.")
get_websocket_url
abstract get_websocket_url()
@abstractmethod
def get_websocket_url(self):
raise NotImplementedError("The specialised config needs to implement this method.")
get_env_resource_id
encord.configs.get_env_resource_id()
Return type:
str
def get_env_resource_id() -> str:
project_id = os.environ.get(_ENCORD_PROJECT_ID) or os.environ.get(_CORD_PROJECT_ID)
dataset_id = os.environ.get(_ENCORD_DATASET_ID) or os.environ.get(_CORD_DATASET_ID)
if (project_id is not None) and (dataset_id is not None):
raise encord.exceptions.InitialisationError(
message=(
"Found both Project EntityId and Dataset EntityId in os.environ. "
"Please initialise EncordClient by passing resource_id."
)
)
elif project_id is not None:
resource_id = project_id
elif dataset_id is not None:
resource_id = dataset_id
else:
raise encord.exceptions.AuthenticationError(message="Project EntityId or dataset EntityId not provided")
return resource_id
get_env_api_key
encord.configs.get_env_api_key()
Return type:
str
def get_env_api_key() -> str:
api_key = os.environ.get(_ENCORD_API_KEY) or os.environ.get(_CORD_API_KEY)
if api_key is None:
raise encord.exceptions.AuthenticationError(message="API key not provided")
return api_key
get_env_ssh_key
Returns the raw ssh key by looking up the ENCORD_SSH_KEY_FILE and ENCORD_SSH_KEY environment variables in the mentioned order and returns the first successfully identified key.
encord.configs.get_env_ssh_key()
Return type:
str
def get_env_ssh_key() -> str:
"""
Returns the raw ssh key by looking up the `ENCORD_SSH_KEY_FILE` and `ENCORD_SSH_KEY` environment variables
in the mentioned order and returns the first successfully identified key.
"""
# == 1. Look for key file
ssh_file = os.environ.get(_ENCORD_SSH_KEY_FILE)
if ssh_file:
ssh_file = os.path.abspath(os.path.expanduser(ssh_file))
if not os.path.exists(ssh_file):
raise ResourceNotFoundError(
f"SSH key file `{ssh_file}` which is defined in the `{_ENCORD_SSH_KEY_FILE}` environment variable does not seem to exist. "
f"Failed to load private ssh key."
)
with open(ssh_file, encoding="ascii") as f:
return f.read()
# == 2. Look for raw key
raw_ssh_key = os.environ.get(_ENCORD_SSH_KEY)
if raw_ssh_key is None:
raise ResourceNotFoundError(
f"Neither of the environment variables {_ENCORD_SSH_KEY_FILE} or {_ENCORD_SSH_KEY} were found. "
f"Failed to load private ssh key."
)
if raw_ssh_key == "":
raise ResourceNotFoundError(
f"Environment variable {_ENCORD_SSH_KEY} found but is empty. " f"Failed to load private ssh key."
)
return raw_ssh_key
ApiKeyConfig
class encord.configs.ApiKeyConfig(resource_id=None, api_key=None, domain=None, requests_settings=RequestsSettings(max_retries=3, backoff_factor=1.5, connection_retries=3, connect_timeout=180, read_timeout=180, write_timeout=180))
class ApiKeyConfig(Config):
def __init__(
self,
resource_id: Optional[str] = None,
api_key: Optional[str] = None,
domain: Optional[str] = None,
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
):
web_file_path = ENCORD_PUBLIC_PATH
if api_key is None:
api_key = get_env_api_key()
self.api_key = api_key
self._headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"Content-Type": "application/json",
"ResourceID": resource_id,
"Authorization": self.api_key,
}
super().__init__(resource_id, web_file_path=web_file_path, domain=domain, requests_settings=requests_settings)
def define_headers(self, data) -> Dict:
return self._headers
def get_websocket_url(self):
return (
f"{self.websocket_endpoint}?"
f"client_type={2}&"
f"project_hash={self.resource_id}&"
f"api_key={self.api_key}"
)
define_headers
define_headers(data)
Return type:
Dict
def define_headers(self, data) -> Dict:
return self._headers
get_websocket_url
get_websocket_url()
def get_websocket_url(self):
return (
f"{self.websocket_endpoint}?"
f"client_type={2}&"
f"project_hash={self.resource_id}&"
f"api_key={self.api_key}"
)
EncordConfig
Alias of encord.configs.ApiKeyConfig.
encord.configs.EncordConfig
CordConfig
Alias of encord.configs.ApiKeyConfig.
encord.configs.CordConfig
SshConfig
class encord.configs.SshConfig(user_config, resource_type, resource_id=None)
class SshConfig(Config):
def __init__(
self,
user_config: UserConfig,
resource_type: str,
resource_id: Optional[str] = None,
):
self._user_config = user_config
if resource_type not in ALL_RESOURCE_TYPES:
raise TypeError(f"The passed resource type `{resource_type}` is invalid.")
self._resource_type = resource_type
super().__init__(
resource_id=resource_id,
domain=self._user_config.domain,
requests_settings=self._user_config.requests_settings,
)
def define_headers(self, data: str) -> Dict:
signature = _get_signature(data, self._user_config.private_key)
return {
"Accept": "application/json",
"Content-Type": "application/json",
"Accept-Encoding": "gzip",
"ResourceID": self.resource_id,
"ResourceType": self._resource_type,
"Authorization": _get_ssh_authorization_header(self._user_config.public_key_hex, signature),
}
def get_websocket_url(self):
signature = _get_signature(self.resource_id, self._user_config.private_key)
return (
f"{self.websocket_endpoint}?"
f"client_type={2}&"
f"project_hash={self.resource_id}&"
f"ssh_authorization={_get_ssh_authorization_header(self._user_config.public_key_hex, signature)}"
)
define_headers
define_headers(data)
Return type:
Dict
def define_headers(self, data: str) -> Dict:
signature = _get_signature(data, self._user_config.private_key)
return {
"Accept": "application/json",
"Content-Type": "application/json",
"Accept-Encoding": "gzip",
"ResourceID": self.resource_id,
"ResourceType": self._resource_type,
"Authorization": _get_ssh_authorization_header(self._user_config.public_key_hex, signature),
}
get_websocket_url
get_websocket_url()
def get_websocket_url(self):
signature = _get_signature(self.resource_id, self._user_config.private_key)
return (
f"{self.websocket_endpoint}?"
f"client_type={2}&"
f"project_hash={self.resource_id}&"
f"ssh_authorization={_get_ssh_authorization_header(self._user_config.public_key_hex, signature)}"
)
ENCORD_DOMAIN
str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.
encord.configs.ENCORD_DOMAIN = 'https://api.encord.com'
Source
#
# Copyright (c) 2023 Cord Technologies Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import logging
import os
from abc import ABC, abstractmethod
from typing import Dict, Optional
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
load_ssh_private_key,
)
import encord.exceptions
from encord.constants.string_constants import ALL_RESOURCE_TYPES
from encord.http.constants import DEFAULT_REQUESTS_SETTINGS, RequestsSettings
ENCORD_DOMAIN = "https://api.encord.com"
ENCORD_PUBLIC_PATH = "/public"
ENCORD_PUBLIC_USER_PATH = "/public/user"
ENCORD_ENDPOINT = ENCORD_DOMAIN + ENCORD_PUBLIC_PATH
ENCORD_USER_ENDPOINT = ENCORD_DOMAIN + ENCORD_PUBLIC_USER_PATH
WEBSOCKET_PATH = "/websocket"
WEBSOCKET_DOMAIN = "wss://message-api.cord.tech"
WEBSOCKET_ENDPOINT = WEBSOCKET_DOMAIN + WEBSOCKET_PATH
_CORD_PROJECT_ID = "CORD_PROJECT_ID"
_ENCORD_PROJECT_ID = "ENCORD_PROJECT_ID"
_CORD_DATASET_ID = "CORD_DATASET_ID"
_ENCORD_DATASET_ID = "ENCORD_DATASET_ID"
_CORD_API_KEY = "CORD_API_KEY"
_ENCORD_API_KEY = "ENCORD_API_KEY"
_ENCORD_SSH_KEY = "ENCORD_SSH_KEY"
_ENCORD_SSH_KEY_FILE = "ENCORD_SSH_KEY_FILE"
logger = logging.getLogger(__name__)
from encord.exceptions import ResourceNotFoundError
class BaseConfig(ABC):
def __init__(self, endpoint: str, requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS):
self.read_timeout: int = requests_settings.read_timeout
self.write_timeout: int = requests_settings.write_timeout
self.connect_timeout: int = requests_settings.connect_timeout
self.endpoint: str = endpoint
self.requests_settings = requests_settings
@abstractmethod
def define_headers(self, data: str) -> Dict:
pass
def _get_signature(data: str, private_key: Ed25519PrivateKey) -> bytes:
hash_builder = hashlib.sha256()
hash_builder.update(data.encode())
contents_hash = hash_builder.digest()
return private_key.sign(contents_hash)
def _get_ssh_authorization_header(public_key_hex: str, signature: bytes) -> str:
return f"{public_key_hex}:{signature.hex()}"
class UserConfig(BaseConfig):
def __init__(
self,
private_key: Ed25519PrivateKey,
domain: str = ENCORD_DOMAIN,
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
):
self.private_key: Ed25519PrivateKey = private_key
self.public_key: Ed25519PublicKey = private_key.public_key()
self.public_key_hex: str = self.public_key.public_bytes(Encoding.Raw, PublicFormat.Raw).hex()
self.domain = domain
endpoint = domain + ENCORD_PUBLIC_USER_PATH
super().__init__(endpoint, requests_settings=requests_settings)
def define_headers(self, data: str) -> Dict:
signature = _get_signature(data, self.private_key)
return {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"Content-Type": "application/json",
"Authorization": _get_ssh_authorization_header(self.public_key_hex, signature),
}
@staticmethod
def from_ssh_private_key(
ssh_private_key: str,
password: Optional[str] = "",
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
**kwargs,
):
"""
Instantiate a UserConfig object by the content of a private ssh key.
Args:
ssh_private_key: The content of a private key file.
password: The password for the private key file.
requests_settings: The requests settings for all outgoing network requests.
Returns:
UserConfig.
Raises:
ValueError: If the provided key content is not of the correct format.
"""
key_bytes = ssh_private_key.encode()
password_bytes = password.encode() if password else None
private_key = load_ssh_private_key(key_bytes, password_bytes)
if isinstance(private_key, Ed25519PrivateKey):
return UserConfig(private_key, requests_settings=requests_settings, **kwargs)
else:
raise ValueError(f"Provided key [{ssh_private_key}] is not an Ed25519 private key")
class Config(BaseConfig):
"""
Config defining endpoint, project id, API key, and timeouts.
"""
def __init__(
self,
resource_id: Optional[str] = None,
web_file_path: str = ENCORD_PUBLIC_PATH,
domain: Optional[str] = None,
websocket_endpoint: str = WEBSOCKET_ENDPOINT,
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
):
if resource_id is None:
resource_id = get_env_resource_id()
self.resource_id = resource_id
self.websocket_endpoint = websocket_endpoint
if domain is None:
raise RuntimeError("`domain` must be specified")
self.domain = domain
endpoint = domain + web_file_path
super().__init__(endpoint, requests_settings=requests_settings)
logger.info("Initialising Encord client with endpoint: %s and resource_id: %s", endpoint, resource_id)
@abstractmethod
def get_websocket_url(self):
raise NotImplementedError("The specialised config needs to implement this method.")
def get_env_resource_id() -> str:
project_id = os.environ.get(_ENCORD_PROJECT_ID) or os.environ.get(_CORD_PROJECT_ID)
dataset_id = os.environ.get(_ENCORD_DATASET_ID) or os.environ.get(_CORD_DATASET_ID)
if (project_id is not None) and (dataset_id is not None):
raise encord.exceptions.InitialisationError(
message=(
"Found both Project EntityId and Dataset EntityId in os.environ. "
"Please initialise EncordClient by passing resource_id."
)
)
elif project_id is not None:
resource_id = project_id
elif dataset_id is not None:
resource_id = dataset_id
else:
raise encord.exceptions.AuthenticationError(message="Project EntityId or dataset EntityId not provided")
return resource_id
def get_env_api_key() -> str:
api_key = os.environ.get(_ENCORD_API_KEY) or os.environ.get(_CORD_API_KEY)
if api_key is None:
raise encord.exceptions.AuthenticationError(message="API key not provided")
return api_key
def get_env_ssh_key() -> str:
"""
Returns the raw ssh key by looking up the `ENCORD_SSH_KEY_FILE` and `ENCORD_SSH_KEY` environment variables
in the mentioned order and returns the first successfully identified key.
"""
# == 1. Look for key file
ssh_file = os.environ.get(_ENCORD_SSH_KEY_FILE)
if ssh_file:
ssh_file = os.path.abspath(os.path.expanduser(ssh_file))
if not os.path.exists(ssh_file):
raise ResourceNotFoundError(
f"SSH key file `{ssh_file}` which is defined in the `{_ENCORD_SSH_KEY_FILE}` environment variable does not seem to exist. "
f"Failed to load private ssh key."
)
with open(ssh_file, encoding="ascii") as f:
return f.read()
# == 2. Look for raw key
raw_ssh_key = os.environ.get(_ENCORD_SSH_KEY)
if raw_ssh_key is None:
raise ResourceNotFoundError(
f"Neither of the environment variables {_ENCORD_SSH_KEY_FILE} or {_ENCORD_SSH_KEY} were found. "
f"Failed to load private ssh key."
)
if raw_ssh_key == "":
raise ResourceNotFoundError(
f"Environment variable {_ENCORD_SSH_KEY} found but is empty. " f"Failed to load private ssh key."
)
return raw_ssh_key
class ApiKeyConfig(Config):
def __init__(
self,
resource_id: Optional[str] = None,
api_key: Optional[str] = None,
domain: Optional[str] = None,
requests_settings: RequestsSettings = DEFAULT_REQUESTS_SETTINGS,
):
web_file_path = ENCORD_PUBLIC_PATH
if api_key is None:
api_key = get_env_api_key()
self.api_key = api_key
self._headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"Content-Type": "application/json",
"ResourceID": resource_id,
"Authorization": self.api_key,
}
super().__init__(resource_id, web_file_path=web_file_path, domain=domain, requests_settings=requests_settings)
def define_headers(self, data) -> Dict:
return self._headers
def get_websocket_url(self):
return (
f"{self.websocket_endpoint}?"
f"client_type={2}&"
f"project_hash={self.resource_id}&"
f"api_key={self.api_key}"
)
EncordConfig = ApiKeyConfig
CordConfig = EncordConfig
class SshConfig(Config):
def __init__(
self,
user_config: UserConfig,
resource_type: str,
resource_id: Optional[str] = None,
):
self._user_config = user_config
if resource_type not in ALL_RESOURCE_TYPES:
raise TypeError(f"The passed resource type `{resource_type}` is invalid.")
self._resource_type = resource_type
super().__init__(
resource_id=resource_id,
domain=self._user_config.domain,
requests_settings=self._user_config.requests_settings,
)
def define_headers(self, data: str) -> Dict:
signature = _get_signature(data, self._user_config.private_key)
return {
"Accept": "application/json",
"Content-Type": "application/json",
"Accept-Encoding": "gzip",
"ResourceID": self.resource_id,
"ResourceType": self._resource_type,
"Authorization": _get_ssh_authorization_header(self._user_config.public_key_hex, signature),
}
def get_websocket_url(self):
signature = _get_signature(self.resource_id, self._user_config.private_key)
return (
f"{self.websocket_endpoint}?"
f"client_type={2}&"
f"project_hash={self.resource_id}&"
f"ssh_authorization={_get_ssh_authorization_header(self._user_config.public_key_hex, signature)}"
)