# Source code for openapi_client.models.registered_dataset

# coding: utf-8

"""
    Amorphic Data Platform

    Amorphic Data Platform - API Definition documentation

    The version of the OpenAPI document: 0.3.0
    Generated by OpenAPI Generator (https://openapi-generator.tech)

    Do not edit the class manually.
"""  # noqa: E501


from __future__ import annotations
import pprint
import re  # noqa: F401
import json

from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.column_name_and_type import ColumnNameAndType
from openapi_client.models.connection_details import ConnectionDetails
from openapi_client.models.data_metrics_collection_options import DataMetricsCollectionOptions
from openapi_client.models.malware_detection_options import MalwareDetectionOptions
from openapi_client.models.registered_dataset_dataset_report import RegisteredDatasetDatasetReport
from openapi_client.models.registered_dataset_schedules_inner import RegisteredDatasetSchedulesInner
from openapi_client.models.registered_dataset_sync_to_s3_options import RegisteredDatasetSyncToS3Options
from typing import Optional, Set
from typing_extensions import Self

class RegisteredDataset(BaseModel):
    """Pydantic model for the ``RegisteredDataset`` API schema.

    Every field is optional and defaults to ``None``. Each field is exposed
    under a snake_case attribute name and a PascalCase alias (the key used
    in API payloads); because ``populate_by_name=True`` either spelling is
    accepted when constructing the model.

    Nested objects (``dataset_schema``, ``data_metrics_collection_options``,
    ``sync_to_s3_options``, ``connection_details``, ``schedules``,
    ``dataset_report``, ``malware_detection_options``) are themselves
    generated models and are (de)serialized through their own
    ``to_dict`` / ``from_dict`` methods.
    """  # noqa: E501

    dataset_id: Optional[StrictStr] = Field(default=None, alias="DatasetId")
    dataset_name: Optional[StrictStr] = Field(default=None, alias="DatasetName")
    display_name: Optional[StrictStr] = Field(default=None, alias="DisplayName")
    dataset_description: Optional[StrictStr] = Field(default=None, alias="DatasetDescription")
    data_classification: Optional[List[StrictStr]] = Field(default=None, alias="DataClassification")
    domain: Optional[StrictStr] = Field(default=None, alias="Domain")
    domain_display_name: Optional[StrictStr] = Field(default=None, alias="DomainDisplayName")
    partitioned: Optional[StrictStr] = Field(default=None, alias="Partitioned")
    file_type: Optional[StrictStr] = Field(default=None, alias="FileType")
    target_location: Optional[StrictStr] = Field(default=None, alias="TargetLocation")
    keywords: Optional[List[StrictStr]] = Field(default=None, alias="Keywords")
    dataset_schema: Optional[List[ColumnNameAndType]] = Field(default=None, alias="DatasetSchema")
    table_update: Optional[StrictStr] = Field(default=None, alias="TableUpdate")
    data_metrics_collection_options: Optional[DataMetricsCollectionOptions] = Field(default=None, alias="DataMetricsCollectionOptions")
    sync_to_s3_options: Optional[RegisteredDatasetSyncToS3Options] = Field(default=None, alias="SyncToS3Options")
    dwh_table_name: Optional[StrictStr] = Field(default=None, alias="DWHTableName")
    last_modified: Optional[StrictStr] = Field(default=None, alias="LastModified")
    last_modified_by: Optional[StrictStr] = Field(default=None, alias="LastModifiedBy")
    error_message: Optional[StrictStr] = Field(default=None, alias="ErrorMessage")
    table_update_column: Optional[StrictStr] = Field(default=None, alias="TableUpdateColumn")
    registration_status: Optional[StrictStr] = Field(default=None, alias="RegistrationStatus")
    created_by: Optional[StrictStr] = Field(default=None, alias="CreatedBy")
    creation_date: Optional[StrictStr] = Field(default=None, alias="CreationDate")
    is_active: Optional[StrictStr] = Field(default=None, alias="IsActive")
    access_type: Optional[StrictStr] = Field(default=None, alias="AccessType")
    is_owner: Optional[StrictBool] = Field(default=None, alias="IsOwner")
    file_delimiter: Optional[StrictStr] = Field(default=None, alias="FileDelimiter")
    skip_file_header: Optional[StrictBool] = Field(default=None, alias="SkipFileHeader")
    connection_details: Optional[ConnectionDetails] = Field(default=None, alias="ConnectionDetails")
    schedules: Optional[List[RegisteredDatasetSchedulesInner]] = Field(default=None, alias="Schedules")
    dataset_report: Optional[RegisteredDatasetDatasetReport] = Field(default=None, alias="DatasetReport")
    datasource_type: Optional[StrictStr] = Field(default=None, alias="DatasourceType")
    is_data_cleanup_enabled: Optional[StrictBool] = Field(default=None, alias="IsDataCleanupEnabled")
    malware_detection_options: Optional[MalwareDetectionOptions] = Field(default=None, alias="MalwareDetectionOptions")
    skip_lz_process: Optional[StrictBool] = Field(default=None, alias="SkipLZProcess")
    tenant_name: Optional[StrictStr] = Field(default=None, alias="TenantName")
    are_ai_services_enabled: Optional[StrictBool] = Field(default=None, alias="AreAIServicesEnabled")
    dataset_s3_path: Optional[StrictStr] = Field(default=None, alias="DatasetS3Path")
    dataset_status: Optional[StrictStr] = Field(default=None, alias="DatasetStatus")
    s3_athena_table_name: Optional[StrictStr] = Field(default=None, alias="S3AthenaTableName")
    data_stewards: Optional[List[StrictStr]] = Field(default=None, alias="DataStewards")

    # Alias names of all declared fields, in declaration order.
    __properties: ClassVar[List[str]] = [
        "DatasetId",
        "DatasetName",
        "DisplayName",
        "DatasetDescription",
        "DataClassification",
        "Domain",
        "DomainDisplayName",
        "Partitioned",
        "FileType",
        "TargetLocation",
        "Keywords",
        "DatasetSchema",
        "TableUpdate",
        "DataMetricsCollectionOptions",
        "SyncToS3Options",
        "DWHTableName",
        "LastModified",
        "LastModifiedBy",
        "ErrorMessage",
        "TableUpdateColumn",
        "RegistrationStatus",
        "CreatedBy",
        "CreationDate",
        "IsActive",
        "AccessType",
        "IsOwner",
        "FileDelimiter",
        "SkipFileHeader",
        "ConnectionDetails",
        "Schedules",
        "DatasetReport",
        "DatasourceType",
        "IsDataCleanupEnabled",
        "MalwareDetectionOptions",
        "SkipLZProcess",
        "TenantName",
        "AreAIServicesEnabled",
        "DatasetS3Path",
        "DatasetStatus",
        "S3AthenaTableName",
        "DataStewards",
    ]

    # populate_by_name: accept attribute names as well as aliases on input.
    # validate_assignment: re-run validation when an attribute is assigned.
    # protected_namespaces=(): no field-name prefixes are reserved.
    model_config = ConfigDict(
        populate_by_name=True,
        validate_assignment=True,
        protected_namespaces=(),
    )

    def to_str(self) -> str:
        """Return a pretty-printed string of the model, keyed by alias."""
        return pprint.pformat(self.model_dump(by_alias=True))

    def to_json(self) -> str:
        """Return the JSON representation of the model, keyed by alias.

        Serialization goes through :meth:`to_dict`, so fields whose value is
        ``None`` are omitted from the output.
        """
        # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> Optional[Self]:
        """Create an instance of RegisteredDataset from a JSON string.

        The string is parsed with ``json.loads`` and the result is handed to
        :meth:`from_dict`; a JSON ``null`` therefore yields ``None``.
        """
        return cls.from_dict(json.loads(json_str))

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary representation of the model, keyed by alias.

        Differences from calling pydantic's ``self.model_dump(by_alias=True)``:

        * Fields whose value is ``None`` are left out of the result
          (``exclude_none=True``).
        * Nested models, and the items of nested model lists, are serialized
          by calling their own ``to_dict()`` so that their custom
          serialization rules apply as well.
        """
        excluded_fields: Set[str] = set()

        _dict = self.model_dump(
            by_alias=True,
            exclude=excluded_fields,
            exclude_none=True,
        )

        # Lists of nested models: serialize each truthy item through its own
        # to_dict(). Empty or unset lists keep pydantic's default output.
        if self.dataset_schema:
            _dict['DatasetSchema'] = [
                _item.to_dict() for _item in self.dataset_schema if _item
            ]
        if self.schedules:
            _dict['Schedules'] = [
                _item.to_dict() for _item in self.schedules if _item
            ]

        # Single nested models: override pydantic's default dump with the
        # nested model's own to_dict() output.
        if self.data_metrics_collection_options:
            _dict['DataMetricsCollectionOptions'] = self.data_metrics_collection_options.to_dict()
        if self.sync_to_s3_options:
            _dict['SyncToS3Options'] = self.sync_to_s3_options.to_dict()
        if self.connection_details:
            _dict['ConnectionDetails'] = self.connection_details.to_dict()
        if self.dataset_report:
            _dict['DatasetReport'] = self.dataset_report.to_dict()
        if self.malware_detection_options:
            _dict['MalwareDetectionOptions'] = self.malware_detection_options.to_dict()

        return _dict

    @classmethod
    def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
        """Create an instance of RegisteredDataset from a dict.

        * ``None`` input returns ``None``.
        * Non-dict input is passed straight to ``cls.model_validate``.
        * For a dict, only the alias keys listed below are read (any other
          keys are ignored); nested objects are built with the ``from_dict``
          of their own model class before validation.
        """
        if obj is None:
            return None

        if not isinstance(obj, dict):
            return cls.model_validate(obj)

        _obj = cls.model_validate({
            "DatasetId": obj.get("DatasetId"),
            "DatasetName": obj.get("DatasetName"),
            "DisplayName": obj.get("DisplayName"),
            "DatasetDescription": obj.get("DatasetDescription"),
            "DataClassification": obj.get("DataClassification"),
            "Domain": obj.get("Domain"),
            "DomainDisplayName": obj.get("DomainDisplayName"),
            "Partitioned": obj.get("Partitioned"),
            "FileType": obj.get("FileType"),
            "TargetLocation": obj.get("TargetLocation"),
            "Keywords": obj.get("Keywords"),
            "DatasetSchema": (
                [ColumnNameAndType.from_dict(_item) for _item in obj["DatasetSchema"]]
                if obj.get("DatasetSchema") is not None
                else None
            ),
            "TableUpdate": obj.get("TableUpdate"),
            "DataMetricsCollectionOptions": (
                DataMetricsCollectionOptions.from_dict(obj["DataMetricsCollectionOptions"])
                if obj.get("DataMetricsCollectionOptions") is not None
                else None
            ),
            "SyncToS3Options": (
                RegisteredDatasetSyncToS3Options.from_dict(obj["SyncToS3Options"])
                if obj.get("SyncToS3Options") is not None
                else None
            ),
            "DWHTableName": obj.get("DWHTableName"),
            "LastModified": obj.get("LastModified"),
            "LastModifiedBy": obj.get("LastModifiedBy"),
            "ErrorMessage": obj.get("ErrorMessage"),
            "TableUpdateColumn": obj.get("TableUpdateColumn"),
            "RegistrationStatus": obj.get("RegistrationStatus"),
            "CreatedBy": obj.get("CreatedBy"),
            "CreationDate": obj.get("CreationDate"),
            "IsActive": obj.get("IsActive"),
            "AccessType": obj.get("AccessType"),
            "IsOwner": obj.get("IsOwner"),
            "FileDelimiter": obj.get("FileDelimiter"),
            "SkipFileHeader": obj.get("SkipFileHeader"),
            "ConnectionDetails": (
                ConnectionDetails.from_dict(obj["ConnectionDetails"])
                if obj.get("ConnectionDetails") is not None
                else None
            ),
            "Schedules": (
                [RegisteredDatasetSchedulesInner.from_dict(_item) for _item in obj["Schedules"]]
                if obj.get("Schedules") is not None
                else None
            ),
            "DatasetReport": (
                RegisteredDatasetDatasetReport.from_dict(obj["DatasetReport"])
                if obj.get("DatasetReport") is not None
                else None
            ),
            "DatasourceType": obj.get("DatasourceType"),
            "IsDataCleanupEnabled": obj.get("IsDataCleanupEnabled"),
            "MalwareDetectionOptions": (
                MalwareDetectionOptions.from_dict(obj["MalwareDetectionOptions"])
                if obj.get("MalwareDetectionOptions") is not None
                else None
            ),
            "SkipLZProcess": obj.get("SkipLZProcess"),
            "TenantName": obj.get("TenantName"),
            "AreAIServicesEnabled": obj.get("AreAIServicesEnabled"),
            "DatasetS3Path": obj.get("DatasetS3Path"),
            "DatasetStatus": obj.get("DatasetStatus"),
            "S3AthenaTableName": obj.get("S3AthenaTableName"),
            "DataStewards": obj.get("DataStewards"),
        })
        return _obj