|
| 1 | +import time |
| 2 | + |
| 3 | +import requests |
| 4 | +import urllib3 |
| 5 | + |
| 6 | + |
| 7 | +class ArgoClient: |
| 8 | + """Client for interacting with Argo Workflows REST API.""" |
| 9 | + |
| 10 | + def __init__( |
| 11 | + self, url: str, token: str | None, namespace="argo-events", ssl_verify=False |
| 12 | + ): |
| 13 | + """Initialize the Argo client. |
| 14 | +
|
| 15 | + Args: |
| 16 | + url: Base URL of the Argo Workflows server |
| 17 | + (Optional) token: Authentication token for API access. If not provided |
| 18 | + the default token from |
| 19 | + /var/run/secrets/kubernetes.io/serviceaccount/token is used. |
| 20 | + """ |
| 21 | + self.url = url.rstrip("/") |
| 22 | + self.token = token or self._kubernetes_token |
| 23 | + self.session = requests.Session() |
| 24 | + self.session.verify = ssl_verify |
| 25 | + if not ssl_verify: |
| 26 | + urllib3.disable_warnings() |
| 27 | + self.session.headers.update( |
| 28 | + { |
| 29 | + "Authorization": f"Bearer {self.token}", |
| 30 | + "Content-Type": "application/json", |
| 31 | + } |
| 32 | + ) |
| 33 | + self.namespace = namespace |
| 34 | + |
| 35 | + def _generate_workflow_name(self, playbook_name: str) -> str: |
| 36 | + """Generate workflow name based on playbook name. |
| 37 | +
|
| 38 | + Strips .yaml/.yml suffix and creates ansible-<name>- format. |
| 39 | +
|
| 40 | + Args: |
| 41 | + playbook_name: Name of the Ansible playbook |
| 42 | +
|
| 43 | + Returns: |
| 44 | + str: Generated workflow name in format ansible-<name>- |
| 45 | +
|
| 46 | + Examples: |
| 47 | + storage_on_server_create.yml -> ansible-storage_on_server_create- |
| 48 | + network_setup.yaml -> ansible-network_setup- |
| 49 | + deploy_app -> ansible-deploy_app- |
| 50 | + """ |
| 51 | + base_name = playbook_name.replace("_", "-") |
| 52 | + if base_name.endswith((".yaml", ".yml")): |
| 53 | + base_name = base_name.rsplit(".", 1)[0] |
| 54 | + return f"ansible-{base_name}-" |
| 55 | + |
| 56 | + @property |
| 57 | + def _kubernetes_token(self) -> str: |
| 58 | + """Reads pod's Kubernetes token. |
| 59 | +
|
| 60 | + Args: |
| 61 | + None |
| 62 | + Returns: |
| 63 | + str: value of the token |
| 64 | + """ |
| 65 | + with open("/var/run/secrets/kubernetes.io/serviceaccount/token") as f: |
| 66 | + return f.read() |
| 67 | + |
| 68 | + def run_playbook(self, playbook_name: str, **extra_vars) -> dict: |
| 69 | + """Run an Ansible playbook via Argo Workflows. |
| 70 | +
|
| 71 | + This method creates a workflow from the ansible-workflow-template and waits |
| 72 | + for it to complete synchronously. |
| 73 | +
|
| 74 | + Args: |
| 75 | + playbook_name: Name of the Ansible playbook to run |
| 76 | + **extra_vars: Arbitrary key/value pairs to pass as extra_vars to Ansible |
| 77 | +
|
| 78 | + Returns: |
| 79 | + dict: The final workflow status |
| 80 | +
|
| 81 | + Raises: |
| 82 | + requests.RequestException: If API requests fail |
| 83 | + RuntimeError: If workflow fails or times out |
| 84 | + """ |
| 85 | + # Convert extra_vars dict to space-separated key=value string |
| 86 | + extra_vars_str = " ".join(f"{key}={value}" for key, value in extra_vars.items()) |
| 87 | + |
| 88 | + # Generate workflow name based on playbook name |
| 89 | + generate_name = self._generate_workflow_name(playbook_name) |
| 90 | + |
| 91 | + # Create workflow from template |
| 92 | + workflow_request = { |
| 93 | + "workflow": { |
| 94 | + "metadata": {"generateName": generate_name}, |
| 95 | + "spec": { |
| 96 | + "workflowTemplateRef": {"name": "ansible-workflow-template"}, |
| 97 | + "entrypoint": "ansible-run", |
| 98 | + "arguments": { |
| 99 | + "parameters": [ |
| 100 | + {"name": "playbook", "value": playbook_name}, |
| 101 | + { |
| 102 | + "name": "extra_vars", |
| 103 | + "value": extra_vars_str, |
| 104 | + }, |
| 105 | + ] |
| 106 | + }, |
| 107 | + }, |
| 108 | + } |
| 109 | + } |
| 110 | + |
| 111 | + # Submit workflow |
| 112 | + response = self.session.post( |
| 113 | + f"{self.url}/api/v1/workflows/{self.namespace}", json=workflow_request |
| 114 | + ) |
| 115 | + response.raise_for_status() |
| 116 | + |
| 117 | + workflow = response.json() |
| 118 | + workflow_name = workflow["metadata"]["name"] |
| 119 | + |
| 120 | + # Wait for workflow completion |
| 121 | + return self._wait_for_completion(workflow_name) |
| 122 | + |
| 123 | + def _wait_for_completion( |
| 124 | + self, workflow_name: str, timeout: int = 600, poll_interval: int = 5 |
| 125 | + ) -> dict: |
| 126 | + """Wait for workflow to complete. |
| 127 | +
|
| 128 | + Args: |
| 129 | + workflow_name: Name of the workflow to monitor |
| 130 | + timeout: Maximum time to wait in seconds (default: 10 minutes) |
| 131 | + poll_interval: Time between status checks in seconds |
| 132 | +
|
| 133 | + Returns: |
| 134 | + dict: Final workflow status |
| 135 | +
|
| 136 | + Raises: |
| 137 | + RuntimeError: If workflow fails or times out |
| 138 | + """ |
| 139 | + start_time = time.time() |
| 140 | + |
| 141 | + while time.time() - start_time < timeout: |
| 142 | + response = self.session.get( |
| 143 | + f"{self.url}/api/v1/workflows/{self.namespace}/{workflow_name}" |
| 144 | + ) |
| 145 | + response.raise_for_status() |
| 146 | + |
| 147 | + workflow = response.json() |
| 148 | + phase = workflow.get("status", {}).get("phase") |
| 149 | + |
| 150 | + if phase == "Succeeded": |
| 151 | + return workflow |
| 152 | + elif phase == "Failed": |
| 153 | + status = workflow.get("status", {}).get("message", "Unknown error") |
| 154 | + raise RuntimeError(f"Workflow {workflow_name} failed: {status}") |
| 155 | + elif phase == "Error": |
| 156 | + status = workflow.get("status", {}).get("message", "Unknown error") |
| 157 | + raise RuntimeError( |
| 158 | + f"Workflow {workflow_name} encountered an error: {status}" |
| 159 | + ) |
| 160 | + |
| 161 | + time.sleep(poll_interval) |
| 162 | + |
| 163 | + raise RuntimeError( |
| 164 | + f"Workflow {workflow_name} timed out after {timeout} seconds" |
| 165 | + ) |
0 commit comments