在Netty中使用Apache common fileupload
程序员文章站
2024-01-01 23:48:04
...
在HTTP文件上传中,Apache Commons FileUpload 组件要求传入一个 InputStream 对象。
而Netty是按块(HttpChunk)接收请求数据的,并不提供可以直接读取的流。
因此要想在Netty中使用 Apache Commons FileUpload,就必须把陆续到达的 HttpChunk 适配成一个 InputStream。
实现Apache FileUpload
/** * 用Netty来实现上传 */ public class NettyFileUpload extends FileUpload { private NettyRequestContext context; public static final boolean isMultipartContent(HttpRequest request) { if (HttpMethod.POST != request.getMethod()) { return false; } if (request.getHeaders("Content-Type") == null && request.getHeaders("Content-Type").size() == 0) { return false; } String contentType = request.getHeaders("Content-Type").get(0); if (contentType == null) { return false; } if (contentType.toLowerCase().startsWith("multipart/")) { return true; } return false; } public NettyFileUpload(NettyRequestContext context) { this.context = context; } public NettyFileUpload(FileItemFactory fileItemFactory) { super(fileItemFactory); } public FileItemIterator getItemIterator() throws FileUploadException, IOException { return super.getItemIterator(context); }
public class NettyRequestContext implements RequestContext { private String encoding; private String contentType; private int contentLength = -1; /** * 上传的内容流 */ private InputStream inputStream; public NettyRequestContext(String encoding, String contentType, int contentLength, InputStream inputStream) { this.encoding = encoding; this.contentType = contentType; this.contentLength = contentLength; this.inputStream = inputStream; } @Override public String getCharacterEncoding() { return encoding; } @Override public String getContentType() { return contentType; } @Override public int getContentLength() { return contentLength; } @Override public InputStream getInputStream() throws IOException { // 不能直接用request的流,因为有HttpChunk return inputStream; } @Override public String toString() { return "ContentLength=" + this.getContentLength() + ", ContentType=" + this.getContentType(); } public void closeInputStream() throws IOException { getInputStream().close(); } }
适配成InputStream:
public class NettyChunkInputStream extends InputStream { private BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128); private HttpChunk currentChunk = null; private volatile boolean closed; public boolean putChunk(HttpChunk chunk) throws IOException { if (!closed) { try { chunkQueue.put(chunk); } catch (InterruptedException e) { throw new IOException(e); } return true; } throw new IOException(" this inputstream has been closed!"); } @Override public int read() throws IOException { byte resultByte = -1; try { if (getChunk().getContent().readable()) { resultByte = getChunk().getContent().readByte(); } else if (!getChunk().isLast()) { nextChunk(); if (getChunk().getContent().readable()) { resultByte = getChunk().getContent().readByte(); } else { return -1; } } else { return -1; } } catch (InterruptedException e) { throw new IOException(e); } // InputStream.read()返回0-255之间的int return resultByte >= 0 ? resultByte : 256 + resultByte; } private HttpChunk getChunk() throws InterruptedException { if (currentChunk == null) { currentChunk = chunkQueue.take(); } return currentChunk; } private void nextChunk() throws InterruptedException { currentChunk = chunkQueue.take(); } @Override public int available() throws IOException { throw new UnsupportedOperationException("unsupport available()"); } @Override public void close() throws IOException { chunkQueue = null; closed = true; } public boolean isClosed() { return closed; } }
Netty FileUpload应用:
public class NettyUploadHandler extends SimpleChannelUpstreamHandler { private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(32); private boolean hasReadChunk; private NettyChunkInputStream chunkStream = new NettyChunkInputStream(); private NettyRequestContext context; private volatile Map<String, String> resultMap = null; @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { if (!hasReadChunk) { handleHttpRequest(ctx, e); } else { handleHttpChunk(e); } } private void handleHttpRequest(ChannelHandlerContext ctx, MessageEvent e) throws IOException { HttpRequest request = (HttpRequest) e.getMessage(); if (isUploadFile(request)) { handleUploadRequest(request); } else { ctx.sendUpstream(e); } } private void handleUploadRequest(HttpRequest request) throws IOException { context = new NettyRequestContext("UTF-8", request.getHeader("Content-Type"), -1, chunkStream); if (request.isChunked()) { hasReadChunk = true; } else { HttpChunk chunk = new DefaultHttpChunk(request.getContent()); chunkStream.putChunk(chunk); } startUpload(); } private void handleHttpChunk(MessageEvent e) throws IOException { if (isUploadFinished()) { writeResult(e.getChannel()); return; } HttpChunk chunk = (HttpChunk) e.getMessage(); chunkStream.putChunk(chunk); if (chunk.isLast()) { for (;;) { if (isUploadFinished()) { writeResult(e.getChannel()); return; } } } } private boolean isUploadFinished() { return resultMap != null || chunkStream.isClosed(); } private boolean isUploadFile(HttpRequest request) { return request.getUri().equals("/upload/uploadfile") && NettyFileUpload.isMultipartContent(request); } private void startUpload() { EXECUTOR.execute(new UploadTask()); } private void writeResult(Channel channel) { String json = JsonUtil.beanToJson(resultMap); byte[] data = json.getBytes(); ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); 
response.setContent(buffer); response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8"); response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buffer.readableBytes())); channel.write(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); } class UploadTask implements Runnable { public UploadTask() { super(); } @Override public void run() { long start = System.currentTimeMillis(); try { NettyFileUpload upload = new NettyFileUpload(context); FileItemIterator iter = upload.getItemIterator(); while (iter.hasNext()) { FileItemStream item = iter.next(); //这里处理逻辑 } resultMap = handler.getResult(); context.closeInputStream(); long end = System.currentTimeMillis(); System.out.println("spend time : " + (end - start)); } catch (Exception e) { e.printStackTrace(); } } } }
使用该NettyChunkInputStream时,必须由一个线程调用putChunk(...)写入数据,再由另一个线程通过getInputStream()读取并消费数据;如果读写都在同一个线程中进行,read()会因为队列为空而一直阻塞。
PS:可以在NettyChunkInputStream中重写InputStream.read(byte[] b, int off, int len),按块批量拷贝数据,避免每读一个字节都调用一次read()并进行边界判断,从而提高效率。
推荐阅读
-
在Netty中使用Apache common fileupload
-
在Netty中使用Apache common fileupload
-
在ASP.NET 2.0中操作数据之五十二:使用FileUpload上传文件
-
在ASP.NET 2.0中操作数据之五十二:使用FileUpload上传文件
-
使用apache common math 中的聚类方法DBSCAN与Kmeans
-
在SpringBoot中,如何使用Netty实现远程调用方法总结
-
请教在apache2.4中 DefaultType功能怎样使用其它设置进行替代
-
在linux中如何查看apache使用的是哪个httpd.conf
-
在linux中如何查看apache使用的是哪个httpd.conf
-
请问在apache2.4中 DefaultType功能怎样使用其它设置进行替代