본문 바로가기

개발 개발/자바

[Thread] 자바 쓰레드의 resume, suspend, stop 구현

출처 : http://javafreak.tistory.com/232

자바 언어가 1.x 에서 2.x 대를 넘나들 시절에 thread 를 다룰때 뻔질나게 자주 쓰였던 thread 메소드가 resume, suspend , stop 인데 아쉽게도 deprecated (앞으로 쓰지 말라는 뜻) 되어서 별 수 없이 쓰레드의 상태를 관리하는 방식으로 구현을 해야 한다.

구현은 아래와 같은 간단한 코드에서 시작한다.
public class ThreadHandle implements Runnable {
    @Override    public void run() {
        // TODO Auto-generated method stub
    }
}
Runnable을 구현한 것을 볼 수 있는데, 꼭 저렇게 할 필요는 없으나 Runnable을 implement 해주지 않으면 별도의 구현체를 클래스로 정의해야하기 때문에 쓰레드를 이용해서 단위 작업을 수행하는 ThreadHandle과 같은 클래스가 자체적으로 Runnable을 구현하면 편리하다.

이제 저 상태에서 start, resume, suspend, stop 메소드를 정의해준다.
public class ThreadHandle implements Runnable {
    public void start(){
        ;
    }
    public void resume() {
        ;
    }
    public void suspend() {
        ;
    }
    public void stop() {
        ;
    }
    public void join() {
        ;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
    }
}
ThreadHandle을 사용하는 쪽에서는 아래와같이 초기화하는데 쓰레드를 사용하는 것과 별 차이가 없어보이게 된다.
    public void test_thread_handle() throws Exception {
        ThreadHandle threadHandle = new ThreadHandle();
        threadHandle.start();
    }
그리고 start() 메소드가 제대로 동작하도록 하려면 아래와 같이 ThreadHandle이 thread를 생성하고 실행하게 만들어준다.
    private Thread thisThread ;
    private String threadName ;
    public ThreadHandle(){}
    public ThreadHandle(String threadName){
        this.threadName = threadName;
    }
    public void start(){
        thisThread = new Thread(this);
        if ( threadName != null) thisThread.setName(threadName);
        thisThread.start();
    }
ThreadHandle.start() 안에서 쓰레드 인스턴스를 생성하고 start()를 호출함으로써 새로운 쓰레드가 실행된다. threadHandle.start()를 호출이 <<main>> 쓰레드에서 실행되었다면 아래와 같이 새로운 쓰레드가 분기하게 된다.

두 개의 쓰레드가 함께 실행된다.

"t-0"라고 이름을 붙인 새로운 쓰레드는 run() 메소드를 시작으로 작업을 수행하는데 쓰레드에 일시정지, 재시작, 종료 기능을 추가하기 위해서 run() 메소드 안에서 반복문을 집어넣는다.
public class ThreadHandle implements Runnable {
    ....
    @Override
    public void run() {
        // TODO Auto-generated method stub
        while ( true ){
            processTask();
        }
    }
}
반복문에 별다른 종료 조건을 주지 않았기 때문에 위와같은 상태에서는 정신없이 작업이 계속될 것이다. while 안에 적절한 종료 조건을 넣어서 적절한 시점에 쓰레드가 종료하게 해야 한다.

종료 조건은 두가지로 나눌 수 있다.

1. run() 메소드의 반복문 안에서 더이상 작업을 계속할 필요가 없어서 끝내는 경우
2. 외부에서 ThreadHandler.stop() 메소드를 호출했을 때.

1번은 쓰레드가 스스로 종료 조건을 판단하는 것이고 2번은 외부에서 강제로 종료시키는 경우에 해당한다.

1번의 전형적인 예로 소켓 연결을 통해서 데이터를 주고받은 뒤 연결을 끊는 경우를 들 수 있다.
    public void run() {
        while ( true ) {
            int nRead = in.read(buf, 0, buf.length);
            if ( nRead < 0 ) break;
            ....
        }
    }
더이상 읽을 데이터가 없다면 while 문을 끝내고 run() 메소드가 종료하면서 쓰레드도 생을 마감하게 된다.

2번처럼 외부에서 강제로 종료시키는 경우(정확하게 말하면 종료하라고 신호만 보낸다. 자바 스레드에서는 한 스레드가 다른 스레드를 강제로 종료시킬 방법이 없다.)가 바로 여기서 구현할 내용인데 다음과 같이 thread의 상태를 나타내는 static 변수를 정의하고 이를 참조하는 stateCode 프로퍼티를 도입해서 상태를 관리한다.
public class ThreadHandle implements Runnable {
    final private static int STATE_INIT = 0x1;
    final private static int STATE_STARTED = 0x1 << 1;
    final private static int STATE_SUSPENDED = 0x1 << 2;
    final private static int STATE_STOPPED = 0x1 << 3;

    private int stateCode = STATE_INIT;
    public void start(){
        if ( stateCode != STATE_INIT) // INIT 상태가 아니면 이미 실행되었다.
            throw new IllegalStateException("already started");
        
        thisThread = new Thread(this);
        if ( threadName != null) thisThread.setName(threadName);
        thisThread.start();
        stateCode = STATE_STARTED;
    }
    ....
    public void stop() {
        if ( stateCode == STATE_STOPPED) // 이미 멈췄다면 또 호출할 필요가 없음.
            throw new IllegalStateException("already stopped");
        thisThread.interrupt();
        stateCode = STATE_STOPPED ;
    }
    ....
}
STATE_INIT은 쓰레드 인스턴스가 생성되었지만 실행되지는 않은 상태를 의미하고, STATE_STARTED 는 Thread의 start() 메소드가 호출되어서 실행된 상태를 의미한다. 마찬가지로 STATE_STOPPED 는 쓰레드가 끝났거나 끝나는 중임을(while 루프를 빠져나와서 run() 메소드가 끝나가는 단계) 나타낸다.

각 상태는 다음 상태의 조건이 되는데, start() 메소드에서 현재 상태를 확인해서 STATE_INIT이 아니라면 이미 시작되었다는 뜻이므로 예외를 던지게 한다. 마찬가지로 stop() 메소드에서도 상태가 STATE_STOPPED 라면 이미 종료가 되었거나 종료 중이므로 예외를 던진다.

위 구현은 멀티쓰레드 환경에서는 오작동을 할 가능성이 있는데, 하나의 ThreadHandle 인스턴스를 여러개의 스레드들이 공유하는 상황이라면 스레드들이 동시에 start(), stop() 메소드를 호출할 때 stateCode 프로퍼티의 일관성이 깨질 수가 있다.

스레드 A와 스레드 B가 동시에 start()를 호출할 때 A가 현재 stateCode가 STATE_INIT임을 보고 if 조건절을 무사히 통과해서 thisThread.start();를 호출할 것이다. 그리고 스레드 A가 stateCode를 STATE_STARTED로 갱신하기 전에 스레드 B가 if 조건절에 진입하면 stateCode가 STATE_INIT 이기 때문에 무사히 통과해서 또다시 스레드 인스턴스를 생성하고 실행시키게 된다.

스레드 A가 stateCode를 갱신한 후에 스레드 B가 if 조건절에 진입하더라도 stateCode가 volatile이 아니기 때문에 스레드 B는 스레드 A가 갱신한 최신의 stateCode 값을 보지 못할 수도 있다.

따라서 ThreadHandle을 멀티쓰레드에서 안전하게 하려면 아래와 같이 critical section을 잘 막아줘야 한다.
    public void start(){
        synchronized ( this ){
            if ( stateCode != STATE_INIT)
                throw new IllegalStateException("already started");
            
            thisThread = new Thread(this);
            if ( threadName != null) thisThread.setName(threadName);
            thisThread.start();
            stateCode = STATE_STARTED;
        }
    }
    ....
    public void stop() {
        synchronized ( this ){
            if ( stateCode == STATE_STOPPED)
                throw new IllegalStateException("already stopped");
            this.stateCode = STATE_STOPPED;
            thisThread.interrupt();
        }
    }
suspend(), resume() 도 위와같이 구현하면 오직 하나의 스레드만이 threadHandle의 상태를 변경할 수 있고 상태를 변경하는동안 다른 스레드들이 진입해서 일관성이 깨지는 것을 막을 수 있다.

stop() 메소드가 호출된 후 상태가 변경되었으므로(stateCode가 변경되었으므로) run() 메소드의 while 문 안에서 이 변경을 확인해서 반복문을 빠져나오는 코드를 삽입한다.
    @Override
    public void run() {
        while ( true ){            
            if ( stateCode == STATE_STOPPED)
                break//종료하라는 신호이므로 루프를 끝낸다.
            processComplexJob();
        }
    }
위에서는 processComplexJob(); 메소드를 실행하기 전에 상태를 확인했는데, 수행할 작업을 끝내는데 꽤 많은 시간이 걸린다면 그 작업을 시작하기 전에 확인해주면 응답성을 높일 수가 있다. 

하지만 if 조건절을 판단한 직후에(종료조건이 아님을 확인한 직후에) 다른 스레드가 stop() 을 호출해서 스레드의 상태를 STATE_STOPPED로 바꾸주었다면 간발의 차이로 오랜 시간이 걸리는 작업은 실행될 수 밖에 없다.

이럴 경우 processComplexJob() 메소드 안에서 별도로 stateCode를 다시 한 번 확인하지 않는 한, 일단 시작된 작업이 끝나고 나서 while문을 한 번 반복한 후 다시 if 조건절에 도착해서 STAE_STOPPED 값을 보고 루프를 빠져나올 것이다.

suspend()를 구현한 코드는 아래와 같은데 stateCode 값을 변경하기 전에 현재의 상태를 미리 확인하는 과정을 거친다.
    public void suspend() {
        synchronized ( this ){
            if ( stateCode == STATE_SUSPENDED) return;
            if ( stateCode == STATE_INIT )
                throw new IllegalStateException("not started yet");
            if ( stateCode == STATE_STOPPED)
                throw new IllegalStateException("already stopped");
            stateCode = STATE_SUSPENDED;
        }    
    }
현재의 상태가 STATE_STARTED 가 아니라면 일시 정지가 별 의미가 없으므로 위처럼 모든 상태를 체크해서 적절히 예외를 던지거나 더이상 진행하지 말아야 하는데, 모든 상태를 체크하다보니 실제 상태를 변경하는 코드보다 조건을 검증하는 코드(if 구문들)가 더 많아진다. 

여기서는 STARTED, SUSPENDED, STOPPED 의 조건만을 판단하기 때문에 코딩량을 감당할 수 있으나 구현 조건에 따라서 정지중, 정지, 종료중, 종료, 재시작중, 재시작처럼 쓰레드가 거치는 상태가 많다면state pattern을 도입하는 것을 고려해볼만 하다. 

일시 정지 기능을 구현했으니 stateCode의 변경을 보고 스레드를 잠시 정지하는 기능을 run 메소드의 while 반복문 안에 삽입해준다.
    public void run() {
        while ( true ){
            // 상태 코드가 일시 정지라면 while문에서 계속 대기하도록 한다.
            while ( stateCode == STATE_SUSPENDED){
                try {
                    System.out.println("[handle] suspending...");
                    Thread.sleep(24 * 60 * 60 * 1000);
                } catch (InterruptedException e) {
                    if ( stateCode != STATE_SUSPENDED){
                        System.out.println("[handle] resuming...");
                        break;
                    }
                }
            }
            if ( stateCode == STATE_STOPPED){
                System.out.println("[handle] stopping...");
                break;
            }
            processComplexJob();
        }
여기서는 아주 오랜시간동안 쓰레드를 재우는 식으로 구현했는데 저렇게 오랫동안 스레드를 정지시켰을때에는 반드시 누군가가 thisThread.interrupt(); 를 호출해주어야 한다. 따라서 suspend()에서 잠재운 스레드를 다시 실행시키는 resume() 메소드에서는 반드시 thisThread.interrupt(); 를 호출해 준다.
    public void resume() {
        synchronized ( this ){
            if ( stateCode == STATE_STARTED || stateCode == STATE_INIT) return;
            if ( stateCode == STATE_STOPPED)
                throw new IllegalStateException("already stopped");
            stateCode = STATE_STARTED;
            thisThread.interrupt(); // 꼭 해줘야 한다.
        }
    }
stop() 메소드 구현에서도 interrupte() 를 호출하는데, stop() 메소드는 이미 멈춘 상태가 아니라면 언제든 호출될 수 있기 때문에 resume()과 마찬가지로 상태를 변경한 후에 스레드를 깨워주어야 한다. 

마지막으로 stateCode 를 volatile 로 바꿔주어서 run() 메소드를 실행중인 스레드가 공유 변수의 복사본만 바라보느라 외부 스레드가 갱신한 최신의 stateCode 의 값을 놓치는 일이 없게 해준다.
public class ThreadHandle implements Runnable {
    ....
    private volatile int stateCode = STATE_INIT;
이렇게 대강 start, resume, suspend, stop 기능을 구현했는데, 이런 메소드가 호출될때마다 ThreadHandle 내에서 관리하는 스레드는 아래와 같이 상태 전이가 반복된다.
스레드의 상태 전이를 관리할 때 위와같이 state diagram을 그려놓으면 구현할 때 도움이 많이 된다.  현재 상태에 따라서 호출할 수 있는 메소드의 종류가 나뉘는 것을 한눈에 알 수 있기 때문에 상태 관리 및 전이를 코드로 옮기기에 용이하다.

어떤 코드들을 보면 스레드에 걸리는 인터럽트 신호를 상태 전이의 조건으로 사용하는 경우도 있다. 

위에서는 stop()이 호출된 후 run() 메소드의 while 문 안에서 STATE_STOPPED 값을 확인하고 break; 하도록 했는데 아래처럼 while문의 조건절에서 인터럽트 신호 여부를 종료 조건으로 판단할 수도 있다.
    while ( ! thisThread.isInterrupted() ){
        .....
    }
이럴 경우 스레드 내에서 함부로 인터럽트 신호를 먹어버리지 않도록 유의해야한다. ( 관련글 : [Java] 자바 Thread의 interrupt 이해하기 ) 실행 상태가 아닌 BLOCK 상태에서 인터럽트를 걸면 스레드가 깨어나면서 인터럽트 신호가 해제되므로 catch 절에서 다시 한 번 인터럽트를 걸어서 인터럽스 신호를 복구시켜야 한다.
    try {
        Thread.sleep( sleepTime );
    catch (InteruptedException e){
        thisThread.interrupt(); // 다시 한 번 신호를 걸어준다.
    }
인터럽트 신호를 상태 전이의 힌트로 사용하는 방식도 나쁘진 않지만 스레드가 java nio 를 이용해서 스트립 입출력을 다룬다면 스레드에 인터럽트를 걸 때 신중해야 한다.

NIO 이전의 입출력 메소드들은 읽기, 쓰기 도중에 BLOCK 상태에 있으면 인터럽트에도 반응하지 않았지만 NIO 에서는 읽고 쓰는 도중에 BLOCK 상태에 있을때 인터럽트를 걸면 ClosedByInterruptException 예외가 던져지면서 스트림이 닫혀버린다. ( BLOCK 상태가 아니더라도 현재 스레드에 인터럽트가 걸려있는걸 확인하면 NIO 관련 스트림 클래스들은 스트림을 닫아버린다.)

위의 구현에는 해당되지 않으나 단지 스레드를 깨울 목적으로 인터럽트를 걸었는데 NIO 의 read() 에서 여전히 살아있는 인터럽트 신호를 보고 스트림을 닫아버리는 일이 발생한다. 이럴 경우 NIO의 read, write 실행 직전에 반드시 인터럽트 신호를 해제해줘야하고 read, write 실행 중에 인터럽트가 걸리지 않게 해줘야 하는데 이렇게되면 "입출력중" 이라는 별도의 상태를 정의해야 할 수도 있다. ( 복잡도 증가 )


public class ThreadHandle implements Runnable {

    private Thread thisThread ;

    private String threadName ;


    

    final private static int STATE_INIT = 0x1;

    final private static int STATE_STARTED = 0x1 << 1;

    final private static int STATE_SUSPENDED = 0x1 << 2;

    final private static int STATE_STOPPED = 0x1 << 3;


    private volatile int stateCode = STATE_INIT;

        

    public ThreadHandle(){ }

    

    public ThreadHandle(String threadName){

        this.threadName = threadName;

    }

    public void start(){

        synchronized ( this ){

            if ( stateCode != STATE_INIT)

                throw new IllegalStateException("already started");

            

            thisThread = new Thread(this);

            if ( threadName != null) thisThread.setName(threadName);

            thisThread.start();

            stateCode = STATE_STARTED;

        }

    }


    public void stop() {

        synchronized ( this ){

            if ( stateCode == STATE_STOPPED)

                throw new IllegalStateException("already stopped");

            this.stateCode = STATE_STOPPED;

            thisThread.interrupt();

        }

    }

    public void resume() {

        synchronized ( this ){

            if ( stateCode == STATE_STARTED || stateCode == STATE_INIT) return;

            if ( stateCode == STATE_STOPPED)

                throw new IllegalStateException("already stopped");

            stateCode = STATE_STARTED;

            thisThread.interrupt(); // 꼭 해줘야 한다.

        }

    }

    public void suspend() {

        synchronized ( this ){

            if ( stateCode == STATE_SUSPENDED) return;

            if ( stateCode == STATE_INIT )

                throw new IllegalStateException("not started yet");

            if ( stateCode == STATE_STOPPED)

                throw new IllegalStateException("already stopped");

            stateCode = STATE_SUSPENDED;

        }    

    }


    public void run() {

        while ( true ){

            // 상태 코드가 일시 정지라면 while문에서 계속 대기하도록 한다.

            while ( stateCode == STATE_SUSPENDED){

                try {

                    System.out.println("[handle] suspending...");

                    Thread.sleep(24 * 60 * 60 * 1000);

                } catch (InterruptedException e) {

                    if ( stateCode != STATE_SUSPENDED){

                        System.out.println("[handle] resuming...");

                        break;

                    }

                }

            }

            if ( stateCode == STATE_STOPPED){

                System.out.println("[handle] stopping...");

                break;

            }

            processComplexJob();

        }

}



public void test_thread_handle() throws Exception {

    ThreadHandle threadHandle = new ThreadHandle();

    threadHandle.start();

}


public : 공공의, 공중의, 공적인, 공립의, 공공연한