diff --git a/bloom_lims/api/v1/worksets.py b/bloom_lims/api/v1/worksets.py
index 4534ee3..d20495b 100644
--- a/bloom_lims/api/v1/worksets.py
+++ b/bloom_lims/api/v1/worksets.py
@@ -98,14 +98,18 @@ async def get_workset(euid: str, user: APIUser = Depends(require_api_auth)):
bdb = get_bdb(user.email)
bob = get_bob(bdb)
- workset = bob.get_by_euid(euid)
+ try:
+ workset = bob.get_by_euid(euid)
+ except Exception:
+ workset = None
+
if not workset:
raise HTTPException(status_code=404, detail=f"Workset not found: {euid}")
if workset.category != "subject" or workset.type != "workset":
raise HTTPException(status_code=404, detail=f"Not a workset: {euid}")
- props = workset.json_addl.get("properties", {})
+ props = workset.json_addl.get("properties", {}) if workset.json_addl else {}
members = subjecting.list_members_for_subject(bob, euid)
return {
@@ -181,7 +185,11 @@ async def add_workset_members(
bob = get_bob(bdb)
# Verify workset exists
- workset = bob.get_by_euid(euid)
+ try:
+ workset = bob.get_by_euid(euid)
+ except Exception:
+ workset = None
+
if not workset:
raise HTTPException(status_code=404, detail=f"Workset not found: {euid}")
@@ -217,7 +225,11 @@ async def get_workset_members(euid: str, user: APIUser = Depends(require_api_aut
bob = get_bob(bdb)
# Verify workset exists
- workset = bob.get_by_euid(euid)
+ try:
+ workset = bob.get_by_euid(euid)
+ except Exception:
+ workset = None
+
if not workset:
raise HTTPException(status_code=404, detail=f"Workset not found: {euid}")
@@ -254,7 +266,11 @@ async def complete_workset(
bob = get_bob(bdb)
# Verify workset exists
- workset = bob.get_by_euid(euid)
+ try:
+ workset = bob.get_by_euid(euid)
+ except Exception:
+ workset = None
+
if not workset:
raise HTTPException(status_code=404, detail=f"Workset not found: {euid}")
diff --git a/templates/legacy/workflow_details.html b/templates/legacy/workflow_details.html
index 71b5824..5f99279 100644
--- a/templates/legacy/workflow_details.html
+++ b/templates/legacy/workflow_details.html
@@ -155,9 +155,10 @@
+ {# Query both container_instance AND content_instance lineages per Rule 6 #}
{% if step.parent_of_lineages %}
- {% for child_step in step.get_sorted_parent_of_lineages(['container_instance']) %}
+ {% for child_step in step.get_sorted_parent_of_lineages(['container_instance', 'content_instance']) %}
{% if child_step.child_instance.category != 'workflow_step' and udat.get('wf_filter','off') == 'off' %}
{% else %}
diff --git a/templates/modern/workflow_details.html b/templates/modern/workflow_details.html
index 15c1e46..db2a1f3 100644
--- a/templates/modern/workflow_details.html
+++ b/templates/modern/workflow_details.html
@@ -470,11 +470,12 @@ Workflow Actions
{% set is_open = accordion_states.get(step.euid, 'closed') == 'open' %}
{# Separate workflow step children from container and content children #}
+{# Query both container_instance AND content_instance lineages per Rule 6 (polymorphic identity naming) #}
{% set workflow_children = [] %}
{% set container_children = [] %}
{% set content_children = [] %}
{% if step.parent_of_lineages %}
- {% for child_lineage in step.get_sorted_parent_of_lineages(['container_instance']) %}
+ {% for child_lineage in step.get_sorted_parent_of_lineages(['container_instance', 'content_instance']) %}
{% set child = child_lineage.child_instance %}
{% if child.category == 'workflow_step' %}
{% set _ = workflow_children.append(child) %}
diff --git a/tests/test_api_v1.py b/tests/test_api_v1.py
index e38da5d..7846475 100644
--- a/tests/test_api_v1.py
+++ b/tests/test_api_v1.py
@@ -1106,3 +1106,95 @@ def test_task_cancel(self, client):
"""Test task cancel endpoint."""
response = client.post("/api/v1/tasks/00000000-0000-0000-0000-000000000000/cancel")
assert response.status_code in [200, 400, 404, 422, 500]
+
+
+class TestWorksetsAPI:
+ """Tests for /api/v1/worksets endpoints."""
+
+ def test_list_worksets(self, client):
+ """Test listing worksets."""
+ response = client.get("/api/v1/worksets/")
+ assert response.status_code == 200
+ data = response.json()
+ assert "items" in data
+ assert "total" in data
+ assert "page" in data
+ assert "page_size" in data
+
+ def test_list_worksets_with_filters(self, client):
+ """Test listing worksets with status filter."""
+ response = client.get("/api/v1/worksets/?status=complete&page_size=10")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["page_size"] == 10
+
+ def test_list_worksets_with_workflow_filter(self, client):
+ """Test listing worksets with workflow filter."""
+ response = client.get("/api/v1/worksets/?workflow_euid=AY1")
+ assert response.status_code == 200
+ data = response.json()
+ assert "items" in data
+
+ def test_get_workset_not_found(self, client):
+ """Test getting non-existent workset."""
+ # Use a truly non-existent EUID pattern that won't match any object
+ response = client.get("/api/v1/worksets/ZZZZZ_DOES_NOT_EXIST_99999")
+ assert response.status_code == 404
+
+ def test_get_workset_not_a_workset(self, client):
+ """Test getting an object that exists but is not a workset."""
+ # GT1 is a template, not a workset
+ response = client.get("/api/v1/worksets/GT1")
+ assert response.status_code == 404
+
+ def test_create_workset_response_structure(self, client):
+ """Test creating workset returns expected response structure."""
+ # Note: This may create a workset even with non-existent anchor
+ # depending on implementation. We just verify the response structure.
+ response = client.post(
+ "/api/v1/worksets/",
+ json={
+ "anchor_euid": "WSX1", # Use a likely existing workflow step
+ "workset_type": "accession",
+ }
+ )
+ # Should return 200 with success structure or 400/500 on error
+ assert response.status_code in [200, 400, 500]
+ if response.status_code == 200:
+ data = response.json()
+ assert "success" in data
+ assert "euid" in data or "message" in data
+
+ def test_add_members_workset_not_found(self, client):
+ """Test adding members to non-existent workset."""
+ response = client.post(
+ "/api/v1/worksets/ZZZZZ_DOES_NOT_EXIST_99999/members",
+ json={"member_euids": ["CX1", "CX2"]}
+ )
+ assert response.status_code == 404
+
+ def test_get_members_workset_not_found(self, client):
+ """Test getting members of non-existent workset."""
+ response = client.get("/api/v1/worksets/ZZZZZ_DOES_NOT_EXIST_99999/members")
+ assert response.status_code == 404
+
+ def test_complete_workset_not_found(self, client):
+ """Test completing non-existent workset."""
+ response = client.put(
+ "/api/v1/worksets/ZZZZZ_DOES_NOT_EXIST_99999/complete",
+ json={"status": "complete"}
+ )
+ assert response.status_code == 404
+
+ def test_get_workset_by_anchor_response(self, client):
+ """Test finding workset by anchor returns expected structure."""
+ # Use a unique anchor that likely doesn't have a workset
+ import uuid
+ unique_anchor = f"TEST_ANCHOR_{uuid.uuid4().hex[:8]}"
+ response = client.get(f"/api/v1/worksets/by-anchor/{unique_anchor}")
+ # Should return 404 for non-existent anchor, or 200 with workset info
+ assert response.status_code in [200, 404]
+ if response.status_code == 200:
+ data = response.json()
+ assert "euid" in data
+ assert "anchor_euid" in data
\ No newline at end of file