# Source code for openapi_client.api.resource_audit_logs_api

# coding: utf-8

"""
    Amorphic Data Platform

    Amorphic Data Platform - API Definition documentation

    The version of the OpenAPI document: 0.3.0
    Generated by OpenAPI Generator (https://openapi-generator.tech)

    Do not edit the class manually.
"""  # noqa: E501

import warnings
from pydantic import validate_call, Field, StrictFloat, StrictStr, StrictInt
from typing import Any, Dict, List, Optional, Tuple, Union
from typing_extensions import Annotated

from pydantic import StrictStr
from typing import Optional
from openapi_client.models.resource_audit_logs_output import ResourceAuditLogsOutput

from openapi_client.api_client import ApiClient, RequestSerialized
from openapi_client.api_response import ApiResponse
from openapi_client.rest import RESTResponseType


class ResourceAuditLogsApi:
    """API wrapper for ``GET /resource-audit-logs/{resource}/{id}``.

    NOTE: This class is auto generated by OpenAPI Generator
    Ref: https://openapi-generator.tech

    Do not edit the class manually.
    """

    def __init__(self, api_client=None) -> None:
        """Bind this wrapper to an API client.

        :param api_client: client used to serialize and send requests. When
                           ``None``, ``ApiClient.get_default()`` is used.
        """
        if api_client is None:
            api_client = ApiClient.get_default()
        self.api_client = api_client

    @validate_call
    def get_audit_logs_of_resource(
        self,
        resource: StrictStr,
        id: StrictStr,
        role_id: StrictStr,
        limit: Optional[StrictStr] = None,
        offset: Optional[StrictStr] = None,
        sortorder: Optional[StrictStr] = None,
        sortby: Optional[StrictStr] = None,
        projection_expression: Optional[StrictStr] = None,
        _request_timeout: Union[
            None,
            Annotated[StrictFloat, Field(gt=0)],
            Tuple[
                Annotated[StrictFloat, Field(gt=0)],
                Annotated[StrictFloat, Field(gt=0)]
            ]
        ] = None,
        _request_auth: Optional[Dict[StrictStr, Any]] = None,
        _content_type: Optional[StrictStr] = None,
        _headers: Optional[Dict[StrictStr, Any]] = None,
        _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
    ) -> ResourceAuditLogsOutput:
        """Get audit log information of user/resource

        Returns the audit logs for the requested resource/user present inside the db

        :param resource: value for the ``{resource}`` path segment (required)
        :type resource: str
        :param id: value for the ``{id}`` path segment (required)
        :type id: str
        :param role_id: sent as the ``role_id`` request header (required)
        :type role_id: str
        :param limit: sent as the ``limit`` query parameter when not None
        :type limit: str
        :param offset: sent as the ``offset`` query parameter when not None
        :type offset: str
        :param sortorder: sent as the ``sortorder`` query parameter when not None
        :type sortorder: str
        :param sortby: sent as the ``sortby`` query parameter when not None
        :type sortby: str
        :param projection_expression: sent as the ``projectionExpression``
                                      query parameter when not None
        :type projection_expression: str
        :param _request_timeout: timeout setting for this request. If one
                                 number provided, it will be total request
                                 timeout. It can also be a pair (tuple) of
                                 (connection, read) timeouts.
        :type _request_timeout: float, tuple(float, float), optional
        :param _request_auth: set to override the auth_settings for a single
                              request; this effectively ignores the
                              authentication in the spec for a single request.
        :type _request_auth: dict, optional
        :param _content_type: force content-type for the request.
        :type _content_type: str, Optional
        :param _headers: set to override the headers for a single request;
                         this effectively ignores the headers in the spec for
                         a single request. The dict is not modified.
        :type _headers: dict, optional
        :param _host_index: set to override the host_index for a single
                            request; this effectively ignores the host_index
                            in the spec for a single request.
        :type _host_index: int, optional
        :return: the ``data`` attribute of the deserialized response.
        """  # noqa: E501

        _param = self._get_audit_logs_of_resource_serialize(
            resource=resource,
            id=id,
            role_id=role_id,
            limit=limit,
            offset=offset,
            sortorder=sortorder,
            sortby=sortby,
            projection_expression=projection_expression,
            _request_auth=_request_auth,
            _content_type=_content_type,
            _headers=_headers,
            _host_index=_host_index
        )

        # Maps HTTP status codes to the model name used for deserialization.
        _response_types_map: Dict[str, Optional[str]] = {
            '200': "ResourceAuditLogsOutput",
            '400': "Error",
            '500': "Error",
        }
        response_data = self.api_client.call_api(
            *_param,
            _request_timeout=_request_timeout
        )
        # The body must be read before it can be deserialized.
        response_data.read()
        return self.api_client.response_deserialize(
            response_data=response_data,
            response_types_map=_response_types_map,
        ).data

    @validate_call
    def get_audit_logs_of_resource_with_http_info(
        self,
        resource: StrictStr,
        id: StrictStr,
        role_id: StrictStr,
        limit: Optional[StrictStr] = None,
        offset: Optional[StrictStr] = None,
        sortorder: Optional[StrictStr] = None,
        sortby: Optional[StrictStr] = None,
        projection_expression: Optional[StrictStr] = None,
        _request_timeout: Union[
            None,
            Annotated[StrictFloat, Field(gt=0)],
            Tuple[
                Annotated[StrictFloat, Field(gt=0)],
                Annotated[StrictFloat, Field(gt=0)]
            ]
        ] = None,
        _request_auth: Optional[Dict[StrictStr, Any]] = None,
        _content_type: Optional[StrictStr] = None,
        _headers: Optional[Dict[StrictStr, Any]] = None,
        _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
    ) -> ApiResponse[ResourceAuditLogsOutput]:
        """Get audit log information of user/resource

        Returns the audit logs for the requested resource/user present inside the db

        Same request as ``get_audit_logs_of_resource``, but the whole
        deserialized response object is returned instead of only its ``data``.

        :param resource: value for the ``{resource}`` path segment (required)
        :type resource: str
        :param id: value for the ``{id}`` path segment (required)
        :type id: str
        :param role_id: sent as the ``role_id`` request header (required)
        :type role_id: str
        :param limit: sent as the ``limit`` query parameter when not None
        :type limit: str
        :param offset: sent as the ``offset`` query parameter when not None
        :type offset: str
        :param sortorder: sent as the ``sortorder`` query parameter when not None
        :type sortorder: str
        :param sortby: sent as the ``sortby`` query parameter when not None
        :type sortby: str
        :param projection_expression: sent as the ``projectionExpression``
                                      query parameter when not None
        :type projection_expression: str
        :param _request_timeout: timeout setting for this request. If one
                                 number provided, it will be total request
                                 timeout. It can also be a pair (tuple) of
                                 (connection, read) timeouts.
        :type _request_timeout: float, tuple(float, float), optional
        :param _request_auth: set to override the auth_settings for a single
                              request; this effectively ignores the
                              authentication in the spec for a single request.
        :type _request_auth: dict, optional
        :param _content_type: force content-type for the request.
        :type _content_type: str, Optional
        :param _headers: set to override the headers for a single request;
                         this effectively ignores the headers in the spec for
                         a single request. The dict is not modified.
        :type _headers: dict, optional
        :param _host_index: set to override the host_index for a single
                            request; this effectively ignores the host_index
                            in the spec for a single request.
        :type _host_index: int, optional
        :return: the object returned by ``ApiClient.response_deserialize``.
        """  # noqa: E501

        _param = self._get_audit_logs_of_resource_serialize(
            resource=resource,
            id=id,
            role_id=role_id,
            limit=limit,
            offset=offset,
            sortorder=sortorder,
            sortby=sortby,
            projection_expression=projection_expression,
            _request_auth=_request_auth,
            _content_type=_content_type,
            _headers=_headers,
            _host_index=_host_index
        )

        # Maps HTTP status codes to the model name used for deserialization.
        _response_types_map: Dict[str, Optional[str]] = {
            '200': "ResourceAuditLogsOutput",
            '400': "Error",
            '500': "Error",
        }
        response_data = self.api_client.call_api(
            *_param,
            _request_timeout=_request_timeout
        )
        # The body must be read before it can be deserialized.
        response_data.read()
        return self.api_client.response_deserialize(
            response_data=response_data,
            response_types_map=_response_types_map,
        )

    @validate_call
    def get_audit_logs_of_resource_without_preload_content(
        self,
        resource: StrictStr,
        id: StrictStr,
        role_id: StrictStr,
        limit: Optional[StrictStr] = None,
        offset: Optional[StrictStr] = None,
        sortorder: Optional[StrictStr] = None,
        sortby: Optional[StrictStr] = None,
        projection_expression: Optional[StrictStr] = None,
        _request_timeout: Union[
            None,
            Annotated[StrictFloat, Field(gt=0)],
            Tuple[
                Annotated[StrictFloat, Field(gt=0)],
                Annotated[StrictFloat, Field(gt=0)]
            ]
        ] = None,
        _request_auth: Optional[Dict[StrictStr, Any]] = None,
        _content_type: Optional[StrictStr] = None,
        _headers: Optional[Dict[StrictStr, Any]] = None,
        _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
    ) -> RESTResponseType:
        """Get audit log information of user/resource

        Returns the audit logs for the requested resource/user present inside the db

        Same request as ``get_audit_logs_of_resource``, but the response body
        is neither read nor deserialized here; the underlying response object
        is handed back to the caller.

        :param resource: value for the ``{resource}`` path segment (required)
        :type resource: str
        :param id: value for the ``{id}`` path segment (required)
        :type id: str
        :param role_id: sent as the ``role_id`` request header (required)
        :type role_id: str
        :param limit: sent as the ``limit`` query parameter when not None
        :type limit: str
        :param offset: sent as the ``offset`` query parameter when not None
        :type offset: str
        :param sortorder: sent as the ``sortorder`` query parameter when not None
        :type sortorder: str
        :param sortby: sent as the ``sortby`` query parameter when not None
        :type sortby: str
        :param projection_expression: sent as the ``projectionExpression``
                                      query parameter when not None
        :type projection_expression: str
        :param _request_timeout: timeout setting for this request. If one
                                 number provided, it will be total request
                                 timeout. It can also be a pair (tuple) of
                                 (connection, read) timeouts.
        :type _request_timeout: float, tuple(float, float), optional
        :param _request_auth: set to override the auth_settings for a single
                              request; this effectively ignores the
                              authentication in the spec for a single request.
        :type _request_auth: dict, optional
        :param _content_type: force content-type for the request.
        :type _content_type: str, Optional
        :param _headers: set to override the headers for a single request;
                         this effectively ignores the headers in the spec for
                         a single request. The dict is not modified.
        :type _headers: dict, optional
        :param _host_index: set to override the host_index for a single
                            request; this effectively ignores the host_index
                            in the spec for a single request.
        :type _host_index: int, optional
        :return: the ``response`` attribute of the object returned by
                 ``ApiClient.call_api``.
        """  # noqa: E501

        _param = self._get_audit_logs_of_resource_serialize(
            resource=resource,
            id=id,
            role_id=role_id,
            limit=limit,
            offset=offset,
            sortorder=sortorder,
            sortby=sortby,
            projection_expression=projection_expression,
            _request_auth=_request_auth,
            _content_type=_content_type,
            _headers=_headers,
            _host_index=_host_index
        )

        # No response-type map here: nothing is deserialized on this path.
        response_data = self.api_client.call_api(
            *_param,
            _request_timeout=_request_timeout
        )
        return response_data.response

    def _get_audit_logs_of_resource_serialize(
        self,
        resource,
        id,
        role_id,
        limit,
        offset,
        sortorder,
        sortby,
        projection_expression,
        _request_auth,
        _content_type,
        _headers,
        _host_index,
    ) -> RequestSerialized:
        """Build the serialized GET request shared by the public methods.

        :param resource: placed in the ``resource`` path parameter when not None
        :param id: placed in the ``id`` path parameter when not None
        :param role_id: placed in the ``role_id`` header when not None
        :param limit: ``limit`` query parameter, skipped when None
        :param offset: ``offset`` query parameter, skipped when None
        :param sortorder: ``sortorder`` query parameter, skipped when None
        :param sortby: ``sortby`` query parameter, skipped when None
        :param projection_expression: ``projectionExpression`` query
                                      parameter, skipped when None
        :param _request_auth: forwarded unchanged to ``param_serialize``
        :param _content_type: accepted for signature parity; not referenced
                              by this body-less request
        :param _headers: caller-supplied headers used as the starting set;
                         copied, never mutated
        :param _host_index: accepted for signature parity; not referenced here
        :return: the result of ``ApiClient.param_serialize``
        """
        _host = None

        _collection_formats: Dict[str, str] = {
        }

        _path_params: Dict[str, str] = {}
        _query_params: List[Tuple[str, str]] = []
        # Copy the caller's dict: `role_id` and `Accept` are written below and
        # must not leak back into an object the caller may reuse.
        _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
        _form_params: List[Tuple[str, str]] = []
        _files: Dict[
            str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
        ] = {}
        _body_params: Optional[bytes] = None

        # process the path parameters
        if resource is not None:
            _path_params['resource'] = resource
        if id is not None:
            _path_params['id'] = id

        # process the query parameters (order is preserved in the request)
        for _name, _value in (
            ('limit', limit),
            ('offset', offset),
            ('sortorder', sortorder),
            ('sortby', sortby),
            ('projectionExpression', projection_expression),
        ):
            if _value is not None:
                _query_params.append((_name, _value))

        # process the header parameters
        if role_id is not None:
            _header_params['role_id'] = role_id
        # process the form parameters
        # process the body parameter

        # set the HTTP header `Accept` unless the caller already provided one
        if 'Accept' not in _header_params:
            _header_params['Accept'] = self.api_client.select_header_accept(
                [
                    'application/json'
                ]
            )

        # authentication setting
        _auth_settings: List[str] = [
            'LambdaAuthorizer'
        ]

        return self.api_client.param_serialize(
            method='GET',
            resource_path='/resource-audit-logs/{resource}/{id}',
            path_params=_path_params,
            query_params=_query_params,
            header_params=_header_params,
            body=_body_params,
            post_params=_form_params,
            files=_files,
            auth_settings=_auth_settings,
            collection_formats=_collection_formats,
            _host=_host,
            _request_auth=_request_auth
        )