Commit 2a88c90

Author: Chengfeng Mao
Merge pull request #12 from hydrator/feature/add-dependencies-spark-comp

Add dependencies property for spark compute

2 parents a8891a2 + a8adfc8, commit 2a88c90

4 files changed: +82, -35 lines changed


src/main/java/co/cask/hydrator/plugin/spark/dynamic/ScalaSparkCompute.java

Lines changed: 39 additions & 3 deletions
@@ -34,20 +34,21 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.nio.file.Files;
 import javax.annotation.Nullable;
 
 /**
@@ -96,10 +97,16 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws Ille
       throw new IllegalArgumentException("Unable to parse output schema " + config.getSchema(), e);
     }
 
-    if (!config.containsMacro("scalaCode") && Boolean.TRUE.equals(config.getDeployCompile())) {
+    if (!config.containsMacro("scalaCode") && !config.containsMacro("dependencies")
+        && Boolean.TRUE.equals(config.getDeployCompile())) {
       SparkInterpreter interpreter = SparkCompilers.createInterpreter();
       if (interpreter != null) {
+        File dir = null;
         try {
+          if (config.getDependencies() != null) {
+            dir = Files.createTempDirectory("sparkprogram").toFile();
+            SparkCompilers.addDependencies(dir, interpreter, config.getDependencies());
+          }
           // We don't need the actual stage name as this only happen in deployment time for compilation check.
           String className = generateClassName("dummy");
           interpreter.compile(generateSourceClass(className));
@@ -114,6 +121,10 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws Ille
 
         } catch (CompilationFailureException e) {
           throw new IllegalArgumentException(e.getMessage(), e);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        } finally {
+          SparkCompilers.deleteDir(dir);
         }
       }
     }
@@ -123,8 +134,16 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws Ille
   public void initialize(SparkExecutionPluginContext context) throws Exception {
     String className = generateClassName(context.getStageName());
     interpreter = context.createSparkInterpreter();
+    File dir = config.getDependencies() == null ? null : Files.createTempDirectory("sparkprogram").toFile();
+    try {
+      if (config.getDependencies() != null) {
+        SparkCompilers.addDependencies(dir, interpreter, config.getDependencies());
+      }
     interpreter.compile(generateSourceClass(className));
     method = getTransformMethod(interpreter.getClassLoader(), className);
+    } finally {
+      SparkCompilers.deleteDir(dir);
+    }
     isDataFrame = method.getParameterTypes()[0].equals(DATAFRAME_TYPE);
     takeContext = method.getParameterTypes().length == 2;
 
@@ -329,6 +348,16 @@ public static final class Config extends PluginConfig {
     @Macro
     private final String scalaCode;
 
+    @Description(
+      "Extra dependencies for the Spark program. " +
+      "It is a ',' separated list of URI for the location of dependency jars. " +
+      "A path can be ended with an asterisk '*' as a wildcard, in which all files with extension '.jar' under the " +
+      "parent path will be included."
+    )
+    @Macro
+    @Nullable
+    private final String dependencies;
+
     @Description("The schema of output objects. If no schema is given, it is assumed that the output schema is " +
                  "the same as the input schema.")
     @Nullable
@@ -340,9 +369,11 @@ public static final class Config extends PluginConfig {
     @Nullable
     private final Boolean deployCompile;
 
-    public Config(String scalaCode, @Nullable String schema, @Nullable Boolean deployCompile) {
+    public Config(String scalaCode, @Nullable String schema, @Nullable String dependencies,
+                  @Nullable Boolean deployCompile) {
       this.scalaCode = scalaCode;
      this.schema = schema;
+      this.dependencies = dependencies;
       this.deployCompile = deployCompile;
     }
 
@@ -355,6 +386,11 @@ public String getSchema() {
       return schema;
     }
 
+    @Nullable
+    public String getDependencies() {
+      return dependencies;
+    }
+
     @Nullable
     public Boolean getDeployCompile() {
       return deployCompile;

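For reference, the new dependencies property is a comma-separated list of jar URIs, and a path ending in '*' expands to every '.jar' file under its parent directory. The sketch below shows one way the widened Config constructor could be exercised, for example from a unit test; the class name, Scala snippet, and dependency URIs are hypothetical illustration values (they assume the plugin and CDAP API jars are on the classpath), not something taken from this commit.

import co.cask.hydrator.plugin.spark.dynamic.ScalaSparkCompute;

public class ConfigExample {
  public static void main(String[] args) {
    // Hypothetical transform body; any valid transform() definition would do here.
    String scalaCode =
        "def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = { df }";

    // Hypothetical dependency list: one explicit jar plus a '*' wildcard that expands to
    // every '.jar' file under /opt/extra-libs, per the property description in the diff above.
    String dependencies = "hdfs://namenode/jars/udfs.jar,file:/opt/extra-libs/*";

    // schema is null, so the output schema falls back to the input schema; deployCompile stays on.
    ScalaSparkCompute.Config config =
        new ScalaSparkCompute.Config(scalaCode, null, dependencies, true);

    System.out.println(config.getDependencies());
  }
}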
src/main/java/co/cask/hydrator/plugin/spark/dynamic/ScalaSparkProgram.java

Lines changed: 2 additions & 32 deletions
@@ -35,11 +35,7 @@
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 
@@ -79,7 +75,7 @@ public ScalaSparkProgram(Config config) throws CompilationFailureException, IOEx
           getMethodCallable(interpreter.getClassLoader(), config.getMainClass(), null);
         }
       } finally {
-        deleteDir(dir);
+        SparkCompilers.deleteDir(dir);
       }
     } finally {
       interpreter.close();
@@ -98,7 +94,7 @@ public void run(JavaSparkExecutionContext sec) throws Exception {
       interpreter.compile(config.getScalaCode());
       getMethodCallable(interpreter.getClassLoader(), config.getMainClass(), sec).call();
     } finally {
-      deleteDir(dir);
+      SparkCompilers.deleteDir(dir);
     }
   }
 
@@ -166,32 +162,6 @@ public Void call() throws Exception {
     }
   }
 
-  /**
-   * Recursively delete a directory.
-   */
-  public static void deleteDir(@Nullable File dir) {
-    if (dir == null) {
-      return;
-    }
-    try {
-      Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<Path>() {
-        @Override
-        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-          Files.deleteIfExists(file);
-          return FileVisitResult.CONTINUE;
-        }
-
-        @Override
-        public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-          Files.deleteIfExists(dir);
-          return FileVisitResult.CONTINUE;
-        }
-      });
-    } catch (IOException e) {
-      LOG.warn("Failed to cleanup temporary directory {}", dir, e);
-    }
-  }
-
   /**
    * Plugin configuration
   */

src/main/java/co/cask/hydrator/plugin/spark/dynamic/SparkCompilers.java

Lines changed: 33 additions & 0 deletions
@@ -24,6 +24,8 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Function0;
 import scala.Option$;
 import scala.collection.JavaConversions;
@@ -44,7 +46,10 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -57,6 +62,8 @@
  */
 public final class SparkCompilers {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SparkCompilers.class);
+
   private static final FilenameFilter JAR_FILE_FILTER = new FilenameFilter() {
     @Override
     public boolean accept(File dir, String name) {
@@ -214,4 +221,30 @@ private static void copyPathAndAdd(FileSystem fs, Path from, File dir, Collectio
   private SparkCompilers() {
     // no-op
   }
+
+  /**
+   * Recursively delete a directory.
+   */
+  public static void deleteDir(@Nullable File dir) {
+    if (dir == null) {
+      return;
+    }
+    try {
+      Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<java.nio.file.Path>() {
+        @Override
+        public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
+          Files.deleteIfExists(file);
+          return FileVisitResult.CONTINUE;
+        }
+
+        @Override
+        public FileVisitResult postVisitDirectory(java.nio.file.Path dir, IOException exc) throws IOException {
+          Files.deleteIfExists(dir);
+          return FileVisitResult.CONTINUE;
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("Failed to cleanup temporary directory {}", dir, e);
+    }
+  }
 }

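The dependency string is handed to SparkCompilers.addDependencies, and the class already declares a JAR_FILE_FILTER (visible as context above) whose name suggests it matches only '.jar' files. As a rough, self-contained illustration of the wildcard rule described by the new property, not the plugin's actual implementation, a trailing '*' could be resolved roughly like this (the expand helper and sample paths are hypothetical):

import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WildcardJarExpansionExample {

  // Mirrors the idea behind SparkCompilers.JAR_FILE_FILTER: accept only '.jar' files.
  private static final FilenameFilter JAR_FILE_FILTER = new FilenameFilter() {
    @Override
    public boolean accept(File dir, String name) {
      return name.endsWith(".jar");
    }
  };

  // Expands one dependency entry: a path ending in '*' yields every '.jar' file under the
  // parent directory; anything else is returned unchanged.
  static List<File> expand(String path) {
    List<File> jars = new ArrayList<>();
    if (path.endsWith("*")) {
      File parent = new File(path.substring(0, path.length() - 1));
      File[] matches = parent.listFiles(JAR_FILE_FILTER);
      if (matches != null) {
        jars.addAll(Arrays.asList(matches));
      }
    } else {
      jars.add(new File(path));
    }
    return jars;
  }

  public static void main(String[] args) {
    // Hypothetical comma-separated value, shaped like the 'dependencies' property.
    String dependencies = "/opt/extra-libs/*,/tmp/single-dep.jar";
    for (String entry : dependencies.split(",")) {
      System.out.println(entry + " -> " + expand(entry.trim()));
    }
  }
}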
widgets/ScalaSparkCompute-sparkcompute.json

Lines changed: 8 additions & 0 deletions
@@ -15,6 +15,14 @@
         "default": "/**\n * Transforms the provided input Apache Spark RDD or DataFrame into another RDD or DataFrame.\n *\n * The input DataFrame has the same schema as the input schema to this stage and the transform method should return a DataFrame that has the same schema as the output schema setup for this stage.\n * To emit logs, use: \n * import org.slf4j.LoggerFactory\n * val logger = LoggerFactory.getLogger('mylogger')\n * logger.info('Logging')\n *\n *\n * @param input the input DataFrame which has the same schema as the input schema to this stage.\n * @param context a SparkExecutionPluginContext object that can be used to emit zero or more records (using the emitter.emit() method) or errors (using the emitter.emitError() method) \n * @param context an object that provides access to:\n * 1. CDAP Datasets and Streams - context.fromDataset('counts'); or context.fromStream('input');\n * 2. Original Spark Context - context.getSparkContext();\n * 3. Runtime Arguments - context.getArguments.get('priceThreshold')\n */\ndef transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {\n df\n}"
       }
     },
+    {
+      "widget-type": "dsv",
+      "label": "Dependencies",
+      "name": "dependencies",
+      "widget-attributes": {
+        "delimiter": ","
+      }
+    },
     {
       "widget-type": "select",
       "label": "Compile at Deployment Time",
