diff --git a/deepset_cloud_sdk/_api/pipelines.py b/deepset_cloud_sdk/_api/pipelines.py new file mode 100644 index 00000000..e6c87aa3 --- /dev/null +++ b/deepset_cloud_sdk/_api/pipelines.py @@ -0,0 +1,66 @@ +""" +Pipeline API for deepset Cloud. + +This module takes care of all pipeline-related API calls to deepset Cloud. +""" + +from enum import Enum +from typing import Dict, List +from uuid import UUID + +import structlog +from httpx import codes + +from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI + +logger = structlog.get_logger(__name__) + + +class FileIndexingStatus(str, Enum): +    """File indexing status.""" + +    FAILED = "FAILED" +    INDEXED_NO_DOCUMENTS = "INDEXED_NO_DOCUMENTS" + + +class PipelineNotFoundException(Exception): +    """Raised if pipeline was not found.""" + + +class FailedToFetchFileIdsException(Exception): +    """Failed fetching pipeline files.""" + + +class PipelinesAPI: +    """Pipeline API for deepset Cloud. + +    This module takes care of all pipeline-related API calls to deepset Cloud. + +    :param deepset_cloud_api: Instance of the DeepsetCloudAPI. +    """ + +    def __init__(self, deepset_cloud_api: DeepsetCloudAPI) -> None: +        """ +        Create PipelinesAPI object. + +        :param deepset_cloud_api: Instance of the DeepsetCloudAPI. +        """ +        self._deepset_cloud_api = deepset_cloud_api + +    async def get_pipeline_file_ids( +        self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED +    ) -> List[UUID]: +        """Get file ids that failed or did not create documents during indexing. + +        :param pipeline_name: Name of the pipeline that indexed files. +        :param workspace_name: Name of the workspace. 
+        :param status: Status that should be used for fetching files. +        """ +        params: Dict[str, str] = {"status": status.value} +        response = await self._deepset_cloud_api.get(workspace_name, f"pipelines/{pipeline_name}/files", params=params) +        if response.status_code == codes.NOT_FOUND: +            raise PipelineNotFoundException() +        if response.status_code != codes.OK: +            raise FailedToFetchFileIdsException(response.text) +        file_ids: List[UUID] = [UUID(_id) for _id in response.json()] +        return file_ids diff --git a/deepset_cloud_sdk/_service/files_service.py b/deepset_cloud_sdk/_service/files_service.py index aaeb553b..d823f2d1 100644 --- a/deepset_cloud_sdk/_service/files_service.py +++ b/deepset_cloud_sdk/_service/files_service.py @@ -1,4 +1,5 @@ """Module for all file-related operations.""" + from __future__ import annotations import asyncio diff --git a/deepset_cloud_sdk/_service/pipeline_service.py b/deepset_cloud_sdk/_service/pipeline_service.py new file mode 100644 index 00000000..6822c2a5 --- /dev/null +++ b/deepset_cloud_sdk/_service/pipeline_service.py @@ -0,0 +1,51 @@ +"""Module for all pipeline-related operations.""" + +from __future__ import annotations + +from contextlib import asynccontextmanager +from typing import AsyncGenerator, List +from uuid import UUID + +import structlog + +from deepset_cloud_sdk._api.config import CommonConfig +from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI +from deepset_cloud_sdk._api.pipelines import FileIndexingStatus, PipelinesAPI + +logger = structlog.get_logger(__name__) + + +class PipelinesService: +    """Service for all pipeline-related operations.""" + +    def __init__(self, pipelines: PipelinesAPI): +        """Initialize the service. + +        :param pipelines: API for pipelines. +        """ +        self._pipelines = pipelines + +    @classmethod +    @asynccontextmanager +    async def factory(cls, config: CommonConfig) -> AsyncGenerator[PipelinesService, None]: +        """Create a new instance of the service. + +        :param config: CommonConfig object. 
+        :return: New instance of the service. +        """ +        async with DeepsetCloudAPI.factory(config) as deepset_cloud_api: +            yield cls(PipelinesAPI(deepset_cloud_api)) + +    async def get_pipeline_file_ids( +        self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED +    ) -> List[UUID]: +        """Get file ids that failed or did not create documents during indexing. + +        :param pipeline_name: Name of the pipeline that indexed files. +        :param workspace_name: Name of the workspace. +        :param status: Status that should be used for fetching files. +        :return: List of file ids. +        """ +        return await self._pipelines.get_pipeline_file_ids( +            pipeline_name=pipeline_name, workspace_name=workspace_name, status=status +        ) diff --git a/deepset_cloud_sdk/cli.py b/deepset_cloud_sdk/cli.py index 40022b86..7615c34f 100644 --- a/deepset_cloud_sdk/cli.py +++ b/deepset_cloud_sdk/cli.py @@ -13,6 +13,9 @@ from deepset_cloud_sdk._api.config import DEFAULT_WORKSPACE_NAME, ENV_FILE_PATH from deepset_cloud_sdk._api.upload_sessions import WriteMode from deepset_cloud_sdk.workflows.sync_client.files import download as sync_download +from deepset_cloud_sdk.workflows.sync_client.files import ( +    download_pipeline_files as sync_download_pipeline_files, +) from deepset_cloud_sdk.workflows.sync_client.files import ( get_upload_session as sync_get_upload_session, ) @@ -72,6 +75,41 @@ def upload( # pylint: disable=too-many-arguments ) + +@cli_app.command() +def download_pipeline_files( # pylint: disable=too-many-arguments +    pipeline_name: str, +    workspace_name: str = DEFAULT_WORKSPACE_NAME, +    file_dir: Optional[str] = None, +    batch_size: int = 50, +    api_key: Optional[str] = None, +    api_url: Optional[str] = None, +    show_progress: bool = True, +    timeout_s: Optional[int] = None, +) -> None: +    """Download all files that led to an error status during indexing. + +    :param pipeline_name: Name of the pipeline. +    :param workspace_name: Name of the workspace to download the files from. 
It uses the workspace from the .ENV file by default. +    :param file_dir: Path to the folder where the downloaded files are stored. Files are placed +        in one subfolder per indexing status. +    :param batch_size: Batch size for the listing. +    :param api_key: API key to use for authentication. +    :param api_url: API URL to use for authentication. +    :param show_progress: Shows the download progress. +    :param timeout_s: Timeout in seconds for the download. +    """ +    sync_download_pipeline_files( +        workspace_name=workspace_name, +        pipeline_name=pipeline_name, +        file_dir=file_dir, +        batch_size=batch_size, +        api_key=api_key, +        api_url=api_url, +        show_progress=show_progress, +        timeout_s=timeout_s, +    ) + + @cli_app.command() def download( # pylint: disable=too-many-arguments workspace_name: str = DEFAULT_WORKSPACE_NAME, diff --git a/deepset_cloud_sdk/workflows/async_client/files.py b/deepset_cloud_sdk/workflows/async_client/files.py index 1df4b7ba..23f83e09 100644 --- a/deepset_cloud_sdk/workflows/async_client/files.py +++ b/deepset_cloud_sdk/workflows/async_client/files.py @@ -1,7 +1,7 @@ # pylint:disable=too-many-arguments """This module contains async functions for uploading files and folders to deepset Cloud.""" from pathlib import Path -from typing import AsyncGenerator, List, Optional, Union +from typing import AsyncGenerator, Generator, List, Optional, Union from uuid import UUID from sniffio import AsyncLibraryNotFoundError @@ -13,6 +13,7 @@ CommonConfig, ) from deepset_cloud_sdk._api.files import File +from deepset_cloud_sdk._api.pipelines import FileIndexingStatus from deepset_cloud_sdk._api.upload_sessions import ( UploadSessionDetail, UploadSessionStatus, @@ -20,6 +21,13 @@ ) from deepset_cloud_sdk._s3.upload import S3UploadSummary from deepset_cloud_sdk._service.files_service import DeepsetCloudFile, FilesService +from deepset_cloud_sdk._service.pipeline_service import PipelinesService + + +def _chunked(iterable: List, chunk_size: 
int) -> Generator[List, None, None]: +    """Yield successive n-sized chunks from iterable.""" +    for i in range(0, len(iterable), chunk_size): +        yield iterable[i : i + chunk_size] def _get_config(api_key: Optional[str] = None, api_url: Optional[str] = None) -> CommonConfig: @@ -205,6 +213,60 @@ async def download( ) + +async def download_pipeline_files( +    pipeline_name: str, +    workspace_name: str = DEFAULT_WORKSPACE_NAME, +    file_dir: Optional[Union[Path, str]] = None, +    batch_size: int = 50, +    api_key: Optional[str] = None, +    api_url: Optional[str] = None, +    show_progress: bool = True, +    timeout_s: Optional[int] = None, +) -> None: +    """Download all files that led to an error status during indexing. + +    :param pipeline_name: Name of the pipeline. +    :param workspace_name: Name of the workspace to download the files from. It uses the workspace from the .ENV file by default. +    :param file_dir: Path to the folder where the downloaded files are stored. Files are placed +        in one subfolder per indexing status. +    :param batch_size: Batch size for the listing. +    :param api_key: API key to use for authentication. +    :param api_url: API URL to use for authentication. +    :param show_progress: Shows the download progress. +    :param timeout_s: Timeout in seconds for the download. 
+ """ + file_ids: List[UUID] + + if file_dir is None: + file_dir = Path.cwd() + if isinstance(file_dir, str): + file_dir = Path(file_dir).resolve() + + for _status in FileIndexingStatus: + status_file_dir = Path(str(file_dir) + f"/{_status.value}") + + async with PipelinesService.factory(_get_config(api_key=api_key, api_url=api_url)) as pipeline_service: + file_ids = await pipeline_service.get_pipeline_file_ids( + pipeline_name=pipeline_name, workspace_name=workspace_name, status=_status + ) + + async with FilesService.factory(_get_config(api_key=api_key, api_url=api_url)) as file_service: + # WARNING: This filter might explode if we have many files that failed or did not + # create documents. We need to use the download method to get the file names here. + + # chunk the file_ids into batches of 10 to avoid a too long URL + for chunk in _chunked(file_ids, 10): + await file_service.download( + workspace_name=workspace_name, + file_dir=status_file_dir, + odata_filter=" or ".join([f"file_id eq '{_id}'" for _id in chunk]), + include_meta=True, + batch_size=batch_size, + show_progress=show_progress, + timeout_s=timeout_s, + ) + + async def upload_texts( files: List[DeepsetCloudFile], api_key: Optional[str] = None, diff --git a/deepset_cloud_sdk/workflows/sync_client/files.py b/deepset_cloud_sdk/workflows/sync_client/files.py index 71850c89..fe211e2a 100644 --- a/deepset_cloud_sdk/workflows/sync_client/files.py +++ b/deepset_cloud_sdk/workflows/sync_client/files.py @@ -17,6 +17,9 @@ from deepset_cloud_sdk._s3.upload import S3UploadSummary from deepset_cloud_sdk._service.files_service import DeepsetCloudFile from deepset_cloud_sdk.workflows.async_client.files import download as async_download +from deepset_cloud_sdk.workflows.async_client.files import ( + download_pipeline_files as async_download_pipeline_files, +) from deepset_cloud_sdk.workflows.async_client.files import ( get_upload_session as async_get_upload_session, ) @@ -128,6 +131,42 @@ def download( # pylint: 
disable=too-many-arguments ) + +def download_pipeline_files( +    pipeline_name: str, +    workspace_name: str = DEFAULT_WORKSPACE_NAME, +    file_dir: Optional[Union[Path, str]] = None, +    batch_size: int = 50, +    api_key: Optional[str] = None, +    api_url: Optional[str] = None, +    show_progress: bool = True, +    timeout_s: Optional[int] = None, +) -> None: +    """Download all files that led to an error status during indexing. + +    :param pipeline_name: Name of the pipeline. +    :param workspace_name: Name of the workspace to download the files from. It uses the workspace from the .ENV file by default. +    :param file_dir: Path to the folder where the downloaded files are stored. Files are placed +        in one subfolder per indexing status. +    :param batch_size: Batch size for the listing. +    :param api_key: API key to use for authentication. +    :param api_url: API URL to use for authentication. +    :param show_progress: Shows the download progress. +    :param timeout_s: Timeout in seconds for the download. 
+ """ + asyncio.run( + async_download_pipeline_files( + api_key=api_key, + api_url=api_url, + workspace_name=workspace_name, + pipeline_name=pipeline_name, + file_dir=file_dir, + batch_size=batch_size, + show_progress=show_progress, + timeout_s=timeout_s, + ) + ) + + def upload_texts( files: List[DeepsetCloudFile], api_key: Optional[str] = None, diff --git a/tests/conftest.py b/tests/conftest.py index 6c317afc..e00dc252 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,6 +16,7 @@ from deepset_cloud_sdk._api.config import CommonConfig from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI from deepset_cloud_sdk._api.files import FilesAPI +from deepset_cloud_sdk._api.pipelines import PipelinesAPI from deepset_cloud_sdk._api.upload_sessions import ( AWSPrefixedRequestConfig, UploadSession, @@ -71,6 +72,11 @@ def mocked_upload_sessions_api() -> Mock: return Mock(spec=UploadSessionsAPI) +@pytest.fixture +def mocked_pipelines() -> Mock: + return Mock(spec=PipelinesAPI) + + @pytest.fixture def mocked_files_api() -> Mock: return Mock(spec=FilesAPI) diff --git a/tests/unit/api/test_pipelines.py b/tests/unit/api/test_pipelines.py new file mode 100644 index 00000000..14c716bd --- /dev/null +++ b/tests/unit/api/test_pipelines.py @@ -0,0 +1,60 @@ +from unittest.mock import Mock +from uuid import UUID + +import httpx +import pytest + +from deepset_cloud_sdk._api.pipelines import ( + FailedToFetchFileIdsException, + FileIndexingStatus, + PipelineNotFoundException, + PipelinesAPI, +) + + +@pytest.fixture +def pipelines_api(mocked_deepset_cloud_api: Mock) -> PipelinesAPI: + return PipelinesAPI(mocked_deepset_cloud_api) + + +@pytest.mark.asyncio +class TestGetPipelineFileIDs: + async def test_get_pipeline_file_ids(self, pipelines_api: PipelinesAPI, mocked_deepset_cloud_api: Mock) -> None: + mocked_deepset_cloud_api.get.return_value = httpx.Response( + status_code=httpx.codes.OK, + json=["cd16435f-f6eb-423f-bf6f-994dc8a36a10", 
"cd16435f-f6eb-423f-bf6f-994dc8a36a13"], + ) + result = await pipelines_api.get_pipeline_file_ids( + workspace_name="test_workspace", + pipeline_name="test_pipeline", + status=FileIndexingStatus.INDEXED_NO_DOCUMENTS, + ) + assert result == [UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10"), UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a13")] + + async def test_get_pipeline_file_ids_with_not_found_pipeline( + self, pipelines_api: PipelinesAPI, mocked_deepset_cloud_api: Mock + ) -> None: + mocked_deepset_cloud_api.get.return_value = httpx.Response( + status_code=httpx.codes.NOT_FOUND, + json=[], + ) + with pytest.raises(PipelineNotFoundException): + await pipelines_api.get_pipeline_file_ids( + workspace_name="test_workspace", + pipeline_name="test_pipeline", + status=FileIndexingStatus.INDEXED_NO_DOCUMENTS, + ) + + async def test_get_pipeline_file_ids_with_api_error( + self, pipelines_api: PipelinesAPI, mocked_deepset_cloud_api: Mock + ) -> None: + mocked_deepset_cloud_api.get.return_value = httpx.Response( + status_code=httpx.codes.SERVICE_UNAVAILABLE, + json=[], + ) + with pytest.raises(FailedToFetchFileIdsException): + await pipelines_api.get_pipeline_file_ids( + workspace_name="test_workspace", + pipeline_name="test_pipeline", + status=FileIndexingStatus.INDEXED_NO_DOCUMENTS, + ) diff --git a/tests/unit/service/test_pipelines_service.py b/tests/unit/service/test_pipelines_service.py new file mode 100644 index 00000000..875ca7a7 --- /dev/null +++ b/tests/unit/service/test_pipelines_service.py @@ -0,0 +1,42 @@ +from unittest.mock import AsyncMock, Mock +from uuid import UUID + +import pytest + +from deepset_cloud_sdk._api.config import CommonConfig +from deepset_cloud_sdk._api.pipelines import FileIndexingStatus +from deepset_cloud_sdk._service.pipeline_service import PipelinesService + + +@pytest.fixture +def pipelines_service(mocked_pipelines: Mock) -> PipelinesService: + return PipelinesService(pipelines=mocked_pipelines) + + +@pytest.mark.asyncio +class 
TestUtilsPipelinesService: + async def test_factory(self, unit_config: CommonConfig) -> None: + async with PipelinesService.factory(unit_config) as pipelines_service: + assert isinstance(pipelines_service, PipelinesService) + + +@pytest.mark.asyncio +class TestGetFileIds: + async def test_get_file_ids( + self, + mocked_pipelines: Mock, + pipelines_service: PipelinesService, + ) -> None: + mocked_pipelines.get_pipeline_file_ids = AsyncMock(return_value=[UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10")]) + returned_file_ids = await pipelines_service.get_pipeline_file_ids( + workspace_name="test_workspace", + pipeline_name="test_pipeline", + status=FileIndexingStatus.INDEXED_NO_DOCUMENTS, + ) + assert returned_file_ids == [UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10")], "Unexpected file ids" + + mocked_pipelines.get_pipeline_file_ids.assert_called_once_with( + pipeline_name="test_pipeline", + workspace_name="test_workspace", + status=FileIndexingStatus.INDEXED_NO_DOCUMENTS, + ) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 3cde773c..60cf5e33 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -118,6 +118,23 @@ def test_download_files(self, sync_download_mock: AsyncMock) -> None: show_progress=True, ) + class TestDownloadPipelineFiles: + @patch("deepset_cloud_sdk.cli.sync_download_pipeline_files") + def test_download_pipeline_files(self, mocked_sync_download_pipeline_files: AsyncMock) -> None: + mocked_sync_download_pipeline_files.side_effect = Mock() + result = runner.invoke(cli_app, ["download-pipeline-files", "my-pipeline", "--workspace-name", "default"]) + assert result.exit_code == 0 + mocked_sync_download_pipeline_files.assert_called_once_with( + workspace_name="default", + pipeline_name="my-pipeline", + file_dir=None, + batch_size=50, + api_key=None, + api_url=None, + show_progress=True, + timeout_s=None, + ) + class TestListFiles: @patch("deepset_cloud_sdk.cli.sync_list_files") def test_listing_files(self, sync_list_files_mock: 
AsyncMock) -> None: diff --git a/tests/unit/workflows/async_client/test_async_workflow_files.py b/tests/unit/workflows/async_client/test_async_workflow_files.py index b0e9f307..99d078b3 100644 --- a/tests/unit/workflows/async_client/test_async_workflow_files.py +++ b/tests/unit/workflows/async_client/test_async_workflow_files.py @@ -1,7 +1,7 @@ import datetime from pathlib import Path from typing import Any, AsyncGenerator, List -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, Mock, call from uuid import UUID import pytest @@ -10,6 +10,7 @@ from deepset_cloud_sdk._api.config import DEFAULT_WORKSPACE_NAME from deepset_cloud_sdk._api.files import File +from deepset_cloud_sdk._api.pipelines import FileIndexingStatus from deepset_cloud_sdk._api.upload_sessions import ( UploadSessionDetail, UploadSessionIngestionStatus, @@ -19,9 +20,11 @@ WriteMode, ) from deepset_cloud_sdk._service.files_service import DeepsetCloudFile, FilesService +from deepset_cloud_sdk._service.pipeline_service import PipelinesService from deepset_cloud_sdk.models import UserInfo from deepset_cloud_sdk.workflows.async_client.files import ( download, + download_pipeline_files, get_upload_session, list_files, list_upload_sessions, @@ -282,3 +285,132 @@ async def mocked_get_upload_session( monkeypatch.setattr(FilesService, "get_upload_session", mocked_get_upload_session) returned_upload_session = await get_upload_session(session_id=UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10")) assert returned_upload_session == mocked_upload_session + + +@pytest.mark.asyncio +class TestPipelineFiles: + @pytest.fixture + def mocked_download(self) -> AsyncMock: + return AsyncMock() + + @pytest.fixture + def mocked_get_pipeline_file_ids(self) -> AsyncMock: + get_file_ids = AsyncMock() + get_file_ids.return_value = [ + UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10"), + UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a11"), + ] + return get_file_ids + + async def test_download_pipeline_files( + self, 
monkeypatch: MonkeyPatch, mocked_download: AsyncMock, mocked_get_pipeline_file_ids: AsyncMock + ) -> None: + monkeypatch.setattr(PipelinesService, "get_pipeline_file_ids", mocked_get_pipeline_file_ids) + monkeypatch.setattr(FilesService, "download", mocked_download) + + await download_pipeline_files( + workspace_name="my_workspace", + pipeline_name="my_pipeline", + batch_size=100, + timeout_s=100, + ) + + # Check that pipeline indexing file ids was called with + # the correct status for filtering + assert ( + call(pipeline_name="my_pipeline", workspace_name="my_workspace", status=FileIndexingStatus.FAILED) + in mocked_get_pipeline_file_ids.call_args_list + ) + assert ( + call( + pipeline_name="my_pipeline", + workspace_name="my_workspace", + status=FileIndexingStatus.INDEXED_NO_DOCUMENTS, + ) + in mocked_get_pipeline_file_ids.call_args_list + ) + + # Assert that download calls are correctly called for + # failed files + assert ( + call( + workspace_name="my_workspace", + file_dir=Path("./FAILED"), + odata_filter=f"file_id in 'cd16435f-f6eb-423f-bf6f-994dc8a36a10,cd16435f-f6eb-423f-bf6f-994dc8a36a11'", + include_meta=True, + batch_size=100, + show_progress=True, + timeout_s=100, + ) + in mocked_download.call_args_list + ) + assert ( + call( + workspace_name="my_workspace", + file_dir=Path("./INDEXED_NO_DOCUMENTS"), + odata_filter=f"file_id in 'cd16435f-f6eb-423f-bf6f-994dc8a36a10,cd16435f-f6eb-423f-bf6f-994dc8a36a11'", + include_meta=True, + batch_size=100, + show_progress=True, + timeout_s=100, + ) + in mocked_download.call_args_list + ) + + async def test_download_parses_path_for_string( + self, monkeypatch: MonkeyPatch, mocked_download: Mock, mocked_get_pipeline_file_ids: Mock + ) -> None: + monkeypatch.setattr(PipelinesService, "get_pipeline_file_ids", mocked_get_pipeline_file_ids) + monkeypatch.setattr(FilesService, "download", mocked_download) + + await download_pipeline_files( + workspace_name="my_workspace", + pipeline_name="my_pipeline", + 
file_dir="./my-path", + batch_size=100, + timeout_s=100, + ) + + # Assert that download calls are correctly called for + # failed files + assert ( + call( + workspace_name="my_workspace", + file_dir=Path("./my-path/FAILED"), # THIS IS TESTED + odata_filter=f"file_id in 'cd16435f-f6eb-423f-bf6f-994dc8a36a10,cd16435f-f6eb-423f-bf6f-994dc8a36a11'", + include_meta=True, + batch_size=100, + show_progress=True, + timeout_s=100, + ) + in mocked_download.call_args_list + ) + + async def test_download_parses_path_for_path_type( + self, monkeypatch: MonkeyPatch, mocked_download: Mock, mocked_get_pipeline_file_ids: Mock + ) -> None: + monkeypatch.setattr(PipelinesService, "get_pipeline_file_ids", mocked_get_pipeline_file_ids) + monkeypatch.setattr(FilesService, "download", mocked_download) + + await download_pipeline_files( + workspace_name="my_workspace", + pipeline_name="my_pipeline", + file_dir=Path("./my-path"), + batch_size=100, + timeout_s=100, + ) + + # Assert that download calls are correctly called for + # failed files + assert ( + call( + workspace_name="my_workspace", + file_dir=Path("./my-path/FAILED"), # THIS IS TESTED + odata_filter=f"file_id in 'cd16435f-f6eb-423f-bf6f-994dc8a36a10,cd16435f-f6eb-423f-bf6f-994dc8a36a11'", + include_meta=True, + batch_size=100, + show_progress=True, + timeout_s=100, + ) + in mocked_download.call_args_list + ) diff --git a/tests/unit/workflows/sync_client/test_sync_workflow_files.py b/tests/unit/workflows/sync_client/test_sync_workflow_files.py index e2909c65..abef406f 100644 --- a/tests/unit/workflows/sync_client/test_sync_workflow_files.py +++ b/tests/unit/workflows/sync_client/test_sync_workflow_files.py @@ -18,6 +18,7 @@ from deepset_cloud_sdk.models import UserInfo from deepset_cloud_sdk.workflows.sync_client.files import ( download, + download_pipeline_files, get_upload_session, list_files, list_upload_sessions, @@ -152,6 +153,31 @@ def test_download_files() -> None: ) +def test_download_pipeline_files() -> None: + 
mocked_async_download_pipeline_files = AsyncMock() + with patch( + "deepset_cloud_sdk.workflows.sync_client.files.async_download_pipeline_files", + new=mocked_async_download_pipeline_files, + ): + download_pipeline_files( + workspace_name="my_workspace", + pipeline_name="my_pipeline", + file_dir="./mypath", + batch_size=100, + timeout_s=100, + ) + mocked_async_download_pipeline_files.assert_called_once_with( + api_key=None, + api_url=None, + workspace_name="my_workspace", + pipeline_name="my_pipeline", + file_dir="./mypath", + batch_size=100, + show_progress=True, + timeout_s=100, + ) + + def test_list_upload_sessions() -> None: async def mocked_async_upload_sessions( *args: Any, **kwargs: Any