# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. from __future__ import annotations from typing import Union, Mapping, cast from typing_extensions import Literal import httpx from .... import _legacy_response from .content import ( Content, AsyncContent, ContentWithRawResponse, AsyncContentWithRawResponse, ContentWithStreamingResponse, AsyncContentWithStreamingResponse, ) from ...._types import ( Body, Omit, Query, Headers, NotGiven, FileTypes, SequenceNotStr, omit, not_given, ) from ...._utils import extract_files, maybe_transform, deepcopy_minimal, async_maybe_transform from ...._compat import cached_property from ...._resource import SyncAPIResource, AsyncAPIResource from ...._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper from ....pagination import SyncCursorPage, AsyncCursorPage from ...._base_client import AsyncPaginator, make_request_options from ....types.skills import version_list_params, version_create_params from ....types.skills.skill_version import SkillVersion from ....types.skills.deleted_skill_version import DeletedSkillVersion __all__ = ["Versions", "AsyncVersions"] class Versions(SyncAPIResource): @cached_property def content(self) -> Content: return Content(self._client) @cached_property def with_raw_response(self) -> VersionsWithRawResponse: """ This property can be used as a prefix for any HTTP method call to return the raw response object instead of the parsed content. For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers """ return VersionsWithRawResponse(self) @cached_property def with_streaming_response(self) -> VersionsWithStreamingResponse: """ An alternative to `.with_raw_response` that doesn't eagerly read the response body. For more information, see https://www.github.com/openai/openai-python#with_streaming_response """ return VersionsWithStreamingResponse(self) def create( self, skill_id: str, *, default: bool | Omit = omit, files: Union[SequenceNotStr[FileTypes], FileTypes] | Omit = omit, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = not_given, ) -> SkillVersion: """ Create a new immutable skill version. Args: default: Whether to set this version as the default. files: Skill files to upload (directory upload) or a single zip file. extra_headers: Send extra headers extra_query: Add additional query parameters to the request extra_body: Add additional JSON properties to the request timeout: Override the client-level default timeout for this request, in seconds """ if not skill_id: raise ValueError(f"Expected a non-empty value for `skill_id` but received {skill_id!r}") body = deepcopy_minimal( { "default": default, "files": files, } ) extracted_files = extract_files(cast(Mapping[str, object], body), paths=[["files", ""], ["files"]]) if extracted_files: # It should be noted that the actual Content-Type header that will be # sent to the server will contain a `boundary` parameter, e.g. # multipart/form-data; boundary=---abc-- extra_headers = {"Content-Type": "multipart/form-data", **(extra_headers or {})} return self._post( f"/skills/{skill_id}/versions", body=maybe_transform(body, version_create_params.VersionCreateParams), files=extracted_files, options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout ), cast_to=SkillVersion, ) def retrieve( self, version: str, *, skill_id: str, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = not_given, ) -> SkillVersion: """ Get a specific skill version. Args: version: The version number to retrieve. extra_headers: Send extra headers extra_query: Add additional query parameters to the request extra_body: Add additional JSON properties to the request timeout: Override the client-level default timeout for this request, in seconds """ if not skill_id: raise ValueError(f"Expected a non-empty value for `skill_id` but received {skill_id!r}") if not version: raise ValueError(f"Expected a non-empty value for `version` but received {version!r}") return self._get( f"/skills/{skill_id}/versions/{version}", options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout ), cast_to=SkillVersion, ) def list( self, skill_id: str, *, after: str | Omit = omit, limit: int | Omit = omit, order: Literal["asc", "desc"] | Omit = omit, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = not_given, ) -> SyncCursorPage[SkillVersion]: """ List skill versions for a skill. Args: after: The skill version ID to start after. limit: Number of versions to retrieve. order: Sort order of results by version number. extra_headers: Send extra headers extra_query: Add additional query parameters to the request extra_body: Add additional JSON properties to the request timeout: Override the client-level default timeout for this request, in seconds """ if not skill_id: raise ValueError(f"Expected a non-empty value for `skill_id` but received {skill_id!r}") return self._get_api_list( f"/skills/{skill_id}/versions", page=SyncCursorPage[SkillVersion], options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout, query=maybe_transform( { "after": after, "limit": limit, "order": order, }, version_list_params.VersionListParams, ), ), model=SkillVersion, ) def delete( self, version: str, *, skill_id: str, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = not_given, ) -> DeletedSkillVersion: """ Delete a skill version. Args: version: The skill version number. extra_headers: Send extra headers extra_query: Add additional query parameters to the request extra_body: Add additional JSON properties to the request timeout: Override the client-level default timeout for this request, in seconds """ if not skill_id: raise ValueError(f"Expected a non-empty value for `skill_id` but received {skill_id!r}") if not version: raise ValueError(f"Expected a non-empty value for `version` but received {version!r}") return self._delete( f"/skills/{skill_id}/versions/{version}", options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout ), cast_to=DeletedSkillVersion, ) class AsyncVersions(AsyncAPIResource): @cached_property def content(self) -> AsyncContent: return AsyncContent(self._client) @cached_property def with_raw_response(self) -> AsyncVersionsWithRawResponse: """ This property can be used as a prefix for any HTTP method call to return the raw response object instead of the parsed content. For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers """ return AsyncVersionsWithRawResponse(self) @cached_property def with_streaming_response(self) -> AsyncVersionsWithStreamingResponse: """ An alternative to `.with_raw_response` that doesn't eagerly read the response body. For more information, see https://www.github.com/openai/openai-python#with_streaming_response """ return AsyncVersionsWithStreamingResponse(self) async def create( self, skill_id: str, *, default: bool | Omit = omit, files: Union[SequenceNotStr[FileTypes], FileTypes] | Omit = omit, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = not_given, ) -> SkillVersion: """ Create a new immutable skill version. Args: default: Whether to set this version as the default. files: Skill files to upload (directory upload) or a single zip file. extra_headers: Send extra headers extra_query: Add additional query parameters to the request extra_body: Add additional JSON properties to the request timeout: Override the client-level default timeout for this request, in seconds """ if not skill_id: raise ValueError(f"Expected a non-empty value for `skill_id` but received {skill_id!r}") body = deepcopy_minimal( { "default": default, "files": files, } ) extracted_files = extract_files(cast(Mapping[str, object], body), paths=[["files", ""], ["files"]]) if extracted_files: # It should be noted that the actual Content-Type header that will be # sent to the server will contain a `boundary` parameter, e.g. # multipart/form-data; boundary=---abc-- extra_headers = {"Content-Type": "multipart/form-data", **(extra_headers or {})} return await self._post( f"/skills/{skill_id}/versions", body=await async_maybe_transform(body, version_create_params.VersionCreateParams), files=extracted_files, options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout ), cast_to=SkillVersion, ) async def retrieve( self, version: str, *, skill_id: str, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = not_given, ) -> SkillVersion: """ Get a specific skill version. Args: version: The version number to retrieve. extra_headers: Send extra headers extra_query: Add additional query parameters to the request extra_body: Add additional JSON properties to the request timeout: Override the client-level default timeout for this request, in seconds """ if not skill_id: raise ValueError(f"Expected a non-empty value for `skill_id` but received {skill_id!r}") if not version: raise ValueError(f"Expected a non-empty value for `version` but received {version!r}") return await self._get( f"/skills/{skill_id}/versions/{version}", options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout ), cast_to=SkillVersion, ) def list( self, skill_id: str, *, after: str | Omit = omit, limit: int | Omit = omit, order: Literal["asc", "desc"] | Omit = omit, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = not_given, ) -> AsyncPaginator[SkillVersion, AsyncCursorPage[SkillVersion]]: """ List skill versions for a skill. Args: after: The skill version ID to start after. limit: Number of versions to retrieve. order: Sort order of results by version number. extra_headers: Send extra headers extra_query: Add additional query parameters to the request extra_body: Add additional JSON properties to the request timeout: Override the client-level default timeout for this request, in seconds """ if not skill_id: raise ValueError(f"Expected a non-empty value for `skill_id` but received {skill_id!r}") return self._get_api_list( f"/skills/{skill_id}/versions", page=AsyncCursorPage[SkillVersion], options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout, query=maybe_transform( { "after": after, "limit": limit, "order": order, }, version_list_params.VersionListParams, ), ), model=SkillVersion, ) async def delete( self, version: str, *, skill_id: str, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, extra_query: Query | None = None, extra_body: Body | None = None, timeout: float | httpx.Timeout | None | NotGiven = not_given, ) -> DeletedSkillVersion: """ Delete a skill version. Args: version: The skill version number. extra_headers: Send extra headers extra_query: Add additional query parameters to the request extra_body: Add additional JSON properties to the request timeout: Override the client-level default timeout for this request, in seconds """ if not skill_id: raise ValueError(f"Expected a non-empty value for `skill_id` but received {skill_id!r}") if not version: raise ValueError(f"Expected a non-empty value for `version` but received {version!r}") return await self._delete( f"/skills/{skill_id}/versions/{version}", options=make_request_options( extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout ), cast_to=DeletedSkillVersion, ) class VersionsWithRawResponse: def __init__(self, versions: Versions) -> None: self._versions = versions self.create = _legacy_response.to_raw_response_wrapper( versions.create, ) self.retrieve = _legacy_response.to_raw_response_wrapper( versions.retrieve, ) self.list = _legacy_response.to_raw_response_wrapper( versions.list, ) self.delete = _legacy_response.to_raw_response_wrapper( versions.delete, ) @cached_property def content(self) -> ContentWithRawResponse: return ContentWithRawResponse(self._versions.content) class AsyncVersionsWithRawResponse: def __init__(self, versions: AsyncVersions) -> None: self._versions = versions self.create = _legacy_response.async_to_raw_response_wrapper( versions.create, ) self.retrieve = _legacy_response.async_to_raw_response_wrapper( versions.retrieve, ) self.list = _legacy_response.async_to_raw_response_wrapper( versions.list, ) self.delete = _legacy_response.async_to_raw_response_wrapper( versions.delete, ) @cached_property def content(self) -> AsyncContentWithRawResponse: return AsyncContentWithRawResponse(self._versions.content) class VersionsWithStreamingResponse: def __init__(self, versions: Versions) -> None: self._versions = versions self.create = to_streamed_response_wrapper( versions.create, ) self.retrieve = to_streamed_response_wrapper( versions.retrieve, ) self.list = to_streamed_response_wrapper( versions.list, ) self.delete = to_streamed_response_wrapper( versions.delete, ) @cached_property def content(self) -> ContentWithStreamingResponse: return ContentWithStreamingResponse(self._versions.content) class AsyncVersionsWithStreamingResponse: def __init__(self, versions: AsyncVersions) -> None: self._versions = versions self.create = async_to_streamed_response_wrapper( versions.create, ) self.retrieve = async_to_streamed_response_wrapper( versions.retrieve, ) self.list = async_to_streamed_response_wrapper( versions.list, ) self.delete = async_to_streamed_response_wrapper( versions.delete, ) @cached_property def content(self) -> AsyncContentWithStreamingResponse: return AsyncContentWithStreamingResponse(self._versions.content)