본문 바로가기

PHP

[Laravel]queue를 이용한 비동기 분산 처리

개요

실제 어플리케이션에서 이벤트를 도입하려면 프레임워크에서 제공하는 무언가의 처리를 트리거로 만들어 특정한 처리를 실행하고 싶을때 이용하는 것이 좋습니다.

대규모 애플리케이션에서 복잡해지기 쉬운 비즈니스 로직의 처리를 분산하기 위해서는 애플리케이션에서 제공하는 기능을 세세하게 분할하고, 처리 로직에 발생하는 사용자의 해동을 분석해 이를 트리거로 정의하는 것도 가능합니다.

Queue

메일 송신, 엑셀이나 PDF를 보고서 출력 등 큰 데이터를 다루는 처리 로직이 있을 때,

사용자의 요청을 받아 처리 완료 시점까지 사용자를 기다리게 하는 것은 사용자의 이탈에 직접적인 영항를 주므로 웹 애플리케이션 개발에서는 금기 사항입니다.

시간이 걸리거나 부하가 높은 처리를 해결 하기 위해서는 라라벨에서 제공하는 큐Queue를 사용해  요청/응답 처리흐름과 다른 태스크로 실행 하는 기능(비동기 처리)입니다.

 

큐로 실행할 처리는 즉시 실행 또는 지연 실행 여부를 애플리케이션 요구사항에 맞춰 대응할 수 있지만, 그 처리 결과를 응답으로 이용할 수는 없습니다. (로컬 환경에선 QUEUE_CONNECTION=sync 로 설정하면 응답으로 이용할 수 는 있습니다.)

테스크 처리 플로우

큐는 명령어(Command)패턴을 구현한 것입니다. 명령어 패턴에서는 잡(Job)을 이용해 일반적인 런타임과 다른 처리를 제공합니다.

큐를 이용한 처리는 주로 잡이나 태스크라고 불리며, FIFO로 실행됩니다. 저장된 잡은 실행 전에 삭제할 수 없으며, 삭제 하고 싶을때는 데이터베이스 등에서 직접 삭제 해야합니다.

구현

1. Queue 드라이버 설정

비동기 실행 드라이버는 데이터베이스, Beanstalkd, SQS, Redis로 사용 할 수 있습니다.

1-1. RDBMS

라라벨에서 표준으로 지원하는 RDMS를 이용할 때 입니다.

여기선 RDMS로 사용하지 않고 Redis를 사용 합니다.

$ php artisan queue:table
$ php artisan migrate

 

queue table

1-2. Redis

레디스는 인메모리 타입 KVS(Key-Value Store)로 고속 성능과 다양한 데이터 구조를 다루는 것이 특징입니다.

마스터/슬레이브 구성으로 비동기 복제(리플리케이션)에 대응하며 손쉽게 환경을 구성할 수 있기 때문에 대규모 상용 환경에서도 활용 됩니다.

2. predis 설치

$ composer require predis/predis

3. .env 파일 수정

QUEUE_CONNECTION=redis

4. 라우터 등록 

<?php
Route::get(
	'/queue',
	\App\Http\Controllers\QueueAction.class
);

5. Action(Controller) 구현

<?php

declare(strict_types=1);

namespace App\Http\Controllers;
use Illuminate\Contracts\Bus\Dispatcher;

final class QueueAction extends Controller
{
    private $dispatcher;

    public function __construct(
        Dispatcher $dispatcher
    ) {
        $this->dispatcher = $dispatcher;
    }

    public function __invoke(): void
    {
        // 6. Job QueueGenerator 클래스 구현
        $generator = new QueueGenerator();
     
        $this->dispatcher->dispatch($generator);
    }
}

컨스트럭터 인젝션을 이용해 Dispatcher 클래스를 주입 받아서 사용 합니다.

6.Job  클래스 구현

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class QueueGenerator implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    private $path = '';

    public function __construct()
    {
    }

    public function handle()
    {
    	Log::info('queue test');
    }
}

7. 서비스 프로바이더 등록

<?php

declare(strict_types=1);

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use Illuminate\Contracts\Foundation\Application;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Bus\Dispatcher as DispatcherSubcriber;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        //
        $this->app->bind(
            Dispatcher::class,
            function () {
                return new DispatcherSubcriber();
            }
        );
    }

    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        //
    }
}

8. 큐 실행

$ php artisan queue:work

위 명령어로 큐를 실행 한뒤 /queue 에 접속하면 큐에 등록하고 실행이 되며, 콘솔에 큐 메세지가 출력이 되는것을 볼 수 있습니다.

하지만 큐 실행만으로는 운용 시스템에서 사용 하기가 힘듭니다. 큐의 연결이 끊기거나 정지되면 큐를 사용 할 수가 없는데, 이때 Supervisor 를 사용해 프로세스를 감시 하고 중지되었을 때 재실행 할 수 있도록 할 수 있습니다. 

 

추가적으로, 

여러 처리를 큐를 이용해 실행하면, 큐 안에서 우선순위를 가지는 처리들이 많아집니다. 

비동기 처리라 할지라도 각 처리에 많은 시간이 소요되면 다른 잡의 실행이 늦어지게 됩니다.

기본적으로 모든 Job은 default라는 이름의 큐에 저장되기 때문에 큐를 분산 시켜주는게 좋습니다.

    public function __invoke(): void
    {
        $generator = new QueueGenerator();
     
     	// 큐를 지정 할 수 있습니다.
        $this->dispatcher->dispatch($generator)->onQueue('큐 이름');
    }