# coding: utf-8
"""
Amorphic Data Platform
Amorphic Data Platform - API Definition documentation
The version of the OpenAPI document: 0.3.0
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictInt, StrictStr, field_validator
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.datasource_details_datasource_config_auth_config import DatasourceDetailsDatasourceConfigAuthConfig
from openapi_client.models.datasource_details_datasource_config_cluster_config import DatasourceDetailsDatasourceConfigClusterConfig
from openapi_client.models.datasource_details_datasource_config_pagination_config import DatasourceDetailsDatasourceConfigPaginationConfig
from openapi_client.models.datasource_details_datasource_config_public_keys_list_inner import DatasourceDetailsDatasourceConfigPublicKeysListInner
from openapi_client.models.datasource_schedule_config import DatasourceScheduleConfig
from typing import Optional, Set
from typing_extensions import Self
[docs]
class DatasourceDetailsDatasourceConfig(BaseModel):
    """
    DatasourceDetailsDatasourceConfig

    Pydantic model in which every field is optional and defaults to ``None``.
    Each field is validated from, and serialized to, the alias given in its
    ``Field(...)`` declaration; because ``populate_by_name`` is enabled the
    Python attribute names are accepted on input as well.

    ``request_method`` and ``auth_type`` are restricted to fixed sets of
    values by the validators defined below.
    """ # noqa: E501
    public_accessibility: Optional[StrictStr] = Field(default=None, alias="PublicAccessibility")
    ssl: Optional[StrictStr] = Field(default=None, alias="Ssl")
    jdbc_url: Optional[StrictStr] = Field(default=None, alias="JdbcURL")
    username: Optional[StrictStr] = Field(default=None, alias="Username")
    password: Optional[StrictStr] = Field(default=None, alias="Password")
    ingestion_timeout: Optional[StrictStr] = Field(default=None, alias="IngestionTimeout")
    cluster_config: Optional[DatasourceDetailsDatasourceConfigClusterConfig] = Field(default=None, alias="ClusterConfig")
    s3_connection_type: Optional[StrictStr] = Field(default=None, alias="S3ConnectionType")
    s3_bucket_region: Optional[StrictStr] = Field(default=None, alias="S3BucketRegion")
    s3_bucket: Optional[StrictStr] = Field(default=None, alias="S3Bucket")
    secret_key: Optional[StrictStr] = Field(default=None, alias="SecretKey")
    access_key: Optional[StrictStr] = Field(default=None, alias="AccessKey")
    post_authentication_login_banner: Optional[StrictStr] = Field(default=None, alias="PostAuthenticationLoginBanner")
    pre_authentication_login_banner: Optional[StrictStr] = Field(default=None, alias="PreAuthenticationLoginBanner")
    set_stat_option: Optional[StrictBool] = Field(default=None, alias="SetStatOption")
    public_keys_list: Optional[List[DatasourceDetailsDatasourceConfigPublicKeysListInner]] = Field(default=None, alias="PublicKeysList")
    email_address: Optional[StrictStr] = Field(default=None, alias="EmailAddress")
    enable_scan: Optional[StrictStr] = Field(default=None, alias="EnableScan")
    enable_tls: Optional[StrictStr] = Field(default=None, alias="EnableTLS")
    request_url: Optional[StrictStr] = Field(default=None, description="URL of the API endpoint", alias="RequestUrl")
    request_method: Optional[StrictStr] = Field(default=None, description="Method to use for the API", alias="RequestMethod")
    request_headers: Optional[Dict[str, Any]] = Field(default=None, description="Headers to send with the request", alias="RequestHeaders")
    request_query_parameters: Optional[Dict[str, Any]] = Field(default=None, description="Query parameters to send with the request", alias="RequestQueryParameters")
    request_body: Optional[Dict[str, Any]] = Field(default=None, description="Body to send with POST requests", alias="RequestBody")
    auth_type: Optional[StrictStr] = Field(default=None, description="Authentication type for the API", alias="AuthType")
    auth_config: Optional[DatasourceDetailsDatasourceConfigAuthConfig] = Field(default=None, alias="AuthConfig")
    pagination_type: Optional[StrictStr] = Field(default=None, description="Type of pagination to use", alias="PaginationType")
    pagination_config: Optional[DatasourceDetailsDatasourceConfigPaginationConfig] = Field(default=None, alias="PaginationConfig")
    data_retention_period: Optional[StrictInt] = Field(default=None, alias="DataRetentionPeriod")
    shard_count: Optional[StrictInt] = Field(default=None, alias="ShardCount")
    stream_mode: Optional[StrictStr] = Field(default=None, alias="StreamMode")
    amorphic_url: Optional[StrictStr] = Field(default=None, alias="AmorphicURL")
    # NOTE: the property list below contains both "AccessToken" and
    # "accessToken", and both normalize to the Python attribute `access_token`.
    # A class body can bind that name only once, so the generated code's second
    # declaration (alias "accessToken") replaced its first one (alias
    # "AccessToken"). Only the effective declaration is kept, placed where the
    # first one stood so the field order (and therefore the serialized key
    # order) is unchanged. `from_dict` accepts "AccessToken" as a fallback.
    # NOTE(review): if the two API properties must stay distinct, a second
    # attribute is required - confirm against the API schema.
    access_token: Optional[StrictStr] = Field(default=None, alias="accessToken")
    asset_filters: Optional[Dict[str, Any]] = Field(default=None, alias="AssetFilters")
    schedule_config: Optional[DatasourceScheduleConfig] = Field(default=None, alias="ScheduleConfig")
    client_id: Optional[StrictStr] = Field(default=None, alias="clientId")
    client_secret: Optional[StrictStr] = Field(default=None, alias="clientSecret")
    instance_url: Optional[StrictStr] = Field(default=None, alias="instanceUrl")
    __properties: ClassVar[List[str]] = ["PublicAccessibility", "Ssl", "JdbcURL", "Username", "Password", "IngestionTimeout", "ClusterConfig", "S3ConnectionType", "S3BucketRegion", "S3Bucket", "SecretKey", "AccessKey", "PostAuthenticationLoginBanner", "PreAuthenticationLoginBanner", "SetStatOption", "PublicKeysList", "EmailAddress", "EnableScan", "EnableTLS", "RequestUrl", "RequestMethod", "RequestHeaders", "RequestQueryParameters", "RequestBody", "AuthType", "AuthConfig", "PaginationType", "PaginationConfig", "DataRetentionPeriod", "ShardCount", "StreamMode", "AmorphicURL", "AccessToken", "AssetFilters", "ScheduleConfig", "clientId", "clientSecret", "accessToken", "instanceUrl"]

    @field_validator('request_method')
    @classmethod
    def request_method_validate_enum(cls, value: Optional[str]) -> Optional[str]:
        """Validate that ``request_method`` is ``None`` or one of the allowed values.

        :param value: the candidate value for ``request_method``.
        :return: ``value`` unchanged when it is acceptable.
        :raises ValueError: if ``value`` is neither ``None``, 'GET' nor 'POST'.
        """
        if value is None:
            return value

        if value not in {'GET', 'POST'}:
            raise ValueError("must be one of enum values ('GET', 'POST')")
        return value

    @field_validator('auth_type')
    @classmethod
    def auth_type_validate_enum(cls, value: Optional[str]) -> Optional[str]:
        """Validate that ``auth_type`` is ``None`` or one of the allowed values.

        :param value: the candidate value for ``auth_type``.
        :return: ``value`` unchanged when it is acceptable.
        :raises ValueError: if ``value`` is not ``None`` and not one of
            'NoAuth', 'BasicAuth', 'ApiKey', 'BearerToken', 'OAuth1', 'OAuth2'.
        """
        if value is None:
            return value

        if value not in {'NoAuth', 'BasicAuth', 'ApiKey', 'BearerToken', 'OAuth1', 'OAuth2'}:
            raise ValueError("must be one of enum values ('NoAuth', 'BasicAuth', 'ApiKey', 'BearerToken', 'OAuth1', 'OAuth2')")
        return value

    model_config = ConfigDict(
        populate_by_name=True,
        validate_assignment=True,
        protected_namespaces=(),
    )

    def to_str(self) -> str:
        """Returns the string representation of the model using alias"""
        return pprint.pformat(self.model_dump(by_alias=True))

    def to_json(self) -> str:
        """Returns the JSON representation of the model using alias

        The JSON text is produced from :meth:`to_dict`, so fields whose value
        is ``None`` are not part of the output.
        """
        # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> Optional[Self]:
        """Create an instance of DatasourceDetailsDatasourceConfig from a JSON string

        :param json_str: JSON text; it is parsed with ``json.loads`` and the
            result is handed to :meth:`from_dict`.
        :return: whatever :meth:`from_dict` returns for the parsed value
            (``None`` when the JSON text is ``null``).
        """
        return cls.from_dict(json.loads(json_str))

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary representation of the model using alias.

        This has the following differences from calling pydantic's
        `self.model_dump(by_alias=True)`:

        * Fields whose value is `None` are left out of the output.
        * Nested models (`cluster_config`, each item of `public_keys_list`,
          `auth_config`, `pagination_config`, `schedule_config`) are
          serialized through their own `to_dict()` method.
        """
        excluded_fields: Set[str] = set()

        _dict = self.model_dump(
            by_alias=True,
            exclude=excluded_fields,
            exclude_none=True,
        )
        # override the default output from pydantic by calling `to_dict()` of cluster_config
        if self.cluster_config:
            _dict['ClusterConfig'] = self.cluster_config.to_dict()
        # override the default output from pydantic by calling `to_dict()` of each item in public_keys_list (list)
        # An empty or missing list is left exactly as `model_dump` produced it.
        if self.public_keys_list:
            _dict['PublicKeysList'] = [
                _item_public_keys_list.to_dict()
                for _item_public_keys_list in self.public_keys_list
                if _item_public_keys_list
            ]
        # override the default output from pydantic by calling `to_dict()` of auth_config
        if self.auth_config:
            _dict['AuthConfig'] = self.auth_config.to_dict()
        # override the default output from pydantic by calling `to_dict()` of pagination_config
        if self.pagination_config:
            _dict['PaginationConfig'] = self.pagination_config.to_dict()
        # override the default output from pydantic by calling `to_dict()` of schedule_config
        if self.schedule_config:
            _dict['ScheduleConfig'] = self.schedule_config.to_dict()
        return _dict

    @classmethod
    def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
        """Create an instance of DatasourceDetailsDatasourceConfig from a dict

        :param obj: mapping keyed by the field aliases. ``None`` yields
            ``None``; any non-dict value is passed straight to
            ``model_validate``.
        :return: the validated model instance, or ``None`` when ``obj`` is
            ``None``.

        Keys that are absent from ``obj`` are treated as ``None``. The token
        is read from "accessToken" and, when that is absent or ``None``, from
        "AccessToken"; both spellings populate the single ``access_token``
        field.
        """
        if obj is None:
            return None

        if not isinstance(obj, dict):
            return cls.model_validate(obj)

        # The model has only one `access_token` field (alias "accessToken").
        # An "AccessToken" key handed to `model_validate` would be discarded as
        # an unknown extra, so fold it into the effective alias here instead
        # of silently losing the value.
        _access_token = obj.get("accessToken")
        if _access_token is None:
            _access_token = obj.get("AccessToken")

        _obj = cls.model_validate({
            "PublicAccessibility": obj.get("PublicAccessibility"),
            "Ssl": obj.get("Ssl"),
            "JdbcURL": obj.get("JdbcURL"),
            "Username": obj.get("Username"),
            "Password": obj.get("Password"),
            "IngestionTimeout": obj.get("IngestionTimeout"),
            "ClusterConfig": DatasourceDetailsDatasourceConfigClusterConfig.from_dict(obj["ClusterConfig"]) if obj.get("ClusterConfig") is not None else None,
            "S3ConnectionType": obj.get("S3ConnectionType"),
            "S3BucketRegion": obj.get("S3BucketRegion"),
            "S3Bucket": obj.get("S3Bucket"),
            "SecretKey": obj.get("SecretKey"),
            "AccessKey": obj.get("AccessKey"),
            "PostAuthenticationLoginBanner": obj.get("PostAuthenticationLoginBanner"),
            "PreAuthenticationLoginBanner": obj.get("PreAuthenticationLoginBanner"),
            "SetStatOption": obj.get("SetStatOption"),
            "PublicKeysList": [DatasourceDetailsDatasourceConfigPublicKeysListInner.from_dict(_item) for _item in obj["PublicKeysList"]] if obj.get("PublicKeysList") is not None else None,
            "EmailAddress": obj.get("EmailAddress"),
            "EnableScan": obj.get("EnableScan"),
            "EnableTLS": obj.get("EnableTLS"),
            "RequestUrl": obj.get("RequestUrl"),
            "RequestMethod": obj.get("RequestMethod"),
            "RequestHeaders": obj.get("RequestHeaders"),
            "RequestQueryParameters": obj.get("RequestQueryParameters"),
            "RequestBody": obj.get("RequestBody"),
            "AuthType": obj.get("AuthType"),
            "AuthConfig": DatasourceDetailsDatasourceConfigAuthConfig.from_dict(obj["AuthConfig"]) if obj.get("AuthConfig") is not None else None,
            "PaginationType": obj.get("PaginationType"),
            "PaginationConfig": DatasourceDetailsDatasourceConfigPaginationConfig.from_dict(obj["PaginationConfig"]) if obj.get("PaginationConfig") is not None else None,
            "DataRetentionPeriod": obj.get("DataRetentionPeriod"),
            "ShardCount": obj.get("ShardCount"),
            "StreamMode": obj.get("StreamMode"),
            "AmorphicURL": obj.get("AmorphicURL"),
            "AssetFilters": obj.get("AssetFilters"),
            "ScheduleConfig": DatasourceScheduleConfig.from_dict(obj["ScheduleConfig"]) if obj.get("ScheduleConfig") is not None else None,
            "clientId": obj.get("clientId"),
            "clientSecret": obj.get("clientSecret"),
            "accessToken": _access_token,
            "instanceUrl": obj.get("instanceUrl")
        })
        return _obj