Giới thiệu

Trong ứng dụng của chúng ta, đôi khi có những tác vụ tốn nhiều thời gian và không cần xử lý nó “ngay lập tức”, thì chúng ta sẽ đưa nó vào hàng đợi và xử lý tuần tự. Queue của Laravel được xây dựng để giải quyết các vấn đề như vậy, Queue giúp chúng ta phân phối các task, cân bằng và không gây trì trệ cho các task kế tiếp.

Queue của Laravel cung cấp cho chúng ta đa dạng các across api khác nhau như Beastalk, Amazon sqs, Redis,… Queue cho phép hoãn lại các task tiêu thụ nhiều thời gian. Ví dụ như gửi mail, giúp tăng tốc độ phản hồi cho ứng dụng web, tăng trải nghiệm người dùng.

Ở đây có thể có một số bạn sẽ giống như mình lúc mới tìm hiểu về Queues, người ta toàn chỉ ví dụ queue trong mail và chúng ta cứ tưởng chỉ có mail mới dùng queues. Thật sự không phải vậy, queue có thể làm được mọi thứ nha, thứ gì cũng được miễn sao chúng ta làm nó phù hợp.

Config

Trong file config/queue.php chứa các lựa chọn connection với queues. Các driver được định nghĩa mặc định như Amazon SQS, Beanstalk hay Redis.

'connections' => [

    'sync' => [
        'driver' => 'sync',
    ],

    'database' => [
        'driver' => 'database',
        'table' => 'jobs',
        'queue' => 'default',
        'retry_after' => 90,
        'after_commit' => false,
    ],

    'beanstalkd' => [
        'driver' => 'beanstalkd',
        'host' => 'localhost',
        'queue' => 'default',
        'retry_after' => 90,
        'block_for' => 0,
        'after_commit' => false,
    ],

    'sqs' => [
        'driver' => 'sqs',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
        'queue' => env('SQS_QUEUE', 'default'),
        'suffix' => env('SQS_SUFFIX'),
        'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
        'after_commit' => false,
    ],

    'redis' => [
        'driver' => 'redis',
        'connection' => 'default',
        'queue' => env('REDIS_QUEUE', 'default'),
        'retry_after' => 90,
        'block_for' => null,
        'after_commit' => false,
    ],

],

Sử dụng Queues với Database
Để sử dụng Queues với Database bạn cần tạo bảng jobs để lưu trữ thông tin. Laravel đã cung cấp sẵn lệnh command để tạo tự động bảng trong DB. Chạy command như sau:

php artisan queue:table

php artisan migrate

Sử dụng Queues với Redis
Cấu hình để sử dụng với Redis DB, bạn cần sửa trong QUEUE_CONNECTION=redis trong file .env. Nếu Redis của bạn sử dụng 1 Redis Cluster, tên của Queues phải có key hash tag, nó bắt buộc để đảm bảo rằng tất cả Redis key cho các cụm queue được đặt cùng vị trí của hash.

Và bạn cần cài thêm các package:

  • Amazon SQS: aws/aws-sdk-php
  • Beanstakld: pda/pheanstalk
  • Redis: predis/predis

Sử dụng driver nào cũng được nó không ảnh hưởng nhiều đến hiệu suất, quan trọng là bạn thấy cái nào dễ thì sử dụng thôi, ở đây mình dùng Database để mọi người làm có thể thấy nó hoạt động sao thôi.

Tạo Job

Generating Job Classes

Mặc định, các Job của Queue sau khi được tạo sẽ nằm trong thư mục app/Jobs, nó sẽ được tạo mặc định khi bạn chạy command make:job. Chạy lệnh bên dưới:

php artisan make:job NewJob

Các Job sẽ implement Illuminate\Contracts\Queue\ShouldQueue interface. Bạn theo dõi các bài viết về Event hay Email sẽ thấy nhiều về interface này, và đây là điểm mấu chốt của queue.

Cấu trúc

Mặc định Job sau khi được tạo sẽ có cấu trúc như sau:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class NewJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        // Process...
    }
}

Mặc định job được tạo ra và nạp vào queue default. Nếu bạn muốn đưa job vào 1 hàng đợi khác. Giả dụ như bạn cần mua đồ dùng chung chung ở tiệm tạp hóa bạn xếp vào hàng 1, nhưng bạn chỉ muốn mua gạo trong đó thôi, thì bạn xếp qua hàng 2 để mua gạo. Việc tách các queue giúp giảm thời gian chờ cho cái job đó nếu trong queue có nhiều job.
Bạn khai báo như sau:


class NewJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __constructor () 
    {
        $this->queue = 'NewJob';
    }
    ...
}

Để có thể sử dụng Eloquent model trong job, SerializesModels được định nghĩa trong job, nó giúp bạn có thể sử dụng Eloquent model một cách động bộ hoặc không, khi job được xử lý.

Bạn có thể inject những gì cần thiết để phục vụ cho Job của mình.
Hàm handle() được gọi khi job được xử lý trong queue.

Dispatch job

Mặc định, job có thể gọi tới hàm static dispatch() ở bất kì đâu để đẩy job vào queue, các tham số được truyền vào hàm dispatch() được định nghĩa ở __construct().

NewJob::dispatch($new);

Nếu bạn muốn nạp job có điều kiện, bạn thử cách sau:

NewJob::dispatchIf($accountActive, $new);

NewJob::dispatchUnless($accountSuspended, $new);

Nếu muốn trì hoãn việc thực hiện job trong queue, bạn sử dụng delay():

NewJob::dispatch()->delay(now()->addMinutes(10));

Ngoài ra bạn có thể tùy chọn việc xử lý job trên Queue nào, và sử dụng DB nào bằng hàm onConnection() và onQueue();

NewJob::dispatch()->onQueue('processing')->onConnection('sqs');

Hoặc bạn có thể khai báo trong hàm __construct():

public function __construct()
{
    $this->onQueue('processing');
}

Ngoài ra có thể đưa job vào Queue bằng cách sau:

class UserController extends Controller
{
    /**
     * Send a reminder e-mail to a given user.
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function sendReminderEmail(Request $request, $id)
    {
        $user = User::findOrFail($id);

        $this->dispatch(new NewJob($user));
    }
}

hoặc dùng hàm helper dispatch:

Route::get('/job', function () {
    dispatch(new App\Jobs\NewJob("data"));

    return 'Done!';
});

Tạo các Worker lắng nghe Job

Để lắng nghe các job trong Queue, bạn sử dụng command:

php artisan queue:work

Các options đính kèm:

  • --tries=3: Định nghĩa số lần xử lý job trước khi job bị fail.
  • --timeout=30: Định nghĩa thời gian tối đa job có thể chạy trong queue tính bằng giây.
  • --once: Định nghĩa worker chỉ lắng nghe và xử lý 1 job duy nhất.
  • Bạn có thể truyền cụ thể driver mà worker này sẽ lắng nghe như : redis, sqs, …
php artisan queue:work redis --tries=3
  • --queue=name: Chỉ rõ tên queue mà worker sẽ nghe.
  • --sleep=5: Bạn có thể thiết lập thời gian sleep trước khi thực hiện job mới:

Chú ý là queue chỉ “sleep” nếu không có job nào trong queue. Nếu còn nhiều job, thì queue sẽ thực hiện liên tục mà không sleep.

Hoặc bạn có thể khai báo trực tiếp các option này vào trong job:

class NewJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;


    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 25;

    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 120;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

Worker là một process tồn tại lâu dài, nên trong quá trình deploy, nó sẽ không tự nhận biết và thích nghi với sự thay đổi của source code, vì vậy bạn cần restart nó trong quá trình deploy, bằng lệnh:

php artisan queue:restart

Độ ưu tiên của queue

Bạn có thể truyền vào danh sách các queue ngăn cách bởi dấu , để thiết lập độ ưu tiên:

php artisan queue:work --queue=high,low

Các job ở high queue sẽ luôn được thực hiện trước các job low.

Xử lý các failed job

Failed job Events

Nếu bạn muốn đăng kí event lắng nghe khi một queued job thất bại, bạn có thể sử dụng hàm Queue::failing. Event này là một cách tốt để thông báo cho team của bạn qua. Chúng ta đăng ký nó trong hàm boot trong AppServiceProvider:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        //
    }

    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

Các hàm xử lý failed job

Để điều khiển tốt hơn, bạn có thể thêm hàm failed trực tiếp trong queue job class, cho phép bạn thực hiện một hành động cụ thể khi thất bại:

class NewJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }

    /**
     * Handle a job failure.
     *
     * @return void
     */
    public function failed()
    {
        // Called when the job is failing...
    }
}

Thử lại Failed Jobs

Để xem tất cả các failed jobs đã insert vào trong bảng failed_jobs, bạn có thể sử dụng câu lệnh queue:failed:

php artisan queue:failed

Nó sẽ liệt kê các Job ID, connection, queue và thời gian thất bại. Job ID có thể dùng để thử chạy lại. Ví dụ để thử chạy lại một failed job có ID là 5, gõ lệnh như sau:

php artisan queue:retry 5

Hay thực thi tất cả failed job:

php artisan queue:retry all

Nếu bạn muốn xoá một failed job, sử dụng queue:forget:

php artisan queue:forget 5

Để xoá tất cả các failed jobs, bạn có thể sử dụng queue:flush:

php artisan queue:flush

So sánh queue:work và queue:listen

Trong quá trình các bạn tìm hiểu thông tin về queue, có lúc bạn thấy người ta dùng queue:work lúc thì dùng queue:listen, vậy nó khác nhau như thế nào?

  • queue:work: sẽ xử lý công việc đầu tiên trong hàng đợi. Nên được ưu tiên khi bạn muốn hàng đợi của bạn chạy dưới dạng daemon. Đây sẽ là một quá trình lâu dài sẽ có lợi khi hiệu suất là một vấn đề. Điều này sẽ sử dụng một phiên bản lưu trữ của ứng dụng và không khởi động lại ứng dụng mỗi khi một công việc được xử lý.
  • queue:listen: sẽ xử lý tất cả các công việc khi chúng đi qua. Nên được sử dụng khi bạn không quan tâm đến hiệu suất hoặc bạn không muốn phải khởi động lại hàng đợi sau khi thay đổi mã.

Kết luận
queue:work nên được sử dụng khá nhiều trong product vì nó hiệu quả hơn nhiều và sử dụng ít RAM hơn. Tuy nhiên; bạn cần khởi động lại nó sau mỗi thay đổi code của jobs.
queue:listen hữu ích cho quá trình phát triển và ở môi trường cục bộ vì bạn không phải khởi động lại nó sau khi thay đổi mã (vì chúng được khởi động lại sau mỗi jobs ).

Kết luận

Qua bày này mình giới thiệu tới các bạn các chứng năng cơ bản và phổ biến sử dụng nhiều trong các ứng dụng. Có thể chưa đáp ứng nhu cầu của các bạn, bạn có thể tham khảo thêm trên trang chủ của Laravel.

Đồng thời các bạn có thể tìm hiểu thêm về supervisor để giúp quản lí queue tốt hơn.

Nguồn tham khảo:

Rất mong được sự ủng hộ của mọi người để mình có động lực ra những bài viết tiếp theo.
{\__/}
( ~.~ )
/ > ♥️ I LOVE YOU 3000

JUST DO IT!