# coding: utf-8
"""
Amorphic Data Platform
Amorphic Data Platform - API Definition documentation
The version of the OpenAPI document: 0.3.0
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.datasource_flows_details_dataflow_config_consumer_configuration import DatasourceFlowsDetailsDataflowConfigConsumerConfiguration
from openapi_client.models.datasource_flows_details_dataflow_config_table_mappings import DatasourceFlowsDetailsDataflowConfigTableMappings
from openapi_client.models.datasource_flows_details_dataflow_config_task_retry_config import DatasourceFlowsDetailsDataflowConfigTaskRetryConfig
from openapi_client.models.schedule_details_inner import ScheduleDetailsInner
from typing import Optional, Set
from typing_extensions import Self
class DatasourceFlowsDetailsDataflowConfig(BaseModel):
    """
    Pydantic model for the dataflow configuration of a datasource flow.

    Every field is optional and defaults to ``None``. Each field carries a
    PascalCase alias, which is the key used by ``to_dict``/``to_json`` and
    read by ``from_dict``/``from_json``. Because ``populate_by_name`` is
    enabled, the snake_case attribute names are accepted on input as well.
    """  # noqa: E501

    # Plain string / boolean settings.
    shared_instance: Optional[StrictStr] = Field(default=None, alias="SharedInstance")
    sync_to_s3: Optional[StrictStr] = Field(default=None, alias="SyncToS3")
    extra_connection_attributes: Optional[StrictStr] = Field(default=None, alias="ExtraConnectionAttributes")
    source_extra_connection_attributes: Optional[StrictStr] = Field(default=None, alias="SourceExtraConnectionAttributes")
    target_table_prep_mode: Optional[StrictStr] = Field(default=None, alias="TargetTablePrepMode")
    cdc_start_time: Optional[StrictStr] = Field(default=None, alias="CdcStartTime")
    cdc_stop_position: Optional[StrictStr] = Field(default=None, alias="CdcStopPosition")
    instance_class: Optional[StrictStr] = Field(default=None, alias="InstanceClass")
    allocated_storage: Optional[StrictStr] = Field(default=None, alias="AllocatedStorage")
    instance_az: Optional[StrictStr] = Field(default=None, alias="InstanceAZ")
    instance_multi_az: Optional[StrictBool] = Field(default=None, alias="InstanceMultiAZ")
    # Nested model; serialized through its own ``to_dict``.
    task_retry_config: Optional[DatasourceFlowsDetailsDataflowConfigTaskRetryConfig] = Field(default=None, alias="TaskRetryConfig")
    # Free-form mapping, passed through untouched.
    additional_task_settings: Optional[Dict[str, Any]] = Field(default=None, alias="AdditionalTaskSettings")
    # List of lists of nested models.
    schedules: Optional[List[List[ScheduleDetailsInner]]] = Field(default=None, alias="Schedules")
    # Nested models; serialized through their own ``to_dict``.
    table_mappings: Optional[DatasourceFlowsDetailsDataflowConfigTableMappings] = Field(default=None, alias="TableMappings")
    consumer_configuration: Optional[DatasourceFlowsDetailsDataflowConfigConsumerConfiguration] = Field(default=None, alias="ConsumerConfiguration")
    # Aliases of all declared fields, in declaration order.
    __properties: ClassVar[List[str]] = ["SharedInstance", "SyncToS3", "ExtraConnectionAttributes", "SourceExtraConnectionAttributes", "TargetTablePrepMode", "CdcStartTime", "CdcStopPosition", "InstanceClass", "AllocatedStorage", "InstanceAZ", "InstanceMultiAZ", "TaskRetryConfig", "AdditionalTaskSettings", "Schedules", "TableMappings", "ConsumerConfiguration"]

    model_config = ConfigDict(
        populate_by_name=True,
        validate_assignment=True,
        protected_namespaces=(),
    )

    def to_str(self) -> str:
        """Return a pretty-printed string of the model, keyed by alias."""
        aliased_dump = self.model_dump(by_alias=True)
        return pprint.pformat(aliased_dump)

    def to_json(self) -> str:
        """Return the model as a JSON string, keyed by alias."""
        # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
        as_dict = self.to_dict()
        return json.dumps(as_dict)

    @classmethod
    def from_json(cls, json_str: str) -> Optional[Self]:
        """Build a DatasourceFlowsDetailsDataflowConfig from a JSON string."""
        parsed = json.loads(json_str)
        return cls.from_dict(parsed)

    def to_dict(self) -> Dict[str, Any]:
        """Return the model as a dictionary keyed by alias.

        Differs from pydantic's ``self.model_dump(by_alias=True)`` in that
        fields whose value is ``None`` are left out of the result, and
        nested models are rendered by calling their own ``to_dict()``.
        """
        nothing_excluded: Set[str] = set()
        result = self.model_dump(
            by_alias=True,
            exclude=nothing_excluded,
            exclude_none=True,
        )

        # Nested model: replace pydantic's dump with the model's own to_dict().
        if self.task_retry_config:
            result['TaskRetryConfig'] = self.task_retry_config.to_dict()

        # List of lists: empty inner lists and None entries are skipped.
        if self.schedules:
            result['Schedules'] = [
                [entry.to_dict() for entry in group if entry is not None]
                for group in self.schedules
                if group
            ]

        # Remaining nested models, handled the same way as task_retry_config.
        if self.table_mappings:
            result['TableMappings'] = self.table_mappings.to_dict()
        if self.consumer_configuration:
            result['ConsumerConfiguration'] = self.consumer_configuration.to_dict()

        return result

    @classmethod
    def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
        """Build a DatasourceFlowsDetailsDataflowConfig from a dict.

        Returns ``None`` when ``obj`` is ``None``. A non-dict ``obj`` is
        handed to ``model_validate`` unchanged. For a dict, only the known
        alias keys are read; nested values are first converted with the
        ``from_dict`` of their respective model classes.
        """
        if obj is None:
            return None
        if not isinstance(obj, dict):
            return cls.model_validate(obj)

        # Convert nested structures first; each stays None when absent/None.
        retry_config = None
        if obj.get("TaskRetryConfig") is not None:
            retry_config = DatasourceFlowsDetailsDataflowConfigTaskRetryConfig.from_dict(obj["TaskRetryConfig"])

        schedule_groups = None
        if obj.get("Schedules") is not None:
            schedule_groups = [
                [ScheduleDetailsInner.from_dict(entry) for entry in group]
                for group in obj["Schedules"]
            ]

        mappings = None
        if obj.get("TableMappings") is not None:
            mappings = DatasourceFlowsDetailsDataflowConfigTableMappings.from_dict(obj["TableMappings"])

        consumer = None
        if obj.get("ConsumerConfiguration") is not None:
            consumer = DatasourceFlowsDetailsDataflowConfigConsumerConfiguration.from_dict(obj["ConsumerConfiguration"])

        payload = {
            "SharedInstance": obj.get("SharedInstance"),
            "SyncToS3": obj.get("SyncToS3"),
            "ExtraConnectionAttributes": obj.get("ExtraConnectionAttributes"),
            "SourceExtraConnectionAttributes": obj.get("SourceExtraConnectionAttributes"),
            "TargetTablePrepMode": obj.get("TargetTablePrepMode"),
            "CdcStartTime": obj.get("CdcStartTime"),
            "CdcStopPosition": obj.get("CdcStopPosition"),
            "InstanceClass": obj.get("InstanceClass"),
            "AllocatedStorage": obj.get("AllocatedStorage"),
            "InstanceAZ": obj.get("InstanceAZ"),
            "InstanceMultiAZ": obj.get("InstanceMultiAZ"),
            "TaskRetryConfig": retry_config,
            "AdditionalTaskSettings": obj.get("AdditionalTaskSettings"),
            "Schedules": schedule_groups,
            "TableMappings": mappings,
            "ConsumerConfiguration": consumer,
        }
        return cls.model_validate(payload)