
Using Apache Commons FileUpload in Netty


 

For HTTP uploads, the Apache Commons FileUpload component expects to read the request body from an InputStream.

In Netty, however, the data arrives as a sequence of HttpChunk messages, so there is no stream that can be handed over directly.

To use Apache Commons FileUpload inside Netty, the incoming HttpChunks therefore have to be adapted into an InputStream.

 

Implementing Apache FileUpload

 

/**
 * Upload entry point backed by Netty.
 */
public class NettyFileUpload extends FileUpload {

	private NettyRequestContext context;

	public static final boolean isMultipartContent(HttpRequest request) {
		if (HttpMethod.POST != request.getMethod()) {
			return false;
		}
		// Reject requests that carry no Content-Type header at all.
		if (request.getHeaders("Content-Type") == null || request.getHeaders("Content-Type").isEmpty()) {
			return false;
		}
		String contentType = request.getHeaders("Content-Type").get(0);
		if (contentType == null) {
			return false;
		}
		return contentType.toLowerCase().startsWith("multipart/");
	}

	public NettyFileUpload(NettyRequestContext context) {
		this.context = context;
	}

	public NettyFileUpload(FileItemFactory fileItemFactory) {
		super(fileItemFactory);
	}

	public FileItemIterator getItemIterator() throws FileUploadException, IOException {
		return super.getItemIterator(context);
	}
}
 

 

public class NettyRequestContext implements RequestContext {
	private String encoding;
	private String contentType;
	private int contentLength = -1;
	/**
	 * Stream carrying the uploaded content.
	 */
	private InputStream inputStream;
	public NettyRequestContext(String encoding, String contentType, int contentLength, InputStream inputStream) {
		this.encoding = encoding;
		this.contentType = contentType;
		this.contentLength = contentLength;
		this.inputStream = inputStream;
	}
	@Override
	public String getCharacterEncoding() {
		return encoding;
	}
	@Override
	public String getContentType() {
		return contentType;
	}
	@Override
	public int getContentLength() {
		return contentLength;
	}
	@Override
	public InputStream getInputStream() throws IOException {
		// The request's own stream cannot be used directly, because the body arrives as HttpChunks.
		return inputStream;
	}
	@Override
	public String toString() {
		return "ContentLength=" + this.getContentLength() + ", ContentType="
				+ this.getContentType();
	}

	public void closeInputStream() throws IOException {
		getInputStream().close();
	}
}
 

Adapting the HttpChunks into an InputStream:

 

public class NettyChunkInputStream extends InputStream {

	private BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128);

	private HttpChunk currentChunk = null;

	private volatile boolean closed;

	public boolean putChunk(HttpChunk chunk) throws IOException {
		if (!closed) {
			try {
				chunkQueue.put(chunk);
			} catch (InterruptedException e) {
				throw new IOException(e);
			}
			return true;
		}
		throw new IOException("This input stream has been closed!");

	}

	@Override
	public int read() throws IOException {
		try {
			// Advance past empty chunks until data is available or the last chunk is reached.
			while (!getChunk().getContent().readable()) {
				if (getChunk().isLast()) {
					return -1;
				}
				nextChunk();
			}
			// InputStream.read() must return an int in the range 0-255.
			return getChunk().getContent().readByte() & 0xFF;
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
	}

	private HttpChunk getChunk() throws InterruptedException {
		if (currentChunk == null) {
			currentChunk = chunkQueue.take();
		}

		return currentChunk;
	}

	private void nextChunk() throws InterruptedException {
		currentChunk = chunkQueue.take();
	}

	@Override
	public int available() throws IOException {
		throw new UnsupportedOperationException("available() is not supported");
	}

	@Override
	public void close() throws IOException {
		chunkQueue = null;
		closed = true;
	}

	public boolean isClosed() {
		return closed;
	}

}
 

 

Using NettyFileUpload in a handler:

 

public class NettyUploadHandler extends SimpleChannelUpstreamHandler {
	private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(32);
	private boolean hasReadChunk;
	private NettyChunkInputStream chunkStream = new NettyChunkInputStream();
	private NettyRequestContext context;

	private volatile Map<String, String> resultMap = null;

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		if (!hasReadChunk) {
			handleHttpRequest(ctx, e);
		} else {
			handleHttpChunk(e);
		}
	}

	private void handleHttpRequest(ChannelHandlerContext ctx, MessageEvent e) throws IOException {
		HttpRequest request = (HttpRequest) e.getMessage();
		if (isUploadFile(request)) {
			handleUploadRequest(request);
		} else {
			ctx.sendUpstream(e);
		}
	}

	private void handleUploadRequest(HttpRequest request) throws IOException {
		context = new NettyRequestContext("UTF-8", request.getHeader("Content-Type"), -1, chunkStream);
		if (request.isChunked()) {
			hasReadChunk = true;
		} else {
			// Non-chunked request: the whole body is already in the request content.
			chunkStream.putChunk(new DefaultHttpChunk(request.getContent()));
			// Terminate the stream so the upload task sees the end of the body.
			chunkStream.putChunk(HttpChunk.LAST_CHUNK);
		}
		startUpload();
	}

	private void handleHttpChunk(MessageEvent e) throws IOException {
		// If the background upload task has already finished, respond right away
		// and ignore any remaining chunks.
		if (isUploadFinished()) {
			writeResult(e.getChannel());
			return;
		}
		HttpChunk chunk = (HttpChunk) e.getMessage();
		chunkStream.putChunk(chunk);

		if (chunk.isLast()) {
			// Last chunk received: busy-wait on the I/O thread until the UploadTask
			// has produced the result, then write the response.
			for (;;) {
				if (isUploadFinished()) {
					writeResult(e.getChannel());
					return;
				}
			}
		}
	}

	private boolean isUploadFinished() {
		return resultMap != null || chunkStream.isClosed();
	}

	private boolean isUploadFile(HttpRequest request) {
		return request.getUri().equals("/upload/uploadfile") && NettyFileUpload.isMultipartContent(request);
	}

	private void startUpload() {
		EXECUTOR.execute(new UploadTask());
	}

	private void writeResult(Channel channel) {
		String json = JsonUtil.beanToJson(resultMap);
		byte[] data = json.getBytes();
		ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
		HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
		response.setContent(buffer);
		response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
		response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buffer.readableBytes()));
		channel.write(response);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		e.getCause().printStackTrace();
	}

	class UploadTask implements Runnable {

		public UploadTask() {
			super();
		}

		@Override
		public void run() {
			long start = System.currentTimeMillis();

			try {

				NettyFileUpload upload = new NettyFileUpload(context);
				FileItemIterator iter = upload.getItemIterator();

				// Per-item processing is application-specific; as a placeholder,
				// record the field name and original file name of each item.
				Map<String, String> result = new HashMap<String, String>();
				while (iter.hasNext()) {
					FileItemStream item = iter.next();
					// Read the item via item.openStream() and store it as needed here.
					result.put(item.getFieldName(), item.getName());
				}
				resultMap = result;
				context.closeInputStream();
				long end = System.currentTimeMillis();
				System.out.println("spend time : " + (end - start));

			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}
}
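
For reference, here is a minimal sketch of how such a handler might be wired into a Netty 3.x pipeline. The UploadServer class, the port and the handler names below are illustrative only and not part of the original code; each channel gets its own NettyUploadHandler instance, because the handler keeps per-connection state (chunkStream, resultMap).

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;

public class UploadServer {

	public static void main(String[] args) {
		ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
				Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() {
				ChannelPipeline pipeline = Channels.pipeline();
				// Decode raw bytes into HttpRequest/HttpChunk messages, encode HttpResponse back.
				pipeline.addLast("decoder", new HttpRequestDecoder());
				pipeline.addLast("encoder", new HttpResponseEncoder());
				// No HttpChunkAggregator here: the upload handler consumes the chunks itself.
				pipeline.addLast("upload", new NettyUploadHandler());
				return pipeline;
			}
		});

		bootstrap.bind(new InetSocketAddress(8080));
	}
}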
  

NettyChunkInputStream has to be driven by two threads: one thread feeds it with putChunk(...), while another thread consumes the data through the stream returned by getInputStream(). If a single thread did both, read() would block on the internal queue waiting for a chunk that the same thread has not yet had a chance to put.
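
As a standalone illustration of that two-thread contract, the following sketch feeds one data chunk plus the terminating LAST_CHUNK from a producer thread while a consumer thread drains the stream; the payload and the demo class name are made up for the example.

import java.io.IOException;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.util.CharsetUtil;

public class NettyChunkInputStreamDemo {

	public static void main(String[] args) {
		final NettyChunkInputStream stream = new NettyChunkInputStream();

		Thread producer = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					// One data chunk followed by the empty LAST_CHUNK that ends the body.
					ChannelBuffer data = ChannelBuffers.copiedBuffer("hello upload", CharsetUtil.UTF_8);
					stream.putChunk(new DefaultHttpChunk(data));
					stream.putChunk(HttpChunk.LAST_CHUNK);
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		});

		Thread consumer = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					// Blocks on the internal queue until the producer delivers data.
					int b;
					while ((b = stream.read()) != -1) {
						System.out.print((char) b);
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		});

		producer.start();
		consumer.start();
	}
}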

 

PS: For better throughput, InputStream.read(byte[], int, int) can also be overridden in NettyChunkInputStream, so that the per-byte boundary checks of read() are not paid for every single byte.
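
A possible sketch of that override, meant to live inside NettyChunkInputStream: it reuses the private getChunk()/nextChunk() helpers shown above and copies whatever the current chunk can supply in a single call.

	@Override
	public int read(byte[] b, int off, int len) throws IOException {
		if (len == 0) {
			return 0;
		}
		try {
			// Skip over empty chunks until data is available or the last chunk is reached.
			while (!getChunk().getContent().readable()) {
				if (getChunk().isLast()) {
					return -1;
				}
				nextChunk();
			}
			ChannelBuffer content = getChunk().getContent();
			// Copy as many bytes as the current chunk can provide in one call.
			int toRead = Math.min(len, content.readableBytes());
			content.readBytes(b, off, toRead);
			return toRead;
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
	}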

 

 
