完成ChatServer的基本配置

This commit is contained in:
ydoily 2025-02-24 15:29:15 +08:00
commit b3df44bdbd
13 changed files with 497 additions and 0 deletions

2
.gitattributes vendored Normal file
View File

@ -0,0 +1,2 @@
/mvnw text eol=lf
*.cmd text eol=crlf

33
.gitignore vendored Normal file
View File

@ -0,0 +1,33 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

19
.mvn/wrapper/maven-wrapper.properties vendored Normal file
View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
wrapperVersion=3.3.2
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.9/apache-maven-3.9.9-bin.zip

106
pom.xml Normal file
View File

@ -0,0 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.9</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>netty-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>netty-demo</name>
<description>基于 Netty 和 Spring Boot 的 WebSocket 聊天室</description>
<properties>
<java.version>21</java.version>
<netty.version>4.1.113.Final</netty.version>
<lombok-version>1.18.30</lombok-version>
</properties>
<dependencies>
<!-- Spring Boot 核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot WebFlux (基于 Netty) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Netty 核心组件 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<!-- 解析 YAML 配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Maven Compiler Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<!-- Spring Boot Maven Plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,13 @@
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class NettyDemoApplication {
public static void main(String[] args) {
SpringApplication.run(NettyDemoApplication.class, args);
}
}

View File

@ -0,0 +1,45 @@
package com.example.config;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.netty.http.server.HttpServer;
@Slf4j
@Configuration
public class NettyConfig {
@Bean
public NettyReactiveWebServerFactory nettyReactiveWebServerFactory() {
NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
factory.addServerCustomizers(this::apply);
return factory;
}
public HttpServer apply(HttpServer httpServer) {
// 创建一个 NioEventLoopGroup指定 8 Worker 线程处理 I/O 事件
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
return httpServer
// 绑定 Worker 线程池负责处理 I/O 事件
.runOn(workerGroup)
// 设置 TCP 连接队列的最大长度为 128防止服务器过载
.option(ChannelOption.SO_BACKLOG, 128)
// 启用 TCP Keep-Alive保持长连接防止连接频繁关闭
.option(ChannelOption.SO_KEEPALIVE, true)
// 监听服务器成功绑定端口的事件并记录服务器启动的地址
.doOnBound(server -> log.info("Netty Server started on: {}", server.address()))
// 监听服务器解绑端口的事件并记录服务器已停止
.doOnUnbound(server -> log.info("Netty Server stopped."))
// 监听新的连接事件并打印远程客户端的地址
.doOnConnection(con -> log.info("Connected to Netty Server: {}", con.channel().remoteAddress()))
// 监听新的 Channel 初始化事件并记录新建的 Channel 信息
.doOnChannelInit((observer, channel, remoteAddr) ->
log.info("New channel initialized: {}", channel));
}
}

View File

@ -0,0 +1,20 @@
package com.example.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class NettyConfigTest implements CommandLineRunner {
private final NettyProperties nettyProperties;
public NettyConfigTest(NettyProperties nettyProperties) {
this.nettyProperties = nettyProperties;
}
@Override
public void run(String... args) {
log.info("Netty 配置加载成功: {}", nettyProperties);
}
}

View File

@ -0,0 +1,56 @@
package com.example.config;
import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
@Data
@ToString
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyProperties {
private WebSocketConfig websocket;
private NettyOptions options;
private NettyServer server;
private LoggingConfig logging;
private ConnectionConfig connections;
@Data
@ToString
public static class WebSocketConfig {
private int maxFrameSize;
private boolean allowExtensions;
private List<String> subProtocols;
}
@Data
@ToString
public static class NettyOptions {
private int soBacklog;
private boolean soReuseaddr;
private boolean tcpNodeLay;
private boolean keepAlive;
}
@Data
@ToString
public static class NettyServer {
private int port;
}
@Data
@ToString
public static class LoggingConfig {
private String level;
private String logFile;
}
@Data
@ToString
public static class ConnectionConfig {
private int maxClients;
private int timeoutSeconds;
}
}

View File

@ -0,0 +1,81 @@
package com.example.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
// 维护所有已连接的客户端 Channel
private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel incoming = ctx.channel();
log.info("Client connected: {}", incoming.remoteAddress());
// 广播通知所有在线用户
channels.writeAndFlush("[Server] - " + incoming.remoteAddress() + " joined\n");
channels.add(incoming);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx,
String msg) throws Exception {
Channel sender = ctx.channel();
log.info("Received message from {} : {}", sender.remoteAddress(), msg);
if (msg.startsWith("@")) {
String[] parts = msg.split(":", 2);
if (parts.length == 2) {
String targetAddress = parts[0].substring(1);
String privateMessage = parts[1];
// 发送私聊消息
sendPrivateMessage(sender, targetAddress, privateMessage);
return;
}
}
// 广播消息给所有客户端排除发送者
for (Channel channel : channels) {
if (channel != sender) {
channel.writeAndFlush("[" + sender.remoteAddress() + "] " + msg + "\n");
}
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel outgoing = ctx.channel();
log.info("Client disconnected: {}", outgoing.remoteAddress());
// ChannelGroup 移除断开的客户端
// 广播通知所有在线用户
channels.writeAndFlush("[Server] - " + outgoing.remoteAddress() + " left\n");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("Error occurred: ", cause);
ctx.close();
}
// 私聊
private void sendPrivateMessage(Channel sender, String targetAddress, String message) {
for (Channel channel : channels) {
if (channel.remoteAddress().toString().contains(targetAddress)) {
channel.writeAndFlush("[PRIVATE] From [" + sender.remoteAddress() + "]: " + message + "\n");
sender.writeAndFlush("[PRIVATE] To [" + targetAddress + "]: " + message + "\n");
return;
}
}
sender.writeAndFlush("[SERVER] - User " + targetAddress + " not found.\n");
}
}

View File

@ -0,0 +1,36 @@
package com.example.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 当客户端连接时触发记录客户端的远程地址
log.info("Client connected: {}", ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// 处理接收到的消息日志记录接收到的消息内容
log.info("Received message: {}", msg);
// 发送响应消息给客户端
ctx.writeAndFlush("Server received: " + msg + "\n");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// 当客户端断开连接时触发记录客户端的远程地址
log.info("Client disconnected: {}", ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 处理异常记录错误信息并关闭连接
log.error("Error occurred: ", cause);
ctx.close();
}
}

View File

@ -0,0 +1,47 @@
package com.example.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class NettyServer {
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
public void start(int port) {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ServerInitializer());
ChannelFuture future = bootstrap.bind(port).sync();
Channel serverChannel = future.channel();
log.info("Server started on port {}", port);
serverChannel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("Netty Chat Server interrupted!", e);
Thread.currentThread().interrupt();
} finally {
shutdown();
}
}
@PreDestroy
public void shutdown() {
log.info("Shutting down Netty Chat Server...");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -0,0 +1,18 @@
package com.example.server;
import com.example.config.NettyProperties;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class NettyServerStarter {
private final NettyServer nettyServer;
private final NettyProperties properties;
@PostConstruct
public void startServer() {
new Thread(() -> nettyServer.start(properties.getServer().getPort())).start();
}
}

View File

@ -0,0 +1,21 @@
package com.example.server;
import com.example.handler.ChatServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO), // 日志
new StringDecoder(), // 解码
new StringEncoder(), // 编码
new ChatServerHandler()
);
}
}