From 3d320704b268b5e5c277f36a42d11d3318e16a4a Mon Sep 17 00:00:00 2001 From: Summer Boot Framework Date: Thu, 21 May 2026 09:50:36 -0400 Subject: [PATCH 1/7] Performance improvement: Agent_PDFBox - Serial graphics processing converted to parallel processing --- CHANGES.md | 1 + .../jexpress/util/pdf/Agent_PDFBox.java | 49 ++++++++++++++++--- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 50c8ecc0..f031e8d1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ * ✨ New API: FormatterUtil.formatCurrency(BigDecimal amount, RoundingMode roundingMode) * WebResourceController.requestWebResource with @Daemon to serve web resources with enhanced reliability. * Refactoring: predefined URI constants inside BootURI +* Performance improvement: Agent_PDFBox - Serial graphics processing converted to parallel processing ## Version 2.6.9 (2026-04-24) diff --git a/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java b/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java index 9a9a2d76..bb65857f 100644 --- a/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java +++ b/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java @@ -30,6 +30,7 @@ import org.apache.pdfbox.rendering.ImageType; import org.apache.pdfbox.rendering.PDFRenderer; import org.apache.pdfbox.rendering.RenderDestination; +import org.summerboot.jexpress.util.ApplicationUtil; import javax.imageio.ImageIO; import java.awt.image.BufferedImage; @@ -41,6 +42,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; /** * @author Changski Tie Zheng Zhang 张铁铮, 魏泽北, 杜旺财, 杜富贵 @@ -316,10 +319,24 @@ public static List pdf2Images(File pdfFile, String password, floa public static List pdf2Images(PDDocument document, float dpi, ImageType imageType, RenderDestination destination) throws IOException { PDFRenderer renderer = new PDFRenderer(document); int totalPages = document.getNumberOfPages(); - List images = new ArrayList(); - for (int currentPage = 0; currentPage < totalPages; currentPage++) { + List images = new ArrayList(totalPages); + /*for (int currentPage = 0; currentPage < totalPages; currentPage++) { BufferedImage image = renderer.renderImage(currentPage, dpi / 72f, imageType, destination); images.add(image); + }*/ + List> tasks = new ArrayList<>(totalPages); + for (int currentPage = 0; currentPage < totalPages; currentPage++) { + final int index = currentPage; + Callable task = () -> { + BufferedImage image = renderer.renderImage(index, dpi / 72f, imageType, destination); + return image; + }; + tasks.add(task); + } + try { + ApplicationUtil.runAndWaitForAllResults(tasks, images); + } catch (ExecutionException e) { + e.printStackTrace(); } return images; } @@ -332,13 +349,29 @@ public static List pdf2Images(PDDocument document, float dpi, Ima * @throws IOException */ public static List images2Bytes(List images, String formatName) throws IOException { - List imageDataList = new ArrayList(images.size()); + int totalPages = images.size(); + List imageDataList = new ArrayList(totalPages); + List> tasks = new ArrayList<>(totalPages); + /*for (BufferedImage image : images) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) { + ImageIO.write(image, formatName, baos); + byte[] imageData = baos.toByteArray(); + imageDataList.add(imageData); + } + }*/ for (BufferedImage image : images) { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) { - ImageIO.write(image, formatName, baos); - byte[] imageData = baos.toByteArray(); - imageDataList.add(imageData); - } + Callable task = () -> { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) { + ImageIO.write(image, formatName, baos); + return baos.toByteArray(); + } + }; + tasks.add(task); + } + try { + ApplicationUtil.runAndWaitForAllResults(tasks, imageDataList); + } catch (ExecutionException e) { + e.printStackTrace(); } return imageDataList; } From 3ce0b5d8da3925b4d155d67c9748a8dd8937de05 Mon Sep 17 00:00:00 2001 From: Summer Boot Framework Date: Thu, 21 May 2026 16:31:08 -0400 Subject: [PATCH 2/7] refactoring --- .../summerboot/jexpress/boot/ScanedGuiceModule.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java b/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java index e1290e3d..5c513c18 100644 --- a/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java +++ b/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java @@ -141,10 +141,12 @@ public void configure() { } catch (RuntimeException ex) { } Set namedWebsocket = channelHandlerNames.get(Service.ChannelHandlerType.Websocket); - for (String s : namedWebsocket) { - if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/")) { - String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/, but found: " + s; - throw new IllegalArgumentException(errorMessage); + if (namedWebsocket != null) { + for (String s : namedWebsocket) { + if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/")) { + String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/, but found: " + s; + throw new IllegalArgumentException(errorMessage); + } } } } From 9bf273b0796b0425969c1fd2a071daa0c31baa13 Mon Sep 17 00:00:00 2001 From: Summer Boot Framework Date: Thu, 21 May 2026 23:06:25 -0400 Subject: [PATCH 3/7] websocket --- .../jexpress/boot/ScanedGuiceModule.java | 4 +- .../nio/server/HttpNioChannelInitializer.java | 2 +- .../websocket/BootWebSocketHandler.java | 153 +++++++++--------- .../websocket/WebSocketAuthHandler_OTT.java | 29 +--- .../nio/server/ws/rs/BootController.java | 1 + .../summerboot/jexpress/util/FileUtil.java | 33 ++++ 6 files changed, 118 insertions(+), 104 deletions(-) diff --git a/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java b/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java index 5c513c18..9cf46227 100644 --- a/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java +++ b/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java @@ -143,8 +143,8 @@ public void configure() { Set namedWebsocket = channelHandlerNames.get(Service.ChannelHandlerType.Websocket); if (namedWebsocket != null) { for (String s : namedWebsocket) { - if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/")) { - String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/, but found: " + s; + if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX)) { + String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + ", but found: " + s; throw new IllegalArgumentException(errorMessage); } } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java b/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java index 9663d5d2..37160663 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java @@ -126,7 +126,7 @@ protected void initChannelPipeline(ChannelPipeline channelPipeline, NioConfig ni // 关键点:我们将路径设置为 null。设置为 null 意味着它不会主动去拦截并匹配固定 URL, // 而是只要看到带有符合标准的 WebSocket Upgrade 请求头,它就会自动在原地执行握手升级! // 这样无论我们前面把 URI 改成 /ws/chat 还是 /ws/game,它都能兼容升级。 - String webSocketURI = WebSocketAuthHandler_OTT.WS_PATH_PREFIX; + String webSocketURI = WebSocketAuthHandler_OTT.WS_PATH; channelPipeline.addLast(WebSocketAuthHandler_OTT.CHANNEL_CHANNEL_NAME_NEXT, new WebSocketServerProtocolHandler(webSocketURI, null, allowExtensions, maxFrameSize, allowMaskMismatch, checkStartsWith, dropPongFrames, handshakeTimeoutMillis)); // 3. 注意:这里【不要】像之前一样 addLast(new BusinessHandler) 了。 diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java index 7564e0cc..9a74eae1 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java @@ -27,13 +27,15 @@ import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import io.netty.util.AttributeKey; import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.tika.Tika; +import org.apache.tika.mime.MimeTypes; import org.summerboot.jexpress.nio.server.NioConfig; import org.summerboot.jexpress.nio.server.NioHttpUtil; import org.summerboot.jexpress.security.auth.Caller; +import org.summerboot.jexpress.util.FileUtil; /** * usage example: @@ -65,10 +67,13 @@ abstract public class BootWebSocketHandler extends SimpleChannelInboundHandler { + Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); + if (caller == null) { + clients.remove(ctx.channel()); + ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); + ctx.close(); + log.warn("OTT auth failed - " + ctx.channel().remoteAddress() + ": " + ctx); + ctx.close(); + return; + } + + clients.add(ctx.channel()); + log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress()); + + String message = onCallerConnected(ctx, caller); + if (message != null) { + sendToAllChannels(message, true); + } + }; + NioConfig.cfg.getBizExecutor().execute(asyncTask); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.trace(() -> "channelActive: " + ctx.channel().remoteAddress()); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + clients.remove(ctx.channel()); + log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress()); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { clients.remove(ctx.channel()); @@ -107,53 +148,40 @@ protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throw protected void onBinaryWebSocketFrame(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception { ByteBuf bb = msg.content(); byte[] data = ByteBufUtil.getBytes(bb); - Runnable asyncTask = () -> { - Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get(); - if (caller == null) { - clients.remove(ctx.channel()); - ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); - ctx.close(); - log.warn("OTT auth failed " + ctx.channel().remoteAddress()); - return; - } - String responseText = onMessage(ctx, caller, data); - if (responseText != null) { - sendToChannel(ctx, responseText); - } - }; - NioConfig.cfg.getBizExecutor().execute(asyncTask); + processMessage(ctx, null, data); } protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { String txt = msg.text(); + processMessage(ctx, txt, null); + } + + protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception { + + } + + protected void processMessage(ChannelHandlerContext ctx, String text, byte[] data) { Runnable asyncTask = () -> { - Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get(); - if (caller == null) { - /*caller = auth(ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get());//use the first message as token to auth - if (caller == null) { - clients.remove(ctx.channel()); - ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); - ctx.close(); - log.warn("OTT auth failed " + ctx.channel().remoteAddress() + ": " + txt); - return; + Caller caller = (Caller) ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); + if (text != null) { + String responseText = onMessage(ctx, caller, text); + if (responseText != null) { + sendToAllChannels(responseText, true); + } + } else if (data != null) { + String[] mimeType = FileUtil.getMIMEShortExtension(data); + StringBuilder sb = new StringBuilder(); + byte[] processedData = onMessage(ctx, caller, data, mimeType[0], mimeType[1], sb); + if (processedData != null) { + sendToAllChannels(processedData, true); + } + if (!sb.isEmpty()) { + sendToAllChannels(sb.toString(), true); } - ctx.channel().attr(KEY_CALLER).set(caller); - String message = onCallerConnected(ctx, caller); - if (message != null) { - sendToAllChannels(message, true); - }*/ } - String responseText = onMessage(ctx, caller, txt); - if (responseText != null) { - //sendToChannel(ctx, responseText); - sendToAllChannels(responseText, true); - } }; NioConfig.cfg.getBizExecutor().execute(asyncTask); - } - - protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception { } @@ -171,7 +199,7 @@ protected Caller auth(Caller caller) { */ abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, String txt); - abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data); + abstract protected byte[] onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data, String mimeType, String fileExtension, StringBuilder builder); public void sendToChannel(ChannelHandlerContext ctx, String message) { ctx.writeAndFlush(new TextWebSocketFrame(message)); @@ -194,7 +222,11 @@ public void sendToAllChannels(byte[] data, boolean auth) { public void sendToAllChannels(WebSocketFrame message, boolean auth) { if (auth) { clients.stream() - .filter(channel -> channel.attr(KEY_CALLER).get() != null) + /*.filter(channel -> { + Caller caller = channel.attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); + return caller != null; + })*/ + .filter(channel -> channel.attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get() != null) .forEach(channel -> channel.writeAndFlush(message.retainedDuplicate())); } else { clients.stream() @@ -202,43 +234,4 @@ public void sendToAllChannels(WebSocketFrame message, boolean auth) { } } - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - Runnable asyncTask = () -> { - Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get(); - if (caller == null) { - caller = auth(ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get());//use the first message as token to auth - if (caller == null) { - clients.remove(ctx.channel()); - ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); - ctx.close(); - log.warn("OTT " + ctx.channel().remoteAddress() + ": " + ctx); - ctx.close(); - return; - } - - ctx.channel().attr(KEY_CALLER).set(caller); - clients.add(ctx.channel()); - log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress()); - - String message = onCallerConnected(ctx, caller); - if (message != null) { - sendToAllChannels(message, true); - } - } - }; - NioConfig.cfg.getBizExecutor().execute(asyncTask); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.trace(() -> "channelActive: " + ctx.channel().remoteAddress()); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) { - clients.remove(ctx.channel()); - log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress()); - } - } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java index a42f5517..9abd2c75 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java @@ -23,9 +23,12 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; import io.netty.util.AttributeKey; import org.summerboot.jexpress.security.auth.Authenticator; import org.summerboot.jexpress.security.auth.Caller; @@ -40,7 +43,8 @@ public class WebSocketAuthHandler_OTT extends ChannelInboundHandlerAdapter { public static final AttributeKey USER_ID_KEY = AttributeKey.valueOf("userId"); public static final String CHANNEL_NAME = "WebSocketAuthHandler_OTT"; public static final String CHANNEL_CHANNEL_NAME_NEXT = "BootWebSocketServerProtocolHandler"; - public static final String WS_PATH_PREFIX = "/ws"; + public static final String WS_PATH = "/ws"; + public static final String WS_PATH_PREFIX = WS_PATH + "/"; protected final Injector injector; @@ -76,6 +80,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception String oneTimeTicket = uriRequested.substring(uriPredefinedOTT.length()); Caller caller = verifyAndDestroyTicket(oneTimeTicket); // 校验并销毁 Ticket if (caller == null) { + sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN)); break; } // save OTT result to channel attr @@ -94,24 +99,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.fireChannelRead(msg); return; } - /*if (uriRequested.startsWith(CHAT_PATH_PREFIX)) { - String oneTimeTicket = uriRequested.substring(CHAT_PATH_PREFIX.length()); - Caller caller = verifyAndDestroyTicket(oneTimeTicket); // 校验并销毁 Ticket - - if (caller != null) { - ctx.channel().attr(USER_ID_KEY).set(caller); - - // 【核心点】动态向管道末尾添加聊天专属业务 Handler - //ctx.pipeline().addLast("businessHandler", new ChatModuleHandler()); - ChannelHandler ch = injector.getInstance(Key.get(ChannelHandler.class, Names.named("/ws/chat"))); - //ch = new ChatModuleHandler(); - ctx.pipeline().addAfter(CHANNEL_CHANNEL_NAME_NEXT, "wsChatModuleHandler", ch); - - // 重写 URI,让下游的 WebSocketServerProtocolHandler 能够精准匹配升级 - request.setUri("/ws/chat"); - ctx.fireChannelRead(msg); - return; - }*/ } // 3. 认证失败或路径不匹配:直接拒绝连接 @@ -123,14 +110,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.fireChannelRead(msg); } - private Caller verifyAndDestroyTicket(String oneTimeTicket) { + protected Caller verifyAndDestroyTicket(String oneTimeTicket) { if (authenticator == null) { return null; } return authenticator.oneTimeTicketVerifyAndDestroy(oneTimeTicket); } - private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { + protected void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { HttpUtil.setContentLength(res, 0); ctx.channel().writeAndFlush(res).addListener(ChannelFutureListener.CLOSE); } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java index 42398cd1..43838779 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java @@ -291,6 +291,7 @@ public void logout(@Parameter(hidden = true) final ServiceRequest request, @Para @Path(BootURI.CURRENT_VERSION + BootURI.API_NF_OTT) @Daemon @RequiresHealthCheck("") + @Log(responseBody = false) public String oneTimeTicketAuthenticate(@HeaderParam(NioHttpUtil.HTTP_HEADER_AUTH_TOKEN) String authHeader, @Parameter(hidden = true) final SessionContext context) { String jwt = BootAuthenticator.getBearerToken(authHeader); return auth.oneTimeTicketAuthenticate(jwt, context); diff --git a/src/main/java/org/summerboot/jexpress/util/FileUtil.java b/src/main/java/org/summerboot/jexpress/util/FileUtil.java index ebd92dfa..20655138 100644 --- a/src/main/java/org/summerboot/jexpress/util/FileUtil.java +++ b/src/main/java/org/summerboot/jexpress/util/FileUtil.java @@ -16,6 +16,11 @@ */ package org.summerboot.jexpress.util; +import org.apache.tika.Tika; +import org.apache.tika.mime.MediaType; +import org.apache.tika.mime.MimeType; +import org.apache.tika.mime.MimeTypes; + import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -88,4 +93,32 @@ private static void deletePartFiles(File base64File) throws IOException { } } + public static final Tika TIKA = new Tika(); + public static final MimeTypes TIKA_REGISTRY = MimeTypes.getDefaultMimeTypes(); + + public static String[] getMIMEShortExtension(byte[] fileData) { + try { + // 1. 先通过字节码识别出长 MIME 类型(例如 "image/png" 或 "image/jpeg") + String mimeTypeStr = TIKA.detect(fileData); + + // 2. 从 Tika 注册表中找到该类型对应的 MimeType 对象 + MimeType mimeType = TIKA_REGISTRY.getRegisteredMimeType(mimeTypeStr); + + // 3. 获取短扩展名(注意:Tika 返回的带有 ".",例如 ".png" 或 ".jpg") + MediaType mediaType = mimeType.getType(); + String type = mediaType.getType(); + String ext = mimeType.getExtension(); + + // 如果你想去掉前面的点,可以做个简单处理 + if (ext != null && ext.startsWith(".")) { + ext = ext.substring(1); // 返回 "png" 或 "jpg" + } + String[] ret = {type, ext}; + return ret; + } catch (Exception e) { + String[] ret = {"", ""}; + return ret; + } + } + } From 8bafb1cb9a6567bbd24f6501f1d922c1b650662a Mon Sep 17 00:00:00 2001 From: Summer Boot Framework Date: Thu, 21 May 2026 23:14:53 -0400 Subject: [PATCH 4/7] refactoring --- .../jexpress/nio/server/websocket/BootWebSocketHandler.java | 4 ++-- src/main/java/org/summerboot/jexpress/util/FileUtil.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java index 9a74eae1..a2b3ec33 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java @@ -171,7 +171,7 @@ protected void processMessage(ChannelHandlerContext ctx, String text, byte[] dat } else if (data != null) { String[] mimeType = FileUtil.getMIMEShortExtension(data); StringBuilder sb = new StringBuilder(); - byte[] processedData = onMessage(ctx, caller, data, mimeType[0], mimeType[1], sb); + byte[] processedData = onMessage(ctx, caller, data, mimeType[0], mimeType[1], mimeType[2], sb); if (processedData != null) { sendToAllChannels(processedData, true); } @@ -199,7 +199,7 @@ protected Caller auth(Caller caller) { */ abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, String txt); - abstract protected byte[] onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data, String mimeType, String fileExtension, StringBuilder builder); + abstract protected byte[] onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data, String mimeType, String fileType, String fileExtension, StringBuilder builder); public void sendToChannel(ChannelHandlerContext ctx, String message) { ctx.writeAndFlush(new TextWebSocketFrame(message)); diff --git a/src/main/java/org/summerboot/jexpress/util/FileUtil.java b/src/main/java/org/summerboot/jexpress/util/FileUtil.java index 20655138..198fad42 100644 --- a/src/main/java/org/summerboot/jexpress/util/FileUtil.java +++ b/src/main/java/org/summerboot/jexpress/util/FileUtil.java @@ -113,10 +113,10 @@ public static String[] getMIMEShortExtension(byte[] fileData) { if (ext != null && ext.startsWith(".")) { ext = ext.substring(1); // 返回 "png" 或 "jpg" } - String[] ret = {type, ext}; + String[] ret = {mimeTypeStr, type, ext}; return ret; } catch (Exception e) { - String[] ret = {"", ""}; + String[] ret = {"", "", ""}; return ret; } } From 2be5562c7c410fa503c8331c9e6840876dca20d0 Mon Sep 17 00:00:00 2001 From: Summer Boot Framework Date: Fri, 22 May 2026 13:24:50 -0400 Subject: [PATCH 5/7] refactoring --- .../jexpress/boot/BootErrorCode.java | 2 +- .../nio/server/BootHttpFileUploadHandler.java | 3 +- .../server/NioServerHttpRequestHandler.java | 4 +- .../jexpress/nio/server/domain/Err.java | 3 + .../websocket/BootWebSocketHandler.java | 10 +- .../websocket/LargeFileStreamHandler.java | 235 ++++++++++++++++++ .../websocket/WebSocketAuthHandler_OTT.java | 2 +- .../nio/server/ws/rs/BootController.java | 4 +- .../server/ws/rs/JaxRsRequestProcessor.java | 3 +- .../jexpress/security/auth/Authenticator.java | 5 +- .../security/auth/BootAuthenticator.java | 25 +- .../summerboot/jexpress/util/FileUtil.java | 23 ++ 12 files changed, 299 insertions(+), 20 deletions(-) create mode 100644 src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java diff --git a/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java b/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java index 9360b0b5..350ef201 100644 --- a/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java +++ b/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java @@ -96,7 +96,7 @@ private static int applyOffset(int code, boolean wihtOffset) { int AUTH_NO_PERMISSION = getErrorCode(AUTH_BASE + 6); int AUTH_FORBIDDEN_IP = getErrorCode(AUTH_BASE + 7); int AUTH_FORBIDDEN_JWT = getErrorCode(AUTH_BASE + 8); - int AUTH_FORBIDDEN_REQUST = getErrorCode(AUTH_BASE + 9); + int AUTH_FORBIDDEN_REQUEST = getErrorCode(AUTH_BASE + 9); //Integration int ACCESS_BASE = getErrorCode(50, true); diff --git a/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java index 44d0a1a6..51170090 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java @@ -279,8 +279,7 @@ protected long precheck(ChannelHandlerContext ctx, HttpRequest req) { caller = authenticate(httpHeaders, context); if (caller == null) { - Err err = new Err(BootErrorCode.AUTH_NO_PERMISSION, null, "Unauthorized Caller", null); - context.error(err).status(HttpResponseStatus.UNAUTHORIZED); + context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED); NioHttpUtil.sendResponse(ctx, true, context, null, null); return 0; } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java index 1ed5bbd2..bb407afd 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java @@ -171,13 +171,13 @@ public void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest } else { String error = GeoIpUtil.callerAddressFilter(context.remoteIP(), nioCfg.getCallerAddressFilterWhitelist(), nioCfg.getCallerAddressFilterBlacklist(), nioCfg.getCallerAddressFilterOption()); if (error != null) { - Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_IP, null, "Forbidden caller IP", null, "Forbidden caller IP: " + error); + Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_IP, null, "Blocked caller IP", null, "Blocked caller IP: " + error); context.error(err).status(HttpResponseStatus.FORBIDDEN); } else { String request = httpMethod + httpRequestUri; error = SecurityUtil.whitelistbalcklistilter("request", request, nioCfg.getRequestFilterWhitelist(), nioCfg.getRequestFilterBlacklist()); if (error != null) { - Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_REQUST, null, "Forbidden caller request", null, "Forbidden caller request: " + error); + Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_REQUEST, null, "Blocked URL", null, "Blocked URL: " + error); context.error(err).status(HttpResponseStatus.FORBIDDEN); } else { processorSettings = service(ctx, requestHeaders, httpMethod, httpRequestUri, parameters, httpPostRequestBody, context); diff --git a/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java b/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java index bdf4f6f4..3d7451b2 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.summerboot.jexpress.boot.BootErrorCode; import org.summerboot.jexpress.util.BeanUtil; /** @@ -46,6 +47,8 @@ public class Err extends AdditionalJsonFields { @JsonIgnore protected Object internalInfo; + public static final Err UNAUTHORIZED_401 = new Err(BootErrorCode.AUTH_LOGIN_FAILED, null, "Authentication Required - Unknown caller", null); + public Err() { } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java index a2b3ec33..a2cdf6b7 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java @@ -145,17 +145,17 @@ protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throw } } + protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { + String txt = msg.text(); + processMessage(ctx, txt, null); + } + protected void onBinaryWebSocketFrame(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception { ByteBuf bb = msg.content(); byte[] data = ByteBufUtil.getBytes(bb); processMessage(ctx, null, data); } - protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { - String txt = msg.text(); - processMessage(ctx, txt, null); - } - protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception { } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java new file mode 100644 index 00000000..394c33e7 --- /dev/null +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java @@ -0,0 +1,235 @@ +/* + * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://apache.org + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +package org.summerboot.jexpress.nio.server.websocket; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.util.AttributeKey; +import org.summerboot.jexpress.nio.server.NioConfig; +import org.summerboot.jexpress.security.auth.Caller; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; + +public class LargeFileStreamHandler extends SimpleChannelInboundHandler { + + private static final AttributeKey FILE_CHANNEL_KEY = AttributeKey.valueOf("fileChannel"); + private static final AttributeKey FILE_STREAM_KEY = AttributeKey.valueOf("fileStream"); + private static final AttributeKey FILE_SIZE_KEY = AttributeKey.valueOf("fileSize"); + + // 新增:用于在连接上下文中记住当前正在写入的物理文件对象 + private static final AttributeKey TARGET_FILE_KEY = AttributeKey.valueOf("targetFile"); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { + Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); + String userId = caller.getUid(); + + // 1. 【起始帧】收到文件的第一个分片 + if (frame instanceof BinaryWebSocketFrame) { + BinaryWebSocketFrame startFrame = (BinaryWebSocketFrame) frame; + + // 初始化物理文件并开启追加模式 (Append Mode) + File targetFile = new File("/data/uploads/huge_file_" + userId + ".dat"); + FileOutputStream fos = new FileOutputStream(targetFile, true); + FileChannel fileChannel = fos.getChannel(); + + // 暂存至上下文 + ctx.channel().attr(TARGET_FILE_KEY).set(targetFile); + ctx.channel().attr(FILE_STREAM_KEY).set(fos); + ctx.channel().attr(FILE_CHANNEL_KEY).set(fileChannel); + + // 写入并发送第一个 ACK + long currentSize = writeAndGetNewSize(startFrame.content(), fileChannel); + ctx.channel().attr(FILE_SIZE_KEY).set(currentSize); + + sendAck(ctx, currentSize, startFrame.isFinalFragment()); + + if (startFrame.isFinalFragment()) { + closeAndCleanUp(ctx, userId); + } + } + + // 2. 【连续帧】收到后续的无数个分片 + else if (frame instanceof ContinuationWebSocketFrame) { + ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame) frame; + FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).get(); + + if (fileChannel == null) { + ctx.close(); // 异常协议流,直接截断 + return; + } + + // 追加写入 + long currentSize = writeAndGetNewSize(continuationFrame.content(), fileChannel); + ctx.channel().attr(FILE_SIZE_KEY).set(currentSize); + + // 响应实时进度 ACK 触发背压 + sendAck(ctx, currentSize, continuationFrame.isFinalFragment()); + + // 检查当前帧是否是最后一块碎片 + if (continuationFrame.isFinalFragment()) { + closeAndCleanUp(ctx, userId); + } + } + } + + private long writeAndGetNewSize(ByteBuf content, FileChannel fileChannel) throws IOException { + long position = fileChannel.size(); + content.readBytes(fileChannel, position, content.readableBytes()); + return fileChannel.size(); + } + + private void sendAck(ChannelHandlerContext ctx, long uploadedSize, boolean isFinished) throws IOException { + Map ackMap = new HashMap<>(); + ackMap.put("status", isFinished ? "COMPLETE" : "PROGRESS"); + ackMap.put("uploadedSize", uploadedSize); + + String jsonAck = objectMapper.writeValueAsString(ackMap); + ctx.channel().writeAndFlush(new TextWebSocketFrame(jsonAck)); + } + + /** + * 关闭资源、执行落盘清理,并在最后触发生命周期收尾钩子 + */ + private void closeAndCleanUp(ChannelHandlerContext ctx, String userId) throws IOException { + FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).getAndSet(null); + FileOutputStream fos = ctx.channel().attr(FILE_STREAM_KEY).getAndSet(null); + File targetFile = ctx.channel().attr(TARGET_FILE_KEY).getAndSet(null); + + if (fileChannel != null) { + // 极其重要:强制将操作系统缓存区的数据刷入物理存储介质 + fileChannel.force(true); + fileChannel.close(); + } + if (fos != null) { + fos.close(); + } + + // ========================================== + // 【核心触发点】此时文件已完整落盘,安全调用业务收尾方法 + // ========================================== + if (targetFile != null && targetFile.exists()) { + onUploadCompleted(ctx, targetFile); + } + + System.out.println("用户 [" + userId + "] 的超大文件传输完成。"); + } + + /** + * 大文件成功无损传输并落盘后的业务回调钩子(全功能升级版) + */ + private void onUploadCompleted(ChannelHandlerContext ctx, File targetFile) { + System.out.println("【系统】文件已落盘,启动 CompletableFuture 后台并行审计流水线..."); + ThreadPoolExecutor tpe = NioConfig.cfg.getBizExecutor(); + + // 1. 任务 A:异步触发文件安全扫描(传递给 taskExecutor 线程池运行) + CompletableFuture securityScanTask = CompletableFuture.supplyAsync(() -> { + System.out.println("[线程-" + Thread.currentThread().getName() + "] 正在对文件执行杀毒与敏感合规扫描..."); + return executeVirusScan(targetFile); // 耗时操作 + }, tpe); + + // 2. 任务 B:异步触发媒体转码(并行在线程池的其他线程中运行) + CompletableFuture videoTranscodeTask = CompletableFuture.supplyAsync(() -> { + System.out.println("[线程-" + Thread.currentThread().getName() + "] 正在对大视频文件生成 HLS (.m3u8) 视频切片..."); + return executeVideoTranscode(targetFile); // 耗时操作 + }, tpe); + + // 3. 【核心联合点】将任务 A 和 任务 B 组合在一起 + // allOf 的意思是:后面所有的任务都完成了,这个大组合才算圆满结束 + CompletableFuture.allOf(securityScanTask, videoTranscodeTask) + .thenAcceptAsync((voidResult) -> { + // 4. 当所有并行任务均在后台执行成功后,自动进入这个回调方法 + try { + // 提取各个任务的执行结果 (.join() 在这里不会阻塞,因为 allOf 保证了它们已经执行完了) + boolean scanOk = securityScanTask.join(); + boolean transcodeOk = videoTranscodeTask.join(); + + if (scanOk && transcodeOk) { + System.out.println("【成功】文件 [" + targetFile.getName() + "] 后台安全扫描与转码已全部顺利通过!"); + // 通过 WebSocket 安全通知浏览器:整个业务流程全绿完工! + ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"status\":\"ALL_TASKS_COMPLETE\"}")); + } else { + // 某个环节业务审核失败 + ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"status\":\"AUDIT_FAILED\",\"reason\":\"Security or Transcode failure\"}")); + } + } catch (Exception e) { + System.err.println("处理后台任务结果时发生异常: " + e.getMessage()); + } + }, ctx.executor()) + // 💡 避坑细节:使用 ctx.executor() 作为 thenAcceptAsync 的执行器, + // 意思是让“发回 WebSocket 响应”的这个动作,重新回到 Netty 当前连接专属的 I/O 线程(EventLoop)中排队执行。 + // 这遵循了 Netty 的单线程无锁化设计原则,确保长连接写入的绝对线程安全! + + // 5. 统一异常拦截网:如果在上面的任何一个后台任务中发生了运行时崩溃(如 OOM 或 IOException), + // 错误会被这里牢牢抓住,绝对不会发生普通 submit() 的“无意间隐式吞没异常”隐患! + .exceptionally(throwable -> { + System.err.println("【严重错误】后台异步流水线遭遇致命崩溃: " + throwable.getMessage()); + ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"status\":\"SERVER_ERROR\"}")); + return null; + }); + } + + // ========================================== + // 模拟耗时的底层耗时方法 + // ========================================== + private boolean executeVirusScan(File file) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return true; // 扫描通过 + } + + private boolean executeVideoTranscode(File file) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return true; // 转码通过 + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).getAndSet(null); + FileOutputStream fos = ctx.channel().attr(FILE_STREAM_KEY).getAndSet(null); + + if (fileChannel != null) fileChannel.close(); + if (fos != null) fos.close(); + + ctx.close(); + } +} + diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java index 9abd2c75..c6d09ec6 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java @@ -80,7 +80,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception String oneTimeTicket = uriRequested.substring(uriPredefinedOTT.length()); Caller caller = verifyAndDestroyTicket(oneTimeTicket); // 校验并销毁 Ticket if (caller == null) { - sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN)); + sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED)); break; } // save OTT result to channel attr diff --git a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java index 43838779..1516e58d 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java @@ -292,9 +292,9 @@ public void logout(@Parameter(hidden = true) final ServiceRequest request, @Para @Daemon @RequiresHealthCheck("") @Log(responseBody = false) - public String oneTimeTicketAuthenticate(@HeaderParam(NioHttpUtil.HTTP_HEADER_AUTH_TOKEN) String authHeader, @Parameter(hidden = true) final SessionContext context) { + public String oneTimeTicketAuthenticate(@QueryParam("wsURI") String wsURI, @HeaderParam(NioHttpUtil.HTTP_HEADER_AUTH_TOKEN) String authHeader, @Parameter(hidden = true) final SessionContext context) { String jwt = BootAuthenticator.getBearerToken(authHeader); - return auth.oneTimeTicketAuthenticate(jwt, context); + return auth.oneTimeTicketAuthenticate(wsURI, jwt, context); } @Operation( diff --git a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java index 765c3101..a0635274 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java @@ -405,8 +405,7 @@ public boolean authorizationCheck(final ChannelHandlerContext channelHandlerCtx, boolean isAuthorized = false; Caller caller = context.caller(); if (caller == null) { - context.status(HttpResponseStatus.UNAUTHORIZED) - .error(new Err(BootErrorCode.AUTH_NO_PERMISSION, null, "Authentication Required - Unknown caller", null)); + context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED); return false; } diff --git a/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java b/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java index fc874159..8a9eda17 100644 --- a/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java +++ b/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java @@ -137,11 +137,12 @@ public interface Authenticator { * in production, generate a random string as one-time ticket, store it in redis with key "ws:ticket:" + oneTimeTicket, value = caller (or json string), * and set expire time to 10 seconds. return the one-time ticket string to caller. * - * @param jwt + * @param wsURI WebSocket URI + * @param jwt caller's JWT, can be used to verify caller's identity and generate one-time ticket for specific user * @param context contains caller info, e.g. caller.getUid() can be used to generate one-time ticket for specific user * @return (32 to 64 chars + prefix) random string as one-time ticket, e.g. t_f87yfs7shfash7kk7a877asdf */ - String oneTimeTicketAuthenticate(String jwt, SessionContext context); + String oneTimeTicketAuthenticate(String wsURI, String jwt, SessionContext context); /** * in production, call redis.getdel("ws:ticket:" + oneTimeTicket) diff --git a/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java b/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java index 12821fad..15e1ca53 100644 --- a/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java +++ b/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java @@ -100,7 +100,7 @@ public String signJWT(String username, String pwd, E metaData, int validForMinut @Override public String signJWT(Caller caller, int validForMinutes, final SessionContext context) { if (caller == null) { - context.status(HttpResponseStatus.UNAUTHORIZED); + context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED); return null; } @@ -363,7 +363,7 @@ public Caller verifyToken(String authToken, AuthTokenCache cache, Integer errorC if (error == null) { caller = fromJwt(claims); } else { - Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_JWT, null, "Forbidden JWT", null, "Forbidden JWT: " + error); + Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_JWT, null, "Blocked JWT", null, "Blocked JWT: " + error); context.error(err).status(HttpResponseStatus.FORBIDDEN); } } @@ -503,7 +503,7 @@ public ServerCall.Listener interceptCall(ServerCall= 1024.0d && unitIndex < units.length - 1) { + size /= 1024.0d; + unitIndex++; + } + + // Manual two-decimal formatting avoids String.format(Locale, ...) overhead. + long scaled = Math.round(size * 100.0d); + long integerPart = scaled / 100; + int fractionalPart = (int) (scaled % 100); + + StringBuilder sb = new StringBuilder(16); + sb.append(integerPart).append('.'); + if (fractionalPart < 10) { + sb.append('0'); + } + sb.append(fractionalPart).append(units[unitIndex]); + return sb.toString(); + } } From 6ac05425306fad47bb35ecf8d8ff9187f2af264c Mon Sep 17 00:00:00 2001 From: Summer Boot Framework Date: Fri, 22 May 2026 21:07:32 -0400 Subject: [PATCH 6/7] New API: LargeFileStreamHandler for streaming large file response with low memory usage, and support for WebSocket for partial content delivery. --- CHANGES.md | 1 + pom.xml | 6 +- .../server/websocket/ChatModuleHandler.java | 39 ----- .../server/websocket/GameModuleHandler.java | 38 ----- .../websocket/LargeFileStreamHandler.java | 152 ++++++------------ 5 files changed, 53 insertions(+), 183 deletions(-) delete mode 100644 src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java delete mode 100644 src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java diff --git a/CHANGES.md b/CHANGES.md index f031e8d1..3ab77386 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,7 @@ * WebResourceController.requestWebResource with @Daemon to serve web resources with enhanced reliability. * Refactoring: predefined URI constants inside BootURI * Performance improvement: Agent_PDFBox - Serial graphics processing converted to parallel processing +* New API: LargeFileStreamHandler for streaming large file response with low memory usage, and support for WebSocket for partial content delivery. ## Version 2.6.9 (2026-04-24) diff --git a/pom.xml b/pom.xml index 467fe9aa..e89eb5f6 100644 --- a/pom.xml +++ b/pom.xml @@ -186,12 +186,12 @@ 0.13.0 - 4.2.13.Final + 4.2.14.Final 2.0.77.Final 1.81.0 33.6.0-jre - 4.34.1 + 4.35.0 2.2.49 @@ -217,7 +217,7 @@ 7.0.0 - 7.3.4.Final + 7.3.5.Final 7.0.2 diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java deleted file mode 100644 index 7e6ca6fb..00000000 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://apache.org - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.summerboot.jexpress.nio.server.websocket; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import org.summerboot.jexpress.security.auth.Caller; - -public class ChatModuleHandler extends SimpleChannelInboundHandler { - - @Override - protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { - Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); - - if (frame instanceof TextWebSocketFrame) { - System.out.println("[聊天模块] 收到用户 " + caller.getDisplayName() + " 的文本: " + ((TextWebSocketFrame) frame).text()); - } else if (frame instanceof BinaryWebSocketFrame) { - System.out.println("[聊天模块] 收到用户 " + caller.getDisplayName() + " 的二进制数据流"); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java deleted file mode 100644 index efc0f4a8..00000000 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://apache.org - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.summerboot.jexpress.nio.server.websocket; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import org.summerboot.jexpress.security.auth.Caller; - -public class GameModuleHandler extends SimpleChannelInboundHandler { - @Override - protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { - Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); - - if (frame instanceof TextWebSocketFrame) { - System.out.println("[游戏模块] 收到用户 " + caller.getDisplayName() + " 的文本: " + ((TextWebSocketFrame) frame).text()); - } else if (frame instanceof BinaryWebSocketFrame) { - System.out.println("[游戏模块] 收到用户 " + caller.getDisplayName() + " 的二进制数据流"); - } - } -} diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java index 394c33e7..d029bd04 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java @@ -27,25 +27,28 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.AttributeKey; -import org.summerboot.jexpress.nio.server.NioConfig; import org.summerboot.jexpress.security.auth.Caller; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadPoolExecutor; -public class LargeFileStreamHandler extends SimpleChannelInboundHandler { +/** + * @author Changski Tie Zheng Zhang 张铁铮, 魏泽北, 杜旺财, 杜富贵 + */ +public abstract class LargeFileStreamHandler extends SimpleChannelInboundHandler { private static final AttributeKey FILE_CHANNEL_KEY = AttributeKey.valueOf("fileChannel"); private static final AttributeKey FILE_STREAM_KEY = AttributeKey.valueOf("fileStream"); private static final AttributeKey FILE_SIZE_KEY = AttributeKey.valueOf("fileSize"); - // 新增:用于在连接上下文中记住当前正在写入的物理文件对象 + // Used to remember the physical file object currently being written in the connection context. private static final AttributeKey TARGET_FILE_KEY = AttributeKey.valueOf("targetFile"); private final ObjectMapper objectMapper = new ObjectMapper(); @@ -53,53 +56,66 @@ public class LargeFileStreamHandler extends SimpleChannelInboundHandler securityScanTask = CompletableFuture.supplyAsync(() -> { - System.out.println("[线程-" + Thread.currentThread().getName() + "] 正在对文件执行杀毒与敏感合规扫描..."); - return executeVirusScan(targetFile); // 耗时操作 - }, tpe); - - // 2. 任务 B:异步触发媒体转码(并行在线程池的其他线程中运行) - CompletableFuture videoTranscodeTask = CompletableFuture.supplyAsync(() -> { - System.out.println("[线程-" + Thread.currentThread().getName() + "] 正在对大视频文件生成 HLS (.m3u8) 视频切片..."); - return executeVideoTranscode(targetFile); // 耗时操作 - }, tpe); - - // 3. 【核心联合点】将任务 A 和 任务 B 组合在一起 - // allOf 的意思是:后面所有的任务都完成了,这个大组合才算圆满结束 - CompletableFuture.allOf(securityScanTask, videoTranscodeTask) - .thenAcceptAsync((voidResult) -> { - // 4. 当所有并行任务均在后台执行成功后,自动进入这个回调方法 - try { - // 提取各个任务的执行结果 (.join() 在这里不会阻塞,因为 allOf 保证了它们已经执行完了) - boolean scanOk = securityScanTask.join(); - boolean transcodeOk = videoTranscodeTask.join(); - - if (scanOk && transcodeOk) { - System.out.println("【成功】文件 [" + targetFile.getName() + "] 后台安全扫描与转码已全部顺利通过!"); - // 通过 WebSocket 安全通知浏览器:整个业务流程全绿完工! - ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"status\":\"ALL_TASKS_COMPLETE\"}")); - } else { - // 某个环节业务审核失败 - ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"status\":\"AUDIT_FAILED\",\"reason\":\"Security or Transcode failure\"}")); - } - } catch (Exception e) { - System.err.println("处理后台任务结果时发生异常: " + e.getMessage()); - } - }, ctx.executor()) - // 💡 避坑细节:使用 ctx.executor() 作为 thenAcceptAsync 的执行器, - // 意思是让“发回 WebSocket 响应”的这个动作,重新回到 Netty 当前连接专属的 I/O 线程(EventLoop)中排队执行。 - // 这遵循了 Netty 的单线程无锁化设计原则,确保长连接写入的绝对线程安全! - - // 5. 统一异常拦截网:如果在上面的任何一个后台任务中发生了运行时崩溃(如 OOM 或 IOException), - // 错误会被这里牢牢抓住,绝对不会发生普通 submit() 的“无意间隐式吞没异常”隐患! - .exceptionally(throwable -> { - System.err.println("【严重错误】后台异步流水线遭遇致命崩溃: " + throwable.getMessage()); - ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"status\":\"SERVER_ERROR\"}")); - return null; - }); - } - - // ========================================== - // 模拟耗时的底层耗时方法 - // ========================================== - private boolean executeVirusScan(File file) { - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return true; // 扫描通过 - } + abstract protected void onUploadCompleted(ChannelHandlerContext ctx, File targetFile, Caller caller); - private boolean executeVideoTranscode(File file) { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return true; // 转码通过 - } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { From 2cf533c41d8a29811852a5038fb1467390a85cc9 Mon Sep 17 00:00:00 2001 From: Summer Boot Framework Date: Fri, 22 May 2026 21:21:29 -0400 Subject: [PATCH 7/7] refactoring --- .../jexpress/nio/server/websocket/LargeFileStreamHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java index d029bd04..a8291544 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java @@ -27,6 +27,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.AttributeKey; +import org.summerboot.jexpress.nio.server.NioConfig; import org.summerboot.jexpress.security.auth.Caller; import java.io.File; @@ -66,7 +67,7 @@ protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) thr // Initialize on first frame only if (fileChannel == null) { - Path targetPath = Paths.get("data", "uploads", "huge_file_" + userId + "_" + System.currentTimeMillis() + ".dat").toAbsolutePath(); + Path targetPath = Paths.get(NioConfig.instance(NioConfig.class).getTempUoloadDir(), String.valueOf(caller.getId()), System.currentTimeMillis() + ".dat").toAbsolutePath(); Path parent = targetPath.getParent(); if (parent != null) { Files.createDirectories(parent);