Java大数据量(多线程)分段分批处理
程序员文章站
2022-07-13 13:23:23
...
分段处理主类
package pers.zuo.component.piecewise;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import pers.zuo.component.piecewise.bean.PiecewiseKey;
import pers.zuo.component.piecewise.bean.PiecewiseResult;
import pers.zuo.component.piecewise.bean.PiecewiseTask;
import pers.zuo.component.piecewise.manager.PiecewiseBuilder;
/**
 * Template for processing the index range {@code [0, totalNum)} in pieces.
 * Subclasses implement {@link #partProcess(int, int)} for a single part; this
 * class splits the range into parts, optionally distributes chunks of parts
 * over a fixed thread pool, and records one {@link PiecewiseResult} per part
 * (value or exception) keyed by the part's {@code [from, to)} bounds.
 *
 * @author zuojingang
 *
 * @param <V>
 *            the type returned by processing one part
 */
public abstract class PiecewiseHandler<V> {

    /** Number of threads in the pool created by {@code nThreads}. */
    public static final int D_N_THREAD = 10;

    /** Default number of indexes handed to one pool task. */
    public static final int D_THREAD_SIZE = 10000;

    /** Default number of indexes passed to one {@code partProcess} call. */
    public static final int D_PART_SIZE = 1000;

    /**
     * Multi-threaded processing of {@code [0, totalNum)} using
     * {@link #D_THREAD_SIZE} and {@link #D_PART_SIZE}.
     *
     * @param nThreadResult
     *            receives one entry per pool task; see the four-argument overload
     * @param totalNum
     *            number of indexes to process
     * @throws Exception
     *             declared for compatibility with existing callers
     */
    public void nThreads(
            final Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> nThreadResult,
            final int totalNum) throws Exception {
        nThreads(nThreadResult, totalNum, D_THREAD_SIZE, D_PART_SIZE);
    }

    /**
     * Splits {@code [0, totalNum)} into chunks of at most {@code threadSize}
     * indexes, runs each chunk through
     * {@link #singleThread(Map, int, int, int)} on a fixed pool of
     * {@link #D_N_THREAD} threads, and blocks until every chunk has been waited
     * on.
     * <p>
     * {@code nThreadResult} is only ever touched by the calling thread: the
     * entry for each chunk is registered before the chunk is submitted, so a
     * plain {@code HashMap} is sufficient. Each per-chunk map is written by
     * exactly one worker and should be read only after this method returns.
     * <p>
     * Returns without doing anything when {@code nThreadResult} is
     * {@code null} or when {@code totalNum} or {@code threadSize} is not
     * positive. If the calling thread is interrupted while waiting, the
     * {@code InterruptedException} is recorded on the chunk being waited for,
     * the remaining chunks are still waited on, and the interrupt status is
     * restored before returning.
     *
     * @param nThreadResult
     *            receives one entry per chunk, keyed by the chunk bounds
     * @param totalNum
     *            number of indexes to process
     * @param threadSize
     *            maximum number of indexes per chunk (per pool task)
     * @param partSize
     *            maximum number of indexes per {@code partProcess} call
     * @throws Exception
     *             declared for compatibility with existing callers
     */
    public void nThreads(
            final Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> nThreadResult,
            final int totalNum, final int threadSize, final int partSize) throws Exception {
        if (null == nThreadResult || 0 >= totalNum || 0 >= threadSize) {
            return;
        }
        final ExecutorService fixThreadPool = Executors.newFixedThreadPool(D_N_THREAD);
        final List<PiecewiseTask> fTaskList = new ArrayList<>();
        try {
            int fromIndex = 0;
            while (totalNum > fromIndex) {
                final int thisFromIndex = fromIndex;
                // Always >= 1 here because totalNum > fromIndex and threadSize > 0.
                final int threadProcessNum = Math.min(totalNum - fromIndex, threadSize);
                final int thisToIndex = thisFromIndex + threadProcessNum;
                final PiecewiseKey taskKey = PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex);
                final Map<PiecewiseKey, PiecewiseResult<V>> threadResult = PiecewiseBuilder
                        .initializeThreadResult();
                // Register the chunk from the submitting thread: the shared map is
                // never mutated concurrently and the entry is guaranteed to exist
                // when an exception has to be attached to it below.
                nThreadResult.put(taskKey, PiecewiseBuilder.buildResult(threadResult));
                final PiecewiseTask futureTask = PiecewiseBuilder.buildTask(new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws Exception {
                        singleThread(threadResult, thisFromIndex, threadProcessNum, partSize);
                        return true;
                    }
                }, taskKey);
                fixThreadPool.submit(futureTask);
                fTaskList.add(futureTask);
                fromIndex = thisToIndex;
            }
            boolean interrupted = false;
            for (PiecewiseTask futureTask : fTaskList) {
                try {
                    futureTask.get();
                } catch (InterruptedException e) {
                    // Keep waiting for the other chunks; restore the flag afterwards.
                    interrupted = true;
                    nThreadResult.get(futureTask.getTaskKey()).setException(e);
                } catch (ExecutionException e) {
                    nThreadResult.get(futureTask.getTaskKey()).setException(e);
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            // The pool is created per call and must be shut down manually after use.
            fixThreadPool.shutdown();
        }
    }

    /**
     * Processes {@code [0, totalNum)} on the calling thread using
     * {@link #D_PART_SIZE}.
     *
     * @param threadResult
     *            receives one entry per part
     * @param totalNum
     *            number of indexes to process
     */
    public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int totalNum) {
        singleThread(threadResult, 0, totalNum);
    }

    /**
     * Processes {@code [offset, offset + totalNum)} on the calling thread using
     * {@link #D_PART_SIZE}.
     *
     * @param threadResult
     *            receives one entry per part
     * @param offset
     *            first index to process (inclusive)
     * @param totalNum
     *            number of indexes to process
     */
    public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int offset,
            final int totalNum) {
        singleThread(threadResult, offset, totalNum, D_PART_SIZE);
    }

    /**
     * Processes {@code [offset, offset + totalNum)} on the calling thread in
     * consecutive parts of at most {@code partSize} indexes. For every part the
     * value returned by {@link #partProcess(int, int)}, or the exception it
     * threw (with a {@code null} value), is stored in {@code threadResult}
     * under the part's bounds; a failing part does not stop later parts.
     * <p>
     * Returns without doing anything when {@code threadResult} is {@code null}
     * or when {@code totalNum} or {@code partSize} is not positive.
     *
     * @param threadResult
     *            receives one entry per part
     * @param offset
     *            first index to process (inclusive)
     * @param totalNum
     *            number of indexes to process; the last processed index is
     *            {@code offset + totalNum - 1}
     * @param partSize
     *            maximum number of indexes per part
     */
    public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int offset,
            final int totalNum, final int partSize) {
        if (null == threadResult || 0 >= totalNum || 0 >= partSize) {
            return;
        }
        int fromIndex = offset;
        // Count down the remaining indexes instead of comparing against
        // fromIndex + partSize, which overflows for very large partSize values.
        int remaining = totalNum;
        while (0 < remaining) {
            final int partLength = Math.min(partSize, remaining);
            final int thisToIndex = fromIndex + partLength;
            V partResult = null;
            Exception pe = null;
            try {
                partResult = partProcess(fromIndex, thisToIndex);
            } catch (Exception e) {
                pe = e;
            }
            threadResult.put(PiecewiseBuilder.buildKey(fromIndex, thisToIndex),
                    PiecewiseBuilder.buildResult(partResult, pe));
            fromIndex = thisToIndex;
            remaining -= partLength;
        }
    }

    /**
     * Processes one part of the range.
     *
     * @param fromIndex
     *            first index of the part (inclusive)
     * @param toIndex
     *            end index of the part (exclusive)
     * @return the result for this part
     * @throws Exception
     *             any failure; it is recorded in the part's
     *             {@link PiecewiseResult} by the caller
     */
    protected abstract V partProcess(final int fromIndex, final int toIndex) throws Exception;
}
分段任务定制类
package pers.zuo.component.piecewise.bean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
 * A {@link FutureTask} producing a {@code Boolean} that additionally carries
 * the {@link PiecewiseKey} it was constructed with, so the task can be matched
 * back to its key after submission.
 *
 * @author zuojingang
 */
public class PiecewiseTask extends FutureTask<Boolean> {

    /** Key handed to the constructor; never reassigned. */
    private final PiecewiseKey taskKey;

    /**
     * Creates a task that runs {@code callable} and remembers {@code taskKey}.
     *
     * @param callable
     *            the work to run, passed unchanged to {@link FutureTask}
     * @param taskKey
     *            the key to associate with this task
     */
    public PiecewiseTask(Callable<Boolean> callable, PiecewiseKey taskKey) {
        super(callable);
        this.taskKey = taskKey;
    }

    /**
     * @return the key passed to the constructor
     */
    public PiecewiseKey getTaskKey() {
        return this.taskKey;
    }
}
分段任务Key值类
package pers.zuo.component.piecewise.bean;
public class PiecewiseKey {
private final Integer from;
private final Integer to;
public PiecewiseKey(Integer from, Integer to) {
super();
this.from = from;
this.to = to;
}
public Integer getFrom() {
return from;
}
public Integer getTo() {
return to;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((from == null) ? 0 : from.hashCode());
result = prime * result + ((to == null) ? 0 : to.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PiecewiseKey other = (PiecewiseKey) obj;
if (from == null) {
if (other.from != null)
return false;
} else if (!from.equals(other.from))
return false;
if (to == null) {
if (other.to != null)
return false;
} else if (!to.equals(other.to))
return false;
return true;
}
}
分段任务返回值类
package pers.zuo.component.piecewise.bean;
/**
 * Holder for the outcome of one piece of work: a fixed value plus an
 * exception that may be supplied at construction or attached later.
 *
 * @param <V>
 *            the type of the held value
 */
public class PiecewiseResult<V> {

    /** The value given at construction; never reassigned. */
    private final V val;

    /** The associated exception, or {@code null} when none has been set. */
    private Exception exception;

    /**
     * Creates a result holding {@code val} and no exception.
     *
     * @param val
     *            the value to hold (may be {@code null})
     */
    public PiecewiseResult(V val) {
        this(val, null);
    }

    /**
     * Creates a result holding {@code val} and {@code exception}.
     *
     * @param val
     *            the value to hold (may be {@code null})
     * @param exception
     *            the exception to hold (may be {@code null})
     */
    public PiecewiseResult(V val, Exception exception) {
        this.val = val;
        this.exception = exception;
    }

    /**
     * @return the held value
     */
    public V getVal() {
        return this.val;
    }

    /**
     * @return the held exception, or {@code null} when none has been set
     */
    public Exception getException() {
        return this.exception;
    }

    /**
     * Replaces the held exception.
     *
     * @param exception
     *            the new exception (may be {@code null})
     */
    public void setException(Exception exception) {
        this.exception = exception;
    }
}
获取实例工具类
package pers.zuo.component.piecewise.manager;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import pers.zuo.component.piecewise.bean.PiecewiseKey;
import pers.zuo.component.piecewise.bean.PiecewiseResult;
import pers.zuo.component.piecewise.bean.PiecewiseTask;
/**
 * Static factory methods for the piecewise bean types and for the result maps
 * used with them.
 */
public class PiecewiseBuilder {

    /**
     * @param from
     *            first component of the key
     * @param to
     *            second component of the key
     * @return a new {@link PiecewiseKey} built from the two values
     */
    public static PiecewiseKey buildKey(Integer from, Integer to) {
        final PiecewiseKey key = new PiecewiseKey(from, to);
        return key;
    }

    /**
     * @param val
     *            the value to wrap
     * @return a new {@link PiecewiseResult} created from {@code val} alone
     */
    public static <V> PiecewiseResult<V> buildResult(V val) {
        return new PiecewiseResult<>(val);
    }

    /**
     * @param val
     *            the value to wrap
     * @param exception
     *            the exception to wrap
     * @return a new {@link PiecewiseResult} created from both arguments
     */
    public static <V> PiecewiseResult<V> buildResult(V val, Exception exception) {
        return new PiecewiseResult<>(val, exception);
    }

    /**
     * @param callable
     *            the work the task runs
     * @param taskKey
     *            the key to associate with the task
     * @return a new {@link PiecewiseTask} created from both arguments
     */
    public static PiecewiseTask buildTask(Callable<Boolean> callable, PiecewiseKey taskKey) {
        final PiecewiseTask task = new PiecewiseTask(callable, taskKey);
        return task;
    }

    /**
     * Convenience for declaring the nested multi-thread result map without
     * spelling out its type arguments at the call site.
     *
     * @return a new, empty {@link HashMap}
     */
    public static <V> Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> initializeNThreadResult() {
        return new HashMap<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>>();
    }

    /**
     * Convenience for declaring the per-thread result map without spelling
     * out its type arguments at the call site.
     *
     * @return a new, empty {@link HashMap}
     */
    public static <V> Map<PiecewiseKey, PiecewiseResult<V>> initializeThreadResult() {
        return new HashMap<PiecewiseKey, PiecewiseResult<V>>();
    }
}
下一篇: 进制转换
推荐阅读