A Deep Dive into Implementing Kafka in NestJS for Event-Driven Architecture

Tamrakar Shreyaa
9 min read · Oct 19, 2024


A brief overview of Apache Kafka:

Apache Kafka is an open-source distributed event streaming platform used to handle real-time data feeds. It is designed for high-volume, high-throughput, low-latency data streams and can be used to process, store, and analyze data in real time. Kafka is often used to build real-time streaming data pipelines and applications, frequently in combination with Apache Storm, Apache Hadoop, and Apache Spark.

What event-driven architecture is:

Event-Driven Architecture (EDA) involves communication between services through the exchange of events. Events represent significant occurrences or state changes within the system and are typically published to event streams or topics. Services react to events asynchronously, enabling loosely coupled and responsive architectures.
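
To make the idea concrete, here is a minimal TypeScript sketch (hypothetical names, not part of the project we build below): an event is just a small payload describing a state change, and a consumer reacts to it without knowing who produced it.

// Hypothetical sketch of the EDA idea: a producer publishes an event,
// and any interested consumer reacts to it asynchronously.
interface UserRegisteredEvent {
  type: 'user-registered';
  occurredAt: string; // ISO timestamp of the state change
  payload: { username: string; email: string };
}

// A consumer only depends on the event shape, not on the producer.
async function onUserRegistered(event: UserRegisteredEvent): Promise<void> {
  console.log(`Sending welcome mail to ${event.payload.email}`);
}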

Decoupled communication between services is crucial, and this is where Kafka comes into play.

Let’s get started!

Step 1: Create a NestJS backend

npx @nestjs/cli new kafka-nestjs

Step 2: Setup Docker yml file

First, make sure Docker is installed and running.

Create a file named docker-compose.yml and paste the following:

version: '3.8'
services:
  postgres:
    image: postgres:latest
    container_name: postgres_demo
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
    ports:
      - "${POSTGRES_PORT}:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  zookeeper:
    image: wurstmeister/zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - my-network

  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    container_name: kafka
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "user-registered:1:1,audit-log:1:1,api-log:1:1"
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    networks:
      - my-network

  nestjs-app:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    depends_on:
      - postgres

volumes:
  postgres_data:

networks:
  my-network:
  • This docker-compose.yml defines four services (PostgreSQL, Zookeeper, Kafka, and a NestJS application) and sets up how they communicate and interact with each other.
  • Docker Compose will orchestrate the starting and linking of these services in a single command (docker-compose up), managing networking, environment variables, and container dependencies automatically.

Create a file named Dockerfile and paste the following:

# Use an official Node runtime as a parent image
FROM node:18

# Set the working directory
WORKDIR /usr/src/app

# Install dependencies and utilities
RUN apt-get update && \
    apt-get install -y iputils-ping netcat-openbsd

# Copy package.json and package-lock.json to the working directory
COPY package*.json ./

# Install dependencies
RUN npm install

# Copy the rest of the application code to the working directory
COPY . .

# Build the NestJS application
RUN npm run build

# Expose the port the app runs on
EXPOSE 3000

# Define the command to run the app
CMD ["npm", "run", "start:prod"]

Step 3: Setup Prisma (ORM)

npm install -D prisma
npm install @prisma/client
npx prisma init

This creates two new files: prisma/schema.prisma and a .env file in the project root.

Update the .env file:

# Environment variables declared in this file are automatically made available to Prisma.
# See the documentation for more detail: https://pris.ly/d/prisma-schema#accessing-environment-variables-from-the-schema

# Prisma supports the native connection string format for PostgreSQL, MySQL, SQLite, SQL Server, MongoDB and CockroachDB.
# See the documentation for all the connection string options: https://pris.ly/d/connection-strings

# PostgreSQL environment variables
POSTGRES_USER=postgres
POSTGRES_PASSWORD=######
POSTGRES_DB=kafka_nestjs
POSTGRES_PORT=5432


# NestJS environment variables
DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@localhost:${POSTGRES_PORT}/${POSTGRES_DB}

# mail
MAIL_HOST=smtp.gmail.com
MAIL_USER=#######
MAIL_PASSWORD=#######
MAIL_FROM=noreply@example.com

Update the schema.prisma file:

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

model users {
  id            Int       @id @default(autoincrement())
  username      String    @db.VarChar(100)
  email         String    @unique @db.VarChar(255)
  password_hash String    @db.VarChar(255)
  is_active     Boolean   @default(true)
  created_at    DateTime? @default(now()) @db.Timestamp(6)
  updated_at    DateTime? @default(now()) @db.Timestamp(6)
}

Step 4: Generate and apply a migration

npx prisma migrate dev --name init
npx prisma migrate deploy
npx prisma generate

Step 5: Connect to the database

npx @nestjs/cli g module prisma
npx @nestjs/cli g service prisma

Update the prisma.module.ts file:

import { Module } from '@nestjs/common';
import { PrismaService } from './prisma.service';

@Module({
  providers: [PrismaService],
  exports: [PrismaService],
})
export class PrismaModule {}

Update the prisma.service.ts file:

import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';

@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
  async onModuleInit() {
    await this.$connect();
  }

  async onModuleDestroy() {
    await this.$disconnect();
  }
}

For this to work, you also need to install @nestjs/config:

npm install @nestjs/config

Then update src/app.module.ts:

// src/app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { PrismaModule } from './prisma/prisma.module';
import { UsersModule } from './users/users.module';
import { MailModule } from './mail/mail.module';
import { ConfigModule } from '@nestjs/config';
import configuration from './configuration';
import { KafkaModule } from './kafka/kafka.module';
import { KafkaConsumerService } from './kafka/kafka-consumer.service';

@Module({
  imports: [
    ConfigModule.forRoot({
      load: [configuration],
      isGlobal: true,
    }),
    PrismaModule,
    UsersModule,
    MailModule,
    KafkaModule,
  ],
  controllers: [AppController],
  providers: [AppService, KafkaConsumerService],
  exports: [KafkaConsumerService],
})
export class AppModule {}

Create src/configuration.ts

//src/configuration.ts
export default () => ({
database: {
url: process.env.DATABASE_URL,
}
});
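
Because ConfigModule.forRoot is registered with isGlobal: true, any provider can read these loaded values through ConfigService. A small sketch (the injecting service here is hypothetical, only the 'database.url' key comes from configuration.ts above):

// Hypothetical example: reading the loaded configuration from any provider.
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';

@Injectable()
export class SomeService {
  constructor(private readonly config: ConfigService) {}

  logDatabaseUrl() {
    // 'database.url' comes from src/configuration.ts shown above
    const url = this.config.get<string>('database.url');
    console.log(`Connecting to ${url}`);
  }
}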

Step 6: Install Swagger and Kafka Dependencies

npm install --save @nestjs/swagger

npm install kafkajs @nestjs/microservices

Update the main.ts file:

// src/main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
import { ValidationPipe } from '@nestjs/common';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  app.useGlobalPipes(new ValidationPipe({ whitelist: true }));

  const config = new DocumentBuilder()
    .setTitle('project')
    .setDescription('The project API description')
    .setVersion('0.1')
    .addBearerAuth()
    .build();
  const document = SwaggerModule.createDocument(app, config);
  SwaggerModule.setup('api', app, document);

  await app.listen(3000);
}
bootstrap();

Step 7: Create a Controller

npx @nestjs/cli g resource users
npm install class-validator
npm install class-transformer
npm install bcrypt

Add the following code to create-user.dto.ts:

import { ApiProperty } from '@nestjs/swagger';
import { IsBoolean, IsEmail, IsOptional, IsString, MinLength } from 'class-validator';

export class CreateUserDto {
  @ApiProperty({ example: 'john_doe', description: 'The username of the user' })
  @IsString()
  @MinLength(3)
  username: string;

  @ApiProperty({ example: 'john.doe@example.com', description: 'The email of the user' })
  @IsEmail()
  email: string;

  @ApiProperty({ example: 'password123', description: 'The password of the user' })
  @IsString()
  @MinLength(6)
  password_hash: string;

  @IsBoolean()
  @IsOptional()
  is_active?: boolean;
}
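
The controller below also uses UpdateUserDto. The Nest CLI generates one with PartialType from @nestjs/mapped-types; if you want the optional fields to show up in Swagger as well, a sketch of update-user.dto.ts could look like this (swapping in PartialType from @nestjs/swagger):

// src/users/dto/update-user.dto.ts (sketch)
// PartialType makes every CreateUserDto field optional and, when imported
// from @nestjs/swagger, keeps the @ApiProperty metadata for the docs.
import { PartialType } from '@nestjs/swagger';
import { CreateUserDto } from './create-user.dto';

export class UpdateUserDto extends PartialType(CreateUserDto) {}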

Add the following code to users.controller.ts:

import {
  Controller,
  Get,
  Post,
  Body,
  Patch,
  Param,
  Delete,
  ParseIntPipe,
} from '@nestjs/common';
import { UsersService } from './users.service';
import { CreateUserDto } from './dto/create-user.dto';
import { UpdateUserDto } from './dto/update-user.dto';
import { ApiTags } from '@nestjs/swagger';

@Controller('users')
@ApiTags('users')
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  @Post()
  async create(@Body() createUserDto: CreateUserDto) {
    const users = await this.usersService.create(createUserDto);
    return users;
  }

  @Get()
  async findAll() {
    const users = await this.usersService.findAll();
    return users;
  }

  @Get(':id')
  async findOne(@Param('id', ParseIntPipe) id: number) {
    const users = await this.usersService.findOne(id);
    return users;
  }

  @Patch(':id')
  async update(
    @Param('id', ParseIntPipe) id: number,
    @Body() updateUserDto: UpdateUserDto,
  ) {
    const users = await this.usersService.update(id, updateUserDto);
    return users;
  }

  @Delete(':id')
  async remove(@Param('id', ParseIntPipe) id: number) {
    const deletedUser = await this.usersService.remove(id);
    return { success: true, data: deletedUser };
  }
}

Add the following code to users.module.ts:

import { Module } from '@nestjs/common';
import { UsersService } from './users.service';
import { UsersController } from './users.controller';
import { PrismaModule } from 'src/prisma/prisma.module';
import { KafkaModule } from 'src/kafka/kafka.module';

@Module({
  controllers: [UsersController],
  providers: [UsersService],
  imports: [PrismaModule, KafkaModule],
  exports: [UsersService],
})
export class UsersModule {}

Add the following code to users.service.ts:

import { Injectable } from '@nestjs/common';
import { PrismaService } from 'src/prisma/prisma.service';
import { CreateUserDto } from './dto/create-user.dto';
import { UpdateUserDto } from './dto/update-user.dto';
import * as bcrypt from 'bcrypt';
import { KafkaProducerService } from 'src/kafka/kafka-producer.service';

export const roundsOfHashing = 10;

@Injectable()
export class UsersService {
  constructor(
    private prisma: PrismaService,
    private readonly kafkaProducerService: KafkaProducerService,
  ) {}

  async create(createUserDto: CreateUserDto) {
    const hashedPassword = await bcrypt.hash(
      createUserDto.password_hash,
      roundsOfHashing,
    );
    createUserDto.password_hash = hashedPassword;

    const user = await this.prisma.users.create({
      data: createUserDto,
    });

    // Emit the Kafka event only after hashing and persisting the user,
    // so the event never carries the plaintext password.
    await this.kafkaProducerService.emitUserRegisteredEvent(createUserDto);

    return user;
  }

  findAll() {
    return this.prisma.users.findMany();
  }

  findOne(id: number) {
    return this.prisma.users.findUnique({ where: { id } });
  }

  async update(id: number, updateUserDto: UpdateUserDto) {
    if (updateUserDto.password_hash) {
      updateUserDto.password_hash = await bcrypt.hash(
        updateUserDto.password_hash,
        roundsOfHashing,
      );
    }

    return this.prisma.users.update({
      where: { id },
      data: updateUserDto,
    });
  }

  async remove(id: number) {
    return this.prisma.users.delete({
      where: { id },
    });
  }
}

Step 8: Email setup

Add @nestjs-modules/mailer and its peer dependency nodemailer:

npm install --save @nestjs-modules/mailer nodemailer
npm install --save-dev @types/nodemailer

Create a mail module and service via the Nest CLI, then create a templates folder:

npx @nestjs/cli g module mail
npx @nestjs/cli g service mail

mkdir src/mail/templates

Now update the mail.module.ts file:

import { MailerModule } from '@nestjs-modules/mailer';
import { HandlebarsAdapter } from '@nestjs-modules/mailer/dist/adapters/handlebars.adapter';
import { Module } from '@nestjs/common';
import { MailService } from './mail.service';
import { join } from 'path';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    MailerModule.forRootAsync({
      useFactory: async (config: ConfigService) => ({
        transport: {
          host: config.get('MAIL_HOST'),
          secure: false,
          auth: {
            user: config.get('MAIL_USER'),
            pass: config.get('MAIL_PASSWORD'),
          },
        },
        defaults: {
          from: `"No Reply" <${config.get('MAIL_FROM')}>`,
        },
        template: {
          dir: join(__dirname, 'templates'),
          adapter: new HandlebarsAdapter(),
          options: {
            strict: true,
          },
        },
      }),
      inject: [ConfigService],
    }),
  ],
  providers: [MailService],
  exports: [MailService],
})
export class MailModule {}

Create your first email template registration.hbs in the src/mail/templates folder. Add the following simple template for a user confirmation.

//src/mail/templates/registration.hbs

<html>
  <body>
    <h1>Welcome, {{name}}!</h1>
    <p>Thank you for registering with us.</p>
  </body>
</html>

By default, Nest only distributes TypeScript compiled files (.js and .d.ts) during the build step. To distribute your .hbs files, open your nest-cli.json and add your templates directory:


{
  "$schema": "https://json.schemastore.org/nest-cli",
  "collection": "@nestjs/schematics",
  "sourceRoot": "src",
  "compilerOptions": {
    "deleteOutDir": true,
    "assets": ["mail/templates/**/*"]
  }
}

Inject MailerService into your own MailService and implement your mailing logic there. Let's send a user confirmation email using the registration.hbs template. You need to provide {{name}} under the context key:


import { MailerService } from '@nestjs-modules/mailer';
import { Injectable } from '@nestjs/common';
import { users } from '@prisma/client';

@Injectable()
export class MailService {
  constructor(private mailerService: MailerService) {}

  async sendUserRegistration(user: users) {
    await this.mailerService.sendMail({
      to: user.email,
      from: '"Support Team" <support@example.com>', // override the default "from"
      subject: 'Welcome to Nice App! Confirm your Email',
      template: './registration', // `.hbs` extension is appended automatically
      context: {
        // values for the {{ }} placeholders in the template
        name: user.username,
      },
    });
  }
}

Step 9: Set up the Kafka Producer (Sending Events) and Kafka Consumer (Listening to Events)


npx @nestjs/cli g module kafka

// src/kafka/kafka.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { PrismaModule } from 'src/prisma/prisma.module';
import { KafkaProducerService } from './kafka-producer.service';
import { KafkaConsumerService } from './kafka-consumer.service';
import { MailModule } from 'src/mail/mail.module';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'nestjs-kafka',
            brokers: ['localhost:9092'],
            retry: {
              retries: 10, // number of retries per call
              initialRetryTime: 300, // initial retry delay in ms
              maxRetryTime: 10000, // maximum delay between retries in ms
              factor: 0.2, // randomization factor applied to the backoff
            },
          },
          consumer: {
            groupId: 'nestjs-group',
          },
        },
      },
    ]),
    PrismaModule,
    MailModule,
  ],
  providers: [KafkaProducerService, KafkaConsumerService],
  exports: [KafkaProducerService],
})
export class KafkaModule {}

// create file src/kafka/kafka-producer.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { Kafka } from 'kafkajs';

@Injectable()
export class KafkaProducerService implements OnModuleInit, OnModuleDestroy {
  private readonly kafka = new Kafka({
    brokers: ['localhost:9092'],
  });
  private readonly producer = this.kafka.producer();

  async onModuleInit() {
    // Connect once during module initialization instead of in the constructor,
    // so connection failures surface during application bootstrap.
    await this.producer.connect();
  }

  async emitUserRegisteredEvent(userData: any) {
    await this.producer.send({
      topic: 'user-registered', // topic name
      messages: [
        { key: 'user-registration', value: JSON.stringify(userData) },
      ],
    });
  }

  async onModuleDestroy() {
    await this.producer.disconnect();
  }
}
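
Note that kafka.module.ts also registers a client named 'KAFKA_SERVICE' via ClientsModule, while the producer above creates its own kafkajs instance. As an alternative sketch (the class name here is hypothetical; the injection token is the one registered above), you could inject that client and emit through it instead:

// Alternative sketch: emit through the ClientKafka registered as 'KAFKA_SERVICE'.
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class KafkaClientProducerService implements OnModuleInit {
  constructor(@Inject('KAFKA_SERVICE') private readonly client: ClientKafka) {}

  async onModuleInit() {
    await this.client.connect(); // establish the broker connection up front
  }

  emitUserRegisteredEvent(userData: any) {
    // emit() publishes an event-style message to the 'user-registered' topic
    this.client.emit('user-registered', JSON.stringify(userData));
  }
}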

// create file src/kafka/kafka-consumer.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { Kafka } from 'kafkajs';
import { MailService } from 'src/mail/mail.service';

@Injectable()
export class KafkaConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly kafka = new Kafka({
    clientId: 'nestjs-kafka',
    brokers: ['localhost:9092'],
  });

  private readonly consumer = this.kafka.consumer({ groupId: 'nestjs-group' });
  private readonly logger = new Logger(KafkaConsumerService.name);

  constructor(private readonly mailService: MailService) {}

  async onModuleInit() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'user-registered', fromBeginning: true });

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const userData = JSON.parse(message.value.toString());
        this.logger.log(`Received user registration event: ${JSON.stringify(userData)}`);
        await this.mailService.sendUserRegistration(userData);
      },
    });
  }

  async onModuleDestroy() {
    await this.consumer.disconnect();
  }
}
  • Kafka Producer: Create a Kafka producer service that sends messages when users register.
  • Kafka Consumer: Create a Kafka consumer service that listens for user registration messages and handles them accordingly.
  • Integration: Inject the Kafka producer into your users service to publish events.

This gives you service-to-service communication via Kafka without building full-blown microservices, while keeping the decoupled architecture you need for scalability.

To run the project:

docker build -t my-nestjs-app .
docker compose up
npm run start:dev
npx prisma studio

After creating a user via POST /users, the consumer logs the event:

Received user registration event: {"username":"test","email":"###","password_hash":"####"}
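
If you want to verify the consumer without going through the REST endpoint, a small standalone script can publish a test message straight to the user-registered topic. This is just a sketch (the file path and payload are suggestions, assuming ts-node is available and the app is running):

// scripts/publish-test-event.ts (hypothetical helper, run with ts-node)
// Publishes one test message to 'user-registered' so the running
// KafkaConsumerService should log it and trigger the welcome mail.
import { Kafka } from 'kafkajs';

async function main() {
  const kafka = new Kafka({ clientId: 'test-producer', brokers: ['localhost:9092'] });
  const producer = kafka.producer();

  await producer.connect();
  await producer.send({
    topic: 'user-registered',
    messages: [
      {
        key: 'user-registration',
        value: JSON.stringify({ username: 'test', email: 'test@example.com' }),
      },
    ],
  });
  await producer.disconnect();
}

main().catch(console.error);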
