Skip to content

Commit 18433b6

Browse files
committed
enable specifying concurrencyPolicy when creating argo workflow
1 parent 5e53881 commit 18433b6

File tree

4 files changed

+22
-9
lines changed

4 files changed

+22
-9
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ R/.Rbuildignore
2020

2121
.DS_Store
2222
.env
23+
.venv
2324
node_modules
2425
main.js.map
2526

metaflow/plugins/argo/argo_client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,9 @@ def trigger_workflow_template(self, name, usertype, username, parameters={}):
309309
json.loads(e.body)["message"] if e.body is not None else e.reason
310310
)
311311

312-
def schedule_workflow_template(self, name, schedule=None, timezone=None):
312+
def schedule_workflow_template(
313+
self, name, schedule=None, timezone=None, concurrency_policy=None
314+
):
313315
# Unfortunately, Kubernetes client does not handle optimistic
314316
# concurrency control by itself unlike kubectl
315317
client = self._client.get()
@@ -321,6 +323,7 @@ def schedule_workflow_template(self, name, schedule=None, timezone=None):
321323
"suspend": schedule is None,
322324
"schedule": schedule,
323325
"timezone": timezone,
326+
"concurrencyPolicy": concurrency_policy,
324327
"failedJobsHistoryLimit": 10000, # default is unfortunately 1
325328
"successfulJobsHistoryLimit": 10000, # default is unfortunately 3
326329
"workflowSpec": {"workflowTemplateRef": {"name": name}},

metaflow/plugins/argo/argo_workflows.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def __init__(
175175
self.parameters = self._process_parameters()
176176
self.config_parameters = self._process_config_parameters()
177177
self.triggers, self.trigger_options = self._process_triggers()
178-
self._schedule, self._timezone = self._get_schedule()
178+
self._schedule, self._timezone, self._concurrency_policy = self._get_schedule()
179179

180180
self._base_labels = self._base_kubernetes_labels()
181181
self._base_annotations = self._base_kubernetes_annotations()
@@ -386,14 +386,18 @@ def _get_schedule(self):
386386
if schedule:
387387
# Remove the field "Year" if it exists
388388
schedule = schedule[0]
389-
return " ".join(schedule.schedule.split()[:5]), schedule.timezone
389+
return (
390+
" ".join(schedule.schedule.split()[:5]),
391+
schedule.timezone,
392+
schedule.concurrency_policy,
393+
)
390394
return None, None
391395

392396
def schedule(self):
393397
try:
394398
argo_client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
395399
argo_client.schedule_workflow_template(
396-
self.name, self._schedule, self._timezone
400+
self.name, self._schedule, self._timezone, self._concurrency_policy
397401
)
398402
# Register sensor.
399403
# Metaflow will overwrite any existing sensor.
@@ -731,11 +735,14 @@ def _compile_workflow_template(self):
731735

732736
annotations = {}
733737
if self._schedule is not None:
734-
# timezone is an optional field and json dumps on None will result in null
735-
# hence configuring it to an empty string
736-
if self._timezone is None:
737-
self._timezone = ""
738-
cron_info = {"schedule": self._schedule, "tz": self._timezone}
738+
# timezone and concurrency_policy is an optional field and json
739+
# dumps on None will result in null hence configuring it to an empty
740+
# string
741+
cron_info = {
742+
"schedule": self._schedule,
743+
"tz": self._timezone or "",
744+
"concurrency_policy": self._concurrency_policy or "",
745+
}
739746
annotations.update({"metaflow/cron": json.dumps(cron_info)})
740747

741748
if self.parameters:

metaflow/plugins/aws/step_functions/schedule_decorator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class ScheduleDecorator(FlowDecorator):
3030
"daily": True,
3131
"hourly": False,
3232
"timezone": None,
33+
"concurrency_policy": None,
3334
}
3435

3536
def flow_init(
@@ -50,3 +51,4 @@ def flow_init(
5051

5152
# Argo Workflows supports the IANA timezone standard, e.g. America/Los_Angeles
5253
self.timezone = self.attributes["timezone"]
54+
self.concurrency_policy = self.attributes["concurrency_policy"]

0 commit comments

Comments
 (0)