纯JAVA实现Online Judge--5.并行运行
前言
如果一道题目有5份(输入+输出为一份)测试数据,对于一份用户的代码,如果串行的运行(假设每一份运行平均要500毫秒),那么5份的话就需要2.5秒了。但是,如果我们采用并行的方式的话,则只需要500毫秒(实际上会多一点)就可以了。但是,因为每个线程运行的用户代码,都需要从标准输入流(System.in)中读取数据,而且读取的时机无法掌控,也读需要将运行的输出结果输出到标准输出流(System.out)中,输出的时机也是无法掌控的。因此这里就发生了冲突问题。最明显的情况就是,a,b线程同时向System.out输出数据时,如果我们不做适当处理的话,我们是无法分清哪些输出数据是属于A线程的,哪些是属于B线程的,甚至连输出数据都难以拆分成两份(一个线程输出一份结果数据)。
多线程并行运行
这里主要涉及到两个类:ProblemCallable和ProblemItemCallable。ProblemCallable是一道题目对于一份用户代码的任务类。而ProblemItemCallable是一份数据的任务类。也就是说,一个ProblemCallable会对应多个ProblemItemCallable。他们两者都是实现了Callable接口的类,复写了里面的call方法。
ProblemCallable的call方法代码如下:
public List<ProblemResultItem> call() throws Exception {
List<String> paths = problem.getInputDataFilePathList();
final List<ProblemResultItem> resultItems = new ArrayList<ProblemResultItem>();
countDownLatch = new CountDownLatch(paths.size());
// 为了内存使用比较准确,先大概的执行一次回收吧
run.gc();
for (int i = 0; i < paths.size(); i++) {
final String path = paths.get(i);
itemExecThreadPool.execute(new Runnable() {
@Override
public void run() {
resultItems.add(process(path));
}
});
}
// 阻塞线程,等待所有结果都计算完了,再返回
countDownLatch.await();
return resultItems;
}
可以看出,在这个方法里面,先获取了该道题目所有标准输入文件的路径,然后在一个循环里面,根据标准输入文件的个数,动态创建了对应数量的任务类,去执行process方法。最后再利用CountDownLatch阻塞当前线程,等待所有结果到返回之后,这里再返回最终的结果。
ProblemCallable的process方法代码如下:
private ProblemResultItem process(String inputFilePath) {
ProblemResultItem item = null;
ProblemItemCallable itemCallable = null;
long beginMemory = 0;
long beginTime = 0;
long endTime = 0;
long endMemory = 0;
Future<ProblemResultItem> submit = null;
try {
itemCallable = new ProblemItemCallable(mainMethod, inputFilePath,
resultBuffer, threadSystemIn);
submit = itemGetThreadPool.submit(itemCallable);
beginMemory = run.totalMemory() - run.freeMemory();
beginTime = System.nanoTime();
item = submit
.get(problem.getTimeLimit() + 2, TimeUnit.MILLISECONDS);
if (item == null) {
killThread((FutureTask<ProblemResultItem>) submit);
throw new TimeoutException();
}
endTime = System.nanoTime();
endMemory = run.totalMemory() - run.freeMemory();
} catch (Exception e) {
// 出现了意外,先关闭资源再说(如已经打开的流等)
itemCallable.colseResource();
killThread((FutureTask<ProblemResultItem>) submit);
item = new ProblemResultItem();
item.setNormal(false);
if (e instanceof CancellationException
|| e instanceof TimeoutException) {
// 超时了,会进来这里
item.setMessage("超时");
} else {
item.setMessage(e.getMessage());
}
endTime = System.nanoTime();
endMemory = run.totalMemory() - run.freeMemory();
}
// 时间为毫微秒,要先转变为微秒再变为毫秒
item.setUseTime((endTime - beginTime) / 1000 / 1000);
item.setUseMemory(endMemory - beginMemory);
item.setInputFilePath(inputFilePath);
if (item.getUseMemory() > problem.getMemoryLimit()) {
item.setNormal(false);
item.setMessage("超出内存限制");
}
// 无论怎么样,这里必须最后都要进行减一,不然将会一直阻塞线程,最终无法返回结果
countDownLatch.countDown();
return item;
}
从代码中不难看出,该方法主要做的是一些准备和善后工作。最终真正通过反射执行用户代码中的main方法,交给了ProblemItemCallable去完成。
ProblemItemCallable的代码如下:
package cn.superman.sandbox.callable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import cn.superman.sandbox.core.systemInStream.ThreadInputStream;
import cn.superman.sandbox.core.systemOutStream.CacheOutputStream;
import cn.superman.sandbox.dto.ProblemResultItem;
public class ProblemItemCallable implements Callable<ProblemResultItem> {
private Method mainMethod;
private CacheOutputStream resultBuffer;
private FileInputStream fileInputStream;
private ThreadInputStream threadSystemIn;
public ProblemItemCallable(Method mainMethod, String inputFilePath,
CacheOutputStream resultBuffer, ThreadInputStream threadSystemIn) {
this.mainMethod = mainMethod;
this.resultBuffer = resultBuffer;
this.threadSystemIn = threadSystemIn;
// 重定向输入流,注意路径不能包含中文名
File file = new File(inputFilePath);
if (!file.exists()) {
throw new RuntimeException("测试数据有问题");
}
try {
fileInputStream = new FileInputStream(file);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
public ProblemResultItem call() throws Exception {
ProblemResultItem item = new ProblemResultItem();
try {
threadSystemIn.setThreadIn(fileInputStream);
mainMethod.invoke(null, new Object[] { new String[0] });
item.setResult(new String(resultBuffer.removeBytes(Thread
.currentThread().getId())));
item.setNormal(true);
} catch (InvocationTargetException e) {
Throwable throwable = e.getTargetException();
if (throwable instanceof OutOfMemoryError) {
item.setMessage("内存溢出");
} else {
item.setMessage(throwable.getMessage());
}
item.setNormal(false);
} catch (RuntimeException runtimeException) {
item.setMessage(runtimeException.getMessage());
item.setNormal(false);
} finally {
threadSystemIn.removeAndCloseThreadIn();
}
return item;
}
public void colseResource() {
threadSystemIn.removeAndCloseThreadIn();
}
}
代码不多,主要是根据传进来的路径,打开文件的输入流,就设置到threadSystemIn(后面会说)中,再利用反射调用main方法,最后再通过resultBuffer(后面会说),取出这一次的输出结果,然后进行返回。
大概就是通过上述的方式,完成了多线程并行的运行。这里重点解决的是一个输入和输出的问题。因为,就正如上面提到那样,因为多线程运行的原因,多个线程会同时去争抢的从System.in中读取数据,也会争抢的将输出信息输出到System.out中。由于我们很难去修改用户的代码,甚至以及编译出来后的class信息,因此我们无法直接通过上锁等方式,去协调这个问题。我通过一种无锁的简单方法解决了这个问题。
输入输出分流
解决上述问题,主要是两个类:ThreadInputStream和CacheOutputStream。他们分别在沙箱初始化的时候,进行了设置:
// 重定向输出流
System.setOut(new PrintStream(resultBuffer));
// 重定向输入流
System.setIn(systemThreadIn);
整体的解决思路,就是去根据每个线程的ID,为每一个线程匹配对应的输入流和输出流。要实现这件事,其实只要使用JAVA本身提供的ThreadLocal类即可简单的进行实现了。示意图如下:
ThreadInputStream类代码如下:
package cn.superman.sandbox.core.systemInStream;
import java.io.IOException;
import java.io.InputStream;
public class ThreadInputStream extends InputStream {
private volatile ThreadLocal<InputStream> localIn = new ThreadLocal<InputStream>();
@Override
public int read() throws IOException {
return localIn.get().read();
}
public void setThreadIn(InputStream in) {
localIn.set(in);
}
public void removeAndCloseThreadIn() {
InputStream stream = localIn.get();
localIn.remove();
try {
if (stream != null) {
stream.close();
stream = null;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
CacheOutputStream类代码如下:
package cn.superman.sandbox.core.systemOutStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class CacheOutputStream extends OutputStream {
private volatile ThreadLocal<ByteArrayOutputStream> localBytesCache = new ThreadLocal<ByteArrayOutputStream>() {
@Override
protected ByteArrayOutputStream initialValue() {
return new ByteArrayOutputStream();
}
};
@Override
public void write(int b) throws IOException {
ByteArrayOutputStream byteBufferStream = localBytesCache.get();
byteBufferStream.write(b);
}
public byte[] removeBytes(long threadId) {
ByteArrayOutputStream byteBufferStream = localBytesCache.get();
if (byteBufferStream == null) {
return new byte[0];
}
byte[] result = byteBufferStream.toByteArray();
// 因为这个可能以后还可以重用(因为线程时有反复重用的,所以这里只需要将里面的内容清空就可以了)
byteBufferStream.reset();
return result;
}
}
预告
OJ的沙箱端部分大概就到这里了,后面的博文会大概讲一下WEB端的内容。