java8 stream 并发

先随便写个代码打印1o个随机数,顺便加上parallel

package com.stream;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TestStream7 {
    public static void main(String args[]){
        
        List<Integer> results = Stream.generate(Math::random).filter(i->i<1).limit(10).map(i->i*100).map(Double::intValue).collect(Collectors.toList());
        System.out.println(results);
        
        List<Integer> results2 = Stream.generate(Math::random).parallel().filter(i->i<1).limit(10).map(i->i*100).map(Double::intValue).collect(Collectors.toList());
        System.out.println(results2);
        
    }
}

与parallel方法对应的串行化方法叫做sequential

并行流的内部使用了ForkJoinPool, 它默认的线程数实际上是你的处理器内核数,这个值是由Runtime.getRuntime().availableProcessors()得到的

如果你确实想改变这个数值,可以使用

System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,”10″);

来设置一个全局变量,不过要注意的是,它会影响所有的并行流,这里其实完全没有必要修改它,反正你也没有多余的cpu内核,反而会增加线程调度消耗

这里我们简单做个测试

package com.stream;

import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TestStream7 {
    public static void main(String args[]){
        
        Long s1 = new Date().getTime();
        List<Integer> results = Stream.generate(Math::random).filter(i->i<1).limit(100).map(i->i*100).map(Double::intValue).collect(Collectors.toList());
        Long e1 = new Date().getTime();
        System.out.println(e1-s1);
        System.out.println(results);
        
        Long s2 = new Date().getTime();
        List<Integer> results2 = Stream.generate(Math::random).parallel().filter(i->i<1).limit(100).map(i->i*100).map(Double::intValue).collect(Collectors.toList());
        Long e2 = new Date().getTime();
        System.out.println(e2-s2);
        System.out.println(results2);
        
    }
}

运行几次

次数串行调用并行调用
1859
2517
3497
4507

可以看到并行方式还是比较快的 ( 我当前环境是4核的i7处理器)

既然是多线程,我们就不能避免讨论一下线程安全的问题,使用parallel需要注意线程安全么? 答案是 当然需要,下面是一个线程不安全的例子

package com.stream;

import java.util.Optional;
import java.util.stream.Stream;

public class TestStream8 {
    
    int total = 0;
    
    public static void main(String args[]){
        
        TestStream8 ts = new TestStream8();
        
        Optional<Integer> sum = Stream.of(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15).reduce(Integer::sum);
        System.out.println(sum);
        
        Stream.of(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15).parallel().forEach(ts::add);
        System.out.println(ts.total);
        
    }
    
    public void add(int i){
        total+=i;
    }
}

这个程序多运行几次就会出现串行化结果是120, 并行是119(其实不一定是多少,看心情)的情况

改成静态方法调用也是一样 (如果你了解哪些变量可以是线程不安全的,这很好理解)