Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
<!-- markdownlint-disable MD012 MD013 MD024 MD033 -->
# Change Log

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](https://semver.org/)

## [Unreleased]

### Fixed

- move connection test from init to execute phase (all tasks)
- this avoids project loading errors
- Fix typos and grammar in parameter descriptions

### Added

- testing infrastructure based in testcontainers


## [1.0.0] 2025-07-18

### Changed
Expand Down
30 changes: 21 additions & 9 deletions cmem_plugin_ssh/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
PluginParameter(
name="username",
label="Username",
description="The username of which a connection will be instantiated.",
description="The username with which a connection will be instantiated.",
),
PluginParameter(
name="authentication_method",
Expand All @@ -106,14 +106,18 @@
name="password",
label="Password",
description="Depending on your authentication method this will either be used to"
"connect via password to SSH or is used to decrypt the SSH private key",
"connect via password to SSH, or to decrypt the SSH private key",
param_type=PasswordParameterType(),
default_value="",
),
PluginParameter(
name="path",
label="Path",
description="The currently selected path withing your SSH instance.",
description=(
"The currently selected path within your SSH instance."
" Auto-completion starts from user home folder, use '..' for parent directory"
" or '/' for root directory."
),
default_value="",
param_type=DirectoryParameterType("directories", "Folder"),
),
Expand Down Expand Up @@ -158,6 +162,9 @@
class DownloadFiles(WorkflowPlugin):
"""SSH Workflow Plugin: File download"""

ssh_client: paramiko.SSHClient
sftp: paramiko.SFTPClient

def __init__( # noqa: PLR0913
self,
hostname: str,
Expand Down Expand Up @@ -186,11 +193,8 @@ def __init__( # noqa: PLR0913
self.input_ports = FixedNumberOfInputs([FixedSchemaPort(schema=generate_list_schema())])
self.output_port = FixedSchemaPort(schema=FileEntitySchema())
self.download_dir = tempfile.mkdtemp()
self.ssh_client = paramiko.SSHClient()
self.connect_ssh_client()
self.sftp = self.ssh_client.open_sftp()

def connect_ssh_client(self) -> None:
def establish_ssh_connection(self) -> None:
"""Connect to the ssh client with the selected authentication method"""
if self.authentication_method == "key":
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
Expand All @@ -212,13 +216,19 @@ def connect_ssh_client(self) -> None:
timeout=20,
)

def close_connections(self) -> None:
def cleanup_ssh_connections(self) -> None:
"""Close connection from sftp and ssh"""
self.sftp.close()
self.ssh_client.close()

def _initialize_ssh_and_sftp_connections(self) -> None:
self.ssh_client = paramiko.SSHClient()
self.establish_ssh_connection()
self.sftp = self.ssh_client.open_sftp()

def preview_results(self) -> str:
"""Preview the results of an execution"""
self._initialize_ssh_and_sftp_connections()
return preview_results(
ssh_client=self.ssh_client,
no_subfolder=self.no_subfolder,
Expand All @@ -233,6 +243,8 @@ def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Enti
_ = inputs
schema = FileEntitySchema()

self._initialize_ssh_and_sftp_connections()

context.report.update(
ExecutionReport(entity_count=0, operation="wait", operation_desc="files listed.")
)
Expand Down Expand Up @@ -286,7 +298,7 @@ def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Enti

self.update_context(context, entities, files, schema)

self.close_connections()
self.cleanup_ssh_connections()

return Entities(entities=iter(entities), schema=schema)

Expand Down
33 changes: 22 additions & 11 deletions cmem_plugin_ssh/execute_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def setup_timeout(timeout: float) -> None | float:
PluginParameter(
name="username",
label="Username",
description="The username of which a connection will be instantiated.",
description="The username with which a connection will be instantiated.",
),
PluginParameter(
name="authentication_method",
Expand All @@ -119,21 +119,25 @@ def setup_timeout(timeout: float) -> None | float:
name="password",
label="Password",
description="Depending on your authentication method this will either be used to"
"connect via password to SSH or is used to decrypt the SSH private key",
"connect via password to SSH, or to decrypt the SSH private key",
param_type=PasswordParameterType(),
default_value="",
),
PluginParameter(
name="path",
label="Path",
description="The currently selected path withing your SSH instance.",
description=(
"The currently selected path within your SSH instance."
" Auto-completion starts from user home folder, use '..' for parent directory"
" or '/' for root directory."
),
default_value="",
param_type=DirectoryParameterType("directories", "Folder"),
),
PluginParameter(
name="input_method",
label="Input method",
description="Parameter to decide weather files will be used as stdin or no input is "
description="Parameter to decide whether files will be used as stdin or no input is "
"needed. If 'File input' is chosen, the input port will open for all entities with"
"the FileEntitySchema.",
param_type=ChoiceParameterType(COMMAND_INPUT_CHOICES),
Expand All @@ -142,7 +146,7 @@ def setup_timeout(timeout: float) -> None | float:
name="output_method",
label="Output method",
description="Parameter to decide which type of output the user wants. This can be "
"either no output, a structured process output with its own schema or "
"either no output, a structured process output with its own schema, or "
"a file based output",
param_type=ChoiceParameterType(COMMAND_OUTPUT_CHOICES),
),
Expand All @@ -164,6 +168,9 @@ def setup_timeout(timeout: float) -> None | float:
class ExecuteCommands(WorkflowPlugin):
"""Execute commands Plugin SSH"""

ssh_client: paramiko.SSHClient
sftp: paramiko.SFTPClient

def __init__( # noqa: PLR0913
self,
hostname: str,
Expand Down Expand Up @@ -191,11 +198,8 @@ def __init__( # noqa: PLR0913
self.timeout = setup_timeout(timeout)
self.input_ports = self.setup_input_port()
self.output_port = self.setup_output_port()
self.ssh_client = paramiko.SSHClient()
self.connect_ssh_client()
self.sftp = self.ssh_client.open_sftp()

def connect_ssh_client(self) -> None:
def establish_ssh_connection(self) -> None:
"""Connect to the ssh client with the selected authentication method"""
if self.authentication_method == "key":
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
Expand All @@ -217,14 +221,21 @@ def connect_ssh_client(self) -> None:
timeout=20,
)

def close_connections(self) -> None:
def cleanup_ssh_connections(self) -> None:
"""Close connection from sftp and ssh"""
self.sftp.close()
self.ssh_client.close()

def _initialize_ssh_and_sftp_connections(self) -> None:
self.ssh_client = paramiko.SSHClient()
self.establish_ssh_connection()
self.sftp = self.ssh_client.open_sftp()

def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Entities:
"""Execute the workflow task"""
entities: list = []

self._initialize_ssh_and_sftp_connections()
context.report.update(
ExecutionReport(
entity_count=len(entities),
Expand All @@ -238,7 +249,7 @@ def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Enti
if self.input_method == "no_input":
self.no_input_execution(entities)

self.close_connections()
self.cleanup_ssh_connections()

operation_desc = (
f"times executed '{self.command}'"
Expand Down
32 changes: 22 additions & 10 deletions cmem_plugin_ssh/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
PluginParameter(
name="hostname",
label="Hostname",
description="Hostname to connect to.Usually in the form of an IP address",
description="Hostname to connect to. Usually in the form of an IP address",
),
PluginParameter(
name="port",
Expand All @@ -79,7 +79,7 @@
PluginParameter(
name="username",
label="Username",
description="The username of which a connection will be instantiated.",
description="The username with which a connection will be instantiated.",
),
PluginParameter(
name="authentication_method",
Expand All @@ -99,14 +99,18 @@
name="password",
label="Password",
description="Depending on your authentication method this will either be used to"
"connect via password to SSH or is used to decrypt the SSH private key",
"connect via password to SSH, or to decrypt the SSH private key",
param_type=PasswordParameterType(),
default_value="",
),
PluginParameter(
name="path",
label="Path",
description="The currently selected path withing your SSH instance.",
description=(
"The currently selected path within your SSH instance."
" Auto-completion starts from user home folder, use '..' for parent directory"
" or '/' for root directory."
),
default_value="",
param_type=DirectoryParameterType("directories", "Folder"),
),
Expand Down Expand Up @@ -151,6 +155,9 @@
class ListFiles(WorkflowPlugin):
"""List Plugin SSH"""

ssh_client: paramiko.SSHClient
sftp: paramiko.SFTPClient

def __init__( # noqa: PLR0913
self,
hostname: str,
Expand Down Expand Up @@ -178,16 +185,13 @@ def __init__( # noqa: PLR0913
self.max_workers = setup_max_workers(max_workers)
self.input_ports = FixedNumberOfInputs([])
self.output_port = FixedSchemaPort(schema=generate_list_schema())
self.ssh_client = paramiko.SSHClient()
self.connect_ssh_client()
self.sftp = self.ssh_client.open_sftp()

def close_connections(self) -> None:
def cleanup_ssh_connections(self) -> None:
"""Close connection from sftp and ssh"""
self.sftp.close()
self.ssh_client.close()

def connect_ssh_client(self) -> None:
def establish_ssh_connection(self) -> None:
"""Connect to the ssh client with the selected authentication method"""
if self.authentication_method == "key":
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
Expand All @@ -211,6 +215,7 @@ def connect_ssh_client(self) -> None:

def preview_results(self) -> str:
"""Preview the results of an execution"""
self._initialize_ssh_and_sftp_connections()
return preview_results(
ssh_client=self.ssh_client,
no_subfolder=self.no_subfolder,
Expand All @@ -220,6 +225,11 @@ def preview_results(self) -> str:
max_workers=self.max_workers,
)

def _initialize_ssh_and_sftp_connections(self) -> None:
self.ssh_client = paramiko.SSHClient()
self.establish_ssh_connection()
self.sftp = self.ssh_client.open_sftp()

def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Entities:
"""Execute the workflow task"""
_ = inputs
Expand All @@ -228,6 +238,8 @@ def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Enti
)
entities = []

self._initialize_ssh_and_sftp_connections()

retrieval = SSHRetrieval(
ssh_client=self.ssh_client,
no_subfolder=self.no_subfolder,
Expand Down Expand Up @@ -316,7 +328,7 @@ def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Enti
)
)

self.close_connections()
self.cleanup_ssh_connections()

return Entities(
entities=iter(entities),
Expand Down
Loading