Việc xử lý những tác vụ nặng, tốn nhiều tài nguyên mạng cũng như tài nguyên CPU làm tăng thời gian chờ đối với REST API là điều không tốt. Việc 1 api phải hồi lâu có thể kéo theo những api sau đó trì trệ khiến cho trải nghiệm người dùng không được tốt. Trong bài viết này chúng ta sẽ cùng nhau tìm hiểu Queue in NestJS – hàng đợi để giải quyết vấn đề vừa nêu trên.

Queue rất hữu ích trong kiến trúc của backend. Với nó, ta có thể thực hiện xử lý phân tán và không đồng bộ. Một Publisher ( tương tự như RabbitMQ về ý tưởng ) có thể gửi message vào hàng đợi. Một consumer có thể xử dụng message và xử lý nó. Khi consumer xử lý message, thì những consumer khác sẽ không xử lý được message này.

Bài hôm nay ta sử dụng Queue để tối ưu việc gửi mail (chức năng đã xây dựng từ trước).

Theo dõi source code tại đâybranch dev nha

Link postman để test cho dễ.

Setting up Bull and Redis

Trong NestJS, chúng ta sử dụng thư viện @nestjs/bull. Nó cung cấp các chức năng hàng đợi dựa trên Redis. Và ưu điểm của redis là không bị mất dữ liệu khi khởi động lại server mình cũng đã chia sẽ ở những vài trước (xem tại đây). Cách cài đặt redis thì có thể lên trực tiếp trang chủ của nó xem nha, khá là đơn giản, chạy lệnh sương sương là được à.

Ta cài thư viện bull về:

npm install @nestjs/bull bull
npm install @types/bull --save-dev 

Khi cài đặt xong ta cấu hình cho Bull connect với redis trong AppModule:

...
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    ...
    // BullModule.forRoot({
    //   redis: {
    //     host: 'localhost',
    //     port: 6379,
    //   },
    // }),
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (config: ConfigService) => ({
        redis: {
          host: config.get('REDIS_HOST'),
          port: config.get('REDIS_PORT'),
          // username: config.get('REDIS_USERNAME'),
          password: config.get('REDIS_PASSWORD'),
        },
      }),
      inject: [ConfigService],
    }),
  ],
  ...
})
export class AppModule {}

Phần cấu hình khai báo những variable trong file .env đã làm ở những bài trước. Mặc định redis chạy ở port 6379.

Sử dụng BullModule.forRootAsync giúp Bull có thể dử dụng trên tất cả module khác trong hệ thống.

Managing queue in NestJS with Bull

Ta đăng ký hàng đợi vào UserModule:

...
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    ...
    BullModule.registerQueue({
      name: 'send-mail',
    }),
  ],
  ...
})
export class UserModule {}

Sửa lại xíu chỗ gửi mail chào mừng lúc đăng ký, thay vì gửi ngay lúc đó, thì ta sẽ nạp nó vào hàng đợi:

...
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class UserService {
  constructor(
    private readonly userRepository: UserRepository,
    @InjectQueue('send-mail')
    private sendMail: Queue,
  ) {}

  async create(userDto: CreateUserDto) {
    userDto.password = await bcrypt.hash(userDto.password, 10);

    // check exists
    const userInDb = await this.userRepository.findByCondition({
      email: userDto.email,
    });
    if (userInDb) {
      throw new HttpException('User already exists', HttpStatus.BAD_REQUEST);
    }
    await this.sendMail.add(
      'register',
      {
        to: userDto.email,
        name: userDto.name,
      },
      {
        removeOnComplete: true,
      },
    );
    // await this.mailerService.sendMail({
    //   to: userDto.email,
    //   subject: 'Welcome to my website',
    //   template: './welcome',
    //   context: {
    //     name: userDto.name,
    //   },
    // });

    return await this.userRepository.create(userDto);
  }
}

@InjectQueue('send-mail') send-mail là hàng đợi mà mình đã đăng ký bên trong module phía trên nha. Module nào đăng ký module đó, nếu module khác sử sụng send-mail thì đăng ký lại, ko vấn đề gì hết.

Bây giờ mình khai báo consumer để xử lý việc gửi email trong hàng đợi.

import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { MailerService } from '@nest-modules/mailer';

@Processor('send-mail')
export class EmailConsumer {
  constructor(private mailerService: MailerService) {}

  @Process('register')
  async registerEmail(job: Job<unknown>) {
    console.log(job.data);
    const time1 = new Date();
    await this.mailerService.sendMail({
      to: job.data['to'],
      subject: 'Welcome to my website',
      template: './welcome',
      context: {
        name: job.data['name'],
      },
    });
    const time2 = new Date();
    console.log('Send Success: ', time2.getTime() - time1.getTime(), 'ms');
  }
}

Đừng quên khai báo EmailConsumer vào UserModule nha. Nếu không nó sẽ không chạy đâu.

import { EmailConsumer } from './consumers/email.consumer';

@Module({
  ...
  providers: [
    ...
    EmailConsumer,
  ],
})
export class UserModule {}

Tới đây thì các bạn có thể test được rồi. Vì tác vụ gửi mail không cần phải thực hiện ngay lập tức. Nên khi vừa đăng ký xong chúng ta sẽ nhận được access token để đăng nhập thôi.

Kết luận

Trong bài viết này, chúng ta đã tìm hiểu kiến ​​thức cơ bản về quản lý Queue in NestJS và Bull. Để làm điều đó, chúng ta đã triển khai một ví dụ về việc gửi mail. Nhờ thực hiện điều đó thông qua hàng đợi, chúng ta có thể quản lý tài nguyên của mình tốt hơn. Chúng ta cũng có thể tránh thời gian chờ đối với các tác vụ sử dụng nhiều CPU hay thời gian và chạy chúng trong các quy trình riêng biệt.

Rất mong được sự ủng hộ của mọi người để mình có động lực ra những bài viết tiếp theo.
{\__/}
( ~.~ )
/ > ♥️ I LOVE YOU 3000

JUST DO IT!