From 5bcdca165979e38658eab2e0cf5273da16e56869 Mon Sep 17 00:00:00 2001 From: Bajram Bojku Date: Fri, 26 Jun 2020 11:17:08 -0500 Subject: [PATCH 1/3] first commit --- docs/SFTPCopy-action.md | 7 +- docs/SFTPDelete-action.md | 7 +- docs/SFTPPut-action.md | 7 +- pom.xml | 51 ++++++++-- .../java/io/cdap/plugin/SFTPCopyAction.java | 50 +++++++++- .../java/io/cdap/plugin/SFTPDeleteAction.java | 54 +++++++++-- .../java/io/cdap/plugin/SFTPPutAction.java | 95 ++++++++++++++----- .../cdap/plugin/common/SFTPActionConfig.java | 39 +++++++- .../io/cdap/plugin/common/SFTPConnector.java | 30 ++++-- .../io.cdap.plugin/SFTPCopyActionTest.java | 70 ++++++++++++++ .../test/io.cdap.plugin/SFTPDeleteTest.java | 67 +++++++++++++ .../io.cdap.plugin/SFTPPutActionTest.java | 66 +++++++++++++ src/main/test/resources/data.txt | 1 + widgets/SFTPCopy-action.json | 80 ++++++++++++---- widgets/SFTPDelete-action.json | 61 +++++++++++- widgets/SFTPPut-action.json | 63 +++++++++++- 16 files changed, 669 insertions(+), 79 deletions(-) create mode 100644 src/main/test/io.cdap.plugin/SFTPCopyActionTest.java create mode 100644 src/main/test/io.cdap.plugin/SFTPDeleteTest.java create mode 100644 src/main/test/io.cdap.plugin/SFTPPutActionTest.java create mode 100644 src/main/test/resources/data.txt diff --git a/docs/SFTPCopy-action.md b/docs/SFTPCopy-action.md index 1f25f67..883cf9f 100644 --- a/docs/SFTPCopy-action.md +++ b/docs/SFTPCopy-action.md @@ -39,8 +39,11 @@ Plugin Configuration | :------------ | :------: | :----- | :---------- | | **Host** | **Y** | N/A | Specifies the host name of the SFTP server.| | **Port** | **N** | 22 | Specifies the port on which SFTP server is running.| -| **User** | **Y** | N/A | Specifies the name of the user which will be used to connect to the SFTP server.| -| **Password** | **Y** | N/A | Specifies the password of the user.| +| **User Name** | **Y** | N/A | Specifies the name of the user which will be used to connect to the SFTP server.| +|**Authentication**|**Y**|**PrivateKey**| Specifies the type of Authentication that will be used to connect to the SFTP Server| +|**Private Key**|**N**|N/A| Private RSA Key to be used to connect to the SFTP Server. This key can be stored in the Secure Keys Store and macro called into the Configuration. Must be a RSA key example: -----BEGIN RSA PRIVATE KEY-----| +|**Private Key Passphrase**|**N**|N/A| Passphrase to be used with RSA Private Key if a Passphrase was specified when key was generated| +| **Password** | **N** | N/A | Specifies the password of the user. Only Required if Private Key is not being used| | **Source Directory** | **Y** | N/A | Absolute path of the directory on the SFTP server which is to be copied. If the directory is empty, the execution of the plugin will be no-op.| | **Destination Directory** | **Y** | N/A | Destination directory on the file system, where files need to be copied. If directory does not exist, it will lbe created.| | **Uncompress** | **N** | true | Boolean flag to determine whether to uncompress the `.zip` files while copying.| diff --git a/docs/SFTPDelete-action.md b/docs/SFTPDelete-action.md index 99c08ac..4a9d7f4 100644 --- a/docs/SFTPDelete-action.md +++ b/docs/SFTPDelete-action.md @@ -22,8 +22,11 @@ Plugin Configuration | :------------ | :------: | :----- | :---------- | | **Host** | **Y** | N/A | Specifies the host name of the SFTP server.| | **Port** | **N** | 22 | Specifies the port on which SFTP server is running.| -| **User** | **Y** | N/A | Specifies the name of the user which will be used to connect to the SFTP server.| -| **Password** | **Y** | N/A | Specifies the password of the user.| +| **User Name** | **Y** | N/A | Specifies the name of the user which will be used to connect to the SFTP server.| +|**Authentication**|**Y**|**PrivateKey**| Specifies the type of Authentication that will be used to connect to the SFTP Server| +|**Private Key**|**N**|N/A| Private RSA Key to be used to connect to the SFTP Server. This key can be stored in the Secure Keys Store and macro called into the Configuration. Must be a RSA key example: -----BEGIN RSA PRIVATE KEY-----| +|**Private Key Passphrase**|**N**|N/A| Passphrase to be used with RSA Private Key if a Passphrase was specified when key was generated| +| **Password** | **N** | N/A | Specifies the password of the user. Only Required if Private Key is not being used| | **Files to be deleted** | **Y** | ${sftp.copied.file.names} | Comma separated list of files on the SFTP server to be deleted. Default value for this field is a Macro which will be substituted by SFTP copy plugin when this plugin is used with it. | | **Destination Directory** | **Y** | N/A | Destination directory on the file system, where files need to be copied. If directory does not exist, it will lbe created.| | **Continue execution on error** | **N** | false | Boolean flag to determine whether to proceed with next files in case there is a failure in deletion of any particular file. | diff --git a/docs/SFTPPut-action.md b/docs/SFTPPut-action.md index 32aded2..7826e5b 100644 --- a/docs/SFTPPut-action.md +++ b/docs/SFTPPut-action.md @@ -29,8 +29,11 @@ Plugin Configuration | :------------ | :------: | :----- | :---------- | | **Host** | **Y** | N/A | Specifies the host name of the SFTP server.| | **Port** | **N** | 22 | Specifies the port on which SFTP server is running.| -| **User** | **Y** | N/A | Specifies the name of the user which will be used to connect to the SFTP server.| -| **Password** | **Y** | N/A | Specifies the password of the user.| +| **User Name** | **Y** | N/A | Specifies the name of the user which will be used to connect to the SFTP server.| +|**Authentication**|**Y**|**PrivateKey**| Specifies the type of Authentication that will be used to connect to the SFTP Server| +|**Private Key**|**N**|N/A| Private RSA Key to be used to connect to the SFTP Server. This key can be stored in the Secure Keys Store and macro called into the Configuration. Must be a RSA key example: -----BEGIN RSA PRIVATE KEY-----| +|**Private Key Passphrase**|**N**|N/A| Passphrase to be used with RSA Private Key if a Passphrase was specified when key was generated| +| **Password** | **N** | N/A | Specifies the password of the user. Only Required if Private Key is not being used| | **Source Path** | **Y** | N/A | Path of file or directory on the file system which is to be copied.| | **Destination Directory** | **Y** | N/A | Destination directory on the FTP Server, where files need to be copied. If directory does not exist, it will lbe created.| | **File Name Regex** | **N** | .* | Regex to choose only the files that are of interest. All files will be copied by default.| diff --git a/pom.xml b/pom.xml index b1dc74f..567d334 100644 --- a/pom.xml +++ b/pom.xml @@ -18,14 +18,14 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 io.cdap.plugin - sftp-actions + sftp-actions-ssh jar - 1.5.0-SNAPSHOT + 1.5.1-SNAPSHOT SFTP Actions UTF-8 - 6.2.0 - 2.4.0 + 6.1.2 + 2.3.5 0.1.53 2.3.0 19.0 @@ -62,6 +62,26 @@ hydrator-common ${hydrator.version} + + io.cdap.cdap + cdap-unit-test + ${cdap.version} + test + + + asm + asm + + + + + io.cdap.cdap + hydrator-test + ${cdap.version} + test + + + com.jcraft jsch @@ -81,7 +101,24 @@ junit junit - 3.8.1 + 4.12 + test + + + + + + + software.sham + sham-ssh + 0.1.0 + + + + + org.mockito + mockito-all + 1.10.19 test @@ -173,8 +210,8 @@ maven-compiler-plugin 3.1 - 1.7 - 1.7 + 8 + 8 diff --git a/src/main/java/io/cdap/plugin/SFTPCopyAction.java b/src/main/java/io/cdap/plugin/SFTPCopyAction.java index 5119959..07623bd 100644 --- a/src/main/java/io/cdap/plugin/SFTPCopyAction.java +++ b/src/main/java/io/cdap/plugin/SFTPCopyAction.java @@ -16,11 +16,13 @@ package io.cdap.plugin; +import com.jcraft.jsch.SftpException; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.plugin.common.SFTPActionConfig; @@ -62,10 +64,16 @@ public SFTPCopyAction(SFTPCopyActionConfig config) { this.config = config; } + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + config.validate(); + } + /** * Configurations for the FTP copy action plugin. */ - public class SFTPCopyActionConfig extends SFTPActionConfig { + public static class SFTPCopyActionConfig extends SFTPActionConfig { @Description("Directory on the SFTP server which is to be copied.") @Macro public String srcDirectory; @@ -123,6 +131,22 @@ public Map getFileSystemProperties(){ } return properties; } + public SFTPCopyActionConfig(String host, int port, String userName, String password, + String sshProperties, String srcPath, String destDirectory, String authType){ + this.host = host; + this.port = port; + this.userName = userName; + this.password = password; + this.sshProperties = sshProperties; + this.srcDirectory = srcPath; + this.destDirectory = destDirectory; + this.authTypeBeingUsed = authType; + } + + public void validate() throws IllegalArgumentException { + // Check for required parameters + // Check for required params for each action + } } @Override @@ -142,8 +166,27 @@ public void run(ActionContext context) throws Exception { fileSystem.mkdirs(destination); } - try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), - config.getPassword(), config.getSSHProperties())) { + if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { + try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), + config.getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { + sftpCopyLogic(fileSystem, destination, SFTPConnector, context); + } catch (Exception e){ + LOG.error(String.valueOf(e)); + } + } else { + try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), + config.getPassword(), config.getSSHProperties())) { + sftpCopyLogic(fileSystem, destination, SFTPConnector, context); + } catch (Exception e) { + LOG.error(String.valueOf(e)); + } + } + } + + + public void sftpCopyLogic(FileSystem fileSystem, Path destination, SFTPConnector SFTPConnector, + ActionContext context) throws SftpException, IOException { + ChannelSftp channelSftp = SFTPConnector.getSftpChannel(); Vector files = channelSftp.ls(config.getSrcDirectory()); @@ -187,7 +230,6 @@ public void run(ActionContext context) throws Exception { context.getArguments().set(config.getVariableNameHoldingFileList(), Joiner.on(",").join(filesCopied)); LOG.info("Variables copied to {}.", Joiner.on(",").join(filesCopied)); } - } private void copyJschZip(InputStream is, FileSystem fs, Path destination) throws IOException { try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(is))) { diff --git a/src/main/java/io/cdap/plugin/SFTPDeleteAction.java b/src/main/java/io/cdap/plugin/SFTPDeleteAction.java index 7d471a6..85e0ca8 100644 --- a/src/main/java/io/cdap/plugin/SFTPDeleteAction.java +++ b/src/main/java/io/cdap/plugin/SFTPDeleteAction.java @@ -1,9 +1,11 @@ package io.cdap.plugin; +import com.jcraft.jsch.SftpException; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.plugin.common.SFTPActionConfig; @@ -21,22 +23,45 @@ public class SFTPDeleteAction extends Action { private static final Logger LOG = LoggerFactory.getLogger(SFTPDeleteAction.class); private SFTPDeleteActionConfig config; + public SFTPDeleteAction(SFTPDeleteActionConfig config) { this.config = config; } - public class SFTPDeleteActionConfig extends SFTPActionConfig { + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + config.validate(); + } + + public static class SFTPDeleteActionConfig extends SFTPActionConfig { @Description("Comma separated list of files to be deleted from FTP server.") @Macro public String filesToDelete; @Description("Boolean flag to determine if execution should continue if there is an error while deleting any file." + - " Defaults to 'false'.") + " Defaults to 'false'.") boolean continueOnError; public String getFilesToDelete() { return filesToDelete; } + + public SFTPDeleteActionConfig(String host, int port, String userName, String password, + String sshProperties, String filesToDelete, String authType){ + this.host = host; + this.port = port; + this.userName = userName; + this.password = password; + this.sshProperties = sshProperties; + this.filesToDelete = filesToDelete; + this.authTypeBeingUsed = authType; + } + + public void validate() throws IllegalArgumentException { + // Check for required parameters + // Check for required params for each action + } } @Override @@ -45,8 +70,26 @@ public void run(ActionContext context) throws Exception { if (filesToDelete == null || filesToDelete.isEmpty()) { return; } - try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), - config.getPassword(), config.getSSHProperties())) { + if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { + try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), + config.getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { + + sftpDeleteLogic(filesToDelete, SFTPConnector); + } catch (Exception e){ + LOG.error(String.valueOf(e)); + } + } else { + try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), + config.getPassword(), config.getSSHProperties())) { + + sftpDeleteLogic(filesToDelete, SFTPConnector); + } catch (Exception e){ + LOG.error(String.valueOf(e)); + } + } + } + + private void sftpDeleteLogic (String filesToDelete, SFTPConnector SFTPConnector) throws SftpException { ChannelSftp channelSftp = SFTPConnector.getSftpChannel(); for (String fileToDelete : filesToDelete.split(",")) { LOG.info("Deleting {}", fileToDelete); @@ -61,5 +104,4 @@ public void run(ActionContext context) throws Exception { } } } - } -} + } \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/SFTPPutAction.java b/src/main/java/io/cdap/plugin/SFTPPutAction.java index da056f6..5cd362c 100644 --- a/src/main/java/io/cdap/plugin/SFTPPutAction.java +++ b/src/main/java/io/cdap/plugin/SFTPPutAction.java @@ -20,6 +20,7 @@ import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.plugin.common.SFTPActionConfig; @@ -30,10 +31,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.InputStream; import javax.annotation.Nullable; @@ -47,10 +48,20 @@ public class SFTPPutAction extends Action { private SFTPPutActionConfig config; + public SFTPPutAction(SFTPPutActionConfig config){ + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + config.validate(); + } + /** * Configurations for the SFTP put action plugin. */ - public class SFTPPutActionConfig extends SFTPActionConfig { + public static class SFTPPutActionConfig extends SFTPActionConfig { @Description("Directory or File on the Filesystem which needs to be copied to the SFTP Server.") @Macro @@ -75,6 +86,26 @@ public String getDestDirectory() { public String getFileNameRegex() { return (fileNameRegex != null) ? fileNameRegex : ".*"; } + + public SFTPPutActionConfig(String host, int port, String userName, String password, + String sshProperties, String srcPath, String destDirectory, String authType){ + this.host = host; + this.port = port; + this.userName = userName; + this.password = password; + this.sshProperties = sshProperties; + this.srcPath = srcPath; + this.destDirectory = destDirectory; + this.authTypeBeingUsed = authType; + } + + /** + * Validates the config parameters required for unloading the data. + */ + public void validate() throws IllegalArgumentException { + // Check for required parameters + // Check for required params for each action + } } @Override @@ -85,32 +116,44 @@ public void run(ActionContext context) throws Exception { throw new RuntimeException(String.format("Source Path doesn't exist at %s", source)); } - try (SFTPConnector sftp = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), - config.getPassword(), config.getSSHProperties())) { - ChannelSftp channel = sftp.getSftpChannel(); - - try { - channel.mkdir(config.getDestDirectory()); - } catch (SftpException ex) { - // Suppress since the directory might already exist. + if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { + try (SFTPConnector sftp = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), + config .getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { + sftpPutLogic(fileSystem, source, sftp); + } catch (Exception e){ + LOG.error(String.valueOf(e)); + } + } else { + try (SFTPConnector sftp = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), + config.getPassword(), config.getSSHProperties())) { + sftpPutLogic(fileSystem, source, sftp); + } catch (Exception e){ + LOG.error(String.valueOf(e)); } + } + } + + private void sftpPutLogic(FileSystem fileSystem, Path source, SFTPConnector sftp) throws SftpException, IOException { + ChannelSftp channel = sftp.getSftpChannel(); + + try { + channel.mkdir(config.getDestDirectory()); + } catch (SftpException ex) { + // Suppress since the directory might already exist. + } + + channel.cd(config.getDestDirectory()); + + // Filter out only the files to copy + FileStatus[] filesToCopy = fileSystem.listStatus(source, path -> { + String fileName = path.getName(); + return fileName.matches(config.getFileNameRegex()); + }); - channel.cd(config.getDestDirectory()); - - // Filter out only the files to copy - FileStatus[] filesToCopy = fileSystem.listStatus(source, new PathFilter() { - @Override - public boolean accept(Path path) { - String fileName = path.getName(); - return fileName.matches(config.getFileNameRegex()); - } - }); - - for (FileStatus file : filesToCopy) { - Path filePath = file.getPath(); - try (InputStream inputStream = fileSystem.open(filePath)) { - channel.put(inputStream, filePath.getName()); - } + for (FileStatus file : filesToCopy) { + Path filePath = file.getPath(); + try (InputStream inputStream = fileSystem.open(filePath)) { + channel.put(inputStream, filePath.getName()); } } } diff --git a/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java b/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java index 2138557..2dfd24f 100644 --- a/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java +++ b/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java @@ -18,10 +18,9 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.dataset.lib.KeyValue; import io.cdap.cdap.api.plugin.PluginConfig; -import io.cdap.plugin.common.KeyValueListParser; - import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; @@ -37,14 +36,30 @@ public class SFTPActionConfig extends PluginConfig { @Description("Port on which SFTP server is running. Defaults to 22.") @Nullable @Macro - public String port; + public Integer port; @Description("Name of the user used to login to SFTP server.") @Macro public String userName; - @Description("Password used to login to SFTP server.") + @Description("Private Key to be used to login to SFTP Server. SSH key must be of RSA type") + @Macro + @Nullable + public String privateKey; + + @Description("Passphrase to be used with private key if passphrase was enabled when key was created. " + + "If PrivateKey is selected for Authentication") + @Macro + @Nullable + public String passphrase; + + @Name("Authentication") + @Description("Authentication type to be used for connection") + public String authTypeBeingUsed; + + @Description("Password used to login to SFTP server. If Password is selected for Authentication") @Macro + @Nullable public String password; @Description("Properties that will be used to configure the SSH connection to the FTP server. " + @@ -59,7 +74,7 @@ public String getHost() { } public int getPort() { - return (port != null) ? Integer.parseInt(port) : 22; + return (port != null) ? port : 22; } public String getUserName() { @@ -70,6 +85,20 @@ public String getPassword() { return password; } + public byte[] getPrivateKey() { + assert privateKey != null; + return privateKey.getBytes(); + } + + public String getAuthTypeBeingUsed() { return authTypeBeingUsed; } + + public byte[] getPassphrase(){ + if (passphrase == null){ + passphrase = ""; + } + return passphrase.getBytes(); + } + public Map getSSHProperties(){ Map properties = new HashMap<>(); // Default set to no diff --git a/src/main/java/io/cdap/plugin/common/SFTPConnector.java b/src/main/java/io/cdap/plugin/common/SFTPConnector.java index b3f589a..5d256f5 100644 --- a/src/main/java/io/cdap/plugin/common/SFTPConnector.java +++ b/src/main/java/io/cdap/plugin/common/SFTPConnector.java @@ -31,17 +31,35 @@ */ public class SFTPConnector implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(SFTPConnector.class); + private static Channel channel; private final Session session; - private final Channel channel; - public SFTPConnector(String host, int port, String userName, String password, Map sessionProperties) - throws Exception { + + //Connector Object to be used for Auth with Password + public SFTPConnector(String host, int port, String userName, String password, + Map sessionProperties) + throws Exception { JSch jsch = new JSch(); this.session = jsch.getSession(userName, host, port); session.setPassword(password); LOG.info("Properties {}", sessionProperties); Properties properties = new Properties(); - // properties.put("StrictHostKeyChecking", "no"); + properties.putAll(sessionProperties); + session.setConfig(properties); + LOG.info("Connecting to Host: {}, Port: {}, with User: {}", host, port, userName); + session.connect(30000); + channel = session.openChannel("sftp"); + channel.connect(); + } + // Connector Object to be used for Auth with SSH privatekey. + public SFTPConnector(String host, int port, String userName, byte[] privateKey, + byte[] passphrase, Map sessionProperties) + throws Exception { + JSch jsch = new JSch(); + jsch.addIdentity("key", privateKey,null,passphrase); + this.session = jsch.getSession(userName, host, port); + LOG.info("Properties {}", sessionProperties); + Properties properties = new Properties(); properties.putAll(sessionProperties); session.setConfig(properties); LOG.info("Connecting to Host: {}, Port: {}, with User: {}", host, port, userName); @@ -53,12 +71,12 @@ public SFTPConnector(String host, int port, String userName, String password, Ma /** * Get the established sftp channel to perform operations. */ - public ChannelSftp getSftpChannel() { + public static ChannelSftp getSftpChannel() { return (ChannelSftp) channel; } @Override - public void close() throws Exception { + public void close() { LOG.info("Closing SFTP session."); if (channel != null) { try { diff --git a/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java b/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java new file mode 100644 index 0000000..38668c6 --- /dev/null +++ b/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java @@ -0,0 +1,70 @@ +package io.cdap.plugin; + +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; + +import io.cdap.cdap.etl.mock.action.MockActionContext; +import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; +import org.junit.After; +import org.junit.Before; + + +import org.junit.Test; +import software.sham.sftp.MockSftpServer; + +import java.io.IOException; +import java.util.Properties; + + + +public class SFTPCopyActionTest { + +MockSftpServer server; + Session sshSession; + + @Before + public void initSftp() throws IOException { + server = new MockSftpServer(9022); + } + + @Before + public void initSshClient() throws JSchException { + JSch jsch = new JSch(); + sshSession = jsch.getSession("tester", "localhost", 9022); + Properties config = new Properties(); + config.setProperty("StrictHostKeyChecking", "no"); + sshSession.setConfig(config); + sshSession.setPassword("testing"); + sshSession.connect(); + } + + @After + public void stopSftp() throws IOException { + server.stop(); + } + + @Test + public void testCopyFile() throws Exception { + String filePath = "src/main/test/resources/"; + String destPath = server.getBaseDirectory().toString(); + SFTPCopyAction.SFTPCopyActionConfig config = new SFTPCopyAction.SFTPCopyActionConfig( + "localhost", + 9022, + "tester", + "testing", + "", + filePath, + destPath, + "password"); + MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); + new SFTPCopyAction(config).configurePipeline(configurer); + new SFTPCopyAction(config).run(new MockActionContext()); + } + +} + + + + + diff --git a/src/main/test/io.cdap.plugin/SFTPDeleteTest.java b/src/main/test/io.cdap.plugin/SFTPDeleteTest.java new file mode 100644 index 0000000..b24d7c9 --- /dev/null +++ b/src/main/test/io.cdap.plugin/SFTPDeleteTest.java @@ -0,0 +1,67 @@ +package io.cdap.plugin; + +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; + +import io.cdap.cdap.etl.mock.action.MockActionContext; +import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; +import org.junit.After; +import org.junit.Before; + + +import org.junit.Test; +import software.sham.sftp.MockSftpServer; + +import java.io.IOException; +import java.util.Properties; + + + +public class SFTPDeleteTest { + + MockSftpServer server; + Session sshSession; + + @Before + public void initSftp() throws IOException { + server = new MockSftpServer(9022); + } + + @Before + public void initSshClient() throws JSchException { + JSch jsch = new JSch(); + sshSession = jsch.getSession("tester", "localhost", 9022); + Properties config = new Properties(); + config.setProperty("StrictHostKeyChecking", "no"); + sshSession.setConfig(config); + sshSession.setPassword("testing"); + sshSession.connect(); + } + + @After + public void stopSftp() throws IOException { + server.stop(); + } + + @Test + public void testCopyFile() throws Exception { + SFTPDeleteAction.SFTPDeleteActionConfig config = new SFTPDeleteAction.SFTPDeleteActionConfig( + "localhost", + 9022, + "tester", + "testing", + "", + "", + "password"); + MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); + new SFTPDeleteAction(config).configurePipeline(configurer); + new SFTPDeleteAction(config).run(new MockActionContext()); + } + +} + + + + + diff --git a/src/main/test/io.cdap.plugin/SFTPPutActionTest.java b/src/main/test/io.cdap.plugin/SFTPPutActionTest.java new file mode 100644 index 0000000..beeef99 --- /dev/null +++ b/src/main/test/io.cdap.plugin/SFTPPutActionTest.java @@ -0,0 +1,66 @@ +package io.cdap.plugin; + + +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import io.cdap.cdap.etl.mock.action.MockActionContext; +import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; +import org.junit.*; +import software.sham.sftp.MockSftpServer; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + + +public class SFTPPutActionTest { + + MockSftpServer server; + Session sshSession; + + + @Before + public void initSftp() throws IOException { + server = new MockSftpServer(9022); + } + + @Before + public void initSshClient() throws JSchException { + Map properties = new HashMap<>(); + properties.put("StrictHostKeyChecking", "no"); + JSch jsch = new JSch(); + sshSession = jsch.getSession("tester", "localhost", 9022); + Properties config = new Properties(); + config.putAll(properties); + sshSession.setConfig(config); + sshSession.setPassword("testing"); + sshSession.connect(); + } + + @After + public void stopSftp() throws IOException { + server.stop(); + } + + @Test + public void testPutFile() throws Exception { + String filePath = "src/main/test/resources/"; + String destPath = server.getBaseDirectory().toString(); + SFTPPutAction.SFTPPutActionConfig config = new SFTPPutAction.SFTPPutActionConfig( + "localhost", + 9022, + "tester", + "testing", + "", + filePath, + destPath, + "password"); + MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); + new SFTPPutAction(config).configurePipeline(configurer); + new SFTPPutAction(config).run(new MockActionContext()); + } + +} + diff --git a/src/main/test/resources/data.txt b/src/main/test/resources/data.txt new file mode 100644 index 0000000..30d74d2 --- /dev/null +++ b/src/main/test/resources/data.txt @@ -0,0 +1 @@ +test \ No newline at end of file diff --git a/widgets/SFTPCopy-action.json b/widgets/SFTPCopy-action.json index 5a8dd42..e463d0c 100644 --- a/widgets/SFTPCopy-action.json +++ b/widgets/SFTPCopy-action.json @@ -18,36 +18,52 @@ }, { "widget-type": "textbox", - "label": "User name", + "label": "User Name", "name": "userName" }, + { + "name": "Authentication", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "privatekey-select", + "options": [ + { + "id": "privatekey-select", + "label": "PrivateKey" + }, + { + "id": "password-select", + "label": "Password" + } + ] + } + }, { "widget-type": "password", "label": "Password", "name": "password" }, + { + "widget-type": "textarea", + "label": "PrivateKey", + "name": "privateKey" + }, + { + "widget-type": "password", + "label": "PrivateKey Passphrase", + "name": "passphrase" + }, { "widget-type": "textbox", - "label": "Source directory", + "label": "Source Directory", "name": "srcDirectory" }, { "widget-type": "textbox", - "label": "Destination directory", + "label": "Destination Directory", "name": "destDirectory" }, - { - "widget-type": "select", - "label": "Uncompress", - "name": "extractZipFiles", - "widget-attributes": { - "values": [ - "true", - "false" - ], - "default": "true" - } - }, { "widget-type": "textbox", "label": "Variable name to hold list of copied file names", @@ -61,6 +77,7 @@ "label": "Regex to match files that needs to be copied", "name" : "fileNameRegex" }, + { "widget-type": "keyvalue", "label": "Properties for SSH", @@ -86,5 +103,36 @@ } ], "outputs": [ - ] + ], + "filters": [ + { + "name": "AuthByPrivateKey", + "condition": { + "expression": "Authentication == 'privatekey-select'" + }, + "show":[ + { + "type": "property", + "name": "privateKey" + }, + { + "type": "property", + "name": "passphrase" + } + ] + }, + { + "name": "AuthByPassword", + "condition": { + "expression": "Authentication == 'password-select'" + }, + "show": + [ + { + "type": "property", + "name": "password" + } + ] + } + ] } diff --git a/widgets/SFTPDelete-action.json b/widgets/SFTPDelete-action.json index 7517d85..ee8dd05 100644 --- a/widgets/SFTPDelete-action.json +++ b/widgets/SFTPDelete-action.json @@ -18,14 +18,42 @@ }, { "widget-type": "textbox", - "label": "User name", + "label": "User Name", "name": "userName" }, + { + "name": "Authentication", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "privatekey-select", + "options": [ + { + "id": "privatekey-select", + "label": "PrivateKey" + }, + { + "id": "password-select", + "label": "Password" + } + ] + } + }, { "widget-type": "password", "label": "Password", "name": "password" }, + { + "widget-type": "textarea", + "label": "PrivateKey", + "name": "privateKey" + }, + { + "widget-type": "password", + "label": "Private Key Passphrase", + "name": "passphrase" + }, { "widget-type": "textbox", "label": "Comma separated list of file names to delete", @@ -60,5 +88,36 @@ } ], "outputs": [ + ], + "filters": [ + { + "name": "AuthByPrivateKey", + "condition": { + "expression": "Authentication == 'privatekey-select'" + }, + "show":[ + { + "type": "property", + "name": "privateKey" + }, + { + "type": "property", + "name": "passphrase" + } + ] + }, + { + "name": "AuthByPassword", + "condition": { + "expression": "Authentication == 'password-select'" + }, + "show": + [ + { + "type": "property", + "name": "password" + } + ] + } ] } diff --git a/widgets/SFTPPut-action.json b/widgets/SFTPPut-action.json index 9068dec..c232f35 100644 --- a/widgets/SFTPPut-action.json +++ b/widgets/SFTPPut-action.json @@ -18,14 +18,42 @@ }, { "widget-type": "textbox", - "label": "User name", + "label": "User Name", "name": "userName" }, + { + "name": "Authentication", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "privatekey-select", + "options": [ + { + "id": "privatekey-select", + "label": "PrivateKey" + }, + { + "id": "password-select", + "label": "Password" + } + ] + } + }, { "widget-type": "password", "label": "Password", "name": "password" }, + { + "widget-type": "textarea", + "label": "Private Key", + "name": "privateKey" + }, + { + "widget-type": "password", + "label": "Private Key Passphrase", + "name": "passphrase" + }, { "widget-type": "textbox", "label": "Source Path", @@ -33,7 +61,7 @@ }, { "widget-type": "textbox", - "label": "Destination directory", + "label": "Destination Directory", "name": "destDirectory" }, { @@ -55,5 +83,36 @@ } ], "outputs": [ + ], + "filters": [ + { + "name": "AuthByPrivateKey", + "condition": { + "expression": "Authentication == 'privatekey-select'" + }, + "show":[ + { + "type": "property", + "name": "privateKey" + }, + { + "type": "property", + "name": "passphrase" + } + ] + }, + { + "name": "AuthByPassword", + "condition": { + "expression": "Authentication == 'password-select'" + }, + "show": + [ + { + "type": "property", + "name": "password" + } + ] + } ] } From 77381d9e15615327e12904be2484eebef05424ee Mon Sep 17 00:00:00 2001 From: Bajram Bojku Date: Thu, 16 Jul 2020 11:42:46 -0500 Subject: [PATCH 2/3] adding privateKey functions to SFTP-Action, and new SCP action plugin --- docs/SCPRemote-action.md | 35 +++ pom.xml | 2 +- .../cdap/plugin/SCPRemotetoRemoteAction.java | 293 ++++++++++++++++++ .../java/io/cdap/plugin/SFTPCopyAction.java | 3 +- .../io.cdap.plugin/SFTPCopyActionTest.java | 14 +- .../io.cdap.plugin/SFTPPutActionTest.java | 12 +- src/main/test/resources/data.txt | 1 - widgets/SCPRemote-action.json | 140 +++++++++ 8 files changed, 493 insertions(+), 7 deletions(-) create mode 100644 docs/SCPRemote-action.md create mode 100644 src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java delete mode 100644 src/main/test/resources/data.txt create mode 100644 widgets/SCPRemote-action.json diff --git a/docs/SCPRemote-action.md b/docs/SCPRemote-action.md new file mode 100644 index 0000000..ea59456 --- /dev/null +++ b/docs/SCPRemote-action.md @@ -0,0 +1,35 @@ +# SCP Remote to Remote + +Description +----------- +This Action will connect to a Bastion Host and execute a SCP command over SSH to copy a file path from Host A to Host B + +Use Case +-------- +The Use Cases for this Plugin is when we want to connect to a Bastion Host in order to execute an SCP command that will copy files between to other Remote hosts. + +Properties +---------- +| Configuration | Required | Default | Description | +| :------------ | :------: | :------ | :---------- | +| **Bastion Host** | **Y** | None | This is the Hostname for the Bastion host that will be executing the SCP command. Can be a hostname or IP Address | +| **Port** | **N** | 22 | Specifies the Port that will be used. Defaults to 22 | +| **Bastion User Name** | **Y** | None | Specifies the Bastion Hosts User Name. | +| **Private Key** | **Y** | None | The private RSA key to be used to connect over SHH to the Bastion host. This should be an RSA key. | +| **Passphrase** | **N** | None | Passphrase to be used with the Private RSA key if a Passphrase was setup when the key was created. | +| **Compression Flag** | **N** | None | Flag for SCP command to enable compression of files. | +| **Verbose Flag** | **N** | None | Flag for SCP command to enable verbose mode which extends Logs. | +| **Directory Flag** | **N** | None | Flag for SCP command to enable movement of directories. | +| **Host A User Name** | **Y** | None | The User Name of Host A. This is the host that the source files are on that need to be moved | +| **Host A Host** | **Y** | None |The Hostname of Host A. Can be a hostname or IP Address| +| **Source Path** | **Y** | None | The Absolute Path of the File that needs to be copied. | +| **Host B User Name** | **Y** | None | The User Name of Host B. This is the host that the source files are being copied to | +| **Host B Host** | **Y** | None |The Hostname of Host B. Can be a hostname or IP Address| +| **Destination Path** | **Y** | None | The Absolute Path of the location the files need to be copied to. | + + + +Usage Notes +-------- + +In order to perform SCP between to remote hosts, we require a Bato. An SCP command based on the configuration supplied will be created to perform a compressed file copy. Authentication setup between all hosts will need to be setup before hand. This includes being able to SSH on the bastion host with the private key being supplied in the configuration and have the 2 remote hosts that files are being moved on having known host/authenticated_keys setup for SSH communication between them.stion host that we will ssh in \ No newline at end of file diff --git a/pom.xml b/pom.xml index 567d334..5002449 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ widgets docs - [6.0.0-SNAPSHOT,7.0.0-SNAPSHOT) diff --git a/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java b/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java new file mode 100644 index 0000000..29d0c09 --- /dev/null +++ b/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java @@ -0,0 +1,293 @@ +package io.cdap.plugin; + +/* + * Copyright © 2017 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +import com.google.common.annotations.VisibleForTesting; + +import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; + +import com.jcraft.jsch.Session; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; + +import io.cdap.cdap.api.plugin.PluginConfig; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.action.Action; +import io.cdap.cdap.etl.api.action.ActionContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; + + +@Plugin(type = Action.PLUGIN_TYPE) +@Name(SCPRemotetoRemoteAction.PLUGIN_NAME) +@Description("This action will connect to a Bastion Host and execute a SCP command to copy a file from Host A to Host B.") +public class SCPRemotetoRemoteAction extends Action { + + public static final String PLUGIN_NAME = "SCPRemote"; + private static final Logger LOG = LoggerFactory.getLogger(SCPRemotetoRemoteAction.class); + + private final SCPRemotetoRemoteActionConfig config; + + @VisibleForTesting + public SCPRemotetoRemoteAction(SCPRemotetoRemoteActionConfig config) { + this.config = config; + } + + /** + * This function is executed by the Pipelines framework when the Pipeline is deployed. This + * is a good place to validate any configuration options the user has entered. If this throws + * an exception, the Pipeline will not be deployed and the user will be shown the error message. + */ + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException { + super.configurePipeline(pipelineConfigurer); + LOG.debug(String.format("Running the 'configurePipeline' method of the %s plugin.", PLUGIN_NAME)); + config.validate(); + } + + @Override + public void run(ActionContext context) throws JSchException, IOException { + LOG.debug(String.format("Running the 'run' method of the %s plugin.", PLUGIN_NAME)); + config.validate(); + + byte[] key = config.getPrivateKey(); + byte[] passphrase = config.getPassphrase(); + JSch jsch = new JSch(); + jsch.addIdentity("key", key, null,passphrase); + Session session = jsch.getSession(config.getUserNameBastion(),config.getHostBastion(),config.getPort()); + + session.setConfig("StrictHostKeyChecking", "no"); + session.connect(); + ChannelExec channel = (ChannelExec) session.openChannel("exec"); + + String userA = config.getUserNameA(); + String hostA = config.getHostA(); + String source = config.getSource(); + String pathSource = userA+"@"+hostA+":"+source; + + String userB = config.getUserNameB(); + String hostB = config.getHostB(); + String dest= config.getDest(); + String pathDest = userB+"@"+hostB+":"+dest; + + String dirFlag = config.getDirFlag(); + String compressFlag = config.getCompressFlag(); + String verboseFlag = config.getVerboseFlag(); + + //Host A -> Host B + channel.setCommand("scp "+compressFlag+ " "+verboseFlag+ + " " +dirFlag +" " +pathSource +" " +pathDest); + + StringBuilder outputBuffer = new StringBuilder(); + StringBuilder errorBuffer = new StringBuilder(); + + InputStream in = channel.getInputStream(); + InputStream err = channel.getExtInputStream(); + + channel.connect(); + + byte[] tmp = new byte[1024]; + while (true) { + while (in.available() > 0) { + int i = in.read(tmp, 0, 1024); + if (i < 0) break; + outputBuffer.append(new String(tmp, 0, i)); + } + while (err.available() > 0) { + int i = err.read(tmp, 0, 1024); + if (i < 0) break; + errorBuffer.append(new String(tmp, 0, i)); + } + if (channel.isClosed()) { + if ((in.available() > 0) || (err.available() > 0)) continue; + System.out.println("exit-status: " + channel.getExitStatus()); + break; + } + try { + Thread.sleep(1000); + } catch (Exception ignored) { + } + } + + LOG.debug("output: " + outputBuffer.toString()); + LOG.info("info: " + errorBuffer.toString()); + + channel.disconnect(); + session.disconnect(); + + } + + /** + * The config class for {@link SCPRemotetoRemoteAction} that contains all properties that need to be filled in by + * the user when building a Pipeline. + */ + public static class SCPRemotetoRemoteActionConfig extends PluginConfig { + + @Description("Hostname or IP Address of the SSH server.") + @Macro + public String hostBastion; + + @Description("Port on which SSH server is running. Defaults to 22.") + @Nullable + @Macro + public Integer port; + + @Description("Name of the User used to login to the Bastion SSH server.") + @Macro + public String userNameBastion; + + @Description("Private Key to be used to login to the Bastion SSH Server. SSH key must be of RSA type") + @Macro + public String privateKey; + + @Description("Passphrase to be used with private key if passphrase was enabled when key was created. ") + @Macro + @Nullable + public String passphrase; + + @Description("Name of the user used to login to SSH server.") + @Macro + public String userNameA; + + @Description("Hostname or IP Address of the SSH server that contains the files to copy.") + @Macro + public String hostA; + + @Description("Absolute path on Host A") + @Macro + public String sourcePath; + + @Description("Name of the user used to login to SSH server.") + @Macro + public String userNameB; + + @Description("Hostname or IP Address of the SSH server that the files should be copied to.") + @Macro + public String hostB; + + @Description("Location files should be copied to") + @Macro + public String destPath; + + @Name("compressionFlag") + @Description("Setting Compression Flag") + public String compressFlag; + + @Name("verboseFlag") + @Description("Setting Verbose Flag for more Log data") + public String verboseFlag; + + @Name("directoryFlag") + @Description("Setting Directory Flag") + public String dirFlag; + + + + public String getHostBastion() { + return hostBastion; + } + + public int getPort() { + return (port != null) ? port : 22; + } + + public String getUserNameBastion() { + return userNameBastion; + } + + public byte[] getPrivateKey() { + assert privateKey != null; + return privateKey.getBytes(); + } + + public byte[] getPassphrase(){ + if (passphrase == null){ + passphrase = ""; + } + return passphrase.getBytes(); + } + + + public String getUserNameA() { + return userNameA; + } + + public String getHostA() { + return hostA; + } + + public String getSource() { + return sourcePath; + } + + + public String getUserNameB() { + return userNameB; + } + + public String getHostB() { + return hostB; + } + + + public String getDest() { + return destPath; + } + + public String getCompressFlag() { + if (compressFlag.equals("compression-off")){ + return compressFlag = ""; + } + return compressFlag="-C"; + } + + public String getVerboseFlag() { + if (verboseFlag.equals("verbose-off")){ + return verboseFlag = ""; + } + return verboseFlag="-v"; + } + + public String getDirFlag() { + if (dirFlag.equals("directory-off")){ + return dirFlag = ""; + } + return dirFlag="-r"; + } + + /** + * You can leverage this function to validate the configure options entered by the user. + */ + + public void validate() throws IllegalArgumentException { + // The containsMacro function can be used to check if there is a macro in the config option. + // At runtime, the containsMacro function will always return false. + + } + } +} + + diff --git a/src/main/java/io/cdap/plugin/SFTPCopyAction.java b/src/main/java/io/cdap/plugin/SFTPCopyAction.java index 07623bd..ffacd88 100644 --- a/src/main/java/io/cdap/plugin/SFTPCopyAction.java +++ b/src/main/java/io/cdap/plugin/SFTPCopyAction.java @@ -132,7 +132,8 @@ public Map getFileSystemProperties(){ return properties; } public SFTPCopyActionConfig(String host, int port, String userName, String password, - String sshProperties, String srcPath, String destDirectory, String authType){ + String sshProperties, String srcPath, String destDirectory, String authType){ + this.host = host; this.port = port; this.userName = userName; diff --git a/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java b/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java index 38668c6..9679a1b 100644 --- a/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java +++ b/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java @@ -10,10 +10,15 @@ import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import software.sham.sftp.MockSftpServer; +import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.Properties; @@ -23,6 +28,8 @@ public class SFTPCopyActionTest { MockSftpServer server; Session sshSession; + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); @Before public void initSftp() throws IOException { server = new MockSftpServer(9022); @@ -46,7 +53,10 @@ public void stopSftp() throws IOException { @Test public void testCopyFile() throws Exception { - String filePath = "src/main/test/resources/"; + File tempFile = tempFolder.newFile(); + Files.write(tempFile.toPath(), "test".getBytes(StandardCharsets.UTF_8)); + + String sourcePath = tempFile.getAbsoluteFile().toString(); String destPath = server.getBaseDirectory().toString(); SFTPCopyAction.SFTPCopyActionConfig config = new SFTPCopyAction.SFTPCopyActionConfig( "localhost", @@ -54,7 +64,7 @@ public void testCopyFile() throws Exception { "tester", "testing", "", - filePath, + sourcePath, destPath, "password"); MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); diff --git a/src/main/test/io.cdap.plugin/SFTPPutActionTest.java b/src/main/test/io.cdap.plugin/SFTPPutActionTest.java index beeef99..0ed1211 100644 --- a/src/main/test/io.cdap.plugin/SFTPPutActionTest.java +++ b/src/main/test/io.cdap.plugin/SFTPPutActionTest.java @@ -7,8 +7,13 @@ import io.cdap.cdap.etl.mock.action.MockActionContext; import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; import org.junit.*; +import org.junit.rules.TemporaryFolder; import software.sham.sftp.MockSftpServer; + +import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -20,6 +25,8 @@ public class SFTPPutActionTest { MockSftpServer server; Session sshSession; + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); @Before public void initSftp() throws IOException { @@ -46,7 +53,8 @@ public void stopSftp() throws IOException { @Test public void testPutFile() throws Exception { - String filePath = "src/main/test/resources/"; + File tempFile = tempFolder.newFile(); + Files.write(tempFile.toPath(), "test".getBytes(StandardCharsets.UTF_8)); String destPath = server.getBaseDirectory().toString(); SFTPPutAction.SFTPPutActionConfig config = new SFTPPutAction.SFTPPutActionConfig( "localhost", @@ -54,7 +62,7 @@ public void testPutFile() throws Exception { "tester", "testing", "", - filePath, + tempFile.toString(), destPath, "password"); MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); diff --git a/src/main/test/resources/data.txt b/src/main/test/resources/data.txt deleted file mode 100644 index 30d74d2..0000000 --- a/src/main/test/resources/data.txt +++ /dev/null @@ -1 +0,0 @@ -test \ No newline at end of file diff --git a/widgets/SCPRemote-action.json b/widgets/SCPRemote-action.json new file mode 100644 index 0000000..452d07f --- /dev/null +++ b/widgets/SCPRemote-action.json @@ -0,0 +1,140 @@ +{ + "metadata": { + "spec-version": "1.4" + }, + "configuration-groups": [ + { + "label": "Bastion Host Properties", + "properties": [ + { + "widget-type": "textbox", + "label": "Bastion Hostname", + "name": "hostBastion" + }, + { + "widget-type": "textbox", + "label": "Port", + "name": "port" + }, + { + "widget-type": "textbox", + "label": "Bastion User Name", + "name": "userNameBastion" + }, + { + "widget-type": "textarea", + "label": "PrivateKey", + "name": "privateKey" + }, + { + "widget-type": "password", + "label": "PrivateKey Passphrase", + "name": "passphrase" + }, + + { + "label": "Compression Flag", + "name": "compressionFlag", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "compression-on", + "options": [ + { + "id": "compression-on", + "label": "Compression On" + }, + { + "id": "compression-off", + "label": "Compression Off" + } + ] + } + }, + { + "label": "Verbose Flag", + "name": "verboseFlag", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "verbose-on", + "options": [ + { + "id": "verbose-on", + "label": "Verbose On" + }, + { + "id": "verbose-off", + "label": "Verbose Off" + } + ] + } + }, + { + "label": "Directory Flag", + "name": "directoryFlag", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "directory-off", + "options": [ + { + "id": "directory-on", + "label": "Directory On" + }, + { + "id": "directory-off", + "label": "Directory Off" + } + ] + } + } + ] + }, + { + "label": "Remote Host A Properties", + "properties": [ + { + "widget-type": "textbox", + "label": "Host A User Name", + "name": "userNameA" + }, + { + "widget-type": "textbox", + "label": "Host A Hostname", + "name": "hostA" + }, + { + "widget-type": "textbox", + "label": "Source Path", + "name": "sourcePath" + } + ] + }, + + { + "label": "Remote Host B Properties", + "properties": [ + { + "widget-type": "textbox", + "label": "Host B User Name", + "name": "userNameB" + }, + { + "widget-type": "textbox", + "label": "Host B Hostname", + "name": "hostB" + }, + { + "widget-type": "textbox", + "label": "Destination Path", + "name": "destPath" + } + ] + } + ], + "outputs": [ + ], + "filters": [ + ] +} From e5469a5d41212b581db2000b9181ee9389699697 Mon Sep 17 00:00:00 2001 From: Bajram Bojku Date: Fri, 17 Jul 2020 13:13:10 -0500 Subject: [PATCH 3/3] Changed formatting --- docs/SCPRemote-action.md | 2 +- pom.xml | 21 +- .../cdap/plugin/SCPRemotetoRemoteAction.java | 200 ++++++------------ .../java/io/cdap/plugin/SFTPCopyAction.java | 31 +-- .../java/io/cdap/plugin/SFTPDeleteAction.java | 20 +- .../java/io/cdap/plugin/SFTPPutAction.java | 21 +- .../cdap/plugin/common/SFTPActionConfig.java | 19 +- .../io/cdap/plugin/common/SFTPConnector.java | 21 +- .../io.cdap.plugin/SFTPCopyActionTest.java | 35 +-- .../test/io.cdap.plugin/SFTPDeleteTest.java | 63 +++--- .../io.cdap.plugin/SFTPPutActionTest.java | 32 +-- widgets/SCPRemote-action.json | 18 +- widgets/SFTPCopy-action.json | 3 +- 13 files changed, 195 insertions(+), 291 deletions(-) diff --git a/docs/SCPRemote-action.md b/docs/SCPRemote-action.md index ea59456..65ec913 100644 --- a/docs/SCPRemote-action.md +++ b/docs/SCPRemote-action.md @@ -32,4 +32,4 @@ Properties Usage Notes -------- -In order to perform SCP between to remote hosts, we require a Bato. An SCP command based on the configuration supplied will be created to perform a compressed file copy. Authentication setup between all hosts will need to be setup before hand. This includes being able to SSH on the bastion host with the private key being supplied in the configuration and have the 2 remote hosts that files are being moved on having known host/authenticated_keys setup for SSH communication between them.stion host that we will ssh in \ No newline at end of file +In order to perform SCP between to remote hosts, we require a Bastion Host. An SCP command based on the configuration supplied will be created to perform a file copy. Authentication setup between all hosts will need to be setup before hand. This includes being able to SSH on the bastion host with the private key being supplied in the configuration and have the 2 remote hosts that files are being moved on having known host/authenticated_keys setup for SSH communication. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5002449..6667946 100644 --- a/pom.xml +++ b/pom.xml @@ -20,36 +20,31 @@ io.cdap.plugin sftp-actions-ssh jar - 1.5.1-SNAPSHOT + 1.5.0-SNAPSHOT SFTP Actions UTF-8 - 6.1.2 - 2.3.5 + 6.2.0 + 2.4.0 0.1.53 2.3.0 19.0 - widgets docs - [6.0.0-SNAPSHOT,7.0.0-SNAPSHOT) - system:cdap-data-pipeline, system:cdap-data-streams - ${project.basedir} - http://maven.apache.org - io.cdap.cdap @@ -80,8 +75,6 @@ ${cdap.version} test - - com.jcraft jsch @@ -104,16 +97,13 @@ 4.12 test - - - software.sham sham-ssh 0.1.0 + test - org.mockito @@ -122,7 +112,6 @@ test - diff --git a/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java b/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java index 29d0c09..13f8ae5 100644 --- a/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java +++ b/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java @@ -1,7 +1,5 @@ -package io.cdap.plugin; - /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,136 +14,121 @@ * the License. */ -import com.google.common.annotations.VisibleForTesting; +package io.cdap.plugin; +import com.google.common.annotations.VisibleForTesting; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; - import com.jcraft.jsch.Session; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; - import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; - +import java.nio.charset.StandardCharsets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; - @Plugin(type = Action.PLUGIN_TYPE) @Name(SCPRemotetoRemoteAction.PLUGIN_NAME) @Description("This action will connect to a Bastion Host and execute a SCP command to copy a file from Host A to Host B.") public class SCPRemotetoRemoteAction extends Action { - public static final String PLUGIN_NAME = "SCPRemote"; private static final Logger LOG = LoggerFactory.getLogger(SCPRemotetoRemoteAction.class); - private final SCPRemotetoRemoteActionConfig config; @VisibleForTesting - public SCPRemotetoRemoteAction(SCPRemotetoRemoteActionConfig config) { - this.config = config; - } + public SCPRemotetoRemoteAction(SCPRemotetoRemoteActionConfig config) { this.config = config; } - /** - * This function is executed by the Pipelines framework when the Pipeline is deployed. This - * is a good place to validate any configuration options the user has entered. If this throws - * an exception, the Pipeline will not be deployed and the user will be shown the error message. - */ @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException { super.configurePipeline(pipelineConfigurer); - LOG.debug(String.format("Running the 'configurePipeline' method of the %s plugin.", PLUGIN_NAME)); + LOG.debug("Executing the 'run' method of the {} plugin", PLUGIN_NAME); config.validate(); } @Override public void run(ActionContext context) throws JSchException, IOException { - LOG.debug(String.format("Running the 'run' method of the %s plugin.", PLUGIN_NAME)); + LOG.debug("Running the 'run' method of the {} plugin.", PLUGIN_NAME); config.validate(); - byte[] key = config.getPrivateKey(); byte[] passphrase = config.getPassphrase(); JSch jsch = new JSch(); - jsch.addIdentity("key", key, null,passphrase); - Session session = jsch.getSession(config.getUserNameBastion(),config.getHostBastion(),config.getPort()); - + jsch.addIdentity("key", key, null, passphrase); + Session session = jsch.getSession(config.getUserNameBastion(), + config.getHostBastion(), config.getPort()); session.setConfig("StrictHostKeyChecking", "no"); session.connect(); ChannelExec channel = (ChannelExec) session.openChannel("exec"); - String userA = config.getUserNameA(); String hostA = config.getHostA(); String source = config.getSource(); - String pathSource = userA+"@"+hostA+":"+source; - + String pathSource = userA + "@" + hostA + ":" + source; String userB = config.getUserNameB(); String hostB = config.getHostB(); - String dest= config.getDest(); - String pathDest = userB+"@"+hostB+":"+dest; - + String dest = config.getDest(); + String pathDest = userB + "@" + hostB + ":" + dest; String dirFlag = config.getDirFlag(); + if (dirFlag.equals("directory-off")){ + dirFlag = ""; + } else { + dirFlag = "-r"; + } String compressFlag = config.getCompressFlag(); + if (compressFlag.equals("compression-off")){ + compressFlag = ""; + } else { + compressFlag = "-C"; + } String verboseFlag = config.getVerboseFlag(); - + if (verboseFlag.equals("verbose-off")){ + verboseFlag = ""; + } else { + verboseFlag = "-v"; + } //Host A -> Host B - channel.setCommand("scp "+compressFlag+ " "+verboseFlag+ - " " +dirFlag +" " +pathSource +" " +pathDest); + channel.setCommand("scp " + compressFlag + " " + verboseFlag + + " " + dirFlag + " " + pathSource + " " + pathDest); + channel.connect(); + verboseLogging(channel); + channel.disconnect(); + session.disconnect(); + } + private void verboseLogging(ChannelExec channel) throws IOException { + StringBuilder inputBuffer = new StringBuilder(); StringBuilder outputBuffer = new StringBuilder(); - StringBuilder errorBuffer = new StringBuilder(); - InputStream in = channel.getInputStream(); - InputStream err = channel.getExtInputStream(); - - channel.connect(); - + InputStream out = channel.getExtInputStream(); byte[] tmp = new byte[1024]; - while (true) { - while (in.available() > 0) { - int i = in.read(tmp, 0, 1024); - if (i < 0) break; - outputBuffer.append(new String(tmp, 0, i)); - } - while (err.available() > 0) { - int i = err.read(tmp, 0, 1024); - if (i < 0) break; - errorBuffer.append(new String(tmp, 0, i)); - } - if (channel.isClosed()) { - if ((in.available() > 0) || (err.available() > 0)) continue; - System.out.println("exit-status: " + channel.getExitStatus()); - break; - } - try { - Thread.sleep(1000); - } catch (Exception ignored) { - } + int lenIn = in.read(tmp, 0, tmp.length); + while (lenIn > 0){ + inputBuffer.append(new String(tmp, 0, lenIn, StandardCharsets.UTF_8)); + lenIn = in.read(tmp, 0, tmp.length); } - - LOG.debug("output: " + outputBuffer.toString()); - LOG.info("info: " + errorBuffer.toString()); - - channel.disconnect(); - session.disconnect(); - + int lenOut = out.read(tmp, 0, tmp.length); + while (lenOut > 0){ + outputBuffer.append(new String(tmp, 0, lenOut, StandardCharsets.UTF_8)); + lenOut = out.read(tmp, 0, tmp.length); + } + if (channel.isClosed()) { + LOG.info("Exit-Status: " + channel.getExitStatus()); + } + LOG.info("Input: " + inputBuffer.toString()); + LOG.info("Verbose Info: " + outputBuffer.toString()); } - /** * The config class for {@link SCPRemotetoRemoteAction} that contains all properties that need to be filled in by * the user when building a Pipeline. */ public static class SCPRemotetoRemoteActionConfig extends PluginConfig { - @Description("Hostname or IP Address of the SSH server.") @Macro public String hostBastion; @@ -168,7 +151,7 @@ public static class SCPRemotetoRemoteActionConfig extends PluginConfig { @Nullable public String passphrase; - @Description("Name of the user used to login to SSH server.") + @Description("Name of the user used to login to SSH server that contains the files to copy.") @Macro public String userNameA; @@ -176,11 +159,11 @@ public static class SCPRemotetoRemoteActionConfig extends PluginConfig { @Macro public String hostA; - @Description("Absolute path on Host A") + @Description("Absolute path on Host A to copy") @Macro public String sourcePath; - @Description("Name of the user used to login to SSH server.") + @Description("Name of the user used to login to SSH server that files should be copied to.") @Macro public String userNameB; @@ -192,20 +175,21 @@ public static class SCPRemotetoRemoteActionConfig extends PluginConfig { @Macro public String destPath; + @Nullable @Name("compressionFlag") @Description("Setting Compression Flag") public String compressFlag; + @Nullable @Name("verboseFlag") @Description("Setting Verbose Flag for more Log data") public String verboseFlag; + @Nullable @Name("directoryFlag") @Description("Setting Directory Flag") public String dirFlag; - - public String getHostBastion() { return hostBastion; } @@ -218,76 +202,32 @@ public String getUserNameBastion() { return userNameBastion; } - public byte[] getPrivateKey() { - assert privateKey != null; - return privateKey.getBytes(); - } + public byte[] getPrivateKey() { return privateKey.getBytes(StandardCharsets.UTF_8); } public byte[] getPassphrase(){ - if (passphrase == null){ - passphrase = ""; - } - return passphrase.getBytes(); - } + return passphrase == null ? new byte[0] : passphrase.getBytes(StandardCharsets.UTF_8); } + public String getUserNameA() { return userNameA; } - public String getUserNameA() { - return userNameA; - } - - public String getHostA() { - return hostA; - } - - public String getSource() { - return sourcePath; - } - - - public String getUserNameB() { - return userNameB; - } + public String getHostA() { return hostA; } - public String getHostB() { - return hostB; - } + public String getSource() { return sourcePath; } + public String getUserNameB() { return userNameB; } - public String getDest() { - return destPath; - } + public String getHostB() { return hostB; } - public String getCompressFlag() { - if (compressFlag.equals("compression-off")){ - return compressFlag = ""; - } - return compressFlag="-C"; - } + public String getDest() { return destPath; } - public String getVerboseFlag() { - if (verboseFlag.equals("verbose-off")){ - return verboseFlag = ""; - } - return verboseFlag="-v"; - } + public String getCompressFlag() { return compressFlag; } - public String getDirFlag() { - if (dirFlag.equals("directory-off")){ - return dirFlag = ""; - } - return dirFlag="-r"; - } + public String getVerboseFlag() { return verboseFlag; } - /** - * You can leverage this function to validate the configure options entered by the user. - */ + public String getDirFlag() { return dirFlag; } public void validate() throws IllegalArgumentException { // The containsMacro function can be used to check if there is a macro in the config option. // At runtime, the containsMacro function will always return false. - } } -} - - +} \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/SFTPCopyAction.java b/src/main/java/io/cdap/plugin/SFTPCopyAction.java index ffacd88..0b1f3aa 100644 --- a/src/main/java/io/cdap/plugin/SFTPCopyAction.java +++ b/src/main/java/io/cdap/plugin/SFTPCopyAction.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -37,7 +37,6 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; @@ -59,7 +58,6 @@ public class SFTPCopyAction extends Action { private static final Logger LOG = LoggerFactory.getLogger(SFTPCopyAction.class); private SFTPCopyActionConfig config; - public SFTPCopyAction(SFTPCopyActionConfig config) { this.config = config; } @@ -69,7 +67,6 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); config.validate(); } - /** * Configurations for the FTP copy action plugin. */ @@ -122,7 +119,6 @@ public Map getFileSystemProperties(){ if (fileSystemProperties == null || fileSystemProperties.isEmpty()) { return properties; } - KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", "=>"); for (KeyValue keyVal : kvParser.parse(fileSystemProperties)) { String key = keyVal.getKey(); @@ -131,9 +127,9 @@ public Map getFileSystemProperties(){ } return properties; } + public SFTPCopyActionConfig(String host, int port, String userName, String password, String sshProperties, String srcPath, String destDirectory, String authType){ - this.host = host; this.port = port; this.userName = userName; @@ -153,30 +149,26 @@ public void validate() throws IllegalArgumentException { @Override public void run(ActionContext context) throws Exception { Path destination = new Path(config.getDestDirectory()); - Configuration conf = new Configuration(); Map properties = config.getFileSystemProperties(); for (Map.Entry entry : properties.entrySet()) { conf.set(entry.getKey(), entry.getValue()); } - FileSystem fileSystem = FileSystem.get(conf); - destination = fileSystem.makeQualified(destination); if (!fileSystem.exists(destination)) { fileSystem.mkdirs(destination); } - if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { - try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), - config.getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { + try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), + config.getUserName(), config.getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { sftpCopyLogic(fileSystem, destination, SFTPConnector, context); } catch (Exception e){ LOG.error(String.valueOf(e)); } } else { - try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), - config.getPassword(), config.getSSHProperties())) { + try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), + config.getUserName(), config.getPassword(), config.getSSHProperties())) { sftpCopyLogic(fileSystem, destination, SFTPConnector, context); } catch (Exception e) { LOG.error(String.valueOf(e)); @@ -184,14 +176,10 @@ public void run(ActionContext context) throws Exception { } } - public void sftpCopyLogic(FileSystem fileSystem, Path destination, SFTPConnector SFTPConnector, - ActionContext context) throws SftpException, IOException { - + ActionContext context) throws SftpException, IOException { ChannelSftp channelSftp = SFTPConnector.getSftpChannel(); - Vector files = channelSftp.ls(config.getSrcDirectory()); - List filesCopied = new ArrayList<>(); for (int index = 0; index < files.size(); index++) { Object obj = files.elementAt(index); @@ -203,7 +191,6 @@ public void sftpCopyLogic(FileSystem fileSystem, Path destination, SFTPConnector // ignore "." and ".." files continue; } - // Ignore files that don't match the given file regex if (!Strings.isNullOrEmpty(config.fileNameRegex)) { String fileName = entry.getFilename(); @@ -212,10 +199,8 @@ public void sftpCopyLogic(FileSystem fileSystem, Path destination, SFTPConnector continue; } } - LOG.info("Downloading file {}", entry.getFilename()); String completeFileName = config.getSrcDirectory() + "/" + entry.getFilename(); - if (config.getExtractZipFiles() && entry.getFilename().endsWith(".zip")) { copyJschZip(channelSftp.get(completeFileName), fileSystem, destination); } else { @@ -245,4 +230,4 @@ private void copyJschZip(InputStream is, FileSystem fs, Path destination) throws } } } -} +} \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/SFTPDeleteAction.java b/src/main/java/io/cdap/plugin/SFTPDeleteAction.java index 85e0ca8..4e17482 100644 --- a/src/main/java/io/cdap/plugin/SFTPDeleteAction.java +++ b/src/main/java/io/cdap/plugin/SFTPDeleteAction.java @@ -1,3 +1,19 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.plugin; import com.jcraft.jsch.SftpException; @@ -20,10 +36,8 @@ @Plugin(type = Action.PLUGIN_TYPE) @Name("SFTPDelete") public class SFTPDeleteAction extends Action { - private static final Logger LOG = LoggerFactory.getLogger(SFTPDeleteAction.class); private SFTPDeleteActionConfig config; - public SFTPDeleteAction(SFTPDeleteActionConfig config) { this.config = config; } @@ -73,7 +87,6 @@ public void run(ActionContext context) throws Exception { if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), config.getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { - sftpDeleteLogic(filesToDelete, SFTPConnector); } catch (Exception e){ LOG.error(String.valueOf(e)); @@ -81,7 +94,6 @@ public void run(ActionContext context) throws Exception { } else { try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), config.getPassword(), config.getSSHProperties())) { - sftpDeleteLogic(filesToDelete, SFTPConnector); } catch (Exception e){ LOG.error(String.valueOf(e)); diff --git a/src/main/java/io/cdap/plugin/SFTPPutAction.java b/src/main/java/io/cdap/plugin/SFTPPutAction.java index 5cd362c..36d6b73 100644 --- a/src/main/java/io/cdap/plugin/SFTPPutAction.java +++ b/src/main/java/io/cdap/plugin/SFTPPutAction.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.io.InputStream; import javax.annotation.Nullable; @@ -45,9 +44,7 @@ @Name("SFTPPut") public class SFTPPutAction extends Action { private static final Logger LOG = LoggerFactory.getLogger(SFTPPutAction.class); - private SFTPPutActionConfig config; - public SFTPPutAction(SFTPPutActionConfig config){ this.config = config; } @@ -57,7 +54,6 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); config.validate(); } - /** * Configurations for the SFTP put action plugin. */ @@ -88,7 +84,7 @@ public String getFileNameRegex() { } public SFTPPutActionConfig(String host, int port, String userName, String password, - String sshProperties, String srcPath, String destDirectory, String authType){ + String sshProperties, String srcPath, String destDirectory, String authType){ this.host = host; this.port = port; this.userName = userName; @@ -99,9 +95,6 @@ public SFTPPutActionConfig(String host, int port, String userName, String passwo this.authTypeBeingUsed = authType; } - /** - * Validates the config parameters required for unloading the data. - */ public void validate() throws IllegalArgumentException { // Check for required parameters // Check for required params for each action @@ -115,7 +108,6 @@ public void run(ActionContext context) throws Exception { if (!fileSystem.exists(source)) { throw new RuntimeException(String.format("Source Path doesn't exist at %s", source)); } - if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { try (SFTPConnector sftp = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), config .getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { @@ -133,23 +125,20 @@ public void run(ActionContext context) throws Exception { } } - private void sftpPutLogic(FileSystem fileSystem, Path source, SFTPConnector sftp) throws SftpException, IOException { + private void sftpPutLogic(FileSystem fileSystem, Path source, SFTPConnector sftp) + throws SftpException, IOException { ChannelSftp channel = sftp.getSftpChannel(); - try { channel.mkdir(config.getDestDirectory()); } catch (SftpException ex) { // Suppress since the directory might already exist. } - channel.cd(config.getDestDirectory()); - // Filter out only the files to copy FileStatus[] filesToCopy = fileSystem.listStatus(source, path -> { String fileName = path.getName(); return fileName.matches(config.getFileNameRegex()); }); - for (FileStatus file : filesToCopy) { Path filePath = file.getPath(); try (InputStream inputStream = fileSystem.open(filePath)) { @@ -157,4 +146,4 @@ private void sftpPutLogic(FileSystem fileSystem, Path source, SFTPConnector sftp } } } -} +} \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java b/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java index 2dfd24f..91dd1ef 100644 --- a/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java +++ b/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -21,6 +21,7 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.dataset.lib.KeyValue; import io.cdap.cdap.api.plugin.PluginConfig; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; @@ -73,9 +74,7 @@ public String getHost() { return host; } - public int getPort() { - return (port != null) ? port : 22; - } + public int getPort() { return (port != null) ? port : 22; } public String getUserName() { return userName; @@ -85,19 +84,12 @@ public String getPassword() { return password; } - public byte[] getPrivateKey() { - assert privateKey != null; - return privateKey.getBytes(); - } + public byte[] getPrivateKey() { return privateKey.getBytes(StandardCharsets.UTF_8); } public String getAuthTypeBeingUsed() { return authTypeBeingUsed; } public byte[] getPassphrase(){ - if (passphrase == null){ - passphrase = ""; - } - return passphrase.getBytes(); - } + return passphrase == null ? new byte[0] : passphrase.getBytes(StandardCharsets.UTF_8); } public Map getSSHProperties(){ Map properties = new HashMap<>(); @@ -106,7 +98,6 @@ public Map getSSHProperties(){ if (sshProperties == null || sshProperties.isEmpty()) { return properties; } - KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", ":"); for (KeyValue keyVal : kvParser.parse(sshProperties)) { String key = keyVal.getKey(); diff --git a/src/main/java/io/cdap/plugin/common/SFTPConnector.java b/src/main/java/io/cdap/plugin/common/SFTPConnector.java index 5d256f5..981964a 100644 --- a/src/main/java/io/cdap/plugin/common/SFTPConnector.java +++ b/src/main/java/io/cdap/plugin/common/SFTPConnector.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -34,11 +34,9 @@ public class SFTPConnector implements AutoCloseable { private static Channel channel; private final Session session; - //Connector Object to be used for Auth with Password - public SFTPConnector(String host, int port, String userName, String password, - Map sessionProperties) - throws Exception { + public SFTPConnector(String host, int port, String userName, String password, + Map sessionProperties) throws Exception { JSch jsch = new JSch(); this.session = jsch.getSession(userName, host, port); session.setPassword(password); @@ -51,12 +49,11 @@ public SFTPConnector(String host, int port, String userName, String password, channel = session.openChannel("sftp"); channel.connect(); } - // Connector Object to be used for Auth with SSH privatekey. + // Connector Object to be used for Auth with SSH PrivateKey. public SFTPConnector(String host, int port, String userName, byte[] privateKey, - byte[] passphrase, Map sessionProperties) - throws Exception { + byte[] passphrase, Map sessionProperties) throws Exception { JSch jsch = new JSch(); - jsch.addIdentity("key", privateKey,null,passphrase); + jsch.addIdentity("key", privateKey,null, passphrase); this.session = jsch.getSession(userName, host, port); LOG.info("Properties {}", sessionProperties); Properties properties = new Properties(); @@ -67,13 +64,10 @@ public SFTPConnector(String host, int port, String userName, byte[] privateKey, channel = session.openChannel("sftp"); channel.connect(); } - /** * Get the established sftp channel to perform operations. */ - public static ChannelSftp getSftpChannel() { - return (ChannelSftp) channel; - } + public static ChannelSftp getSftpChannel() { return (ChannelSftp) channel; } @Override public void close() { @@ -85,7 +79,6 @@ public void close() { LOG.warn("Error while disconnecting sftp channel.", t); } } - if (session != null) { try { session.disconnect(); diff --git a/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java b/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java index 9679a1b..8350986 100644 --- a/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java +++ b/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java @@ -1,30 +1,39 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.plugin; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; - import io.cdap.cdap.etl.mock.action.MockActionContext; import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; import org.junit.After; import org.junit.Before; - - import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import software.sham.sftp.MockSftpServer; - import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Properties; - - public class SFTPCopyActionTest { - MockSftpServer server; Session sshSession; @@ -55,23 +64,15 @@ public void stopSftp() throws IOException { public void testCopyFile() throws Exception { File tempFile = tempFolder.newFile(); Files.write(tempFile.toPath(), "test".getBytes(StandardCharsets.UTF_8)); - String sourcePath = tempFile.getAbsoluteFile().toString(); String destPath = server.getBaseDirectory().toString(); SFTPCopyAction.SFTPCopyActionConfig config = new SFTPCopyAction.SFTPCopyActionConfig( - "localhost", - 9022, - "tester", - "testing", - "", - sourcePath, - destPath, - "password"); + "localhost", 9022, "tester", "testing", "", + sourcePath, destPath, "password"); MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); new SFTPCopyAction(config).configurePipeline(configurer); new SFTPCopyAction(config).run(new MockActionContext()); } - } diff --git a/src/main/test/io.cdap.plugin/SFTPDeleteTest.java b/src/main/test/io.cdap.plugin/SFTPDeleteTest.java index b24d7c9..8258ebb 100644 --- a/src/main/test/io.cdap.plugin/SFTPDeleteTest.java +++ b/src/main/test/io.cdap.plugin/SFTPDeleteTest.java @@ -1,64 +1,67 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.plugin; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; - import io.cdap.cdap.etl.mock.action.MockActionContext; import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; import org.junit.After; import org.junit.Before; - - import org.junit.Test; import software.sham.sftp.MockSftpServer; - import java.io.IOException; import java.util.Properties; - - public class SFTPDeleteTest { - MockSftpServer server; Session sshSession; @Before public void initSftp() throws IOException { - server = new MockSftpServer(9022); + server = new MockSftpServer(9022); } @Before public void initSshClient() throws JSchException { - JSch jsch = new JSch(); - sshSession = jsch.getSession("tester", "localhost", 9022); - Properties config = new Properties(); - config.setProperty("StrictHostKeyChecking", "no"); - sshSession.setConfig(config); - sshSession.setPassword("testing"); - sshSession.connect(); + JSch jsch = new JSch(); + sshSession = jsch.getSession("tester", "localhost", 9022); + Properties config = new Properties(); + config.setProperty("StrictHostKeyChecking", "no"); + sshSession.setConfig(config); + sshSession.setPassword("testing"); + sshSession.connect(); } @After public void stopSftp() throws IOException { - server.stop(); + server.stop(); } - @Test - public void testCopyFile() throws Exception { - SFTPDeleteAction.SFTPDeleteActionConfig config = new SFTPDeleteAction.SFTPDeleteActionConfig( - "localhost", - 9022, - "tester", - "testing", - "", - "", - "password"); - MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); - new SFTPDeleteAction(config).configurePipeline(configurer); - new SFTPDeleteAction(config).run(new MockActionContext()); + @Test + public void testCopyFile() throws Exception { + SFTPDeleteAction.SFTPDeleteActionConfig config = new SFTPDeleteAction.SFTPDeleteActionConfig( + "localhost", 9022, "tester", "testing", + "", "", "password"); + MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); + new SFTPDeleteAction(config).configurePipeline(configurer); + new SFTPDeleteAction(config).run(new MockActionContext()); } - } diff --git a/src/main/test/io.cdap.plugin/SFTPPutActionTest.java b/src/main/test/io.cdap.plugin/SFTPPutActionTest.java index 0ed1211..1a17f7a 100644 --- a/src/main/test/io.cdap.plugin/SFTPPutActionTest.java +++ b/src/main/test/io.cdap.plugin/SFTPPutActionTest.java @@ -1,5 +1,20 @@ -package io.cdap.plugin; +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; @@ -9,7 +24,6 @@ import org.junit.*; import org.junit.rules.TemporaryFolder; import software.sham.sftp.MockSftpServer; - import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -18,10 +32,7 @@ import java.util.Map; import java.util.Properties; - - public class SFTPPutActionTest { - MockSftpServer server; Session sshSession; @@ -57,18 +68,11 @@ public void testPutFile() throws Exception { Files.write(tempFile.toPath(), "test".getBytes(StandardCharsets.UTF_8)); String destPath = server.getBaseDirectory().toString(); SFTPPutAction.SFTPPutActionConfig config = new SFTPPutAction.SFTPPutActionConfig( - "localhost", - 9022, - "tester", - "testing", - "", - tempFile.toString(), - destPath, - "password"); + "localhost", 9022, "tester", "testing", + "", tempFile.toString(), destPath, "password"); MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); new SFTPPutAction(config).configurePipeline(configurer); new SFTPPutAction(config).run(new MockActionContext()); } - } diff --git a/widgets/SCPRemote-action.json b/widgets/SCPRemote-action.json index 452d07f..7a49e36 100644 --- a/widgets/SCPRemote-action.json +++ b/widgets/SCPRemote-action.json @@ -31,7 +31,6 @@ "label": "PrivateKey Passphrase", "name": "passphrase" }, - { "label": "Compression Flag", "name": "compressionFlag", @@ -96,13 +95,13 @@ "properties": [ { "widget-type": "textbox", - "label": "Host A User Name", - "name": "userNameA" + "label": "Host A Hostname", + "name": "hostA" }, { "widget-type": "textbox", - "label": "Host A Hostname", - "name": "hostA" + "label": "Host A User Name", + "name": "userNameA" }, { "widget-type": "textbox", @@ -111,19 +110,18 @@ } ] }, - { "label": "Remote Host B Properties", "properties": [ { "widget-type": "textbox", - "label": "Host B User Name", - "name": "userNameB" + "label": "Host B Hostname", + "name": "hostB" }, { "widget-type": "textbox", - "label": "Host B Hostname", - "name": "hostB" + "label": "Host B User Name", + "name": "userNameB" }, { "widget-type": "textbox", diff --git a/widgets/SFTPCopy-action.json b/widgets/SFTPCopy-action.json index e463d0c..1f1601b 100644 --- a/widgets/SFTPCopy-action.json +++ b/widgets/SFTPCopy-action.json @@ -77,7 +77,6 @@ "label": "Regex to match files that needs to be copied", "name" : "fileNameRegex" }, - { "widget-type": "keyvalue", "label": "Properties for SSH", @@ -135,4 +134,4 @@ ] } ] - } +}