Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import logging
from pprint import pprint

logging.basicConfig(level=logging.DEBUG)

import sailpoint
import sailpoint.v3
import sailpoint.beta
import sailpoint.v3
import sailpoint.v2025
from sailpoint.configuration import Configuration
from sailpoint.paginator import Paginator
from sailpoint.v3.models.search import Search
from pprint import pprint
from sailpoint.v2025.models.account import Account

configuration = Configuration()

Expand Down Expand Up @@ -36,7 +42,7 @@
"Exception when calling AccessProfilesApi->list_access_profiles: %s\n" % e
)

# Use the paginator with search
# Use the paginator with search

search = Search()
search.indices = ['identities']
Expand All @@ -46,8 +52,16 @@
identities = Paginator.paginate_search(sailpoint.v3.SearchApi(api_client),search, 250, 1000)
for identity in identities:
print(identity['name'])


# Stream search results using paginate_stream_search
search_stream = Search()
search_stream.indices = ['identities']
search_stream.query = { 'query': '*' }
search_stream.sort = ['-name']

print("Streaming search results (paginate_stream_search):\n")
for identity in Paginator.paginate_stream_search(sailpoint.v3.SearchApi(api_client), search_stream, 250, 1000):
print(identity['name'])

# Use the paginator to paginate 1000 accounts 100 at a time
accounts = Paginator.paginate(sailpoint.v3.AccountsApi(api_client).list_accounts, 1000, limit=100)
Expand All @@ -60,4 +74,36 @@

workgroups = sailpoint.beta.GovernanceGroupsApi(api_client).list_workgroups()
for workgroup in workgroups:
print(workgroup.name)
print(workgroup.name)

#Stream v2025 accounts with optional model typing
with sailpoint.v2025.ApiClient(configuration) as api_client:
try:
account_stream = Paginator.paginate_stream(
sailpoint.v2025.AccountsApi(api_client).list_accounts,
1000,
limit=100,
model=Account
)
print("Streaming v2025 accounts (paginate_stream with model=Account):\n")
for account in account_stream:
print(account.name)
except Exception as e:
print("Exception when streaming accounts: %s\n" % e)

# Stream v2025 accounts with HTTP info (status code, headers) and optional model typing
with sailpoint.v2025.ApiClient(configuration) as api_client:
try:
account_stream = Paginator.paginate_stream_with_http_info(
sailpoint.v2025.AccountsApi(api_client).list_accounts_with_http_info,
1000,
limit=100,
model=Account
)
print("Streaming v2025 accounts (paginate_stream_with_http_info):\n")
for account, response in account_stream:
print(f"[{response.status_code}] {account.name}")
except Exception as e:
print("Exception when streaming accounts with http info: %s\n" % e)


198 changes: 192 additions & 6 deletions sailpoint/paginator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import logging
from sailpoint.v3.api.search_api import SearchApi
from sailpoint.v3.models.search import Search
from typing import TypeVar
from typing import Any, Callable, Iterator, Optional, Tuple, Type, TypeVar, overload

T = TypeVar('T')
TItem = TypeVar('TItem')

logger = logging.getLogger(__name__)

class PaginationParams:
limit: int
Expand All @@ -27,7 +31,7 @@ def paginate(T, result_limit, **kwargs) -> T:

modified = []
while True:
print(f'Paginating call, offset = {kwargs["offset"]}')
logger.debug(f'Paginating call, offset = {kwargs["offset"]}')

# Call endpoint and pass any arguments
results = T(**kwargs)
Expand All @@ -48,6 +52,188 @@ def paginate(T, result_limit, **kwargs) -> T:

kwargs['offset'] += increment

@overload
@staticmethod
def paginate_stream(
api_call: Callable[..., Any],
result_limit: Optional[int] = None,
*,
model: Type[TItem],
**kwargs
) -> Iterator[TItem]: ...

@overload
@staticmethod
def paginate_stream(
api_call: Callable[..., Any],
result_limit: Optional[int] = None,
**kwargs
) -> Iterator[Any]: ...

@staticmethod
def paginate_stream(
api_call: Callable[..., Any],
result_limit: Optional[int] = None,
*,
model: Optional[Type[TItem]] = None,
**kwargs
) -> Iterator[TItem]:
"""
Stream paginated results by yielding items as each API page is received.
When model is provided, the iterator is typed as Iterator[model] for IDE support.
"""
result_limit = result_limit if result_limit else 1000
increment = kwargs.get('limit') if kwargs.get('limit') is not None else 250
kwargs['offset'] = kwargs.get('offset') if kwargs.get('offset') is not None else 0
yielded = 0

while True:
logger.debug(f'Paginating call, offset = {kwargs["offset"]}')

results = api_call(**kwargs)

if isinstance(results, list):
batch = results
else:
batch = results.data

for item in batch:
yield item
yielded += 1
if result_limit > 0 and yielded >= result_limit:
return

if len(batch) < increment:
return

kwargs['offset'] += increment

@overload
@staticmethod
def paginate_stream_with_http_info(
api_call: Callable[..., Any],
result_limit: Optional[int] = None,
*,
model: Type[TItem],
**kwargs
) -> Iterator[Tuple[TItem, Any]]: ...

@overload
@staticmethod
def paginate_stream_with_http_info(
api_call: Callable[..., Any],
result_limit: Optional[int] = None,
**kwargs
) -> Iterator[Tuple[Any, Any]]: ...

@staticmethod
def paginate_stream_with_http_info(
api_call: Callable[..., Any],
result_limit: Optional[int] = None,
*,
model: Optional[Type[TItem]] = None,
**kwargs
) -> Iterator[Tuple[TItem, Any]]:
"""
Stream paginated results from a _with_http_info API call.
Yields (item, response) tuples so callers can inspect status_code/headers
for every page, not just the first.
When model is provided, items in the tuples are typed as model for IDE support.
"""
result_limit = result_limit if result_limit else 1000
increment = kwargs.get('limit') if kwargs.get('limit') is not None else 250
kwargs['offset'] = kwargs.get('offset') if kwargs.get('offset') is not None else 0
yielded = 0

while True:
logger.debug(f'Paginating call, offset = {kwargs["offset"]}')
response = api_call(**kwargs)
batch = response.data

for item in batch:
yield (item, response)
yielded += 1
if result_limit > 0 and yielded >= result_limit:
return

if len(batch) < increment:
return

kwargs['offset'] += increment

@staticmethod
def paginate_stream_search(search_api: SearchApi, search: Search, increment: int, limit: int):
"""
Stream search results by yielding each result as it is received from each API page.
"""
increment = increment if increment else 250
max_limit = limit if limit else 0
yielded = 0

if search.sort is None or len(search.sort) != 1:
raise Exception('search query must include exactly one sort parameter to paginate properly')

while True:
logger.debug('Paginating call')
results = search_api.search_post(search, None, increment)

for result in results:
yield result
yielded += 1
if max_limit > 0 and yielded >= max_limit:
return

logger.debug(f'Received {len(results)} results')

if len(results) < increment:
return

result = results[len(results) - 1]
if result[search.sort[0].strip('+-')] is not None:
next_search_after = result[str(search.sort[0]).strip('+-')]
search.search_after = [next_search_after]
else:
raise Exception('Search unexpectedly did not return a result we can search after!')

@staticmethod
def paginate_stream_search_with_http_info(
search_api: SearchApi, search: Search, increment: int, limit: int
) -> Iterator[Tuple[Any, Any]]:
"""
Stream search results from search_post_with_http_info.
Yields (item, response) tuples so callers can inspect status_code/headers
for every page, not just the first.
"""
increment = increment if increment else 250
max_limit = limit if limit else 0
yielded = 0

if search.sort is None or len(search.sort) != 1:
raise Exception('search query must include exactly one sort parameter to paginate properly')

while True:
logger.debug('Paginating call')
response = search_api.search_post_with_http_info(search, None, increment)
batch = response.data

for result in batch:
yield (result, response)
yielded += 1
if max_limit > 0 and yielded >= max_limit:
return

logger.debug(f'Received {len(batch)} results')

if len(batch) < increment:
return

last = batch[len(batch) - 1]
if last[search.sort[0].strip('+-')] is not None:
next_search_after = last[str(search.sort[0]).strip('+-')]
search.search_after = [next_search_after]
else:
raise Exception('Search unexpectedly did not return a result we can search after!')

@staticmethod
def paginate_search(search_api: SearchApi, search: Search, increment: int, limit: int):
increment = increment if increment else 250
Expand All @@ -60,11 +246,11 @@ def paginate_search(search_api: SearchApi, search: Search, increment: int, limit
raise Exception('search query must include exactly one sort parameter to paginate properly')

while True:
print(f'Paginating call, offset = {offset}')
logger.debug(f'Paginating call, offset = {offset}')
results = search_api.search_post(search, None, increment)
modified = modified + results

print(f'Received {len(results)} results')
logger.debug(f'Received {len(results)} results')

if len(results) < increment or (len(modified) >= max_limit and max_limit > 0):
results = modified
Expand All @@ -91,11 +277,11 @@ def paginate_search_with_http_info(search_api: SearchApi, search: Search, increm
raise Exception('search query must include exactly one sort parameter to paginate properly')

while True:
print(f'Paginating call, offset = {offset}')
logger.debug(f'Paginating call, offset = {offset}')
results = search_api.search_post_with_http_info(search, None, increment)
modified = modified + results.data

print(f'Recieved {len(results.data)} results')
logger.debug(f'Received {len(results.data)} results')

if len(results.data) < increment or (len(modified) >= max_limit and max_limit > 0):
results.data = modified
Expand Down
Loading