뭐요

java.util.concurrent 패키지, 예제로 알아보기 본문

Java

java.util.concurrent 패키지, 예제로 알아보기

욕심만 많은 사람 2023. 7. 16. 01:58

Concurrency API Summary

  • 기존에 병렬 처리를 하기 위해서는 Thread 클래스 혹은 Runnable 인터페이스를 직접 사용해야 했습니다. 하지만 자바 5부터 새롭게 추가된 Concurrency API를 통해 기존의 한계점을 극복합니다.
  • 작업을 실행하기 위해 Executor 인터페이스를 제공합니다.
  • 쓰레드 풀, 큐 등 다양한 Executor 구현체를 제공합니다.


예시로 설명하는 기존 방식의 한계점 (Thread, Runnable)

웹 서버는 여러 사용자의 요청을 동시에 처리해야 하므로 다음과 같은 형태의 코드를 지닐 것입니다.

try (ServerSocket listenSocket = new ServerSocket(port)) {
    Socket connection;
    while ((connection = listenSocket.accept()) != null) {
        Thread thread = new Thread(new RequestHandler(connection));
        thread.start();
    }
}

문제점

  1. 무한히 생성되는 쓰레드

쓰레드 개수의 제한이 없기 때문에 사용자 요청이 많아지면 OOM이 발생할 수 있습니다. 따라서 최대로 생성될 수 있는 쓰레드 개수를 제한할 수 있어야 합니다.

  1. 쓰레드 생성&종료

모든 요청에 대해 쓰레드를 새로 생성&종료하여 오버헤드가 발생합니다. 따라서 쓰레드를 재사용 한다면 부하를 줄일 수 있습니다.

  1. 값의 반환 불가능

Executor

출처 : https://mangkyu.tistory.com/259
  • 개발자가 하나하나 Thread를 관리하기 힘듭니다.
  • 따라서 쓰레드를 만들고 관리하는 작업을 Executor에게 위임합니다.
  1. 생성 : application에서 사용될 Thread를 만들거나, 쓰레드 풀을 통해 관리합니다.
  1. 관리 : Thread 생명 주기 관리
  1. 작업 처리 및 실행 : Thread로 작업을 수행하기 위한 API 제공

Example
  • execute(Runnable), submit(Runnable) ⇒ 쓰레드 작업 수행
  • shutdown() ⇒ 할 일을 마치고 종료
  • shutdownNow() ⇒ 즉시 종료


그 외의 주요 인터페이스

Executors

  • 직접 쓰레드를 다루는 것은 번거로운데, 이를 도와주는 팩토리 역할을 합니다.
  • Executor, ExecutorService, SchedueledExecutorService를 구현한 쓰레드 풀을 손쉽게 생성합니다.
  • 쓰레드 풀을 return 합니다.
Example
  1. newFixedThreadPool()
    1. 고정된 쓰레드의 개수를 갖는 쓰레드 풀을 생성합니다.
    1. ExecutorService 인터페이스를 구현한 ThreadPoolExecutor 객체가 생성됩니다.
  1. newCachedThredPool()
    1. 필요할 때 필요한 만큼의 쓰레드를 풀 생성합니다. (고정X)
    1. 이미 생성된 쓰레드가 있다면 이를 재활용 할 수 있습니다.
    1. 60초 동안 사용되지 않으면 쓰레드를 종료시킵니다.
  1. newworkstealingpool()
    1. 인자를 통해 병렬 처리 레벨을 지정합니다.
    1. 인자를 지정하지 않으면 현재 시스템의 core 프로세스 개수 기반으로 pool 사이즈가 할당됩니다.

ExecutorService

  • Executor 인터페이스를 상속 받은 인터페이스
  • Runnable, Callable 실행 가능
  • 작업 등록 뿐 아니라 실행의 책임도 갖음
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorTest {
    public static void main(String[] args) {
        // given
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        // when
        executorService.submit(getRunnable("test1"));
        executorService.submit(getRunnable("test2"));
        executorService.submit(getRunnable("test3"));
        executorService.submit(getRunnable("test4"));
        executorService.submit(getRunnable("test5"));
        executorService.shutdown();
    }

    private static Runnable getRunnable(String message){
        return () -> System.out.println(Thread.currentThread().getName() + " :: " + message);
    }
}
pool-1-thread-1 :: test1
pool-1-thread-4 :: test4
pool-1-thread-2 :: test2
pool-1-thread-3 :: test3
pool-1-thread-2 :: test5

BUILD SUCCESSFUL in 164ms

ScheduledExecutorService

  • ExecutorService를 상속받은 인터페이스
  • 특정 시간 이후 또는 주기적으로 작업을 수행하게 하는 스케줄 기능
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorTest {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(getRunnable("Test"), 1, 2, TimeUnit.SECONDS);
    }

    private static Runnable getRunnable(String message){
        return () -> System.out.println(Thread.currentThread().getName() + " :: " + message);
    }
}
pool-1-thread-1 :: Test
pool-1-thread-1 :: Test
pool-1-thread-1 :: Test
pool-1-thread-1 :: Test
pool-1-thread-1 :: Test
  • scheduleAtFixedRate() 함수 원형
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    • initialDelay 후에 period 간격으로 command를 run()하겠다!
    • 명시적으로 종료시켜 주어야 함

Callable

public interface Callable<V> {
    V call() throws Exception;
}
  • Runnable과 같이 쓰레드 생성에 사용되는 인터페이스
  • return type이 객체이기 때문에 처리 결과를 받을 수 있음

Runnable과 Callable의 차이점
  • Runnable ⇒ 반환 타입 없어서 처리 결과 받을 수 없음,
    public interface Runnable {
        public abstract void run();
    }
  • Callable ⇒ 반환 타입이 객체여서 처리 결과 받을 수 있음, 메소드 수행 중 Exception 던지기 가능

Future

  • 비동기 작업의 현재 상태를 조회하거나 결과를 가져오기 위한 객체
  • Runnable, Callable의 상태를 조회하거나 결과를 확인함
  • 시간이 걸리는 작업의 결과를 기다리는 동안 다른 작업을 할 수 있음
    1. 시간이 걸리는 작업은 Callable에 작성
    1. submit 후 Future 객체 저장
    1. 다른 작업 수행
    1. get() 호출을 통해 블로킹(Blocking) 콜을 수행 ⇒ 결과 확인을 위해 기다린다.
    import java.util.concurrent.*;
    
    public class ExecutorTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            // 시간이 걸리는 작업 수행
            Future<String> future = executorService.submit(getCallable());
    
            // 다른 작업 수행
            System.out.println(future.isDone());
            System.out.println("다른 작업 수행");
    
            // 블로킹 콜
            future.get();
            System.out.println(future.isDone());
            System.out.println("Finish");
            executorService.shutdown();
        }
    
        private static Callable getCallable(){
            return () -> {
                Thread.sleep(2000L);
                return "Sleep finish";
            };
        }
    }
  • invokeAll()
    • 동시에 실행한 작업 중 제일 오래 걸리는 작업이 끝나야 결과 반환
    • 모든 결과가 수행된 뒤 처리 되어야 할 때 사용
    import java.util.*;
    import java.util.concurrent.*;
    
    public class ExecutorTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            Callable<String> callable1 = () -> {
                Thread.sleep(1000L);
                return "Hello1";
            };
            Callable<String> callable2 = () -> {
                Thread.sleep(2000L);
                return "Hello2";
            };
            Callable<String> callable3 = () -> {
                Thread.sleep(3000L);
                return "Hello3";
            };
    
            List<Future<String>> futures = executorService.invokeAll(Arrays.asList(callable1, callable2, callable3));
    
            for(Future<String> future : futures){
                System.out.println(future.get());
            }
    
            executorService.shutdown();
        }
    
        private static Callable getCallable(long time){
            return () -> {
                Thread.sleep(time);
                return "Sleep finish";
            };
        }
    }


Reference.

https://www.geeksforgeeks.org/java-util-concurrent-package/

http://javacan.tistory.com/124

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#package.description

https://velog.io/@neity16/Java-8-4-자바-Concurrent-Executors-Callable-Future


'Java' 카테고리의 다른 글

Value Object, 값 객체 개념  (0) 2023.07.18
[Java] String class의 isEmpty(), isBlank()  (0) 2023.07.16
정적 팩토리 메소드  (0) 2023.07.16
입출력 스트림 간단하게 알아보기  (0) 2023.07.16
JAVA stream API 예제로 이해하기  (2) 2023.07.11