diff --git a/kite-data/kite-data-gcs/README.md b/kite-data/kite-data-gcs/README.md new file mode 100644 index 0000000000..5faa4a92af --- /dev/null +++ b/kite-data/kite-data-gcs/README.md @@ -0,0 +1,10 @@ +Kite Data GCS Module +-------- + +To run the tests, you need to configure a bucket: + + export GCS_BUCKET=test-bucket + +To test outside a GCE VM: + + export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json diff --git a/kite-data/kite-data-gcs/pom.xml b/kite-data/kite-data-gcs/pom.xml new file mode 100644 index 0000000000..8c6ee43530 --- /dev/null +++ b/kite-data/kite-data-gcs/pom.xml @@ -0,0 +1,200 @@ + + + + + 4.0.0 + kite-data-gcs + + + org.kitesdk + kite-data + 1.1.1-SNAPSHOT + + + Kite Data GCS Module + + The Kite Data GCS module provides tools for storing Kite datasets in Google Cloud Storage. + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + org.apache.maven.plugins + maven-source-plugin + + + org.apache.maven.plugins + maven-javadoc-plugin + + + org.apache.rat + apache-rat-plugin + + + org.codehaus.mojo + findbugs-maven-plugin + + + org.codehaus.mojo + findbugs-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${test.google.gcs.bucket} + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + compile + + run + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-project-info-reports-plugin + + + false + + index + summary + dependency-info + dependencies + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + + + + + org.kitesdk + kite-data-core + ${project.parent.version} + + + + + org.kitesdk + ${artifact.hadoop-deps} + pom + provided + + + org.kitesdk + kite-hadoop-compatibility + + + + + org.apache.avro + avro + + + + com.google.cloud.bigdataoss + gcs-connector + ${vers.hadoop-gcs} + provided + + + + + org.slf4j + slf4j-api + + + com.google.guava + guava + ${vers.guava.gcs} + + + com.google.code.findbugs + jsr305 + provided + true + 
+ com.google.code.findbugs + annotations + provided + + + + + + junit + junit + test + + + + org.kitesdk + kite-data-core + ${project.parent.version} + test-jar + test + + + + org.kitesdk + ${artifact.hadoop-test-deps} + pom + test + + + + diff --git a/kite-data/kite-data-gcs/src/main/java/org/kitesdk/data/spi/gcs/Loader.java b/kite-data/kite-data-gcs/src/main/java/org/kitesdk/data/spi/gcs/Loader.java new file mode 100644 index 0000000000..e2c2fe4f8a --- /dev/null +++ b/kite-data/kite-data-gcs/src/main/java/org/kitesdk/data/spi/gcs/Loader.java @@ -0,0 +1,127 @@ +/* + * Copyright 2015 Cloudera. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.kitesdk.data.spi.gcs; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.kitesdk.data.DatasetIOException; +import org.kitesdk.data.DatasetOperationException; +import org.kitesdk.data.spi.DatasetRepository; +import org.kitesdk.data.spi.DefaultConfiguration; +import org.kitesdk.data.spi.Loadable; +import org.kitesdk.data.spi.OptionBuilder; +import org.kitesdk.data.spi.Registration; +import org.kitesdk.data.spi.URIPattern; +import org.kitesdk.data.spi.filesystem.FileSystemDatasetRepository; + +/** + * A Loader implementation to register URIs for GS. 
+ */ +public class Loader implements Loadable { + + private static final int UNSPECIFIED_PORT = -1; + private static final String GOOGLE_HADOOP_FILE_SYSTEM = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"; + + /** + * This class builds configured instances of + * {@code FileSystemDatasetRepository} from a Map of options. This is for the + * URI system. + */ + private static class URIBuilder implements OptionBuilder { + + @Override + public DatasetRepository getFromOptions(Map match) { + String path = match.get("path"); + final Path root = (path == null || path.isEmpty()) ? + new Path("/") : new Path("/", path); + + Configuration conf = DefaultConfiguration.get(); + FileSystem fs; + try { + fs = FileSystem.get(fileSystemURI(match), conf); + } catch (IOException e) { + // "Incomplete HDFS URI, no host" => add a helpful suggestion + if (e.getMessage().startsWith("Incomplete")) { + throw new DatasetIOException("Could not get a FileSystem: " + + "make sure the credentials for " + match.get(URIPattern.SCHEME) + + " URIs are configured.", e); + } + throw new DatasetIOException("Could not get a FileSystem", e); + } + return new FileSystemDatasetRepository.Builder() + .configuration(new Configuration(conf)) // make a modifiable copy + .rootDirectory(fs.makeQualified(root)) + .build(); + } + } + + @Override + public void load() { + try { + // load hdfs-site.xml by loading HdfsConfiguration + Configuration config = DefaultConfiguration.get(); + config.set("fs.gs.impl", GOOGLE_HADOOP_FILE_SYSTEM); + config.set("fs.AbstractFileSystem.gs.impl", GOOGLE_HADOOP_FILE_SYSTEM); + config.set("google.cloud.auth.service.account.enable", "true"); + + FileSystem.getLocal(config); + } catch (IOException e) { + throw new DatasetIOException("Cannot load default config", e); + } + + OptionBuilder builder = new URIBuilder(); + + // username and secret are the same; host is the bucket + Registration.register( + new URIPattern("gs:/*path"), + new 
URIPattern("gs:/*path/:namespace/:dataset"), + builder); + } + + private static URI fileSystemURI(Map match) { + final String userInfo; + if (match.containsKey(URIPattern.USERNAME)) { + if (match.containsKey(URIPattern.PASSWORD)) { + userInfo = match.get(URIPattern.USERNAME) + ":" + + match.get(URIPattern.PASSWORD); + } else { + userInfo = match.get(URIPattern.USERNAME); + } + } else { + userInfo = null; + } + try { + int port = UNSPECIFIED_PORT; + if (match.containsKey(URIPattern.PORT)) { + try { + port = Integer.parseInt(match.get(URIPattern.PORT)); + } catch (NumberFormatException e) { + port = UNSPECIFIED_PORT; + } + } + return new URI(match.get(URIPattern.SCHEME), userInfo, + match.get(URIPattern.HOST), port, "/", null, null); + } catch (URISyntaxException ex) { + throw new DatasetOperationException("[BUG] Could not build FS URI", ex); + } + } +} diff --git a/kite-data/kite-data-gcs/src/main/resources/META-INF/services/org.kitesdk.data.spi.Loadable b/kite-data/kite-data-gcs/src/main/resources/META-INF/services/org.kitesdk.data.spi.Loadable new file mode 100644 index 0000000000..c3c0053988 --- /dev/null +++ b/kite-data/kite-data-gcs/src/main/resources/META-INF/services/org.kitesdk.data.spi.Loadable @@ -0,0 +1,16 @@ +# +# Copyright 2015 Cloudera Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
/*
 * Copyright 2015 Cloudera Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.kitesdk.data.spi.gcs;

import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration;

/**
 * Round-trip test that writes and reads a small string dataset in a Google
 * Cloud Storage bucket. The test is skipped unless the
 * {@code test.google.gcs.bucket} system property names a bucket.
 */
public class TestGCSDataset {
  private static final String GOOGLE_HADOOP_FILE_SYSTEM =
      "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem";
  // The connector provides a separate class for the AbstractFileSystem key.
  private static final String GOOGLE_HADOOP_ABSTRACT_FILE_SYSTEM =
      "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS";
  private static final String BUCKET =
      System.getProperty("test.google.gcs.bucket");

  private static Configuration original = null;

  /** Returns true when a non-empty bucket name was supplied to the test run. */
  private static boolean hasBucket() {
    return BUCKET != null && !BUCKET.isEmpty();
  }

  /**
   * Remembers the current default configuration and, when a bucket is
   * configured, installs one with the GCS connector settings added.
   */
  @BeforeClass
  public static void addCredentials() {
    original = DefaultConfiguration.get();
    Configuration conf = DefaultConfiguration.get();
    if (hasBucket()) {
      conf.set("fs.gs.impl", GOOGLE_HADOOP_FILE_SYSTEM);
      conf.set("fs.AbstractFileSystem.gs.impl",
          GOOGLE_HADOOP_ABSTRACT_FILE_SYSTEM);
      conf.set("google.cloud.auth.service.account.enable", "true");
    }
    DefaultConfiguration.set(conf);
  }

  /** Restores the default configuration captured before the tests ran. */
  @AfterClass
  public static void resetConfiguration() {
    DefaultConfiguration.set(original);
  }

  @Test
  public void testBasicGCS() {
    // only run this test if bucket defined
    Assume.assumeTrue(hasBucket());

    String uri = "dataset:gs://" + BUCKET + "/ns/test";

    // make sure the dataset doesn't already exist
    Datasets.delete(uri);

    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schemaLiteral("\"string\"")
        .build();

    try {
      Dataset<String> dataset = Datasets.create(uri, descriptor, String.class);

      List<String> expected = Lists.newArrayList("a", "b", "time");
      DatasetWriter<String> writer = null;
      try {
        writer = dataset.newWriter();
        for (String s : expected) {
          writer.write(s);
        }
      } finally {
        if (writer != null) {
          writer.close();
        }
      }

      DatasetReader<String> reader = null;
      try {
        reader = dataset.newReader();
        Assert.assertEquals("Should match written strings",
            expected, Lists.newArrayList((Iterator<String>) reader));
      } finally {
        if (reader != null) {
          reader.close();
        }
      }
    } finally {
      // clean up even when an assertion above fails, so that a failed run
      // does not leave data behind in the bucket
      Datasets.delete(uri);
    }
  }

}
0.10.0-cdh${cdh4.version} 3.3.2-cdh${cdh4.version} +