| 
 | 1 | +import time  | 
1 | 2 | import unittest  | 
2 | 3 | from crate.client import connect  | 
3 | 4 | from crate.client.exceptions import ProgrammingError  | 
4 | 5 | 
 
  | 
5 |  | -from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath  | 
 | 6 | +from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy  | 
6 | 7 | 
 
  | 
7 | 8 | ROLLING_UPGRADES_V4 = (  | 
8 | 9 |     # 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug  | 
 | 
30 | 31 | )  | 
31 | 32 | 
 
  | 
32 | 33 | ROLLING_UPGRADES_V5 = (  | 
33 |  | -    UpgradePath('5.0.x', '5.1.x'),  | 
34 |  | -    UpgradePath('5.1.x', '5.2.x'),  | 
35 |  | -    UpgradePath('5.2.x', '5.3.x'),  | 
36 |  | -    UpgradePath('5.3.x', '5.4.x'),  | 
37 |  | -    UpgradePath('5.4.x', '5.5.x'),  | 
38 |  | -    UpgradePath('5.5.x', '5.6.x'),  | 
39 |  | -    UpgradePath('5.6.x', '5.7.x'),  | 
40 |  | -    UpgradePath('5.7.x', '5.8.x'),  | 
41 |  | -    UpgradePath('5.8.x', '5.9.x'),  | 
42 |  | -    UpgradePath('5.9.x', '5.10.x'),  | 
43 |  | -    UpgradePath('5.10.x', '5.10'),  | 
44 |  | -    UpgradePath('5.10', 'latest-nightly'),  | 
 | 34 | +    UpgradePath('5.10', 'branch:master'),  | 
45 | 35 | )  | 
46 | 36 | 
 
  | 
47 | 37 | 
 
  | 
48 | 38 | class RollingUpgradeTest(NodeProvider, unittest.TestCase):  | 
49 | 39 | 
 
  | 
 | 40 | +    def _num_docs(self, cursor, schema, table):  | 
 | 41 | +        cursor.execute("select sum(num_docs) from sys.shards where schema_name = ? and table_name = ?", (schema, table))  | 
 | 42 | +        return cursor.fetchall()[0][0]  | 
 | 43 | + | 
 | 44 | +    def _assert_num_docs(self, cursor, schema, table, expected_count):  | 
 | 45 | +        count = self._num_docs(cursor, schema, table)  | 
 | 46 | +        self.assertEqual(expected_count, count)  | 
 | 47 | + | 
50 | 48 |     def test_rolling_upgrade_4_to_5(self):  | 
51 | 49 |         print("")  # force newline for first print  | 
52 | 50 |         for path in ROLLING_UPGRADES_V4:  | 
@@ -88,6 +86,10 @@ def _test_rolling_upgrade(self, path, nodes):  | 
88 | 86 |         }  | 
89 | 87 |         cluster = self._new_cluster(path.from_version, nodes, settings=settings)  | 
90 | 88 |         cluster.start()  | 
 | 89 | +        replica_cluster = None  | 
 | 90 | +        if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:  | 
 | 91 | +            replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False)  | 
 | 92 | +            replica_cluster.start()  | 
91 | 93 |         with connect(cluster.node().http_url, error_trace=True) as conn:  | 
92 | 94 |             c = conn.cursor()  | 
93 | 95 |             c.execute("create user arthur with (password = 'secret')")  | 
@@ -152,6 +154,24 @@ def _test_rolling_upgrade(self, path, nodes):  | 
152 | 154 |             # Add the shards of the new partition primaries  | 
153 | 155 |             expected_active_shards += shards  | 
154 | 156 | 
 
  | 
 | 157 | +            # Set up tables for logical replications  | 
 | 158 | +            if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:  | 
 | 159 | +                c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)")  | 
 | 160 | +                expected_active_shards += 1  | 
 | 161 | +                c.execute("create publication p for table doc.x")  | 
 | 162 | +                with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:  | 
 | 163 | +                    rc = replica_conn.cursor()  | 
 | 164 | +                    transport_port = cluster.node().addresses.transport.port  | 
 | 165 | +                    replica_transport_port = replica_cluster.node().addresses.transport.port  | 
 | 166 | +                    assert 4300 <= transport_port <= 4310 and 4300 <= replica_transport_port <= 4310  | 
 | 167 | +                    rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)")  | 
 | 168 | +                    rc.execute("create publication rp for table doc.rx")  | 
 | 169 | +                    rc.execute(f"create subscription rs connection 'crate://localhost:{transport_port}?user=crate&sslmode=sniff' publication p")  | 
 | 170 | +                    assert_busy(lambda: self._assert_num_docs(rc, "doc", "x", 0))  | 
 | 171 | +                c.execute(f"create subscription s connection 'crate://localhost:{replica_transport_port}?user=crate&sslmode=sniff' publication rp")  | 
 | 172 | +                assert_busy(lambda: self._assert_num_docs(c, "doc", "rx", 0))  | 
 | 173 | +                expected_active_shards += 1  | 
 | 174 | + | 
155 | 175 |         for idx, node in enumerate(cluster):  | 
156 | 176 |             # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.  | 
157 | 177 |             # Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works.  | 
@@ -282,6 +302,25 @@ def _test_rolling_upgrade(self, path, nodes):  | 
282 | 302 |                         c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx])  | 
283 | 303 |                         self.assertEqual(c.fetchall(), [[partition_version]])  | 
284 | 304 | 
 
  | 
 | 305 | +                # Ensure logical replications works  | 
 | 306 | +                if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10:  | 
 | 307 | +                    with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:  | 
 | 308 | +                        rc = replica_conn.cursor()  | 
 | 309 | + | 
 | 310 | +                        # Cannot drop replicated tables  | 
 | 311 | +                        with self.assertRaises(ProgrammingError):  | 
 | 312 | +                            rc.execute("drop table doc.x")  | 
 | 313 | +                            c.execute("drop table doc.rx")  | 
 | 314 | + | 
 | 315 | +                        count = self._num_docs(rc, "doc", "x")  | 
 | 316 | +                        count2 = self._num_docs(c, "doc", "rx")  | 
 | 317 | + | 
 | 318 | +                        c.execute("insert into doc.x values (1)")  | 
 | 319 | +                        rc.execute("insert into doc.rx values (1)")  | 
 | 320 | + | 
 | 321 | +                        assert_busy(lambda: self._assert_num_docs(rc, "doc", "x", count + 1))  | 
 | 322 | +                        assert_busy(lambda: self._assert_num_docs(c, "doc", "rx", count2 + 1))  | 
 | 323 | + | 
285 | 324 |         # Finally validate that all shards (primaries and replicas) of all partitions are started  | 
286 | 325 |         # and writes into the partitioned table while upgrading were successful  | 
287 | 326 |         with connect(cluster.node().http_url, error_trace=True) as conn:  | 
 | 
0 commit comments