欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

纯JAVA实现Online Judge--5.并行运行

程序员文章站 2022-05-12 09:39:43
...

前言

    如果一道题目有5份(输入+输出为一份)测试数据,对于一份用户的代码,如果串行的运行(假设每一份运行平均要500毫秒),那么5份的话就需要2.5秒了。但是,如果我们采用并行的方式的话,则只需要500毫秒(实际上会多一点)就可以了。但是,因为每个线程运行的用户代码,都需要从标准输入流(System.in)中读取数据,而且读取的时机无法掌控,也读需要将运行的输出结果输出到标准输出流(System.out)中,输出的时机也是无法掌控的。因此这里就发生了冲突问题。最明显的情况就是,a,b线程同时向System.out输出数据时,如果我们不做适当处理的话,我们是无法分清哪些输出数据是属于A线程的,哪些是属于B线程的,甚至连输出数据都难以拆分成两份(一个线程输出一份结果数据)。

多线程并行运行

    这里主要涉及到两个类:ProblemCallable和ProblemItemCallable。ProblemCallable是一道题目对于一份用户代码的任务类。而ProblemItemCallable是一份数据的任务类。也就是说,一个ProblemCallable会对应多个ProblemItemCallable。他们两者都是实现了Callable接口的类,复写了里面的call方法。

ProblemCallable的call方法代码如下:

public List<ProblemResultItem> call() throws Exception {
		List<String> paths = problem.getInputDataFilePathList();
		final List<ProblemResultItem> resultItems = new ArrayList<ProblemResultItem>();
		countDownLatch = new CountDownLatch(paths.size());
		// 为了内存使用比较准确,先大概的执行一次回收吧
		run.gc();

		for (int i = 0; i < paths.size(); i++) {
			final String path = paths.get(i);
			itemExecThreadPool.execute(new Runnable() {
				@Override
				public void run() {
					resultItems.add(process(path));
				}
			});
		}

		// 阻塞线程,等待所有结果都计算完了,再返回
		countDownLatch.await();
		return resultItems;
	}

可以看出,在这个方法里面,先获取了该道题目所有标准输入文件的路径,然后在一个循环里面,根据标准输入文件的个数,动态创建了对应数量的任务类,去执行process方法。最后再利用CountDownLatch阻塞当前线程,等待所有结果到返回之后,这里再返回最终的结果。

ProblemCallable的process方法代码如下:

private ProblemResultItem process(String inputFilePath) {
		ProblemResultItem item = null;
		ProblemItemCallable itemCallable = null;
		long beginMemory = 0;
		long beginTime = 0;
		long endTime = 0;
		long endMemory = 0;
		Future<ProblemResultItem> submit = null;

		try {
			itemCallable = new ProblemItemCallable(mainMethod, inputFilePath,
					resultBuffer, threadSystemIn);

			submit = itemGetThreadPool.submit(itemCallable);
			beginMemory = run.totalMemory() - run.freeMemory();
			beginTime = System.nanoTime();

			item = submit
					.get(problem.getTimeLimit() + 2, TimeUnit.MILLISECONDS);

			if (item == null) {
				killThread((FutureTask<ProblemResultItem>) submit);
				throw new TimeoutException();
			}

			endTime = System.nanoTime();
			endMemory = run.totalMemory() - run.freeMemory();
		} catch (Exception e) {
			// 出现了意外,先关闭资源再说(如已经打开的流等)
			itemCallable.colseResource();
			killThread((FutureTask<ProblemResultItem>) submit);
			item = new ProblemResultItem();
			item.setNormal(false);
			if (e instanceof CancellationException
					|| e instanceof TimeoutException) {
				// 超时了,会进来这里
				item.setMessage("超时");
			} else {
				item.setMessage(e.getMessage());
			}
			endTime = System.nanoTime();
			endMemory = run.totalMemory() - run.freeMemory();
		}
		// 时间为毫微秒,要先转变为微秒再变为毫秒
		item.setUseTime((endTime - beginTime) / 1000 / 1000);
		item.setUseMemory(endMemory - beginMemory);
		item.setInputFilePath(inputFilePath);
		if (item.getUseMemory() > problem.getMemoryLimit()) {
			item.setNormal(false);
			item.setMessage("超出内存限制");
		}
		// 无论怎么样,这里必须最后都要进行减一,不然将会一直阻塞线程,最终无法返回结果
		countDownLatch.countDown();
		return item;
	}
从代码中不难看出,该方法主要做的是一些准备和善后工作。最终真正通过反射执行用户代码中的main方法,交给了ProblemItemCallable去完成。


ProblemItemCallable的代码如下:

package cn.superman.sandbox.callable;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;

import cn.superman.sandbox.core.systemInStream.ThreadInputStream;
import cn.superman.sandbox.core.systemOutStream.CacheOutputStream;
import cn.superman.sandbox.dto.ProblemResultItem;

public class ProblemItemCallable implements Callable<ProblemResultItem> {
	private Method mainMethod;
	private CacheOutputStream resultBuffer;
	private FileInputStream fileInputStream;
	private ThreadInputStream threadSystemIn;

	public ProblemItemCallable(Method mainMethod, String inputFilePath,
			CacheOutputStream resultBuffer, ThreadInputStream threadSystemIn) {
		this.mainMethod = mainMethod;
		this.resultBuffer = resultBuffer;
		this.threadSystemIn = threadSystemIn;
		// 重定向输入流,注意路径不能包含中文名
		File file = new File(inputFilePath);
		if (!file.exists()) {
			throw new RuntimeException("测试数据有问题");
		}
		try {
			fileInputStream = new FileInputStream(file);
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		}
	}

	public ProblemResultItem call() throws Exception {
		ProblemResultItem item = new ProblemResultItem();

		try {
			threadSystemIn.setThreadIn(fileInputStream);
			mainMethod.invoke(null, new Object[] { new String[0] });

			item.setResult(new String(resultBuffer.removeBytes(Thread
					.currentThread().getId())));
			item.setNormal(true);

		} catch (InvocationTargetException e) {
			Throwable throwable = e.getTargetException();
			if (throwable instanceof OutOfMemoryError) {
				item.setMessage("内存溢出");
			} else {
				item.setMessage(throwable.getMessage());
			}
			item.setNormal(false);
		} catch (RuntimeException runtimeException) {
			item.setMessage(runtimeException.getMessage());
			item.setNormal(false);
		} finally {
			threadSystemIn.removeAndCloseThreadIn();
		}

		return item;
	}

	public void colseResource() {
		threadSystemIn.removeAndCloseThreadIn();
	}
}
代码不多,主要是根据传进来的路径,打开文件的输入流,就设置到threadSystemIn(后面会说)中,再利用反射调用main方法,最后再通过resultBuffer(后面会说),取出这一次的输出结果,然后进行返回。

    大概就是通过上述的方式,完成了多线程并行的运行。这里重点解决的是一个输入和输出的问题。因为,就正如上面提到那样,因为多线程运行的原因,多个线程会同时去争抢的从System.in中读取数据,也会争抢的将输出信息输出到System.out中。由于我们很难去修改用户的代码,甚至以及编译出来后的class信息,因此我们无法直接通过上锁等方式,去协调这个问题。我通过一种无锁的简单方法解决了这个问题。

输入输出分流

    解决上述问题,主要是两个类:ThreadInputStream和CacheOutputStream。他们分别在沙箱初始化的时候,进行了设置:

// 重定向输出流
		System.setOut(new PrintStream(resultBuffer));
		// 重定向输入流
		System.setIn(systemThreadIn);
整体的解决思路,就是去根据每个线程的ID,为每一个线程匹配对应的输入流和输出流。要实现这件事,其实只要使用JAVA本身提供的ThreadLocal类即可简单的进行实现了。示意图如下:

纯JAVA实现Online Judge--5.并行运行

ThreadInputStream类代码如下:

package cn.superman.sandbox.core.systemInStream;

import java.io.IOException;
import java.io.InputStream;

public class ThreadInputStream extends InputStream {
	private volatile ThreadLocal<InputStream> localIn = new ThreadLocal<InputStream>();

	@Override
	public int read() throws IOException {
		return localIn.get().read();
	}

	public void setThreadIn(InputStream in) {
		localIn.set(in);
	}

	public void removeAndCloseThreadIn() {
		InputStream stream = localIn.get();
		localIn.remove();
		try {
			if (stream != null) {
				stream.close();
				stream = null;
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}


CacheOutputStream类代码如下:

package cn.superman.sandbox.core.systemOutStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class CacheOutputStream extends OutputStream {

	private volatile ThreadLocal<ByteArrayOutputStream> localBytesCache = new ThreadLocal<ByteArrayOutputStream>() {
		@Override
		protected ByteArrayOutputStream initialValue() {
			return new ByteArrayOutputStream();
		}

	};

	@Override
	public void write(int b) throws IOException {
		ByteArrayOutputStream byteBufferStream = localBytesCache.get();
		byteBufferStream.write(b);
	}

	public byte[] removeBytes(long threadId) {
		ByteArrayOutputStream byteBufferStream = localBytesCache.get();

		if (byteBufferStream == null) {
			return new byte[0];
		}
		byte[] result = byteBufferStream.toByteArray();
		// 因为这个可能以后还可以重用(因为线程时有反复重用的,所以这里只需要将里面的内容清空就可以了)
		byteBufferStream.reset();
		return result;
	}
}


预告

    OJ的沙箱端部分大概就到这里了,后面的博文会大概讲一下WEB端的内容。