From 9c11178293da498015e1fba7832ea4512ee464cf Mon Sep 17 00:00:00 2001 From: LiteSun Date: Thu, 9 Oct 2025 17:33:35 +0800 Subject: [PATCH 01/15] fix: handle service creation with upstreams --- libs/backend-apisix/src/operator.ts | 53 +++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index 25789ef5..1725b053 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -37,21 +37,52 @@ export class Operator extends ADCSDK.backend.BackendEventSource { private operate(event: ADCSDK.Event) { const { type, resourceType, resourceId, parentId } = event; const isUpdate = type !== ADCSDK.EventType.DELETE; - const path = `/apisix/admin/${ - resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL - ? `consumers/${parentId}/credentials/${resourceId}` - : `${resourceTypeToAPIName(resourceType)}/${resourceId}` - }`; + const path = `/apisix/admin/${resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL + ? `consumers/${parentId}/credentials/${resourceId}` + : `${resourceTypeToAPIName(resourceType)}/${resourceId}` + }`; + + if (!isUpdate) { + return from(this.client.request({ url: path, method: 'DELETE' })); + } + + const data = this.fromADC(event, this.opts.version); + + // Handle service with upstream: create upstream first, then service + if (resourceType === ADCSDK.ResourceType.SERVICE && (data as typing.Service).upstream) { + return this.createServiceWithUpstream(event, data as typing.Service, path); + } + + return from(this.client.request({ url: path, method: 'PUT', data })); + } + + private createServiceWithUpstream(event: ADCSDK.Event, data: typing.Service, servicePath: string) { + const upstreamData: typing.Upstream = { + ...data.upstream, + id: event.resourceId, + name: event.resourceName, + }; + + const serviceData = { + ...data, + upstream: undefined, + upstream_id: event.resourceId, + }; return from( this.client.request({ - method: 'DELETE', - url: path, - ...(isUpdate && { + url: `/apisix/admin/upstreams/${event.resourceId}`, + method: 'PUT', + data: upstreamData, + }), + ).pipe( + concatMap(() => + this.client.request({ + url: servicePath, method: 'PUT', - data: this.fromADC(event, this.opts.version), + data: serviceData, }), - }), + ), ); } @@ -112,7 +143,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { () => new Error( error.response?.data?.error_msg ?? - JSON.stringify(error.response?.data), + JSON.stringify(error.response?.data), ), ); return throwError(() => error); From 167b545fd9c12d3f49cd3611a8bd7218aff3ce81 Mon Sep 17 00:00:00 2001 From: LiteSun Date: Thu, 9 Oct 2025 17:56:39 +0800 Subject: [PATCH 02/15] fix: enhance service and upstream handling during create/update/delete operations --- libs/backend-apisix/src/operator.ts | 63 +++++++++++++++++++++++--- libs/backend-apisix/src/transformer.ts | 4 +- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index 1725b053..15191aa6 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -36,27 +36,42 @@ export class Operator extends ADCSDK.backend.BackendEventSource { private operate(event: ADCSDK.Event) { const { type, resourceType, resourceId, parentId } = event; - const isUpdate = type !== ADCSDK.EventType.DELETE; const path = `/apisix/admin/${resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL ? `consumers/${parentId}/credentials/${resourceId}` : `${resourceTypeToAPIName(resourceType)}/${resourceId}` }`; - if (!isUpdate) { + // Handle deletion + if (type === ADCSDK.EventType.DELETE) { + // Delete service with upstream: delete service first, then upstream + if (resourceType === ADCSDK.ResourceType.SERVICE && event.oldValue && (event.oldValue as ADCSDK.Service).upstream) { + return this.deleteServiceWithUpstream(event, path); + } return from(this.client.request({ url: path, method: 'DELETE' })); } const data = this.fromADC(event, this.opts.version); - // Handle service with upstream: create upstream first, then service - if (resourceType === ADCSDK.ResourceType.SERVICE && (data as typing.Service).upstream) { - return this.createServiceWithUpstream(event, data as typing.Service, path); + // Handle service with upstream changes + if (resourceType === ADCSDK.ResourceType.SERVICE) { + const oldUpstream = event.oldValue ? (event.oldValue as ADCSDK.Service).upstream : undefined; + const newUpstream = (event.newValue as ADCSDK.Service).upstream; + + // oldValue has upstream, newValue doesn't -> delete upstream + if (oldUpstream && !newUpstream) { + return this.deleteUpstreamThenUpdateService(event, data as typing.Service, path); + } + + // newValue has upstream -> create/update upstream + if (newUpstream) { + return this.upsertServiceWithUpstream(event, data as typing.Service, path); + } } return from(this.client.request({ url: path, method: 'PUT', data })); } - private createServiceWithUpstream(event: ADCSDK.Event, data: typing.Service, servicePath: string) { + private upsertServiceWithUpstream(event: ADCSDK.Event, data: typing.Service, servicePath: string) { const upstreamData: typing.Upstream = { ...data.upstream, id: event.resourceId, @@ -69,6 +84,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { upstream_id: event.resourceId, }; + // Create/Update upstream first, then service return from( this.client.request({ url: `/apisix/admin/upstreams/${event.resourceId}`, @@ -86,6 +102,41 @@ export class Operator extends ADCSDK.backend.BackendEventSource { ); } + private deleteServiceWithUpstream(event: ADCSDK.Event, servicePath: string) { + // Delete service first, then upstream + return from( + this.client.request({ + url: servicePath, + method: 'DELETE', + }), + ).pipe( + concatMap(() => + this.client.request({ + url: `/apisix/admin/upstreams/${event.resourceId}`, + method: 'DELETE', + }), + ), + ); + } + + private deleteUpstreamThenUpdateService(event: ADCSDK.Event, data: typing.Service, servicePath: string) { + // Update service first (remove upstream reference), then delete upstream + return from( + this.client.request({ + url: servicePath, + method: 'PUT', + data, + }), + ).pipe( + concatMap(() => + this.client.request({ + url: `/apisix/admin/upstreams/${event.resourceId}`, + method: 'DELETE', + }), + ), + ); + } + public sync( events: Array, opts: ADCSDK.BackendSyncOptions = { exitOnFailure: true }, diff --git a/libs/backend-apisix/src/transformer.ts b/libs/backend-apisix/src/transformer.ts index a577b51c..e2d073c7 100644 --- a/libs/backend-apisix/src/transformer.ts +++ b/libs/backend-apisix/src/transformer.ts @@ -47,7 +47,7 @@ export class ToADC { hosts: service.hosts, - upstream: this.transformUpstream(service.upstream), + upstream: service.upstream ? this.transformUpstream(service.upstream) : undefined, upstreams: service.upstreams, plugins: service.plugins, } as ADCSDK.Service); @@ -282,7 +282,7 @@ export class FromADC { name: service.name, desc: service.description, labels: FromADC.transformLabels(service.labels), - upstream: this.transformUpstream(service.upstream), + upstream: service.upstream ? this.transformUpstream(service.upstream) : undefined, plugins: service.plugins, hosts: service.hosts, }); From 6897819250f60c57cdabe8561acfd194b8cb30c4 Mon Sep 17 00:00:00 2001 From: LiteSun Date: Thu, 9 Oct 2025 22:40:45 +0800 Subject: [PATCH 03/15] f --- libs/backend-apisix/src/fetcher.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libs/backend-apisix/src/fetcher.ts b/libs/backend-apisix/src/fetcher.ts index 78b78309..8635dcba 100644 --- a/libs/backend-apisix/src/fetcher.ts +++ b/libs/backend-apisix/src/fetcher.ts @@ -20,6 +20,7 @@ import { SemVer, gte as semVerGTE } from 'semver'; import { ToADC } from './transformer'; import * as typing from './typing'; import { resourceTypeToAPIName } from './utils'; +import { unset } from 'lodash'; export interface FetcherOptions { client: AxiosInstance; @@ -271,6 +272,8 @@ export class Fetcher extends ADCSDK.backend.BackendEventSource { produce(service, (serviceDraft) => { if (service.upstream_id) serviceDraft.upstream = upstreamIdMap[service.upstream_id]; + unset(serviceDraft, 'upstream.id'); + unset(serviceDraft, 'upstream.name'); if (upstreamServiceIdMap[service.id]) serviceDraft.upstreams = upstreamServiceIdMap[service.id]; }), From c1adfd8726fa08f4a167fbe73c6a808efc7f75bc Mon Sep 17 00:00:00 2001 From: LiteSun Date: Fri, 10 Oct 2025 10:43:50 +0800 Subject: [PATCH 04/15] test: add end-to-end tests for inline upstream service creation, update, and deletion --- .../resources/service-upstream.e2e-spec.ts | 144 ++++++++++++++++++ libs/backend-apisix/e2e/support/utils.ts | 6 + 2 files changed, 150 insertions(+) diff --git a/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts b/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts index 15e0f135..1a5c9f26 100644 --- a/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts +++ b/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts @@ -22,6 +22,150 @@ describe('Service-Upstreams E2E', () => { }); }); + describe('Service inline upstream', () => { + const serviceName = 'test-inline-upstream'; + const service = { + name: serviceName, + upstream: { + type: 'roundrobin', + nodes: [ + { + host: 'httpbin.org', + port: 443, + weight: 100, + }, + ], + }, + } satisfies ADCSDK.Service; + + it('Create service with inline upstream', async () => + syncEvents(backend, [ + createEvent(ADCSDK.ResourceType.SERVICE, serviceName, service), + ])); + + it('Dump (inline upstream should exist)', async () => { + const result = await dumpConfiguration(backend); + const testService = result.services?.find((s) => s.name === serviceName); + expect(testService).toBeDefined(); + expect(testService?.upstream).toMatchObject({ + type: 'roundrobin', + nodes: [ + { + host: 'httpbin.org', + port: 443, + weight: 100, + }, + ], + }); + // Verify that inline upstream has no id and name + expect(testService?.upstream?.id).toBeUndefined(); + expect(testService?.upstream?.name).toBeUndefined(); + }); + + const updatedService = { + name: serviceName, + upstream: { + type: 'roundrobin', + nodes: [ + { + host: 'httpbin.org', + port: 443, + weight: 50, + }, + { + host: 'example.com', + port: 80, + weight: 50, + }, + ], + }, + } satisfies ADCSDK.Service; + it('Update service inline upstream', async () => + syncEvents(backend, [ + updateEvent(ADCSDK.ResourceType.SERVICE, serviceName, updatedService), + ])); + + it('Dump (inline upstream should be updated)', async () => { + const result = await dumpConfiguration(backend); + const testService = result.services?.find((s) => s.name === serviceName); + expect(testService).toBeDefined(); + expect(testService?.upstream?.nodes).toHaveLength(2); + expect(testService?.upstream).toMatchObject(updatedService.upstream); + // Verify that inline upstream still has no id and name + expect(testService?.upstream?.id).toBeUndefined(); + expect(testService?.upstream?.name).toBeUndefined(); + }); + + const serviceWithoutUpstream = { + name: serviceName, + hosts: ['test.example.com'], + } satisfies ADCSDK.Service; + it('Update service to remove inline upstream', async () => + syncEvents(backend, [ + updateEvent( + ADCSDK.ResourceType.SERVICE, + serviceName, + serviceWithoutUpstream, + undefined, + updatedService, // oldValue with upstream + ), + ])); + + it('Dump (inline upstream should be removed)', async () => { + const result = await dumpConfiguration(backend); + const testService = result.services?.find((s) => s.name === serviceName); + expect(testService).toBeDefined(); + expect(testService?.upstream).toBeUndefined(); + expect(testService?.hosts).toEqual(['test.example.com']); + }); + + const serviceForDeletion = { + name: serviceName, + hosts: ['test.example.com'], + upstream: { + type: 'roundrobin', + nodes: [ + { + host: 'httpbin.org', + port: 443, + weight: 100, + }, + ], + }, + } satisfies ADCSDK.Service; + it('Re-add inline upstream for deletion test', async () => + syncEvents(backend, [ + updateEvent( + ADCSDK.ResourceType.SERVICE, + serviceName, + serviceForDeletion, + ), + ])); + + it('Dump (inline upstream should exist again)', async () => { + const result = await dumpConfiguration(backend); + const testService = result.services?.find((s) => s.name === serviceName); + expect(testService).toBeDefined(); + expect(testService?.upstream).toBeDefined(); + }); + + it('Delete service with inline upstream', async () => + syncEvents(backend, [ + deleteEvent( + ADCSDK.ResourceType.SERVICE, + serviceName, + undefined, + serviceForDeletion, // oldValue with upstream + ), + ])); + + it('Dump again (service should not exist)', async () => { + const result = await dumpConfiguration(backend); + const testService = result.services?.find((s) => s.name === serviceName); + expect(testService).toBeUndefined(); + }); + }); + describe('Service multiple upstreams', () => { const serviceName = 'test'; const service = { diff --git a/libs/backend-apisix/e2e/support/utils.ts b/libs/backend-apisix/e2e/support/utils.ts index 4117ebff..da5ed171 100644 --- a/libs/backend-apisix/e2e/support/utils.ts +++ b/libs/backend-apisix/e2e/support/utils.ts @@ -55,9 +55,13 @@ export const updateEvent = ( resourceName: string, resource: object, parentName?: string, + oldValue?: object, ): ADCSDK.Event => { const event = createEvent(resourceType, resourceName, resource, parentName); event.type = ADCSDK.EventType.UPDATE; + if (oldValue) { + event.oldValue = oldValue; + } return event; }; @@ -65,6 +69,7 @@ export const deleteEvent = ( resourceType: ADCSDK.ResourceType, resourceName: string, parentName?: string, + oldValue?: object, ): ADCSDK.Event => ({ type: ADCSDK.EventType.DELETE, resourceType, @@ -82,6 +87,7 @@ export const deleteEvent = ( ? parentName : ADCSDK.utils.generateId(parentName) : undefined, + oldValue, }); export const overrideEventResourceId = ( From f62da20f19a33203f2f48dc73bafebe82908a75a Mon Sep 17 00:00:00 2001 From: LiteSun Date: Fri, 10 Oct 2025 11:41:34 +0800 Subject: [PATCH 05/15] f --- libs/backend-apisix/e2e/assets/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/backend-apisix/e2e/assets/docker-compose.yaml b/libs/backend-apisix/e2e/assets/docker-compose.yaml index d6c80351..e5cdc7e2 100644 --- a/libs/backend-apisix/e2e/assets/docker-compose.yaml +++ b/libs/backend-apisix/e2e/assets/docker-compose.yaml @@ -27,7 +27,7 @@ services: apisix: etcd: - image: bitnami/etcd:3.5 + image: bitnamilegacy/etcd:3.5 restart: always volumes: - etcd_data:/bitnami/etcd From 574e1a8a1a2a3fe48056942c5797eb27fc9d0ef7 Mon Sep 17 00:00:00 2001 From: LiteSun Date: Fri, 10 Oct 2025 14:36:26 +0800 Subject: [PATCH 06/15] triger ci From c9ec2fefb9c3814d784eef1239704e89ff29232b Mon Sep 17 00:00:00 2001 From: LiteSun Date: Fri, 10 Oct 2025 15:04:45 +0800 Subject: [PATCH 07/15] f --- libs/backend-apisix/src/operator.ts | 47 ++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index 15191aa6..237377dc 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -6,11 +6,13 @@ import { Subject, catchError, concatMap, + delay, from, map, mergeMap, of, reduce, + retry, tap, throwError, } from 'rxjs'; @@ -102,25 +104,47 @@ export class Operator extends ADCSDK.backend.BackendEventSource { ); } + private deleteUpstreamWithRetry(upstreamId: string) { + // Delete upstream with retry on race condition + return from( + this.client.request({ + url: `/apisix/admin/upstreams/${upstreamId}`, + method: 'DELETE', + }), + ).pipe( + retry({ + count: 3, + delay: (error: Error | AxiosError, retryCount: number) => { + // Only retry if upstream deletion fails due to "still using" race condition + if ( + axios.isAxiosError(error) && + error.response?.data?.error_msg?.includes('is still using it') + ) { + // Exponential backoff: 100ms, 200ms, 400ms + const delayMs = 100 * Math.pow(2, retryCount - 1); + return of(null).pipe(delay(delayMs)); + } + // Don't retry other errors + return throwError(() => error); + }, + }), + ); + } + private deleteServiceWithUpstream(event: ADCSDK.Event, servicePath: string) { - // Delete service first, then upstream + // Delete service first, then upstream with retry return from( this.client.request({ url: servicePath, method: 'DELETE', }), ).pipe( - concatMap(() => - this.client.request({ - url: `/apisix/admin/upstreams/${event.resourceId}`, - method: 'DELETE', - }), - ), + concatMap(() => this.deleteUpstreamWithRetry(event.resourceId)), ); } private deleteUpstreamThenUpdateService(event: ADCSDK.Event, data: typing.Service, servicePath: string) { - // Update service first (remove upstream reference), then delete upstream + // Update service first (remove upstream reference), then delete upstream with retry return from( this.client.request({ url: servicePath, @@ -128,12 +152,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { data, }), ).pipe( - concatMap(() => - this.client.request({ - url: `/apisix/admin/upstreams/${event.resourceId}`, - method: 'DELETE', - }), - ), + concatMap(() => this.deleteUpstreamWithRetry(event.resourceId)), ); } From b7d3d36e404b5566210486c7751e91e96118c010 Mon Sep 17 00:00:00 2001 From: LiteSun Date: Fri, 10 Oct 2025 22:42:53 +0800 Subject: [PATCH 08/15] f --- .../resources/service-upstream.e2e-spec.ts | 47 ++++++++----------- libs/backend-apisix/e2e/support/utils.ts | 22 ++++----- libs/backend-apisix/package.json | 5 +- 3 files changed, 30 insertions(+), 44 deletions(-) diff --git a/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts b/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts index 1a5c9f26..26d87dc7 100644 --- a/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts +++ b/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts @@ -1,3 +1,4 @@ +import { Differ } from '@api7/adc-differ'; import * as ADCSDK from '@api7/adc-sdk'; import { BackendAPISIX } from '../../src'; @@ -39,9 +40,7 @@ describe('Service-Upstreams E2E', () => { } satisfies ADCSDK.Service; it('Create service with inline upstream', async () => - syncEvents(backend, [ - createEvent(ADCSDK.ResourceType.SERVICE, serviceName, service), - ])); + syncEvents(backend, Differ.diff({ services: [service] }, {}))); it('Dump (inline upstream should exist)', async () => { const result = await dumpConfiguration(backend); @@ -81,9 +80,10 @@ describe('Service-Upstreams E2E', () => { }, } satisfies ADCSDK.Service; it('Update service inline upstream', async () => - syncEvents(backend, [ - updateEvent(ADCSDK.ResourceType.SERVICE, serviceName, updatedService), - ])); + syncEvents( + backend, + Differ.diff({ services: [updatedService] }, await dumpConfiguration(backend)), + )); it('Dump (inline upstream should be updated)', async () => { const result = await dumpConfiguration(backend); @@ -101,15 +101,13 @@ describe('Service-Upstreams E2E', () => { hosts: ['test.example.com'], } satisfies ADCSDK.Service; it('Update service to remove inline upstream', async () => - syncEvents(backend, [ - updateEvent( - ADCSDK.ResourceType.SERVICE, - serviceName, - serviceWithoutUpstream, - undefined, - updatedService, // oldValue with upstream + syncEvents( + backend, + Differ.diff( + { services: [serviceWithoutUpstream] }, + await dumpConfiguration(backend), ), - ])); + )); it('Dump (inline upstream should be removed)', async () => { const result = await dumpConfiguration(backend); @@ -134,13 +132,13 @@ describe('Service-Upstreams E2E', () => { }, } satisfies ADCSDK.Service; it('Re-add inline upstream for deletion test', async () => - syncEvents(backend, [ - updateEvent( - ADCSDK.ResourceType.SERVICE, - serviceName, - serviceForDeletion, + syncEvents( + backend, + Differ.diff( + { services: [serviceForDeletion] }, + await dumpConfiguration(backend), ), - ])); + )); it('Dump (inline upstream should exist again)', async () => { const result = await dumpConfiguration(backend); @@ -150,14 +148,7 @@ describe('Service-Upstreams E2E', () => { }); it('Delete service with inline upstream', async () => - syncEvents(backend, [ - deleteEvent( - ADCSDK.ResourceType.SERVICE, - serviceName, - undefined, - serviceForDeletion, // oldValue with upstream - ), - ])); + syncEvents(backend, Differ.diff({}, await dumpConfiguration(backend)))); it('Dump again (service should not exist)', async () => { const result = await dumpConfiguration(backend); diff --git a/libs/backend-apisix/e2e/support/utils.ts b/libs/backend-apisix/e2e/support/utils.ts index da5ed171..add2b0be 100644 --- a/libs/backend-apisix/e2e/support/utils.ts +++ b/libs/backend-apisix/e2e/support/utils.ts @@ -34,14 +34,14 @@ export const createEvent = ( resourceName, resourceId: resourceType === ADCSDK.ResourceType.CONSUMER || - resourceType === ADCSDK.ResourceType.GLOBAL_RULE || - resourceType === ADCSDK.ResourceType.PLUGIN_METADATA + resourceType === ADCSDK.ResourceType.GLOBAL_RULE || + resourceType === ADCSDK.ResourceType.PLUGIN_METADATA ? resourceName : resourceType === ADCSDK.ResourceType.SSL ? ADCSDK.utils.generateId((resource as ADCSDK.SSL).snis.join(',')) : ADCSDK.utils.generateId( - parentName ? `${parentName}.${resourceName}` : resourceName, - ), + parentName ? `${parentName}.${resourceName}` : resourceName, + ), newValue: resource, parentId: parentName ? resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL @@ -55,13 +55,9 @@ export const updateEvent = ( resourceName: string, resource: object, parentName?: string, - oldValue?: object, ): ADCSDK.Event => { const event = createEvent(resourceType, resourceName, resource, parentName); event.type = ADCSDK.EventType.UPDATE; - if (oldValue) { - event.oldValue = oldValue; - } return event; }; @@ -69,25 +65,23 @@ export const deleteEvent = ( resourceType: ADCSDK.ResourceType, resourceName: string, parentName?: string, - oldValue?: object, ): ADCSDK.Event => ({ type: ADCSDK.EventType.DELETE, resourceType, resourceName, resourceId: resourceType === ADCSDK.ResourceType.CONSUMER || - resourceType === ADCSDK.ResourceType.GLOBAL_RULE || - resourceType === ADCSDK.ResourceType.PLUGIN_METADATA + resourceType === ADCSDK.ResourceType.GLOBAL_RULE || + resourceType === ADCSDK.ResourceType.PLUGIN_METADATA ? resourceName : ADCSDK.utils.generateId( - parentName ? `${parentName}.${resourceName}` : resourceName, - ), + parentName ? `${parentName}.${resourceName}` : resourceName, + ), parentId: parentName ? resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL ? parentName : ADCSDK.utils.generateId(parentName) : undefined, - oldValue, }); export const overrideEventResourceId = ( diff --git a/libs/backend-apisix/package.json b/libs/backend-apisix/package.json index b57d2037..1a75150f 100644 --- a/libs/backend-apisix/package.json +++ b/libs/backend-apisix/package.json @@ -9,7 +9,8 @@ } }, "devDependencies": { - "@api7/adc-sdk": "workspace:*" + "@api7/adc-sdk": "workspace:*", + "@api7/adc-differ": "workspace:*" }, "nx": { "name": "backend-apisix", @@ -29,4 +30,4 @@ } } } -} +} \ No newline at end of file From 7d31adaca032e74b22aa0f5fa3adf158906aa479 Mon Sep 17 00:00:00 2001 From: LiteSun Date: Fri, 10 Oct 2025 22:50:32 +0800 Subject: [PATCH 09/15] u --- pnpm-lock.yaml | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a65f9849..9afd19b9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -86,7 +86,7 @@ importers: version: 21.4.1(@babel/traverse@7.28.0)(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))(@types/node@22.16.5)(@zkochan/js-yaml@0.0.7)(babel-plugin-macros@3.1.0)(eslint@9.33.0(jiti@2.4.2))(nx@21.4.1(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17)))(ts-node@10.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@types/node@22.16.5)(typescript@5.8.3))(typescript@5.8.3) '@nx/vite': specifier: 21.4.1 - version: 21.4.1(@babel/traverse@7.28.0)(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))(nx@21.4.1(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17)))(typescript@5.8.3)(vite@6.3.5(@types/node@22.16.5)(jiti@2.4.2)(less@4.1.3)(sass-embedded@1.86.0)(sass@1.86.0)(terser@5.39.1)(yaml@2.8.0))(vitest@3.2.4) + version: 21.4.1(@babel/traverse@7.28.0)(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))(nx@21.4.1(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17)))(typescript@5.8.3)(vite@6.3.5(@types/node@22.16.5)(jiti@2.4.2)(less@4.1.3)(sass-embedded@1.86.0)(sass@1.86.0)(terser@5.39.1)(yaml@2.8.0))(vitest@3.2.4(@types/node@22.16.5)(@vitest/ui@3.2.4)(jiti@2.4.2)(less@4.1.3)(sass-embedded@1.86.0)(sass@1.86.0)(terser@5.39.1)(yaml@2.8.0)) '@nx/web': specifier: 21.4.1 version: 21.4.1(@babel/traverse@7.28.0)(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))(nx@21.4.1(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))) @@ -146,7 +146,7 @@ importers: version: 8.41.0(eslint@9.33.0(jiti@2.4.2))(typescript@5.8.3) '@vitest/coverage-v8': specifier: ^3.0.5 - version: 3.2.4(vitest@3.2.4) + version: 3.2.4(vitest@3.2.4(@types/node@22.16.5)(@vitest/ui@3.2.4)(jiti@2.4.2)(less@4.1.3)(sass-embedded@1.86.0)(sass@1.86.0)(terser@5.39.1)(yaml@2.8.0)) '@vitest/ui': specifier: ^3.0.0 version: 3.2.4(vitest@3.2.4) @@ -247,6 +247,9 @@ importers: libs/backend-apisix: devDependencies: + '@api7/adc-differ': + specifier: workspace:* + version: link:../differ '@api7/adc-sdk': specifier: workspace:* version: link:../sdk @@ -286,12 +289,6 @@ importers: libs/sdk: {} - libs/tstetsetse: - dependencies: - tslib: - specifier: ^2.3.0 - version: 2.6.3 - packages: '@ampproject/remapping@2.3.0': @@ -8378,7 +8375,7 @@ snapshots: '@nx/nx-win32-x64-msvc@21.4.1': optional: true - '@nx/vite@21.4.1(@babel/traverse@7.28.0)(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))(nx@21.4.1(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17)))(typescript@5.8.3)(vite@6.3.5(@types/node@22.16.5)(jiti@2.4.2)(less@4.1.3)(sass-embedded@1.86.0)(sass@1.86.0)(terser@5.39.1)(yaml@2.8.0))(vitest@3.2.4)': + '@nx/vite@21.4.1(@babel/traverse@7.28.0)(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))(nx@21.4.1(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17)))(typescript@5.8.3)(vite@6.3.5(@types/node@22.16.5)(jiti@2.4.2)(less@4.1.3)(sass-embedded@1.86.0)(sass@1.86.0)(terser@5.39.1)(yaml@2.8.0))(vitest@3.2.4(@types/node@22.16.5)(@vitest/ui@3.2.4)(jiti@2.4.2)(less@4.1.3)(sass-embedded@1.86.0)(sass@1.86.0)(terser@5.39.1)(yaml@2.8.0))': dependencies: '@nx/devkit': 21.4.1(nx@21.4.1(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))) '@nx/js': 21.4.1(@babel/traverse@7.28.0)(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))(nx@21.4.1(@swc-node/register@1.9.2(@swc/core@1.13.5(@swc/helpers@0.5.17))(@swc/types@0.1.24)(typescript@5.8.3))(@swc/core@1.13.5(@swc/helpers@0.5.17))) @@ -9217,7 +9214,7 @@ snapshots: '@unrs/resolver-binding-win32-x64-msvc@1.11.1': optional: true - '@vitest/coverage-v8@3.2.4(vitest@3.2.4)': + '@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/node@22.16.5)(@vitest/ui@3.2.4)(jiti@2.4.2)(less@4.1.3)(sass-embedded@1.86.0)(sass@1.86.0)(terser@5.39.1)(yaml@2.8.0))': dependencies: '@ampproject/remapping': 2.3.0 '@bcoe/v8-coverage': 1.0.2 From c5dde5da83a088832c2df1f838eb0864808e3ecd Mon Sep 17 00:00:00 2001 From: LiteSun Date: Sun, 12 Oct 2025 18:34:45 +0800 Subject: [PATCH 10/15] format code --- .../resources/service-upstream.e2e-spec.ts | 5 +- libs/backend-apisix/e2e/support/utils.ts | 16 +++--- libs/backend-apisix/package.json | 2 +- libs/backend-apisix/src/fetcher.ts | 2 +- libs/backend-apisix/src/operator.ts | 53 +++++++++++++------ libs/backend-apisix/src/transformer.ts | 8 ++- 6 files changed, 56 insertions(+), 30 deletions(-) diff --git a/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts b/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts index 26d87dc7..305b174a 100644 --- a/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts +++ b/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts @@ -82,7 +82,10 @@ describe('Service-Upstreams E2E', () => { it('Update service inline upstream', async () => syncEvents( backend, - Differ.diff({ services: [updatedService] }, await dumpConfiguration(backend)), + Differ.diff( + { services: [updatedService] }, + await dumpConfiguration(backend), + ), )); it('Dump (inline upstream should be updated)', async () => { diff --git a/libs/backend-apisix/e2e/support/utils.ts b/libs/backend-apisix/e2e/support/utils.ts index add2b0be..4117ebff 100644 --- a/libs/backend-apisix/e2e/support/utils.ts +++ b/libs/backend-apisix/e2e/support/utils.ts @@ -34,14 +34,14 @@ export const createEvent = ( resourceName, resourceId: resourceType === ADCSDK.ResourceType.CONSUMER || - resourceType === ADCSDK.ResourceType.GLOBAL_RULE || - resourceType === ADCSDK.ResourceType.PLUGIN_METADATA + resourceType === ADCSDK.ResourceType.GLOBAL_RULE || + resourceType === ADCSDK.ResourceType.PLUGIN_METADATA ? resourceName : resourceType === ADCSDK.ResourceType.SSL ? ADCSDK.utils.generateId((resource as ADCSDK.SSL).snis.join(',')) : ADCSDK.utils.generateId( - parentName ? `${parentName}.${resourceName}` : resourceName, - ), + parentName ? `${parentName}.${resourceName}` : resourceName, + ), newValue: resource, parentId: parentName ? resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL @@ -71,12 +71,12 @@ export const deleteEvent = ( resourceName, resourceId: resourceType === ADCSDK.ResourceType.CONSUMER || - resourceType === ADCSDK.ResourceType.GLOBAL_RULE || - resourceType === ADCSDK.ResourceType.PLUGIN_METADATA + resourceType === ADCSDK.ResourceType.GLOBAL_RULE || + resourceType === ADCSDK.ResourceType.PLUGIN_METADATA ? resourceName : ADCSDK.utils.generateId( - parentName ? `${parentName}.${resourceName}` : resourceName, - ), + parentName ? `${parentName}.${resourceName}` : resourceName, + ), parentId: parentName ? resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL ? parentName diff --git a/libs/backend-apisix/package.json b/libs/backend-apisix/package.json index 1a75150f..3e8960b1 100644 --- a/libs/backend-apisix/package.json +++ b/libs/backend-apisix/package.json @@ -30,4 +30,4 @@ } } } -} \ No newline at end of file +} diff --git a/libs/backend-apisix/src/fetcher.ts b/libs/backend-apisix/src/fetcher.ts index 8635dcba..1c88f05e 100644 --- a/libs/backend-apisix/src/fetcher.ts +++ b/libs/backend-apisix/src/fetcher.ts @@ -1,6 +1,7 @@ import * as ADCSDK from '@api7/adc-sdk'; import { type AxiosInstance } from 'axios'; import { produce } from 'immer'; +import { unset } from 'lodash'; import { Subject, combineLatest, @@ -20,7 +21,6 @@ import { SemVer, gte as semVerGTE } from 'semver'; import { ToADC } from './transformer'; import * as typing from './typing'; import { resourceTypeToAPIName } from './utils'; -import { unset } from 'lodash'; export interface FetcherOptions { client: AxiosInstance; diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index 237377dc..8c3effd7 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -38,15 +38,20 @@ export class Operator extends ADCSDK.backend.BackendEventSource { private operate(event: ADCSDK.Event) { const { type, resourceType, resourceId, parentId } = event; - const path = `/apisix/admin/${resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL - ? `consumers/${parentId}/credentials/${resourceId}` - : `${resourceTypeToAPIName(resourceType)}/${resourceId}` - }`; + const path = `/apisix/admin/${ + resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL + ? `consumers/${parentId}/credentials/${resourceId}` + : `${resourceTypeToAPIName(resourceType)}/${resourceId}` + }`; // Handle deletion if (type === ADCSDK.EventType.DELETE) { // Delete service with upstream: delete service first, then upstream - if (resourceType === ADCSDK.ResourceType.SERVICE && event.oldValue && (event.oldValue as ADCSDK.Service).upstream) { + if ( + resourceType === ADCSDK.ResourceType.SERVICE && + event.oldValue && + (event.oldValue as ADCSDK.Service).upstream + ) { return this.deleteServiceWithUpstream(event, path); } return from(this.client.request({ url: path, method: 'DELETE' })); @@ -56,24 +61,38 @@ export class Operator extends ADCSDK.backend.BackendEventSource { // Handle service with upstream changes if (resourceType === ADCSDK.ResourceType.SERVICE) { - const oldUpstream = event.oldValue ? (event.oldValue as ADCSDK.Service).upstream : undefined; + const oldUpstream = event.oldValue + ? (event.oldValue as ADCSDK.Service).upstream + : undefined; const newUpstream = (event.newValue as ADCSDK.Service).upstream; // oldValue has upstream, newValue doesn't -> delete upstream if (oldUpstream && !newUpstream) { - return this.deleteUpstreamThenUpdateService(event, data as typing.Service, path); + return this.deleteUpstreamThenUpdateService( + event, + data as typing.Service, + path, + ); } // newValue has upstream -> create/update upstream if (newUpstream) { - return this.upsertServiceWithUpstream(event, data as typing.Service, path); + return this.upsertServiceWithUpstream( + event, + data as typing.Service, + path, + ); } } return from(this.client.request({ url: path, method: 'PUT', data })); } - private upsertServiceWithUpstream(event: ADCSDK.Event, data: typing.Service, servicePath: string) { + private upsertServiceWithUpstream( + event: ADCSDK.Event, + data: typing.Service, + servicePath: string, + ) { const upstreamData: typing.Upstream = { ...data.upstream, id: event.resourceId, @@ -138,12 +157,14 @@ export class Operator extends ADCSDK.backend.BackendEventSource { url: servicePath, method: 'DELETE', }), - ).pipe( - concatMap(() => this.deleteUpstreamWithRetry(event.resourceId)), - ); + ).pipe(concatMap(() => this.deleteUpstreamWithRetry(event.resourceId))); } - private deleteUpstreamThenUpdateService(event: ADCSDK.Event, data: typing.Service, servicePath: string) { + private deleteUpstreamThenUpdateService( + event: ADCSDK.Event, + data: typing.Service, + servicePath: string, + ) { // Update service first (remove upstream reference), then delete upstream with retry return from( this.client.request({ @@ -151,9 +172,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { method: 'PUT', data, }), - ).pipe( - concatMap(() => this.deleteUpstreamWithRetry(event.resourceId)), - ); + ).pipe(concatMap(() => this.deleteUpstreamWithRetry(event.resourceId))); } public sync( @@ -213,7 +232,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { () => new Error( error.response?.data?.error_msg ?? - JSON.stringify(error.response?.data), + JSON.stringify(error.response?.data), ), ); return throwError(() => error); diff --git a/libs/backend-apisix/src/transformer.ts b/libs/backend-apisix/src/transformer.ts index e2d073c7..e22a30fc 100644 --- a/libs/backend-apisix/src/transformer.ts +++ b/libs/backend-apisix/src/transformer.ts @@ -47,7 +47,9 @@ export class ToADC { hosts: service.hosts, - upstream: service.upstream ? this.transformUpstream(service.upstream) : undefined, + upstream: service.upstream + ? this.transformUpstream(service.upstream) + : undefined, upstreams: service.upstreams, plugins: service.plugins, } as ADCSDK.Service); @@ -282,7 +284,9 @@ export class FromADC { name: service.name, desc: service.description, labels: FromADC.transformLabels(service.labels), - upstream: service.upstream ? this.transformUpstream(service.upstream) : undefined, + upstream: service.upstream + ? this.transformUpstream(service.upstream) + : undefined, plugins: service.plugins, hosts: service.hosts, }); From 0451d09455d9a3df1af5c69cad84cb50c3c70464 Mon Sep 17 00:00:00 2001 From: LiteSun Date: Sun, 12 Oct 2025 18:49:11 +0800 Subject: [PATCH 11/15] u --- libs/backend-apisix/src/operator.ts | 137 ++++++++++++++-------------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index 8c3effd7..07174b99 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -44,48 +44,56 @@ export class Operator extends ADCSDK.backend.BackendEventSource { : `${resourceTypeToAPIName(resourceType)}/${resourceId}` }`; - // Handle deletion + // Handle service operations separately + if (resourceType === ADCSDK.ResourceType.SERVICE) { + return this.operateService(event, path); + } + + // Handle deletion for non-service resources if (type === ADCSDK.EventType.DELETE) { - // Delete service with upstream: delete service first, then upstream - if ( - resourceType === ADCSDK.ResourceType.SERVICE && - event.oldValue && - (event.oldValue as ADCSDK.Service).upstream - ) { - return this.deleteServiceWithUpstream(event, path); - } return from(this.client.request({ url: path, method: 'DELETE' })); } + // Handle create/update for non-service resources const data = this.fromADC(event, this.opts.version); + return from(this.client.request({ url: path, method: 'PUT', data })); + } - // Handle service with upstream changes - if (resourceType === ADCSDK.ResourceType.SERVICE) { - const oldUpstream = event.oldValue - ? (event.oldValue as ADCSDK.Service).upstream - : undefined; - const newUpstream = (event.newValue as ADCSDK.Service).upstream; + private operateService(event: ADCSDK.Event, servicePath: string) { + const { type } = event; - // oldValue has upstream, newValue doesn't -> delete upstream - if (oldUpstream && !newUpstream) { - return this.deleteUpstreamThenUpdateService( - event, - data as typing.Service, - path, - ); + // Handle service deletion + if (type === ADCSDK.EventType.DELETE) { + // Delete service with upstream: delete service first, then upstream + if (event.oldValue && (event.oldValue as ADCSDK.Service).upstream) { + return this.deleteServiceWithUpstream(event, servicePath); } + return from(this.client.request({ url: servicePath, method: 'DELETE' })); + } - // newValue has upstream -> create/update upstream - if (newUpstream) { - return this.upsertServiceWithUpstream( - event, - data as typing.Service, - path, - ); - } + // Handle service create/update + const data = this.fromADC(event, this.opts.version) as typing.Service; + const oldUpstream = event.oldValue + ? (event.oldValue as ADCSDK.Service).upstream + : undefined; + const newUpstream = (event.newValue as ADCSDK.Service).upstream; + + // oldValue has upstream, newValue doesn't -> delete upstream + if (oldUpstream && !newUpstream) { + return this.deleteUpstreamThenUpdateService(event, data, servicePath); } - return from(this.client.request({ url: path, method: 'PUT', data })); + // newValue has upstream -> create/update upstream + if (newUpstream) { + return this.upsertServiceWithUpstream(event, data, servicePath); + } + + // Service without upstream + return from(this.client.request({ url: servicePath, method: 'PUT', data })); + } + + private getUpstreamUrl(upstreamId: string) { + return `/apisix/admin/upstreams/${upstreamId}`; } private upsertServiceWithUpstream( @@ -93,57 +101,44 @@ export class Operator extends ADCSDK.backend.BackendEventSource { data: typing.Service, servicePath: string, ) { - const upstreamData: typing.Upstream = { - ...data.upstream, - id: event.resourceId, - name: event.resourceName, - }; - - const serviceData = { - ...data, - upstream: undefined, - upstream_id: event.resourceId, - }; - // Create/Update upstream first, then service return from( this.client.request({ - url: `/apisix/admin/upstreams/${event.resourceId}`, + url: this.getUpstreamUrl(event.resourceId), method: 'PUT', - data: upstreamData, + data: { + ...data.upstream, + id: event.resourceId, + name: event.resourceName, + }, }), ).pipe( concatMap(() => this.client.request({ url: servicePath, method: 'PUT', - data: serviceData, + data: { ...data, upstream: undefined, upstream_id: event.resourceId }, }), ), ); } private deleteUpstreamWithRetry(upstreamId: string) { - // Delete upstream with retry on race condition return from( this.client.request({ - url: `/apisix/admin/upstreams/${upstreamId}`, + url: this.getUpstreamUrl(upstreamId), method: 'DELETE', }), ).pipe( retry({ count: 3, delay: (error: Error | AxiosError, retryCount: number) => { - // Only retry if upstream deletion fails due to "still using" race condition if ( axios.isAxiosError(error) && error.response?.data?.error_msg?.includes('is still using it') ) { - // Exponential backoff: 100ms, 200ms, 400ms - const delayMs = 100 * Math.pow(2, retryCount - 1); - return of(null).pipe(delay(delayMs)); + return of(null).pipe(delay(100 * Math.pow(2, retryCount - 1))); } - // Don't retry other errors return throwError(() => error); }, }), @@ -151,13 +146,10 @@ export class Operator extends ADCSDK.backend.BackendEventSource { } private deleteServiceWithUpstream(event: ADCSDK.Event, servicePath: string) { - // Delete service first, then upstream with retry - return from( - this.client.request({ - url: servicePath, - method: 'DELETE', - }), - ).pipe(concatMap(() => this.deleteUpstreamWithRetry(event.resourceId))); + return this.executeServiceOpThenDeleteUpstream( + { url: servicePath, method: 'DELETE' }, + event.resourceId, + ); } private deleteUpstreamThenUpdateService( @@ -165,14 +157,23 @@ export class Operator extends ADCSDK.backend.BackendEventSource { data: typing.Service, servicePath: string, ) { - // Update service first (remove upstream reference), then delete upstream with retry - return from( - this.client.request({ - url: servicePath, - method: 'PUT', - data, - }), - ).pipe(concatMap(() => this.deleteUpstreamWithRetry(event.resourceId))); + return this.executeServiceOpThenDeleteUpstream( + { url: servicePath, method: 'PUT', data }, + event.resourceId, + ); + } + + private executeServiceOpThenDeleteUpstream( + serviceRequest: { + url: string; + method: 'DELETE' | 'PUT'; + data?: typing.Service; + }, + upstreamId: string, + ) { + return from(this.client.request(serviceRequest)).pipe( + concatMap(() => this.deleteUpstreamWithRetry(upstreamId)), + ); } public sync( From cb028dba5b522c28c930f52d1cd665997477f1d6 Mon Sep 17 00:00:00 2001 From: LiteSun Date: Sun, 12 Oct 2025 18:56:16 +0800 Subject: [PATCH 12/15] f --- libs/backend-apisix/src/operator.ts | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index 07174b99..ebae40ce 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -38,6 +38,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { private operate(event: ADCSDK.Event) { const { type, resourceType, resourceId, parentId } = event; + const isUpdate = type !== ADCSDK.EventType.DELETE; const path = `/apisix/admin/${ resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL ? `consumers/${parentId}/credentials/${resourceId}` @@ -49,14 +50,16 @@ export class Operator extends ADCSDK.backend.BackendEventSource { return this.operateService(event, path); } - // Handle deletion for non-service resources - if (type === ADCSDK.EventType.DELETE) { - return from(this.client.request({ url: path, method: 'DELETE' })); - } - - // Handle create/update for non-service resources - const data = this.fromADC(event, this.opts.version); - return from(this.client.request({ url: path, method: 'PUT', data })); + return from( + this.client.request({ + method: 'DELETE', + url: path, + ...(isUpdate && { + method: 'PUT', + data: this.fromADC(event, this.opts.version), + }), + }), + ); } private operateService(event: ADCSDK.Event, servicePath: string) { From fdd9c229452c564d9cf99c78f5dfa6d279c9449f Mon Sep 17 00:00:00 2001 From: LiteSun Date: Tue, 28 Oct 2025 11:15:16 +0800 Subject: [PATCH 13/15] u --- .../resources/service-upstream.e2e-spec.ts | 51 ------ libs/backend-apisix/src/operator.ts | 171 +++++------------- 2 files changed, 41 insertions(+), 181 deletions(-) diff --git a/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts b/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts index 305b174a..361550a2 100644 --- a/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts +++ b/libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts @@ -99,57 +99,6 @@ describe('Service-Upstreams E2E', () => { expect(testService?.upstream?.name).toBeUndefined(); }); - const serviceWithoutUpstream = { - name: serviceName, - hosts: ['test.example.com'], - } satisfies ADCSDK.Service; - it('Update service to remove inline upstream', async () => - syncEvents( - backend, - Differ.diff( - { services: [serviceWithoutUpstream] }, - await dumpConfiguration(backend), - ), - )); - - it('Dump (inline upstream should be removed)', async () => { - const result = await dumpConfiguration(backend); - const testService = result.services?.find((s) => s.name === serviceName); - expect(testService).toBeDefined(); - expect(testService?.upstream).toBeUndefined(); - expect(testService?.hosts).toEqual(['test.example.com']); - }); - - const serviceForDeletion = { - name: serviceName, - hosts: ['test.example.com'], - upstream: { - type: 'roundrobin', - nodes: [ - { - host: 'httpbin.org', - port: 443, - weight: 100, - }, - ], - }, - } satisfies ADCSDK.Service; - it('Re-add inline upstream for deletion test', async () => - syncEvents( - backend, - Differ.diff( - { services: [serviceForDeletion] }, - await dumpConfiguration(backend), - ), - )); - - it('Dump (inline upstream should exist again)', async () => { - const result = await dumpConfiguration(backend); - const testService = result.services?.find((s) => s.name === serviceName); - expect(testService).toBeDefined(); - expect(testService?.upstream).toBeDefined(); - }); - it('Delete service with inline upstream', async () => syncEvents(backend, Differ.diff({}, await dumpConfiguration(backend)))); diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index ebae40ce..43578f4f 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -6,6 +6,7 @@ import { Subject, catchError, concatMap, + defer, delay, from, map, @@ -39,143 +40,53 @@ export class Operator extends ADCSDK.backend.BackendEventSource { private operate(event: ADCSDK.Event) { const { type, resourceType, resourceId, parentId } = event; const isUpdate = type !== ADCSDK.EventType.DELETE; - const path = `/apisix/admin/${ - resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL + const PATH_PREFIX = '/apisix/admin'; + const paths = [ + `${PATH_PREFIX}/${resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL ? `consumers/${parentId}/credentials/${resourceId}` : `${resourceTypeToAPIName(resourceType)}/${resourceId}` - }`; + }`, + ]; - // Handle service operations separately - if (resourceType === ADCSDK.ResourceType.SERVICE) { - return this.operateService(event, path); + if (event.resourceType === ADCSDK.ResourceType.SERVICE) { + const path = `${PATH_PREFIX}/upstreams/${event.resourceId}`; + if (event.type === ADCSDK.EventType.DELETE) + paths.push(path); // services will be deleted before upstreams + else paths.unshift(path); // services will be created/updated after upstreams } - return from( - this.client.request({ - method: 'DELETE', - url: path, - ...(isUpdate && { - method: 'PUT', - data: this.fromADC(event, this.opts.version), - }), - }), - ); - } - - private operateService(event: ADCSDK.Event, servicePath: string) { - const { type } = event; - - // Handle service deletion - if (type === ADCSDK.EventType.DELETE) { - // Delete service with upstream: delete service first, then upstream - if (event.oldValue && (event.oldValue as ADCSDK.Service).upstream) { - return this.deleteServiceWithUpstream(event, servicePath); - } - return from(this.client.request({ url: servicePath, method: 'DELETE' })); - } - - // Handle service create/update - const data = this.fromADC(event, this.opts.version) as typing.Service; - const oldUpstream = event.oldValue - ? (event.oldValue as ADCSDK.Service).upstream - : undefined; - const newUpstream = (event.newValue as ADCSDK.Service).upstream; - - // oldValue has upstream, newValue doesn't -> delete upstream - if (oldUpstream && !newUpstream) { - return this.deleteUpstreamThenUpdateService(event, data, servicePath); - } - - // newValue has upstream -> create/update upstream - if (newUpstream) { - return this.upsertServiceWithUpstream(event, data, servicePath); - } - - // Service without upstream - return from(this.client.request({ url: servicePath, method: 'PUT', data })); - } - - private getUpstreamUrl(upstreamId: string) { - return `/apisix/admin/upstreams/${upstreamId}`; - } + const operateWithRetry = (op: () => Promise) => + defer(op).pipe(retry({ count: 3, delay: 100 })); + return from(paths).pipe( + map( + (path) => () => { + let data = undefined; + if (isUpdate) { + data = this.fromADC(event, this.opts.version); + if (event.resourceType === ADCSDK.ResourceType.SERVICE && path.includes('/upstreams')) { + data = { + id: event.resourceId, + name: event.resourceName, + ...(data as typing.Service).upstream, + }; + } - private upsertServiceWithUpstream( - event: ADCSDK.Event, - data: typing.Service, - servicePath: string, - ) { - // Create/Update upstream first, then service - return from( - this.client.request({ - url: this.getUpstreamUrl(event.resourceId), - method: 'PUT', - data: { - ...data.upstream, - id: event.resourceId, - name: event.resourceName, - }, - }), - ).pipe( - concatMap(() => - this.client.request({ - url: servicePath, - method: 'PUT', - data: { ...data, upstream: undefined, upstream_id: event.resourceId }, - }), - ), - ); - } - - private deleteUpstreamWithRetry(upstreamId: string) { - return from( - this.client.request({ - url: this.getUpstreamUrl(upstreamId), - method: 'DELETE', - }), - ).pipe( - retry({ - count: 3, - delay: (error: Error | AxiosError, retryCount: number) => { - if ( - axios.isAxiosError(error) && - error.response?.data?.error_msg?.includes('is still using it') - ) { - return of(null).pipe(delay(100 * Math.pow(2, retryCount - 1))); + if (event.resourceType === ADCSDK.ResourceType.SERVICE && path.includes('/services')) { + data = { ...data, upstream: undefined, upstream_id: event.resourceId }; + } } - return throwError(() => error); - }, - }), - ); - } - private deleteServiceWithUpstream(event: ADCSDK.Event, servicePath: string) { - return this.executeServiceOpThenDeleteUpstream( - { url: servicePath, method: 'DELETE' }, - event.resourceId, - ); - } - - private deleteUpstreamThenUpdateService( - event: ADCSDK.Event, - data: typing.Service, - servicePath: string, - ) { - return this.executeServiceOpThenDeleteUpstream( - { url: servicePath, method: 'PUT', data }, - event.resourceId, - ); - } - - private executeServiceOpThenDeleteUpstream( - serviceRequest: { - url: string; - method: 'DELETE' | 'PUT'; - data?: typing.Service; - }, - upstreamId: string, - ) { - return from(this.client.request(serviceRequest)).pipe( - concatMap(() => this.deleteUpstreamWithRetry(upstreamId)), + return this.client.request({ + method: 'DELETE', + url: path, + ...(isUpdate && { + method: 'PUT', + data, + }), + }); + } + ), + concatMap(operateWithRetry), ); } @@ -236,7 +147,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { () => new Error( error.response?.data?.error_msg ?? - JSON.stringify(error.response?.data), + JSON.stringify(error.response?.data), ), ); return throwError(() => error); From b7a1b22e3c686df8e8a1b746b93028385de55dbc Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Tue, 28 Oct 2025 22:38:37 +0800 Subject: [PATCH 14/15] chore: clean code --- libs/backend-apisix/src/operator.ts | 64 ++++++++++++-------------- libs/backend-apisix/src/transformer.ts | 31 ++++++++----- 2 files changed, 49 insertions(+), 46 deletions(-) diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index 43578f4f..638f77bc 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -7,7 +7,6 @@ import { catchError, concatMap, defer, - delay, from, map, mergeMap, @@ -42,9 +41,10 @@ export class Operator extends ADCSDK.backend.BackendEventSource { const isUpdate = type !== ADCSDK.EventType.DELETE; const PATH_PREFIX = '/apisix/admin'; const paths = [ - `${PATH_PREFIX}/${resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL - ? `consumers/${parentId}/credentials/${resourceId}` - : `${resourceTypeToAPIName(resourceType)}/${resourceId}` + `${PATH_PREFIX}/${ + resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL + ? `consumers/${parentId}/credentials/${resourceId}` + : `${resourceTypeToAPIName(resourceType)}/${resourceId}` }`, ]; @@ -58,34 +58,21 @@ export class Operator extends ADCSDK.backend.BackendEventSource { const operateWithRetry = (op: () => Promise) => defer(op).pipe(retry({ count: 3, delay: 100 })); return from(paths).pipe( - map( - (path) => () => { - let data = undefined; - if (isUpdate) { - data = this.fromADC(event, this.opts.version); - if (event.resourceType === ADCSDK.ResourceType.SERVICE && path.includes('/upstreams')) { - data = { - id: event.resourceId, - name: event.resourceName, - ...(data as typing.Service).upstream, - }; - } - - if (event.resourceType === ADCSDK.ResourceType.SERVICE && path.includes('/services')) { - data = { ...data, upstream: undefined, upstream_id: event.resourceId }; - } - } - - return this.client.request({ - method: 'DELETE', - url: path, - ...(isUpdate && { - method: 'PUT', - data, - }), - }); - } - ), + map((path) => () => { + const data = this.fromADC(event, this.opts.version); + return this.client.request({ + method: 'DELETE', + url: path, + ...(isUpdate && { + method: 'PUT', + data: + event.resourceType === ADCSDK.ResourceType.SERVICE && + path.includes('/upstreams') + ? (data as typing.Service).upstream + : data, + }), + }); + }), concatMap(operateWithRetry), ); } @@ -147,7 +134,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { () => new Error( error.response?.data?.error_msg ?? - JSON.stringify(error.response?.data), + JSON.stringify(error.response?.data), ), ); return throwError(() => error); @@ -232,9 +219,16 @@ export class Operator extends ADCSDK.backend.BackendEventSource { if (event.parentId) route.service_id = event.parentId; return route; } - case ADCSDK.ResourceType.SERVICE: + case ADCSDK.ResourceType.SERVICE: { (event.newValue as ADCSDK.Service).id = event.resourceId; - return fromADC.transformService(event.newValue as ADCSDK.Service); + const [service, upstream] = fromADC.transformService( + event.newValue as ADCSDK.Service, + ); + return { + ...service, + ...(upstream && { upstream: upstream }), + }; + } case ADCSDK.ResourceType.SSL: (event.newValue as ADCSDK.SSL).id = event.resourceId; return fromADC.transformSSL(event.newValue as ADCSDK.SSL); diff --git a/libs/backend-apisix/src/transformer.ts b/libs/backend-apisix/src/transformer.ts index e22a30fc..f82f1879 100644 --- a/libs/backend-apisix/src/transformer.ts +++ b/libs/backend-apisix/src/transformer.ts @@ -278,18 +278,27 @@ export class FromADC { }); } - public transformService(service: ADCSDK.Service): typing.Service { - return ADCSDK.utils.recursiveOmitUndefined({ - id: service.id, - name: service.name, - desc: service.description, - labels: FromADC.transformLabels(service.labels), - upstream: service.upstream - ? this.transformUpstream(service.upstream) + public transformService( + service: ADCSDK.Service, + ): [typing.Service, typing.Upstream | undefined] { + return [ + ADCSDK.utils.recursiveOmitUndefined({ + id: service.id, + name: service.name, + desc: service.description, + labels: FromADC.transformLabels(service.labels), + upstream_id: service.id, + plugins: service.plugins, + hosts: service.hosts, + }), + service.upstream + ? ({ + ...this.transformUpstream(service.upstream), + id: service.id, + name: service.name, + } as typing.Upstream) : undefined, - plugins: service.plugins, - hosts: service.hosts, - }); + ]; } public transformConsumer(consumer: ADCSDK.Consumer): typing.Consumer { From e5c86e4bb0bc051ca45c2c5fadc09348e0915ad5 Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Tue, 28 Oct 2025 23:43:21 +0800 Subject: [PATCH 15/15] fix --- libs/backend-apisix/src/operator.ts | 31 +++++++++++++++-------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/libs/backend-apisix/src/operator.ts b/libs/backend-apisix/src/operator.ts index 638f77bc..7eaa374a 100644 --- a/libs/backend-apisix/src/operator.ts +++ b/libs/backend-apisix/src/operator.ts @@ -58,21 +58,22 @@ export class Operator extends ADCSDK.backend.BackendEventSource { const operateWithRetry = (op: () => Promise) => defer(op).pipe(retry({ count: 3, delay: 100 })); return from(paths).pipe( - map((path) => () => { - const data = this.fromADC(event, this.opts.version); - return this.client.request({ - method: 'DELETE', - url: path, - ...(isUpdate && { - method: 'PUT', - data: - event.resourceType === ADCSDK.ResourceType.SERVICE && - path.includes('/upstreams') - ? (data as typing.Service).upstream - : data, + map( + (path) => () => + this.client.request({ + method: 'DELETE', + url: path, + ...(isUpdate && { + method: 'PUT', + data: + event.resourceType === ADCSDK.ResourceType.SERVICE && + path.includes('/upstreams') + ? (this.fromADC(event, this.opts.version) as typing.Service) + .upstream + : this.fromADC(event, this.opts.version), + }), }), - }); - }), + ), concatMap(operateWithRetry), ); } @@ -214,7 +215,7 @@ export class Operator extends ADCSDK.backend.BackendEventSource { (event.newValue as ADCSDK.Route).id = event.resourceId; const route = fromADC.transformRoute( event.newValue as ADCSDK.Route, - event.parentId, + event.parentId!, ); if (event.parentId) route.service_id = event.parentId; return route;