Skip to content

parquet_dataset

Parquet datasets, to be used with raw data under the ".parquet" format.

Classes:

Name Description
BaseParquetDataset

Base Parquet Dataset class.

ParquetDataset

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

AnalyzedParquetDataset

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

FittedParquetDataset

Fitted Parquet Dataset class to be created from an existing fitted schema.

BaseParquetDataset #

Base Parquet Dataset class.

Parameters:

Name Type Description Default

split_name #

str

A split name, for example, "train", only used for the visualization XpViz.

required

identifier_name #

str

A key to group each dataset, only used in the visualization platform XpViz.

required

path #

str

The directory artifact path.

required

storage_options #

dict[str, Any]

Optional storage options to stream data from a cloud storage instance.

required

Attributes:

Name Type Description
split_name str
identifier_name str
path str
storage_options dict[str, object]

split_name: str #

identifier_name: str #

path: str #

storage_options: dict[str, object] = field(factory=dict) #

ParquetDataset #

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

Methods:

Name Description
analyze

Analyze the dataset and create an Analyzed Schema.

analyze(*forced_type: Feature | IndexMetadata, target_names: list[str] | None = None) -> AnalyzedParquetDataset #

Analyze the dataset and create an Analyzed Schema.

Parameters:

Name Type Description Default
forced_type #
Feature

Features objects to force custom feature type for specific column names in the Arrow Table.

()
target_names #
list[str] | None

Optional list of column names indicating which columns should be considered targets. Default None.

None

Returns:

Type Description
AnalyzedParquetDataset

The analyzed dataset, a parquet dataset with an analyzed schema attached.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def analyze(
    self, *forced_type: Feature | IndexMetadata, target_names: list[str] | None = None
) -> "AnalyzedParquetDataset":
    """Analyze the dataset and create an Analyzed Schema.

    Parameters
    ----------
    forced_type : Feature
        Features objects to force custom feature type for specific column names in the Arrow Table.
    target_names : list[str] | None
        Optional list of column names indicating which columns should be considered targets. Default None.

    Returns
    -------
    AnalyzedParquetDataset
        The analyzed dataset, a parquet dataset with an analyzed schema attached.
    """
    client_factory = ClientFactory.CURRENT.get()

    forced_types_as_dict = {
        value.name: (base64.encodebytes(NumpyMsgpackEncoder().encode(value.as_exposed())).decode("utf-8"))
        for value in forced_type
    }

    with client_factory() as client:
        response = create_analyzed_schema.sync_detailed(
            Project.CURRENT.get().model.id,
            body=ParquetDatasetAnalyzeRequestBody(
                parquet_dataset=self.__as_request_body,
                forced_type=ParquetDatasetAnalyzeRequestBodyForcedTypeType0.from_dict(forced_types_as_dict)
                if len(forced_types_as_dict) > 0
                else None,
                target_names=target_names,
            ),
            client=client,
        )

        analyzed_schema = AnalyzedSchema.from_bytes(response.content)

        for feature in forced_type:
            if (
                isinstance(feature, Feature)
                and feature.feature_augmentation is not None
                and feature.name in [feature_.name for feature_ in analyzed_schema.columns]
            ):
                if isinstance(analyzed_schema[feature.name], IndexMetadata | BaseFeature):
                    message = (
                        "Unexpected feature type found when accessing to analyzed schema. `IndexMetadata` and "
                        "`BaseFeature` cannot have feature augmentation."
                    )
                    raise TypeError(message)

                cast("Feature", analyzed_schema[feature.name]).feature_augmentation = feature.feature_augmentation

        return AnalyzedParquetDataset(
            self.split_name,
            self.identifier_name,
            self.path,
            analyzed_schema=analyzed_schema,
        )

AnalyzedParquetDataset #

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

Parameters:

Name Type Description Default

analyzed_schema #

AnalyzedSchema
required

Methods:

Name Description
fit

Create a Fitted Parquet Dataset object.

Attributes:

Name Type Description
analyzed_schema AnalyzedSchema

analyzed_schema: AnalyzedSchema = field(kw_only=True) #

fit() -> FittedParquetDataset #

Create a Fitted Parquet Dataset object.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def fit(self) -> "FittedParquetDataset":
    """Create a Fitted Parquet Dataset object."""
    for column in self.analyzed_schema.as_exposed.columns:
        if type(column) is ExposedBaseFeature:
            message = "Base features are not accepted for fitting schemas."
            raise TypeError(message) from None

    client_factory = ClientFactory.CURRENT.get()

    with client_factory() as client:
        response = create_fitted_schema.sync_detailed(
            project_id=Project.CURRENT.get().model.id,
            client=client,
            body=FitSchemaRequestBody(
                self._as_request_body,
                base64.encodebytes(NumpyMsgpackEncoder().encode(self.analyzed_schema.as_exposed)).decode("utf-8"),
            ),
        )

    fitted_schema = FittedSchema.from_bytes(response.content)

    features = [
        feature for feature in self.analyzed_schema.columns if not isinstance(feature, IndexMetadata | BaseFeature)
    ]

    for feature in features:
        if (
            isinstance(feature, Feature)
            and feature.feature_augmentation is not None
            and feature.name in [feature_.name for feature_ in fitted_schema.columns]
        ):
            if isinstance(fitted_schema[feature.name], IndexMetadata):
                message = (
                    "Unexpected feature type found when accessing to fitted schema. `IndexMetadata` cannot have "
                    "feature augmentation."
                )
                raise TypeError(message)

            cast("Feature", fitted_schema[feature.name]).feature_augmentation = feature.feature_augmentation

    return FittedParquetDataset(self.split_name, self.identifier_name, self.path, fitted_schema=fitted_schema)

FittedParquetDataset #

Fitted Parquet Dataset class to be created from an existing fitted schema.

Parameters:

Name Type Description Default

fitted_schema #

FittedSchema
required

Attributes:

Name Type Description
fitted_schema FittedSchema
artifact_id str

Get artifact id.

fitted_schema: FittedSchema = field(kw_only=True) #

artifact_id: str #

Get artifact id.