Import datasets
Import from Amazon S3 via the Python SDK
Build a Data Clean Room (DCR) whose sole purpose is to import a dataset from S3 and have an enclave encrypt the imported data with a key that we manage. When we later want to publish the imported dataset into another Data Clean Room, we can do so by providing the enclave with this key (thus proving ownership of the data).
import decentriq_platform.legacy as dq
import decentriq_platform.legacy.sql as dqsql
import decentriq_platform.legacy.container as dqc
from decentriq_platform.legacy.container.proto import MountPoint
from decentriq_platform.legacy import DataRoomBuilder
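# Create an API client and open an enclave session, authenticating via the Decentriq PKI.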
user_email = "test_user@company.com"
api_token = "@@ YOUR TOKEN HERE @@"
client = dq.create_client(user_email, api_token)
enclave_specs = dq.enclave_specifications.versions([...])
auth, _ = client.create_auth_using_decentriq_pki(enclave_specs)
session = client.create_session(auth, enclave_specs)
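# Building blocks for the import DCR: the S3 data source, the dataset sink,
# and helpers for reading computation results.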
import io
import json
from decentriq_platform.legacy import (
DataRoomBuilder,
Key,
Permissions,
)
from decentriq_platform.legacy.container import read_result_as_zipfile, StaticContainerCompute
from decentriq_platform.legacy.data_source_s3 import DataSourceS3
from decentriq_platform.legacy.dataset_sink import DatasetSink
from decentriq_platform.legacy.dataset_sink.proto.dataset_sink_pb2 import (
SinkInput,
ZipFile,
FileSelection,
SingleFile,
RawFile,
)
builder = DataRoomBuilder("My Dataset Import DCR", enclave_specs=enclave_specs)
# A data node to hold our AWS credentials. This will be a simple JSON payload.
credentials_id = builder.add_data_node(
name="credentials",
is_required=True
)
# The computation node that will download the data over a secure
# channel and make it available to other enclaves.
s3_source = DataSourceS3(
"s3",
credentials_dependency=credentials_id,
bucket="decentriq-integration-tests",
region="eu-west-3",
object_key="hello.txt",
s3_provider="AWS",
)
s3_source_id = builder.add_compute_node(s3_source)
# The data node to hold the encryption key with which the imported
# data should be encrypted.
key_node_id = builder.add_data_node(
name="key",
is_required=True
)
# The computation node that will encrypt the input dataset coming from S3.
dataset_sink = DatasetSink(
"dataset_sink",
inputs=[
SinkInput(
dependency=s3_source_id,
name="My encrypted dataset",
raw=RawFile()
)
],
encryption_key_dependency=key_node_id,
)
dataset_sink_id = builder.add_compute_node(dataset_sink)
# Grant ourselves the permissions needed to upload the credentials and the
# encryption key, as well as to run the dataset sink and retrieve its result.
builder.add_user_permission(
email=user_email,
authentication_method=client.decentriq_pki_authentication,
permissions=[
Permissions.leaf_crud(key_node_id),
Permissions.leaf_crud(credentials_id),
Permissions.execute_compute(dataset_sink_id),
Permissions.retrieve_compute_result(dataset_sink_id),
]
)
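# Build the Data Clean Room definition and publish it, obtaining its id.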
data_room = builder.build()
data_room_id = session.publish_data_room(data_room)
# The credentials to our AWS bucket as a simple JSON payload.
aws_credentials = {
"accessKey": "@@ AWS ACCESS KEY HERE @@",
"secretKey": "@@ AWS SECRET HERE @@",
}
# Now we upload and publish the encrypted credentials.
# Notice that the key we generate here does not need to be saved,
# as it's only used within this one-off DCR.
key = Key()
manifest_hash = client.upload_dataset(
io.BytesIO(json.dumps(aws_credentials).encode()),
key,
"credentials file",
)
session.publish_dataset(
data_room_id,
manifest_hash,
credentials_id,
key
)
# The key to be used for the encryption of the imported dataset.
# In this example we have the SDK generate a key, but you could also bring your own.
# If generating a key this way, make sure to store it in a secure location.
dataset_key = Key()
# Now we upload the dataset encryption key itself. Note that it, too, is uploaded
# as a dataset, encrypted with the same one-off key used for the credentials.
enc_key_manifest_hash = client.upload_dataset(
io.BytesIO(dataset_key.material),
key,
"encryption key",
)
session.publish_dataset(
data_room_id,
enc_key_manifest_hash,
key_node_id,
key
)
Now it's time to trigger the S3 download and the dataset encryption process. For this we run the DatasetSink node just as we would any other computation. The properties of the imported dataset are returned as a JSON file contained in a ZIP archive. The resulting manifestHash can be used to publish the dataset to other Data Clean Rooms, just as if we had uploaded the dataset ourselves.
job = session.run_computation(data_room_id, dataset_sink_id)
result = session.get_computation_result(job, interval=1)
result_zip = read_result_as_zipfile(result)
datasets_meta = json.loads(result_zip.read("datasets.json").decode())
{
'datasets': [
{
'manifestHash': '6e644413c782b198da7d1c25c3592ecd70cdd2fdb6dd0185a33d34e8ee845d62',
'datasetId': '4e461e5497971dbb69797be9699c27a40bb80db62ec39a65a12a908eee42e026',
'schemaBase64': None
}
]
}
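To complete the flow described at the beginning of this section, the manifestHash reported by the sink can be combined with the dataset key to publish the imported dataset into another Data Clean Room. The snippet below is only a minimal sketch: other_data_room_id and other_data_node_id are placeholders for an existing DCR and one of its data nodes, and it assumes a session created against that DCR's enclave specifications.
# Hypothetical follow-up: publish the imported dataset into another, already
# existing Data Clean Room. `other_data_room_id` and `other_data_node_id`
# are placeholders for that DCR and one of its data nodes.
imported_manifest_hash = datasets_meta["datasets"][0]["manifestHash"]
session.publish_dataset(
    other_data_room_id,
    imported_manifest_hash,
    other_data_node_id,
    dataset_key,  # providing the key proves ownership of the imported data
)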