Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions libs/backend-apisix/e2e/resources/service-upstream.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Differ } from '@api7/adc-differ';
import * as ADCSDK from '@api7/adc-sdk';

import { BackendAPISIX } from '../../src';
Expand All @@ -22,6 +23,143 @@ describe('Service-Upstreams E2E', () => {
});
});

describe('Service inline upstream', () => {
const serviceName = 'test-inline-upstream';
const service = {
name: serviceName,
upstream: {
type: 'roundrobin',
nodes: [
{
host: 'httpbin.org',
port: 443,
weight: 100,
},
],
},
} satisfies ADCSDK.Service;

it('Create service with inline upstream', async () =>
syncEvents(backend, Differ.diff({ services: [service] }, {})));

it('Dump (inline upstream should exist)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeDefined();
expect(testService?.upstream).toMatchObject({
type: 'roundrobin',
nodes: [
{
host: 'httpbin.org',
port: 443,
weight: 100,
},
],
});
// Verify that inline upstream has no id and name
expect(testService?.upstream?.id).toBeUndefined();
expect(testService?.upstream?.name).toBeUndefined();
});

const updatedService = {
name: serviceName,
upstream: {
type: 'roundrobin',
nodes: [
{
host: 'httpbin.org',
port: 443,
weight: 50,
},
{
host: 'example.com',
port: 80,
weight: 50,
},
],
},
} satisfies ADCSDK.Service;
it('Update service inline upstream', async () =>
syncEvents(
backend,
Differ.diff(
{ services: [updatedService] },
await dumpConfiguration(backend),
),
));

it('Dump (inline upstream should be updated)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeDefined();
expect(testService?.upstream?.nodes).toHaveLength(2);
expect(testService?.upstream).toMatchObject(updatedService.upstream);
// Verify that inline upstream still has no id and name
expect(testService?.upstream?.id).toBeUndefined();
expect(testService?.upstream?.name).toBeUndefined();
});

const serviceWithoutUpstream = {
name: serviceName,
hosts: ['test.example.com'],
} satisfies ADCSDK.Service;
it('Update service to remove inline upstream', async () =>
syncEvents(
backend,
Differ.diff(
{ services: [serviceWithoutUpstream] },
await dumpConfiguration(backend),
),
));

it('Dump (inline upstream should be removed)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeDefined();
expect(testService?.upstream).toBeUndefined();
expect(testService?.hosts).toEqual(['test.example.com']);
});

const serviceForDeletion = {
name: serviceName,
hosts: ['test.example.com'],
upstream: {
type: 'roundrobin',
nodes: [
{
host: 'httpbin.org',
port: 443,
weight: 100,
},
],
},
} satisfies ADCSDK.Service;
it('Re-add inline upstream for deletion test', async () =>
syncEvents(
backend,
Differ.diff(
{ services: [serviceForDeletion] },
await dumpConfiguration(backend),
),
));

it('Dump (inline upstream should exist again)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeDefined();
expect(testService?.upstream).toBeDefined();
});

it('Delete service with inline upstream', async () =>
syncEvents(backend, Differ.diff({}, await dumpConfiguration(backend))));

it('Dump again (service should not exist)', async () => {
const result = await dumpConfiguration(backend);
const testService = result.services?.find((s) => s.name === serviceName);
expect(testService).toBeUndefined();
});
});

describe('Service multiple upstreams', () => {
const serviceName = 'test';
const service = {
Expand Down
3 changes: 2 additions & 1 deletion libs/backend-apisix/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
}
},
"devDependencies": {
"@api7/adc-sdk": "workspace:*"
"@api7/adc-sdk": "workspace:*",
"@api7/adc-differ": "workspace:*"
},
"nx": {
"name": "backend-apisix",
Expand Down
3 changes: 3 additions & 0 deletions libs/backend-apisix/src/fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as ADCSDK from '@api7/adc-sdk';
import { type AxiosInstance } from 'axios';
import { produce } from 'immer';
import { unset } from 'lodash';
import {
Subject,
combineLatest,
Expand Down Expand Up @@ -271,6 +272,8 @@ export class Fetcher extends ADCSDK.backend.BackendEventSource {
produce(service, (serviceDraft) => {
if (service.upstream_id)
serviceDraft.upstream = upstreamIdMap[service.upstream_id];
unset(serviceDraft, 'upstream.id');
unset(serviceDraft, 'upstream.name');
if (upstreamServiceIdMap[service.id])
serviceDraft.upstreams = upstreamServiceIdMap[service.id];
}),
Expand Down
124 changes: 124 additions & 0 deletions libs/backend-apisix/src/operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import {
Subject,
catchError,
concatMap,
delay,
from,
map,
mergeMap,
of,
reduce,
retry,
tap,
throwError,
} from 'rxjs';
Expand Down Expand Up @@ -43,6 +45,11 @@ export class Operator extends ADCSDK.backend.BackendEventSource {
: `${resourceTypeToAPIName(resourceType)}/${resourceId}`
}`;

// Handle service operations separately
if (resourceType === ADCSDK.ResourceType.SERVICE) {
return this.operateService(event, path);
}

return from(
this.client.request({
method: 'DELETE',
Expand All @@ -55,6 +62,123 @@ export class Operator extends ADCSDK.backend.BackendEventSource {
);
}

private operateService(event: ADCSDK.Event, servicePath: string) {
const { type } = event;

// Handle service deletion
if (type === ADCSDK.EventType.DELETE) {
// Delete service with upstream: delete service first, then upstream
if (event.oldValue && (event.oldValue as ADCSDK.Service).upstream) {
return this.deleteServiceWithUpstream(event, servicePath);
}
return from(this.client.request({ url: servicePath, method: 'DELETE' }));
}

// Handle service create/update
const data = this.fromADC(event, this.opts.version) as typing.Service;
const oldUpstream = event.oldValue
? (event.oldValue as ADCSDK.Service).upstream
: undefined;
const newUpstream = (event.newValue as ADCSDK.Service).upstream;

// oldValue has upstream, newValue doesn't -> delete upstream
if (oldUpstream && !newUpstream) {
return this.deleteUpstreamThenUpdateService(event, data, servicePath);
}

// newValue has upstream -> create/update upstream
if (newUpstream) {
return this.upsertServiceWithUpstream(event, data, servicePath);
}

// Service without upstream
return from(this.client.request({ url: servicePath, method: 'PUT', data }));
}

private getUpstreamUrl(upstreamId: string) {
return `/apisix/admin/upstreams/${upstreamId}`;
}

private upsertServiceWithUpstream(
event: ADCSDK.Event,
data: typing.Service,
servicePath: string,
) {
// Create/Update upstream first, then service
return from(
this.client.request({
url: this.getUpstreamUrl(event.resourceId),
method: 'PUT',
data: {
...data.upstream,
id: event.resourceId,
name: event.resourceName,
},
}),
).pipe(
concatMap(() =>
this.client.request({
url: servicePath,
method: 'PUT',
data: { ...data, upstream: undefined, upstream_id: event.resourceId },
}),
),
);
}

private deleteUpstreamWithRetry(upstreamId: string) {
return from(
this.client.request({
url: this.getUpstreamUrl(upstreamId),
method: 'DELETE',
}),
).pipe(
retry({
count: 3,
delay: (error: Error | AxiosError, retryCount: number) => {
if (
axios.isAxiosError(error) &&
error.response?.data?.error_msg?.includes('is still using it')
) {
return of(null).pipe(delay(100 * Math.pow(2, retryCount - 1)));
}
return throwError(() => error);
},
}),
);
}

private deleteServiceWithUpstream(event: ADCSDK.Event, servicePath: string) {
return this.executeServiceOpThenDeleteUpstream(
{ url: servicePath, method: 'DELETE' },
event.resourceId,
);
}

private deleteUpstreamThenUpdateService(
event: ADCSDK.Event,
data: typing.Service,
servicePath: string,
) {
return this.executeServiceOpThenDeleteUpstream(
{ url: servicePath, method: 'PUT', data },
event.resourceId,
);
}

private executeServiceOpThenDeleteUpstream(
serviceRequest: {
url: string;
method: 'DELETE' | 'PUT';
data?: typing.Service;
},
upstreamId: string,
) {
return from(this.client.request(serviceRequest)).pipe(
concatMap(() => this.deleteUpstreamWithRetry(upstreamId)),
);
}

public sync(
events: Array<ADCSDK.Event>,
opts: ADCSDK.BackendSyncOptions = { exitOnFailure: true },
Expand Down
8 changes: 6 additions & 2 deletions libs/backend-apisix/src/transformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ export class ToADC {

hosts: service.hosts,

upstream: this.transformUpstream(service.upstream),
upstream: service.upstream
? this.transformUpstream(service.upstream)
: undefined,
upstreams: service.upstreams,
plugins: service.plugins,
} as ADCSDK.Service);
Expand Down Expand Up @@ -282,7 +284,9 @@ export class FromADC {
name: service.name,
desc: service.description,
labels: FromADC.transformLabels(service.labels),
upstream: this.transformUpstream(service.upstream),
upstream: service.upstream
? this.transformUpstream(service.upstream)
: undefined,
plugins: service.plugins,
hosts: service.hosts,
});
Expand Down
Loading
Loading