Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions deepset_cloud_sdk/_api/pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
Pipeline API for deepset Cloud.

This module takes care of all pipeline-related API calls to deepset Cloud.
"""

from enum import Enum
from typing import Dict, List
from uuid import UUID

import structlog
from httpx import codes

from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI

logger = structlog.get_logger(__name__)


class FileIndexingStatus(str, Enum):
    """Indexing status of a file.

    Inherits from ``str`` so that members can be used directly wherever their
    raw string value is expected (for example as a query parameter).
    """

    # Indexing raised an error for this file.
    FAILED = "FAILED"
    # The file was indexed but produced no documents.
    INDEXED_NO_DOCUMENTS = "INDEXED_NO_DOCUMENTS"


class PipelineNotFoundException(Exception):
    """Raised when the requested pipeline does not exist in the given workspace."""


class FailedToFetchFileIdsException(Exception):
    """Raised when fetching the file ids of a pipeline fails for any reason other than a missing pipeline."""


class PipelinesAPI:
    """Pipeline API for deepset Cloud.

    This class takes care of all pipeline-related API calls to deepset Cloud.

    :param deepset_cloud_api: Instance of the DeepsetCloudAPI.
    """

    def __init__(self, deepset_cloud_api: DeepsetCloudAPI) -> None:
        """
        Create PipelinesAPI object.

        :param deepset_cloud_api: Instance of the DeepsetCloudAPI.
        """
        self._deepset_cloud_api = deepset_cloud_api

    async def get_pipeline_file_ids(
        self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED
    ) -> List[UUID]:
        """Get ids of files with the given indexing status for a pipeline.

        :param pipeline_name: Name of the pipeline that indexed the files.
        :param workspace_name: Name of the workspace the pipeline lives in.
        :param status: Status that should be used for fetching files.
        :raises PipelineNotFoundException: If no pipeline named `pipeline_name` exists in the workspace.
        :raises FailedToFetchFileIdsException: If the API responded with any other non-OK status code.
        :return: List of file ids.
        """
        params: Dict[str, str] = {"status": status.value}
        response = await self._deepset_cloud_api.get(workspace_name, f"pipelines/{pipeline_name}/files", params=params)
        if response.status_code == codes.NOT_FOUND:
            # Include context in the exception so callers can log a useful message.
            raise PipelineNotFoundException(
                f"Pipeline '{pipeline_name}' not found in workspace '{workspace_name}'."
            )
        if response.status_code != codes.OK:
            raise FailedToFetchFileIdsException(response.text)
        file_ids: List[UUID] = [UUID(_id) for _id in response.json()]
        return file_ids
1 change: 1 addition & 0 deletions deepset_cloud_sdk/_service/files_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Module for all file-related operations."""

from __future__ import annotations

import asyncio
Expand Down
51 changes: 51 additions & 0 deletions deepset_cloud_sdk/_service/pipeline_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Module for all file-related operations."""

from __future__ import annotations

from contextlib import asynccontextmanager
from typing import AsyncGenerator, List
from uuid import UUID

import structlog

from deepset_cloud_sdk._api.config import CommonConfig
from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI
from deepset_cloud_sdk._api.pipelines import FileIndexingStatus, PipelinesAPI

logger = structlog.get_logger(__name__)


class PipelinesService:
    """Service for all pipeline-related operations."""

    def __init__(self, pipelines: PipelinesAPI):
        """Initialize the service.

        :param pipelines: API for pipelines.
        """
        self._pipelines = pipelines

    @classmethod
    @asynccontextmanager
    async def factory(cls, config: CommonConfig) -> AsyncGenerator[PipelinesService, None]:
        """Create a new instance of the service.

        The service is yielded inside an API context so that the underlying
        HTTP client is cleaned up when the context exits.

        :param config: CommonConfig object.
        :return: New instance of the service.
        """
        async with DeepsetCloudAPI.factory(config) as api:
            yield cls(PipelinesAPI(api))

    async def get_pipeline_file_ids(
        self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED
    ) -> List[UUID]:
        """Get file ids that failed or did not create documents during indexing.

        :param pipeline_name: Name of the pipeline that indexed files.
        :param workspace_name: Name of the workspace.
        :param status: Status that should be used for fetching files.
        :return: List of file ids.
        """
        file_ids = await self._pipelines.get_pipeline_file_ids(
            pipeline_name=pipeline_name, workspace_name=workspace_name, status=status
        )
        return file_ids
38 changes: 38 additions & 0 deletions deepset_cloud_sdk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from deepset_cloud_sdk._api.config import DEFAULT_WORKSPACE_NAME, ENV_FILE_PATH
from deepset_cloud_sdk._api.upload_sessions import WriteMode
from deepset_cloud_sdk.workflows.sync_client.files import download as sync_download
from deepset_cloud_sdk.workflows.sync_client.files import (
download_pipeline_files as sync_download_pipeline_files,
)
from deepset_cloud_sdk.workflows.sync_client.files import (
get_upload_session as sync_get_upload_session,
)
Expand Down Expand Up @@ -72,6 +75,41 @@ def upload( # pylint: disable=too-many-arguments
)


@cli_app.command()
def download_pipeline_files(  # pylint: disable=too-many-arguments
    pipeline_name: str,
    workspace_name: str = DEFAULT_WORKSPACE_NAME,
    file_dir: Optional[str] = None,
    batch_size: int = 50,
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    show_progress: bool = True,
    timeout_s: Optional[int] = None,
) -> None:
    """Download all files that led to an error status during indexing.

    :param pipeline_name: Name of the pipeline that indexed the files.
    :param workspace_name: Name of the workspace the pipeline lives in. It uses the workspace from the .ENV file by default.
    :param file_dir: Path to the folder to download the files to. The current working directory is used by default.
    :param batch_size: Batch size for the file listing.
    :param api_key: API key to use for authentication.
    :param api_url: API URL to use for authentication.
    :param show_progress: Shows the download progress.
    :param timeout_s: Timeout in seconds for the download.
    """
    sync_download_pipeline_files(
        workspace_name=workspace_name,
        pipeline_name=pipeline_name,
        file_dir=file_dir,
        batch_size=batch_size,
        api_key=api_key,
        api_url=api_url,
        show_progress=show_progress,
        timeout_s=timeout_s,
    )


@cli_app.command()
def download( # pylint: disable=too-many-arguments
workspace_name: str = DEFAULT_WORKSPACE_NAME,
Expand Down
64 changes: 63 additions & 1 deletion deepset_cloud_sdk/workflows/async_client/files.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# pylint:disable=too-many-arguments
"""This module contains async functions for uploading files and folders to deepset Cloud."""
from pathlib import Path
from typing import AsyncGenerator, List, Optional, Union
from typing import AsyncGenerator, Generator, List, Optional, Union
from uuid import UUID

from sniffio import AsyncLibraryNotFoundError
Expand All @@ -13,13 +13,21 @@
CommonConfig,
)
from deepset_cloud_sdk._api.files import File
from deepset_cloud_sdk._api.pipelines import FileIndexingStatus
from deepset_cloud_sdk._api.upload_sessions import (
UploadSessionDetail,
UploadSessionStatus,
WriteMode,
)
from deepset_cloud_sdk._s3.upload import S3UploadSummary
from deepset_cloud_sdk._service.files_service import DeepsetCloudFile, FilesService
from deepset_cloud_sdk._service.pipeline_service import PipelinesService


def _chunked(iterable: List, chunk_size: int) -> Generator[List, None, None]:
    """Yield successive ``chunk_size``-sized chunks from ``iterable``.

    The last chunk may be shorter than ``chunk_size`` if the length of
    ``iterable`` is not an exact multiple of ``chunk_size``.

    :param iterable: List to split into chunks.
    :param chunk_size: Maximum number of items per chunk. Must be at least 1.
    :raises ValueError: If ``chunk_size`` is smaller than 1.
    """
    # A non-positive chunk size would either raise an obscure `range()` error (0)
    # or silently yield nothing (negative), so fail fast with a clear message.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}.")
    for start in range(0, len(iterable), chunk_size):
        yield iterable[start : start + chunk_size]


def _get_config(api_key: Optional[str] = None, api_url: Optional[str] = None) -> CommonConfig:
Expand Down Expand Up @@ -205,6 +213,60 @@ async def download(
)


async def download_pipeline_files(
    pipeline_name: str,
    workspace_name: str = DEFAULT_WORKSPACE_NAME,
    file_dir: Optional[Union[Path, str]] = None,
    batch_size: int = 50,
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    show_progress: bool = True,
    timeout_s: Optional[int] = None,
) -> None:
    """Download all files that led to an error status during indexing.

    The files for each indexing status are stored in a dedicated subfolder of
    `file_dir`, named after the status value (for example `FAILED`).

    :param pipeline_name: Name of the pipeline that indexed the files.
    :param workspace_name: Name of the workspace the pipeline lives in. It uses the workspace from the .ENV file by default.
    :param file_dir: Path to the folder to download the files to. The current working directory is used by default.
    :param batch_size: Batch size for the file listing.
    :param api_key: API key to use for authentication.
    :param api_url: API URL to use for authentication.
    :param show_progress: Shows the download progress.
    :param timeout_s: Timeout in seconds for the download.
    """
    if file_dir is None:
        file_dir = Path.cwd()
    if isinstance(file_dir, str):
        file_dir = Path(file_dir).resolve()

    # Build the config once; it is identical for both services and all statuses.
    config = _get_config(api_key=api_key, api_url=api_url)

    for _status in FileIndexingStatus:
        # Keep the files of each indexing status in their own subfolder.
        status_file_dir = file_dir / _status.value

        async with PipelinesService.factory(config) as pipeline_service:
            file_ids: List[UUID] = await pipeline_service.get_pipeline_file_ids(
                pipeline_name=pipeline_name, workspace_name=workspace_name, status=_status
            )

        async with FilesService.factory(config) as file_service:
            # WARNING: This filter might explode if we have many files that failed or did not
            # create documents. We need to use the download method to get the file names here.

            # chunk the file_ids into batches of 10 to avoid a too long URL
            for chunk in _chunked(file_ids, 10):
                await file_service.download(
                    workspace_name=workspace_name,
                    file_dir=status_file_dir,
                    odata_filter=" or ".join([f"file_id eq '{_id}'" for _id in chunk]),
                    include_meta=True,
                    batch_size=batch_size,
                    show_progress=show_progress,
                    timeout_s=timeout_s,
                )


async def upload_texts(
files: List[DeepsetCloudFile],
api_key: Optional[str] = None,
Expand Down
39 changes: 39 additions & 0 deletions deepset_cloud_sdk/workflows/sync_client/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from deepset_cloud_sdk._s3.upload import S3UploadSummary
from deepset_cloud_sdk._service.files_service import DeepsetCloudFile
from deepset_cloud_sdk.workflows.async_client.files import download as async_download
from deepset_cloud_sdk.workflows.async_client.files import (
download_pipeline_files as async_download_pipeline_files,
)
from deepset_cloud_sdk.workflows.async_client.files import (
get_upload_session as async_get_upload_session,
)
Expand Down Expand Up @@ -128,6 +131,42 @@ def download( # pylint: disable=too-many-arguments
)


def download_pipeline_files(
    pipeline_name: str,
    workspace_name: str = DEFAULT_WORKSPACE_NAME,
    file_dir: Optional[Union[Path, str]] = None,
    batch_size: int = 50,
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    show_progress: bool = True,
    timeout_s: Optional[int] = None,
) -> None:
    """Download all files that led to an error status during indexing.

    :param pipeline_name: Name of the pipeline that indexed the files.
    :param workspace_name: Name of the workspace the pipeline lives in. It uses the workspace from the .ENV file by default.
    :param file_dir: Path to the folder to download the files to. The current working directory is used by default.
    :param batch_size: Batch size for the file listing.
    :param api_key: API key to use for authentication.
    :param api_url: API URL to use for authentication.
    :param show_progress: Shows the download progress.
    :param timeout_s: Timeout in seconds for the download.
    """
    asyncio.run(
        async_download_pipeline_files(
            api_key=api_key,
            api_url=api_url,
            workspace_name=workspace_name,
            pipeline_name=pipeline_name,
            file_dir=file_dir,
            batch_size=batch_size,
            show_progress=show_progress,
            timeout_s=timeout_s,
        )
    )


def upload_texts(
files: List[DeepsetCloudFile],
api_key: Optional[str] = None,
Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from deepset_cloud_sdk._api.config import CommonConfig
from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI
from deepset_cloud_sdk._api.files import FilesAPI
from deepset_cloud_sdk._api.pipelines import PipelinesAPI
from deepset_cloud_sdk._api.upload_sessions import (
AWSPrefixedRequestConfig,
UploadSession,
Expand Down Expand Up @@ -71,6 +72,11 @@ def mocked_upload_sessions_api() -> Mock:
return Mock(spec=UploadSessionsAPI)


@pytest.fixture
def mocked_pipelines() -> Mock:
    """Return a Mock with the PipelinesAPI interface for tests that must not hit the real API."""
    return Mock(spec=PipelinesAPI)


@pytest.fixture
def mocked_files_api() -> Mock:
    """Return a Mock with the FilesAPI interface for tests that must not hit the real API."""
    return Mock(spec=FilesAPI)
Expand Down
Loading