Skip to content

Commit 9784c01

Browse files
author
Nagaraj G
committed
Fix partial cache update post snapshot restore
Signed-off-by: Nagaraj G <[email protected]>
1 parent d29095f commit 9784c01

File tree

4 files changed

+201
-21
lines changed

4 files changed

+201
-21
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
* [Resource Sharing] Reverts @Inject pattern usage for ResourceSharingExtension to client accessor pattern. ([#5576](https://github.com/opensearch-project/security/pull/5576))
1919
* Inject user custom attributes when injecting user and role information to the thread context ([#5560](https://github.com/opensearch-project/security/pull/5560))
2020
* Allow any plugin system request when `plugins.security.system_indices.enabled` is set to `false` ([#5579](https://github.com/opensearch-project/security/pull/5579))
21+
* Fix compilation issue after change to Subject interface in core and bump to 3.2.0 ([#5423](https://github.com/opensearch-project/security/pull/5423))
22+
* Provide SecureHttpTransportParameters to complement SecureTransportParameters counterpart ([#5432](https://github.com/opensearch-project/security/pull/5432))
23+
* Use isClusterPerm instead of requestedResolved.isLocalAll() to determine if action is a cluster action ([#5445](https://github.com/opensearch-project/security/pull/5445))
24+
* Fix config update with deprecated config types failing in mixed clusters ([#5456](https://github.com/opensearch-project/security/pull/5456))
25+
* Fix usage of jwt_clock_skew_tolerance_seconds in HTTPJwtAuthenticator ([#5506](https://github.com/opensearch-project/security/pull/5506))
26+
* Fix partial cache update post snapshot restore[#5478](https://github.com/opensearch-project/security/pull/5478)
2127

2228
### Refactoring
2329

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
12+
package org.opensearch.security;
13+
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.concurrent.ExecutionException;
17+
18+
import org.apache.logging.log4j.LogManager;
19+
import org.apache.logging.log4j.Logger;
20+
import org.junit.After;
21+
import org.junit.Before;
22+
import org.junit.ClassRule;
23+
import org.junit.Test;
24+
25+
import org.opensearch.OpenSearchStatusException;
26+
import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
27+
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
28+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
29+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
30+
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
31+
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
32+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
33+
import org.opensearch.action.get.GetRequest;
34+
import org.opensearch.action.get.GetResponse;
35+
import org.opensearch.action.index.IndexRequest;
36+
import org.opensearch.action.support.WriteRequest;
37+
import org.opensearch.client.RequestOptions;
38+
import org.opensearch.client.RestHighLevelClient;
39+
import org.opensearch.common.xcontent.XContentType;
40+
import org.opensearch.core.rest.RestStatus;
41+
import org.opensearch.security.api.AbstractApiIntegrationTest;
42+
import org.opensearch.security.support.ConfigConstants;
43+
import org.opensearch.test.framework.TestSecurityConfig;
44+
import org.opensearch.test.framework.cluster.ClusterManager;
45+
import org.opensearch.test.framework.cluster.LocalCluster;
46+
import org.opensearch.test.framework.cluster.TestRestClient;
47+
import org.opensearch.transport.client.Client;
48+
49+
import static org.hamcrest.MatcherAssert.assertThat;
50+
import static org.opensearch.security.CrossClusterSearchTests.TYPE_ATTRIBUTE;
51+
import static org.opensearch.security.SearchOperationTest.TEST_SNAPSHOT_REPOSITORY_NAME;
52+
import static org.opensearch.security.support.ConfigConstants.SECURITY_BACKGROUND_INIT_IF_SECURITYINDEX_NOT_EXIST;
53+
import static org.opensearch.security.support.ConfigConstants.SECURITY_RESTAPI_ROLES_ENABLED;
54+
import static org.opensearch.test.framework.TestSecurityConfig.AuthcDomain.AUTHC_HTTPBASIC_INTERNAL;
55+
import static org.opensearch.test.framework.TestSecurityConfig.Role.ALL_ACCESS;
56+
import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN;
57+
import static org.opensearch.test.framework.matcher.GetResponseMatchers.containDocument;
58+
import static org.junit.Assert.assertEquals;
59+
import static org.junit.Assert.assertTrue;
60+
61+
public class SecurityIndexSnapshotRestoreTests extends AbstractApiIntegrationTest {
62+
private static final Logger log = LogManager.getLogger(SecurityIndexSnapshotRestoreTests.class);
63+
64+
private static final String TEST_INDEX_NAME = "my_index_001";
65+
private static final String DOC_ID = "doc_id";
66+
67+
private static final TestSecurityConfig.User ADMIN_USER = new TestSecurityConfig.User("admin").roles(ALL_ACCESS)
68+
.attr(TYPE_ATTRIBUTE, "administrative");
69+
70+
private static final TestSecurityConfig.User LIMITED_READ_USER_1 = new TestSecurityConfig.User("limited_read_user").roles(
71+
new TestSecurityConfig.Role("limited-reader").indexPermissions("indices:data/read*").on(TEST_INDEX_NAME)
72+
);
73+
74+
private static final TestSecurityConfig.User LIMITED_READ_USER_2 = new TestSecurityConfig.User("user2");
75+
76+
private static final TestSecurityConfig.Role LIMITED_READ_USER_2_ROLE = new TestSecurityConfig.Role("limited-reader_2")
77+
.indexPermissions("indices:data/read*")
78+
.on(TEST_INDEX_NAME);
79+
80+
private String securityIndex;
81+
82+
@ClassRule
83+
public static LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.THREE_CLUSTER_MANAGERS)
84+
.authc(AUTHC_HTTPBASIC_INTERNAL)
85+
.users(ADMIN_USER, LIMITED_READ_USER_1)
86+
.anonymousAuth(false)
87+
.nodeSettings(
88+
Map.of(
89+
SECURITY_RESTAPI_ROLES_ENABLED,
90+
List.of("user_" + USER_ADMIN.getName() + "__" + ALL_ACCESS.getName()),
91+
SECURITY_BACKGROUND_INIT_IF_SECURITYINDEX_NOT_EXIST,
92+
false
93+
)
94+
)
95+
.build();
96+
97+
@Before
98+
public void setUp() throws Exception {
99+
securityIndex = ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX;
100+
101+
try (Client client = cluster.getInternalNodeClient()) {
102+
client.admin()
103+
.cluster()
104+
.putRepository(
105+
new PutRepositoryRequest(TEST_SNAPSHOT_REPOSITORY_NAME).type("fs")
106+
.settings(Map.of("location", cluster.getSnapshotDirPath()))
107+
)
108+
.actionGet();
109+
110+
CreateIndexResponse createIndexResponse = client.admin().indices().create(new CreateIndexRequest(TEST_INDEX_NAME)).actionGet();
111+
assertTrue(createIndexResponse.isAcknowledged());
112+
113+
client.index(
114+
new IndexRequest(TEST_INDEX_NAME).id(DOC_ID)
115+
.source("{\"message\": \"test document 1\"}", XContentType.JSON)
116+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
117+
).actionGet();
118+
}
119+
}
120+
121+
@After
122+
public void cleanData() throws ExecutionException, InterruptedException {
123+
try (Client client = cluster.getInternalNodeClient()) {
124+
client.admin().indices().delete(new DeleteIndexRequest(TEST_INDEX_NAME)).actionGet();
125+
126+
client.admin().cluster().deleteRepository(new DeleteRepositoryRequest(TEST_SNAPSHOT_REPOSITORY_NAME)).actionGet();
127+
}
128+
}
129+
130+
@Test
131+
public void testSecurityCacheReloadAfterRestore() throws Exception {
132+
// 1. Read data in custom index with LIMITED_READ_USER_1
133+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_1)) {
134+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
135+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
136+
}
137+
138+
// 2. Create snapshot of security index
139+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(ADMIN_USER)) {
140+
SnapshotSteps steps = new SnapshotSteps(restHighLevelClient);
141+
steps.createSnapshot(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap", securityIndex);
142+
steps.waitForSnapshotCreation(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap");
143+
}
144+
145+
// 3. Add new role and user to security index (This is not in snapshot created above)
146+
try (TestRestClient client = cluster.getRestClient(ADMIN_USER)) {
147+
client.createRole(LIMITED_READ_USER_2_ROLE.getName(), LIMITED_READ_USER_2_ROLE).assertStatusCode(201);
148+
client.createUser(LIMITED_READ_USER_2.getName(), LIMITED_READ_USER_2).assertStatusCode(201);
149+
client.assignRoleToUser(LIMITED_READ_USER_2.getName(), "limited-reader_2").assertStatusCode(200);
150+
}
151+
152+
// 4. Read data in custom index with LIMITED_READ_USER_2
153+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_2)) {
154+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
155+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
156+
}
157+
158+
// 5. Delete security index
159+
try (Client client = cluster.getInternalNodeClient()) {
160+
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(securityIndex);
161+
client.admin().indices().delete(deleteRequest).actionGet();
162+
}
163+
164+
// 6. Restore security index
165+
try (Client client = cluster.getInternalNodeClient()) {
166+
RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap")
167+
.waitForCompletion(true)
168+
.indices(securityIndex); // restore security index
169+
170+
RestoreSnapshotResponse restoreResponse = client.admin().cluster().restoreSnapshot(restoreRequest).actionGet();
171+
172+
// Verify restore was successful
173+
assertEquals(RestStatus.OK, restoreResponse.status());
174+
}
175+
176+
// 7. Read data in custom index with LIMITED_READ_USER_1 because it was in snapshot
177+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_1)) {
178+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
179+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
180+
}
181+
182+
// 8. Should get 401 error to read custom index with LIMITED_READ_USER_2 because it was not in snapshot
183+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_2)) {
184+
restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
185+
} catch (OpenSearchStatusException exception) {
186+
assertEquals(RestStatus.UNAUTHORIZED, exception.status()); // Verify it's a 401
187+
}
188+
}
189+
}

src/main/java/org/opensearch/security/configuration/ConfigurationRepository.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -681,15 +681,12 @@ public void afterIndexShardStarted(IndexShard indexShard) {
681681

682682
// Check if this is a security index shard
683683
if (securityIndex.equals(index.getName())) {
684-
// Only trigger on primary shard to avoid multiple reloads
685-
if (indexShard.routingEntry() != null && indexShard.routingEntry().primary()) {
686-
threadPool.generic().execute(() -> {
687-
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
688-
LOGGER.info("Security index primary shard {} started - config reloading for snapshot restore", shardId);
689-
reloadConfiguration(CType.values());
690-
}
691-
});
692-
}
684+
threadPool.generic().execute(() -> {
685+
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
686+
LOGGER.info("Security index shard {} started - config reloading for snapshot restore", shardId);
687+
reloadConfiguration(CType.values());
688+
}
689+
});
693690
}
694691
}
695692
}

src/test/java/org/opensearch/security/configuration/ConfigurationRepositoryTest.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.opensearch.cluster.metadata.Metadata;
4444
import org.opensearch.cluster.node.DiscoveryNode;
4545
import org.opensearch.cluster.node.DiscoveryNodes;
46-
import org.opensearch.cluster.routing.ShardRouting;
4746
import org.opensearch.cluster.service.ClusterService;
4847
import org.opensearch.common.Priority;
4948
import org.opensearch.common.settings.Settings;
@@ -597,7 +596,6 @@ public void getConfigurationsFromIndex_SecurityIndexNotInitiallyReady() throws I
597596
public void afterIndexShardStarted_whenSecurityIndexUpdated() throws InterruptedException, TimeoutException {
598597
Settings settings = Settings.builder().build();
599598
IndexShard indexShard = mock(IndexShard.class);
600-
ShardRouting shardRouting = mock(ShardRouting.class);
601599
ShardId shardId = mock(ShardId.class);
602600
Index index = mock(Index.class);
603601
ClusterState mockClusterState = mock(ClusterState.class);
@@ -611,20 +609,11 @@ public void afterIndexShardStarted_whenSecurityIndexUpdated() throws Interrupted
611609
when(indexShard.shardId()).thenReturn(shardId);
612610
when(shardId.getIndex()).thenReturn(index);
613611
when(index.getName()).thenReturn(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX);
614-
when(indexShard.routingEntry()).thenReturn(shardRouting);
615612
when(clusterService.state()).thenReturn(mockClusterState);
616613
when(mockClusterState.custom(RestoreInProgress.TYPE)).thenReturn(mockRestore);
617614
when(threadPool.generic()).thenReturn(executorService);
618615

619-
// when replica shard updated
620-
when(shardRouting.primary()).thenReturn(false);
621-
configurationRepository.afterIndexShardStarted(indexShard);
622-
verify(executorService, never()).execute(any());
623-
verify(configurationRepository, never()).reloadConfiguration(any());
624-
625-
// when primary shard updated
626616
doReturn(true).when(configurationRepository).reloadConfiguration(any());
627-
when(shardRouting.primary()).thenReturn(true);
628617
when(mockRestore.iterator()).thenReturn(Collections.singletonList(mockEntry).iterator());
629618
when(mockEntry.indices()).thenReturn(Collections.singletonList(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX));
630619
ArgumentCaptor<Runnable> successRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -637,7 +626,6 @@ public void afterIndexShardStarted_whenSecurityIndexUpdated() throws Interrupted
637626
Mockito.reset(configurationRepository, executorService);
638627
ArgumentCaptor<Runnable> errorRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
639628
when(clusterService.state()).thenThrow(new RuntimeException("ClusterState exception"));
640-
when(shardRouting.primary()).thenReturn(true);
641629
configurationRepository.afterIndexShardStarted(indexShard);
642630
verify(executorService).execute(errorRunnableCaptor.capture());
643631
errorRunnableCaptor.getValue().run();

0 commit comments

Comments
 (0)