diff --git a/src/collaboration/change-history.service.spec.ts b/src/collaboration/change-history.service.spec.ts new file mode 100644 index 00000000..49561dca --- /dev/null +++ b/src/collaboration/change-history.service.spec.ts @@ -0,0 +1,74 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ChangeHistoryService } from './change-history.service'; +import { Operation } from './ot-crdt.service'; + +const makeOp = (revision: number, sessionId = 's1'): Operation => ({ + type: 'insert', + position: 0, + content: 'x', + userId: 'u1', + sessionId, + revision, +}); + +describe('ChangeHistoryService', () => { + let service: ChangeHistoryService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ChangeHistoryService], + }).compile(); + service = module.get(ChangeHistoryService); + }); + + describe('record', () => { + it('stores an entry', () => { + service.record(makeOp(1)); + expect(service.getHistory('s1')).toHaveLength(1); + }); + + it('stores entries for different sessions independently', () => { + service.record(makeOp(1, 's1')); + service.record(makeOp(1, 's2')); + expect(service.getHistory('s1')).toHaveLength(1); + expect(service.getHistory('s2')).toHaveLength(1); + }); + }); + + describe('getHistory', () => { + beforeEach(() => { + service.record(makeOp(1)); + service.record(makeOp(2)); + service.record(makeOp(3)); + }); + + it('returns all entries from revision 0', () => { + expect(service.getHistory('s1')).toHaveLength(3); + }); + + it('filters entries after fromRevision', () => { + expect(service.getHistory('s1', 1)).toHaveLength(2); // rev 2 and 3 + }); + + it('returns empty for unknown session', () => { + expect(service.getHistory('unknown')).toEqual([]); + }); + }); + + describe('getLatest', () => { + it('returns last N entries', () => { + for (let i = 1; i <= 5; i++) service.record(makeOp(i)); + const latest = service.getLatest('s1', 3); + expect(latest).toHaveLength(3); + expect(latest[0].revision).toBe(3); + }); + }); + + describe('clear', () => { + it('removes all history for session', () => { + service.record(makeOp(1)); + service.clear('s1'); + expect(service.getHistory('s1')).toHaveLength(0); + }); + }); +}); diff --git a/src/collaboration/change-history.service.ts b/src/collaboration/change-history.service.ts new file mode 100644 index 00000000..ed0432d9 --- /dev/null +++ b/src/collaboration/change-history.service.ts @@ -0,0 +1,38 @@ +import { Injectable } from '@nestjs/common'; +import { Operation } from './ot-crdt.service'; + +export interface HistoryEntry { + revision: number; + operation: Operation; + appliedAt: Date; +} + +@Injectable() +export class ChangeHistoryService { + // sessionId -> ordered history entries + private readonly history = new Map(); + + record(operation: Operation): void { + if (!this.history.has(operation.sessionId)) { + this.history.set(operation.sessionId, []); + } + this.history.get(operation.sessionId)!.push({ + revision: operation.revision, + operation, + appliedAt: new Date(), + }); + } + + getHistory(sessionId: string, fromRevision = 0): HistoryEntry[] { + return (this.history.get(sessionId) ?? []).filter((e) => e.revision > fromRevision); + } + + getLatest(sessionId: string, limit = 50): HistoryEntry[] { + const entries = this.history.get(sessionId) ?? []; + return entries.slice(-limit); + } + + clear(sessionId: string): void { + this.history.delete(sessionId); + } +} diff --git a/src/collaboration/collaboration.gateway.ts b/src/collaboration/collaboration.gateway.ts new file mode 100644 index 00000000..1ec4effc --- /dev/null +++ b/src/collaboration/collaboration.gateway.ts @@ -0,0 +1,119 @@ +import { Logger } from '@nestjs/common'; +import { + WebSocketGateway, + WebSocketServer, + SubscribeMessage, + MessageBody, + ConnectedSocket, + OnGatewayDisconnect, +} from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { COLLABORATION_EVENTS } from './constants/collaboration-events.constants'; +import { JoinSessionDto, CollaborativeOperationDto, SyncRequestDto } from './dto/websocket.dto'; +import { OtCrdtService, Operation } from './ot-crdt.service'; +import { PresenceService } from './presence.service'; +import { ChangeHistoryService } from './change-history.service'; + +@WebSocketGateway({ namespace: '/collaboration', cors: { origin: '*' } }) +export class CollaborationGateway implements OnGatewayDisconnect { + @WebSocketServer() + server: Server; + + private readonly logger = new Logger(CollaborationGateway.name); + // socketId -> { sessionId, userId } + private readonly socketMap = new Map(); + + constructor( + private readonly otCrdt: OtCrdtService, + private readonly presence: PresenceService, + private readonly history: ChangeHistoryService, + ) {} + + handleDisconnect(client: Socket): void { + const info = this.socketMap.get(client.id); + if (info) { + this.presence.leave(info.sessionId, info.userId); + this.socketMap.delete(client.id); + this.server.to(info.sessionId).emit(COLLABORATION_EVENTS.USER_JOINED, { + userId: info.userId, + event: 'left', + presence: this.presence.getPresence(info.sessionId), + }); + } + } + + @SubscribeMessage(COLLABORATION_EVENTS.JOIN_SESSION) + handleJoin(@MessageBody() dto: JoinSessionDto, @ConnectedSocket() client: Socket) { + client.join(dto.sessionId); + this.socketMap.set(client.id, { sessionId: dto.sessionId, userId: dto.userId }); + const presenceInfo = this.presence.join(dto.sessionId, dto.userId); + + this.server.to(dto.sessionId).emit(COLLABORATION_EVENTS.USER_JOINED, { + userId: dto.userId, + event: 'joined', + presence: this.presence.getPresence(dto.sessionId), + }); + + return { + event: COLLABORATION_EVENTS.SESSION_STATE, + data: { + sessionId: dto.sessionId, + revision: this.otCrdt.currentRevision(dto.sessionId), + presence: this.presence.getPresence(dto.sessionId), + presenceInfo, + }, + }; + } + + @SubscribeMessage(COLLABORATION_EVENTS.COLLABORATIVE_OPERATION) + handleOperation( + @MessageBody() dto: CollaborativeOperationDto, + @ConnectedSocket() client: Socket, + ) { + const incomingOp = dto.operation as Operation; + const revision = this.otCrdt.nextRevision(dto.sessionId); + const op: Operation = { ...incomingOp, sessionId: dto.sessionId, userId: dto.userId, revision }; + + // Transform against any concurrent ops at the same revision + const concurrent = this.history + .getHistory(dto.sessionId, revision - 1) + .filter((e) => e.revision === revision && e.operation.userId !== dto.userId); + + let finalOp = op; + for (const entry of concurrent) { + const result = this.otCrdt.transform(finalOp, entry.operation); + finalOp = result.operation; + } + + this.history.record(finalOp); + + // Broadcast to all other clients in the session + client.to(dto.sessionId).emit(COLLABORATION_EVENTS.OPERATION_APPLIED, { + operation: finalOp, + revision, + }); + + return { + event: COLLABORATION_EVENTS.OPERATION_APPLIED, + data: { operation: finalOp, revision }, + }; + } + + @SubscribeMessage(COLLABORATION_EVENTS.REQUEST_SYNC) + handleSync(@MessageBody() dto: SyncRequestDto) { + const revision = this.otCrdt.currentRevision(dto.sessionId); + const history = this.history.getLatest(dto.sessionId); + + return { + event: COLLABORATION_EVENTS.FULL_SYNC, + data: { sessionId: dto.sessionId, revision, history }, + }; + } + + @SubscribeMessage(COLLABORATION_EVENTS.RESOLVE_CONFLICT) + handleConflict(@MessageBody() body: { op1: Operation; op2: Operation; sessionId: string }) { + const resolved = this.otCrdt.resolveConflict(body.op1, body.op2); + this.server.to(body.sessionId).emit(COLLABORATION_EVENTS.CONFLICT_RESOLVED, { resolved }); + return { event: COLLABORATION_EVENTS.CONFLICT_RESOLVED, data: { resolved } }; + } +} diff --git a/src/collaboration/collaboration.module.ts b/src/collaboration/collaboration.module.ts new file mode 100644 index 00000000..5c377a20 --- /dev/null +++ b/src/collaboration/collaboration.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { OtCrdtService } from './ot-crdt.service'; +import { PresenceService } from './presence.service'; +import { ChangeHistoryService } from './change-history.service'; +import { CollaborationGateway } from './collaboration.gateway'; + +@Module({ + providers: [OtCrdtService, PresenceService, ChangeHistoryService, CollaborationGateway], + exports: [OtCrdtService, PresenceService, ChangeHistoryService], +}) +export class CollaborationModule {} diff --git a/src/collaboration/ot-crdt.service.spec.ts b/src/collaboration/ot-crdt.service.spec.ts new file mode 100644 index 00000000..ffa110a7 --- /dev/null +++ b/src/collaboration/ot-crdt.service.spec.ts @@ -0,0 +1,95 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { OtCrdtService, Operation } from './ot-crdt.service'; + +const makeOp = (overrides: Partial = {}): Operation => ({ + type: 'insert', + position: 0, + content: 'a', + userId: 'u1', + sessionId: 's1', + revision: 1, + ...overrides, +}); + +describe('OtCrdtService', () => { + let service: OtCrdtService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [OtCrdtService], + }).compile(); + service = module.get(OtCrdtService); + }); + + describe('nextRevision / currentRevision', () => { + it('starts at 0', () => expect(service.currentRevision('s1')).toBe(0)); + it('increments on each call', () => { + expect(service.nextRevision('s1')).toBe(1); + expect(service.nextRevision('s1')).toBe(2); + expect(service.currentRevision('s1')).toBe(2); + }); + it('tracks sessions independently', () => { + service.nextRevision('s1'); + expect(service.currentRevision('s2')).toBe(0); + }); + }); + + describe('transform', () => { + it('returns untransformed when sessions differ', () => { + const op = makeOp({ sessionId: 's1' }); + const against = makeOp({ sessionId: 's2' }); + const result = service.transform(op, against); + expect(result.transformed).toBe(false); + expect(result.operation).toEqual(op); + }); + + it('insert vs insert: shifts position when against is before', () => { + const op = makeOp({ type: 'insert', position: 5, content: 'x' }); + const against = makeOp({ type: 'insert', position: 3, content: 'ab' }); + const result = service.transform(op, against); + expect(result.operation.position).toBe(7); // 5 + 2 + }); + + it('insert vs insert: no shift when against is after', () => { + const op = makeOp({ type: 'insert', position: 3, content: 'x' }); + const against = makeOp({ type: 'insert', position: 5, content: 'ab' }); + const result = service.transform(op, against); + expect(result.operation.position).toBe(3); + }); + + it('insert vs delete: shifts position back', () => { + const op = makeOp({ type: 'insert', position: 5 }); + const against = makeOp({ type: 'delete', position: 2, length: 2 }); + const result = service.transform(op, against); + expect(result.operation.position).toBe(3); // 5 - 2 + }); + + it('delete vs insert: shifts position forward', () => { + const op = makeOp({ type: 'delete', position: 5, length: 1 }); + const against = makeOp({ type: 'insert', position: 3, content: 'ab' }); + const result = service.transform(op, against); + expect(result.operation.position).toBe(7); + }); + + it('delete vs delete same position: zeroes length', () => { + const op = makeOp({ type: 'delete', position: 3, length: 2 }); + const against = makeOp({ type: 'delete', position: 3, length: 2 }); + const result = service.transform(op, against); + expect(result.operation.length).toBe(0); + }); + }); + + describe('resolveConflict', () => { + it('picks higher revision', () => { + const op1 = makeOp({ revision: 3 }); + const op2 = makeOp({ revision: 5 }); + expect(service.resolveConflict(op1, op2)).toEqual(op2); + }); + + it('tie-breaks by userId lexicographic order', () => { + const op1 = makeOp({ revision: 2, userId: 'alice' }); + const op2 = makeOp({ revision: 2, userId: 'bob' }); + expect(service.resolveConflict(op1, op2)).toEqual(op1); // 'alice' <= 'bob' + }); + }); +}); diff --git a/src/collaboration/ot-crdt.service.ts b/src/collaboration/ot-crdt.service.ts new file mode 100644 index 00000000..45e7532c --- /dev/null +++ b/src/collaboration/ot-crdt.service.ts @@ -0,0 +1,83 @@ +import { Injectable, Logger } from '@nestjs/common'; + +export type OperationType = 'insert' | 'delete' | 'retain'; + +export interface Operation { + type: OperationType; + position: number; + content?: string; + length?: number; + userId: string; + sessionId: string; + revision: number; +} + +export interface TransformResult { + operation: Operation; + transformed: boolean; +} + +@Injectable() +export class OtCrdtService { + private readonly logger = new Logger(OtCrdtService.name); + // revision counter per session + private readonly revisions = new Map(); + + /** + * Operational Transformation: transform op against a concurrent op + * so both can be applied in any order and converge to the same state. + */ + transform(op: Operation, against: Operation): TransformResult { + if (op.sessionId !== against.sessionId) { + return { operation: op, transformed: false }; + } + + const transformed = { ...op }; + + if (op.type === 'insert' && against.type === 'insert') { + if (against.position <= op.position) { + transformed.position += against.content?.length ?? 0; + } + } else if (op.type === 'insert' && against.type === 'delete') { + if (against.position < op.position) { + transformed.position = Math.max(against.position, op.position - (against.length ?? 0)); + } + } else if (op.type === 'delete' && against.type === 'insert') { + if (against.position <= op.position) { + transformed.position += against.content?.length ?? 0; + } + } else if (op.type === 'delete' && against.type === 'delete') { + if (against.position < op.position) { + transformed.position = Math.max(against.position, op.position - (against.length ?? 0)); + } else if (against.position === op.position) { + // same position delete — idempotent, skip + transformed.length = 0; + } + } + + return { operation: transformed, transformed: true }; + } + + /** + * Resolve conflict between two concurrent operations. + * Last-writer-wins by userId lexicographic order for determinism. + */ + resolveConflict(op1: Operation, op2: Operation): Operation { + this.logger.debug(`Resolving conflict: rev=${op1.revision} vs rev=${op2.revision}`); + if (op1.revision !== op2.revision) { + return op1.revision > op2.revision ? op1 : op2; + } + // same revision — deterministic tie-break + return op1.userId <= op2.userId ? op1 : op2; + } + + nextRevision(sessionId: string): number { + const rev = (this.revisions.get(sessionId) ?? 0) + 1; + this.revisions.set(sessionId, rev); + return rev; + } + + currentRevision(sessionId: string): number { + return this.revisions.get(sessionId) ?? 0; + } +} diff --git a/src/collaboration/presence.service.spec.ts b/src/collaboration/presence.service.spec.ts new file mode 100644 index 00000000..aaad8439 --- /dev/null +++ b/src/collaboration/presence.service.spec.ts @@ -0,0 +1,66 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { PresenceService } from './presence.service'; + +describe('PresenceService', () => { + let service: PresenceService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [PresenceService], + }).compile(); + service = module.get(PresenceService); + }); + + describe('join', () => { + it('returns presence info', () => { + const info = service.join('s1', 'u1'); + expect(info.userId).toBe('u1'); + expect(info.sessionId).toBe('s1'); + expect(info.joinedAt).toBeInstanceOf(Date); + }); + + it('marks user as present', () => { + service.join('s1', 'u1'); + expect(service.isPresent('s1', 'u1')).toBe(true); + }); + }); + + describe('leave', () => { + it('removes user from session', () => { + service.join('s1', 'u1'); + service.leave('s1', 'u1'); + expect(service.isPresent('s1', 'u1')).toBe(false); + }); + + it('cleans up empty session', () => { + service.join('s1', 'u1'); + service.leave('s1', 'u1'); + expect(service.getPresence('s1')).toHaveLength(0); + }); + }); + + describe('updateCursor', () => { + it('updates cursor position', () => { + service.join('s1', 'u1'); + service.updateCursor('s1', 'u1', 42); + const presence = service.getPresence('s1'); + expect(presence[0].cursorPosition).toBe(42); + }); + + it('is a no-op for unknown user', () => { + expect(() => service.updateCursor('s1', 'unknown', 5)).not.toThrow(); + }); + }); + + describe('getPresence', () => { + it('returns all users in session', () => { + service.join('s1', 'u1'); + service.join('s1', 'u2'); + expect(service.getPresence('s1')).toHaveLength(2); + }); + + it('returns empty array for unknown session', () => { + expect(service.getPresence('unknown')).toEqual([]); + }); + }); +}); diff --git a/src/collaboration/presence.service.ts b/src/collaboration/presence.service.ts new file mode 100644 index 00000000..306ad010 --- /dev/null +++ b/src/collaboration/presence.service.ts @@ -0,0 +1,48 @@ +import { Injectable } from '@nestjs/common'; + +export interface PresenceInfo { + userId: string; + sessionId: string; + joinedAt: Date; + lastSeenAt: Date; + cursorPosition?: number; +} + +@Injectable() +export class PresenceService { + // sessionId -> userId -> PresenceInfo + private readonly sessions = new Map>(); + + join(sessionId: string, userId: string): PresenceInfo { + if (!this.sessions.has(sessionId)) { + this.sessions.set(sessionId, new Map()); + } + const now = new Date(); + const info: PresenceInfo = { userId, sessionId, joinedAt: now, lastSeenAt: now }; + this.sessions.get(sessionId)!.set(userId, info); + return info; + } + + leave(sessionId: string, userId: string): void { + this.sessions.get(sessionId)?.delete(userId); + if (this.sessions.get(sessionId)?.size === 0) { + this.sessions.delete(sessionId); + } + } + + updateCursor(sessionId: string, userId: string, cursorPosition: number): void { + const info = this.sessions.get(sessionId)?.get(userId); + if (info) { + info.cursorPosition = cursorPosition; + info.lastSeenAt = new Date(); + } + } + + getPresence(sessionId: string): PresenceInfo[] { + return Array.from(this.sessions.get(sessionId)?.values() ?? []); + } + + isPresent(sessionId: string, userId: string): boolean { + return this.sessions.get(sessionId)?.has(userId) ?? false; + } +}