diff --git a/example.py b/example.py index 98cd53aa6..dbf07db02 100644 --- a/example.py +++ b/example.py @@ -1,10 +1,16 @@ +import logging +from pprint import pprint + +logging.basicConfig(level=logging.DEBUG) + import sailpoint -import sailpoint.v3 import sailpoint.beta +import sailpoint.v3 +import sailpoint.v2025 from sailpoint.configuration import Configuration from sailpoint.paginator import Paginator from sailpoint.v3.models.search import Search -from pprint import pprint +from sailpoint.v2025.models.account import Account configuration = Configuration() @@ -36,7 +42,7 @@ "Exception when calling AccessProfilesApi->list_access_profiles: %s\n" % e ) - # Use the paginator with search + #Use the paginator with search search = Search() search.indices = ['identities'] @@ -46,8 +52,16 @@ identities = Paginator.paginate_search(sailpoint.v3.SearchApi(api_client),search, 250, 1000) for identity in identities: print(identity['name']) - + # Stream search results using paginate_stream_search + search_stream = Search() + search_stream.indices = ['identities'] + search_stream.query = { 'query': '*' } + search_stream.sort = ['-name'] + + print("Streaming search results (paginate_stream_search):\n") + for identity in Paginator.paginate_stream_search(sailpoint.v3.SearchApi(api_client), search_stream, 250, 1000): + print(identity['name']) # Use the paginator to paginate 1000 accounts 100 at a time accounts = Paginator.paginate(sailpoint.v3.AccountsApi(api_client).list_accounts, 1000, limit=100) @@ -60,4 +74,36 @@ workgroups = sailpoint.beta.GovernanceGroupsApi(api_client).list_workgroups() for workgroup in workgroups: - print(workgroup.name) \ No newline at end of file + print(workgroup.name) + +#Stream v2025 accounts with optional model typing +with sailpoint.v2025.ApiClient(configuration) as api_client: + try: + account_stream = Paginator.paginate_stream( + sailpoint.v2025.AccountsApi(api_client).list_accounts, + 1000, + limit=100, + model=Account + ) + print("Streaming v2025 accounts (paginate_stream with model=Account):\n") + for account in account_stream: + print(account.name) + except Exception as e: + print("Exception when streaming accounts: %s\n" % e) + +# Stream v2025 accounts with HTTP info (status code, headers) and optional model typing +with sailpoint.v2025.ApiClient(configuration) as api_client: + try: + account_stream = Paginator.paginate_stream_with_http_info( + sailpoint.v2025.AccountsApi(api_client).list_accounts_with_http_info, + 1000, + limit=100, + model=Account + ) + print("Streaming v2025 accounts (paginate_stream_with_http_info):\n") + for account, response in account_stream: + print(f"[{response.status_code}] {account.name}") + except Exception as e: + print("Exception when streaming accounts with http info: %s\n" % e) + + diff --git a/sailpoint/paginator.py b/sailpoint/paginator.py index de3a07d8e..cebd23aed 100644 --- a/sailpoint/paginator.py +++ b/sailpoint/paginator.py @@ -1,8 +1,12 @@ +import logging from sailpoint.v3.api.search_api import SearchApi from sailpoint.v3.models.search import Search -from typing import TypeVar +from typing import Any, Callable, Iterator, Optional, Tuple, Type, TypeVar, overload T = TypeVar('T') +TItem = TypeVar('TItem') + +logger = logging.getLogger(__name__) class PaginationParams: limit: int @@ -27,7 +31,7 @@ def paginate(T, result_limit, **kwargs) -> T: modified = [] while True: - print(f'Paginating call, offset = {kwargs["offset"]}') + logger.debug(f'Paginating call, offset = {kwargs["offset"]}') # Call endpoint and pass any arguments results = T(**kwargs) @@ -48,6 +52,188 @@ def paginate(T, result_limit, **kwargs) -> T: kwargs['offset'] += increment + @overload + @staticmethod + def paginate_stream( + api_call: Callable[..., Any], + result_limit: Optional[int] = None, + *, + model: Type[TItem], + **kwargs + ) -> Iterator[TItem]: ... + + @overload + @staticmethod + def paginate_stream( + api_call: Callable[..., Any], + result_limit: Optional[int] = None, + **kwargs + ) -> Iterator[Any]: ... + + @staticmethod + def paginate_stream( + api_call: Callable[..., Any], + result_limit: Optional[int] = None, + *, + model: Optional[Type[TItem]] = None, + **kwargs + ) -> Iterator[TItem]: + """ + Stream paginated results by yielding items as each API page is received. + When model is provided, the iterator is typed as Iterator[model] for IDE support. + """ + result_limit = result_limit if result_limit else 1000 + increment = kwargs.get('limit') if kwargs.get('limit') is not None else 250 + kwargs['offset'] = kwargs.get('offset') if kwargs.get('offset') is not None else 0 + yielded = 0 + + while True: + logger.debug(f'Paginating call, offset = {kwargs["offset"]}') + + results = api_call(**kwargs) + + if isinstance(results, list): + batch = results + else: + batch = results.data + + for item in batch: + yield item + yielded += 1 + if result_limit > 0 and yielded >= result_limit: + return + + if len(batch) < increment: + return + + kwargs['offset'] += increment + + @overload + @staticmethod + def paginate_stream_with_http_info( + api_call: Callable[..., Any], + result_limit: Optional[int] = None, + *, + model: Type[TItem], + **kwargs + ) -> Iterator[Tuple[TItem, Any]]: ... + + @overload + @staticmethod + def paginate_stream_with_http_info( + api_call: Callable[..., Any], + result_limit: Optional[int] = None, + **kwargs + ) -> Iterator[Tuple[Any, Any]]: ... + + @staticmethod + def paginate_stream_with_http_info( + api_call: Callable[..., Any], + result_limit: Optional[int] = None, + *, + model: Optional[Type[TItem]] = None, + **kwargs + ) -> Iterator[Tuple[TItem, Any]]: + """ + Stream paginated results from a _with_http_info API call. + Yields (item, response) tuples so callers can inspect status_code/headers + for every page, not just the first. + When model is provided, items in the tuples are typed as model for IDE support. + """ + result_limit = result_limit if result_limit else 1000 + increment = kwargs.get('limit') if kwargs.get('limit') is not None else 250 + kwargs['offset'] = kwargs.get('offset') if kwargs.get('offset') is not None else 0 + yielded = 0 + + while True: + logger.debug(f'Paginating call, offset = {kwargs["offset"]}') + response = api_call(**kwargs) + batch = response.data + + for item in batch: + yield (item, response) + yielded += 1 + if result_limit > 0 and yielded >= result_limit: + return + + if len(batch) < increment: + return + + kwargs['offset'] += increment + + @staticmethod + def paginate_stream_search(search_api: SearchApi, search: Search, increment: int, limit: int): + """ + Stream search results by yielding each result as it is received from each API page. + """ + increment = increment if increment else 250 + max_limit = limit if limit else 0 + yielded = 0 + + if search.sort is None or len(search.sort) != 1: + raise Exception('search query must include exactly one sort parameter to paginate properly') + + while True: + logger.debug('Paginating call') + results = search_api.search_post(search, None, increment) + + for result in results: + yield result + yielded += 1 + if max_limit > 0 and yielded >= max_limit: + return + + logger.debug(f'Received {len(results)} results') + + if len(results) < increment: + return + + result = results[len(results) - 1] + if result[search.sort[0].strip('+-')] is not None: + next_search_after = result[str(search.sort[0]).strip('+-')] + search.search_after = [next_search_after] + else: + raise Exception('Search unexpectedly did not return a result we can search after!') + + @staticmethod + def paginate_stream_search_with_http_info( + search_api: SearchApi, search: Search, increment: int, limit: int + ) -> Iterator[Tuple[Any, Any]]: + """ + Stream search results from search_post_with_http_info. + Yields (item, response) tuples so callers can inspect status_code/headers + for every page, not just the first. + """ + increment = increment if increment else 250 + max_limit = limit if limit else 0 + yielded = 0 + + if search.sort is None or len(search.sort) != 1: + raise Exception('search query must include exactly one sort parameter to paginate properly') + + while True: + logger.debug('Paginating call') + response = search_api.search_post_with_http_info(search, None, increment) + batch = response.data + + for result in batch: + yield (result, response) + yielded += 1 + if max_limit > 0 and yielded >= max_limit: + return + + logger.debug(f'Received {len(batch)} results') + + if len(batch) < increment: + return + + last = batch[len(batch) - 1] + if last[search.sort[0].strip('+-')] is not None: + next_search_after = last[str(search.sort[0]).strip('+-')] + search.search_after = [next_search_after] + else: + raise Exception('Search unexpectedly did not return a result we can search after!') + @staticmethod def paginate_search(search_api: SearchApi, search: Search, increment: int, limit: int): increment = increment if increment else 250 @@ -60,11 +246,11 @@ def paginate_search(search_api: SearchApi, search: Search, increment: int, limit raise Exception('search query must include exactly one sort parameter to paginate properly') while True: - print(f'Paginating call, offset = {offset}') + logger.debug(f'Paginating call, offset = {offset}') results = search_api.search_post(search, None, increment) modified = modified + results - print(f'Received {len(results)} results') + logger.debug(f'Received {len(results)} results') if len(results) < increment or (len(modified) >= max_limit and max_limit > 0): results = modified @@ -91,11 +277,11 @@ def paginate_search_with_http_info(search_api: SearchApi, search: Search, increm raise Exception('search query must include exactly one sort parameter to paginate properly') while True: - print(f'Paginating call, offset = {offset}') + logger.debug(f'Paginating call, offset = {offset}') results = search_api.search_post_with_http_info(search, None, increment) modified = modified + results.data - print(f'Recieved {len(results.data)} results') + logger.debug(f'Received {len(results.data)} results') if len(results.data) < increment or (len(modified) >= max_limit and max_limit > 0): results.data = modified diff --git a/validation_test.py b/validation_test.py index 89e2a6721..916bdf486 100644 --- a/validation_test.py +++ b/validation_test.py @@ -1,8 +1,10 @@ +from typing import Any import unittest import sailpoint.beta import sailpoint.v3 import sailpoint.v2024 +import sailpoint.v2025 import sailpoint.v2026 from sailpoint.configuration import Configuration, ConfigurationParams from sailpoint.paginator import Paginator @@ -16,6 +18,7 @@ class TestPythonSDK(unittest.TestCase): beta_api_client = sailpoint.beta.ApiClient(configuration) configuration.experimental = True v2024_api_client = sailpoint.v2024.ApiClient(configuration) + v2025_api_client = sailpoint.v2025.ApiClient(configuration) v2026_api_client = sailpoint.v2026.ApiClient(configuration) @@ -47,6 +50,19 @@ def test_search_pagination(self): self.assertEqual(100,len(search_results.data)) self.assertEqual(200,search_results.status_code) + def test_paginate_stream_search(self): + """Stream search yields same count as paginate_search when fully consumed.""" + search = Search() + search.indices = ['identities'] + search.query = {'query': '*'} + search.sort = ['-name'] + + stream = Paginator.paginate_stream_search( + sailpoint.v3.SearchApi(self.v3_api_client), search, 10, 100 + ) + items = list(stream) + self.assertEqual(100, len(items)) + def test_list_transforms(self): transforms = sailpoint.v3.TransformsApi(self.v3_api_client).list_transforms_with_http_info() self.assertIsNotNone(transforms.data) @@ -58,6 +74,74 @@ def test_pagination(self): self.assertEqual(100, len(accounts.data)) self.assertEqual(200, accounts.status_code) + def test_paginate_stream(self): + """Stream yields same count as paginate when fully consumed.""" + stream = Paginator.paginate_stream( + sailpoint.v3.AccountsApi(self.v3_api_client).list_accounts_with_http_info, + 100, + limit=10 + ) + items = list(stream) + self.assertEqual(100, len(items)) + + def test_paginate_stream_consumed_incrementally(self): + """Stream yields items as they come; consuming first N then stopping does not require full fetch.""" + stream = Paginator.paginate_stream( + sailpoint.v3.AccountsApi(self.v3_api_client).list_accounts_with_http_info, + 100, + limit=2 + ) + first_three = [] + for i, item in enumerate(stream): + first_three.append(item) + if i >= 2: + break + self.assertGreaterEqual(len(first_three), 1) + self.assertLessEqual(len(first_three), 3) + + def test_paginate_stream_with_model_v2025(self): + """When model=Account is passed, yielded items are typed (and are Account instances).""" + from sailpoint.v2025.models.account import Account + + stream = Paginator.paginate_stream( + sailpoint.v2025.AccountsApi(self.v2025_api_client).list_accounts_with_http_info, + 10, + limit=5, + model=Account + ) + for item in stream: + self.assertIsInstance(item, Account) + break # at least one item has correct type + + def test_paginate_stream_with_http_info(self): + """Yields (item, response) tuples; every tuple carries the response from its page.""" + stream = Paginator.paginate_stream_with_http_info( + sailpoint.v3.AccountsApi(self.v3_api_client).list_accounts_with_http_info, + 100, + limit=10 + ) + items = [] + for item, response in stream: + self.assertEqual(200, response.status_code) + items.append(item) + self.assertEqual(100, len(items)) + + def test_paginate_stream_search_with_http_info(self): + """Yields (item, response) tuples; every tuple carries the response from its page.""" + search = Search() + search.indices = ['identities'] + search.query = {'query': '*'} + search.sort = ['-name'] + + stream = Paginator.paginate_stream_search_with_http_info( + sailpoint.v3.SearchApi(self.v3_api_client), search, 10, 100 + ) + items = [] + for item, response in stream: + self.assertEqual(200, response.status_code) + items.append(item) + self.assertEqual(100, len(items)) + def test_list_accounts_beta(self): accounts = sailpoint.beta.AccountsApi(self.beta_api_client).list_accounts_with_http_info() self.assertIsNotNone(accounts.data)