Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pipeline/data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.java.version}</version>
</dependency>
<dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.datacommons.ingestion.data;

import com.google.common.base.Joiner;
import java.nio.charset.StandardCharsets;

/** Util functions for the pipeline data model. */
public class DataUtils {

// Standard FNV-1a 32-bit constants
private static final int FNV_32_INIT = 0x811c9dc5;
private static final int FNV_32_PRIME = 0x01000193;

/**
* Generates a consistent facet ID using the FNV-1a 32-bit hash algorithm.
*
* <p>This is designed to replicate the legacy Go facet ID generation implementation in Mixer's
* GetFacetID function. See
* https://github.com/datacommonsorg/mixer/blob/0618c1f3ef80703c98fc97f6c6c6e5cd3d7c13d3/internal/util/util.go#L497-L515
*
* @param builder The Observation builder containing the fields to hash.
* @return A consistent facet ID string.
*/
public static String generateFacetId(Observation.Builder builder) {
// Only include fields that are set in hash.
// This is so the hashes stay consistent if more fields are added.
String s =
Joiner.on("-")
.join(
builder.getImportName(),
builder.getMeasurementMethod(),
builder.getObservationPeriod(),
builder.getScalingFactor(),
builder.getUnit());
if (builder.getIsDcAggregate()) {
s += "-IsDcAggregate";
}

int hash = fnv1a32(s);

// Go's fmt.Sprint on a uint32 treats it as unsigned.
// We must do the same in Java to avoid negative string values.
return Integer.toUnsignedString(hash);
}

/**
* Computes the 32-bit FNV-1a hash of a string.
*
* <p>Note: Java does not provide a built-in FNV-1a implementation, so we implement it manually
* here.
*
* @param data The input string to hash.
* @return The FNV-1a 32-bit hash as an integer.
*/
private static int fnv1a32(String data) {
int hash = FNV_32_INIT;
for (byte b : data.getBytes(StandardCharsets.UTF_8)) {
hash ^= (b & 0xff); // Bitwise XOR with the unsigned byte value
hash *= FNV_32_PRIME;
}
return hash;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,32 @@ public Builder provenanceUrl(String provenanceUrl) {
return this;
}

public String getImportName() {
return importName;
}

public String getMeasurementMethod() {
return measurementMethod;
}

public String getObservationPeriod() {
return observationPeriod;
}

public String getScalingFactor() {
return scalingFactor;
}

public String getUnit() {
return unit;
}

public boolean getIsDcAggregate() {
return isDcAggregate;
}

public Observation build() {
int intHash =
Objects.hash(
importName, measurementMethod, observationPeriod, scalingFactor, unit, isDcAggregate);
// Convert to positive long and then to string
this.facetId = String.valueOf((long) intHash & 0x7fffffffL);
this.facetId = DataUtils.generateFacetId(this);
return new Observation(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.datacommons.ingestion.data;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class DataUtilsTest {

private final String expectedId;
private final String importName;
private final String measurementMethod;
private final String observationPeriod;
private final String scalingFactor;
private final String unit;
private final boolean isDcAggregate;

public DataUtilsTest(
String expectedId,
String importName,
String measurementMethod,
String observationPeriod,
String scalingFactor,
String unit,
boolean isDcAggregate) {
this.expectedId = expectedId;
this.importName = importName;
this.measurementMethod = measurementMethod;
this.observationPeriod = observationPeriod;
this.scalingFactor = scalingFactor;
this.unit = unit;
this.isDcAggregate = isDcAggregate;
}

// This method provides the data for the test below
@Parameters(name = "Test {index}: expected {0}")
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
// Format: expectedId, importName, measurementMethod, observationPeriod, scalingFactor,
// unit, isDcAggregate
{"3981252704", "WorldDevelopmentIndicators", "", "P1Y", "", "", false},
{
"10983471",
"CensusACS5YearSurvey_SubjectTables_S2601A",
"CensusACS5yrSurveySubjectTable",
"",
"",
"",
false
},
{"2825511676", "CDC_Mortality_UnderlyingCause", "", "", "", "", false},
{"1226172227", "CensusACS1YearSurvey", "CensusACS1yrSurvey", "", "", "", false},
{"2176550201", "USCensusPEP_Annual_Population", "CensusPEPSurvey", "P1Y", "", "", false},
{"2645850372", "CensusACS5YearSurvey_AggCountry", "CensusACS5yrSurvey", "", "", "", true},
{
"1541763368",
"USDecennialCensus_RedistrictingRelease",
"USDecennialCensus",
"",
"",
"",
false
},
{
"4181918134",
"OECDRegionalDemography_Population",
"OECDRegionalStatistics",
"P1Y",
"",
"",
false
},
{
"1964317807",
"CensusACS5YearSurvey_SubjectTables_S0101",
"CensusACS5yrSurveySubjectTable",
"",
"",
"",
false
},
{"2517965213", "CensusPEP", "CensusPEPSurvey", "", "", "", false}
});
}

@Test
public void testGenerateFacetId() {
Observation.Builder builder =
Observation.builder()
.importName(importName)
.measurementMethod(measurementMethod)
.observationPeriod(observationPeriod)
.scalingFactor(scalingFactor)
.unit(unit)
.isDcAggregate(isDcAggregate);
String facetId = DataUtils.generateFacetId(builder);

assertEquals(expectedId, facetId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,40 +206,45 @@ public void testParseTimeSeriesRow() {
new NodesEdges()
.addNode(
Node.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.value("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.name("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.value("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.name("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 4134842815")
.types(List.of("StatVarObsSeries"))
.build())
.addNode(
Node.builder()
.subjectId("jVWNIHt73yOspqKD0fnvTCH8GCW7m38F3gW+JB+aWms=")
.value("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 870755137")
.subjectId("jKdXZgRFpeibUoXpgItXgC+oCoPMFsqP5UFyqNJ+Xss=")
.value("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 4134842815")
.build())
.addEdge(
Edge.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.predicate("variableMeasured")
.objectId("Mean_PrecipitableWater_Atmosphere")
.provenance("dc/base/NOAA_GFS_WeatherForecast")
.build())
.addEdge(
Edge.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.predicate("observationAbout")
.objectId("geoId/sch2915390")
.provenance("dc/base/NOAA_GFS_WeatherForecast")
.build())
.addEdge(
Edge.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.predicate("name")
.objectId("jVWNIHt73yOspqKD0fnvTCH8GCW7m38F3gW+JB+aWms=")
.objectId("jKdXZgRFpeibUoXpgItXgC+oCoPMFsqP5UFyqNJ+Xss=")
.provenance("dc/base/NOAA_GFS_WeatherForecast")
.build())
.addEdge(
Edge.builder()
.subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137")
.subjectId(
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815")
.predicate("typeOf")
.objectId("StatVarObsSeries")
.provenance("dc/base/NOAA_GFS_WeatherForecast")
Expand Down