본 글은 Kevin Lee님의 Parallel Programming을 학습 내용을 토대로 실험한 내용입니다.

 

자바8에서 부터 Parallel Stream을 이용해서 병렬처리를 쉽게할 수 있습니다.

출처: https://www.javatpoint.com/fork-join-in-java

병렬 스트림은 내부적으로 ForkJoinPool을 사용합니다.

 

작업을 분할할 때 기본적으로 pc의 cpu의 코어 수(정확히는 스레드)만큼 작업을 분할합니다. 

ForkJoinPool.getCommonPoolParallelism(); // 확인 가능 (0부터 1입니다)

 

 

우선 성능 테스트를 하기 전 간단한 병렬처리에 대한 개념 예제를 보여드리겠습니다.

 


병렬처리 개념

 

실험 환경: 제 PC의 cpu사양은 8코어 16스레드를 지원합니다.

 

상황)

1부터 16까지 총 16개의 숫자를 출력합니다.

출력하기 전 스레드를 1초씩 재우고 요소를 출력하고 총 소요시간을 출력하는 코드입니다.

 

싱글스레드

하나의 스레드가 1초씩 잠들고 출력하기 때문에. 결과값은 16,000ms가 나와야합니다.

@Slf4j
public class CorePerformance {

    public static void main(String[] args) {
        System.out.println("stream (16 elements) ");
        final StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        IntStream.rangeClosed(1, 16)
            .peek(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
            .forEach(i -> log.info("i = {}", i));
        stopWatch.stop();
        log.info(">>> stream takes [{}]ms", stopWatch.getTotalTimeMillis());
    }

}

결과

예상한대로 16,000ms가 나온 것을 확인할 수 있습니다.

 

병렬처리

제 pc가 16스레드를 지원하니, 한 번에 16개의 스레드가 동시에 작업이 가능하므로 1초간 잠들고 연산을 수행합니다.
(병렬 처리는 실제로 task를 나누어서 물리적으로 동시에 실행합니다. 어느 작업이 먼저 완성될지 모르므로 순서는 보장할 수 없습니다)

 

 

이론상이라면 결과 값은 1,000ms가 나와야합니다.

@Slf4j
public class CorePerformance {

    public static void main(String[] args) {
        log.info("==== parallel stream (16 elements) ====");
        final StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        IntStream.rangeClosed(1, 16)
            .parallel()
            .peek(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
            .forEach(i -> log.info("i = {}", i));
        stopWatch.stop();
        log.info(">>> parallel stream takes [{}]ms", stopWatch.getTotalTimeMillis());
    }
}

결과



예상대로 1,000ms가 나온것을 확인할 수 있습니다.

 

그렇다면 이번에는 숫자를 1~17까지, 총 17개의 요소로 변경해보습니다.

동시에 최대 16개의 스레드가 작업을 할 수 있으므로,  총 2,000ms가 나오길 기대합니다.

정상적으로 2,000ms가 나온걸 확인할 수 있습니다.

 

 

이번엔 코어(스레드)의 개수를 변경해서 실행해보곘습니다.


최대 4개의 스레드가 동시에 작업을 할 수 있도록 변경합니다. 이번엔  총 16개의 요소를 출력해보겠습니다.

 

(16 / 4) * 1000ms 이므로 일반적인 경우에는 4,000ms가 나오길 기대합니다. (5,000ms가 나올 수도 있습니다)

(처리 속도및 성능, 작업 방식, 상황에 따라 달라질 수 있습니다. 결과가 보장되지 않습니다)

-Djava.util.concurrent.ForkJoinPool.common.parallelism={value} // 0(=1)부터 시작합니다.

4.000ms가 나오는걸 확인할 수 있습니다.

 

간단한 실험이 끝났으므로 성능테스트를  진행해보겠습니다.

 


Iterator vs Stream vs Pararell Stream

JMH라는 벤치마크 툴을 이용해서 실험을 진행했습니다.

 

병렬처리가 비효율적인 코드.

@State(value = Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(value = 1, jvmArgs = {"-Xms1G", "-Xmx4G"})
@Warmup(iterations = 3)
@Measurement(iterations = 3)
public class ImpracticalComparePerformance {

    private static final int MAX_VALUE = 10_000_000;

    @Benchmark
    public long iterativeSum() {
        long result = 0;
        for (int i = 0; i <= MAX_VALUE; i++) {
            result += i;
        }
        return result;
    }

    @Benchmark
    public long sequentialSum() {
        return Stream.iterate(1L, i -> i + 1)
            .limit(MAX_VALUE)
            .reduce(Long::sum)
            .get();
    }

    @Benchmark
    public long parallelSum() {
        return Stream.iterate(1L, i -> i + 1)
            .limit(MAX_VALUE)
            .parallel()
            .reduce(Long::sum)
            .get();
    }

    @Benchmark
    public long parallelRangedSum() {
        return LongStream.rangeClosed(1, MAX_VALUE)
            .parallel()
            .reduce(Long::sum)
            .getAsLong();
    }
}

결과

순서는 보장되지 않습니다.

로그 전문

 

병렬처리를 했더니 일반적인 반복문으로 처리하는 것보다 10배 가까이 느린 결과가 나왔습니다.

왜 그럴까요?

 

우선 위 코드에서 1부터 n까지 이전 값에 1씩 더하는 코드입니다.

병렬처리를 하면  기본적으로 앞에서 설명했던 것처럼, 자원을 분배해서 계산하고 합치는 작업을 반복하는데,

위 계산 방식은 이전 값에 1씩 더하는 것이므로 이전값이 있어야 다음 값을 계산할 수 있습니다.

 

물론 계산이 빨리 끝난 스레드는 작업훔치기를 통해서 빠른 병렬처리를 하지만, 어쨌든 이런것 자체도 비용일 뿐더러

포크 / 조인 하는 비용보다, 계산하는 비용이 더 저렴하기 때문에 병렬처리의 이점이 없어졌습니다.

 

반면 parallelRangedSum같은 경우 이전 값에서 1씩 더하는 연산이 아니라, 정해진 range에서 각자 계산을 하고 값을 더하는 것이기 때문에 퍼포먼스가 좋게 나왔습니다.

 

하지만 말그대로 elements를 분할하고 병합하는 과정을 반복하기 때문에 이 역시도 때문에 스레드 수, 작업 연산에 따라서 오히려  오히려 성능이 더 저하될 수 있습니다. (분할하고 합하는 연산보다 더 컴퓨터 자원을 소모할 때)

 

이러한 이유로, 위 예제는 병렬처리를 하기엔 비효율적인 코드입니다.  (stream또한 너무 연산이 간단해서 비효율적)

 

 

이번엔 위 코드에서 연산 마다 +5ms가 소요되는 어떤 작업을 추가로 처리한다고 가정하고,
다음 코드를 매 연산마다 실행할 수 있도록 추가해줍니다.

 

(MAX_VALUE는 1,000으로 변경했습니다)

    private static final int MAX_VALUE = 1_000; // 범위가 너무 크므로 1000으로 변경

	// 연산마다 5ms를 지연시시킴
    private void slowDown() {
        try {
            TimeUnit.MICROSECONDS.sleep(5L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Benchmark
    public static long iterativeSum() {
        long result = 0;
        for (int i = 0; i <= MAX_VALUE; i++) {
            result += i;
            slowDown();
        }
        return result;
    }

    @Benchmark()
    public long sequentialSum() {
        return Stream.iterate(1L, i -> i + 1)
            .limit(MAX_VALUE)
            .peek(i -> slowDown())
            .reduce(Long::sum)
            .get();
    }

    @Benchmark
    public long parallelSum() {
        return Stream.iterate(1L, i -> i + 1)
            .limit(MAX_VALUE)
            .parallel()
            .peek(i -> slowDown())
            .reduce(Long::sum)
            .get();
    }

    @Benchmark
    public long parallelRangedSum() {
        return LongStream.rangeClosed(1, MAX_VALUE)
            .parallel()
            .peek(i -> slowDown())
            .reduce(Long::sum)
            .getAsLong();
    }

 

결과

로그 전문

 

이번 실험에서는 병렬처리에서 훨씬 더 좋은 퍼포먼스 결과가 나왔습니다.

 

정리하자면 병렬처리라고 무조건 퍼포먼스가 좋고, 나쁘다라고 말할 수 없습니다.

cpu 코어의 개수, 작업 연산의 속도, 심지어 스트림의 소스의 종류. 연산 알고리즘 등 다양한 요소를 고려해봐야합니다. 

 

 

실전 예제

product 리스트와, 어떤 조건을 받아서 해당 조건에  만족하는 product의 가격 합을 리턴하는 로직입니다.

product 리스트는 총 500만개입니다.

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(value = 1, jvmArgs = {"-Xms1G", "-Xmx4G"})
@Warmup(iterations = 3)
@Measurement(iterations = 3)
public class PracticalComparePerformance {

    private static final List<Product> products;

    static {
        final int length = 5_000_000;
        final Product[] list = new Product[length];

        for (int i = 0; i < length; i++) {
            list[i] = new Product((long) i, "Product" + i, BigDecimal.valueOf(ThreadLocalRandom.current().nextInt(20, 40)));
        }

        products = List.of(list);
    }

    private BigDecimal imperativeSum(final List<Product> products, final Predicate<Product> predicate) {
        BigDecimal sum = BigDecimal.ZERO;
        for (final Product product : products) {
            if (predicate.test(product)) {
                sum = sum.add(product.getPrice());
            }
        }
        return sum;
    }

    private BigDecimal streamSum(final List<Product> products, final Predicate<Product> predicate) {
        return products.stream()
            .filter(predicate)
            .map(Product::getPrice)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
    }

    private BigDecimal parallelSum(final List<Product> products, final Predicate<Product> predicate) {
        return products.stream()
            .parallel()
            .filter(predicate)
            .map(Product::getPrice)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
    }

    @Benchmark
    public BigDecimal imperativeTest() {
    	// 가격이 30원 이상인 product
        return imperativeSum(products, product -> product.getPrice().compareTo(BigDecimal.valueOf(30)) >= 0);      
    }

    @Benchmark
    public BigDecimal streamTest() {
        return streamSum(products, product -> product.getPrice().compareTo(BigDecimal.valueOf(30)) >= 0);
    }

    @Benchmark
    public BigDecimal parallelStreamTest() {
        return parallelSum(products, product -> product.getPrice().compareTo(BigDecimal.valueOf(30)) >= 0);
    }
}

결과

로그 전문

 

병렬처리가 단순 반복문보다 대략 3배정도 빠르게 나왔습니다. (물론 이것도 pc환경마다 다릅니다)

제 pc에서는 10,000개 까지는 parallel stream이 근소하게 빨랐습니다.

 

 

+추가 

당연한 얘기지만, 병렬처리를 할 때는 race condition을 조심해야합니다.

 

주의해야될 코드

final int[] sum = {0}; // 배열을 선언한 이유는 ramda expression은 final혹은 effective final 변수에만 접근 가능합니다.
IntStream.range(0, 101)
	.forEach(i -> sum[0] += i);

System.out.println("stream sum = " + sum[0]);

final int[] sum2 = {0};
IntStream.range(0, 101)
	.parallel()
    .forEach(i -> sum2[0] += i); // 위험!!!

System.out.println("parallel stream race condition = " + sum2[0]);

System.out.println("parallel stream no race condition = " + 
	IntStream.range(0, 101)
    	.parallel()
        .sum()); 		// sum을 통해 해결 가능하다.

 

race condition은 원하는 결과 값이 나오지 않을 수 있다.

 

 

모든 코드는 깃허브에서 확인할 수  있습니다.

 

GitHub - brick0123/parallel-programming: parallel programming benchmark

parallel programming benchmark. Contribute to brick0123/parallel-programming development by creating an account on GitHub.

github.com

 

References

케빈 - 자바8 못다한 이야기

https://www.javatpoint.com/fork-join-in-java