A Deep Dive into Implementing Kafka in NestJS for Event-Driven
A brief overview of Apache Kafka:
Apache Kafka is an open-source distributed event streaming platform for handling real-time data feeds. It is designed for high-volume, high-throughput, low-latency data streams, and it can process, store, and analyze data in real time. Kafka is used to build real-time streaming data pipelines and applications, often in combination with Apache Storm, Apache Hadoop, and Apache Spark.
What event-driven architecture is:
Event-Driven Architecture (EDA) involves communication between services through the exchange of events. Events represent significant occurrences or state changes within the system and are typically published to event streams or topics. Services react to events asynchronously, enabling loosely coupled and responsive architectures.
Communication between services in a decoupled manner is crucial. This is where Kafka, a distributed event streaming platform, comes into play.
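To make the publish/subscribe idea concrete before bringing in Kafka, here is a minimal in-memory sketch. The `InMemoryEventBus` class and the `UserRegistered` shape are hypothetical names used only for illustration; a real Kafka topic adds persistence, partitions, and consumer groups on top of this basic pattern.

```typescript
// Toy stand-in for a topic: publishers and subscribers only share a topic name.
type Handler<T> = (event: T) => void;

class InMemoryEventBus<T> {
  private readonly handlers: Record<string, Handler<T>[]> = {};

  // Register a handler for a topic; several services can subscribe independently.
  subscribe(topic: string, handler: Handler<T>): void {
    const existing = this.handlers[topic] ?? [];
    this.handlers[topic] = [...existing, handler];
  }

  // Deliver the event to every subscriber and report how many handlers ran.
  publish(topic: string, event: T): number {
    const subscribers = this.handlers[topic] ?? [];
    subscribers.forEach((handler) => handler(event));
    return subscribers.length;
  }
}

interface UserRegistered {
  username: string;
  email: string;
}

const bus = new InMemoryEventBus<UserRegistered>();
const welcomeEmails: string[] = [];
const auditTrail: string[] = [];

// Two independent reactions to the same event; the publisher knows about neither.
bus.subscribe('user-registered', (e) => welcomeEmails.push(e.email));
bus.subscribe('user-registered', (e) => auditTrail.push(`registered:${e.username}`));

const delivered = bus.publish('user-registered', {
  username: 'john_doe',
  email: 'john.doe@example.com',
});
```

The publisher only knows the topic name, which is the decoupling the rest of this article builds with Kafka.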
Let’s get started!
Step 1: Create a NestJS backend
npx @nestjs/cli new kafka-nestjs
Step 2: Set up the Docker Compose file
Firstly, you need to have Docker installed and running.
Create a file named docker-compose.yml and copy-paste the following:
version: '3.8'
services:
  postgres:
    image: postgres:latest
    container_name: postgres_demo
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
    ports:
      - "${POSTGRES_PORT}:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  zookeeper:
    image: wurstmeister/zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - my-network
  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    container_name: kafka
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "audit-log:1:1,api-log:1:1"
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    networks:
      - my-network
  nestjs-app:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    depends_on:
      - postgres
volumes:
  postgres_data:
networks:
  my-network:
- This docker-compose.yml defines four services (PostgreSQL, Zookeeper, Kafka, and a NestJS application) and sets up how they communicate with each other.
- Docker Compose starts and links these services with a single command (docker-compose up), managing networking, environment variables, and container dependencies automatically.
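The Kafka service advertises two listeners: INSIDE://kafka:9093 for other containers and OUTSIDE://localhost:9092 for clients on the host machine. A client therefore needs a different broker address depending on where it runs. The sketch below shows one way to pick it from the environment; `KAFKA_BROKERS` and `resolveBrokers` are hypothetical names that this article does not otherwise define.

```typescript
// Hypothetical helper: KAFKA_BROKERS is an assumed variable name, not part of the setup above.
function resolveBrokers(env: Record<string, string | undefined>): string[] {
  // The default matches the OUTSIDE listener, which is what a process on the host should use.
  const fallback = ['localhost:9092'];
  const raw = env.KAFKA_BROKERS;
  if (!raw) {
    return fallback;
  }
  // Accept a comma-separated list and ignore empty entries.
  const brokers = raw
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : fallback;
}

// Running with npm on the host: nothing set, so the OUTSIDE listener is used.
const fromHost = resolveBrokers({});
// Running inside a container on the same Docker network: point at the INSIDE listener.
const fromContainer = resolveBrokers({ KAFKA_BROKERS: 'kafka:9093' });
```

In a Node process you would pass `process.env` as the argument.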
Create a file named Dockerfile and copy-paste the following:
# Use an official Node runtime as a parent image
FROM node:18
# Set the working directory
WORKDIR /usr/src/app
# Install dependencies and utilities
RUN apt-get update && \
apt-get install -y iputils-ping netcat-openbsd
# Copy package.json and package-lock.json to the working directory
COPY package*.json ./
# Install dependencies
RUN npm install
# Copy the rest of the application code to the working directory
COPY . .
# Build the NestJS application
RUN npm run build
# Expose the port the app runs on
EXPOSE 3000
# Define the command to run the app
CMD ["npm", "run", "start:prod"]
Step 3: Setup Prisma (ORM)
npm install -D prisma
npm install @prisma/client
npx prisma init
This will create two new files: schema.prisma (inside a new prisma directory) and .env.
Update the .env file:
# Environment variables declared in this file are automatically made available to Prisma.
# See the documentation for more detail: https://pris.ly/d/prisma-schema#accessing-environment-variables-from-the-schema
# Prisma supports the native connection string format for PostgreSQL, MySQL, SQLite, SQL Server, MongoDB and CockroachDB.
# See the documentation for all the connection string options: https://pris.ly/d/connection-strings
# PostgreSQL environment variables
POSTGRES_USER=postgres
POSTGRES_PASSWORD=######
POSTGRES_DB=kafka_nestjs
POSTGRES_PORT=5432
# NestJS environment variables
DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@localhost:${POSTGRES_PORT}/${POSTGRES_DB}
# mail
MAIL_HOST=smtp.gmail.com
MAIL_USER=#######
MAIL_PASSWORD=#######
MAIL_FROM=noreply@example.com
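DATABASE_URL above is assembled from the individual PostgreSQL variables. If the password contains characters such as @ or /, they need to be percent-encoded for the URL to parse correctly. The following sketch shows how the pieces fit together; `buildDatabaseUrl` and `PostgresSettings` are hypothetical names for illustration, not part of Prisma.

```typescript
interface PostgresSettings {
  user: string;
  password: string;
  host: string;
  port: number;
  database: string;
}

// Assemble a PostgreSQL connection URL, percent-encoding the credentials
// so that reserved characters do not break URL parsing.
function buildDatabaseUrl(settings: PostgresSettings): string {
  const user = encodeURIComponent(settings.user);
  const password = encodeURIComponent(settings.password);
  return `postgresql://${user}:${password}@${settings.host}:${settings.port}/${settings.database}`;
}

const databaseUrl = buildDatabaseUrl({
  user: 'postgres',
  password: 'p@ss/word', // example value only
  host: 'localhost',
  port: 5432,
  database: 'kafka_nestjs',
});
// databaseUrl === 'postgresql://postgres:p%40ss%2Fword@localhost:5432/kafka_nestjs'
```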
Update the schema.prisma file:
generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

model users {
  id            Int       @id @default(autoincrement())
  username      String    @db.VarChar(100)
  email         String    @unique @db.VarChar(255)
  password_hash String    @db.VarChar(255)
  is_active     Boolean   @default(true)
  created_at    DateTime? @default(now()) @db.Timestamp(6)
  updated_at    DateTime? @default(now()) @db.Timestamp(6)
}
Step 4: Generate and apply a migration
npx prisma migrate dev --name new
npx prisma migrate deploy
npx prisma generate
Step 5: Connect to the database
npx @nestjs/cli g module prisma
npx @nestjs/cli g service prisma
Update the prisma.module.ts file:
import { Module } from '@nestjs/common';
import { PrismaService } from './prisma.service';

@Module({
  providers: [PrismaService],
  exports: [PrismaService],
})
export class PrismaModule {}
Update the prisma.service.ts file:
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';

@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
  // Open the database connection when the module starts
  async onModuleInit() {
    await this.$connect();
  }

  // Close the connection when the application shuts down
  async onModuleDestroy() {
    await this.$disconnect();
  }
}
For this to work, you also need to install @nestjs/config:
npm install @nestjs/config
// src/app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { PrismaModule } from './prisma/prisma.module';
import { UsersModule } from './users/users.module';
import { MailModule } from './mail/mail.module';
import { ConfigModule } from '@nestjs/config';
import configuration from './configuration';
import { KafkaModule } from './kafka/kafka.module';

@Module({
  imports: [
    ConfigModule.forRoot({
      load: [configuration],
      isGlobal: true,
    }),
    PrismaModule,
    UsersModule,
    MailModule,
    KafkaModule,
  ],
  controllers: [AppController],
  // KafkaConsumerService is already provided by KafkaModule; registering it
  // here as well would start a second consumer in the same group.
  providers: [AppService],
})
export class AppModule {}
Create src/configuration.ts:
// src/configuration.ts
export default () => ({
  database: {
    url: process.env.DATABASE_URL,
  },
});
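The configuration factory reads DATABASE_URL without checking that it is set, so a missing variable only shows up later as a connection error. One possible variation, sketched below, fails fast at startup. `loadConfiguration` and `AppConfig` are hypothetical names, and the environment is passed in as a parameter to keep the example self-contained.

```typescript
interface AppConfig {
  database: { url: string };
}

// Build the configuration object and fail early if a required variable is missing.
function loadConfiguration(env: Record<string, string | undefined>): AppConfig {
  const url = env.DATABASE_URL;
  if (!url) {
    throw new Error('DATABASE_URL is not set');
  }
  return { database: { url } };
}

const exampleConfig = loadConfiguration({
  DATABASE_URL: 'postgresql://postgres:secret@localhost:5432/kafka_nestjs',
});
```

In the application you would call it with `process.env`, for example `export default () => loadConfiguration(process.env)`.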
Step 6: Install Swagger and Kafka dependencies
npm install --save @nestjs/swagger
npm install kafkajs @nestjs/microservices
Update the main.ts file:
// src/main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
import { ValidationPipe } from '@nestjs/common';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // Strip request properties that are not declared on the DTOs
  app.useGlobalPipes(new ValidationPipe({ whitelist: true }));

  const config = new DocumentBuilder()
    .setTitle('project')
    .setDescription('The project API description')
    .setVersion('0.1')
    .addBearerAuth()
    .build();
  const document = SwaggerModule.createDocument(app, config);
  // Swagger UI is served at /api
  SwaggerModule.setup('api', app, document);

  await app.listen(3000);
}
bootstrap();
Step 7: Create a Controller
npx @nestjs/cli g resource users
npm install class-validator
npm install class-transformer
npm install bcrypt
Add the following code to create-user.dto.ts:
import { ApiProperty } from '@nestjs/swagger';
import { IsBoolean, IsEmail, IsOptional, IsString, MinLength } from 'class-validator';

export class CreateUserDto {
  @ApiProperty({ example: 'john_doe', description: 'The username of the user' })
  @IsString()
  @MinLength(3)
  username: string;

  @ApiProperty({ example: 'john.doe@example.com', description: 'The email of the user' })
  @IsEmail()
  email: string;

  @ApiProperty({ example: 'password123', description: 'The password of the user' })
  @IsString()
  @MinLength(6)
  password_hash: string;

  @IsBoolean()
  @IsOptional()
  is_active?: boolean;
}
Add the following code to users.controller.ts:
import {
  Controller,
  Get,
  Post,
  Body,
  Patch,
  Param,
  Delete,
  ParseIntPipe,
} from '@nestjs/common';
import { UsersService } from './users.service';
import { CreateUserDto } from './dto/create-user.dto';
import { UpdateUserDto } from './dto/update-user.dto';
import { ApiTags } from '@nestjs/swagger';

@Controller('users')
@ApiTags('users')
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  @Post()
  async create(@Body() createUserDto: CreateUserDto) {
    const user = await this.usersService.create(createUserDto);
    return user;
  }

  @Get()
  async findAll() {
    const users = await this.usersService.findAll();
    return users;
  }

  @Get(':id')
  async findOne(@Param('id', ParseIntPipe) id: number) {
    const user = await this.usersService.findOne(id);
    return user;
  }

  @Patch(':id')
  async update(
    @Param('id', ParseIntPipe) id: number,
    @Body() updateUserDto: UpdateUserDto,
  ) {
    const user = await this.usersService.update(id, updateUserDto);
    return user;
  }

  @Delete(':id')
  async remove(@Param('id', ParseIntPipe) id: number) {
    const deletedUser = await this.usersService.remove(id);
    return { success: true, data: deletedUser };
  }
}
Add the following code to users.module.ts:
import { Module } from '@nestjs/common';
import { UsersService } from './users.service';
import { UsersController } from './users.controller';
import { PrismaModule } from 'src/prisma/prisma.module';
import { KafkaModule } from 'src/kafka/kafka.module';

@Module({
  controllers: [UsersController],
  providers: [UsersService],
  // KafkaModule exports KafkaProducerService, which UsersService injects
  imports: [PrismaModule, KafkaModule],
  exports: [UsersService],
})
export class UsersModule {}
Add the following code to users.service.ts:
import { Injectable } from '@nestjs/common';
import { PrismaService } from 'src/prisma/prisma.service';
import { CreateUserDto } from './dto/create-user.dto';
import { UpdateUserDto } from './dto/update-user.dto';
import * as bcrypt from 'bcrypt';
import { KafkaProducerService } from 'src/kafka/kafka-producer.service';

export const roundsOfHashing = 10;

@Injectable()
export class UsersService {
  constructor(
    private prisma: PrismaService,
    private readonly kafkaProducerService: KafkaProducerService,
  ) {}

  async create(createUserDto: CreateUserDto) {
    const hashedPassword = await bcrypt.hash(
      createUserDto.password_hash,
      roundsOfHashing,
    );
    // Replace the plaintext password before anything is stored or published
    createUserDto.password_hash = hashedPassword;
    const user = await this.prisma.users.create({
      data: createUserDto,
    });
    // Emit the Kafka event only after the user has been persisted
    await this.kafkaProducerService.emitUserRegisteredEvent(createUserDto);
    return user;
  }

  findAll() {
    return this.prisma.users.findMany();
  }

  findOne(id: number) {
    return this.prisma.users.findUnique({ where: { id } });
  }

  async update(id: number, updateUserDto: UpdateUserDto) {
    // Re-hash the password only when a new one is supplied
    if (updateUserDto.password_hash) {
      updateUserDto.password_hash = await bcrypt.hash(
        updateUserDto.password_hash,
        roundsOfHashing,
      );
    }
    return this.prisma.users.update({
      where: { id },
      data: updateUserDto,
    });
  }

  async remove(id: number) {
    return this.prisma.users.delete({
      where: { id },
    });
  }
}
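The service passes the whole DTO to the producer, so the password field travels through Kafka along with the username and email. A common alternative is to publish only the fields that consumers actually need. Here is a minimal sketch of that idea; `toUserRegisteredEvent`, `CreateUserInput`, and `UserRegisteredEvent` are hypothetical names, and the article's own code does not do this.

```typescript
// Mirrors the fields of CreateUserDto, declared locally to keep the sketch self-contained.
interface CreateUserInput {
  username: string;
  email: string;
  password_hash: string;
  is_active?: boolean;
}

// The event carries only what the mail consumer uses.
interface UserRegisteredEvent {
  username: string;
  email: string;
}

// Copy the public fields explicitly so credentials can never leak into the event.
function toUserRegisteredEvent(input: CreateUserInput): UserRegisteredEvent {
  return { username: input.username, email: input.email };
}

const registrationEvent = toUserRegisteredEvent({
  username: 'john_doe',
  email: 'john.doe@example.com',
  password_hash: 'not-for-publishing',
});
const serializedEvent = JSON.stringify(registrationEvent);
// serializedEvent === '{"username":"john_doe","email":"john.doe@example.com"}'
```

With this in place, the service would call `emitUserRegisteredEvent(toUserRegisteredEvent(createUserDto))` instead of passing the DTO directly.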
Step 8: Email setup
Add the @nestjs-modules/mailer package and its peer dependency nodemailer:
npm install --save @nestjs-modules/mailer nodemailer
npm install --save-dev @types/nodemailer
Create a mail module and service via the Nest CLI, then create a templates folder:
npx @nestjs/cli g module mail
npx @nestjs/cli g service mail
mkdir src/mail/templates
Now update the mail.module.ts file:
import { MailerModule } from '@nestjs-modules/mailer';
import { HandlebarsAdapter } from '@nestjs-modules/mailer/dist/adapters/handlebars.adapter';
import { Module } from '@nestjs/common';
import { MailService } from './mail.service';
import { join } from 'path';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    MailerModule.forRootAsync({
      // Read the SMTP settings from the .env file through ConfigService
      useFactory: async (config: ConfigService) => ({
        transport: {
          host: config.get('MAIL_HOST'),
          secure: false,
          auth: {
            user: config.get('MAIL_USER'),
            pass: config.get('MAIL_PASSWORD'),
          },
        },
        defaults: {
          from: `"No Reply" <${config.get('MAIL_FROM')}>`,
        },
        template: {
          dir: join(__dirname, 'templates'),
          adapter: new HandlebarsAdapter(),
          options: {
            strict: true,
          },
        },
      }),
      inject: [ConfigService],
    }),
  ],
  providers: [MailService],
  exports: [MailService],
})
export class MailModule {}
Create your first email template, registration.hbs, in the src/mail/templates folder. Add the following simple template for a user confirmation.
{{!-- src/mail/templates/registration.hbs --}}
<html>
  <body>
    <h1>Welcome, {{name}}!</h1>
    <p>Thank you for registering with us.</p>
  </body>
</html>
By default, Nest only distributes the compiled TypeScript output (.js and .d.ts files) during the build step. To distribute your .hbs files, open nest-cli.json and add your templates directory to the assets:
{
  "$schema": "https://json.schemastore.org/nest-cli",
  "collection": "@nestjs/schematics",
  "sourceRoot": "src",
  "compilerOptions": {
    "deleteOutDir": true,
    "assets": ["mail/templates/**/*"]
  }
}
Add MailerService to your own MailService and implement your mailing logic there. Let's send a user confirmation email using the registration.hbs template. You need to provide {{ name }} under the context key:
import { MailerService } from '@nestjs-modules/mailer';
import { Injectable } from '@nestjs/common';
import { users } from '@prisma/client';

@Injectable()
export class MailService {
  constructor(private mailerService: MailerService) {}

  async sendUserRegistration(user: users) {
    await this.mailerService.sendMail({
      to: user.email,
      from: '"Support Team" <support@example.com>', // override default from
      subject: 'Welcome to Nice App! Confirm your Email',
      template: './registration', // `.hbs` extension is appended automatically
      context: { // ✏️ filling curly brackets with content
        name: user.username,
      },
    });
  }
}
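To show what "filling curly brackets with content" means, here is a toy renderer that substitutes {{ name }}-style placeholders from a context object. It is only an illustration of the idea and not how Handlebars is implemented; `renderTemplate` and `escapeHtml` are hypothetical helpers, and throwing on a missing key is meant to loosely mirror the strict option set in the mail module.

```typescript
// Escape a few HTML-sensitive characters so user-supplied values cannot inject markup.
function escapeHtml(value: string): string {
  return value
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;');
}

// Replace each {{ key }} placeholder with the escaped value from the context.
function renderTemplate(template: string, context: Record<string, string>): string {
  return template.replace(/\{\{\s*(\w+)\s*\}\}/g, (_match: string, key: string) => {
    if (!Object.prototype.hasOwnProperty.call(context, key)) {
      throw new Error(`Missing template value: ${key}`);
    }
    return escapeHtml(context[key]);
  });
}

const renderedHeading = renderTemplate('<h1>Welcome, {{name}}!</h1>', { name: 'john_doe' });
// renderedHeading === '<h1>Welcome, john_doe!</h1>'
```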
Step 9: Set up the Kafka Producer (Sending Events) and Kafka Consumer (Listening to Events)
npx @nestjs/cli g module kafka
// src/kafka/kafka.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { PrismaModule } from 'src/prisma/prisma.module';
import { KafkaProducerService } from './kafka-producer.service';
import { KafkaConsumerService } from './kafka-consumer.service';
import { MailModule } from 'src/mail/mail.module';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'nestjs-kafka',
            brokers: ['localhost:9092'],
            retry: {
              retries: 10, // Maximum number of retries per call
              initialRetryTime: 300, // Initial retry delay in ms
              maxRetryTime: 10000, // Upper bound on the wait for a single retry in ms
              factor: 0.2, // Randomization factor applied to each delay
            },
          },
          consumer: {
            groupId: 'nestjs-group',
          },
        },
      },
    ]),
    PrismaModule,
    MailModule,
  ],
  providers: [KafkaProducerService, KafkaConsumerService],
  exports: [KafkaProducerService],
})
export class KafkaModule {}
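The retry block above configures how the client backs off between connection attempts. As a rough illustration of the general idea, the sketch below computes a capped exponential backoff schedule. It is a generic version with jitter left out, not KafkaJS's actual implementation, and the option names (`initialMs`, `multiplier`, `maxMs`) are hypothetical.

```typescript
interface BackoffOptions {
  initialMs: number;  // delay before the first retry
  multiplier: number; // growth applied after each attempt
  maxMs: number;      // no single delay may exceed this
  retries: number;    // how many retries to schedule
}

// Compute the delay for each retry: initialMs * multiplier^attempt, capped at maxMs.
function backoffSchedule(options: BackoffOptions): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < options.retries; attempt++) {
    const uncapped = options.initialMs * Math.pow(options.multiplier, attempt);
    delays.push(Math.min(uncapped, options.maxMs));
  }
  return delays;
}

const schedule = backoffSchedule({ initialMs: 300, multiplier: 2, maxMs: 10000, retries: 10 });
// schedule is [300, 600, 1200, 2400, 4800, 9600, 10000, 10000, 10000, 10000]
```

Real clients usually add a random component to each delay so that many clients reconnecting at once do not retry in lockstep.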
// create file src/kafka/kafka-producer.service.ts
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Kafka } from 'kafkajs';

@Injectable()
export class KafkaProducerService implements OnModuleInit, OnModuleDestroy {
  private readonly kafka = new Kafka({
    brokers: ['localhost:9092'],
  });
  private readonly producer = this.kafka.producer();

  // Connect during module initialization rather than in the constructor,
  // so a failed connection surfaces at startup instead of on the first send
  async onModuleInit() {
    await this.producer.connect();
  }

  async emitUserRegisteredEvent(userData: any) {
    await this.producer.send({
      topic: 'user-registered', // Topic name
      messages: [
        { key: 'user-registration', value: JSON.stringify(userData) },
      ],
    });
  }

  async onModuleDestroy() {
    await this.producer.disconnect();
  }
}
// create file src/kafka/kafka-consumer.service.ts
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Kafka } from 'kafkajs';
import { MailService } from 'src/mail/mail.service';

@Injectable()
export class KafkaConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly kafka = new Kafka({
    clientId: 'nestjs-kafka',
    brokers: ['localhost:9092'],
  });
  private readonly consumer = this.kafka.consumer({ groupId: 'nestjs-group' });
  private readonly logger = new Logger(KafkaConsumerService.name);

  constructor(private readonly mailService: MailService) {}

  async onModuleInit() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'user-registered', fromBeginning: true });
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        // message.value can be null (for example, tombstone records), so skip those
        if (!message.value) {
          return;
        }
        const userData = JSON.parse(message.value.toString());
        this.logger.log(`Received user registration event: ${JSON.stringify(userData)}`);
        await this.mailService.sendUserRegistration(userData);
      },
    });
  }

  async onModuleDestroy() {
    await this.consumer.disconnect();
  }
}
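The consumer trusts that every message on the topic is valid JSON with the expected fields. A more defensive option is to parse and validate the payload first and skip anything malformed. The sketch below assumes the raw message value has already been converted to a string (or is null); `parseUserRegistered` and `UserRegisteredEvent` are hypothetical names.

```typescript
interface UserRegisteredEvent {
  username: string;
  email: string;
}

// Return a typed event, or null when the payload is missing, malformed, or incomplete.
function parseUserRegistered(raw: string | null): UserRegisteredEvent | null {
  if (raw === null) {
    return null;
  }
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof data !== 'object' || data === null) {
    return null;
  }
  const record = data as Record<string, unknown>;
  if (typeof record.username !== 'string' || typeof record.email !== 'string') {
    return null;
  }
  return { username: record.username, email: record.email };
}

const validEvent = parseUserRegistered('{"username":"test","email":"test@example.com"}');
const brokenEvent = parseUserRegistered('not json');
```

Inside eachMessage this could be used as `parseUserRegistered(message.value ? message.value.toString() : null)`, logging and skipping the message when the result is null.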
- Kafka Producer: Create a Kafka producer service that sends messages when users register.
- Kafka Consumer: Create a Kafka consumer service that listens for user registration messages and handles them accordingly.
- Integration: Inject Kafka producer in your user service to publish events.
This gives you service-to-service communication via Kafka without building full-blown microservices, while keeping a decoupled architecture that can scale.
To run code:
docker build -t my-nestjs-app .
docker compose up
npm run start:dev
npx prisma studio
Received user registration event: {"username":"test","email":"###","password_hash":"####"}