Skip to content

Commit b76f389

Browse files
author
Nagaraj G
committed
Fix partial cache update post snapshot restore
Signed-off-by: Nagaraj G <[email protected]>
1 parent d29095f commit b76f389

File tree

4 files changed

+190
-21
lines changed

4 files changed

+190
-21
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
* [Resource Sharing] Reverts @Inject pattern usage for ResourceSharingExtension to client accessor pattern. ([#5576](https://github.com/opensearch-project/security/pull/5576))
1919
* Inject user custom attributes when injecting user and role information to the thread context ([#5560](https://github.com/opensearch-project/security/pull/5560))
2020
* Allow any plugin system request when `plugins.security.system_indices.enabled` is set to `false` ([#5579](https://github.com/opensearch-project/security/pull/5579))
21+
* Fix compilation issue after change to Subject interface in core and bump to 3.2.0 ([#5423](https://github.com/opensearch-project/security/pull/5423))
22+
* Provide SecureHttpTransportParameters to complement SecureTransportParameters counterpart ([#5432](https://github.com/opensearch-project/security/pull/5432))
23+
* Use isClusterPerm instead of requestedResolved.isLocalAll() to determine if action is a cluster action ([#5445](https://github.com/opensearch-project/security/pull/5445))
24+
* Fix config update with deprecated config types failing in mixed clusters ([#5456](https://github.com/opensearch-project/security/pull/5456))
25+
* Fix usage of jwt_clock_skew_tolerance_seconds in HTTPJwtAuthenticator ([#5506](https://github.com/opensearch-project/security/pull/5506))
26+
* Fix partial cache update post snapshot restore[#5478](https://github.com/opensearch-project/security/pull/5478)
2127

2228
### Refactoring
2329

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package org.opensearch.security;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
import java.util.concurrent.ExecutionException;
6+
7+
import org.apache.logging.log4j.LogManager;
8+
import org.apache.logging.log4j.Logger;
9+
import org.junit.After;
10+
import org.junit.Before;
11+
import org.junit.ClassRule;
12+
import org.junit.Test;
13+
14+
import org.opensearch.OpenSearchStatusException;
15+
import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
16+
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
17+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
18+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
19+
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
20+
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
21+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
22+
import org.opensearch.action.get.GetRequest;
23+
import org.opensearch.action.get.GetResponse;
24+
import org.opensearch.action.index.IndexRequest;
25+
import org.opensearch.action.support.WriteRequest;
26+
import org.opensearch.client.RequestOptions;
27+
import org.opensearch.client.RestHighLevelClient;
28+
import org.opensearch.common.xcontent.XContentType;
29+
import org.opensearch.core.rest.RestStatus;
30+
import org.opensearch.security.api.AbstractApiIntegrationTest;
31+
import org.opensearch.security.support.ConfigConstants;
32+
import org.opensearch.test.framework.TestSecurityConfig;
33+
import org.opensearch.test.framework.cluster.ClusterManager;
34+
import org.opensearch.test.framework.cluster.LocalCluster;
35+
import org.opensearch.test.framework.cluster.TestRestClient;
36+
import org.opensearch.transport.client.Client;
37+
38+
import static org.hamcrest.MatcherAssert.assertThat;
39+
import static org.opensearch.security.CrossClusterSearchTests.TYPE_ATTRIBUTE;
40+
import static org.opensearch.security.SearchOperationTest.TEST_SNAPSHOT_REPOSITORY_NAME;
41+
import static org.opensearch.security.support.ConfigConstants.SECURITY_BACKGROUND_INIT_IF_SECURITYINDEX_NOT_EXIST;
42+
import static org.opensearch.security.support.ConfigConstants.SECURITY_RESTAPI_ROLES_ENABLED;
43+
import static org.opensearch.test.framework.TestSecurityConfig.AuthcDomain.AUTHC_HTTPBASIC_INTERNAL;
44+
import static org.opensearch.test.framework.TestSecurityConfig.Role.ALL_ACCESS;
45+
import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN;
46+
import static org.opensearch.test.framework.matcher.GetResponseMatchers.containDocument;
47+
import static org.junit.Assert.assertEquals;
48+
import static org.junit.Assert.assertTrue;
49+
50+
public class SecurityIndexSnapshotRestoreTests extends AbstractApiIntegrationTest {
51+
private static final Logger log = LogManager.getLogger(SecurityIndexSnapshotRestoreTests.class);
52+
53+
private static final String TEST_INDEX_NAME = "my_index_001";
54+
private static final String DOC_ID = "doc_id";
55+
56+
private static final TestSecurityConfig.User ADMIN_USER = new TestSecurityConfig.User("admin").roles(ALL_ACCESS)
57+
.attr(TYPE_ATTRIBUTE, "administrative");
58+
59+
private static final TestSecurityConfig.User LIMITED_READ_USER_1 = new TestSecurityConfig.User("limited_read_user").roles(
60+
new TestSecurityConfig.Role("limited-reader").indexPermissions("indices:data/read*").on(TEST_INDEX_NAME)
61+
);
62+
63+
private static final TestSecurityConfig.User LIMITED_READ_USER_2 = new TestSecurityConfig.User("user2");
64+
65+
private static final TestSecurityConfig.Role LIMITED_READ_USER_2_ROLE = new TestSecurityConfig.Role("limited-reader_2")
66+
.indexPermissions("indices:data/read*")
67+
.on(TEST_INDEX_NAME);
68+
69+
private String securityIndex;
70+
71+
@ClassRule
72+
public static LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.THREE_CLUSTER_MANAGERS)
73+
.authc(AUTHC_HTTPBASIC_INTERNAL)
74+
.users(ADMIN_USER, LIMITED_READ_USER_1)
75+
.anonymousAuth(false)
76+
.nodeSettings(
77+
Map.of(
78+
SECURITY_RESTAPI_ROLES_ENABLED,
79+
List.of("user_" + USER_ADMIN.getName() + "__" + ALL_ACCESS.getName()),
80+
SECURITY_BACKGROUND_INIT_IF_SECURITYINDEX_NOT_EXIST,
81+
false
82+
)
83+
)
84+
.build();
85+
86+
@Before
87+
public void setUp() throws Exception {
88+
securityIndex = ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX;
89+
90+
try (Client client = cluster.getInternalNodeClient()) {
91+
client.admin()
92+
.cluster()
93+
.putRepository(
94+
new PutRepositoryRequest(TEST_SNAPSHOT_REPOSITORY_NAME).type("fs")
95+
.settings(Map.of("location", cluster.getSnapshotDirPath()))
96+
)
97+
.actionGet();
98+
99+
CreateIndexResponse createIndexResponse = client.admin().indices().create(new CreateIndexRequest(TEST_INDEX_NAME)).actionGet();
100+
assertTrue(createIndexResponse.isAcknowledged());
101+
102+
client.index(
103+
new IndexRequest(TEST_INDEX_NAME).id(DOC_ID)
104+
.source("{\"message\": \"test document 1\"}", XContentType.JSON)
105+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
106+
).actionGet();
107+
}
108+
}
109+
110+
@After
111+
public void cleanData() throws ExecutionException, InterruptedException {
112+
try (Client client = cluster.getInternalNodeClient()) {
113+
client.admin().indices().delete(new DeleteIndexRequest(TEST_INDEX_NAME)).actionGet();
114+
115+
client.admin().cluster().deleteRepository(new DeleteRepositoryRequest(TEST_SNAPSHOT_REPOSITORY_NAME)).actionGet();
116+
}
117+
}
118+
119+
@Test
120+
public void testSecurityCacheReloadAfterRestore() throws Exception {
121+
// 1. Read data in custom index with LIMITED_READ_USER_1
122+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_1)) {
123+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
124+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
125+
}
126+
127+
// 2. Create snapshot of security index
128+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(ADMIN_USER)) {
129+
SnapshotSteps steps = new SnapshotSteps(restHighLevelClient);
130+
steps.createSnapshot(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap", securityIndex);
131+
steps.waitForSnapshotCreation(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap");
132+
}
133+
134+
// 3. Add new role and user to security index (This is not in snapshot created above)
135+
try (TestRestClient client = cluster.getRestClient(ADMIN_USER)) {
136+
client.createRole(LIMITED_READ_USER_2_ROLE.getName(), LIMITED_READ_USER_2_ROLE).assertStatusCode(201);
137+
client.createUser(LIMITED_READ_USER_2.getName(), LIMITED_READ_USER_2).assertStatusCode(201);
138+
client.assignRoleToUser(LIMITED_READ_USER_2.getName(), "limited-reader_2").assertStatusCode(200);
139+
}
140+
141+
// 4. Read data in custom index with LIMITED_READ_USER_2
142+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_2)) {
143+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
144+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
145+
}
146+
147+
// 5. Delete security index
148+
try (Client client = cluster.getInternalNodeClient()) {
149+
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(securityIndex);
150+
client.admin().indices().delete(deleteRequest).actionGet();
151+
}
152+
153+
// 6. Restore security index
154+
try (Client client = cluster.getInternalNodeClient()) {
155+
RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap")
156+
.waitForCompletion(true)
157+
.indices(securityIndex); // restore security index
158+
159+
RestoreSnapshotResponse restoreResponse = client.admin().cluster().restoreSnapshot(restoreRequest).actionGet();
160+
161+
// Verify restore was successful
162+
assertEquals(RestStatus.OK, restoreResponse.status());
163+
}
164+
165+
// 7. Read data in custom index with LIMITED_READ_USER_1 because it was in snapshot
166+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_1)) {
167+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
168+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
169+
}
170+
171+
// 8. Should get 401 error to read custom index with LIMITED_READ_USER_2 because it was not in snapshot
172+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_2)) {
173+
restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
174+
} catch (OpenSearchStatusException exception) {
175+
assertEquals(RestStatus.UNAUTHORIZED, exception.status()); // Verify it's a 401
176+
}
177+
}
178+
}

src/main/java/org/opensearch/security/configuration/ConfigurationRepository.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -681,15 +681,12 @@ public void afterIndexShardStarted(IndexShard indexShard) {
681681

682682
// Check if this is a security index shard
683683
if (securityIndex.equals(index.getName())) {
684-
// Only trigger on primary shard to avoid multiple reloads
685-
if (indexShard.routingEntry() != null && indexShard.routingEntry().primary()) {
686-
threadPool.generic().execute(() -> {
687-
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
688-
LOGGER.info("Security index primary shard {} started - config reloading for snapshot restore", shardId);
689-
reloadConfiguration(CType.values());
690-
}
691-
});
692-
}
684+
threadPool.generic().execute(() -> {
685+
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
686+
LOGGER.info("Security index shard {} started - config reloading for snapshot restore", shardId);
687+
reloadConfiguration(CType.values());
688+
}
689+
});
693690
}
694691
}
695692
}

src/test/java/org/opensearch/security/configuration/ConfigurationRepositoryTest.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.opensearch.cluster.metadata.Metadata;
4444
import org.opensearch.cluster.node.DiscoveryNode;
4545
import org.opensearch.cluster.node.DiscoveryNodes;
46-
import org.opensearch.cluster.routing.ShardRouting;
4746
import org.opensearch.cluster.service.ClusterService;
4847
import org.opensearch.common.Priority;
4948
import org.opensearch.common.settings.Settings;
@@ -597,7 +596,6 @@ public void getConfigurationsFromIndex_SecurityIndexNotInitiallyReady() throws I
597596
public void afterIndexShardStarted_whenSecurityIndexUpdated() throws InterruptedException, TimeoutException {
598597
Settings settings = Settings.builder().build();
599598
IndexShard indexShard = mock(IndexShard.class);
600-
ShardRouting shardRouting = mock(ShardRouting.class);
601599
ShardId shardId = mock(ShardId.class);
602600
Index index = mock(Index.class);
603601
ClusterState mockClusterState = mock(ClusterState.class);
@@ -611,20 +609,11 @@ public void afterIndexShardStarted_whenSecurityIndexUpdated() throws Interrupted
611609
when(indexShard.shardId()).thenReturn(shardId);
612610
when(shardId.getIndex()).thenReturn(index);
613611
when(index.getName()).thenReturn(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX);
614-
when(indexShard.routingEntry()).thenReturn(shardRouting);
615612
when(clusterService.state()).thenReturn(mockClusterState);
616613
when(mockClusterState.custom(RestoreInProgress.TYPE)).thenReturn(mockRestore);
617614
when(threadPool.generic()).thenReturn(executorService);
618615

619-
// when replica shard updated
620-
when(shardRouting.primary()).thenReturn(false);
621-
configurationRepository.afterIndexShardStarted(indexShard);
622-
verify(executorService, never()).execute(any());
623-
verify(configurationRepository, never()).reloadConfiguration(any());
624-
625-
// when primary shard updated
626616
doReturn(true).when(configurationRepository).reloadConfiguration(any());
627-
when(shardRouting.primary()).thenReturn(true);
628617
when(mockRestore.iterator()).thenReturn(Collections.singletonList(mockEntry).iterator());
629618
when(mockEntry.indices()).thenReturn(Collections.singletonList(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX));
630619
ArgumentCaptor<Runnable> successRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -637,7 +626,6 @@ public void afterIndexShardStarted_whenSecurityIndexUpdated() throws Interrupted
637626
Mockito.reset(configurationRepository, executorService);
638627
ArgumentCaptor<Runnable> errorRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
639628
when(clusterService.state()).thenThrow(new RuntimeException("ClusterState exception"));
640-
when(shardRouting.primary()).thenReturn(true);
641629
configurationRepository.afterIndexShardStarted(indexShard);
642630
verify(executorService).execute(errorRunnableCaptor.capture());
643631
errorRunnableCaptor.getValue().run();

0 commit comments

Comments
 (0)