Make Life Easy Using Java Parallel Streams
Running work on multiple threads the old way takes some ceremony:
- You have to declare a class that implements Runnable.
- Then, override the run method.
- And, if you want to pass some parameters to the thread you have to include them in the class's constructor.
- And yes, if you have data you wish to process in chunks, you will also have to partition the data yourself.
Say we have a list of strings that we want to process in chunks, each in a separate thread to make the job faster.
The process we want to apply:
class Service {
    public void doWork(String s) {
        System.out.printf("Do work on %s%n", s);
    }
}
The thread class implementing Runnable:
import java.util.List;

class Job implements Runnable {
    private final List<String> data;
    private final Service service;

    // the chunk to process is passed in through the constructor
    public Job(List<String> data) {
        this.data = data;
        this.service = new Service();
    }

    @Override
    public void run() {
        for (String s : data)
            service.doWork(s);
    }
}
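And this is the code to make it all happen:
// get the data chunks and run a separate worker thread for each chunk
public void oldWay() {
    List<String> data = getData();
    for (List<String> batch : getDataChunks(data, 1000))
        // start() runs the job on a new thread; calling run() directly
        // would execute it on the current thread instead
        new Thread(new Job(batch)).start();
}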
// get the numbers from 0 to 9,999 as Strings
private List<String> getData() {
    return IntStream
            .range(0, 10_000)
            .mapToObj(Integer::toString)
            .collect(Collectors.toList());
}

// split the data into consecutive chunks of at most chunkSize elements
private List<List<String>> getDataChunks(List<String> data, int chunkSize) {
    List<List<String>> result = new ArrayList<>();
    final AtomicInteger counter = new AtomicInteger();
    for (String s : data) {
        // start a new chunk every chunkSize elements
        if (counter.getAndIncrement() % chunkSize == 0)
            result.add(new ArrayList<>());
        result.get(result.size() - 1).add(s);
    }
    return result;
}
According to Oracle:
"When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results."
When we call parallelStream on a collection, we get a number of substreams, each working in a separate thread.
Each thread handles a number of elements from the collection.
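To see this split in action, here is a minimal sketch that counts how many elements each thread ends up handling. The class name PerThreadCount and the method elementsPerThread are made up for this illustration, and the exact distribution will differ from run to run and from machine to machine.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original example.
class PerThreadCount {

    // Groups the elements by the name of the thread that processed them
    // and counts how many elements each thread handled.
    static Map<String, Long> elementsPerThread(List<String> data) {
        return data.parallelStream()
                .collect(Collectors.groupingByConcurrent(
                        s -> Thread.currentThread().getName(),
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> data = IntStream.range(0, 10_000)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());

        // One line per thread; the counts always add up to data.size().
        elementsPerThread(data).forEach((thread, count) ->
                System.out.printf("%s handled %d elements%n", thread, count));
    }
}
```

On a multi-core machine you will typically see the calling thread (for example main) next to several ForkJoinPool.commonPool-worker threads.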
How many threads are used depends on your configuration.
By default, parallel streams run on the common ForkJoinPool, whose default size is equal to one less than the number of cores of your CPU.
The default size of the common pool can be changed with this JVM system property:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
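If you want to check what your own machine uses, a small sketch like the following prints the number of available processors next to the parallelism of the common pool. The class name CommonPoolInfo is made up for this example; the two calls it relies on are standard JDK methods.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, not part of the original example.
class CommonPoolInfo {

    // Target parallelism of the common pool used by parallel streams.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors:    " + cores);
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

Run it once as is and once with the system property set on the command line to see the value change.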
You can also use your own custom thread pool.
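One widely used way to do that is to submit the stream pipeline as a task to your own ForkJoinPool. The sketch below assumes this trick: the pipeline then runs on the pool that invoked it, which is how current JDKs behave but is not something the Stream documentation promises, so treat it as an assumption. The class CustomPoolDemo and the method sumOfLengths are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original example.
class CustomPoolDemo {

    // Runs a parallel stream inside a dedicated pool instead of the common pool.
    static int sumOfLengths(List<String> data, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<Integer> task = () ->
                    data.parallelStream().mapToInt(String::length).sum();
            // join() waits for the result without checked exceptions
            return pool.submit(task).join();
        } finally {
            // always release the pool's threads when done
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> data = IntStream.range(0, 10_000)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        System.out.println(sumOfLengths(data, 4));
    }
}
```

A dedicated pool keeps long-running stream work from competing with everything else that shares the common pool.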
public void newWay() {
    Service service = new Service();
    // note that we used the getData method and Service class from previous example
    getData().parallelStream().forEach(service::doWork);
}
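One thing to keep in mind with newWay: forEach on a parallel stream makes no promise about the order in which elements are visited. If order matters, forEachOrdered is the standard alternative, at the cost of some parallelism. Here is a small sketch; the class OrderedDemo and the method visitInOrder are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class, not part of the original example.
class OrderedDemo {

    // Visits the elements in encounter order even though the stream is parallel.
    static List<String> visitInOrder(List<String> data) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        data.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7");

        // forEach: the elements may be printed in any order
        data.parallelStream().forEach(s -> System.out.print(s + " "));
        System.out.println();

        // forEachOrdered: the elements come out in the list's order
        System.out.println(visitInOrder(data));
    }
}
```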
Check my tutorial on Java Streams.