Skip to main content

Importing data from a connector

import decentriq_platform as dq
user_email = "@@ YOUR EMAIL HERE @@"
api_token = "@@ YOUR TOKEN HERE @@"
client = dq.create_client(user_email, api_token)
enclave_specs = dq.enclave_specifications.latest()

Import datasets

The following example shows you how to use the Python SDK to import a dataset from S3 into the Decentriq platform.

import io
import json

from decentriq_platform.legacy.dataset_sink import DatasetSink
from decentriq_platform.legacy.data_source_s3 import DataSourceS3
from decentriq_platform.legacy.dataset_sink.proto import SinkInput, RawFile
from decentriq_platform.legacy import (
create_client,
DataRoomBuilder,
Permissions,
)
from decentriq_platform import Key
import decentriq_platform as dq
from decentriq_platform.legacy.container import read_result_as_zipfile

Then, we create the Client instance with which we can communicate with the Decentriq platform, as well as a Session instance that manages the encrypted communication channel with the enclave.

user_email = "@@ YOUR EMAIL HERE @@"
user_api_token = "@@ YOUR TOKEN HERE @@"

client = create_client(user_email, api_token=user_api_token)

# The enclaves we plan to use in this DCR.
enclave_specs = dq.enclave_specifications.versions([
"decentriq.driver:v20",
"decentriq.dataset-sink-worker:v6",
"decentriq.data-source-s3-worker:v5"
])

auth, _ = client.create_auth_using_decentriq_pki(enclave_specs)
session = client.create_session(auth, enclave_specs)

Having setup the enclave connection, we can now proceed with building a Data Clean Room whose sole purpose is to import a dataset from S3 and have an enclave encrypt the imported dataset with a key managed by us. When we later want to publish the imported dataset into another Data Clean Room, we can do so by providing the enclave with this key (and thus proving ownership of the data).

builder = DataRoomBuilder("My Dataset Import DCR", enclave_specs=enclave_specs)

# A data node to hold our AWS credentials. This will be a simple JSON payload.
credentials_id = builder.add_data_node(
name="credentials",
is_required=True
)

# The computation node that will download the data over a secure
# channel and make it available to other enclaves.
s3_source = DataSourceS3(
"s3",
credentials_dependency=credentials_id,
bucket="decentriq-integration-tests",
region="eu-west-3",
object_key="hello.txt", #"path/to/my/file.csv",
s3_provider="AWS",
)
s3_source_id = builder.add_compute_node(s3_source)

# The data node to hold the encryption key with which the imported
# data should be encrypted.
key_node_id = builder.add_data_node(
name="key",
is_required=True
)

# The computation node that will encrypt the input dataset coming from S3.
dataset_sink = DatasetSink(
"dataset_sink",
inputs=[
SinkInput(
dependency=s3_source_id,
name="My encrypted dataset",
raw=RawFile()
)
],
encryption_key_dependency=key_node_id,
)
dataset_sink_id = builder.add_compute_node(dataset_sink)

# Define the necessary permissions for us to upload the necessary credentials
# and the encryption key.
builder.add_user_permission(
email=user_email,
authentication_method=client.decentriq_pki_authentication,
permissions=[
Permissions.leaf_crud(key_node_id),
Permissions.leaf_crud(credentials_id),
Permissions.execute_compute(dataset_sink_id),
Permissions.retrieve_compute_result(dataset_sink_id),
]
)

data_room = builder.build()
data_room_id = session.publish_data_room(data_room)

# The credentials to our AWS bucket as a simple JSON payload.
# aws_credentials = {
# "accessKey": "XXXX",
# "secretKey": "YYYY",
# }

aws_credentials = {
"accessKey": "@@ AWS ACCESS KEY HERE @@",
"secretKey": "@@ AWS SECRET HERE @@",
}

# Now we upload and publish the encrypted credentials.
# Notice that the key we generate here does not need to be saved,
# as it's only used within this one-off DCR.
key = Key()
manifest_hash = client.upload_dataset(
io.BytesIO(json.dumps(aws_credentials).encode()),
key,
"credentials file",
)
session.publish_dataset(
data_room_id,
manifest_hash ,
credentials_id,
key
)

# The key to be used for the encryption of the imported dataset.
# In this example we have the SDK generate a key, but you could also bring our own.
# If generating a key this way, make sure to store it in a secure location.
dataset_key = Key()

# Now it's time to upload the dataset encryption key. Again, notice that
# the encryption key itself is encrypted with another generated key.
enc_key_manifest_hash = client.upload_dataset(
io.BytesIO(dataset_key.material),
key,
"encryption key",
)
session.publish_dataset(
data_room_id,
enc_key_manifest_hash,
key_node_id,
key
)

Now it's time to trigger the S3 download and dataset encryption process. For this we run the DatasetSink node as we would any other computation. The properties of the dataset will be returned as part of a JSON file contained in a ZIP archive. The resulting manifestHash can be used for publishing the dataset to other Data Clean Rooms, just as if we uploaded the dataset ourselves.

job = session.run_computation(data_room_id, dataset_sink_id)
result = session.get_computation_result(job, interval=1)
result_zip = read_result_as_zipfile(result)

datasets_meta = json.loads(result_zip.read("datasets.json").decode())
{
'datasets': [
{
'manifestHash': '6e644413c782b198da7d1c25c3592ecd70cdd2fdb6dd0185a33d34e8ee845d62',
'datasetId': '4e461e5497971dbb69797be9699c27a40bb80db62ec39a65a12a908eee42e026',
'schemaBase64': None
}
]
}