Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 68 additions & 43 deletions src/kafka/kafka.service.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Producer } from 'kafkajs';
import { Kafka, Producer, Admin } from 'kafkajs';

@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
private readonly kafka: Kafka;
private producer: Producer;
private admin: Admin;
private readonly logger = new Logger(KafkaService.name);
private isKafkaEnabled: boolean; // Flag to check if Kafka is enabled
private readonly kafkaTopic: string;

constructor(private configService: ConfigService) {
// Retrieve Kafka config from the configuration
this.isKafkaEnabled = this.configService.get<boolean>('kafkaEnabled', false); // Default to true if not specified
this.isKafkaEnabled = this.configService.get<boolean>('kafkaEnabled', true); // Default to true if not specified
const brokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(',');
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID', 'user-service');
this.kafkaTopic = this.configService.get<string>('KAFKA_TOPIC', 'user-topic');

// Initialize Kafka client if enabled
if (this.isKafkaEnabled) {
Expand All @@ -27,12 +30,15 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
});

this.producer = this.kafka.producer();
this.admin = this.kafka.admin();
}
}

async onModuleInit() {
if (this.isKafkaEnabled) {
try {
await this.connectAdmin();
await this.ensureTopicExists();
await this.connectProducer();
this.logger.log('Kafka producer initialized successfully');
} catch (error) {
Expand All @@ -46,6 +52,65 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
/**
 * Nest lifecycle hook: tear down Kafka connections on shutdown.
 * No-op when Kafka is disabled via configuration.
 */
async onModuleDestroy() {
  if (!this.isKafkaEnabled) {
    return;
  }
  // Producer first, then admin — mirrors the init order in reverse.
  await this.disconnectProducer();
  await this.disconnectAdmin();
}

/**
 * Connect the Kafka admin client.
 *
 * Logs and re-throws on failure so module initialization can react
 * to an unreachable broker.
 */
private async connectAdmin() {
  try {
    await this.admin.connect();
  } catch (error) {
    this.logger.error(`Failed to connect Kafka admin: ${error.message}`, error.stack);
    throw error;
  }
  this.logger.log('Kafka admin connected');
}

/**
 * Disconnect the Kafka admin client.
 *
 * Failures are logged but deliberately not re-thrown: this runs during
 * shutdown, where a disconnect error should not abort teardown.
 */
private async disconnectAdmin() {
  try {
    await this.admin.disconnect();
    this.logger.log('Kafka admin disconnected');
  } catch (error) {
    // Best-effort: swallow after logging so shutdown continues.
    this.logger.error(`Failed to disconnect Kafka admin: ${error.message}`, error.stack);
  }
}

/**
 * Ensure the configured Kafka topic exists, creating it if missing.
 *
 * Topic settings are config-driven with backward-compatible defaults:
 * - KAFKA_TOPIC_PARTITIONS (default 3)
 * - KAFKA_TOPIC_REPLICATION_FACTOR (default 1)
 * - KAFKA_TOPIC_CLEANUP_POLICY (default 'compact')
 * - KAFKA_TOPIC_RETENTION_MS (default '604800000', i.e. 7 days)
 *
 * Requires the admin client to be connected (see connectAdmin).
 *
 * @throws re-throws any admin-client error after logging it, so
 *         onModuleInit can surface broker problems.
 */
private async ensureTopicExists() {
  try {
    // Check if topic exists
    const existingTopics = await this.admin.listTopics();

    if (existingTopics.includes(this.kafkaTopic)) {
      this.logger.log(`Topic '${this.kafkaTopic}' already exists`);
      return;
    }

    this.logger.log(`Topic '${this.kafkaTopic}' does not exist. Creating...`);

    // Create the topic
    await this.admin.createTopics({
      topics: [
        {
          topic: this.kafkaTopic,
          numPartitions: this.configService.get<number>('KAFKA_TOPIC_PARTITIONS', 3),
          replicationFactor: this.configService.get<number>('KAFKA_TOPIC_REPLICATION_FACTOR', 1),
          configEntries: [
            {
              // 'compact' keeps only the latest record per key; now
              // configurable (was hard-coded) so pure event-stream
              // topics can opt into 'delete' without a code change.
              name: 'cleanup.policy',
              value: this.configService.get<string>('KAFKA_TOPIC_CLEANUP_POLICY', 'compact')
            },
            {
              name: 'retention.ms',
              value: this.configService.get<string>('KAFKA_TOPIC_RETENTION_MS', '604800000') // 7 days default
            }
          ]
        }
      ],
    });

    this.logger.log(`Topic '${this.kafkaTopic}' created successfully`);
  } catch (error) {
    this.logger.error(`Failed to ensure topic exists: ${error.message}`, error.stack);
    throw error;
  }
}

Expand Down Expand Up @@ -114,7 +179,7 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
return; // Do nothing if Kafka is disabled
}

const topic = this.configService.get<string>('KAFKA_TOPIC', 'user-events');
const topic = this.kafkaTopic; // Use the configured topic
let fullEventType = '';
switch (eventType) {
case 'created':
Expand All @@ -140,44 +205,4 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
await this.publishMessage(topic, payload, userId);
this.logger.log(`User ${eventType} event published for user ${userId}`);
}

/**
 * Publish a cohort-related event to Kafka.
 *
 * Silently skips (with a warning) when Kafka is disabled.
 *
 * @param eventType - The type of cohort event (created, updated, deleted)
 * @param cohortData - The cohort data to include in the event
 * @param cohortId - The ID of the cohort (used as the message key)
 */
async publishCohortEvent(eventType: 'created' | 'updated' | 'deleted', cohortData: any, cohortId: string): Promise<void> {
  if (!this.isKafkaEnabled) {
    this.logger.warn('Kafka is disabled. Skipping cohort event publish.');
    return; // Do nothing if Kafka is disabled
  }

  const topic = this.configService.get<string>('KAFKA_COHORT_TOPIC', 'cohort-events');

  // Map the short event type onto the full event-type constant.
  const eventTypeNames: Record<string, string> = {
    created: 'COHORT_CREATED',
    updated: 'COHORT_UPDATED',
    deleted: 'COHORT_DELETED',
  };
  const fullEventType = eventTypeNames[eventType] ?? 'UNKNOWN_EVENT';

  const payload = {
    eventType: fullEventType,
    timestamp: new Date().toISOString(),
    cohortId,
    data: cohortData
  };

  await this.publishMessage(topic, payload, cohortId);
  this.logger.log(`Cohort ${eventType} event published for cohort ${cohortId}`);
}
}