diff --git a/CHANGES.md b/CHANGES.md index 50c8ecc0..3ab77386 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,8 @@ * ✨ New API: FormatterUtil.formatCurrency(BigDecimal amount, RoundingMode roundingMode) * WebResourceController.requestWebResource with @Daemon to serve web resources with enhanced reliability. * Refactoring: predefined URI constants inside BootURI +* Performance improvement: Agent_PDFBox - Serial graphics processing converted to parallel processing +* New API: LargeFileStreamHandler for streaming large file response with low memory usage, and support for WebSocket for partial content delivery. ## Version 2.6.9 (2026-04-24) diff --git a/pom.xml b/pom.xml index 467fe9aa..e89eb5f6 100644 --- a/pom.xml +++ b/pom.xml @@ -186,12 +186,12 @@ 0.13.0 - 4.2.13.Final + 4.2.14.Final 2.0.77.Final 1.81.0 33.6.0-jre - 4.34.1 + 4.35.0 2.2.49 @@ -217,7 +217,7 @@ 7.0.0 - 7.3.4.Final + 7.3.5.Final 7.0.2 diff --git a/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java b/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java index 9360b0b5..350ef201 100644 --- a/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java +++ b/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java @@ -96,7 +96,7 @@ private static int applyOffset(int code, boolean wihtOffset) { int AUTH_NO_PERMISSION = getErrorCode(AUTH_BASE + 6); int AUTH_FORBIDDEN_IP = getErrorCode(AUTH_BASE + 7); int AUTH_FORBIDDEN_JWT = getErrorCode(AUTH_BASE + 8); - int AUTH_FORBIDDEN_REQUST = getErrorCode(AUTH_BASE + 9); + int AUTH_FORBIDDEN_REQUEST = getErrorCode(AUTH_BASE + 9); //Integration int ACCESS_BASE = getErrorCode(50, true); diff --git a/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java b/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java index e1290e3d..9cf46227 100644 --- a/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java +++ b/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java @@ -141,10 +141,12 @@ public void configure() { } catch (RuntimeException ex) { } Set namedWebsocket = channelHandlerNames.get(Service.ChannelHandlerType.Websocket); - for (String s : namedWebsocket) { - if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/")) { - String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/, but found: " + s; - throw new IllegalArgumentException(errorMessage); + if (namedWebsocket != null) { + for (String s : namedWebsocket) { + if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX)) { + String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + ", but found: " + s; + throw new IllegalArgumentException(errorMessage); + } } } } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java index 44d0a1a6..51170090 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java @@ -279,8 +279,7 @@ protected long precheck(ChannelHandlerContext ctx, HttpRequest req) { caller = authenticate(httpHeaders, context); if (caller == null) { - Err err = new Err(BootErrorCode.AUTH_NO_PERMISSION, null, "Unauthorized Caller", null); - context.error(err).status(HttpResponseStatus.UNAUTHORIZED); + context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED); NioHttpUtil.sendResponse(ctx, true, context, null, null); return 0; } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java b/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java index 9663d5d2..37160663 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java @@ -126,7 +126,7 @@ protected void initChannelPipeline(ChannelPipeline channelPipeline, NioConfig ni // 关键点:我们将路径设置为 null。设置为 null 意味着它不会主动去拦截并匹配固定 URL, // 而是只要看到带有符合标准的 WebSocket Upgrade 请求头,它就会自动在原地执行握手升级! // 这样无论我们前面把 URI 改成 /ws/chat 还是 /ws/game,它都能兼容升级。 - String webSocketURI = WebSocketAuthHandler_OTT.WS_PATH_PREFIX; + String webSocketURI = WebSocketAuthHandler_OTT.WS_PATH; channelPipeline.addLast(WebSocketAuthHandler_OTT.CHANNEL_CHANNEL_NAME_NEXT, new WebSocketServerProtocolHandler(webSocketURI, null, allowExtensions, maxFrameSize, allowMaskMismatch, checkStartsWith, dropPongFrames, handshakeTimeoutMillis)); // 3. 注意:这里【不要】像之前一样 addLast(new BusinessHandler) 了。 diff --git a/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java index 1ed5bbd2..bb407afd 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java @@ -171,13 +171,13 @@ public void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest } else { String error = GeoIpUtil.callerAddressFilter(context.remoteIP(), nioCfg.getCallerAddressFilterWhitelist(), nioCfg.getCallerAddressFilterBlacklist(), nioCfg.getCallerAddressFilterOption()); if (error != null) { - Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_IP, null, "Forbidden caller IP", null, "Forbidden caller IP: " + error); + Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_IP, null, "Blocked caller IP", null, "Blocked caller IP: " + error); context.error(err).status(HttpResponseStatus.FORBIDDEN); } else { String request = httpMethod + httpRequestUri; error = SecurityUtil.whitelistbalcklistilter("request", request, nioCfg.getRequestFilterWhitelist(), nioCfg.getRequestFilterBlacklist()); if (error != null) { - Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_REQUST, null, "Forbidden caller request", null, "Forbidden caller request: " + error); + Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_REQUEST, null, "Blocked URL", null, "Blocked URL: " + error); context.error(err).status(HttpResponseStatus.FORBIDDEN); } else { processorSettings = service(ctx, requestHeaders, httpMethod, httpRequestUri, parameters, httpPostRequestBody, context); diff --git a/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java b/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java index bdf4f6f4..3d7451b2 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.summerboot.jexpress.boot.BootErrorCode; import org.summerboot.jexpress.util.BeanUtil; /** @@ -46,6 +47,8 @@ public class Err extends AdditionalJsonFields { @JsonIgnore protected Object internalInfo; + public static final Err UNAUTHORIZED_401 = new Err(BootErrorCode.AUTH_LOGIN_FAILED, null, "Authentication Required - Unknown caller", null); + public Err() { } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java index 7564e0cc..a2cdf6b7 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java @@ -27,13 +27,15 @@ import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import io.netty.util.AttributeKey; import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.tika.Tika; +import org.apache.tika.mime.MimeTypes; import org.summerboot.jexpress.nio.server.NioConfig; import org.summerboot.jexpress.nio.server.NioHttpUtil; import org.summerboot.jexpress.security.auth.Caller; +import org.summerboot.jexpress.util.FileUtil; /** * usage example: @@ -65,10 +67,13 @@ abstract public class BootWebSocketHandler extends SimpleChannelInboundHandler { + Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); + if (caller == null) { + clients.remove(ctx.channel()); + ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); + ctx.close(); + log.warn("OTT auth failed - " + ctx.channel().remoteAddress() + ": " + ctx); + ctx.close(); + return; + } + + clients.add(ctx.channel()); + log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress()); + + String message = onCallerConnected(ctx, caller); + if (message != null) { + sendToAllChannels(message, true); + } + }; + NioConfig.cfg.getBizExecutor().execute(asyncTask); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.trace(() -> "channelActive: " + ctx.channel().remoteAddress()); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + clients.remove(ctx.channel()); + log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress()); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { clients.remove(ctx.channel()); @@ -104,56 +145,43 @@ protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throw } } + protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { + String txt = msg.text(); + processMessage(ctx, txt, null); + } + protected void onBinaryWebSocketFrame(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception { ByteBuf bb = msg.content(); byte[] data = ByteBufUtil.getBytes(bb); - Runnable asyncTask = () -> { - Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get(); - if (caller == null) { - clients.remove(ctx.channel()); - ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); - ctx.close(); - log.warn("OTT auth failed " + ctx.channel().remoteAddress()); - return; - } - String responseText = onMessage(ctx, caller, data); - if (responseText != null) { - sendToChannel(ctx, responseText); - } - }; - NioConfig.cfg.getBizExecutor().execute(asyncTask); + processMessage(ctx, null, data); } - protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { - String txt = msg.text(); + protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception { + + } + + protected void processMessage(ChannelHandlerContext ctx, String text, byte[] data) { Runnable asyncTask = () -> { - Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get(); - if (caller == null) { - /*caller = auth(ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get());//use the first message as token to auth - if (caller == null) { - clients.remove(ctx.channel()); - ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); - ctx.close(); - log.warn("OTT auth failed " + ctx.channel().remoteAddress() + ": " + txt); - return; + Caller caller = (Caller) ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); + if (text != null) { + String responseText = onMessage(ctx, caller, text); + if (responseText != null) { + sendToAllChannels(responseText, true); + } + } else if (data != null) { + String[] mimeType = FileUtil.getMIMEShortExtension(data); + StringBuilder sb = new StringBuilder(); + byte[] processedData = onMessage(ctx, caller, data, mimeType[0], mimeType[1], mimeType[2], sb); + if (processedData != null) { + sendToAllChannels(processedData, true); + } + if (!sb.isEmpty()) { + sendToAllChannels(sb.toString(), true); } - ctx.channel().attr(KEY_CALLER).set(caller); - String message = onCallerConnected(ctx, caller); - if (message != null) { - sendToAllChannels(message, true); - }*/ } - String responseText = onMessage(ctx, caller, txt); - if (responseText != null) { - //sendToChannel(ctx, responseText); - sendToAllChannels(responseText, true); - } }; NioConfig.cfg.getBizExecutor().execute(asyncTask); - } - - protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception { } @@ -171,7 +199,7 @@ protected Caller auth(Caller caller) { */ abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, String txt); - abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data); + abstract protected byte[] onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data, String mimeType, String fileType, String fileExtension, StringBuilder builder); public void sendToChannel(ChannelHandlerContext ctx, String message) { ctx.writeAndFlush(new TextWebSocketFrame(message)); @@ -194,7 +222,11 @@ public void sendToAllChannels(byte[] data, boolean auth) { public void sendToAllChannels(WebSocketFrame message, boolean auth) { if (auth) { clients.stream() - .filter(channel -> channel.attr(KEY_CALLER).get() != null) + /*.filter(channel -> { + Caller caller = channel.attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); + return caller != null; + })*/ + .filter(channel -> channel.attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get() != null) .forEach(channel -> channel.writeAndFlush(message.retainedDuplicate())); } else { clients.stream() @@ -202,43 +234,4 @@ public void sendToAllChannels(WebSocketFrame message, boolean auth) { } } - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - Runnable asyncTask = () -> { - Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get(); - if (caller == null) { - caller = auth(ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get());//use the first message as token to auth - if (caller == null) { - clients.remove(ctx.channel()); - ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate()); - ctx.close(); - log.warn("OTT " + ctx.channel().remoteAddress() + ": " + ctx); - ctx.close(); - return; - } - - ctx.channel().attr(KEY_CALLER).set(caller); - clients.add(ctx.channel()); - log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress()); - - String message = onCallerConnected(ctx, caller); - if (message != null) { - sendToAllChannels(message, true); - } - } - }; - NioConfig.cfg.getBizExecutor().execute(asyncTask); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.trace(() -> "channelActive: " + ctx.channel().remoteAddress()); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) { - clients.remove(ctx.channel()); - log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress()); - } - } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java deleted file mode 100644 index 7e6ca6fb..00000000 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://apache.org - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.summerboot.jexpress.nio.server.websocket; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import org.summerboot.jexpress.security.auth.Caller; - -public class ChatModuleHandler extends SimpleChannelInboundHandler { - - @Override - protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { - Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); - - if (frame instanceof TextWebSocketFrame) { - System.out.println("[聊天模块] 收到用户 " + caller.getDisplayName() + " 的文本: " + ((TextWebSocketFrame) frame).text()); - } else if (frame instanceof BinaryWebSocketFrame) { - System.out.println("[聊天模块] 收到用户 " + caller.getDisplayName() + " 的二进制数据流"); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java deleted file mode 100644 index efc0f4a8..00000000 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://apache.org - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.summerboot.jexpress.nio.server.websocket; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import org.summerboot.jexpress.security.auth.Caller; - -public class GameModuleHandler extends SimpleChannelInboundHandler { - @Override - protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { - Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); - - if (frame instanceof TextWebSocketFrame) { - System.out.println("[游戏模块] 收到用户 " + caller.getDisplayName() + " 的文本: " + ((TextWebSocketFrame) frame).text()); - } else if (frame instanceof BinaryWebSocketFrame) { - System.out.println("[游戏模块] 收到用户 " + caller.getDisplayName() + " 的二进制数据流"); - } - } -} diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java new file mode 100644 index 00000000..a8291544 --- /dev/null +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java @@ -0,0 +1,182 @@ +/* + * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://apache.org + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +package org.summerboot.jexpress.nio.server.websocket; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.util.AttributeKey; +import org.summerboot.jexpress.nio.server.NioConfig; +import org.summerboot.jexpress.security.auth.Caller; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +/** + * @author Changski Tie Zheng Zhang 张铁铮, 魏泽北, 杜旺财, 杜富贵 + */ +public abstract class LargeFileStreamHandler extends SimpleChannelInboundHandler { + + private static final AttributeKey FILE_CHANNEL_KEY = AttributeKey.valueOf("fileChannel"); + private static final AttributeKey FILE_STREAM_KEY = AttributeKey.valueOf("fileStream"); + private static final AttributeKey FILE_SIZE_KEY = AttributeKey.valueOf("fileSize"); + + // Used to remember the physical file object currently being written in the connection context. + private static final AttributeKey TARGET_FILE_KEY = AttributeKey.valueOf("targetFile"); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { + Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get(); + String userId = caller == null ? "anonymous" : caller.getUid(); + + // 1. 【起始帧】收到文件的第一个分片 + if (frame instanceof BinaryWebSocketFrame) { + BinaryWebSocketFrame startFrame = (BinaryWebSocketFrame) frame; + + FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).get(); + + // Initialize on first frame only + if (fileChannel == null) { + Path targetPath = Paths.get(NioConfig.instance(NioConfig.class).getTempUoloadDir(), String.valueOf(caller.getId()), System.currentTimeMillis() + ".dat").toAbsolutePath(); + Path parent = targetPath.getParent(); + if (parent != null) { + Files.createDirectories(parent); + } + File targetFile = targetPath.toFile(); + FileOutputStream fos = new FileOutputStream(targetFile, true); + fileChannel = fos.getChannel(); + + ctx.channel().attr(TARGET_FILE_KEY).set(targetFile); + ctx.channel().attr(FILE_STREAM_KEY).set(fos); + ctx.channel().attr(FILE_CHANNEL_KEY).set(fileChannel); + } + + // Write and send ACK + long currentSize = writeAndGetNewSize(startFrame.content(), fileChannel); + ctx.channel().attr(FILE_SIZE_KEY).set(currentSize); + + // IMPORTANT: Do NOT check isFinalFragment() here - each ws.send() arrives as a complete frame! + // Instead, the client must signal completion separately or the server must track expected size + sendAck(ctx, currentSize, false); // Always false for intermediate chunks + } + + // 2. [Continuous Frames] Received numerous subsequent fragments + else if (frame instanceof ContinuationWebSocketFrame) { + ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame) frame; + FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).get(); + + if (fileChannel == null) { + ctx.close(); + return; + } + + long currentSize = writeAndGetNewSize(continuationFrame.content(), fileChannel); + ctx.channel().attr(FILE_SIZE_KEY).set(currentSize); + + sendAck(ctx, currentSize, continuationFrame.isFinalFragment()); + + if (continuationFrame.isFinalFragment()) { + closeAndCleanUp(ctx, caller); + } + } + + // 3. [Special Frame] The client sends a text message to indicate that the transmission is complete. + else if (frame instanceof TextWebSocketFrame) { + TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; + String text = textFrame.text(); + + if ("UPLOAD_COMPLETE".equals(text)) { + closeAndCleanUp(ctx, caller); + } + } + } + + private long writeAndGetNewSize(ByteBuf content, FileChannel fileChannel) throws IOException { + long position = fileChannel.size(); + content.readBytes(fileChannel, position, content.readableBytes()); + return fileChannel.size(); + } + + private void sendAck(ChannelHandlerContext ctx, long uploadedSize, boolean isFinished) throws IOException { + Map ackMap = new HashMap<>(); + ackMap.put("status", isFinished ? "COMPLETE" : "PROGRESS"); + ackMap.put("uploadedSize", uploadedSize); + + String jsonAck = objectMapper.writeValueAsString(ackMap); + ctx.channel().writeAndFlush(new TextWebSocketFrame(jsonAck)); + } + + /** + * Shut down resources, perform disk cleanup, and finally trigger the lifecycle cleanup hook. + */ + private void closeAndCleanUp(ChannelHandlerContext ctx, Caller caller) throws IOException { + FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).getAndSet(null); + FileOutputStream fos = ctx.channel().attr(FILE_STREAM_KEY).getAndSet(null); + File targetFile = ctx.channel().attr(TARGET_FILE_KEY).getAndSet(null); + + if (fileChannel != null) { + // 极其重要:强制将操作系统缓存区的数据刷入物理存储介质 + fileChannel.force(true); + fileChannel.close(); + } + if (fos != null) { + fos.close(); + } + + // ========================================== + // [Core Trigger Point] At this point, the file has been completely written to disk, allowing for a safe invocation of the business cleanup method. + // ========================================== + if (targetFile != null && targetFile.exists()) { + ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"status\":\"ALL_TASKS_COMPLETE\"}")); + onUploadCompleted(ctx, targetFile, caller); + } + } + + /** + * Business callback hook after successful lossless transfer and disk write of large files + */ + abstract protected void onUploadCompleted(ChannelHandlerContext ctx, File targetFile, Caller caller); + + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).getAndSet(null); + FileOutputStream fos = ctx.channel().attr(FILE_STREAM_KEY).getAndSet(null); + + if (fileChannel != null) fileChannel.close(); + if (fos != null) fos.close(); + + ctx.close(); + } +} + diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java index a42f5517..c6d09ec6 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java @@ -23,9 +23,12 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; import io.netty.util.AttributeKey; import org.summerboot.jexpress.security.auth.Authenticator; import org.summerboot.jexpress.security.auth.Caller; @@ -40,7 +43,8 @@ public class WebSocketAuthHandler_OTT extends ChannelInboundHandlerAdapter { public static final AttributeKey USER_ID_KEY = AttributeKey.valueOf("userId"); public static final String CHANNEL_NAME = "WebSocketAuthHandler_OTT"; public static final String CHANNEL_CHANNEL_NAME_NEXT = "BootWebSocketServerProtocolHandler"; - public static final String WS_PATH_PREFIX = "/ws"; + public static final String WS_PATH = "/ws"; + public static final String WS_PATH_PREFIX = WS_PATH + "/"; protected final Injector injector; @@ -76,6 +80,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception String oneTimeTicket = uriRequested.substring(uriPredefinedOTT.length()); Caller caller = verifyAndDestroyTicket(oneTimeTicket); // 校验并销毁 Ticket if (caller == null) { + sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED)); break; } // save OTT result to channel attr @@ -94,24 +99,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.fireChannelRead(msg); return; } - /*if (uriRequested.startsWith(CHAT_PATH_PREFIX)) { - String oneTimeTicket = uriRequested.substring(CHAT_PATH_PREFIX.length()); - Caller caller = verifyAndDestroyTicket(oneTimeTicket); // 校验并销毁 Ticket - - if (caller != null) { - ctx.channel().attr(USER_ID_KEY).set(caller); - - // 【核心点】动态向管道末尾添加聊天专属业务 Handler - //ctx.pipeline().addLast("businessHandler", new ChatModuleHandler()); - ChannelHandler ch = injector.getInstance(Key.get(ChannelHandler.class, Names.named("/ws/chat"))); - //ch = new ChatModuleHandler(); - ctx.pipeline().addAfter(CHANNEL_CHANNEL_NAME_NEXT, "wsChatModuleHandler", ch); - - // 重写 URI,让下游的 WebSocketServerProtocolHandler 能够精准匹配升级 - request.setUri("/ws/chat"); - ctx.fireChannelRead(msg); - return; - }*/ } // 3. 认证失败或路径不匹配:直接拒绝连接 @@ -123,14 +110,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.fireChannelRead(msg); } - private Caller verifyAndDestroyTicket(String oneTimeTicket) { + protected Caller verifyAndDestroyTicket(String oneTimeTicket) { if (authenticator == null) { return null; } return authenticator.oneTimeTicketVerifyAndDestroy(oneTimeTicket); } - private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { + protected void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { HttpUtil.setContentLength(res, 0); ctx.channel().writeAndFlush(res).addListener(ChannelFutureListener.CLOSE); } diff --git a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java index 42398cd1..1516e58d 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java @@ -291,9 +291,10 @@ public void logout(@Parameter(hidden = true) final ServiceRequest request, @Para @Path(BootURI.CURRENT_VERSION + BootURI.API_NF_OTT) @Daemon @RequiresHealthCheck("") - public String oneTimeTicketAuthenticate(@HeaderParam(NioHttpUtil.HTTP_HEADER_AUTH_TOKEN) String authHeader, @Parameter(hidden = true) final SessionContext context) { + @Log(responseBody = false) + public String oneTimeTicketAuthenticate(@QueryParam("wsURI") String wsURI, @HeaderParam(NioHttpUtil.HTTP_HEADER_AUTH_TOKEN) String authHeader, @Parameter(hidden = true) final SessionContext context) { String jwt = BootAuthenticator.getBearerToken(authHeader); - return auth.oneTimeTicketAuthenticate(jwt, context); + return auth.oneTimeTicketAuthenticate(wsURI, jwt, context); } @Operation( diff --git a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java index 765c3101..a0635274 100644 --- a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java +++ b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java @@ -405,8 +405,7 @@ public boolean authorizationCheck(final ChannelHandlerContext channelHandlerCtx, boolean isAuthorized = false; Caller caller = context.caller(); if (caller == null) { - context.status(HttpResponseStatus.UNAUTHORIZED) - .error(new Err(BootErrorCode.AUTH_NO_PERMISSION, null, "Authentication Required - Unknown caller", null)); + context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED); return false; } diff --git a/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java b/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java index fc874159..8a9eda17 100644 --- a/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java +++ b/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java @@ -137,11 +137,12 @@ public interface Authenticator { * in production, generate a random string as one-time ticket, store it in redis with key "ws:ticket:" + oneTimeTicket, value = caller (or json string), * and set expire time to 10 seconds. return the one-time ticket string to caller. * - * @param jwt + * @param wsURI WebSocket URI + * @param jwt caller's JWT, can be used to verify caller's identity and generate one-time ticket for specific user * @param context contains caller info, e.g. caller.getUid() can be used to generate one-time ticket for specific user * @return (32 to 64 chars + prefix) random string as one-time ticket, e.g. t_f87yfs7shfash7kk7a877asdf */ - String oneTimeTicketAuthenticate(String jwt, SessionContext context); + String oneTimeTicketAuthenticate(String wsURI, String jwt, SessionContext context); /** * in production, call redis.getdel("ws:ticket:" + oneTimeTicket) diff --git a/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java b/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java index 12821fad..15e1ca53 100644 --- a/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java +++ b/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java @@ -100,7 +100,7 @@ public String signJWT(String username, String pwd, E metaData, int validForMinut @Override public String signJWT(Caller caller, int validForMinutes, final SessionContext context) { if (caller == null) { - context.status(HttpResponseStatus.UNAUTHORIZED); + context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED); return null; } @@ -363,7 +363,7 @@ public Caller verifyToken(String authToken, AuthTokenCache cache, Integer errorC if (error == null) { caller = fromJwt(claims); } else { - Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_JWT, null, "Forbidden JWT", null, "Forbidden JWT: " + error); + Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_JWT, null, "Blocked JWT", null, "Blocked JWT: " + error); context.error(err).status(HttpResponseStatus.FORBIDDEN); } } @@ -503,7 +503,7 @@ public ServerCall.Listener interceptCall(ServerCall= 1024.0d && unitIndex < units.length - 1) { + size /= 1024.0d; + unitIndex++; + } + + // Manual two-decimal formatting avoids String.format(Locale, ...) overhead. + long scaled = Math.round(size * 100.0d); + long integerPart = scaled / 100; + int fractionalPart = (int) (scaled % 100); + + StringBuilder sb = new StringBuilder(16); + sb.append(integerPart).append('.'); + if (fractionalPart < 10) { + sb.append('0'); + } + sb.append(fractionalPart).append(units[unitIndex]); + return sb.toString(); + } } diff --git a/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java b/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java index 9a9a2d76..bb65857f 100644 --- a/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java +++ b/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java @@ -30,6 +30,7 @@ import org.apache.pdfbox.rendering.ImageType; import org.apache.pdfbox.rendering.PDFRenderer; import org.apache.pdfbox.rendering.RenderDestination; +import org.summerboot.jexpress.util.ApplicationUtil; import javax.imageio.ImageIO; import java.awt.image.BufferedImage; @@ -41,6 +42,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; /** * @author Changski Tie Zheng Zhang 张铁铮, 魏泽北, 杜旺财, 杜富贵 @@ -316,10 +319,24 @@ public static List pdf2Images(File pdfFile, String password, floa public static List pdf2Images(PDDocument document, float dpi, ImageType imageType, RenderDestination destination) throws IOException { PDFRenderer renderer = new PDFRenderer(document); int totalPages = document.getNumberOfPages(); - List images = new ArrayList(); - for (int currentPage = 0; currentPage < totalPages; currentPage++) { + List images = new ArrayList(totalPages); + /*for (int currentPage = 0; currentPage < totalPages; currentPage++) { BufferedImage image = renderer.renderImage(currentPage, dpi / 72f, imageType, destination); images.add(image); + }*/ + List> tasks = new ArrayList<>(totalPages); + for (int currentPage = 0; currentPage < totalPages; currentPage++) { + final int index = currentPage; + Callable task = () -> { + BufferedImage image = renderer.renderImage(index, dpi / 72f, imageType, destination); + return image; + }; + tasks.add(task); + } + try { + ApplicationUtil.runAndWaitForAllResults(tasks, images); + } catch (ExecutionException e) { + e.printStackTrace(); } return images; } @@ -332,13 +349,29 @@ public static List pdf2Images(PDDocument document, float dpi, Ima * @throws IOException */ public static List images2Bytes(List images, String formatName) throws IOException { - List imageDataList = new ArrayList(images.size()); + int totalPages = images.size(); + List imageDataList = new ArrayList(totalPages); + List> tasks = new ArrayList<>(totalPages); + /*for (BufferedImage image : images) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) { + ImageIO.write(image, formatName, baos); + byte[] imageData = baos.toByteArray(); + imageDataList.add(imageData); + } + }*/ for (BufferedImage image : images) { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) { - ImageIO.write(image, formatName, baos); - byte[] imageData = baos.toByteArray(); - imageDataList.add(imageData); - } + Callable task = () -> { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) { + ImageIO.write(image, formatName, baos); + return baos.toByteArray(); + } + }; + tasks.add(task); + } + try { + ApplicationUtil.runAndWaitForAllResults(tasks, imageDataList); + } catch (ExecutionException e) { + e.printStackTrace(); } return imageDataList; }