diff --git a/persistence/sensorhub-storage-es-rest/README.md b/persistence/sensorhub-storage-es-rest/README.md
new file mode 100644
index 000000000..dec7f0b41
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/README.md
@@ -0,0 +1,107 @@
+# Elasticsearch storage implementation
+
+This storage module stores and retrieves data to/from an Elasticsearch V6+ server. It uses the Elasticsearch
+[Java High Level REST Client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html).
+
+The differences from the sensorhub-storage-es module are:
+
+ - it uses the Java High Level REST Client, which executes HTTP requests rather than serialized Java requests
+ - it converts each data frame into Elasticsearch document fields instead of obfuscating it inside a serialized Java object (except for the OSH metadata)
+
+The driver that creates the Elasticsearch index may not support your DataComponent; in that case, create an issue with the specification of the unsupported DataComponent (missing fields).
+
+Three main OSH interfaces have been implemented:
+1. IRecordStorageModule
+2. IObsStorageModule
+3. IMultiSourceStorage
+
+## Main classes
+
+An iterator wrapper class is used to wrap scroll responses without specifying the scroll id every time. The ESIterator takes care
+of issuing new requests with the current scroll id when necessary.
+
+A bulk processor is in charge of sending create/update/delete requests. The BulkProcessor class offers a simple interface to flush bulk operations automatically, based on the number or size of requests, or after a given period.
+
+Some settings are available through the ESBasicStorageConfig class:
+- clusterName: ES cluster name
+- user: Elasticsearch user for authentication (leave blank if not required)
+- password: Elasticsearch password for authentication
+- autoRefresh: refresh the store on commit; requires indices:admin/refresh rights
+- filterByStorageId: multiple storage instances can use the same index; if filtering is disabled, this driver will see all sensors (should be used only for a read-only SOS service)
+- certificatesPath: list of additional SSL certificates for the Elasticsearch connection
+- nodeUrls: list of nodes, in host:port format
+- indexNamePrepend: string to add to the index name before the data name
+- indexNameMetaData: index name of the OpenSensorHub metadata
+- scrollMaxDuration: when scrolling, the maximum duration, in ms, that ScrollableResults remain usable if no other results are fetched
+- scrollFetchSize: when scrolling, the number of results fetched by each Elasticsearch call
+- connectTimeout: the timeout in milliseconds until a connection is established; a value of zero is interpreted as an infinite timeout
+- socketTimeout: the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, the maximum period of inactivity between two consecutive data packets
+- maxRetryTimeout: the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request
+- bulkConcurrentRequests: the number of concurrent bulk requests
+- bulkActions: execute the bulk every n requests
+- bulkSize: flush the bulk every n MB
+- bulkFlushInterval: flush the bulk every n seconds whatever the number of requests
+- maxBulkRetry: bulk insertion may fail; the client will resend in case of a TimeOut exception. Retry is disabled by default to avoid overwhelming the Elasticsearch cluster
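+
+As an illustration, here is a minimal sketch of how these settings could be populated through the public fields of ESBasicStorageConfig (values are based on the class defaults; the example class name is hypothetical, and registration of the module with the OSH ModuleRegistry is omitted):
+
+```java
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class EsStorageConfigExample {
+    public static ESBasicStorageConfig createConfig() {
+        ESBasicStorageConfig config = new ESBasicStorageConfig();
+        config.nodeUrls = new ArrayList<>(Arrays.asList("localhost:9200"));
+        config.user = "";               // leave blank when authentication is not required
+        config.autoRefresh = true;      // requires indices:admin/refresh rights
+        config.scrollFetchSize = 10;    // results fetched per Elasticsearch scroll call
+        config.bulkActions = 10000;     // execute the bulk every n requests
+        config.bulkSize = 10;           // flush the bulk every n MB
+        config.bulkFlushInterval = 10;  // flush every n seconds regardless of request count
+        config.maxBulkRetry = 0;        // bulk retries are disabled by default
+        return config;
+    }
+}
+```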
+
+A special parser in this driver creates an appropriate default Elasticsearch index mapping for each OSH DataComponent.
+You can override this mapping using Elasticsearch tools (e.g. Kibana).
+
+## Mappings
+
+These are the different mappings, depending on the storage used:
+
+1. OpenSensorHub-specific metadata
+```json
+{
+  "mapping": {
+    "osh_metadata": {
+      "properties": {
+        "blob": {
+          "type": "binary"
+        },
+        "index": {
+          "type": "keyword"
+        },
+        "metadataType": {
+          "type": "keyword"
+        },
+        "storageID": {
+          "type": "keyword"
+        },
+        "timestamp": {
+          "type": "date",
+          "format": "epoch_millis"
+        }
+      }
+    }
+  }
+}
+```
+
+2. Sensor location
+```json
+{
+  "mapping": {
+    "sensorLocation": {
+      "dynamic": "false",
+      "properties": {
+        "location": {
+          "type": "geo_point"
+        },
+        "location_height": {
+          "type": "double"
+        },
+        "producerID": {
+          "type": "keyword"
+        },
+        "storageID": {
+          "type": "keyword"
+        },
+        "timestamp": {
+          "type": "date",
+          "format": "epoch_millis"
+        }
+      }
+    }
+  }
+}
+```
diff --git a/persistence/sensorhub-storage-es-rest/build.gradle b/persistence/sensorhub-storage-es-rest/build.gradle
new file mode 100644
index 000000000..48800cedb
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/build.gradle
@@ -0,0 +1,48 @@
+description = 'Elasticsearch Storage'
+ext.details = 'Storage based on the Elasticsearch database'
+version = '1.0.1-SNAPSHOT'
+
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+dependencies {
+  compile 'org.sensorhub:sensorhub-core:' + oshCoreVersion
+  compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:6.8.3'
+  compile 'com.esotericsoftware:kryo:4.0.0'
+
+  compile "org.locationtech.spatial4j:spatial4j:0.7"
+  compile "org.locationtech.jts:jts-core:1.15.0"
+
+  testCompile group:'org.sensorhub', name:'sensorhub-core', version:oshCoreVersion, configuration: 'testArtifacts'
+}
+
+// only run tests using mock DB automatically
+// exclude tests requiring connection to the database; these have to be run manually
+test {
+  exclude '**/integration/*.class'
+}
+
+// add info to OSGi manifest
+jar {
+  manifest {
+    instruction 'Bundle-Vendor', 'Sensia Software LLC'
+    instruction 'Bundle-Activator', 'org.sensorhub.process.math.Activator'
+  }
+}
+
+// add info to maven pom
+ext.pom >>= {
+  developers {
+    developer {
+      id 'mdhsl'
+      name 'Mathieu Dhainaut'
+    }
+    developer {
+      id 'nicolas-f'
+      name 'Nicolas Fortin'
+      organization 'UGE - UMRAE'
+    }
+  }
+}
diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/elasticsearch/client/CompressedClient.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/elasticsearch/client/CompressedClient.java
new file mode 100644
index 000000000..0ceaf49c9
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/elasticsearch/client/CompressedClient.java
@@ -0,0 +1,212 @@
+package org.elasticsearch.client;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.entity.EntityBuilder;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.protocol.HTTP;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import 
org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.seqno.SequenceNumbers; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.zip.GZIPOutputStream; + +import static java.util.Collections.emptySet; +import static org.elasticsearch.client.RequestConverters.createContentType; +import static org.elasticsearch.client.RequestConverters.enforceSameContentType; + +public class CompressedClient extends RestHighLevelClient { + private static final String GZIP_CODEC = "gzip"; + + + public CompressedClient(RestClientBuilder restClientBuilder) { + super(restClientBuilder); + } + + public CompressedClient(RestClientBuilder restClientBuilder, List namedXContentEntries) { + super(restClientBuilder, namedXContentEntries); + } + + public CompressedClient(RestClient restClient, CheckedConsumer doClose, List namedXContentEntries) { + super(restClient, doClose, namedXContentEntries); + } + + + /** + * Asynchronously executes a bulk request using the Bulk API + * + * See Bulk API on elastic.co + */ + public final void bulkCompressedAsync(BulkRequest bulkRequest, ActionListener listener, Header... headers) { + performRequestAsyncAndParseEntity(bulkRequest, CompressedClient::bulkCompressed, BulkResponse::fromXContent, listener, emptySet(), headers); + } + + + static Request bulkCompressed(BulkRequest bulkRequest) throws IOException { + + + Request request = new Request(HttpPost.METHOD_NAME, "/_bulk"); + + RequestConverters.Params parameters = new RequestConverters.Params(request); + parameters.withTimeout(bulkRequest.timeout()); + parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); + parameters.withPipeline(bulkRequest.pipeline()); + parameters.withRouting(bulkRequest.routing()); + // Bulk API only supports newline delimited JSON or Smile. Before executing + // the bulk, we need to check that all requests have the same content-type + // and this content-type is supported by the Bulk API. 
+ XContentType bulkContentType = null; + for (int i = 0; i < bulkRequest.numberOfActions(); i++) { + DocWriteRequest action = bulkRequest.requests().get(i); + + DocWriteRequest.OpType opType = action.opType(); + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + bulkContentType = enforceSameContentType((IndexRequest) action, bulkContentType); + + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) action; + if (updateRequest.doc() != null) { + bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType); + } + if (updateRequest.upsertRequest() != null) { + bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType); + } + } + } + + if (bulkContentType == null) { + bulkContentType = XContentType.JSON; + } + + final byte separator = bulkContentType.xContent().streamSeparator(); + final ContentType requestContentType = createContentType(bulkContentType); + + ByteArrayOutputStream content = new ByteArrayOutputStream(); + for (DocWriteRequest action : bulkRequest.requests()) { + DocWriteRequest.OpType opType = action.opType(); + + try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) { + metadata.startObject(); + { + metadata.startObject(opType.getLowercase()); + if (Strings.hasLength(action.index())) { + metadata.field("_index", action.index()); + } + if (Strings.hasLength(action.type())) { + metadata.field("_type", action.type()); + } + if (Strings.hasLength(action.id())) { + metadata.field("_id", action.id()); + } + if (Strings.hasLength(action.routing())) { + metadata.field("routing", action.routing()); + } + if (Strings.hasLength(action.parent())) { + metadata.field("parent", action.parent()); + } + if (action.version() != Versions.MATCH_ANY) { + metadata.field("version", action.version()); + } + + VersionType versionType = action.versionType(); + if (versionType != VersionType.INTERNAL) { + if (versionType == VersionType.EXTERNAL) { + metadata.field("version_type", "external"); + } else if (versionType == VersionType.EXTERNAL_GTE) { + metadata.field("version_type", "external_gte"); + } else if (versionType == VersionType.FORCE) { + metadata.field("version_type", "force"); + } + } + + if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + metadata.field("if_seq_no", action.ifSeqNo()); + metadata.field("if_primary_term", action.ifPrimaryTerm()); + } + + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) action; + if (Strings.hasLength(indexRequest.getPipeline())) { + metadata.field("pipeline", indexRequest.getPipeline()); + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) action; + if (updateRequest.retryOnConflict() > 0) { + metadata.field("retry_on_conflict", updateRequest.retryOnConflict()); + } + if (updateRequest.fetchSource() != null) { + metadata.field("_source", updateRequest.fetchSource()); + } + } + metadata.endObject(); + } + metadata.endObject(); + + BytesRef metadataSource = BytesReference.bytes(metadata).toBytesRef(); + content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length); + content.write(separator); + } + + BytesRef source = null; + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) action; + BytesReference indexSource = indexRequest.source(); + 
XContentType indexXContentType = indexRequest.getContentType(); + + try (XContentParser parser = XContentHelper.createParser( + /* + * EMPTY and THROW are fine here because we just call + * copyCurrentStructure which doesn't touch the + * registry or deprecation. + */ + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + indexSource, indexXContentType)) { + try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) { + builder.copyCurrentStructure(parser); + source = BytesReference.bytes(builder).toBytesRef(); + } + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + source = XContentHelper.toXContent((UpdateRequest) action, bulkContentType, false).toBytesRef(); + } + + if (source != null) { + content.write(source.bytes, source.offset, source.length); + content.write(separator); + } + } + + byte[] original = content.toByteArray(); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(original.length); + GZIPOutputStream gzip = new GZIPOutputStream(byteArrayOutputStream); + gzip.write(original); + gzip.finish(); + ByteArrayEntity entity = new ByteArrayEntity(byteArrayOutputStream.toByteArray(), requestContentType); + entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_ENCODING, GZIP_CODEC)); + request.setEntity(entity); + return request; + } +} diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/DataStreamInfo.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/DataStreamInfo.java new file mode 100644 index 000000000..9578bfb6d --- /dev/null +++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/DataStreamInfo.java @@ -0,0 +1,56 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2012-2016 Sensia Software LLC. All Rights Reserved. 
+ +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.persistence.es; + +import org.sensorhub.api.persistence.IRecordStoreInfo; +import net.opengis.swe.v20.DataComponent; +import net.opengis.swe.v20.DataEncoding; + + +public class DataStreamInfo implements IRecordStoreInfo +{ + String name; + DataComponent recordDescription; + DataEncoding recommendedEncoding; + + + public DataStreamInfo(String name, DataComponent recordDescription, DataEncoding recommendedEncoding) + { + this.name = name; + this.recordDescription = recordDescription; + this.recommendedEncoding = recommendedEncoding; + } + + + @Override + public String getName() + { + return name; + } + + + @Override + public DataComponent getRecordDescription() + { + return recordDescription; + } + + + @Override + public DataEncoding getRecommendedEncoding() + { + return recommendedEncoding; + } +} diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageConfig.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageConfig.java new file mode 100644 index 000000000..f11ca169a --- /dev/null +++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageConfig.java @@ -0,0 +1,102 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.persistence.es; + +import java.util.*; + +import org.sensorhub.api.config.DisplayInfo; +import org.sensorhub.api.config.DisplayInfo.Required; + +/** + *
<p>
+ * Configuration class for ES basic storage + *
</p>
+ *
+ * @author Mathieu Dhainaut
+ * @author Nicolas Fortin
+ * @since 2017
+ */
+public class ESBasicStorageConfig extends org.sensorhub.api.persistence.ObsStorageConfig {
+
+    public static final String DEFAULT_INDEX_NAME_METADATA = "osh_meta_record_store";
+
+    @DisplayInfo(desc="ES cluster name")
+    public String clusterName = "elasticsearch";
+
+    @DisplayInfo(desc = "ElasticSearch user for authentication (leave blank if not required)")
+    public String user = "";
+
+    @DisplayInfo(desc = "Refresh store on commit. Requires indices:admin/refresh rights")
+    public boolean autoRefresh = true;
+
+    @DisplayInfo(desc = "Multiple storage instances can use the same index. If filtering is disabled, this driver will see all sensors (should be used only for a read-only SOS service)")
+    public boolean filterByStorageId = true;
+
+    @DisplayInfo(desc="List of additional SSL certificates", label = "Certificates")
+    public List<String> certificatesPath = new ArrayList<>();
+
+    @DisplayInfo(desc = "ElasticSearch password for authentication")
+    @DisplayInfo.FieldType(DisplayInfo.FieldType.Type.PASSWORD)
+    public String password = "";
+
+    @DisplayInfo(desc="List of nodes, in host:port format")
+    public List<String> nodeUrls = new ArrayList<>(Arrays.asList("localhost:9200","localhost:9201"));
+
+    @DisplayInfo(desc="String to add to the index name before the data name")
+    public String indexNamePrepend = "";
+
+    @DisplayInfo(desc="Index name of the OpenSensorHub metadata")
+    public String indexNameMetaData = DEFAULT_INDEX_NAME_METADATA;
+
+    @DisplayInfo(desc="When scrolling, the maximum duration, in ms, that ScrollableResults remain usable if no other results are fetched")
+    public int scrollMaxDuration = 6000;
+
+    @DisplayInfo(desc="When scrolling, the number of results fetched by each Elasticsearch call")
+    public int scrollFetchSize = 10;
+
+    @DisplayInfo(desc="Determines the timeout in milliseconds until a connection is established. A timeout value of zero is interpreted as an infinite timeout.")
+    @DisplayInfo.ValueRange(min = 0)
+    public int connectTimeout = 5000;
+
+    @DisplayInfo(desc="Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, the maximum period of inactivity between two consecutive data packets.")
+    @DisplayInfo.ValueRange(min = 0)
+    public int socketTimeout = 60000;
+
+    @DisplayInfo(desc="Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request.")
+    @DisplayInfo.ValueRange(min = 0)
+    public int maxRetryTimeout = 60000;
+
+    @DisplayInfo(desc="Number of concurrent bulk requests")
+    public int bulkConcurrentRequests = 10;
+
+    @DisplayInfo(desc="Execute the bulk every n requests")
+    public int bulkActions = 10000;
+
+    @DisplayInfo(desc="Flush the bulk every n MB")
+    public int bulkSize = 10;
+
+    @DisplayInfo(desc="Flush the bulk every n seconds whatever the number of requests")
+    public int bulkFlushInterval = 10;
+
+    @DisplayInfo(desc = "Bulk insertion may fail; the client will resend in case of a TimeOut exception. Retry is disabled by default to avoid overwhelming the Elasticsearch cluster")
+    public int maxBulkRetry = 0;
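+
+    // For reference, the bulk* settings above feed straight into the Elasticsearch
+    // BulkProcessor; condensed from ESBasicStorageImpl.start() further below:
+    //
+    //   BulkProcessor.builder(client::bulkAsync, bulkListener)
+    //       .setBulkActions(config.bulkActions)                                 // every n requests
+    //       .setBulkSize(new ByteSizeValue(config.bulkSize, ByteSizeUnit.MB))   // every n MB
+    //       .setFlushInterval(TimeValue.timeValueSeconds(config.bulkFlushInterval))
+    //       .setConcurrentRequests(config.bulkConcurrentRequests)
+    //       .build();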
+
+    @Override
+    public void setStorageIdentifier(String name)
+    {
+        indexNamePrepend = name;
+    }
+}
diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageDescriptor.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageDescriptor.java
new file mode 100644
index 000000000..a3b8e80e5
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageDescriptor.java
@@ -0,0 +1,53 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+The contents of this file are subject to the Mozilla Public License, v. 2.0.
+If a copy of the MPL was not distributed with this file, You can obtain one
+at http://mozilla.org/MPL/2.0/.
+
+Software distributed under the License is distributed on an "AS IS" basis,
+WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+for the specific language governing rights and limitations under the License.
+
+Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved.
+
+******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es;
+
+import org.sensorhub.api.module.IModule;
+import org.sensorhub.api.module.IModuleProvider;
+import org.sensorhub.api.module.ModuleConfig;
+import org.sensorhub.impl.module.JarModuleProvider;
+
+/**
+ * <p>
+ * Descriptor of ES basic storage module. + * This is needed for automatic discovery by the ModuleRegistry. + *
</p>
+ *
+ * @author Mathieu Dhainaut
+ * @since 2017
+ */
+public class ESBasicStorageDescriptor extends JarModuleProvider implements IModuleProvider {
+
+    @Override
+    public String getModuleName() {
+        return "ElasticSearch Basic Record Storage";
+    }
+
+    @Override
+    public String getModuleDescription() {
+        return "Generic implementation of record storage using the ElasticSearch database";
+    }
+
+    @Override
+    public Class<? extends IModule<?>> getModuleClass() {
+        return ESBasicStorageImpl.class;
+    }
+
+    @Override
+    public Class<? extends ModuleConfig> getModuleConfigClass() {
+        return ESBasicStorageConfig.class;
+    }
+
+}
diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageImpl.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageImpl.java
new file mode 100644
index 000000000..ee0d96f0e
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESBasicStorageImpl.java
@@ -0,0 +1,1731 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+The contents of this file are subject to the Mozilla Public License, v. 2.0.
+If a copy of the MPL was not distributed with this file, You can obtain one
+at http://mozilla.org/MPL/2.0/.
+
+Software distributed under the License is distributed on an "AS IS" basis,
+WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+for the specific language governing rights and limitations under the License.
+
+Copyright (C) 2012-2016 Sensia Software LLC. All Rights Reserved.
+
+******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es;
+
+import net.opengis.gml.v32.AbstractTimeGeometricPrimitive;
+import net.opengis.gml.v32.TimeInstant;
+import net.opengis.gml.v32.TimePeriod;
+import net.opengis.sensorml.v20.AbstractProcess;
+import net.opengis.swe.v20.*;
+import net.opengis.swe.v20.Vector;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.CompressedClient;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.*; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.api.persistence.DataKey; +import org.sensorhub.api.persistence.IDataFilter; +import org.sensorhub.api.persistence.IDataRecord; +import org.sensorhub.api.persistence.IObsStorage; +import org.sensorhub.api.persistence.IRecordStorageModule; +import org.sensorhub.api.persistence.IStorageModule; +import org.sensorhub.api.persistence.StorageException; +import org.sensorhub.impl.SensorHub; +import org.sensorhub.impl.module.AbstractModule; +import org.sensorhub.impl.module.ModuleRegistry; +import org.sensorhub.impl.persistence.StorageUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.vast.swe.SWEConstants; +import org.vast.swe.SWEHelper; +import org.vast.swe.ScalarIndexer; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.Boolean; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.SocketTimeoutException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyStore; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptySet; + +/** + *
<p>
+ * ES implementation of {@link IObsStorage} for storing observations. + * This class is Thread-safe. + *
</p>
+ * + * @author Mathieu Dhainaut + * @author Nicolas Fortin, UMRAE Ifsttar + * @since 2017 + */ +public class ESBasicStorageImpl extends AbstractModule implements IRecordStorageModule { + private static final int TIME_RANGE_CLUSTER_SCROLL_FETCH_SIZE = 5000; + + protected static final double MAX_TIME_CLUSTER_DELTA = 60.0; + + private static final int DELAY_RETRY_START = 10000; + + // ms .Fetch again record store map if it is done at least this time + private static final int RECORD_STORE_CACHE_LIFETIME = 5000; + + // From ElasticSearch v6, multiple index type is not supported + // v7 index type dropped + protected static final String INDEX_METADATA_TYPE = "osh_metadata"; + + protected static final String STORAGE_ID_FIELD_NAME = "storageID"; + + // The data index, serialization of OpenSensorHub internals metadata + protected static final String METADATA_TYPE_FIELD_NAME = "metadataType"; + + protected static final String DATA_INDEX_FIELD_NAME = "index"; + + protected static final String RS_KEY_SEPARATOR = "##"; + + protected static final String BLOB_FIELD_NAME = "blob"; + + private static final int WAIT_TIME_AFTER_COMMIT = 1000; + + public static final String Z_FIELD = "_height"; + + // Last time the store has been changed, may require waiting if data changed since less than a second + long storeChanged = 0; + + private List addedIndex = new ArrayList<>(); + + private Map recordStoreCache = new HashMap<>(); + + protected static final double[] ALL_TIMES = new double[] {Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY}; + /** + * Class logger + */ + private static final Logger log = LoggerFactory.getLogger(ESBasicStorageImpl.class); + + /** + * The TransportClient connects remotely to an Elasticsearch cluster using the transport module. + * It does not join the cluster, but simply gets one or more initial transport addresses and communicates + * with them in round robin fashion on each action (though most actions will probably be "two hop" operations). + */ + RestHighLevelClient client; + + private BulkProcessor bulkProcessor; + + // Kinds of OpenSensorHub serialized objects + protected static final String METADATA_TYPE_DESCRIPTION = "desc"; + protected static final String METADATA_TYPE_RECORD_STORE = "info"; + + String indexNamePrepend; + String indexNameMetaData; + + BulkListener bulkListener; + + + public ESBasicStorageImpl() { + // default constructor + } + + public ESBasicStorageImpl(RestHighLevelClient client) { + this.client = client; + } + + @Override + public void backup(OutputStream os) throws IOException { + throw new UnsupportedOperationException("Backup"); + } + + public RestHighLevelClient getClient() { + return client; + } + + @Override + public void restore(InputStream is) throws IOException { + throw new UnsupportedOperationException("Restore"); + } + + @Override + public void commit() { + refreshIndex(); + // https://www.elastic.co/guide/en/elasticsearch/guide/current/near-real-time.html + // document changes are not visible to search immediately, but will become visible within 1 second. 
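+        // If the store changed less than WAIT_TIME_AFTER_COMMIT ago, sleep below so the
+        // ~1 s refresh interval has elapsed and readers see the records just committed.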
+ long now = System.currentTimeMillis(); + if(now - storeChanged < WAIT_TIME_AFTER_COMMIT) { + try { + Thread.sleep(WAIT_TIME_AFTER_COMMIT); + } catch (InterruptedException ignored) { + } + } + } + + @Override + public void rollback() { + throw new UnsupportedOperationException("Rollback"); + } + + @Override + public void sync(IStorageModule storage) throws StorageException { + throw new UnsupportedOperationException("Storage Sync"); + } + + @Override + public void init() { + this.indexNamePrepend = (config.indexNamePrepend != null) ? config.indexNamePrepend : ""; + this.indexNameMetaData = (config.indexNameMetaData != null && !config.indexNameMetaData.isEmpty()) ? config.indexNameMetaData : ESBasicStorageConfig.DEFAULT_INDEX_NAME_METADATA; + } + + @Override + public synchronized void start() throws SensorHubException { + log.info("ESBasicStorageImpl:start"); + + // init transport client + HttpHost[] hosts = new HttpHost[config.nodeUrls.size()]; + boolean foundOneHost = false; + while(!foundOneHost) { + int i=0; + for (String nodeUrl : config.nodeUrls) { + long tryStart = System.currentTimeMillis(); + int retry = 0; + while (true) { + try { + URL url = null; + // : + if (nodeUrl.startsWith("http")) { + url = new URL(nodeUrl); + } else { + url = new URL("http://" + nodeUrl); + } + + hosts[i++] = new HttpHost(InetAddress.getByName(url.getHost()), url.getPort(), url.getProtocol()); + foundOneHost = true; + break; + } catch (MalformedURLException | UnknownHostException e) { + retry++; + if (retry < config.maxBulkRetry) { + try { + Thread.sleep(Math.max(1000, config.connectTimeout - (System.currentTimeMillis() - tryStart))); + } catch (InterruptedException ex) { + throw new SensorHubException("Cannot initialize transport address", e); + } + } else { + getLogger().error(String.format("Cannot initialize transport address after %d retries", retry), e); + break; + } + } + } + } + if(!foundOneHost) { + try { + Thread.sleep(DELAY_RETRY_START); + } catch (InterruptedException ex) { + throw new SensorHubException("Cannot initialize transport address", ex); + } + } + } + + RestClientBuilder restClientBuilder = RestClient.builder(hosts); + + // Handle authentication + restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { + if(!config.user.isEmpty()) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(config.user, config.password)); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + if(!config.certificatesPath.isEmpty()) { + + try { + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + Path ksPath = Paths.get(System.getProperty("java.home"), + "lib", "security", "cacerts"); + if(Files.exists(ksPath)) { + keyStore.load(Files.newInputStream(ksPath), + "changeit".toCharArray()); + } + + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + for(String filePath : config.certificatesPath) { + File file = new File(filePath); + if(file.exists()) { + try (InputStream caInput = new BufferedInputStream( + // this files is shipped with the application + new FileInputStream(file))) { + Certificate crt = cf.generateCertificate(caInput); + getLogger().info("Added Cert for " + ((X509Certificate) crt) + .getSubjectDN()); + + keyStore.setCertificateEntry(file.getName(), crt); + } + } else { + getLogger().warn("Could not find certificate " + filePath); + } + } + TrustManagerFactory tmf = TrustManagerFactory + 
.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(keyStore); + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, tmf.getTrustManagers(), null); + httpClientBuilder.setSSLContext(sslContext); + } catch (Exception e) { + getLogger().error(e.getLocalizedMessage(), e); + } + } + return httpClientBuilder; + } + ); + + restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { + @Override + public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { + return requestConfigBuilder.setConnectTimeout(config.connectTimeout).setSocketTimeout(config.socketTimeout); + } + }); + + restClientBuilder.setMaxRetryTimeoutMillis(config.maxRetryTimeout); + + client = new CompressedClient(restClientBuilder); + + bulkListener = new BulkListener(client, config.maxBulkRetry, config.scrollMaxDuration); + + bulkProcessor = BulkProcessor.builder(client instanceof CompressedClient ? ((CompressedClient)client)::bulkCompressedAsync : client::bulkAsync, bulkListener).setBulkActions(config.bulkActions) + .setBulkSize(new ByteSizeValue(config.bulkSize, ByteSizeUnit.MB)) + .setFlushInterval(TimeValue.timeValueSeconds(config.bulkFlushInterval)) + .setConcurrentRequests(config.bulkConcurrentRequests) + .build(); + + // Check if metadata mapping must be defined + GetIndexRequest getIndexRequest = new GetIndexRequest(); + getIndexRequest.indices(indexNameMetaData); + try { + if(!client.indices().exists(getIndexRequest)) { + createMetaMapping(); + } + } catch (IOException ex) { + log.error("Cannot create metadata mapping", ex); + } + + // Retrieve store info + + initStoreInfo(); + } + + void createMetaMappingProperties(XContentBuilder builder) throws IOException { + builder.startObject(STORAGE_ID_FIELD_NAME); + { + builder.field("type", "keyword"); + } + builder.endObject(); + builder.startObject(METADATA_TYPE_FIELD_NAME); + { + builder.field("type", "keyword"); + } + builder.endObject(); + builder.startObject(DATA_INDEX_FIELD_NAME); + { + builder.field("type", "keyword"); + } + builder.endObject(); + builder.startObject(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME); + { + builder.field("type", "date"); + builder.field("format", "epoch_millis"); + } + builder.endObject(); + builder.startObject(BLOB_FIELD_NAME); + { + builder.field("type", "binary"); + } + builder.endObject(); + } + + void createMetaMapping () throws IOException { + // create the index + CreateIndexRequest indexRequest = new CreateIndexRequest(indexNameMetaData); + XContentBuilder builder = XContentFactory.jsonBuilder(); + + builder.startObject(); + { + builder.startObject(INDEX_METADATA_TYPE); + { + builder.startObject("properties"); + { + createMetaMappingProperties(builder); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + + indexRequest.mapping(INDEX_METADATA_TYPE, builder); + + client.indices().create(indexRequest); + + addedIndex.add(indexNameMetaData); + } + + @Override + public synchronized void stop() throws SensorHubException { + log.info("ESBasicStorageImpl:stop"); + if(client != null) { + try { + client.close(); + } catch (IOException ex) { + throw new SensorHubException(ex.getLocalizedMessage(), ex); + } + } + } + + @Override + public AbstractProcess getLatestDataSourceDescription() { + AbstractProcess result = null; + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + BoolQueryBuilder query = QueryBuilders.boolQuery() 
+ .must(new TermQueryBuilder(METADATA_TYPE_FIELD_NAME, METADATA_TYPE_DESCRIPTION)); + if(config.filterByStorageId) { + query.must(QueryBuilders.termQuery(STORAGE_ID_FIELD_NAME, config.id)); + } + searchSourceBuilder.query(query); + searchSourceBuilder.sort(new FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.DESC)); + searchSourceBuilder.size(1); + searchRequest.source(searchSourceBuilder); + + try { + SearchResponse response = client.search(searchRequest); + if(response.getHits().getTotalHits() > 0) { + result = this.getObject(response.getHits().getAt(0).getSourceAsMap().get(BLOB_FIELD_NAME)); + } + } catch (IOException | ElasticsearchStatusException ex) { + log.error("getRecordStores failed", ex); + } + + return result; + } + + @Override + public List getDataSourceDescriptionHistory(double startTime, double endTime) { + List results = new ArrayList<>(); + + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + BoolQueryBuilder query = QueryBuilders.boolQuery() + .must(new TermQueryBuilder(METADATA_TYPE_FIELD_NAME, METADATA_TYPE_DESCRIPTION)) + .must(new RangeQueryBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).from(ESDataStoreTemplate.toEpochMillisecond(startTime)).to(ESDataStoreTemplate.toEpochMillisecond(endTime)).format("epoch_millis")); + + + if(config.filterByStorageId) { + query.must(QueryBuilders.termQuery(STORAGE_ID_FIELD_NAME, config.id)); + } + + searchSourceBuilder.query(query); + searchSourceBuilder.sort(new FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.DESC)); + searchSourceBuilder.size(config.scrollFetchSize); + searchRequest.source(searchSourceBuilder); + searchRequest.scroll(TimeValue.timeValueMillis(config.scrollMaxDuration)); + try { + SearchResponse response = client.search(searchRequest); + do { + String scrollId = response.getScrollId(); + for (SearchHit hit : response.getHits()) { + results.add(this.getObject(hit.getSourceAsMap().get(BLOB_FIELD_NAME))); + } + if (response.getHits().getHits().length > 0) { + SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); + scrollRequest.scroll(TimeValue.timeValueMillis(config.scrollMaxDuration)); + response = client.searchScroll(scrollRequest); + } + } while (response.getHits().getHits().length > 0); + + } catch (IOException | ElasticsearchStatusException ex) { + log.error("getRecordStores failed", ex); + } + + return results; + } + + @Override + public AbstractProcess getDataSourceDescriptionAtTime(double time) { + + + AbstractProcess result = null; + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + BoolQueryBuilder query = QueryBuilders.boolQuery() + .must(new TermQueryBuilder(METADATA_TYPE_FIELD_NAME, METADATA_TYPE_DESCRIPTION)) + .must(new RangeQueryBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).from(0).to(Double.valueOf(time * 1000).longValue())); + + + if(config.filterByStorageId) { + query.must(QueryBuilders.termQuery(STORAGE_ID_FIELD_NAME, config.id)); + } + + searchSourceBuilder.query(query); + searchSourceBuilder.sort(new FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.DESC)); + searchSourceBuilder.size(1); + searchRequest.source(searchSourceBuilder); + + try { + SearchResponse response = client.search(searchRequest); + if(response.getHits().getTotalHits() > 0) { + result = 
this.getObject(response.getHits().getAt(0).getSourceAsMap().get(BLOB_FIELD_NAME)); + } + } catch (IOException | ElasticsearchStatusException ex) { + log.error("getRecordStores failed", ex); + } + + return result; + } + + protected boolean storeDataSourceDescription(AbstractProcess process, double time, boolean update) { + // add new record storage + byte[] bytes = this.getBlob(process); + + try { + XContentBuilder builder = XContentFactory.jsonBuilder(); + // Convert to elastic search epoch millisecond + long epoch = ESDataStoreTemplate.toEpochMillisecond(time); + builder.startObject(); + { + builder.field(STORAGE_ID_FIELD_NAME, config.id); + builder.field(METADATA_TYPE_FIELD_NAME, METADATA_TYPE_DESCRIPTION); + builder.field(DATA_INDEX_FIELD_NAME, ""); + builder.field(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME, epoch); + builder.field(BLOB_FIELD_NAME, bytes); + } + builder.endObject(); + + IndexRequest request = new IndexRequest(indexNameMetaData, INDEX_METADATA_TYPE, config.id + "_" + epoch); + + request.source(builder); + + bulkProcessor.add(request); + + } catch (IOException ex) { + getLogger().error(String.format("storeDataSourceDescription exception %s in elastic search driver", process.getId()), ex); + } + return true; + } + + + protected boolean storeDataSourceDescription(AbstractProcess process, boolean update) { + + boolean ok = false; + + if (process.getNumValidTimes() > 0) { + // we add the description in index for each validity period/instant + for (AbstractTimeGeometricPrimitive validTime : process.getValidTimeList()) { + double time = Double.NaN; + + if (validTime instanceof TimeInstant) + time = ((TimeInstant) validTime).getTimePosition().getDecimalValue(); + else if (validTime instanceof TimePeriod) + time = ((TimePeriod) validTime).getBeginPosition().getDecimalValue(); + + if (!Double.isNaN(time)) + ok = storeDataSourceDescription(process, time, update); + } + } else { + double time = System.currentTimeMillis() / 1000.; + ok = storeDataSourceDescription(process, time, update); + } + + return ok; + } + + @Override + public void storeDataSourceDescription(AbstractProcess process) { + storeDataSourceDescription(process, false); + } + + @Override + public void updateDataSourceDescription(AbstractProcess process) { + storeDataSourceDescription(process, true); + } + + @Override + public void removeDataSourceDescription(double time) { + long epoch = ESDataStoreTemplate.toEpochMillisecond(time); + DeleteRequest deleteRequest = new DeleteRequest(indexNameMetaData, METADATA_TYPE_DESCRIPTION, config.id + "_" + epoch); + bulkProcessor.add(deleteRequest); + + storeChanged = System.currentTimeMillis(); + } + + @Override + public void removeDataSourceDescriptionHistory(double startTime, double endTime) { + try { + // Delete by query, currently not supported by High Level Api + BoolQueryBuilder query = QueryBuilders.boolQuery().must( + QueryBuilders.termQuery(METADATA_TYPE_FIELD_NAME, METADATA_TYPE_DESCRIPTION)) + .must(new RangeQueryBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME) + .from(ESDataStoreTemplate.toEpochMillisecond(startTime)) + .to(ESDataStoreTemplate.toEpochMillisecond(endTime)).format("epoch_millis")); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + XContentBuilder builder = XContentFactory.jsonBuilder(bos); + builder.startObject(); + builder.rawField("query", new ByteArrayInputStream(query.toString().getBytes(StandardCharsets.UTF_8)), XContentType.JSON); + builder.endObject(); + builder.flush(); + String json = bos.toString("UTF-8"); + HttpEntity entity = 
new NStringEntity(json, ContentType.APPLICATION_JSON); + client.getLowLevelClient().performRequest("POST" + , encodeEndPoint(indexNameMetaData, "_delete_by_query") + , Collections.EMPTY_MAP, entity); + + storeChanged = System.currentTimeMillis(); + + } catch (IOException ex) { + log.error("Failed to removeRecords", ex); + } + } + + void initStoreInfo() { + + Map result = new HashMap<>(); + + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + BoolQueryBuilder query = QueryBuilders.boolQuery() + .must(new TermQueryBuilder(METADATA_TYPE_FIELD_NAME, METADATA_TYPE_RECORD_STORE)); + + + if(config.filterByStorageId) { + query.must(QueryBuilders.termQuery(STORAGE_ID_FIELD_NAME, config.id)); + } + + searchSourceBuilder.query(query); + // Default to 10 results + searchSourceBuilder.size(config.scrollFetchSize); + searchRequest.source(searchSourceBuilder); + searchRequest.scroll(TimeValue.timeValueMillis(config.scrollMaxDuration)); + try { + SearchResponse response = client.search(searchRequest); + do { + String scrollId = response.getScrollId(); + for (SearchHit hit : response.getHits()) { + Map dataMap = hit.getSourceAsMap(); + EsRecordStoreInfo rsInfo = getObject(dataMap.get(BLOB_FIELD_NAME)); // DataStreamInfo + result.put(rsInfo.getName(), rsInfo); + } + if (response.getHits().getHits().length > 0) { + SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); + scrollRequest.scroll(TimeValue.timeValueMillis(config.scrollMaxDuration)); + response = client.searchScroll(scrollRequest); + } + } while (response.getHits().getHits().length > 0); + + } catch (IOException | ElasticsearchStatusException ex) { + log.error("getRecordStores failed", ex); + } + + recordStoreCache = result; + } + + @Override + public Map getRecordStores() { + return recordStoreCache; + } + + /** + * Index in Elastic Search are restricted + * @param indexName Index name to remove undesired chars + * @return valid index for es + */ + String fixIndexName(String indexName) { + for(Character chr : Strings.INVALID_FILENAME_CHARS) { + indexName = indexName.replace(chr.toString(), ""); + } + indexName = indexName.replace("#", ""); + while(indexName.startsWith("_") || indexName.startsWith("-") || indexName.startsWith("+")) { + indexName = indexName.substring(1, indexName.length()); + } + return indexName.toLowerCase(Locale.ROOT); + } + + @Override + public void addRecordStore(String name, DataComponent recordStructure, DataEncoding recommendedEncoding) { + EsRecordStoreInfo rsInfo = new EsRecordStoreInfo(name,fixIndexName(indexNamePrepend + recordStructure.getName()), + recordStructure, recommendedEncoding); + + recordStoreCache.put(rsInfo.name, rsInfo); + + // add new record storage + byte[] bytes = this.getBlob(rsInfo); + + try { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + { + // Convert to elastic search epoch millisecond + builder.field(STORAGE_ID_FIELD_NAME, config.id); + builder.field(METADATA_TYPE_FIELD_NAME, METADATA_TYPE_RECORD_STORE); + builder.field(DATA_INDEX_FIELD_NAME, rsInfo.getIndexName()); // store recordType + builder.field(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME, System.currentTimeMillis()); + builder.field(BLOB_FIELD_NAME, bytes); + } + builder.endObject(); + IndexRequest request = new IndexRequest(indexNameMetaData, INDEX_METADATA_TYPE); + + request.source(builder); + + client.index(request); + + // Check if metadata mapping must be defined + GetIndexRequest 
getIndexRequest = new GetIndexRequest(); + getIndexRequest.indices(rsInfo.indexName); + try { + if(!client.indices().exists(getIndexRequest)) { + createDataMapping(rsInfo); + } + } catch (IOException ex) { + getLogger().error("Cannot create metadata mapping", ex); + } + + storeChanged = System.currentTimeMillis(); + + refreshIndex(); + + + } catch (IOException ex) { + getLogger().error(String.format("addRecordStore exception %s:%s in elastic search driver",name, recordStructure.getName()), ex); + } + } + + @Override + public int getNumRecords(String recordType) { + commit(); + + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(recordType); + if(info != null) { + SearchRequest searchRequest = new SearchRequest(info.indexName); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0); + searchRequest.source(searchSourceBuilder); + if(config.filterByStorageId) { + searchSourceBuilder.query(new BoolQueryBuilder().must(QueryBuilders.termQuery(STORAGE_ID_FIELD_NAME, config.id))); + } + + try { + SearchResponse response = client.search(searchRequest); + try { + return Math.toIntExact(response.getHits().getTotalHits()); + } catch (ArithmeticException ex) { + getLogger().error("Too many records"); + return Integer.MAX_VALUE; + } + } catch (IOException | ElasticsearchStatusException ex) { + log.error("getRecordStores failed", ex); + } + } + return 0; + } + + @Override + public synchronized double[] getRecordsTimeRange(String recordType) { + double[] result = new double[2]; + + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(recordType); + if(info != null) { + SearchRequest searchRequest = new SearchRequest(info.indexName); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.matchAllQuery()); + searchSourceBuilder.sort(new FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.ASC)); + searchSourceBuilder.size(1); + searchRequest.source(searchSourceBuilder); + + try { + + // build request to get the least recent record + SearchResponse response = client.search(searchRequest); + + if (response.getHits().getTotalHits() > 0) { + result[0] = ESDataStoreTemplate.fromEpochMillisecond((Number) response.getHits().getAt(0).getSourceAsMap().get( ESDataStoreTemplate.TIMESTAMP_FIELD_NAME)); + } + + // build request to get the most recent record + searchRequest = new SearchRequest(info.indexName); + searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.matchAllQuery()); + searchSourceBuilder.sort(new FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.DESC)); + searchSourceBuilder.size(1); + searchRequest.source(searchSourceBuilder); + + response = client.search(searchRequest); + + if (response.getHits().getTotalHits() > 0) { + result[1] = ESDataStoreTemplate.fromEpochMillisecond((Number) response.getHits().getAt(0).getSourceAsMap().get( ESDataStoreTemplate.TIMESTAMP_FIELD_NAME)); + } + } catch (IOException ex) { + log.error("getRecordsTimeRange failed", ex); + } + } + return result; + } + + @Override + public int[] getEstimatedRecordCounts(String recordType, double[] timeStamps) { + return StorageUtils.computeDefaultRecordCounts(this, recordType, timeStamps); + } + + void dataSimpleComponent(SimpleComponent dataComponent, Map data, int i, DataBlock dataBlock) { + switch (dataComponent.getDataType()) { + case FLOAT: + dataBlock.setFloatValue(i, 
((Number)data.get(dataComponent.getName())).floatValue()); + break; + case DOUBLE: + dataBlock.setDoubleValue(i, ((Number)data.get(dataComponent.getName())).doubleValue()); + break; + case SHORT: + case USHORT: + case UINT: + case INT: + dataBlock.setIntValue(i, ((Number)data.get(dataComponent.getName())).intValue()); + break; + case ASCII_STRING: + case UTF_STRING: + dataBlock.setStringValue(i, (String)data.get(dataComponent.getName())); + break; + case BOOLEAN: + dataBlock.setBooleanValue(i, (Boolean) data.get(dataComponent.getName())); + break; + case ULONG: + case LONG: + dataBlock.setLongValue(i,((Number)data.get(dataComponent.getName())).longValue()); + break; + case UBYTE: + case BYTE: + dataBlock.setByteValue(i, ((Number)data.get(dataComponent.getName())).byteValue()); + break; + default: + getLogger().error("Unsupported type " + ((SimpleComponent) dataComponent).getDataType()); + } + } + + void dataBlockFromES(DataComponent component, Map data, DataBlock dataBlock, AtomicInteger fieldIndex) { + if(component instanceof SimpleComponent) { + dataSimpleComponent((SimpleComponent) component, data, fieldIndex.getAndIncrement(), dataBlock); + } else if(component instanceof Vector) { + // Extract coordinate component + Object values = data.get(component.getName()); + if(values instanceof List) { + dataBlock.setDoubleValue(fieldIndex.getAndIncrement(), ((Number) ((List) values).get(0)).doubleValue()); + dataBlock.setDoubleValue(fieldIndex.getAndIncrement(), ((Number) ((List) values).get(1)).doubleValue()); + } else if (values instanceof Map) { + dataBlock.setDoubleValue(fieldIndex.getAndIncrement(), ((Number) ((Map) values).get("lat")).doubleValue()); + dataBlock.setDoubleValue(fieldIndex.getAndIncrement(), ((Number) ((Map) values).get("lon")).doubleValue()); + } + // Retrieve Z value + dataBlock.setDoubleValue(fieldIndex.getAndIncrement(), + ((Number)data.get(component.getName() + Z_FIELD)).doubleValue()); + + } else if(component instanceof DataRecord){ + final int arraySize = component.getComponentCount(); + for (int i = 0; i < arraySize; i++) { + DataComponent subComponent = component.getComponent(i); + dataBlockFromES(subComponent, data, dataBlock, fieldIndex); + } + } else if(component instanceof DataArray) { + final int compSize = component.getComponentCount(); + List dataList = (List) data.get(component.getName()); + if(((DataArray) component).getElementType() instanceof ScalarComponent) { + // Simple array + ScalarComponent scalarComponent = (ScalarComponent)((DataArray) component).getElementType(); + switch (scalarComponent.getDataType()) { + case FLOAT: + case DOUBLE: + for(int ind = 0; ind < compSize; ind++) { + Object value = dataList.get(ind); + if(value instanceof Number) { + dataBlock.setDoubleValue(fieldIndex.getAndIncrement(), ((Number) value).doubleValue()); + } else { + dataBlock.setDoubleValue(fieldIndex.getAndIncrement(), Double.NaN); + } + } + break; + case SHORT: + case USHORT: + case UINT: + case INT: + for(int ind = 0; ind < compSize; ind++) { + dataBlock.setIntValue(fieldIndex.getAndIncrement(), ((Number)dataList.get(ind)).intValue()); + } + break; + case ASCII_STRING: + case UTF_STRING: + for(int ind = 0; ind < compSize; ind++) { + dataBlock.setStringValue(fieldIndex.getAndIncrement(), (String)dataList.get(ind)); + } + break; + case BOOLEAN: + for(int ind = 0; ind < compSize; ind++) { + dataBlock.setBooleanValue(fieldIndex.getAndIncrement(), (boolean)dataList.get(ind)); + } + break; + case ULONG: + case LONG: + for(int ind = 0; ind < compSize; ind++) { + 
dataBlock.setLongValue(fieldIndex.getAndIncrement(), ((Number)dataList.get(ind)).longValue()); + } + break; + case UBYTE: + case BYTE: + for(int ind = 0; ind < compSize; ind++) { + dataBlock.setByteValue(fieldIndex.getAndIncrement(), (Byte)dataList.get(ind)); + } + break; + default: + getLogger().error("Unsupported type " + scalarComponent.getDataType().name()); + } + } else { + // Complex nested value + for (int i = 0; i < compSize; i++) { + DataComponent subComponent = component.getComponent(i); + dataBlockFromES(subComponent, (Map) dataList.get(i), dataBlock, fieldIndex); + } + } + } + } + + DataBlock dataBlockFromES(DataComponent component, Map data) { + AtomicInteger fieldIndex = new AtomicInteger(0); + DataBlock dataBlock = component.createDataBlock(); + dataBlockFromES(component, data, dataBlock, fieldIndex); + return dataBlock; + } + + @Override + public DataBlock getDataBlock(DataKey key) { + DataBlock result = null; + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(key.recordType); + if(info != null) { + // build the key as recordTYpe_timestamp_producerID + String esKey = getRsKey(key); + + // build the request + GetRequest getRequest = new GetRequest(info.indexName, info.name, esKey); + try { + // build and execute the response + GetResponse response = client.get(getRequest); + + // deserialize the blob field from the response if any + if (response.isExists()) { + result = dataBlockFromES(info.recordDescription, response.getSourceAsMap()); + } + } catch (IOException ex) { + log.error(ex.getLocalizedMessage(), ex); + } + } + return result; + } + + @Override + public Iterator getDataBlockIterator(IDataFilter filter) { + + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(filter.getRecordType()); + if (info != null) { + SearchRequest searchRequest = new SearchRequest(info.indexName); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryByFilter(filter)); + searchSourceBuilder.size(config.scrollFetchSize); + searchSourceBuilder.sort(new FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.ASC)); + searchRequest.source(searchSourceBuilder); + searchRequest.scroll(TimeValue.timeValueMillis(config.scrollMaxDuration)); + + final Iterator searchHitsIterator = new ESIterator(client, searchRequest, TimeValue.timeValueMillis(config.scrollMaxDuration)); //max of scrollFetchSize hits will be returned for each scroll + + // build a DataBlock iterator based on the searchHits iterator + + return new Iterator() { + + @Override + public boolean hasNext() { + return searchHitsIterator.hasNext(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public DataBlock next() { + SearchHit nextSearchHit = searchHitsIterator.next(); + + return dataBlockFromES(info.recordDescription, nextSearchHit.getSourceAsMap()); + } + }; + } else { + return Collections.emptyIterator(); + } + } + + Iterator recordIteratorFromESQueryFilter(IDataFilter filter, BoolQueryBuilder esFilter) { + + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(filter.getRecordType()); + if (info != null) { + SearchRequest searchRequest = new SearchRequest(info.indexName); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(esFilter); + searchSourceBuilder.size(config.scrollFetchSize); + searchSourceBuilder.sort(new 
FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.ASC)); + searchRequest.source(searchSourceBuilder); + searchRequest.scroll(TimeValue.timeValueMillis(config.scrollMaxDuration)); + + final Iterator searchHitsIterator = new ESIterator(client, searchRequest, TimeValue.timeValueMillis(config.scrollMaxDuration)); //max of scrollFetchSize hits will be returned for each scroll + + // build a IDataRecord iterator based on the searchHits iterator + + return new Iterator() { + + @Override + public boolean hasNext() { + return searchHitsIterator.hasNext(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public IDataRecord next() { + SearchHit nextSearchHit = searchHitsIterator.next(); + + Map queryResult = nextSearchHit.getSourceAsMap(); + // build key + final DataKey key = getDataKey(nextSearchHit.getId(), queryResult); + + final DataBlock datablock = dataBlockFromES(info.recordDescription, queryResult); + + return new IDataRecord() { + + @Override + public DataKey getKey() { + return key; + } + + @Override + public DataBlock getData() { + return datablock; + } + + }; + } + }; + } else { + return Collections.emptyIterator(); + } + + } + + @Override + public Iterator getRecordIterator(IDataFilter filter) { + return recordIteratorFromESQueryFilter(filter, queryByFilter(filter)); + } + + @Override + public int getNumMatchingRecords(IDataFilter filter, long maxCount) { + int result = 0; + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(filter.getRecordType()); + if (info != null) { + SearchRequest searchRequest = new SearchRequest(info.indexName); + searchRequest.source(new SearchSourceBuilder().size(0) + .query(queryByFilter(filter))); + try { + SearchResponse response = client.search(searchRequest); + try { + return Math.toIntExact(Math.min(response.getHits().getTotalHits(), maxCount)); + } catch (ArithmeticException ex) { + getLogger().error("Too many records"); + return Integer.MAX_VALUE; + } + } catch (IOException | ElasticsearchStatusException ex) { + log.error("getRecordStores failed", ex); + } + } + return result; + } + + + + private void parseDataMapping(XContentBuilder builder, DataComponent dataComponent, DataComponent timeFieldToIgnore) throws IOException { + if (dataComponent instanceof SimpleComponent && !dataComponent.equals(timeFieldToIgnore)) { + switch (((SimpleComponent) dataComponent).getDataType()) { + case FLOAT: + builder.startObject(dataComponent.getName()); + // TODO When Quantity will contains a precision information + // builder.field("type", "scaled_float"); + // builder.field("index", false); + // builder.field("scaling_factor", 100); + builder.field("type", "float"); + builder.endObject(); + break; + case DOUBLE: + builder.startObject(dataComponent.getName()); + builder.field("type", "double"); + builder.endObject(); + break; + case SHORT: + builder.startObject(dataComponent.getName()); + builder.field("type", "short"); + builder.endObject(); + break; + case USHORT: + case UINT: + case INT: + builder.startObject(dataComponent.getName()); + builder.field("type", "integer"); + builder.endObject(); + break; + case ASCII_STRING: + builder.startObject(dataComponent.getName()); + builder.field("type", "keyword"); + builder.endObject(); + break; + case UTF_STRING: + builder.startObject(dataComponent.getName()); + builder.field("type", "text"); + builder.endObject(); + break; + case BOOLEAN: + builder.startObject(dataComponent.getName()); + 
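`getNumMatchingRecords` above never fetches documents: it runs the same bool query with `size(0)` and reads only the total hit count, capping the result at `maxCount`. A minimal standalone sketch of that pattern, assuming a local node and a hypothetical `my_records` index (the client setup is illustrative, not part of the module):

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class CountSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical node address and index name, for illustration only
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            SearchRequest request = new SearchRequest("my_records");
            // size(0) skips hit fetching entirely; only metadata such as the
            // total hit count comes back, which keeps the count query cheap
            request.source(new SearchSourceBuilder()
                    .size(0)
                    .query(QueryBuilders.matchAllQuery()));
            SearchResponse response = client.search(request);
            System.out.println("matching records: " + response.getHits().getTotalHits());
        }
    }
}
```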
builder.field("type", "boolean"); + builder.endObject(); + break; + case ULONG: + case LONG: + builder.startObject(dataComponent.getName()); + builder.field("type", "long"); + builder.endObject(); + break; + case UBYTE: + case BYTE: + builder.startObject(dataComponent.getName()); + builder.field("type", "byte"); + builder.endObject(); + break; + default: + getLogger().error("Unsupported type " + ((SimpleComponent) dataComponent).getDataType()); + } + } else if(dataComponent instanceof DataRecord) { + for(int i = 0; i < dataComponent.getComponentCount(); i++) { + DataComponent component = dataComponent.getComponent(i); + parseDataMapping(builder, component, timeFieldToIgnore); + } + } else if(dataComponent instanceof DataArray){ + if(((DataArray) dataComponent).getElementType() instanceof SimpleComponent) { + parseDataMapping(builder, ((DataArray) dataComponent).getElementType(), timeFieldToIgnore); + } else { + builder.startObject(dataComponent.getName()); + { + builder.field("type", "nested"); + builder.field("dynamic", false); + builder.startObject("properties"); + { + parseDataMapping(builder, ((DataArray) dataComponent).getElementType(), timeFieldToIgnore); + } + builder.endObject(); + } + builder.endObject(); + } + } else if(dataComponent instanceof Vector) { + // Point type + builder.startObject(dataComponent.getName()); + { + builder.field("type", "geo_point"); + } + builder.endObject(); + builder.startObject(dataComponent.getName()+Z_FIELD); + { + builder.field("type", "double"); + } + builder.endObject(); + } + } + + + /** + * Override this method to add special fields in es data mapping + * @param builder + * @throws IOException + */ + void createDataMappingFields(XContentBuilder builder) throws IOException { + builder.startObject(ESDataStoreTemplate.PRODUCER_ID_FIELD_NAME); + { + builder.field("type", "keyword"); + } + builder.endObject(); + builder.startObject(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME); + { + // Issue with date https://discuss.elastic.co/t/weird-issue-with-date-sort/137646 + builder.field("type", "date"); + builder.field("format", "epoch_millis"); + } + builder.endObject(); + builder.startObject(STORAGE_ID_FIELD_NAME); + { + builder.field("type", "keyword"); + } + builder.endObject(); + } + + DataComponent findTimeComponent(DataComponent parent) { + ScalarComponent timeStamp = (ScalarComponent)SWEHelper.findComponentByDefinition(parent, SWEConstants.DEF_SAMPLING_TIME); + if (timeStamp == null) + timeStamp = (ScalarComponent)SWEHelper.findComponentByDefinition(parent, SWEConstants.DEF_PHENOMENON_TIME); + if (timeStamp == null) + return null; + return timeStamp; + } + + /** + * @param rsInfo record store metadata + * @throws IOException + */ + void createDataMapping(EsRecordStoreInfo rsInfo) throws IOException { + + // create the index + CreateIndexRequest indexRequest = new CreateIndexRequest(rsInfo.indexName); + XContentBuilder builder = XContentFactory.jsonBuilder(); + + builder.startObject(); + { + builder.startObject(rsInfo.name); + { + builder.field("dynamic", false); + builder.startObject("properties"); + { + createDataMappingFields(builder); + DataComponent dataComponent = rsInfo.getRecordDescription(); + DataComponent timeComponent = findTimeComponent(dataComponent); + parseDataMapping(builder, dataComponent, timeComponent); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + + indexRequest.mapping(rsInfo.name, builder); + + client.indices().create(indexRequest); + + addedIndex.add(rsInfo.indexName); + } + + public List 
getAddedIndex() { + return Collections.unmodifiableList(addedIndex); + } + + void dataComponentSimpleToJson(SimpleComponent component, DataBlock data, int i, XContentBuilder builder) throws IOException { + switch (data.getDataType(i)) { + case FLOAT: + builder.field(component.getName(), data.getFloatValue(i)); + break; + case DOUBLE: + builder.field(component.getName(), data.getDoubleValue(i)); + break; + case SHORT: + case USHORT: + case UINT: + case INT: + builder.field(component.getName(), data.getIntValue(i)); + break; + case ASCII_STRING: + case UTF_STRING: + builder.field(component.getName(), data.getStringValue(i)); + break; + case BOOLEAN: + builder.field(component.getName(), data.getBooleanValue(i)); + break; + case ULONG: + case LONG: + builder.field(component.getName(), data.getLongValue(i)); + break; + case UBYTE: + case BYTE: + builder.field(component.getName(), data.getByteValue(i)); + break; + default: + getLogger().error("Unsupported type " + data.getDataType(i).name()); + } + } + + private static void setXContentValue(XContentBuilder builder, double value) throws IOException { + if(Double.isNaN(value) || !Double.isFinite(value)) { + builder.nullValue(); + } else { + builder.value(value); + } + } + + private static void setXContentValue(XContentBuilder builder, float value) throws IOException { + if(Float.isNaN(value) || !Float.isFinite(value)) { + builder.nullValue(); + } else { + builder.value(value); + } + } + + void dataComponentToJson(DataComponent dataComponent, DataBlock data, XContentBuilder builder, AtomicInteger fieldCounter) throws IOException { + if(dataComponent instanceof SimpleComponent) { + dataComponentSimpleToJson((SimpleComponent) dataComponent, data, fieldCounter.getAndIncrement(), builder); + } else if(dataComponent instanceof Vector) { + builder.startObject(dataComponent.getName()); + { + builder.field("lat"); + setXContentValue(builder, data.getDoubleValue(fieldCounter.getAndIncrement())); + builder.field("lon"); + setXContentValue(builder, data.getDoubleValue(fieldCounter.getAndIncrement())); + } + builder.endObject(); + builder.field(dataComponent.getName()+Z_FIELD, data.getDoubleValue(fieldCounter.getAndIncrement())); + } else if(dataComponent instanceof DataArray) { + // DataArray of scalar values, it is Array of ElasticSearch + if(((DataArray) dataComponent).getElementType() instanceof ScalarComponent) { + final int compSize = dataComponent.getComponentCount(); + ScalarComponent scalarComponent = (ScalarComponent) ((DataArray) dataComponent).getElementType(); + builder.startArray(dataComponent.getName()); + switch (scalarComponent.getDataType()) { + case FLOAT: + for(int ind = 0; ind < compSize; ind++) { + setXContentValue(builder, data.getFloatValue(fieldCounter.getAndIncrement())); + } + break; + case DOUBLE: + for(int ind = 0; ind < compSize; ind++) { + setXContentValue(builder, data.getDoubleValue(fieldCounter.getAndIncrement())); + } + break; + case SHORT: + case USHORT: + case UINT: + case INT: + for(int ind = 0; ind < compSize; ind++) { + builder.value(data.getIntValue(fieldCounter.getAndIncrement())); + } + break; + case ASCII_STRING: + case UTF_STRING: + for(int ind = 0; ind < compSize; ind++) { + builder.value(data.getStringValue(fieldCounter.getAndIncrement())); + } + break; + case BOOLEAN: + for(int ind = 0; ind < compSize; ind++) { + builder.value(data.getBooleanValue(fieldCounter.getAndIncrement())); + } + break; + case ULONG: + case LONG: + for(int ind = 0; ind < compSize; ind++) { + 
builder.value(data.getLongValue(fieldCounter.getAndIncrement())); + } + break; + case UBYTE: + case BYTE: + for(int ind = 0; ind < compSize; ind++) { + builder.value(data.getByteValue(fieldCounter.getAndIncrement())); + } + break; + default: + getLogger().error("Unsupported type " + scalarComponent.getDataType().name()); + } + builder.endArray(); + } else { + // Array of complex type, this is nested document of ElasticSearch + int compSize = dataComponent.getComponentCount(); + builder.startArray(dataComponent.getName()); + for (int i = 0; i < compSize; i++) { + builder.startObject(); + dataComponentToJson(dataComponent.getComponent(i), data, builder, fieldCounter); + builder.endObject(); + } + builder.endArray(); + } + } else if(dataComponent instanceof DataRecord) { + int compSize = dataComponent.getComponentCount(); + for (int i = 0; i < compSize; i++) { + dataComponentToJson(dataComponent.getComponent(i), data, builder, fieldCounter); + } + } + } + + void dataComponentToJson(DataComponent dataComponent, DataBlock data, XContentBuilder builder) throws IOException + { + AtomicInteger fieldCounter = new AtomicInteger(0); + dataComponentToJson(dataComponent, data, builder, fieldCounter); + } + + void storeRecordIndexRequestFields(XContentBuilder builder, DataKey key) throws IOException { + builder.field(ESDataStoreTemplate.PRODUCER_ID_FIELD_NAME, key.producerID); + builder.field(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME, ESDataStoreTemplate.toEpochMillisecond(key.timeStamp)); + builder.field(STORAGE_ID_FIELD_NAME, config.id); + } + + public IndexRequest storeRecordIndexRequest(DataKey key, DataBlock data) throws IOException { + IndexRequest request = null; + + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(key.recordType); + if(info != null) { + //ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + //XContentBuilder builder = XContentFactory.jsonBuilder(byteArrayOutputStream); + XContentBuilder builder = XContentFactory.jsonBuilder(); + + builder.startObject(); + { + storeRecordIndexRequestFields(builder, key); + DataComponent dataComponent = info.getRecordDescription(); + dataComponentToJson(dataComponent, data, builder); + } + builder.endObject(); + + request = new IndexRequest(info.getIndexName(), info.name, getRsKey(key)); + + //builder.flush(); + + request.source(builder); + } + return request; + } + + @Override + public void storeRecord(DataKey key, DataBlock data) { + if(key.producerID == null || key.producerID.isEmpty()) { + getLogger().info("Missing producerID data " + key.recordType); + return; + } + try { + IndexRequest request = storeRecordIndexRequest(key, data); + if(request != null) { + bulkProcessor.add(request); + } else { + log.error("Missing record store " + key.recordType); + } + } catch (IOException ex) { + log.error("Cannot create json data storeRecord", ex); + } + + storeChanged = System.currentTimeMillis(); + } + + @Override + public void updateRecord(DataKey key, DataBlock data) { + // Key handle duplicates + storeRecord(key, data); + } + + @Override + public void removeRecord(DataKey key) { + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(key.recordType); + if(info != null) { + // build the key as recordTYpe_timestamp_producerID + String esKey = getRsKey(key); + + // prepare delete request + DeleteRequest deleteRequest = new DeleteRequest(info.indexName, info.name, esKey); + bulkProcessor.add(deleteRequest); + } + } + + private static String 
encodeEndPoint(String... params) throws IOException { + StringBuilder s = new StringBuilder(); + for(String param : params) { + if(s.length() > 0) { + s.append("/"); + } + s.append(URLEncoder.encode(param, "UTF-8")); + } + return s.toString(); + } + + /** + * Convert OSH filter to ElasticSearch filter + * @param filter OSH filter + * @return ElasticSearch query object + */ + BoolQueryBuilder queryByFilter(IDataFilter filter) { + double[] timeRange = getTimeRange(filter); + + BoolQueryBuilder query = QueryBuilders.boolQuery(); + + if(config.filterByStorageId) { + query.must(QueryBuilders.termQuery(STORAGE_ID_FIELD_NAME, config.id)); + } + + query.must(new RangeQueryBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME) + .from(ESDataStoreTemplate.toEpochMillisecond(timeRange[0])) + .to(ESDataStoreTemplate.toEpochMillisecond(timeRange[1])).format("epoch_millis")); + + // check if any producerIDs + if(filter.getProducerIDs() != null && !filter.getProducerIDs().isEmpty()) { + query.must(QueryBuilders.termsQuery(ESDataStoreTemplate.PRODUCER_ID_FIELD_NAME, filter.getProducerIDs())); + } + + return query; + } + + @Override + public int removeRecords(IDataFilter filter) { + try { + Map recordStoreInfoMap = getRecordStores(); + EsRecordStoreInfo info = recordStoreInfoMap.get(filter.getRecordType()); + if(info != null) { + // Delete by query, currently not supported by High Level Api + + BoolQueryBuilder query = queryByFilter(filter); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + XContentBuilder builder = XContentFactory.jsonBuilder(bos); + builder.startObject(); + builder.rawField("query", new ByteArrayInputStream(query.toString().getBytes(StandardCharsets.UTF_8)), XContentType.JSON); + builder.endObject(); + builder.flush(); + String json = bos.toString("UTF-8"); + HttpEntity entity = new NStringEntity(json, ContentType.APPLICATION_JSON); + Response response = client.getLowLevelClient().performRequest("POST", encodeEndPoint(info.indexName, "_delete_by_query"),Collections.EMPTY_MAP, entity); + String source = EntityUtils.toString(response.getEntity()); + Map content = XContentHelper.convertToMap(XContentFactory.xContent(XContentType.JSON), source, true); + + storeChanged = System.currentTimeMillis(); + + return ((Number)content.get("total")).intValue(); + } + } catch (IOException ex) { + log.error("Failed to removeRecords", ex); + } + return 0; + } + + /** + * Get a serialized object from an object. + * The object is serialized using Kryo. + * @param object The raw object + * @return the serialized object + */ + protected static byte[] getBlob(T object){ + return KryoSerializer.serialize(object); + } + + /** + * Get an object from a base64 encoding String. + * The object is deserialized using Kryo. + * @param blob The base64 encoding String + * @return The deserialized object + */ + protected static T getObject(Object blob) { + // Base 64 decoding + byte [] base64decodedData = Base64.decodeBase64(blob.toString().getBytes()); + // Kryo deserialize + return KryoSerializer.deserialize(base64decodedData); + } + + + /** + * Transform a DataKey into an ES key as: . + * @param key the ES key. + * @return the ES key. + */ + protected String getRsKey(DataKey key) { + return key.recordType+RS_KEY_SEPARATOR+Double.doubleToLongBits(key.timeStamp)+RS_KEY_SEPARATOR+key.producerID; + } + + /** + * Transform the recordStorage data key into a DataKey by splitting . + * @param rsKey the corresponding dataKey + * @return the dataKey. 
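`getRsKey` above builds the ES document id from the record type, timestamp, and producer id joined by a separator, encoding the timestamp through `Double.doubleToLongBits` so no precision is lost in the string round trip; `getDataKey` reverses it. A self-contained sketch of the codec (the actual separator constant is defined elsewhere in the class, so the underscore here is an assumption):

```java
public class RsKeyCodecSketch {
    // Assumed separator for illustration; the module defines RS_KEY_SEPARATOR itself
    private static final String RS_KEY_SEPARATOR = "_";

    static String encode(String recordType, double timeStamp, String producerID) {
        // doubleToLongBits keeps every bit of the double, unlike printing the decimal form
        return recordType + RS_KEY_SEPARATOR
                + Double.doubleToLongBits(timeStamp) + RS_KEY_SEPARATOR + producerID;
    }

    static double decodeTimestamp(String esKey) {
        String[] parts = esKey.split(RS_KEY_SEPARATOR);
        return Double.longBitsToDouble(Long.parseLong(parts[1]));
    }

    public static void main(String[] args) {
        String key = encode("weather", 1.2345678901234567E9, "sensor-01");
        System.out.println(key);
        System.out.println(decodeTimestamp(key)); // 1.2345678901234567E9, bit-exact
    }
}
```

Note that the split-based decoder assumes neither the record type nor the producer id contains the separator; returning null when the split length differs from 3 guards against that case.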
NULL if the length != 3 after splitting + */ + protected DataKey getDataKey(String rsKey, Map content) { + DataKey dataKey = null; + + // split the rsKey using separator + String [] split = rsKey.split(RS_KEY_SEPARATOR); + + // must find + if(split.length == 3) { + dataKey = new DataKey(split[0], split[2], Double.longBitsToDouble(Long.parseLong(split[1]))); + } + return dataKey; + } + + protected double[] getTimeRange(IDataFilter filter) { + double[] timeRange = filter.getTimeStampRange(); + if (timeRange != null) + return timeRange; + else + return ALL_TIMES; + } + + /** + * Refreshes the index. + */ + protected void refreshIndex() { + bulkProcessor.flush(); + + if(config.autoRefresh) { + RefreshRequest refreshRequest = new RefreshRequest(); + try { + client.indices().refresh(refreshRequest); + } catch (IOException ex) { + getLogger().error("Error while refreshIndex", ex); + } + } + } + + + @Override + public boolean isReadSupported() { + return true; + } + + @Override + public boolean isWriteSupported() { + return true; + } + + private static final class BulkListener implements BulkProcessor.Listener { + Logger logger = LoggerFactory.getLogger(BulkListener.class); + RestHighLevelClient client; + private int maxRetry; + private int retryDelay; + + public BulkListener(RestHighLevelClient client, int max_retry, int retryDelay) { + this.client = client; + this.maxRetry = max_retry; + this.retryDelay = retryDelay; + } + + private Map bulkRetries = new HashMap<>(); + + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + + } + + + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + if((failure instanceof IOException || failure instanceof ElasticsearchStatusException) && request != null) { + // Retry to send the bulk later + int retries = bulkRetries.getOrDefault(executionId, 0); + if(retries < maxRetry) { + log.info(String.format("Retry send bulk request id:%d",executionId)); + bulkRetries.put(executionId, retries + 1); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + client.bulkAsync(request, new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + log.info(String.format("Successfully sent bulk request id:%d after a failure",executionId)); + } + + @Override + public void onFailure(Exception e) { + afterBulk(executionId, request, e); + } + }); + + } + }, retryDelay); + return; + } else { + logger.error(String.format("Exception while pushing data id:%d to ElasticSearch after %d retries, data lost",executionId, retries), failure); + } + } + if(request != null) { + logger.error(String.format("Unprocessed exception while pushing data id:%d to ElasticSearch, data lost",executionId), failure); + } + } + } +} \ No newline at end of file diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESDataStoreTemplate.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESDataStoreTemplate.java new file mode 100644 index 000000000..f00e3f7d5 --- /dev/null +++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESDataStoreTemplate.java @@ -0,0 +1,68 @@ +package org.sensorhub.impl.persistence.es; + +import net.opengis.swe.v20.DataBlock; +import net.opengis.swe.v20.DataComponent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import 
org.elasticsearch.common.xcontent.XContentFactory; +import org.sensorhub.api.persistence.DataKey; + +import java.awt.*; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Methods to build Json elements from OSH components + * @author Nicolas Fortin, UMRAE Ifsttar + */ +public class ESDataStoreTemplate { + + private static final long MIN_DATE_EPOCH = -62138538000000L; + private static final long MAX_DATE_EPOCH = 253336460400000L; + + public static final String PRODUCER_ID_FIELD_NAME = "producerID"; + public static final String TIMESTAMP_FIELD_NAME = "timestamp"; + + String name; + List jsonParts = new ArrayList<>(); + final int recordStructureStart; + final int capacityHint; + + + public ESDataStoreTemplate(DataComponent recordStructure) { + this.name = recordStructure.getName(); + jsonParts.add("{\""+PRODUCER_ID_FIELD_NAME+"\":\""); + jsonParts.add("\"" + TIMESTAMP_FIELD_NAME + "\":"); + recordStructureStart = jsonParts.size(); + jsonParts.add("\""); + for(int i = 0; i < recordStructure.getComponentCount(); i++) { + DataComponent comp = recordStructure.getComponent(i); + } + + int totalLength = 0; + for(String part : jsonParts) { + totalLength += part.length(); + } + + capacityHint = totalLength * 2; + } + + public static long toEpochMillisecond(double timeSecond) { + return Math.min(MAX_DATE_EPOCH, Math.max(MIN_DATE_EPOCH, Double.valueOf(timeSecond * 1000).longValue())); + } + + public static double fromEpochMillisecond(Number timestamp) { + return timestamp.doubleValue() * 1e-3; + } + + public String build(DataKey key, DataBlock data) throws IOException { + StringBuilder s = new StringBuilder(capacityHint); + s.append(jsonParts.get(0)); + s.append(key.producerID); + for(int i = recordStructureStart; i < data.getAtomCount(); i++) { + s.append(jsonParts.get(i)); + s.append(data.getUnderlyingObject()); + } + return s.toString(); + } +} diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESIterator.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESIterator.java new file mode 100644 index 000000000..165899ba8 --- /dev/null +++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESIterator.java @@ -0,0 +1,186 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2012-2016 Sensia Software LLC. All Rights Reserved. 
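`toEpochMillisecond` above clamps the converted value into the epoch-millisecond range that the `epoch_millis` date mapping can represent, so infinite or wildly out-of-range timestamps degrade to the range bounds instead of producing unindexable dates. A quick illustration of the clamping behavior (constants copied from the class above):

```java
public class EpochClampSketch {
    // Bounds copied from ESDataStoreTemplate, roughly years 0001 and 9999
    private static final long MIN_DATE_EPOCH = -62138538000000L;
    private static final long MAX_DATE_EPOCH = 253336460400000L;

    static long toEpochMillisecond(double timeSecond) {
        return Math.min(MAX_DATE_EPOCH,
                Math.max(MIN_DATE_EPOCH, Double.valueOf(timeSecond * 1000).longValue()));
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillisecond(1.5E9));                     // in range, passed through
        System.out.println(toEpochMillisecond(Double.POSITIVE_INFINITY)); // clamped to MAX_DATE_EPOCH
        System.out.println(toEpochMillisecond(-1E30));                    // clamped to MIN_DATE_EPOCH
    }
}
```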
+ +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.persistence.es; + +import java.io.IOException; +import java.util.Iterator; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *
<p>
+ * Iterator wrapper around an ES scroll search: it transparently issues a new scroll request whenever the current page of hits is exhausted. + *
</p>
+ * + * @author Mathieu Dhainaut + * @author Nicolas Fortin, UMRAE Ifsttar + * @since 2017 + */ +public class ESIterator implements Iterator{ + + private static final Logger log = LoggerFactory.getLogger(ESIterator.class); + /** + * Default scroll fetch size. + */ + private static final TimeValue DEFAULT_SCROLL = TimeValue.timeValueMillis(6000); + + /** + * The scroll search request. + */ + private SearchRequest scrollSearchResponse; + + /** + * The scroll search response. + */ + private SearchResponse scrollResp; + + /** + * The first total hits number got from the first scroll request. + */ + private long totalHits = -1; + + /** + * The current number of fetch hits got from the current iterator. + */ + private int currentNbFetch = 0; + + /** + * The current search iterator. + */ + private Iterator searchHitIterator; + + /** + * The shared transport client. + */ + private RestHighLevelClient client; + + /** + * The current fetch size. + */ + private int fetchSize; + + private TimeValue scroll; + + String scrollId; + + boolean hasNext = false; + + + + /** + * The total number of fetched hits since the first request. + */ + private long nbHits; + + public ESIterator(RestHighLevelClient client, SearchRequest scrollSearchResponse, TimeValue scroll) { + this.scroll = scroll; + this.client = client; + this.scrollSearchResponse = scrollSearchResponse; + this.nbHits = 0; + } + + public ESIterator(RestHighLevelClient client, SearchRequest scrollSearchResponse) { + this(client, scrollSearchResponse, DEFAULT_SCROLL); + } + + /** + * Inits the scroll response and gets the current iterator. + */ + private void init() { + try { + scrollResp = client.search(scrollSearchResponse); + scrollId = scrollResp.getScrollId(); + + // get totalHits + totalHits = scrollResp.getHits().getTotalHits(); + + // init current iterator + searchHitIterator = scrollResp.getHits().iterator(); + + } catch (IOException |ElasticsearchStatusException ex) { + log.error(ex.getLocalizedMessage(), ex); + } + + hasNext = scrollResp != null && scrollResp.getHits().getHits().length > 0; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + */ + public boolean hasNext() { + // first call + if(totalHits == -1) { + init(); + } + return nbHits < totalHits; + } + + /** + * Makes a new scroll response, re-init the scroll response and + * the search iterator. 
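Typical use of `ESIterator`, mirroring how the storage classes build a scroll request and then drain it; the index name and client bootstrap are illustrative assumptions, only the iterator calls reflect this module:

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.sensorhub.impl.persistence.es.ESIterator;

public class ScrollSketch {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            SearchRequest request = new SearchRequest("my_records"); // hypothetical index
            request.source(new SearchSourceBuilder()
                    .query(QueryBuilders.matchAllQuery())
                    .size(100)); // page size fetched per scroll round trip
            // keep the scroll context alive long enough to consume each page
            request.scroll(TimeValue.timeValueMillis(60000));

            // the iterator issues the follow-up scroll requests transparently
            ESIterator hits = new ESIterator(client, request, TimeValue.timeValueMillis(60000));
            while (hits.hasNext()) {
                SearchHit hit = hits.next();
                System.out.println(hit.getId());
            }
        }
    }
}
```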
+ */ + private void makeNewScrollRequest() { + // build and execute the next scroll request + try { + SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); + scrollRequest.scroll(scroll); + scrollResp = client.searchScroll(scrollRequest); + + hasNext = scrollResp.getHits().getHits().length > 0; + } catch (IOException ex) { + log.error(ex.getLocalizedMessage(), ex); + } + + scrollId = scrollResp.getScrollId(); + + // re-init the search iterator + searchHitIterator = scrollResp.getHits().iterator(); + + // reset the current number of fetched hits + currentNbFetch = 0; + } + + /** + * {@inheritDoc} + */ + public SearchHit next() { + // get the next hit + SearchHit hit = searchHitIterator.next(); + nbHits++; + currentNbFetch++; + + // if we have to make a new request + // we compare the number of current fetched hit to the allowed fetch size + if(!searchHitIterator.hasNext()) { + makeNewScrollRequest(); + } + return hit; + } + +} diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESMultiSourceStorageDescriptor.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESMultiSourceStorageDescriptor.java new file mode 100644 index 000000000..25c905d9a --- /dev/null +++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESMultiSourceStorageDescriptor.java @@ -0,0 +1,53 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.persistence.es; + +import org.sensorhub.api.module.IModule; +import org.sensorhub.api.module.IModuleProvider; +import org.sensorhub.api.module.ModuleConfig; +import org.sensorhub.impl.module.JarModuleProvider; + +/** + *
<p>
+ * Descriptor of the ES multi-source storage module. + * This is needed for automatic discovery by the ModuleRegistry. + *
</p>
+ * + * @author Mathieu Dhainaut + * @since 2017 + */ +public class ESMultiSourceStorageDescriptor extends JarModuleProvider implements IModuleProvider{ + + @Override + public String getModuleName() { + return "ElasticSearch Multi-Source Storage"; + } + + @Override + public String getModuleDescription() { + return "Generic implementation of multisource storage using ElasticSearch Database"; + } + + @Override + public Class> getModuleClass() { + return ESMultiSourceStorageImpl.class; + } + + @Override + public Class getModuleConfigClass() { + return ESBasicStorageConfig.class; + } + +} diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESMultiSourceStorageImpl.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESMultiSourceStorageImpl.java new file mode 100644 index 000000000..df40027a5 --- /dev/null +++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESMultiSourceStorageImpl.java @@ -0,0 +1,114 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2012-2016 Sensia Software LLC. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.persistence.es; + +import java.io.IOException; +import java.util.*; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.sensorhub.api.persistence.IMultiSourceStorage; +import org.sensorhub.api.persistence.IObsStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *
<p>
+ * ES implementation of {@link IMultiSourceStorage} for storing observations from multiple producers. + *
</p>
+ * + * @author Mathieu Dhainaut + * @author Nicolas Fortin, UMRAE Ifsttar + * @since 2017 + */ +public class ESMultiSourceStorageImpl extends ESObsStorageImpl implements IMultiSourceStorage { + + private static final Logger log = LoggerFactory.getLogger(ESMultiSourceStorageImpl.class); + + + public ESMultiSourceStorageImpl() { + // default constructor + } + + public ESMultiSourceStorageImpl(RestHighLevelClient client) { + super(client); + } + + @Override + public Collection getProducerIDs() { + ArrayList resultList = new ArrayList<>(); + final String aggregateName = "producers"; + // Compute unique values of producers id in the foi meta data index + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.size(0); // Do not get hits + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + + if(config.filterByStorageId) { + boolQueryBuilder.must(new TermQueryBuilder(STORAGE_ID_FIELD_NAME, config.id)); + } + + boolQueryBuilder.must(new TermQueryBuilder(METADATA_TYPE_FIELD_NAME, FOI_IDX_NAME)); + + sourceBuilder.query(boolQueryBuilder); + + sourceBuilder.aggregation(new TermsAggregationBuilder(aggregateName, ValueType.STRING).field(ESDataStoreTemplate.PRODUCER_ID_FIELD_NAME)); + searchRequest.source(sourceBuilder); + try { + SearchResponse response = client.search(searchRequest); + Aggregation responseMap = response.getAggregations().getAsMap().get(aggregateName); + Object result = responseMap.getMetaData().get("bucket"); + + if(result instanceof Collection) { + for(Object res : (Collection)result) { + if(res instanceof Map) { + resultList.add((String)((Map) res).get("key")); + } + } + } + } catch (IOException ex) { + log.error(ex.getLocalizedMessage(), ex); + return Collections.emptyList(); + } + return resultList; + } + + @Override + public IObsStorage getDataStore(String producerID) { + // return this because ES does not encapsulate any storage + return this; + } + + @Override + public IObsStorage addDataStore(String producerID) { + // return this because ES does not encapsulate any storage + return this; + } + + +} diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESObsStorageDescriptor.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESObsStorageDescriptor.java new file mode 100644 index 000000000..7663df5aa --- /dev/null +++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESObsStorageDescriptor.java @@ -0,0 +1,54 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.persistence.es; + +import org.sensorhub.api.module.IModule; +import org.sensorhub.api.module.IModuleProvider; +import org.sensorhub.api.module.ModuleConfig; +import org.sensorhub.impl.module.JarModuleProvider; + +/** + *
<p>
+ * Descriptor of the ES observation storage module. + * This is needed for automatic discovery by the ModuleRegistry. + *
</p>
+ * + * @author Mathieu Dhainaut + * @author Nicolas Fortin, UMRAE Ifsttar + * @since 2017 + */ +public class ESObsStorageDescriptor extends JarModuleProvider implements IModuleProvider{ + + @Override + public String getModuleName() { + return "ElasticSearch Observation Storage"; + } + + @Override + public String getModuleDescription() { + return "Generic implementation of observation storage using ElasticSearch Database"; + } + + @Override + public Class> getModuleClass() { + return ESBasicStorageImpl.class; + } + + @Override + public Class getModuleConfigClass() { + return ESBasicStorageConfig.class; + } + +} diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESObsStorageImpl.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESObsStorageImpl.java new file mode 100644 index 000000000..094769c75 --- /dev/null +++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/ESObsStorageImpl.java @@ -0,0 +1,705 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2012-2016 Sensia Software LLC. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.persistence.es; + +import com.vividsolutions.jts.geom.*; +import net.opengis.gml.v32.AbstractFeature; +import net.opengis.gml.v32.AbstractGeometry; +import net.opengis.gml.v32.impl.EnvelopeJTS; +import net.opengis.gml.v32.impl.PointJTS; +import net.opengis.gml.v32.impl.PolygonJTS; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.geo.builders.*; +import org.elasticsearch.common.geo.parsers.ShapeParser; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.metrics.geobounds.ParsedGeoBounds; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.locationtech.spatial4j.context.jts.JtsSpatialContext; +import org.locationtech.spatial4j.shape.Shape; +import org.locationtech.spatial4j.shape.jts.JtsPoint; +import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.api.persistence.*; +import org.slf4j.Logger; 
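`getProducerIDs` in `ESMultiSourceStorageImpl` above reads the bucket list out of the aggregation's metadata map; with the 6.x high-level client the usual route is the typed `Terms` interface instead, roughly as sketched here (the field and aggregation names follow the code above, and the client is assumed to be already connected):

```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ProducerIdsSketch {
    static List<String> producerIds(RestHighLevelClient client, String metaIndex) throws IOException {
        SearchRequest request = new SearchRequest(metaIndex);
        request.source(new SearchSourceBuilder()
                .size(0) // aggregation only, no hits
                .aggregation(new TermsAggregationBuilder("producers", ValueType.STRING)
                        .field("producerID")));
        SearchResponse response = client.search(request);

        // the typed Terms view exposes the buckets directly
        Terms terms = response.getAggregations().get("producers");
        List<String> ids = new ArrayList<>();
        for (Terms.Bucket bucket : terms.getBuckets()) {
            ids.add(bucket.getKeyAsString());
        }
        return ids;
    }
}
```

A terms aggregation returns at most its `size` buckets (10 by default), so a deployment with many producers would need a larger size on the aggregation builder.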
+import org.slf4j.LoggerFactory; +import org.vast.util.Bbox; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + *
<p>
+ * ES implementation of {@link IObsStorage} for storing observations. + *
</p>
+ * + * @author Mathieu Dhainaut + * @author Nicolas Fortin, UMRAE Ifsttar + * @since 2017 + */ +public class ESObsStorageImpl extends ESBasicStorageImpl implements IObsStorageModule { + + private static final String POLYGON_QUERY_ERROR_MSG = "Cannot build polygon geo query"; + private static final String RESULT_TIME_FIELD_NAME = "resultTime"; + protected static final String FOI_IDX_NAME = "foi"; + protected static final String GEOBOUNDS_IDX_NAME = "geobounds"; + protected static final String FOI_UNIQUE_ID_FIELD = "foiID"; + protected static final String SHAPE_FIELD_NAME = "geometry"; + protected Bbox foiExtent = new Bbox(); + protected static final String METADATA_TYPE_FOI = "foi"; + private GeometryFactory geometryFactory = new GeometryFactory(); + + /** + * Class logger + */ + private static final Logger log = LoggerFactory.getLogger(ESObsStorageImpl.class); + + public ESObsStorageImpl() { + // default constructor + } + + public ESObsStorageImpl(RestHighLevelClient client) { + super(client); + } + + @Override + public void start() throws SensorHubException { + super.start(); + + // preload bbox + if (client != null) { + try { + GetResponse response = client.get(new GetRequest(indexNameMetaData, INDEX_METADATA_TYPE, GEOBOUNDS_IDX_NAME)); + if(response != null && response.getSource() != null && response.getSource().containsKey(BLOB_FIELD_NAME)) { + foiExtent = getObject(response.getSource().get(BLOB_FIELD_NAME)); + } + } catch (IOException ex) { + log.error(ex.getLocalizedMessage(), ex); + } + } + } + + /** + * Transform the recordStorage data key into a DataKey by splitting . + * @param rsKey the corresponding dataKey + * @return the dataKey. NULL if the length != 3 after splitting + */ + protected DataKey getDataKey(String rsKey, Map content) { + + DataKey dataKey = super.getDataKey(rsKey, content); + + return new ObsKey(dataKey.recordType, dataKey.producerID, (String) content.get(FOI_UNIQUE_ID_FIELD), dataKey.timeStamp); + } + + /** + * Overload query builder in order to manage IObsFilter + * @param filter Filter results + * @return + */ + @Override + BoolQueryBuilder queryByFilter(IDataFilter filter) { + BoolQueryBuilder queryBuilder = super.queryByFilter(filter); + if(filter instanceof ObsFilter) { + IObsFilter obsFilter = (IObsFilter) filter; + + // filter on Foi + if(obsFilter.getFoiIDs() != null && !obsFilter.getFoiIDs().isEmpty()) { + queryBuilder.must(QueryBuilders.termsQuery(FOI_UNIQUE_ID_FIELD, obsFilter.getFoiIDs())); + } + + // filter on roi? + if (obsFilter.getRoi() != null) { + try { + // build geo query + queryBuilder.must(getPolygonGeoQuery(obsFilter.getRoi())); + } catch (IOException e) { + log.error(POLYGON_QUERY_ERROR_MSG, e); + } + } + } + return queryBuilder; + } + + BoolQueryBuilder queryByFilter(IFoiFilter filter) { + BoolQueryBuilder query = QueryBuilders.boolQuery(); + + if(config.filterByStorageId) { + query.must(QueryBuilders.termQuery(STORAGE_ID_FIELD_NAME, config.id)); + } + + // check if any producerIDs + if(filter.getProducerIDs() != null && !filter.getProducerIDs().isEmpty()) { + query.must(QueryBuilders.termsQuery(ESDataStoreTemplate.PRODUCER_ID_FIELD_NAME, filter.getProducerIDs())); + } + + + // filter on Foi + if(filter.getFeatureIDs() != null && !filter.getFeatureIDs().isEmpty()) { + query.must(QueryBuilders.termsQuery("_id", filter.getFeatureIDs())); + } + + // filter on roi? 
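The roi branch just below relies on `getPolygonGeoQuery`, which only issues a geo bounding-box query, so ES can return false positives near the polygon's edges; the iterators therefore re-test each candidate hit exactly on the client side with spatial4j. A small sketch of that exact test, with a hypothetical triangular roi and candidate point:

```java
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.spatial4j.context.jts.JtsSpatialContext;
import org.locationtech.spatial4j.shape.Shape;
import org.locationtech.spatial4j.shape.jts.JtsGeometry;
import org.locationtech.spatial4j.shape.jts.JtsPoint;

public class RoiPostFilterSketch {
    public static void main(String[] args) {
        GeometryFactory factory = new GeometryFactory();

        // Hypothetical roi: a triangle (rings must be closed, hence the repeated first point)
        Shape roi = new JtsGeometry(factory.createPolygon(new Coordinate[]{
                new Coordinate(0, 0), new Coordinate(4, 0),
                new Coordinate(2, 3), new Coordinate(0, 0)}),
                JtsSpatialContext.GEO, false, false);

        // A candidate hit that passed the approximate bounding-box query
        Shape candidate = new JtsPoint(
                factory.createPoint(new Coordinate(2, 1)), JtsSpatialContext.GEO);

        // Same exact test the iterators apply before yielding a hit
        System.out.println(candidate.relate(roi).intersects()); // true
    }
}
```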
+ if (filter.getRoi() != null) { + try { + // build geo query + query.must(getPolygonGeoQuery(filter.getRoi())); + } catch (IOException e) { + log.error(POLYGON_QUERY_ERROR_MSG, e); + } + } + return query; + } + + @Override + public synchronized int getNumFois(IFoiFilter filter) { + + int result = 0; + + BoolQueryBuilder queryBuilder = queryByFilter(filter); + + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + searchRequest.source(new SearchSourceBuilder().size(0) + .query(queryBuilder)); + try { + SearchResponse response = client.search(searchRequest); + try { + result = Math.toIntExact(response.getHits().getTotalHits()); + } catch (ArithmeticException ex) { + getLogger().error("Too many records"); + result = Integer.MAX_VALUE; + } + } catch (IOException | ElasticsearchStatusException ex) { + log.error("getRecordStores failed", ex); + } + return result; + } + + @Override + public Bbox getFoisSpatialExtent() { + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.boolQuery()); + searchSourceBuilder.size(0); + searchSourceBuilder.aggregation(AggregationBuilders.geoBounds("agg").field(SHAPE_FIELD_NAME)); + searchRequest.source(searchSourceBuilder); + try { + SearchResponse response = client.search(searchRequest); + Object obj = response.getAggregations().asMap().get("agg"); + if(obj instanceof ParsedGeoBounds) { + ParsedGeoBounds geoBounds = (ParsedGeoBounds) obj; + if(geoBounds.bottomRight() != null && geoBounds.topLeft() != null) { + foiExtent = new Bbox(Math.nextDown((float)geoBounds.topLeft().getLon()), + Math.nextDown((float)geoBounds.bottomRight().getLat()), 0, + Math.nextUp((float)geoBounds.bottomRight().getLon()), + Math.nextUp((float)geoBounds.topLeft().getLat()), 0); + } + } + } catch (IOException ex) { + log.error(ex.getLocalizedMessage(), ex); + } + return foiExtent.copy(); + } + + @Override + void createDataMappingFields(XContentBuilder builder) throws IOException { + builder.startObject(SHAPE_FIELD_NAME); + { + builder.field("type", "geo_point"); + } + builder.endObject(); + builder.startObject(FOI_UNIQUE_ID_FIELD); + { + builder.field("type", "keyword"); + } + builder.endObject(); + super.createDataMappingFields(builder); + } + + @Override + void createMetaMappingProperties(XContentBuilder builder) throws IOException { + builder.startObject(ESDataStoreTemplate.PRODUCER_ID_FIELD_NAME); + { + builder.field("type", "keyword"); + } + builder.endObject(); + builder.startObject(SHAPE_FIELD_NAME); + { + builder.field("type", "geo_point"); + } + builder.endObject(); + super.createMetaMappingProperties(builder); + } + + /** + * Parse shape returned by query result + * @param source queryResult.getSourceAsString() + * @return Shape instance + * @throws IOException Issue with input data + */ + static Shape parseResultSourceGeometry(String source) throws IOException { + XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, new BytesArray(source), XContentType.JSON); + parser.nextToken(); + // Continue while we not found the geometry field + while (parser.currentToken() != XContentParser.Token.FIELD_NAME || + !parser.currentName().equals(SHAPE_FIELD_NAME)) { + parser.nextToken(); + } + parser.nextToken(); // Go into field content + ShapeBuilder shapeBuilder = ShapeParser.parse(parser); + return shapeBuilder.buildS4J(); + } + + @Override + public synchronized Iterator 
getFoiIDs(IFoiFilter filter) { + + commit(); + + // build query + // aggregate queries + BoolQueryBuilder filterQueryBuilder = queryByFilter(filter); + + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(filterQueryBuilder); + if(filter.getRoi() != null) { + // ElasticSearch return false positive (only approximate bound box test) + // We have to check before returning values + searchSourceBuilder.fetchSource(SHAPE_FIELD_NAME, null); + } else { + searchSourceBuilder.fetchSource(false); + } + searchSourceBuilder.size(config.scrollFetchSize); + searchSourceBuilder.sort(new FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.ASC)); + searchRequest.source(searchSourceBuilder); + searchRequest.scroll(TimeValue.timeValueMillis(config.scrollMaxDuration)); + + final ESIterator searchHitsIterator = new ESIterator(client, searchRequest, TimeValue.timeValueMillis(config.scrollMaxDuration)); //max of scrollFetchSize hits will be returned for each scroll + + // build a IDataRecord iterator based on the searchHits iterator + + final Shape geomTest = filter.getRoi() == null ? null : getPolygonBuilder(filter.getRoi()).buildS4J(); + + return new Iterator() { + String nextFeature = null; + org.locationtech.jts.geom.GeometryFactory factory = new org.locationtech.jts.geom.GeometryFactory(); + + @Override + public boolean hasNext() { + if(nextFeature == null && searchHitsIterator.hasNext()) { + fetchNext(); + } + return nextFeature != null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + void fetchNext() { + nextFeature = null; + if(searchHitsIterator.hasNext()) { + SearchHit nextSearchHit = searchHitsIterator.next(); + if (geomTest != null) { + try { + do { + // Extract shape from the geometry field using ES shape parser + //Shape geom = ESObsStorageImpl.parseResultSourceGeometry(nextSearchHit.getSourceAsString()); + Map pt = (Map)nextSearchHit.getSourceAsMap().get(SHAPE_FIELD_NAME); + Shape geom = new JtsPoint(factory.createPoint(new org.locationtech.jts.geom.Coordinate(((Number)pt.get("lon")).doubleValue(),((Number)pt.get("lat")).doubleValue())), JtsSpatialContext.GEO); + if (geom.relate(geomTest).intersects()) { + break; + } + if (!searchHitsIterator.hasNext()) { + return; + } + nextSearchHit = searchHitsIterator.next(); + } while (true); + } catch (Exception ex) { + log.error(ex.getLocalizedMessage(), ex); + } + } + nextFeature = nextSearchHit.getId(); + } + } + + @Override + public String next() { + String ret = nextFeature; + fetchNext(); + return ret; + } + }; +// +// // build query +// // aggregate queries +// BoolQueryBuilder filterQueryBuilder = QueryBuilders.boolQuery(); +// // filter on feature ids? +// if(filter.getFeatureIDs() != null && !filter.getFeatureIDs().isEmpty()) { +// filterQueryBuilder.must(QueryBuilders.termsQuery(FOI_UNIQUE_ID_FIELD, filter.getFeatureIDs())); +// } +// +// // filter on producer ids? +// if(filter.getProducerIDs() != null && !filter.getProducerIDs().isEmpty()) { +// filterQueryBuilder.must(QueryBuilders.termsQuery(PRODUCER_ID_FIELD_NAME, filter.getProducerIDs())); +// } +// +// // filter on ROI? 
+// if (filter.getRoi() != null) { +// try { +// // build geo query +// filterQueryBuilder.must(getPolygonGeoQuery(filter.getRoi())); +// } catch (IOException e) { +// log.error(POLYGON_QUERY_ERROR_MSG, e); +// } +// } +// +// final SearchRequestBuilder scrollReq = client.prepareSearch(indexNamePrepend) +// .setTypes("_doc") +// .setQuery(filterQueryBuilder) +// // get only the id +// .setFetchSource(new String[] { FOI_UNIQUE_ID_FIELD }, new String[] {}) +// .setScroll(new TimeValue(config.scrollMaxDuration)); +// // wrap the request into custom ES Scroll iterator +// final Iterator searchHitsIterator = new ESIterator(client, scrollReq, config.scrollFetchSize); +// +// return new Iterator() { +// +// @Override +// public boolean hasNext() { +// return searchHitsIterator.hasNext(); +// } +// +// @Override +// public void remove() { +// +// } +// +// @Override +// public String next() { +// SearchHit nextSearchHit = searchHitsIterator.next(); +// // get Feature id +// return nextSearchHit.getSourceAsMap().get(FOI_UNIQUE_ID_FIELD).toString(); +// } +// }; + } + + @Override + public synchronized Iterator getFois(IFoiFilter filter) { + + commit(); + + // build query + // aggregate queries + BoolQueryBuilder filterQueryBuilder = QueryBuilders.boolQuery(); + + + if(config.filterByStorageId) { + filterQueryBuilder.must(QueryBuilders.termQuery(STORAGE_ID_FIELD_NAME, config.id)); + } + + // filter on producer ids? + if(filter.getProducerIDs() != null && !filter.getProducerIDs().isEmpty()) { + filterQueryBuilder.must(QueryBuilders.termsQuery(ESDataStoreTemplate.PRODUCER_ID_FIELD_NAME, filter.getProducerIDs())); + } + + if(filter.getFeatureIDs() != null && !filter.getFeatureIDs().isEmpty()) { + filterQueryBuilder.must(QueryBuilders.termsQuery("_id" ,filter.getFeatureIDs().toArray(new String[filter.getFeatureIDs().size()]))); + } + + // filter on roi? + if (filter.getRoi() != null) { + try { + // build geo query + filterQueryBuilder.must(getPolygonGeoQuery(filter.getRoi())); + } catch (IOException e) { + log.error(POLYGON_QUERY_ERROR_MSG, e); + } + } + + final Shape geomTest = filter.getRoi() == null ? 
null : getPolygonBuilder(filter.getRoi()).buildS4J(); + + SearchRequest searchRequest = new SearchRequest(indexNameMetaData); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(filterQueryBuilder); + searchSourceBuilder.size(config.scrollFetchSize); + searchSourceBuilder.sort(new FieldSortBuilder(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME).order(SortOrder.ASC)); + searchRequest.source(searchSourceBuilder); + searchRequest.scroll(TimeValue.timeValueMillis(config.scrollMaxDuration)); + + final ESIterator searchHitsIterator = new ESIterator(client, searchRequest, TimeValue.timeValueMillis(config.scrollMaxDuration)); //max of scrollFetchSize hits will be returned for each scroll + + // build a IDataRecord iterator based on the searchHits iterator + + return new AbstractFeatureIterator(searchHitsIterator, geomTest); + } + + void storeRecordIndexRequestFields(XContentBuilder builder, DataKey key) throws IOException { + if((key instanceof ObsKey)) { + ObsKey obsKey = (ObsKey) key; + + + // obs part + if(!Double.isNaN(obsKey.resultTime)) { + builder.timeField(RESULT_TIME_FIELD_NAME, ESDataStoreTemplate.toEpochMillisecond(obsKey.resultTime)); + } + + if(obsKey.foiID != null) { + builder.field(FOI_UNIQUE_ID_FIELD, obsKey.foiID); + } + + if(obsKey.samplingGeometry != null) { + Point centroid = obsKey.samplingGeometry.getCentroid(); + builder.startObject(SHAPE_FIELD_NAME); + { + builder.field("lat", centroid.getY()); + builder.field("lon", centroid.getX()); + } + builder.endObject(); + } + } + super.storeRecordIndexRequestFields(builder, key); + } + + + @Override + public synchronized void storeFoi(String producerID, AbstractFeature foi) { + log.info("ESObsStorageImpl:storeFoi"); + + // add new record storage + byte[] bytes = this.getBlob(foi); + + try { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + { + // Convert to elastic search epoch millisecond + builder.field(STORAGE_ID_FIELD_NAME, config.id); + builder.field(METADATA_TYPE_FIELD_NAME, METADATA_TYPE_FOI); + Point centroid = getCentroid(foi.getLocation()); + builder.startObject(SHAPE_FIELD_NAME); + { + builder.field("lat", centroid.getY()); + builder.field("lon", centroid.getX()); + } + builder.endObject(); + builder.field(ESDataStoreTemplate.PRODUCER_ID_FIELD_NAME, producerID); + builder.field(ESDataStoreTemplate.TIMESTAMP_FIELD_NAME, System.currentTimeMillis()); + builder.field(BLOB_FIELD_NAME, bytes); + } + builder.endObject(); + IndexRequest request = new IndexRequest(indexNameMetaData, INDEX_METADATA_TYPE, foi.getUniqueIdentifier()); + + request.source(builder); + + client.index(request); + + storeChanged = System.currentTimeMillis(); + + } catch (IOException | SensorHubException ex) { + getLogger().error(String.format("storeFoi exception %s:%s in elastic search driver",producerID, foi.getName()), ex); + } + } + + /** + * Gets the envelope builder from a bbox object. + * @param bbox the bbox + * @returnthe Envelope builder + */ + protected synchronized EnvelopeBuilder getEnvelopeBuilder(Bbox bbox) { + org.locationtech.jts.geom.Coordinate topLeft = new org.locationtech.jts.geom.Coordinate(bbox.getMinX(), bbox.getMaxY()); + org.locationtech.jts.geom.Coordinate btmRight = new org.locationtech.jts.geom.Coordinate(bbox.getMaxX(), bbox.getMinY()); + return new EnvelopeBuilder(topLeft, btmRight); + } + + /** + * Gets the envelope builder from envelope geometry. 
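`storeFoi` above writes each feature of interest twice over in a single document: a `geo_point` centroid that ES can index and query spatially, and the complete feature as an opaque Kryo blob that is deserialized again when the feature is read back. A compact sketch of that document layout, using the field names from the metadata mapping (index plumbing omitted):

```java
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

import java.io.IOException;

public class FoiDocSketch {
    static XContentBuilder foiDocument(String producerID, double lat, double lon,
                                       byte[] kryoBlob) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.field("metadataType", "foi");    // discriminates foi docs in the metadata index
            builder.startObject("geometry");          // geo_point centroid, spatially queryable
            {
                builder.field("lat", lat);
                builder.field("lon", lon);
            }
            builder.endObject();
            builder.field("producerID", producerID);
            builder.field("timestamp", System.currentTimeMillis());
            builder.field("blob", kryoBlob);          // full feature, Kryo-serialized
        }
        builder.endObject();
        return builder;
    }
}
```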
+ * @param env the envelope geometry + * @returnthe Envelope builder + */ + protected synchronized EnvelopeBuilder getEnvelopeBuilder(Envelope env) { + org.locationtech.jts.geom.Coordinate topLeft = new org.locationtech.jts.geom.Coordinate(env.getMinX(), env.getMaxY()); + org.locationtech.jts.geom.Coordinate btmRight = new org.locationtech.jts.geom.Coordinate(env.getMaxX(), env.getMinY()); + return new EnvelopeBuilder(topLeft, btmRight); + } + + /** + * Build a polygon builder from a polygon geometry. + * @param polygon the Polygon geometry + * @return the builder + */ + protected synchronized PolygonBuilder getPolygonBuilder(Polygon polygon) { + // get coordinates list from polygon + CoordinatesBuilder coordinates = new CoordinatesBuilder(); + for(Coordinate coordinate : polygon.getExteriorRing().getCoordinates()) { + double x = coordinate.x; + double y = coordinate.y; + // Handle out of bounds points + if(x < -180 || x > 180) { + x = -180 + x % 180; + log.warn("Point %f,%f out of bounds",x,y); + } + if(y < -90 || y > 90) { + y = -90 + y % 90; + log.warn("Point %f,%f out of bounds",x,y); + } + coordinates.coordinate(x, y); + } + // build shape builder from coordinates + return new PolygonBuilder(coordinates); + //.relation(ShapeRelation.WITHIN); strategy, Default is INTERSECT + } + + /** + * Build a point builder from a point geometry. + * @param point the Point geometry + * @return the builder + */ + protected synchronized PointBuilder getPointBuilder(Point point) { + // build shape builder from coordinates + double x = point.getX(); + double y = point.getY(); + // Handle out of bounds points + if(x < -180 || x > 180) { + x = -180 + x % 180; + log.warn("Point %f,%f out of bounds",x,y); + } + if(y < -90 || y > 90) { + y = -90 + y % 90; + log.warn("Point %f,%f out of bounds",x,y); + } + return new PointBuilder(x, y); + } + + Point getCentroid(AbstractGeometry geometry) throws SensorHubException { + if(geometry instanceof PolygonJTS) { + return ((PolygonJTS)geometry).getCentroid(); + } else if(geometry instanceof PointJTS) { + return (PointJTS)geometry; + } else if(geometry instanceof EnvelopeJTS) { + return geometryFactory.createPoint(((Envelope)geometry).centre()); + } else { + throw new SensorHubException("Unsupported Geometry exception: "+geometry.getClass()); + } + } + + /** + * Build the corresponding builder given a generic geometry. + * @param geometry The abstract geometry + * @return the corresponding builder. The current supported builder are: PolygonJTS, Point, EnvelopeJTS + * @throws SensorHubException if the geometry is not supported + */ + protected synchronized ShapeBuilder getShapeBuilder(AbstractGeometry geometry) throws SensorHubException { + if(geometry instanceof PolygonJTS) { + return getPolygonBuilder((PolygonJTS)geometry); + } else if(geometry instanceof PointJTS) { + return getPointBuilder((PointJTS)geometry); + } else if(geometry instanceof EnvelopeJTS) { + return getEnvelopeBuilder((Envelope)geometry); + } else { + throw new SensorHubException("Unsupported Geometry exception: "+geometry.getClass()); + } + } + + /** + * Build the geo shape query from a Polygon. 
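One caveat in the coordinate normalization used by `getPolygonBuilder` and `getPointBuilder` above: Java's `%` keeps the sign of the dividend, so `-180 + x % 180` leaves negative overflows such as -190 out of range, and the `%f` placeholders in the warnings are printf-style where SLF4J interpolates `{}`. A corrected wrapping sketch:

```java
public class LonLatWrapSketch {
    // Wrap any longitude into [-180, 180)
    static double wrapLongitude(double lon) {
        return ((lon + 180.0) % 360.0 + 360.0) % 360.0 - 180.0;
    }

    // Clamp latitude into [-90, 90]; wrapping latitude would flip hemispheres
    static double clampLatitude(double lat) {
        return Math.max(-90.0, Math.min(90.0, lat));
    }

    public static void main(String[] args) {
        System.out.println(wrapLongitude(190.0));   // -170.0
        System.out.println(wrapLongitude(-190.0));  // 170.0
        System.out.println(clampLatitude(95.0));    // 90.0
        // SLF4J-style logging: log.warn("Point {},{} out of bounds", x, y);
    }
}
```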
+
+    /**
+     * Builds the geo query for a polygon. Note that only a geo bounding box
+     * approximation of the polygon's envelope is sent to Elasticsearch; the
+     * exact geometry test is performed client-side by AbstractFeatureIterator.
+     * @param polygon the geometry to build the query from
+     * @return the corresponding query builder
+     * @throws IOException if the query cannot be built
+     */
+    protected synchronized QueryBuilder getPolygonGeoQuery(Polygon polygon) throws IOException {
+        Envelope envelope = polygon.getEnvelopeInternal();
+        return QueryBuilders.geoBoundingBoxQuery(SHAPE_FIELD_NAME).setCorners(envelope.getMaxY(), envelope.getMinX(), envelope.getMinY(), envelope.getMaxX());
+    }
+
+    static final class AbstractFeatureIterator implements Iterator<AbstractFeature> {
+        ESIterator searchHitsIterator;
+        Shape geomTest;
+        AbstractFeature nextFeature = null;
+        org.locationtech.jts.geom.GeometryFactory factory = new org.locationtech.jts.geom.GeometryFactory();
+
+        public AbstractFeatureIterator(ESIterator searchHitsIterator, Shape geomTest) {
+            this.searchHitsIterator = searchHitsIterator;
+            this.geomTest = geomTest;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextFeature == null && searchHitsIterator.hasNext()) {
+                fetchNext();
+            }
+            return nextFeature != null;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        void fetchNext() {
+            nextFeature = null;
+            if (searchHitsIterator.hasNext()) {
+                SearchHit nextSearchHit = searchHitsIterator.next();
+                if (geomTest != null) {
+                    try {
+                        do {
+                            // Extract the geo_point from the geometry field of the hit
+                            //Shape geom = ESObsStorageImpl.parseResultSourceGeometry(nextSearchHit.getSourceAsString());
+                            Map<String, Object> pt = (Map<String, Object>) nextSearchHit.getSourceAsMap().get(SHAPE_FIELD_NAME);
+                            Shape geom = new JtsPoint(factory.createPoint(new org.locationtech.jts.geom.Coordinate(((Number) pt.get("lon")).doubleValue(), ((Number) pt.get("lat")).doubleValue())), JtsSpatialContext.GEO);
+
+                            if (geom.relate(geomTest).intersects()) {
+                                break;
+                            }
+                            if (!searchHitsIterator.hasNext()) {
+                                return;
+                            }
+                            nextSearchHit = searchHitsIterator.next();
+                        } while (true);
+                    } catch (Exception ex) {
+                        log.error(ex.getLocalizedMessage(), ex);
+                    }
+                }
+                nextFeature = ESObsStorageImpl.getObject(nextSearchHit.getSourceAsMap().get(BLOB_FIELD_NAME));
+            }
+        }
+
+        @Override
+        public AbstractFeature next() {
+            AbstractFeature ret = nextFeature;
+            fetchNext();
+            return ret;
+        }
+    }
+}
diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/EsRecordStoreInfo.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/EsRecordStoreInfo.java
new file mode 100644
index 000000000..286b58c6c
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/EsRecordStoreInfo.java
@@ -0,0 +1,17 @@
+package org.sensorhub.impl.persistence.es;
+
+import net.opengis.swe.v20.DataComponent;
+import net.opengis.swe.v20.DataEncoding;
+
+public class EsRecordStoreInfo extends DataStreamInfo {
+    String indexName;
+
+    public EsRecordStoreInfo(String name, String indexName, DataComponent recordDescription, DataEncoding recommendedEncoding) {
+        super(name, recordDescription, recommendedEncoding);
+        this.indexName = indexName;
+    }
+
+    public String getIndexName() {
+        return indexName;
+    }
+}
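Note that the spatial filtering above is deliberately two-stage: `getPolygonGeoQuery()` only sends a cheap `geo_bounding_box` approximation of the ROI envelope to Elasticsearch, and `AbstractFeatureIterator` then re-tests each hit exactly on the client. A minimal, self-contained sketch of that contract using JTS only (the class name and coordinates are made up for illustration):

```java
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Polygon;

public class RoiFilterSketch {
    public static void main(String[] args) {
        GeometryFactory factory = new GeometryFactory();
        // triangular region of interest (lon/lat order, as in the driver)
        Polygon roi = factory.createPolygon(new Coordinate[] {
                new Coordinate(-2.0, 47.0), new Coordinate(-1.0, 47.0),
                new Coordinate(-1.5, 48.0), new Coordinate(-2.0, 47.0) });

        // stage 1 (server side): only the envelope corners are sent to ES,
        // in the (top, left, bottom, right) order used by setCorners()
        Envelope env = roi.getEnvelopeInternal();
        System.out.printf("bbox: top=%.1f left=%.1f bottom=%.1f right=%.1f%n",
                env.getMaxY(), env.getMinX(), env.getMinY(), env.getMaxX());

        // stage 2 (client side): a hit inside the bbox but outside the
        // polygon is rejected by the exact geometry test
        Coordinate hit = new Coordinate(-1.05, 47.9);
        System.out.println("kept: " + roi.intersects(factory.createPoint(hit))); // false
    }
}
```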
diff --git a/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/KryoSerializer.java b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/KryoSerializer.java
new file mode 100644
index 000000000..5c9c3b96c
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/main/java/org/sensorhub/impl/persistence/es/KryoSerializer.java
@@ -0,0 +1,113 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+The contents of this file are subject to the Mozilla Public License, v. 2.0.
+If a copy of the MPL was not distributed with this file, You can obtain one
+at http://mozilla.org/MPL/2.0/.
+
+Software distributed under the License is distributed on an "AS IS" basis,
+WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+for the specific language governing rights and limitations under the License.
+
+Copyright (C) 2012-2016 Sensia Software LLC. All Rights Reserved.
+
+******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.vast.data.AbstractDataBlock;
+import org.vast.data.DataBlockByte;
+import org.vast.data.DataBlockDouble;
+import org.vast.data.DataBlockFloat;
+import org.vast.data.DataBlockInt;
+import org.vast.data.DataBlockLong;
+import org.vast.data.DataBlockParallel;
+import org.vast.data.DataBlockShort;
+import org.vast.data.DataBlockString;
+import org.vast.data.DataBlockTuple;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+
+import net.opengis.OgcPropertyList;
+
+/**
+ * Kryo Serializer/Deserializer.
+ *
+ * @author Mathieu Dhainaut
+ * @since 2017
+ */
+public class KryoSerializer {
+
+    private static final ThreadLocal<Kryo> kryoLocal = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            Kryo kryo = new Kryo();
+            kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+            kryo.addDefaultSerializer(OgcPropertyList.class, FieldSerializer.class);
+
+            kryo.register(AbstractDataBlock[].class);
+            kryo.register(DataBlockTuple.class);
+            kryo.register(DataBlockParallel.class);
+            kryo.register(DataBlockByte.class);
+            kryo.register(DataBlockShort.class);
+            kryo.register(DataBlockInt.class);
+            kryo.register(DataBlockLong.class);
+            kryo.register(DataBlockFloat.class);
+            kryo.register(DataBlockDouble.class);
+            kryo.register(DataBlockString.class);
+            kryo.register(DataStreamInfo.class);
+            return kryo;
+        }
+    };
+
+    private KryoSerializer() {
+    }
+
+    public static Kryo getInstance() {
+        return kryoLocal.get();
+    }
+
+    public static byte[] serialize(Object object) {
+        // create buffer
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        Output output = new Output(bos);
+
+        // write into buffer
+        kryoLocal.get().writeClassAndObject(output, object);
+        output.flush();
+
+        // get serialized data
+        byte[] result = bos.toByteArray();
+
+        // close buffer
+        output.close();
+
+        // return serialized data
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> T deserialize(byte[] serializedData) {
+        // create buffer
+        ByteArrayInputStream bis = new ByteArrayInputStream(serializedData);
+        Input ki = new Input(bis);
+
+        // read from buffer
+        T result = (T) kryoLocal.get().readClassAndObject(ki);
+
+        // close buffer
+        ki.close();
+
+        // return deserialized data
+        return result;
+    }
+}
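A quick round-trip sketch of the serializer above (the `DataBlockDouble` payload and its values are arbitrary, and the sketch class is assumed to live in the same package as `KryoSerializer`):

```java
import org.vast.data.DataBlockDouble;

public class KryoRoundTripSketch {
    public static void main(String[] args) {
        // arbitrary payload: a two-sample double data block
        DataBlockDouble block = new DataBlockDouble(2);
        block.setDoubleValue(0, 1.5);
        block.setDoubleValue(1, 2.5);

        // serialize to the byte[] that ends up in the "blob" field,
        // then read it back with the same thread-local Kryo instance
        byte[] bytes = KryoSerializer.serialize(block);
        DataBlockDouble copy = KryoSerializer.deserialize(bytes);
        System.out.println(copy.getDoubleValue(1)); // 2.5
    }
}
```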
diff --git a/persistence/sensorhub-storage-es-rest/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider b/persistence/sensorhub-storage-es-rest/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider
new file mode 100644
index 000000000..f23ab3430
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/main/resources/META-INF/services/org.sensorhub.api.module.IModuleProvider
@@ -0,0 +1,2 @@
+org.sensorhub.impl.persistence.es.ESObsStorageDescriptor
+org.sensorhub.impl.persistence.es.ESMultiSourceStorageDescriptor
\ No newline at end of file
diff --git a/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsBasicStorage.java b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsBasicStorage.java
new file mode 100644
index 000000000..fda2130eb
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsBasicStorage.java
@@ -0,0 +1,82 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+The contents of this file are subject to the Mozilla Public License, v. 2.0.
+If a copy of the MPL was not distributed with this file, You can obtain one
+at http://mozilla.org/MPL/2.0/.
+
+Software distributed under the License is distributed on an "AS IS" basis,
+WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+for the specific language governing rights and limitations under the License.
+
+Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved.
+
+******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es.integration;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.sensorhub.api.common.SensorHubException;
+import org.sensorhub.impl.persistence.es.ESBasicStorageImpl;
+import org.sensorhub.impl.persistence.es.ESBasicStorageConfig;
+import org.sensorhub.test.persistence.AbstractTestBasicStorage;
+
+
+public class TestEsBasicStorage extends AbstractTestBasicStorage<ESBasicStorageImpl> {
+
+    protected static final String CLUSTER_NAME = "elasticsearch";
+
+    @Before
+    public void init() throws Exception {
+        ESBasicStorageConfig config = new ESBasicStorageConfig();
+        config.autoStart = true;
+        config.clusterName = CLUSTER_NAME;
+        List<String> nodes = new ArrayList<>();
+        nodes.add("localhost:9200");
+        nodes.add("localhost:9201");
+
+        config.nodeUrls = nodes;
+        config.scrollFetchSize = 200;
+        config.bulkConcurrentRequests = 0;
+        config.id = "junit_" + UUID.randomUUID().toString();
+        storage = new ESBasicStorageImpl();
+        storage.init(config);
+        storage.start();
+    }
+
+    @After
+    public void closeStorage() throws SensorHubException {
+        storage.stop();
+    }
+
+    @Override
+    protected void forceReadBackFromStorage() throws Exception {
+        // commit so ES flushes pending writes and makes them searchable;
+        // if some tests fail intermittently, try adding a short wait here first
+        storage.commit();
+    }
+
+    @AfterClass
+    public static void cleanup() throws UnknownHostException {
+//        // add transport address(es)
+//        Settings settings = Settings.builder()
+//                .put("cluster.name", CLUSTER_NAME).build();
+//
+//        TransportClient client = new PreBuiltTransportClient(settings);
+//        client.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
+//
+//        String idxName = "junit_*";
+//
+//        DeleteIndexResponse delete = client.admin().indices().delete(new DeleteIndexRequest(idxName)).actionGet();
+//        if (!delete.isAcknowledged()) {
+//            System.err.println("Index wasn't deleted");
+//        }
+//
+//        client.close();
    }
}
diff --git a/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsMultiSourceStorage.java b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsMultiSourceStorage.java
new file mode 100644
index 000000000..6d58e340e
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsMultiSourceStorage.java
@@ -0,0 +1,89 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+The contents of this file are subject to the Mozilla Public License, v. 2.0.
+If a copy of the MPL was not distributed with this file, You can obtain one
+at http://mozilla.org/MPL/2.0/.
+
+Software distributed under the License is distributed on an "AS IS" basis,
+WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+for the specific language governing rights and limitations under the License.
+
+Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved.
+
+******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es.integration;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.sensorhub.api.common.SensorHubException;
+import org.sensorhub.api.persistence.IObsStorageModule;
+import org.sensorhub.impl.persistence.es.ESBasicStorageConfig;
+import org.sensorhub.impl.persistence.es.ESMultiSourceStorageImpl;
+import org.sensorhub.test.persistence.AbstractTestMultiObsStorage;
+
+
+public class TestEsMultiSourceStorage extends AbstractTestMultiObsStorage<IObsStorageModule<?>>
+{
+    protected static final String CLUSTER_NAME = "elasticsearch";
+
+    @Before
+    public void init() throws Exception
+    {
+        ESBasicStorageConfig config = new ESBasicStorageConfig();
+        config.autoStart = true;
+        config.clusterName = CLUSTER_NAME;
+        List<String> nodes = new ArrayList<>();
+        nodes.add("localhost:9200");
+        nodes.add("localhost:9201");
+
+        config.nodeUrls = nodes;
+        config.scrollFetchSize = 200;
+        config.bulkConcurrentRequests = 0;
+        config.id = "junit_" + UUID.randomUUID().toString();
+        storage = new ESMultiSourceStorageImpl();
+        storage.init(config);
+        storage.start();
+    }
+
+    @After
+    public void closeStorage() throws SensorHubException {
+        storage.stop();
+    }
+
+    @Override
+    protected void forceReadBackFromStorage() throws Exception
+    {
+        // commit so ES flushes pending writes and makes them searchable;
+        // if some tests fail intermittently, try adding a short wait here first
+        //Thread.sleep(2500);
+        storage.commit();
+    }
+
+
+    @AfterClass
+    public static void cleanup() throws UnknownHostException
+    {
+//        // add transport address(es)
+//        Settings settings = Settings.builder()
+//                .put("cluster.name", CLUSTER_NAME).build();
+//
+//        TransportClient client = new PreBuiltTransportClient(settings);
+//        client.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
+//
+//        String idxName = "junit_*";
+//
+//        DeleteIndexResponse delete = client.admin().indices().delete(new DeleteIndexRequest(idxName)).actionGet();
+//        if (!delete.isAcknowledged()) {
+//            System.err.println("Index wasn't deleted");
+//        }
+//
+//        client.close();
    }

}
diff --git a/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsObsStorage.java b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsObsStorage.java
new file mode 100644
index 000000000..61b17b350
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/integration/TestEsObsStorage.java
@@ -0,0 +1,91 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+The contents of this file are subject to the Mozilla Public License, v. 2.0.
+If a copy of the MPL was not distributed with this file, You can obtain one
+at http://mozilla.org/MPL/2.0/.
+
+Software distributed under the License is distributed on an "AS IS" basis,
+WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+for the specific language governing rights and limitations under the License.
+
+Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved.
+
+******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es.integration;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.sensorhub.api.common.SensorHubException;
+import org.sensorhub.impl.persistence.es.ESBasicStorageConfig;
+import org.sensorhub.impl.persistence.es.ESObsStorageImpl;
+import org.sensorhub.test.persistence.AbstractTestObsStorage;
+
+
+public class TestEsObsStorage extends AbstractTestObsStorage<ESObsStorageImpl>
+{
+
+    protected static final String CLUSTER_NAME = "elasticsearch";
+
+    @Before
+    public void init() throws Exception
+    {
+        ESBasicStorageConfig config = new ESBasicStorageConfig();
+        config.autoStart = true;
+        config.clusterName = CLUSTER_NAME;
+        List<String> nodes = new ArrayList<>();
+        nodes.add("localhost:9200"); // REST client port (9300 is the transport protocol port)
+
+        config.nodeUrls = nodes;
+        config.scrollFetchSize = 200;
+        config.bulkConcurrentRequests = 0;
+        config.scrollMaxDuration = 999999000;
+        config.id = "junit_" + UUID.randomUUID().toString();
+        storage = new ESObsStorageImpl();
+        storage.init(config);
+        storage.start();
+    }
+
+
+    @After
+    public void closeStorage() throws SensorHubException {
+        storage.stop();
+    }
+
+    @Override
+    protected void forceReadBackFromStorage() throws Exception
+    {
+        // commit so ES flushes pending writes and makes them searchable;
+        // if some tests fail intermittently, try adding a short wait here first
+        //Thread.sleep(2500);
+        storage.commit();
+    }
+
+    @AfterClass
+    public static void cleanup() throws UnknownHostException
+    {
+//        // add transport address(es)
+//        Settings settings = Settings.builder()
+//                .put("cluster.name", CLUSTER_NAME).build();
+//
+//        TransportClient client = new PreBuiltTransportClient(settings);
+//        client.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
+//
+//        String idxName = "junit_*";
+//
+//        DeleteIndexResponse delete = client.admin().indices().delete(new DeleteIndexRequest(idxName)).actionGet();
+//        if (!delete.isAcknowledged()) {
+//            System.err.println("Index wasn't deleted");
+//        }
+//
+//        client.close();
    }

}
diff --git a/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsBasicStorage.java b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsBasicStorage.java
new file mode 100644
index 000000000..ba79f6ad2
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsBasicStorage.java
@@ -0,0 +1,326 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+ The contents of this file are subject to the Mozilla Public License, v. 2.0.
+ If a copy of the MPL was not distributed with this file, You can obtain one
+ at http://mozilla.org/MPL/2.0/.
+
+ Software distributed under the License is distributed on an "AS IS" basis,
+ WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ for the specific language governing rights and limitations under the License.
+
+ Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved.
+
+ ******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es.mock;
+
+import net.opengis.swe.v20.Count;
+import net.opengis.swe.v20.DataArray;
+import net.opengis.swe.v20.DataBlock;
+import net.opengis.swe.v20.DataComponent;
+import net.opengis.swe.v20.DataEncoding;
+import net.opengis.swe.v20.DataRecord;
+import net.opengis.swe.v20.DataType;
+import net.opengis.swe.v20.Vector;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.locationtech.jts.geom.Coordinate;
+import org.sensorhub.api.common.SensorHubException;
+import org.sensorhub.api.persistence.DataFilter;
+import org.sensorhub.api.persistence.DataKey;
+import org.sensorhub.api.persistence.IDataRecord;
+import org.sensorhub.impl.persistence.es.ESBasicStorageConfig;
+import org.sensorhub.impl.persistence.es.ESBasicStorageImpl;
+import org.sensorhub.test.TestUtils;
+import org.sensorhub.test.persistence.AbstractTestBasicStorage;
+import org.vast.data.TextEncodingImpl;
+import org.vast.swe.SWEConstants;
+import org.vast.swe.SWEHelper;
+import org.vast.swe.helper.GeoPosHelper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestEsBasicStorage extends AbstractTestBasicStorage<ESBasicStorageImpl> {
+
+    protected static final String CLUSTER_NAME = "elasticsearch";
+
+    private static final boolean CLEAN_INDEX = true;
+
+    @Before
+    public void init() throws Exception {
+        ESBasicStorageConfig config = new ESBasicStorageConfig();
+        config.autoStart = true;
+        config.clusterName = CLUSTER_NAME;
+        List<String> nodes = new ArrayList<>();
+        nodes.add("localhost:9200");
+        nodes.add("localhost:9201");
+
+        config.nodeUrls = nodes;
+        config.bulkConcurrentRequests = 0;
+        config.id = "junit_testesbasicstorage_" + System.currentTimeMillis();
+        config.indexNamePrepend = "data_" + config.id + "_";
+        config.indexNameMetaData = "meta_" + config.id + "_";
+
+        storage = new ESBasicStorageImpl();
+        storage.init(config);
+        storage.start();
+    }
+
+    @After
+    public void after() throws SensorHubException {
+        // Delete the indices added by this test run
+        storage.commit();
+        if (CLEAN_INDEX) {
+            DeleteIndexRequest request = new DeleteIndexRequest(storage.getAddedIndex().toArray(new String[storage.getAddedIndex().size()]));
+            try {
+                storage.getClient().indices().delete(request);
+            } catch (IOException ex) {
+                throw new SensorHubException(ex.getLocalizedMessage(), ex);
+            }
+        }
+        storage.stop();
+    }
+
+    @Override
+    protected void forceReadBackFromStorage() throws Exception {
+        // commit so ES flushes pending writes and makes them searchable;
+        // if some tests fail intermittently, try adding a short wait here first
+        storage.commit();
+    }
+
+    public static void assertDataBlockEquals(DataBlock data1, DataBlock data2) throws Exception {
+        Assert.assertEquals("Data blocks are not the same size", (long) data1.getAtomCount(), (long) data2.getAtomCount());
+
+        for (int i = 0; i < data1.getAtomCount(); ++i) {
+            Assert.assertEquals(data1.getDataType(i), data2.getDataType(i));
+
+            if (data1.getDataType(i) == DataType.DOUBLE || data1.getDataType(i) == DataType.FLOAT) {
+                Assert.assertEquals(data1.getDoubleValue(i), data2.getDoubleValue(i), 1e-6);
+            } else {
+                Assert.assertEquals("Data block values are not equal at index=" + i, data1.getStringValue(i), data2.getStringValue(i));
+            }
+        }
+    }
+
+    @Test
+    public void testLocationOutput() throws Exception {
+        GeoPosHelper fac = new GeoPosHelper();
+        Vector locVector = fac.newLocationVectorLLA(SWEConstants.DEF_SENSOR_LOC);
+        locVector.setName("location");
+        locVector.setLocalFrame("#locationjunit");
+        DataComponent outputStruct = fac.wrapWithTimeStampUTC(locVector);
+        outputStruct.setName("sensorLocation");
+        outputStruct.setId("SENSOR_LOCATION");
+        DataEncoding outputEncoding = new TextEncodingImpl();
+
+        storage.addRecordStore(outputStruct.getName(), outputStruct, outputEncoding);
+
+        double timeStamp = new Date().getTime() / 1000.;
+        // build new datablock
+        DataBlock dataBlock = outputStruct.createDataBlock();
+        Coordinate location = new Coordinate(-1.55336, 47.21725, 15);
+        dataBlock.setDoubleValue(0, timeStamp);
+        dataBlock.setDoubleValue(1, location.y); // latitude
+        dataBlock.setDoubleValue(2, location.x); // longitude
+        dataBlock.setDoubleValue(3, location.z); // altitude
+
+        storage.storeRecord(new DataKey(outputStruct.getName(), "e44cb499-3b6c-4305-b479-ebacc965579f", timeStamp), dataBlock);
+
+        forceReadBackFromStorage();
+
+        // Read back
+        Iterator<IDataRecord> it = storage.getRecordIterator(new DataFilter(outputStruct.getName()) {
+            @Override
+            public double[] getTimeStampRange() {
+                return new double[]{timeStamp - 5, Double.MAX_VALUE};
+            }
+        });
+        int i = 0;
+        while (it.hasNext()) {
+            assertDataBlockEquals(dataBlock, it.next().getData());
+            i++;
+        }
+        assertEquals(1, i);
+    }
+
+    private static final float[] freqs = new float[]{20, 25, 31.5f, 40, 50, 63, 80, 100, 125, 160, 200, 250, 315, 400, 500, 630, 800, 1000, 1250, 1600, 2000, 2500, 3150, 4000, 5000, 6300, 8000, 10000, 12500};
+
+    @Test
+    public void testNestedDataBlock() throws Exception {
+        SWEHelper fac = new SWEHelper();
+        DataComponent acousticData = fac.newDataRecord();
+        acousticData.setName("acoustic_fast");
+        acousticData.setDefinition("http://sensorml.com/ont/swe/property/Acoustic");
+        acousticData.setDescription("Acoustic indicators measurements");
+
+        // add time and broadband level fields
+        acousticData.addComponent("time", fac.newTimeStampIsoUTC());
+        acousticData.addComponent("leq", fac.newQuantity(SWEHelper.getPropertyUri("dBsplFast"), "Leq", null, "dB", DataType.FLOAT));
+        acousticData.addComponent("laeq", fac.newQuantity(SWEHelper.getPropertyUri("dBsplFast"), "LAeq", null, "dB(A)", DataType.FLOAT));
+
+        DataRecord nestedRec = fac.newDataRecord(2);
+        nestedRec.addComponent("freq", fac.newQuantity(SWEHelper.getPropertyUri("frequency"), "freq", null, "Hz", DataType.FLOAT));
+        nestedRec.addComponent("spl", fac.newQuantity(SWEHelper.getPropertyUri("spl"), "spl", null, "dB", DataType.FLOAT));
+
+        DataArray recordDesc = fac.newDataArray(freqs.length);
+        recordDesc.setName("spectrum");
+        recordDesc.setDefinition("urn:spectrum:third-octave");
+        recordDesc.setElementType("elt", nestedRec);
+        acousticData.addComponent("spectrum", recordDesc);
+
+        // also generate encoding definition
+        DataEncoding acousticEncoding = fac.newTextEncoding(",", "\n");
+
+        storage.addRecordStore(acousticData.getName(), acousticData, acousticEncoding);
+
+        forceReadBackFromStorage();
+
+        DataBlock dataBlock = acousticData.createDataBlock();
+        int index = 0;
+        dataBlock.setDoubleValue(index++, 1531297249.125);
+        dataBlock.setFloatValue(index++, 45.4f);
+        dataBlock.setFloatValue(index++, 44.6f);
+        for (float freq : freqs) {
+            dataBlock.setFloatValue(index++, freq);
+            dataBlock.setFloatValue(index++, (float) (22.1 + Math.log10(freq)));
+        }
+        DataKey dataKey = new DataKey(acousticData.getName(),
+                "e44cb499-3b6c-4305-b479-ebacc965579f", dataBlock.getDoubleValue(0));
+
+        storage.storeRecord(dataKey, dataBlock);
+
+        forceReadBackFromStorage();
+
+        DataBlock dataBlock1 = storage.getDataBlock(dataKey);
+
+        TestUtils.assertEquals(dataBlock, dataBlock1);
+    }
+
+
+    @Test
+    public void testSimpleArrayDataBlock() throws Exception {
+        SWEHelper fac = new SWEHelper();
+        DataComponent acousticData = fac.newDataRecord();
+        acousticData.setName("acoustic_fast");
+        acousticData.setDefinition("http://sensorml.com/ont/swe/property/Acoustic");
+        acousticData.setDescription("Acoustic indicators measurements");
+
+        Count elementCount = fac.newCount();
+        elementCount.setValue(8); // 8 x 125 ms
+
+        // add time and per-band level array fields
+        acousticData.addComponent("time", fac.newTimeStampIsoUTC());
+        acousticData.addComponent("leq", fac.newArray(elementCount, "leq", fac.newQuantity(SWEHelper.getPropertyUri("dBsplFast"), "Leq", null, "dB", DataType.FLOAT)));
+        acousticData.addComponent("laeq", fac.newArray(elementCount, "laeq", fac.newQuantity(SWEHelper.getPropertyUri("dBsplFast"), "LAeq", null, "dB(A)", DataType.FLOAT)));
+        for (double freq : freqs) {
+            String name = "leq_" + Double.valueOf(freq).intValue();
+            acousticData.addComponent(name, fac.newArray(elementCount, name, fac.newQuantity(SWEHelper.getPropertyUri("dBsplFast"), name, null, "dB", DataType.FLOAT)));
+        }
+
+        // also generate encoding definition
+        DataEncoding acousticEncoding = fac.newTextEncoding(",", "\n");
+
+        storage.addRecordStore(acousticData.getName(), acousticData, acousticEncoding);
+
+        forceReadBackFromStorage();
+
+        DataBlock dataBlock = acousticData.createDataBlock();
+        int index = 0;
+        dataBlock.setDoubleValue(index++, 1531297249.125);
+        for (int idStep = 0; idStep < 8; idStep++) {
+            dataBlock.setDoubleValue(index++, idStep + 0.1);
+        }
+        for (int idStep = 0; idStep < 8; idStep++) {
+            dataBlock.setDoubleValue(index++, idStep + 0.2);
+        }
+        for (float freq : freqs) {
+            for (int idStep = 0; idStep < 8; idStep++) {
+                dataBlock.setDoubleValue(index++, idStep + freq / 100000.);
+            }
+        }
+        DataKey dataKey = new DataKey(acousticData.getName(),
+                "e44cb499-3b6c-4305-b479-ebacc965579f", dataBlock.getDoubleValue(0));
+
+        storage.storeRecord(dataKey, dataBlock);
+
+        forceReadBackFromStorage();
+
+        DataBlock dataBlock1 = storage.getDataBlock(dataKey);
+
+        TestUtils.assertEquals(dataBlock, dataBlock1);
+    }
+
+
+    @Test
+    public void testNaNSimpleArrayDataBlock() throws Exception {
+        SWEHelper fac = new SWEHelper();
+        DataComponent acousticData = fac.newDataRecord();
+        acousticData.setName("acoustic_fast");
+        acousticData.setDefinition("http://sensorml.com/ont/swe/property/Acoustic");
+        acousticData.setDescription("Acoustic indicators measurements");
+
+        Count elementCount = fac.newCount();
+        elementCount.setValue(8); // 8 x 125 ms
+
+        // add time and per-band level array fields
+        acousticData.addComponent("time", fac.newTimeStampIsoUTC());
+        acousticData.addComponent("leq", fac.newArray(elementCount, "leq", fac.newQuantity(SWEHelper.getPropertyUri("dBsplFast"), "Leq", null, "dB", DataType.FLOAT)));
+        acousticData.addComponent("laeq", fac.newArray(elementCount, "laeq", fac.newQuantity(SWEHelper.getPropertyUri("dBsplFast"), "LAeq", null, "dB(A)", DataType.FLOAT)));
+        for (double freq : freqs) {
+            String name = "leq_" + Double.valueOf(freq).intValue();
+            acousticData.addComponent(name, fac.newArray(elementCount, name, fac.newQuantity(SWEHelper.getPropertyUri("dBsplFast"), name, null, "dB", DataType.FLOAT)));
+        }
+
+        // also generate encoding definition
+        DataEncoding acousticEncoding = fac.newTextEncoding(",", "\n");
+
+        storage.addRecordStore(acousticData.getName(), acousticData, acousticEncoding);
+
+        forceReadBackFromStorage();
+
+        DataBlock dataBlock = acousticData.createDataBlock();
+        int index = 0;
+        dataBlock.setDoubleValue(index++, 1531297249.125);
+        for (int idStep = 0; idStep < 8; idStep++) {
+            dataBlock.setDoubleValue(index++, idStep + 0.1);
+        }
+        for (int idStep = 0; idStep < 8; idStep++) {
+            dataBlock.setDoubleValue(index++, idStep + 0.2);
+        }
+        for (float freq : freqs) {
+            for (int idStep = 0; idStep < 8; idStep++) {
+                if (idStep != 2) {
+                    dataBlock.setDoubleValue(index++, idStep + freq / 100000.);
+                } else {
+                    dataBlock.setDoubleValue(index++, Double.NaN);
+                }
+            }
+        }
+        DataKey dataKey = new DataKey(acousticData.getName(),
+                "e44cb499-3b6c-4305-b479-ebacc965579f", dataBlock.getDoubleValue(0));
+
+        storage.storeRecord(dataKey, dataBlock);
+
+        forceReadBackFromStorage();
+
+        DataBlock dataBlock1 = storage.getDataBlock(dataKey);
+
+        TestUtils.assertEquals(dataBlock, dataBlock1);
+    }
+}
diff --git a/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsMultiSourceStorage.java b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsMultiSourceStorage.java
new file mode 100644
index 000000000..e12393646
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsMultiSourceStorage.java
@@ -0,0 +1,79 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+The contents of this file are subject to the Mozilla Public License, v. 2.0.
+If a copy of the MPL was not distributed with this file, You can obtain one
+at http://mozilla.org/MPL/2.0/.
+
+Software distributed under the License is distributed on an "AS IS" basis,
+WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+for the specific language governing rights and limitations under the License.
+
+Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved.
+
+******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es.mock;
+
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.junit.After;
+import org.junit.Before;
+import org.sensorhub.api.common.SensorHubException;
+import org.sensorhub.impl.persistence.es.ESBasicStorageConfig;
+import org.sensorhub.impl.persistence.es.ESMultiSourceStorageImpl;
+import org.sensorhub.test.persistence.AbstractTestMultiObsStorage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestEsMultiSourceStorage extends AbstractTestMultiObsStorage<ESMultiSourceStorageImpl> {
+
+    protected static final String CLUSTER_NAME = "elasticsearch";
+
+    private static final boolean CLEAN_INDEX = true;
+
+    @Before
+    public void init() throws Exception {
+        ESBasicStorageConfig config = new ESBasicStorageConfig();
+        config.autoStart = true;
+        config.clusterName = CLUSTER_NAME;
+        List<String> nodes = new ArrayList<>();
+        nodes.add("localhost:9200");
+        nodes.add("localhost:9201");
+
+        config.nodeUrls = nodes;
+        config.bulkConcurrentRequests = 0;
+        config.id = "junit_testesmultisourcestorage_" + System.currentTimeMillis();
+        config.indexNamePrepend = "data_" + config.id + "_";
+        config.indexNameMetaData = "meta_" + config.id + "_";
+
+        storage = new ESMultiSourceStorageImpl();
+        storage.init(config);
+        storage.start();
+    }
+
+    @After
+    public void after() throws SensorHubException {
+        // Delete the indices added by this test run
+        storage.commit();
+        if (CLEAN_INDEX) {
+            DeleteIndexRequest request = new DeleteIndexRequest(storage.getAddedIndex().toArray(new String[storage.getAddedIndex().size()]));
+            try {
+                storage.getClient().indices().delete(request);
+            } catch (IOException ex) {
+                throw new SensorHubException(ex.getLocalizedMessage(), ex);
+            }
+        }
+        storage.stop();
+    }
+
+    @Override
+    protected void forceReadBackFromStorage() throws Exception {
+        // commit so ES flushes pending writes and makes them searchable;
+        // if some tests fail intermittently, try adding a short wait here first
+        storage.commit();
+    }
+}

diff --git a/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsObsStorage.java b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsObsStorage.java
new file mode 100644
index 000000000..3e8799b5a
--- /dev/null
+++ b/persistence/sensorhub-storage-es-rest/src/test/java/org/sensorhub/impl/persistence/es/mock/TestEsObsStorage.java
@@ -0,0 +1,78 @@
+/***************************** BEGIN LICENSE BLOCK ***************************
+
+The contents of this file are subject to the Mozilla Public License, v. 2.0.
+If a copy of the MPL was not distributed with this file, You can obtain one
+at http://mozilla.org/MPL/2.0/.
+
+Software distributed under the License is distributed on an "AS IS" basis,
+WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+for the specific language governing rights and limitations under the License.
+
+Copyright (C) 2012-2015 Sensia Software LLC. All Rights Reserved.
+
+******************************* END LICENSE BLOCK ***************************/
+
+package org.sensorhub.impl.persistence.es.mock;
+
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.junit.After;
+import org.junit.Before;
+import org.sensorhub.api.common.SensorHubException;
+import org.sensorhub.impl.persistence.es.ESBasicStorageConfig;
+import org.sensorhub.impl.persistence.es.ESObsStorageImpl;
+import org.sensorhub.test.persistence.AbstractTestObsStorage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestEsObsStorage extends AbstractTestObsStorage<ESObsStorageImpl> {
+
+    protected static final String CLUSTER_NAME = "elasticsearch";
+
+    private static final boolean CLEAN_INDEX = true;
+
+    @Before
+    public void init() throws Exception {
+        ESBasicStorageConfig config = new ESBasicStorageConfig();
+        config.autoStart = true;
+        config.clusterName = CLUSTER_NAME;
+        List<String> nodes = new ArrayList<>();
+        nodes.add("localhost:9200");
+        nodes.add("localhost:9201");
+
+        config.nodeUrls = nodes;
+        config.bulkConcurrentRequests = 0;
+        config.id = "junit_testesobsstorage_" + System.currentTimeMillis();
+        config.indexNamePrepend = "data_" + config.id + "_";
+        config.indexNameMetaData = "meta_" + config.id + "_";
+
+        storage = new ESObsStorageImpl();
+        storage.init(config);
+        storage.start();
+    }
+
+    @After
+    public void after() throws SensorHubException {
+        // Delete the indices added by this test run
+        storage.commit();
+        if (CLEAN_INDEX) {
+            DeleteIndexRequest request = new DeleteIndexRequest(storage.getAddedIndex().toArray(new String[storage.getAddedIndex().size()]));
+            try {
+                storage.getClient().indices().delete(request);
+            } catch (IOException ex) {
+                throw new SensorHubException(ex.getLocalizedMessage(), ex);
+            }
+        }
+        storage.stop();
+    }
+
+    @Override
+    protected void forceReadBackFromStorage() throws Exception {
+        // commit so ES flushes pending writes and makes them searchable;
+        // if some tests fail intermittently, try adding a short wait here first
+        storage.commit();
+    }
+}
diff --git a/persistence/sensorhub-storage-es-rest/src/test/resources/empty b/persistence/sensorhub-storage-es-rest/src/test/resources/empty
new file mode 100644
index 000000000..e69de29bb