decentriq_platform
The Python package decentriq_platform
provides all of the tools needed to interact with the Decentriq platform.
Functionality around the different types of Data Clean Rooms (DCRs) supported by the platform is provided
by submodules. The submodule analytics
, for example, provides classes and
methods for creating and interacting with Analytics DCRs. Similarly, the
submodule data_lab
contains the code necessary to create and run DataLabs.
The package decentriq_platform
can be used to
- create DCRs (Data Clean Rooms)
- encrypt and upload datasets
- trigger the execution of authorized computations
The currently available compute modules are:
- decentriq_platform.sql: for SQL-based computations
- decentriq_platform.container: for containerized Python-based computations
The source code of the Decentriq Python SDK can be found on GitHub.
Installation
Follow the Python SDK installation guide.
Available enclaves
Browse through all available enclave workers in the Computations page.
Sub-modules
- decentriq_platform.analytics
- decentriq_platform.archv2
- decentriq_platform.attestation
- decentriq_platform.authentication
- decentriq_platform.channel
- decentriq_platform.connection
- decentriq_platform.data_connectors
- decentriq_platform.data_lab
- decentriq_platform.endorsement
- decentriq_platform.legacy
- decentriq_platform.logger
- decentriq_platform.lookalike_media
- decentriq_platform.media
Functions
create_client
def create_client(
user_email: str,
api_token: str,
*,
client_id: str = 'MHyVW112w7Ql95G96fn9rnLWkYuOLmdk',
api_host: str = 'api.decentriq.com',
api_port: int = 443,
api_use_tls: bool = True,
request_timeout: Optional[int] = None,
unsafe_disable_known_root_ca_check: bool = False,
) ‑> decentriq_platform.client.Client
The primary way to create a Client
object.
Parameters:
api_token
: An API token with which to authenticate oneself. The API token can be obtained in the user account settings in the Decentriq UI.
user_email
: The email address of the user that generated the given API token.
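A minimal usage sketch (requires `pip install decentriq_platform` and a valid API token from the account settings in the Decentriq UI; the environment variable names below are hypothetical):

```python
import os

import decentriq_platform as dq

# Sketch only: needs live credentials. The environment variable
# names are hypothetical and not part of the SDK.
client = dq.create_client(
    user_email=os.environ["DECENTRIQ_USER_EMAIL"],
    api_token=os.environ["DECENTRIQ_API_TOKEN"],
)
```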
Classes
Client
Client(
user_email: str,
enclave_api_token: str,
api: decentriq_platform.api.Api,
graphql: decentriq_platform.graphql.GqlClient,
request_timeout: Optional[int] = None,
unsafe_disable_known_root_ca_check: bool = False,
custom_mrsigner_driver_spec: Optional[attestation_pb2.AttestationSpecification] = None,
)
A Client
object allows you to upload datasets and to create Session
objects that can communicate with enclaves and perform essential operations
such as publishing data rooms, executing computations, and retrieving results.
Rather than creating Client
instances directly using this constructor, use the function create_client
.
Instance variables
decentriq_ca_root_certificate: bytes
: Returns the root certificate used by the Decentriq identity provider.
Note that when using this certificate in any authentication scheme,
you trust Decentriq as an identity provider!
decentriq_pki_authentication: data_room_pb2.AuthenticationMethod
: The authentication method that uses the Decentriq root certificate to authenticate
users.
This method should be specified when building a data room if you want to interact
with that data room either via the web interface or with sessions created using
`create_auth_using_decentriq_pki`.
Note that when using this authentication method you trust Decentriq as an identity provider!
You can also create an `AuthenticationMethod` object directly and supply your own root certificate,
with which to authenticate users connecting to your data room.
In this case you will also need to issue corresponding user certificates and create your
own custom `decentriq_platform.authentication.Auth` objects.
check_enclave_availability
def check_enclave_availability(
self,
specs: Dict[str, decentriq_platform.types.EnclaveSpecification],
)
Check whether the selected enclaves are deployed at this moment. If one of the enclaves is not deployed, an exception will be raised.
create_auth
def create_auth(
self,
) ‑> decentriq_platform.authentication.Auth
Creates a decentriq_platform.authentication.Auth
object which can be attached
to decentriq_platform.session.Session
.
create_auth_using_decentriq_pki
def create_auth_using_decentriq_pki(
self,
enclaves: Dict[str, decentriq_platform.types.EnclaveSpecification],
) ‑> Tuple[decentriq_platform.authentication.Auth, decentriq_platform.endorsement.Endorser]
create_session
def create_session(
self,
auth: decentriq_platform.authentication.Auth,
enclaves: Dict[str, decentriq_platform.types.EnclaveSpecification],
) ‑> decentriq_platform.session.Session
Creates a new decentriq_platform.session.Session
instance to communicate
with a driver enclave.
The passed set of enclave specifications must include a specification for
a driver enclave.
Messages sent through this session will be authenticated with the given authentication object.
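For example, a session using the Decentriq PKI might be set up as follows (a sketch; assumes `client` was obtained via `create_client`):

```python
import decentriq_platform as dq

# Sketch: assumes `client` was created via create_client.
# Use the latest specification of each enclave type; this set
# includes the required driver enclave.
enclave_specs = dq.enclave_specifications.latest()

# Authenticate via the Decentriq PKI (note: this trusts Decentriq
# as an identity provider).
auth, endorser = client.create_auth_using_decentriq_pki(enclave_specs)

session = client.create_session(auth, enclave_specs)
```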
create_session_from_data_room_description
def create_session_from_data_room_description(
self,
data_room_description: decentriq_platform.types.DataRoomDescription,
specs: Optional[List[decentriq_platform.types.EnclaveSpecification]] = None,
) ‑> decentriq_platform.session.Session
Create a session for interacting with a DCR of the given data room description.
create_session_v2
def create_session_v2(
self,
) ‑> decentriq_platform.archv2.session.SessionV2
Creates a new decentriq_platform.archv2.session.SessionV2
instance to communicate
with a driver enclave.
delete_dataset
def delete_dataset(
self,
manifest_hash: str,
force: bool = False,
)
Deletes the dataset with the given manifest hash from the Decentriq platform.
In case the dataset is still published to one or more data rooms,
an exception will be thrown and the dataset will need to be
unpublished manually from the respective data rooms using
Session.remove_published_dataset
.
This behavior can be overridden by using the force
flag.
Note, however, that this might put some data rooms in a broken
state as they might try to read data that does not exist anymore.
get_available_datasets
def get_available_datasets(
self,
) ‑> List[decentriq_platform.types.DatasetDescription]
Returns a list of datasets that the current user uploaded, regardless of whether they have already been connected to a data room.
get_data_lab
def get_data_lab(
self,
id: str,
) ‑> decentriq_platform.types.DataLabDefinition
Return the DataLab with the given ID.
Parameters:
id
: ID of the DataLab to get.
get_data_room_description
def get_data_room_description(
self,
data_room_hash,
enclave_specs,
) ‑> Optional[decentriq_platform.types.DataRoomDescription]
Get a single data room description.
get_data_room_descriptions
def get_data_room_descriptions(
self,
) ‑> List[decentriq_platform.types.DataRoomDescription]
Returns a list of descriptions of all the data rooms a user created or participates in.
get_dataset
def get_dataset(
self,
manifest_hash: str,
) ‑> Optional[decentriq_platform.types.DatasetDescription]
Returns information about a user dataset given its manifest hash.
get_dataset_key
def get_dataset_key(
self,
manifest_hash: str,
) ‑> decentriq_platform.storage.Key
get_lookalike_media_data_rooms
def get_lookalike_media_data_rooms(
self,
) ‑> List[decentriq_platform.types.DataRoom]
Get all Lookalike Media data clean rooms.
list_data_labs
def list_data_labs(
self,
filter: Optional[decentriq_platform.types.DataLabListFilter] = None,
) ‑> List[decentriq_platform.types.DataLabDefinition]
Return a list of DataLabs based on the filter
criteria.
Parameters:
filter
: Criteria used to filter the list. Can be one of the following values:
- NONE: Display all DataLabs.
- VALIDATED: Display DataLabs that have been validated.
- UNVALIDATED: Display DataLabs that have not been validated.
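For instance, listing only validated DataLabs might look like this (a sketch; assumes an existing `client`):

```python
from decentriq_platform.types import DataLabListFilter

# Sketch: assumes `client` was created via create_client.
validated_labs = client.list_data_labs(filter=DataLabListFilter.VALIDATED)
for lab in validated_labs:
    print(lab)
```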
publish_analytics_dcr
def publish_analytics_dcr(
self,
dcr_definition: decentriq_platform.analytics.analytics_dcr.AnalyticsDcrDefinition,
*,
enclave_specs: Optional[Dict[str, decentriq_platform.types.EnclaveSpecification]] = None,
) ‑> decentriq_platform.analytics.analytics_dcr.AnalyticsDcr
Publish an Analytics DCR.
Parameters:
dcr_definition
: Definition of the Analytics DCR.
enclave_specs
: The enclave specifications that are considered to be trusted. If not specified, all enclave specifications known to this version of the SDK will be used.
publish_media_dcr
def publish_media_dcr(
self,
dcr_definition: decentriq_platform.media.media.MediaDcrDefinition,
*,
enclave_specs: Optional[Dict[str, decentriq_platform.types.EnclaveSpecification]] = None,
) ‑> decentriq_platform.media.media.MediaDcr
Publish a Media DCR.
Parameters:
dcr_definition
: Definition of the Media DCR.
enclave_specs
: The enclave specifications that are considered to be trusted. If not specified, all enclave specifications known to this version of the SDK will be used.
retrieve_analytics_dcr
def retrieve_analytics_dcr(
self,
dcr_id,
enclave_specs: Optional[List[decentriq_platform.types.EnclaveSpecification]] = None,
) ‑> decentriq_platform.analytics.analytics_dcr.AnalyticsDcr
Retrieve an existing Analytics DCR.
Parameters:
dcr_id
: Data Clean Room ID.
enclave_specs
: The enclave specifications that are considered to be trusted. If not specified, all enclave specifications known to this version of the SDK will be used.
retrieve_media_dcr
def retrieve_media_dcr(
self,
dcr_id,
enclave_specs: Optional[List[decentriq_platform.types.EnclaveSpecification]] = None,
) ‑> decentriq_platform.media.media.MediaDcr
Retrieve an existing Media DCR.
Parameters:
dcr_id
: Data Clean Room ID.
enclave_specs
: The enclave specifications that are considered to be trusted. If not specified, all enclave specifications known to this version of the SDK will be used.
upload_dataset
def upload_dataset(
self,
data: BinaryIO,
key: decentriq_platform.storage.Key,
file_name: str,
/,
*,
description: str = '',
chunk_size: int = 8388608,
parallel_uploads: int = 8,
usage: decentriq_platform.types.DatasetUsage = DatasetUsage.PUBLISHED,
secret_store_options: Optional[decentriq_platform.client.SecretStoreOptions] = None,
) ‑> str
Uploads data
as a file usable by enclaves and returns the
corresponding manifest hash.
Parameters:
data
: The data to upload as a buffered stream. Such an object can be obtained by wrapping a binary string in an io.BytesIO() object or, if reading from a file, by using with open(path, "rb") as file.
key
: Encryption key used to encrypt the file.
file_name
: Name of the file.
description
: An optional file description.
chunk_size
: Size in bytes of the chunks into which the stream is split.
parallel_uploads
: The number of chunks to upload in parallel.
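The stream preparation and the effect of the chunking defaults can be sketched as follows (the upload call itself is commented out, since it needs a live Client and an encryption Key):

```python
import io

# upload_dataset expects a BinaryIO stream; in-memory bytes can be
# wrapped in io.BytesIO, or a file can be opened with open(path, "rb").
csv_bytes = b"id,value\n1,42\n2,7\n"
stream = io.BytesIO(csv_bytes)

# With the default 8 MiB chunk size, a 100 MiB dataset is split into
# 13 chunks, up to 8 of which (the default) are uploaded in parallel.
CHUNK_SIZE = 8_388_608  # documented default: 8 MiB
n_chunks = -((-100 * 1024 * 1024) // CHUNK_SIZE)  # ceiling division
print(n_chunks)  # 13

# Sketch of the actual upload (requires a Client and an encryption key):
# key = decentriq_platform.Key()  # fresh random key material
# manifest_hash = client.upload_dataset(stream, key, "my_data.csv")
```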
EnclaveSpecifications
EnclaveSpecifications(
specifications: Dict[str, decentriq_platform.types.EnclaveSpecification],
)
Provider of the enclave specifications available on the Decentriq platform.
Enclave specifications enable you to express which particular enclaves you trust.
The field containing the measurement (e.g. mrenclave
in the case of Intel SGX) identifies
the exact binary that will process your data.
Users of the Decentriq platform are encouraged to reproduce this value by building the enclave
binary from audited source code and recomputing the measurement (in the case of Intel SGX,
this would involve simply hashing the produced executable).
When connecting to the driver enclave, the configured attestation algorithm will guarantee that the enclave you connect to is the one corresponding to the enclave specification you chose. The associated root certificate will be used to verify that the attestation was signed by the expected party (e.g. Intel/AMD/Amazon, depending on the CC technology used).
Any communication between the driver enclave and worker enclaves handling your data will also first be secured by additional attestation procedures. Which enclaves are trusted by the driver enclave is controlled by choosing the additional enclave specs from the respective compute packages.
A list of enclave specifications, each encoding your trust in a particular enclave type, can
be obtained by selecting a subset of the enclave specifications provided by the object
decentriq_platform.enclave_specifications
. Selecting the subset of versions should be done
by calling its versions
method.
all
def all(
self,
) ‑> List[decentriq_platform.types.EnclaveSpecification]
Get a list of all available enclave specifications.
latest
def latest(
self,
) ‑> Dict[str, decentriq_platform.types.EnclaveSpecification]
Select the latest specification of each enclave type.
list
def list(
self,
) ‑> List[str]
Get a list of all available enclave identifiers.
merge
def merge(
self,
other,
)
Merge two sets of enclave specifications into a single set.
versions
def versions(
self,
enclave_versions: List[str],
) ‑> Dict[str, decentriq_platform.types.EnclaveSpecification]
Get the enclave specifications for the given versioned enclave types.
Make sure to always include the specification of a driver enclave, e.g.
"decentriq.driver:v1"
as this is the node with which you communicate directly.
Add additional versioned enclaves depending on the compute module you use.
Refer to the main documentation page of each compute module to learn which
enclaves are available.
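For example (a sketch; the version strings are illustrative and the available identifiers can be listed via the `list` method):

```python
import decentriq_platform as dq

# Sketch: the version strings below are illustrative; call
# dq.enclave_specifications.list() to see what is actually available.
enclave_specs = dq.enclave_specifications.versions([
    "decentriq.driver:v1",       # always include a driver enclave
    "decentriq.sql-worker:v1",   # hypothetical worker for your compute module
])
```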
Endorser
Endorser(
auth: Auth,
client: Client,
enclaves: Dict[str, EnclaveSpecification],
)
Instance variables
auth: decentriq_platform.authentication.Auth
:
dcr_secret_endorsement
def dcr_secret_endorsement(
self,
dcr_secret: str,
) ‑> Tuple[identity_endorsement_pb2.EnclaveEndorsement, bytes]
decentriq_pki_endorsement
def decentriq_pki_endorsement(
self,
) ‑> identity_endorsement_pb2.EnclaveEndorsement
pki_endorsement
def pki_endorsement(
self,
cert_chain_pem: bytes,
) ‑> identity_endorsement_pb2.EnclaveEndorsement
Key
Key(
material: Optional[bytes] = None,
)
This class wraps the key material that is used to encrypt the files that are uploaded to the decentriq platform.
Returns a new Key
instance, optionally initialized with the given raw key material.
Secret
Secret(
secret: bytes,
state: decentriq_dcr_compiler._schemas.secret_store_entry_state.SecretStoreEntryState,
)
SecretStoreOptions
SecretStoreOptions(
*,
store_encryption_key: bool = True,
encryption_key_acl: Union[dict[str, 'JSONType'], list['JSONType'], str, int, float, bool, ForwardRef(None)] = None,
encryption_key_acl_version: int = 0,
)
Session
Session(
client: Client,
connection: Connection,
client_protocols: List[int],
auth: Auth,
)
Class for managing the communication with an enclave.
Session
instances should not be instantiated directly but rather
be created using a Client
object using decentriq_platform.Client.create_session
.
dcr_secret_endorsement
def dcr_secret_endorsement(
self,
dcr_secret: str,
) ‑> identity_endorsement_pb2.DcrSecretEndorsementResponse
generate_merge_approval_signature
def generate_merge_approval_signature(
self,
configuration_commit_id: str,
) ‑> bytes
Generate an approval signature required for merging a configuration commit.
To merge a specific configuration commit, each user referenced in the list
of ids returned by retrieve_configuration_commit_approvers
needs to
generate an approval signature using this method.
get_computation_result
def get_computation_result(
self,
job_id: JobId,
/,
*,
interval: int = 5,
timeout: Optional[int] = None,
) ‑> bytes
Wait for the given job to complete and retrieve its results as a raw byte string.
The method will check for the job's completeness every interval
seconds and up to
an optional timeout
seconds after which the method will raise an exception.
If the job completes and the results can be retrieved successfully, a raw byte string
will be returned. The bytes string can be transformed into a more useful object using
a variety of helper methods. These helper methods are specific for the type of computation
you ran and can be found in the corresponding packages.
get_computation_result_size
def get_computation_result_size(
self,
job_id: JobId,
/,
*,
interval: int = 5,
timeout: Optional[int] = None,
) ‑> int
Wait for the given job to complete and retrieve its results size.
The method will check for the job's completeness every interval
seconds and up to
an optional timeout
seconds after which the method will raise an exception.
If the job completes and the results can be retrieved successfully, an int containing
the raw result size is returned.
get_computation_status
def get_computation_status(
self,
job_id: str,
) ‑> gcg_pb2.JobStatusResponse
Returns the status of the provided job_id
which will include the names
of the nodes that completed their execution.
merge_configuration_commit
def merge_configuration_commit(
self,
configuration_commit_id: str,
approval_signatures: Dict[str, bytes],
) ‑> gcg_pb2.MergeConfigurationCommitResponse
Request the enclave to merge the given configuration commit into the main data room configuration.
Parameters:
configuration_commit_id
: The id of the commit to be merged.
approval_signatures
: A dictionary containing the approval signature for each of the required approvers, e.g. { "some@email.com": signature }.
pki_endorsement
def pki_endorsement(
self,
certificate_chain_pem: bytes,
) ‑> identity_endorsement_pb2.PkiEndorsementResponse
publish_data_room
def publish_data_room(
self,
data_room_definition: DataRoom,
/,
*,
show_organization_logo: bool = False,
require_password: bool = False,
purpose: CreateDcrPurpose.V = 0,
kind: CreateDcrKind.V = 0,
high_level_representation: Optional[bytes] = None,
) ‑> str
Create a data room with the provided protobuf configuration object and have the enclave apply the given list of modifications to the data room configuration.
The id returned from this method will be used when interacting with the published data room (for example when running computations or publishing datasets).
publish_data_room_configuration_commit
def publish_data_room_configuration_commit(
self,
configuration_commit: ConfigurationCommit,
) ‑> str
Publish the given data room configuration commit.
Configuration commits can be built using a DataRoomCommitBuilder
object.
The id returned from this method will be used when running development computations or when trying to merge this commit into the main data room configuration.
publish_dataset
def publish_dataset(
self,
data_room_id: str,
manifest_hash: str,
leaf_id: str,
key: Key,
*,
force: bool = False,
) ‑> gcg_pb2.PublishDatasetToDataRoomResponse
Publishes a file and its encryption key to a data room. Neither the file nor the encryption key will ever be stored in unencrypted form.
This method will check whether the to-be-published file exists.
If this is not the case, an exception will be raised.
This behavior can be disabled by setting the force
flag.
In case the original client was created with platform integration enabled, the method will further check whether there already is a dataset published for the given data room. In this case, an exception will be thrown and the dataset will need to be unpublished first.
A special note for when the referenced data room was created using the Decentriq UI:
In this case, the leaf_id
argument will have the format {NODE_ID}_leaf
,
where {NODE_ID}
corresponds to the value that you see when hovering your mouse pointer over
the name of the data node.
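The naming convention for UI-created data rooms can be sketched as follows (the node ID is hypothetical; the publish call itself needs a live Session):

```python
# For data rooms created in the Decentriq UI, the leaf ID follows the
# "{NODE_ID}_leaf" convention. The node ID below is hypothetical.
node_id = "salesdata"
leaf_id = f"{node_id}_leaf"
print(leaf_id)  # salesdata_leaf

# Sketch of the publish call (requires an active Session and the Key
# that was used when the dataset was uploaded):
# session.publish_dataset(data_room_id, manifest_hash, leaf_id, key)
```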
remove_published_dataset
def remove_published_dataset(
self,
data_room_id: str,
leaf_id: str,
) ‑> gcg_pb2.RemovePublishedDatasetResponse
Removes a published dataset from the data room.
Parameters:
data_room_id
: The ID of the data room that contains the given dataset.
leaf_id
: The ID of the data node from which the dataset should be removed. In case the referenced data room was created using the Decentriq UI, the leaf_id argument will have the special format @table/UUID/dataset (where UUID corresponds to the value that you see when hovering your mouse pointer over the name of the data node).
retrieve_audit_log
def retrieve_audit_log(
self,
data_room_id: str,
) ‑> gcg_pb2.RetrieveAuditLogResponse
Returns the audit log for the data room.
retrieve_configuration_commit
def retrieve_configuration_commit(
self,
configuration_commit_id: str,
) ‑> gcg_pb2.RetrieveConfigurationCommitResponse
Retrieve the content of the configuration commit with the given id.
Returns:
A ConfigurationCommit
.
retrieve_configuration_commit_approvers
def retrieve_configuration_commit_approvers(
self,
configuration_commit_id: str,
) ‑> List[str]
Retrieve the list of users who need to approve the merger of a given configuration commit.
Returns: A list of ids belonging to the users that need to approve the configuration commit.
retrieve_current_data_room_configuration
def retrieve_current_data_room_configuration(
self,
data_room_id: str,
) ‑> Tuple[data_room_pb2.DataRoomConfiguration, str]
Retrieve the current data room configuration, as well as the current "history pin".
A history pin is the hash of all the ids of configuration commits that
make up the structure of a data room. This pin therefore uniquely identifies
a data room's structure at a certain point in time.
A data room configuration, as well as its associated history pin, can be used
to extend an existing data room (for example by adding new compute nodes).
Extending an existing data room is done using the DataRoomCommitBuilder
class.
retrieve_data_room
def retrieve_data_room(
self,
data_room_id: str,
) ‑> gcg_pb2.RetrieveDataRoomResponse
Returns the underlying protobuf object for the data room.
retrieve_data_room_json
def retrieve_data_room_json(
self,
data_room_id: str,
) ‑> str
Get the JSON configuration file for the data room with the given ID. Returns a JSON string representing the configuration.
retrieve_data_room_status
def retrieve_data_room_status(
self,
data_room_id: str,
) ‑> str
Returns the status of the data room. Valid values are "Active"
or "Stopped"
.
retrieve_published_datasets
def retrieve_published_datasets(
self,
data_room_id: str,
) ‑> gcg_pb2.RetrievePublishedDatasetsResponse
Returns the datasets published to the given data room.
retrieve_used_airlock_quotas
def retrieve_used_airlock_quotas(
self,
data_room_id: str,
) ‑> Dict[str, decentriq_platform.session.AirlockQuotaInfo]
Retrieves the limit and used airlock quota for the current user.
run_computation
def run_computation(
self,
data_room_id: str,
compute_node_id: str,
/,
*,
dry_run: Optional[DryRunOptions] = None,
parameters: Optional[Mapping[Text, Text]] = None,
) ‑> decentriq_platform.types.JobId
Run a specific computation within the data room with the given id.
The result will be an identifier object of the job executing the computation. This object is required for checking a job's status and retrieving its results.
run_computation_and_get_results
def run_computation_and_get_results(
self,
data_room_id: str,
compute_node_id: str,
/,
*,
interval: int = 5,
timeout: Optional[int] = None,
parameters: Optional[Mapping[Text, Text]] = None,
) ‑> Optional[bytes]
Run a specific computation and return its results.
This method is simply a wrapper for running run_computation
and
get_computation_result
directly after each other.
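A typical invocation might look like this (a sketch; the data room and compute node IDs are hypothetical placeholders):

```python
# Sketch: assumes `session` was created via Client.create_session.
# The IDs below are hypothetical placeholders.
result = session.run_computation_and_get_results(
    "my-data-room-id",
    "my_compute_node",
    interval=5,    # poll for completion every 5 seconds
    timeout=600,   # raise an exception after 10 minutes
)
if result is not None:
    print(len(result), "bytes of raw results")
```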
run_dev_computation
def run_dev_computation(
self,
data_room_id: str,
configuration_commit_id: str,
compute_node_id: str,
/,
*,
dry_run: Optional[DryRunOptions] = None,
parameters: Optional[Mapping[Text, Text]] = None,
) ‑> decentriq_platform.types.JobId
Run a specific computation within the context of the data room configuration defined by the given commit id. Such "development" computations can also be run for configuration commits that have not yet been merged.
The result will be an identifier object of the job executing the computation. This object is required for checking a job's status and retrieving its results.
send_compilable_request
def send_compilable_request(
self,
compile_request: Callable[[CompilerRequest, Channel], bytes],
request: CompilerRequest,
decompile_response: Callable[[List[bytes]], CompilerResponse],
protocol: int,
) ‑> ~CompilerResponse
send_request
def send_request(
self,
request: GcgRequest,
protocol: int,
) ‑> List[gcg_pb2.GcgResponse]
Low-level method for sending a raw GcgRequest
to the enclave.
Use this method if any of the convenience methods (such as run_computation
) don't perform
the exact task you want.
send_request_raw
def send_request_raw(
self,
request: bytes,
protocol: int,
) ‑> List[bytes]
Low-level method for sending a raw GcgRequest
to the enclave.
Use this method if any of the convenience methods (such as run_computation
) don't perform
the exact task you want.
stop_data_room
def stop_data_room(
self,
data_room_id: str,
)
Stop the data room with the given id, making it impossible to run new computations.
wait_until_computation_has_finished
def wait_until_computation_has_finished(
self,
job_id: JobId,
/,
*,
interval: int = 5,
timeout: Optional[int] = None,
)
Wait for the given job to complete.
The method will check for the job's completeness every interval
seconds and up to
an optional timeout
seconds after which the method will raise an exception.
wait_until_computation_has_finished_for_all_compute_nodes
def wait_until_computation_has_finished_for_all_compute_nodes(
self,
job_id: str,
compute_node_ids: List[str],
/,
*,
interval: int = 5,
timeout: Optional[int] = None,
)
Wait for the given job to complete for all of the given compute nodes.
The method will check for the job's completeness every interval
seconds and up to
an optional timeout
seconds after which the method will raise an exception.
SessionV2
SessionV2(
client: Client,
connection: Connection,
)
Class for managing the communication with an enclave.
SessionV2
instances should not be instantiated directly but rather
be created using a Client
object using decentriq_platform.Client.create_session_v2
.
create_secret
def create_secret(
self,
secret: Secret,
) ‑> str
Store a secret in the user's own enclave-protected secret store.
get_secret
def get_secret(
self,
secret_id: str,
) ‑> Tuple[decentriq_platform.archv2.secret.Secret, int]
remove_secret
def remove_secret(
self,
secret_id: str,
expected_cas_index: int,
) ‑> bool
send_authenticated_request
def send_authenticated_request(
self,
authenticated_request: AuthenticatedRequest,
) ‑> gcg_pb2.AuthenticatedResponse
send_secret_store_request
def send_secret_store_request(
self,
request: SecretStoreRequest,
) ‑> secret_store_pb2.SecretStoreResponse
update_secret_acl
def update_secret_acl(
self,
secret_id: str,
new_acl: v0.SecretStoreEntryAcl,
expected_cas_index: int,
) ‑> bool
Update a secret's ACL.