Building Microservices Using NestJS and KafkaJS

Introduction

NestJS is a framework for building server-side Node.js applications. It uses progressive JavaScript and is built with full TypeScript support.

NestJS runs on top of the Express framework by default, so any MVC (Model-View-Controller) implementation technique that works in Express also applies to NestJS.

Getting Started

The first things you'll need are:

  • Node installed

  • Your favourite Code Editor/IDE

Once you have those set up, let us get started with Nest. Install the NestJS CLI:

npm install -g @nestjs/cli

Now we can use the CLI to bootstrap our API gateway and microservices with the following commands:

nest new api-gateway
nest new auth-service
nest new todo-service

Each command scaffolds an app. Select your preferred package manager when prompted and let the CLI set up the project and install its dependencies.

API Gateway

An API gateway is an API management tool that sits between a client and a group of backend services. It acts as a reverse proxy: it receives every API request, calls the services needed to fulfil it, and returns the combined result.
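To make the reverse-proxy idea concrete, here is a minimal, dependency-free sketch. The names `ServiceHandler`, `routes`, and `handleRequest` are made up for illustration; in this tutorial NestJS controllers and Kafka play these roles.

```typescript
// A backend service is modelled as a function from a request body to a response.
type ServiceHandler = (body: unknown) => { service: string; body: unknown };

// Hypothetical routing table: URL prefix -> backend service.
const routes: { [prefix: string]: ServiceHandler } = {
  "/user": (body) => ({ service: "auth-service", body }),
  "/todo": (body) => ({ service: "todo-service", body }),
};

// The gateway is the single entry point: it picks the backend by path prefix
// and forwards the request, so clients never talk to a service directly.
function handleRequest(path: string, body: unknown) {
  const prefix = Object.keys(routes).filter(
    (p) => path === p || path.indexOf(p + "/") === 0,
  )[0];
  if (prefix === undefined) {
    return { status: 404, payload: null };
  }
  return { status: 200, payload: routes[prefix](body) };
}
```

With this table, a call such as `handleRequest("/todo/1", {})` is forwarded to the handler registered for `/todo`, while an unknown path yields a 404.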

Navigate into the api-gateway folder and install the following packages:

npm i @nestjs/microservices kafkajs

Once the installation succeeds, let's create a user resource, which will serve as the entry point for user API calls (the todo resource follows later). Run the command below to generate it:

nest g resource user

Remember to select REST API.

You will notice that a lot of files and folders were generated for you. Open your create-user.dto.ts file and add the following code:

export class CreateUserDto {
  email: string;
  password: string;
}
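The DTO only declares the shape of the request body; nothing checks it at runtime. The sketch below shows the kind of check a gateway might run before forwarding a request. The function name and the rules (an "@" in the email, at least 8 characters in the password) are illustrative assumptions, not part of the tutorial code; in a real Nest app this job is usually handed to a validation pipe.

```typescript
// Same shape as CreateUserDto in the tutorial.
interface CreateUserInput {
  email: string;
  password: string;
}

// Returns a list of problems; an empty list means the payload is acceptable.
function validateCreateUser(input: unknown): string[] {
  if (typeof input !== "object" || input === null) {
    return ["body must be an object"];
  }
  const errors: string[] = [];
  const { email, password } = input as Partial<CreateUserInput>;
  // Require at least one character before the "@".
  if (typeof email !== "string" || email.indexOf("@") < 1) {
    errors.push("email must be a valid address");
  }
  if (typeof password !== "string" || password.length < 8) {
    errors.push("password must be at least 8 characters");
  }
  return errors;
}
```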

Now let's open our user.module.ts file and register the Kafka client that connects to the broker:

import { Module } from '@nestjs/common';
import { UserService } from './user.service';
import { UserController } from './user.controller';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'AUTH_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'auth',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'auth-consumer',
          },
        },
      },
    ]),
  ],
  controllers: [UserController],
  providers: [UserService],
})
export class UserModule {}
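The broker address is hard-coded above, which is fine for local development. As a sketch of one way to make it configurable, the helper below builds the same `options` object from an environment map. `kafkaOptions` and the `KAFKA_BROKERS` variable are hypothetical names chosen for this example.

```typescript
// Shape of the Kafka options passed to ClientsModule.register above.
interface KafkaOptions {
  client: { clientId: string; brokers: string[] };
  consumer: { groupId: string };
}

// Builds the options for one service. KAFKA_BROKERS is an assumed
// environment variable holding a comma-separated broker list.
function kafkaOptions(
  service: string,
  env: { [key: string]: string | undefined },
): KafkaOptions {
  const raw = env["KAFKA_BROKERS"] || "localhost:9092";
  const brokers = raw
    .split(",")
    .map((b) => b.trim())
    .filter((b) => b.length > 0);
  return {
    client: { clientId: service, brokers },
    consumer: { groupId: `${service}-consumer` },
  };
}
```

In the module you could then write `options: kafkaOptions('auth', process.env)` in place of the literal object.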

Your user.controller.ts file will look like this:

import {
  Controller,
  Get,
  Post,
  Body,
  Patch,
  Param,
  Delete,
} from '@nestjs/common';
import { UserService } from './user.service';
import { CreateUserDto } from './dto/create-user.dto';
import { UpdateUserDto } from './dto/update-user.dto';

@Controller('user')
export class UserController {
  constructor(private readonly userService: UserService) {}

  @Post('/')
  async create(@Body() createUserDto: CreateUserDto) {
    return await this.userService.create(createUserDto);
  }

  @Get()
  findAll() {
    return this.userService.findAll();
  }

  @Get(':id')
  findOne(@Param('id') id: string) {
    return this.userService.findOne(+id);
  }

  @Patch(':id')
  update(@Param('id') id: string, @Body() updateUserDto: UpdateUserDto) {
    return this.userService.update(+id, updateUserDto);
  }

  @Delete(':id')
  remove(@Param('id') id: string) {
    return this.userService.remove(+id);
  }
}

user.service.ts file

import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { CreateUserDto } from './dto/create-user.dto';
import { UpdateUserDto } from './dto/update-user.dto';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class UserService implements OnModuleInit {
  constructor(
    @Inject('AUTH_SERVICE')
    private readonly clientKafka: ClientKafka,
  ) {}

  async create(createUserDto: CreateUserDto) {
    return this.clientKafka.send('createUser', createUserDto);
  }

  findAll() {
    return `This action returns all user`;
  }

  findOne(id: number) {
    return `This action returns a #${id} user`;
  }

  update(id: number, updateUserDto: UpdateUserDto) {
    return `This action updates a #${id} user`;
  }

  remove(id: number) {
    return `This action removes a #${id} user`;
  }

  async onModuleInit() {
    this.clientKafka.subscribeToResponseOf('createUser');
    await this.clientKafka.connect();
  }
}
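The call to `subscribeToResponseOf('createUser')` matters because `send` is a request-reply exchange: the request goes out on the `createUser` topic and the answer comes back on a reply topic (NestJS derives it as `createUser.reply`). The toy sketch below imitates that flow with an in-memory bus. It is not KafkaJS, and `InMemoryBus`, `handle`, and `send` are names invented for the illustration.

```typescript
type Message = { correlationId: number; value: unknown };
type Listener = (message: Message) => void;

// A toy in-memory stand-in for Kafka topics.
class InMemoryBus {
  private listeners: { [topic: string]: Listener[] } = {};

  subscribe(topic: string, listener: Listener): void {
    (this.listeners[topic] = this.listeners[topic] || []).push(listener);
  }

  publish(topic: string, message: Message): void {
    (this.listeners[topic] || []).forEach((listener) => listener(message));
  }
}

// Service side: handle requests on `pattern`, answer on `pattern.reply`.
function handle(
  bus: InMemoryBus,
  pattern: string,
  fn: (value: unknown) => unknown,
): void {
  bus.subscribe(pattern, (request) =>
    bus.publish(`${pattern}.reply`, {
      correlationId: request.correlationId,
      value: fn(request.value),
    }),
  );
}

// Gateway side: subscribe to the reply topic first, then publish the request.
let nextId = 0;
function send(
  bus: InMemoryBus,
  pattern: string,
  value: unknown,
  onReply: (value: unknown) => void,
): void {
  const correlationId = ++nextId;
  bus.subscribe(`${pattern}.reply`, (reply) => {
    // The correlation id matches a reply to the request that caused it.
    if (reply.correlationId === correlationId) onReply(reply.value);
  });
  bus.publish(pattern, { correlationId, value });
}
```

If the gateway never subscribed to the reply topic, the service would still process the request, but the answer would have nowhere to arrive.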

Now let's generate the todo resource to handle todo API calls:

nest g resource todo

Open your create-todo.dto.ts file and add this code:

export class CreateTodoDto {
  title: string;
}

Now let's open our todo.module.ts file and register the Kafka client that connects to the broker:

import { Module } from '@nestjs/common';
import { TodoService } from './todo.service';
import { TodoController } from './todo.controller';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'TODO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'todo',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'todo-consumer',
          },
        },
      },
    ]),
  ],
  controllers: [TodoController],
  providers: [TodoService],
})
export class TodoModule {}

todo.service.ts file

import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { CreateTodoDto } from './dto/create-todo.dto';
import { UpdateTodoDto } from './dto/update-todo.dto';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class TodoService implements OnModuleInit {
  constructor(
    @Inject('TODO_SERVICE')
    private readonly clientKafka: ClientKafka,
  ) {}

  create(createTodoDto: CreateTodoDto) {
    return this.clientKafka.send('createTodo', createTodoDto);
  }

  findAll() {
    return this.clientKafka.send('findAllTodo', '');
  }

  async findOne(id: number) {
    return this.clientKafka.send('findOneTodo', { id: id });
  }

  update(id: number, updateTodoDto: UpdateTodoDto) {
    return `This action updates a #${id} todo`;
  }

  remove(id: number) {
    return `This action removes a #${id} todo`;
  }

  async onModuleInit() {
    this.clientKafka.subscribeToResponseOf('createTodo');
    this.clientKafka.subscribeToResponseOf('findAllTodo');
    this.clientKafka.subscribeToResponseOf('findOneTodo');
    await this.clientKafka.connect();
  }
}
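Every pattern passed to `send` needs a matching `subscribeToResponseOf` call, and it is easy to forget one when a new pattern is added. One possible way to keep them in step is to list the patterns once and subscribe in a loop. `TODO_PATTERNS`, `ReplySubscriber`, and `subscribeToAll` are hypothetical names used only in this sketch.

```typescript
// Only the part of ClientKafka this helper relies on.
interface ReplySubscriber {
  subscribeToResponseOf(pattern: string): void;
}

// Assumed shared constant: one place that lists the todo patterns.
const TODO_PATTERNS = {
  create: "createTodo",
  findAll: "findAllTodo",
  findOne: "findOneTodo",
} as const;

// Registers a reply topic for every pattern, so none is forgotten.
function subscribeToAll(
  client: ReplySubscriber,
  patterns: { [name: string]: string },
): void {
  Object.keys(patterns).forEach((name) =>
    client.subscribeToResponseOf(patterns[name]),
  );
}
```

Inside `onModuleInit` this would become `subscribeToAll(this.clientKafka, TODO_PATTERNS)` followed by `await this.clientKafka.connect()`, and the service methods could use `TODO_PATTERNS.create` and friends instead of string literals.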

todo.controller.ts

import {
  Controller,
  Get,
  Post,
  Body,
  Patch,
  Param,
  Delete,
} from '@nestjs/common';
import { TodoService } from './todo.service';
import { CreateTodoDto } from './dto/create-todo.dto';
import { UpdateTodoDto } from './dto/update-todo.dto';

@Controller('todo')
export class TodoController {
  constructor(private readonly todoService: TodoService) {}

  @Post('/')
  create(@Body() createTodoDto: CreateTodoDto) {
    return this.todoService.create(createTodoDto);
  }

  @Get('/')
  findAll() {
    return this.todoService.findAll();
  }

  @Get('/:id')
  async findOne(@Param('id') id: number) {
    return await this.todoService.findOne(id);
  }

  @Patch('/:id')
  update(@Param('id') id: string, @Body() updateTodoDto: UpdateTodoDto) {
    return this.todoService.update(+id, updateTodoDto);
  }

  @Delete(':id')
  remove(@Param('id') id: string) {
    return this.todoService.remove(+id);
  }
}
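Note that route parameters always arrive as strings: annotating `id` as `number` in `findOne` does not convert it, which is why the other handlers use `+id`. NestJS ships a `ParseIntPipe` for this purpose. As a dependency-free sketch of the same idea, with `parseId` being a name made up for this example:

```typescript
// Converts a raw route parameter to an integer id, or throws if it is not one.
function parseId(raw: string): number {
  // Accept only an optional minus sign followed by digits, so "12abc" is rejected.
  if (!/^-?\d+$/.test(raw)) {
    throw new Error(`Invalid id: "${raw}"`);
  }
  return parseInt(raw, 10);
}
```

Unlike the unary `+`, which turns `"abc"` into `NaN` and lets it travel on to the microservice, this fails early at the gateway.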

That is all for our API gateway. Start the server with this command:

npm run start:dev

Todo-service

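Navigate into the todo-service folder and install the following packages. The TypeORM packages and the mysql2 driver are needed for the database connection configured below:

npm i @nestjs/microservices kafkajs @nestjs/typeorm typeorm mysql2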

Let's generate a resource for our todo microservice:

nest g resource todo

Remember to select Microservice (non-HTTP).

Now open the main.ts file so that the microservice listens on the same Kafka broker as our API gateway:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'todo-consumer',
        },
      },
    },
  );
  await app.listen();
}
bootstrap();
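Both the gateway client and this microservice are configured with the group ID `todo-consumer`. As far as I know, NestJS appends a role suffix (`-client` or `-server` by default) to the configured client and group IDs, so the two sides do not end up in the same consumer group. The sketch below only illustrates that naming rule; `effectiveId` is a hypothetical helper, not a NestJS function.

```typescript
type Role = "client" | "server";

// Assumed rule: effective id = configured id + suffix, where the suffix
// defaults to "-client" or "-server" unless one is given explicitly.
function effectiveId(configured: string, role: Role, postfixId?: string): string {
  const suffix = postfixId !== undefined ? postfixId : `-${role}`;
  return configured + suffix;
}

// The gateway (client) and the todo-service (server) share the configured
// name but differ in the effective group id.
const gatewayGroup = effectiveId("todo-consumer", "client");
const serviceGroup = effectiveId("todo-consumer", "server");
```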

Open the create-todo.dto.ts file and add this code:

export class CreateTodoDto {
  title: string;
}

app.module.ts file

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TodoModule } from './todo/todo.module';
import { TypeOrmModule } from '@nestjs/typeorm';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'mariadb',
      database: 'todo-microservice',
      password: '',
      host: 'localhost',
      port: 3306,
      synchronize: true,
      autoLoadEntities: true,
      username: 'root',
    }),
    TodoModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
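The database settings in app.module.ts are hard-coded, and `synchronize: true` lets TypeORM alter the schema on every start, which is convenient in development but risky with real data. A possible way to derive the same options from the environment is sketched below; `dbOptions` and the `DB_*` variable names are assumptions made for this example.

```typescript
// Subset of the TypeOrmModule.forRoot options used in this tutorial.
interface DbOptions {
  type: "mariadb";
  host: string;
  port: number;
  username: string;
  password: string;
  database: string;
  synchronize: boolean;
  autoLoadEntities: boolean;
}

// Builds the connection options, falling back to the tutorial's local defaults.
function dbOptions(
  database: string,
  env: { [key: string]: string | undefined },
): DbOptions {
  return {
    type: "mariadb",
    host: env["DB_HOST"] || "localhost",
    port: parseInt(env["DB_PORT"] || "3306", 10),
    username: env["DB_USER"] || "root",
    password: env["DB_PASSWORD"] || "",
    database,
    // Schema auto-sync is kept for development only.
    synchronize: env["NODE_ENV"] !== "production",
    autoLoadEntities: true,
  };
}
```

The module would then call `TypeOrmModule.forRoot(dbOptions('todo-microservice', process.env))`.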

Open todo.entity.ts and add the entity definition:

import { Column, Entity, PrimaryGeneratedColumn } from 'typeorm';

@Entity('todos')
export class Todo {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  title: string;
}

todo.controller.ts file

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { TodoService } from './todo.service';
import { CreateTodoDto } from './dto/create-todo.dto';
import { UpdateTodoDto } from './dto/update-todo.dto';

@Controller()
export class TodoController {
  constructor(private readonly todoService: TodoService) {}

  @MessagePattern('createTodo')
  async create(@Payload() createTodoDto: CreateTodoDto) {
    return await this.todoService.create(createTodoDto);
  }

  @MessagePattern('findAllTodo')
  async findAll() {
    return await this.todoService.findAll();
  }

  @MessagePattern('findOneTodo')
  async findOne(@Payload() id: number) {
    return await this.todoService.findOne(id);
  }

  @MessagePattern('updateTodo')
  update(@Payload() updateTodoDto: UpdateTodoDto) {
    return this.todoService.update(updateTodoDto.id, updateTodoDto);
  }

  @MessagePattern('removeTodo')
  remove(@Payload() id: number) {
    return this.todoService.remove(id);
  }
}

todo.service.ts file

import { Injectable } from '@nestjs/common';
import { CreateTodoDto } from './dto/create-todo.dto';
import { UpdateTodoDto } from './dto/update-todo.dto';
import { Repository } from 'typeorm';
import { Todo } from './entities/todo.entity';
import { InjectRepository } from '@nestjs/typeorm';
import { RpcException } from '@nestjs/microservices';

@Injectable()
export class TodoService {
  constructor(
    @InjectRepository(Todo)
    private todoRepository: Repository<Todo>,
  ) {}

  async create(createTodoDto: CreateTodoDto) {
    const todo = new Todo();
    todo.title = createTodoDto.title;

    const saved = await this.todoRepository.save(todo);
    return {
      message: 'added success',
      data: {
        id: saved.id,
        title: saved.title,
      },
    };
  }

  async findAll() {
    return await this.todoRepository.find();
  }

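  async findOne(id: any) {
    // The gateway sends the payload as { id }, so read the id from that object.
    const todo = await this.todoRepository.findOne({ where: { id: id.id } });
    if (!todo) {
      // Throw (not return) so the caller receives an error instead of a result.
      throw new RpcException('Not found');
    }
    return {
      message: 'Fetched',
      todo,
    };
  }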

  update(id: number, updateTodoDto: UpdateTodoDto) {
    return `This action updates a #${id} todo`;
  }

  remove(id: number) {
    return `This action removes a #${id} todo`;
  }
}
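For a missing todo, the error has to be thrown rather than returned; a returned error object would travel back to the gateway as if it were an ordinary result. The sketch below shows the intended control flow with an in-memory list standing in for the repository and a plain `Error` standing in for `RpcException`. The names `findById` and `findOneOrThrow` are invented for the illustration.

```typescript
interface TodoRecord {
  id: number;
  title: string;
}

// Stand-in for the repository lookup: returns the todo or null.
function findById(todos: TodoRecord[], id: number): TodoRecord | null {
  const matches = todos.filter((t) => t.id === id);
  return matches.length > 0 ? matches[0] : null;
}

// Throwing is what makes the call fail for the caller.
function findOneOrThrow(
  todos: TodoRecord[],
  id: number,
): { message: string; todo: TodoRecord } {
  const todo = findById(todos, id);
  if (todo === null) {
    throw new Error("Not found");
  }
  return { message: "Fetched", todo };
}
```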

todo.module.ts file

import { Module } from '@nestjs/common';
import { TodoService } from './todo.service';
import { TodoController } from './todo.controller';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Todo } from './entities/todo.entity';

@Module({
  imports: [TypeOrmModule.forFeature([Todo])],
  controllers: [TodoController],
  providers: [TodoService],
})
export class TodoModule {}

That is all for our todo-service. Start the server with this command:

npm run start:dev

Auth-service

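Navigate into the auth-service folder and install the following packages. The TypeORM packages and the mysql2 driver are needed for the database connection configured below:

npm i @nestjs/microservices kafkajs @nestjs/typeorm typeorm mysql2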

Let's generate a resource for our user microservice:

nest g resource user

Remember to select Microservice (non-HTTP).

Now open the main.ts file so that the microservice listens on the same Kafka broker as our API gateway:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'auth-consumer',
        },
      },
    },
  );
  await app.listen();
}
bootstrap();

app.module.ts file, which connects to the database (I am using MariaDB, but you can use any database TypeORM supports)

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { UserModule } from './user/user.module';
import { TypeOrmModule } from '@nestjs/typeorm';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'mariadb',
      database: 'auth-microservice',
      password: '',
      host: 'localhost',
      port: 3306,
      synchronize: true,
      autoLoadEntities: true,
      username: 'root',
    }),
    UserModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

create-user.dto.ts file

export class CreateUserDto {
  email: string;
  password: string
}

user.entity.ts file

import { Column, Entity, PrimaryGeneratedColumn } from 'typeorm';

@Entity('users')
export class User {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  email: string;

  @Column()
  password: string;
}

user.controller.ts file

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { UserService } from './user.service';
import { CreateUserDto } from './dto/create-user.dto';
import { UpdateUserDto } from './dto/update-user.dto';

@Controller()
export class UserController {
  constructor(private readonly userService: UserService) {}

  @MessagePattern('createUser')
  async create(@Payload() createUserDto: CreateUserDto) {
    return await this.userService.create(createUserDto);
  }

  @MessagePattern('findAllUser')
  findAll() {
    return this.userService.findAll();
  }

  @MessagePattern('findOneUser')
  findOne(@Payload() id: number) {
    return this.userService.findOne(id);
  }

  @MessagePattern('updateUser')
  update(@Payload() updateUserDto: UpdateUserDto) {
    return this.userService.update(updateUserDto.id, updateUserDto);
  }

  @MessagePattern('removeUser')
  remove(@Payload() id: number) {
    return this.userService.remove(id);
  }
}

user.service.ts file

import { Injectable, InternalServerErrorException } from '@nestjs/common';
import { CreateUserDto } from './dto/create-user.dto';
import { UpdateUserDto } from './dto/update-user.dto';
import { Repository } from 'typeorm';
import { User } from './entities/user.entity';
import { InjectRepository } from '@nestjs/typeorm';

@Injectable()
export class UserService {
  constructor(
    @InjectRepository(User)
    private readonly userRepository: Repository<User>,
  ) {}

  async create(createUserDto: CreateUserDto): Promise<any> {
    try {
      const user = new User();
      user.email = createUserDto.email;
      user.password = createUserDto.password;
      const saved = await this.userRepository.save(user);
      return {
        message: 'added success',
        data: {
          id: saved.id,
          email: saved.email,
        },
      };
    } catch (e) {
      throw new InternalServerErrorException();
    }
  }

  findAll() {
    return `This action returns all user`;
  }

  findOne(id: number) {
    return `This action returns a #${id} user`;
  }

  update(id: number, updateUserDto: UpdateUserDto) {
    return `This action updates a #${id} user`;
  }

  remove(id: number) {
    return `This action removes a #${id} user`;
  }
}

That is all for our auth-service. Start the server with this command:

npm run start:dev

With the microservices built, we still need a Kafka server running on our system. Clone this project from GitHub:

git clone https://github.com/obsidiandynamics/kafdrop

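Follow the instructions in its README file and run it with Docker. Kafdrop itself is a web UI for Kafka, so make sure a Kafka broker is also running and reachable at localhost:9092, the address used in the code above.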

Thank you for reading