fix:修改有时连接卡死问题

This commit is contained in:
SL 2025-05-23 19:44:25 +08:00
parent e903e76864
commit 305124a838
8 changed files with 546 additions and 197 deletions

View File

@ -0,0 +1,5 @@
---
description:
globs:
alwaysApply: false
---

View File

@ -0,0 +1,5 @@
---
description:
globs:
alwaysApply: false
---

View File

@ -0,0 +1,5 @@
---
description:
globs:
alwaysApply: false
---

View File

@ -1,5 +1,40 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<JavaCodeStyleSettings>
<option name="IMPORT_LAYOUT_TABLE">
<value>
<package name="" withSubpackages="true" static="false" module="true" />
<package name="android" withSubpackages="true" static="true" />
<package name="androidx" withSubpackages="true" static="true" />
<package name="com" withSubpackages="true" static="true" />
<package name="junit" withSubpackages="true" static="true" />
<package name="net" withSubpackages="true" static="true" />
<package name="org" withSubpackages="true" static="true" />
<package name="java" withSubpackages="true" static="true" />
<package name="javax" withSubpackages="true" static="true" />
<package name="" withSubpackages="true" static="true" />
<emptyLine />
<package name="android" withSubpackages="true" static="false" />
<emptyLine />
<package name="androidx" withSubpackages="true" static="false" />
<emptyLine />
<package name="com" withSubpackages="true" static="false" />
<emptyLine />
<package name="junit" withSubpackages="true" static="false" />
<emptyLine />
<package name="net" withSubpackages="true" static="false" />
<emptyLine />
<package name="org" withSubpackages="true" static="false" />
<emptyLine />
<package name="java" withSubpackages="true" static="false" />
<emptyLine />
<package name="javax" withSubpackages="true" static="false" />
<emptyLine />
<package name="" withSubpackages="true" static="false" />
<emptyLine />
</value>
</option>
</JavaCodeStyleSettings>
<codeStyleSettings language="XML">
<option name="FORCE_REARRANGE_MODE" value="1" />
<indentOptions>

View File

@ -86,10 +86,9 @@ public class ConnectionManager extends BaseManager {
public void getIpAndPort(String requestId, IRequestIP iRequestIP) {
if (iGetIpAndPort != null) {
WKLoggerUtils.getInstance().e(TAG,"getIpAndPort get ip...");
runOnMainThread(() -> iGetIpAndPort.getIP((ip, port) -> iRequestIP.onResult(requestId, ip, port)));
} else {
WKLoggerUtils.getInstance().e(TAG,"Unregistered IP acquisition event");
WKLoggerUtils.getInstance().e(TAG,"未注册获取连接地址的事件");
}
}

View File

@ -25,6 +25,8 @@ class ConnectionClient implements IDataHandler, IConnectHandler,
IConnectionTimeoutHandler, IIdleTimeoutHandler {
private final String TAG = "ConnectionClient";
private boolean isConnectSuccess;
private static final int MAX_TIMEOUT_RETRIES = 3;
private int timeoutRetryCount = 0;
interface IConnResult {
void onResult(INonBlockingConnection iNonBlockingConnection);
@ -75,9 +77,39 @@ class ConnectionClient implements IDataHandler, IConnectHandler,
@Override
public boolean onConnectionTimeout(INonBlockingConnection iNonBlockingConnection) {
if (!isConnectSuccess) {
WKLoggerUtils.getInstance().e(TAG, "连接超时");
WKConnection.getInstance().forcedReconnection();
synchronized (WKConnection.getInstance().connectionLock) {
if (!isConnectSuccess) {
timeoutRetryCount++;
WKLoggerUtils.getInstance().e(TAG, String.format("Connection timeout (attempt %d/%d)", timeoutRetryCount, MAX_TIMEOUT_RETRIES));
// Check if this is the current connection
if (WKConnection.getInstance().connection != null &&
WKConnection.getInstance().connection.getId().equals(iNonBlockingConnection.getId())) {
if (timeoutRetryCount >= MAX_TIMEOUT_RETRIES) {
WKLoggerUtils.getInstance().e(TAG, "Maximum timeout retries reached, initiating reconnection");
timeoutRetryCount = 0;
WKConnection.getInstance().forcedReconnection();
} else {
// Log retry attempt
WKLoggerUtils.getInstance().i(TAG, "Retrying connection after timeout");
// Attempt to reset connection state
try {
iNonBlockingConnection.setConnectionTimeoutMillis(
Math.min(3000 * (timeoutRetryCount + 1), 10000) // Increase timeout with each retry
);
} catch (Exception e) {
WKLoggerUtils.getInstance().e(TAG, "Failed to adjust connection timeout: " + e.getMessage());
}
}
} else {
WKLoggerUtils.getInstance().w(TAG, "Timeout for old connection, ignoring");
timeoutRetryCount = 0;
}
} else {
WKLoggerUtils.getInstance().i(TAG, "Connection timeout ignored - connection already successful");
}
}
return true;
}
@ -120,22 +152,25 @@ class ConnectionClient implements IDataHandler, IConnectHandler,
Object attachmentObject = iNonBlockingConnection.getAttachment();
if (attachmentObject instanceof String) {
String att = (String) attachmentObject;
String attStr = "close" + id;
if (att.equals(attStr)) {
// Check if this is a planned closure
if (att.startsWith("closing_") || att.equals("close" + id)) {
WKLoggerUtils.getInstance().e("主动断开不重连");
return true;
}
}
}
if (WKIMApplication.getInstance().isCanConnect) {
WKLoggerUtils.getInstance().e("手动关闭需要重连");
// Reset timeout counter on disconnect
timeoutRetryCount = 0;
// Only attempt reconnection if we're allowed to connect and it's not a planned closure
if (WKIMApplication.getInstance().isCanConnect && !WKConnection.getInstance().isClosing.get()) {
WKLoggerUtils.getInstance().e("连接断开需要重连");
WKConnection.getInstance().forcedReconnection();
}
close(iNonBlockingConnection);
} catch (Exception ignored) {
}
return true;
}

View File

@ -52,7 +52,14 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionException;
/**
* 5/21/21 10:51 AM
@ -140,12 +147,54 @@ public class WKConnection {
};
// 添加一个专门用于同步connection访问的锁对象
private final Object connectionLock = new Object();
public final Object connectionLock = new Object();
private final Handler mainHandler = new Handler(Looper.getMainLooper());
private static final long CONNECTION_CLOSE_TIMEOUT = 5000; // 5 seconds timeout
private final AtomicBoolean isClosing = new AtomicBoolean(false);
public final AtomicBoolean isClosing = new AtomicBoolean(false);
private int maxReconnectAttempts = 5;
private long baseReconnectDelay = 500;
private final Object connectionStateLock = new Object();
private volatile boolean isConnecting = false;
private final Object reconnectLock = new Object();
private volatile boolean isReconnectScheduled = false;
private final Object executorLock = new Object();
private volatile ExecutorService connectionExecutor;
private ExecutorService getOrCreateExecutor() {
synchronized (executorLock) {
if (connectionExecutor == null || connectionExecutor.isShutdown() || connectionExecutor.isTerminated()) {
connectionExecutor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r, "WKConnection-Worker");
thread.setDaemon(true);
return thread;
});
WKLoggerUtils.getInstance().i(TAG, "创建新的连接线程池");
}
return connectionExecutor;
}
}
private void shutdownExecutor() {
synchronized (executorLock) {
if (connectionExecutor != null && !connectionExecutor.isShutdown()) {
connectionExecutor.shutdown();
try {
if (!connectionExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
connectionExecutor.shutdownNow();
}
} catch (InterruptedException e) {
connectionExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
connectionExecutor = null;
}
}
private void startAll() {
heartbeatManager = new HeartbeatManager();
@ -155,18 +204,62 @@ public class WKConnection {
}
public synchronized void forcedReconnection() {
connCount++;
isReConnecting = false;
requestIPTime = 0;
long connIntervalMillisecond = 150;
reconnectionHandler.postDelayed(reconnectionRunnable, connIntervalMillisecond * connCount);
synchronized (reconnectLock) {
if (isReconnectScheduled) {
WKLoggerUtils.getInstance().w(TAG, "已经在重连计划中,忽略重复请求");
return;
}
// 检查线程池状态
ExecutorService executor = getOrCreateExecutor();
if (executor.isShutdown() || executor.isTerminated()) {
WKLoggerUtils.getInstance().e(TAG, "线程池已关闭,无法执行重连");
return;
}
connCount++;
if (connCount > maxReconnectAttempts) {
WKLoggerUtils.getInstance().e(TAG, "达到最大重连次数,停止重连");
stopAll();
return;
}
isReconnectScheduled = true;
isReConnecting = false;
requestIPTime = 0;
// 使用指数退避延迟最大延迟改为8秒
long delay = Math.min(baseReconnectDelay * (1L << (connCount - 1)), 8000);
WKLoggerUtils.getInstance().e(TAG, "重连延迟: " + delay + "ms");
try {
// 使用单独的线程池处理重连
executor.execute(() -> {
try {
Thread.sleep(delay);
if (WKIMApplication.getInstance().isCanConnect &&
!executor.isShutdown()) {
reconnection();
}
} catch (InterruptedException e) {
WKLoggerUtils.getInstance().e(TAG, "重连等待被中断");
Thread.currentThread().interrupt();
} finally {
isReconnectScheduled = false;
}
});
} catch (RejectedExecutionException e) {
WKLoggerUtils.getInstance().e(TAG, "重连任务被拒绝执行: " + e.getMessage());
isReconnectScheduled = false;
}
}
}
public synchronized void reconnection() {
// 如果正在关闭连接等待关闭完成
if (isClosing.get()) {
WKLoggerUtils.getInstance().e(TAG, "等待连接关闭完成后再重连");
mainHandler.postDelayed(this::reconnection, 100);
mainHandler.postDelayed(this::reconnection, 500);
return;
}
@ -203,76 +296,183 @@ public class WKConnection {
}
}
private synchronized void getConnAddress() {
private void getConnAddress() {
ExecutorService executor = getOrCreateExecutor();
if (executor.isShutdown()) {
WKLoggerUtils.getInstance().e(TAG, "线程池已关闭,重新初始化后重试");
executor = getOrCreateExecutor();
}
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.connecting, WKConnectReason.Connecting);
// 计算获取IP时长 todo
startGetConnAddressTimer();
lastRequestId = UUID.randomUUID().toString().replace("-", "");
ConnectionManager.getInstance().getIpAndPort(lastRequestId, (requestId, ip, port) -> {
WKLoggerUtils.getInstance().e(TAG, "连接地址 " + ip + ":" + port);
if (TextUtils.isEmpty(ip) || port == 0) {
WKLoggerUtils.getInstance().e(TAG, "连接地址错误" + String.format("ip:%s & port:%s", ip, port));
isReConnecting = false;
forcedReconnection();
return;
}
if (lastRequestId.equals(requestId)) {
WKConnection.this.ip = ip;
WKConnection.this.port = port;
if (connectionIsNull()) {
connSocket();
try {
executor.execute(() -> {
try {
if (!WKIMApplication.getInstance().isCanConnect) {
WKLoggerUtils.getInstance().e(TAG, "不允许连接");
return;
}
final long startTime = System.currentTimeMillis();
final long ADDRESS_TIMEOUT = 10000; // 10秒超时
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.connecting, WKConnectReason.Connecting);
String currentRequestId = UUID.randomUUID().toString().replace("-", "");
lastRequestId = currentRequestId;
CountDownLatch addressLatch = new CountDownLatch(1);
AtomicReference<String> receivedIp = new AtomicReference<>();
AtomicInteger receivedPort = new AtomicInteger();
ConnectionManager.getInstance().getIpAndPort(currentRequestId, (requestId, ip, port) -> {
if (!currentRequestId.equals(requestId)) {
WKLoggerUtils.getInstance().w(TAG, "收到过期的地址响应");
addressLatch.countDown();
return;
}
receivedIp.set(ip);
receivedPort.set(port);
addressLatch.countDown();
});
// 等待地址响应或超时
boolean gotAddress = addressLatch.await(ADDRESS_TIMEOUT, TimeUnit.MILLISECONDS);
if (!gotAddress) {
WKLoggerUtils.getInstance().e(TAG, "获取连接地址超时");
isReConnecting = false;
forcedReconnection();
return;
}
String ip = receivedIp.get();
int port = receivedPort.get();
if (TextUtils.isEmpty(ip) || port == 0) {
WKLoggerUtils.getInstance().e(TAG, "无效的连接地址");
isReConnecting = false;
forcedReconnection();
return;
}
WKConnection.this.ip = ip;
WKConnection.this.port = port;
if (connectionIsNull()) {
connSocket();
}
} catch (Exception e) {
WKLoggerUtils.getInstance().e(TAG, "获取地址异常: " + e.getMessage());
isReConnecting = false;
forcedReconnection();
}
return;
}
if (connectionIsNull()) {
forcedReconnection();
}
});
});
} catch (RejectedExecutionException e) {
WKLoggerUtils.getInstance().e(TAG, "任务提交被拒绝,重试: " + e.getMessage());
isReConnecting = false;
// 短暂延迟后重试
mainHandler.postDelayed(this::reconnection, 1000);
}
}
private synchronized void connSocket() {
synchronized (connectionLock) { // 使用专门的锁
closeConnect();
socketSingleID = UUID.randomUUID().toString().replace("-", "");
connectionClient = new ConnectionClient(iNonBlockingConnection -> {
synchronized (connectionLock) { // 回调中也需要使用相同的锁
connCount = 0;
if (iNonBlockingConnection == null || connection == null ||
!connection.getId().equals(iNonBlockingConnection.getId())) {
WKLoggerUtils.getInstance().e(TAG, "重复连接");
forcedReconnection();
return;
private void connSocket() {
// 检查线程池状态
ExecutorService executor = getOrCreateExecutor();
if (executor.isShutdown() || executor.isTerminated()) {
WKLoggerUtils.getInstance().e(TAG, "线程池已关闭,无法执行连接");
return;
}
// 使用CAS操作检查连接状态
if (!setConnectingState(true)) {
WKLoggerUtils.getInstance().e(TAG, "已经在连接中,忽略重复连接请求");
return;
}
try {
executor.execute(() -> {
try {
// 关闭现有连接
closeConnect();
// 生成新的连接ID
String newSocketId = UUID.randomUUID().toString().replace("-", "");
CountDownLatch connectLatch = new CountDownLatch(1);
AtomicBoolean connectSuccess = new AtomicBoolean(false);
ConnectionClient newClient = new ConnectionClient(iNonBlockingConnection -> {
INonBlockingConnection currentConn = null;
synchronized (connectionLock) {
currentConn = connection;
}
if (iNonBlockingConnection == null || currentConn == null ||
!currentConn.getId().equals(iNonBlockingConnection.getId())) {
WKLoggerUtils.getInstance().e(TAG, "无效的连接回调");
connectLatch.countDown();
return;
}
try {
iNonBlockingConnection.setIdleTimeoutMillis(1000 * 3);
iNonBlockingConnection.setConnectionTimeoutMillis(1000 * 3);
iNonBlockingConnection.setFlushmode(IConnection.FlushMode.ASYNC);
iNonBlockingConnection.setAutoflush(true);
connectSuccess.set(true);
isReConnecting = false;
connCount = 0;
} catch (Exception e) {
WKLoggerUtils.getInstance().e(TAG, "设置连接参数失败: " + e.getMessage());
} finally {
connectLatch.countDown();
}
});
// 创建新连接
INonBlockingConnection newConnection = new NonBlockingConnection(ip, port, newClient);
newConnection.setAttachment(newSocketId);
// 原子性地更新连接相关的字段
synchronized (connectionLock) {
connectionClient = newClient;
connection = newConnection;
socketSingleID = newSocketId;
}
Object att = iNonBlockingConnection.getAttachment();
if (att == null || !att.equals(socketSingleID)) {
WKLoggerUtils.getInstance().e(TAG, "不属于当前连接");
forcedReconnection();
return;
}
connection.setIdleTimeoutMillis(1000 * 3);
connection.setConnectionTimeoutMillis(1000 * 3);
connection.setFlushmode(IConnection.FlushMode.ASYNC);
isReConnecting = false;
if (connection != null)
connection.setAutoflush(true);
WKConnection.getInstance().sendConnectMsg();
}
});
dispatchQueuePool.execute(() -> {
synchronized (connectionLock) { // 在设置connection时也使用锁
try {
connection = new NonBlockingConnection(ip, port, connectionClient);
WKLoggerUtils.getInstance().e("当前连接ID",connection.getId());
connection.setAttachment(socketSingleID);
} catch (IOException e) {
isReConnecting = false;
WKLoggerUtils.getInstance().e(TAG, "connection exception:" + e.getMessage());
forcedReconnection();
// 等待连接完成或超时
boolean connected = connectLatch.await(15000, TimeUnit.MILLISECONDS);
if (!connected || !connectSuccess.get()) {
WKLoggerUtils.getInstance().e(TAG, "连接建立超时或失败");
closeConnect();
if (!executor.isShutdown()) {
forcedReconnection();
}
} else {
sendConnectMsg();
}
} catch (Exception e) {
WKLoggerUtils.getInstance().e(TAG, "连接异常: " + e.getMessage());
if (!executor.isShutdown()) {
forcedReconnection();
}
} finally {
setConnectingState(false);
}
});
} catch (RejectedExecutionException e) {
WKLoggerUtils.getInstance().e(TAG, "连接任务被拒绝执行: " + e.getMessage());
setConnectingState(false);
}
}
// 使用CAS操作设置连接状态
private boolean setConnectingState(boolean connecting) {
synchronized (connectionLock) {
if (connecting && isConnecting) {
return false;
}
isConnecting = connecting;
return true;
}
}
@ -348,66 +548,123 @@ public class WKConnection {
//处理登录消息状态
private void handleLoginStatus(short status) {
WKLoggerUtils.getInstance().e(TAG, "连接状态:" + status);
String reason = WKConnectReason.ConnectSuccess;
if (status == WKConnectStatus.kicked) {
reason = WKConnectReason.ReasonAuthFail;
}
connectStatus = status;
WKIM.getInstance().getConnectionManager().setConnectionStatus(status, reason);
if (status == WKConnectStatus.success) {
//等待中
connectStatus = WKConnectStatus.syncMsg;
// WKTimers.getInstance().startAll();
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.syncMsg, WKConnectReason.SyncMsg);
// 判断同步模式
if (WKIMApplication.getInstance().getSyncMsgMode() == WKSyncMsgMode.WRITE) {
WKIM.getInstance().getMsgManager().setSyncOfflineMsg((isEnd, list) -> {
if (isEnd) {
connectStatus = WKConnectStatus.success;
MessageHandler.getInstance().saveReceiveMsg();
WKIMApplication.getInstance().isCanConnect = true;
MessageHandler.getInstance().sendAck();
startAll();
resendMsg();
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.success, WKConnectReason.ConnectSuccess);
}
});
} else {
WKIM.getInstance().getConversationManager().setSyncConversationListener(syncChat -> {
connectStatus = WKConnectStatus.success;
WKIMApplication.getInstance().isCanConnect = true;
MessageHandler.getInstance().sendAck();
startAll();
resendMsg();
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.success, WKConnectReason.ConnectSuccess);
});
WKLoggerUtils.getInstance().e(TAG, "Connection state transition: " + connectStatus + " -> " + status);
synchronized (connectionLock) {
String reason = WKConnectReason.ConnectSuccess;
if (status == WKConnectStatus.kicked) {
reason = WKConnectReason.ReasonAuthFail;
}
} else if (status == WKConnectStatus.kicked) {
WKLoggerUtils.getInstance().e(TAG, "收到被踢消息");
MessageHandler.getInstance().updateLastSendingMsgFail();
WKIMApplication.getInstance().isCanConnect = false;
stopAll();
} else {
reconnection();
WKLoggerUtils.getInstance().e(TAG, "登录状态:" + status);
stopAll();
// Validate state transition
if (!isValidStateTransition(connectStatus, status)) {
WKLoggerUtils.getInstance().e(TAG, "Invalid state transition attempted: " + connectStatus + " -> " + status);
return;
}
connectStatus = status;
WKIM.getInstance().getConnectionManager().setConnectionStatus(status, reason);
if (status == WKConnectStatus.success) {
// Reset reconnection counters since we have a successful connection
connCount = 0;
isReConnecting = false;
// Set to syncing state
connectStatus = WKConnectStatus.syncMsg;
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.syncMsg, WKConnectReason.SyncMsg);
// Start timers and heartbeat before syncing
startAll();
// Handle message syncing based on mode
if (WKIMApplication.getInstance().getSyncMsgMode() == WKSyncMsgMode.WRITE) {
WKIM.getInstance().getMsgManager().setSyncOfflineMsg((isEnd, list) -> {
if (isEnd) {
synchronized (connectionLock) {
if (connection != null && !isClosing.get()) {
connectStatus = WKConnectStatus.success;
MessageHandler.getInstance().saveReceiveMsg();
WKIMApplication.getInstance().isCanConnect = true;
MessageHandler.getInstance().sendAck();
resendMsg();
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.success, WKConnectReason.ConnectSuccess);
}
}
}
});
} else {
WKIM.getInstance().getConversationManager().setSyncConversationListener(syncChat -> {
synchronized (connectionLock) {
if (connection != null && !isClosing.get()) {
connectStatus = WKConnectStatus.success;
WKIMApplication.getInstance().isCanConnect = true;
MessageHandler.getInstance().sendAck();
resendMsg();
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.success, WKConnectReason.ConnectSuccess);
}
}
});
}
} else if (status == WKConnectStatus.kicked) {
WKLoggerUtils.getInstance().e(TAG, "Received kick message");
MessageHandler.getInstance().updateLastSendingMsgFail();
WKIMApplication.getInstance().isCanConnect = false;
stopAll();
} else {
// Only attempt reconnection if we're allowed to connect
if (WKIMApplication.getInstance().isCanConnect) {
reconnection();
}
WKLoggerUtils.getInstance().e(TAG, "Login status: " + status);
stopAll();
}
}
}
private boolean isValidStateTransition(int currentState, int newState) {
// Define valid state transitions
switch (currentState) {
case WKConnectStatus.fail:
// From fail state, can move to connecting or success
return newState == WKConnectStatus.connecting ||
newState == WKConnectStatus.success;
case WKConnectStatus.connecting:
// From connecting, can move to success, fail, or no network
return newState == WKConnectStatus.success ||
newState == WKConnectStatus.fail ||
newState == WKConnectStatus.noNetwork;
case WKConnectStatus.success:
// From success, can move to syncMsg, kicked, or fail
return newState == WKConnectStatus.syncMsg ||
newState == WKConnectStatus.kicked ||
newState == WKConnectStatus.fail;
case WKConnectStatus.syncMsg:
// From syncMsg, can move to success or fail
return newState == WKConnectStatus.success ||
newState == WKConnectStatus.fail;
case WKConnectStatus.noNetwork:
// From noNetwork, can move to connecting or fail
return newState == WKConnectStatus.connecting ||
newState == WKConnectStatus.fail;
default:
// For any other state, allow transition to fail state
return newState == WKConnectStatus.fail;
}
}
public void sendMessage(WKBaseMsg mBaseMsg) {
if (mBaseMsg == null) {
WKLoggerUtils.getInstance().w(TAG + ": sendMessage called with null mBaseMsg.");
WKLoggerUtils.getInstance().w(TAG ,"sendMessage called with null mBaseMsg.");
return;
}
if (mBaseMsg.packetType != WKMsgType.CONNECT) {
if (connectStatus == WKConnectStatus.syncMsg) {
WKLoggerUtils.getInstance().i(TAG + ": sendMessage: In syncMsg status, message not sent: " + mBaseMsg.packetType);
WKLoggerUtils.getInstance().i(TAG ," sendMessage: In syncMsg status, message not sent: " + mBaseMsg.packetType);
return;
}
if (connectStatus != WKConnectStatus.success) {
WKLoggerUtils.getInstance().w(TAG + ": sendMessage: Not in success status (is " + connectStatus + "), attempting reconnection for: " + mBaseMsg.packetType);
WKLoggerUtils.getInstance().w(TAG , " sendMessage: Not in success status (is " + connectStatus + "), attempting reconnection for: " + mBaseMsg.packetType);
reconnection();
return;
}
@ -419,7 +676,7 @@ public class WKConnection {
}
if (currentConnection == null || !currentConnection.isOpen()) {
WKLoggerUtils.getInstance().w(TAG + ": sendMessage: Connection is null or not open, attempting reconnection for: " + mBaseMsg.packetType);
WKLoggerUtils.getInstance().w(TAG ," sendMessage: Connection is null or not open, attempting reconnection for: " + mBaseMsg.packetType);
reconnection();
return;
}
@ -602,13 +859,6 @@ public class WKConnection {
checkConnAckHandler.postDelayed(checkConnAckRunnable, 1000);
}
private synchronized void startGetConnAddressTimer() {
// 移除之前的回调
checkRequestAddressHandler.removeCallbacks(checkRequestAddressRunnable);
// 开始新的检查
checkRequestAddressHandler.postDelayed(checkRequestAddressRunnable, 1000);
}
private void saveSendMsg(WKMsg msg) {
if (msg.setting == null) msg.setting = new WKMsgSetting();
JSONObject jsonObject = WKProto.getInstance().getSendPayload(msg);
@ -632,50 +882,48 @@ public class WKConnection {
}
}
public void destroy() {
if (checkRequestAddressHandler != null) {
checkRequestAddressHandler.removeCallbacks(checkRequestAddressRunnable);
}
if (checkConnAckHandler != null) {
checkConnAckHandler.removeCallbacks(checkConnAckRunnable);
}
}
public void stopAll() {
// 先设置连接状态为失败
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.fail, "");
// 清理连接相关资源
closeConnect();
// 关闭定时器管理器
TimerManager.getInstance().shutdown();
MessageHandler.getInstance().clearCacheData();
// 移除所有Handler回调
if (checkRequestAddressHandler != null) {
checkRequestAddressHandler.removeCallbacks(checkRequestAddressRunnable);
}
if (checkConnAckHandler != null) {
checkConnAckHandler.removeCallbacks(checkConnAckRunnable);
}
if (reconnectionHandler != null) {
reconnectionHandler.removeCallbacks(reconnectionRunnable);
}
synchronized (connectionLock) {
// 先设置连接状态为失败
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.fail, "");
// 清理连接相关资源
closeConnect();
// 关闭定时器管理器
TimerManager.getInstance().shutdown();
MessageHandler.getInstance().clearCacheData();
// 移除所有Handler回调
if (checkRequestAddressHandler != null) {
checkRequestAddressHandler.removeCallbacks(checkRequestAddressRunnable);
}
if (checkConnAckHandler != null) {
checkConnAckHandler.removeCallbacks(checkConnAckRunnable);
}
if (reconnectionHandler != null) {
reconnectionHandler.removeCallbacks(reconnectionRunnable);
}
// 重置所有状态
connectStatus = WKConnectStatus.fail;
isReConnecting = false;
ip = "";
port = 0;
requestIPTime = 0;
connAckTime = 0;
lastMsgTime = 0;
connCount = 0;
// 清空发送消息队列
if (sendingMsgHashMap != null) {
sendingMsgHashMap.clear();
// 重置所有状态
connectStatus = WKConnectStatus.fail;
isReConnecting = false;
isConnecting = false;
ip = "";
port = 0;
requestIPTime = 0;
connAckTime = 0;
lastMsgTime = 0;
connCount = 0;
// 清空发送消息队列
if (sendingMsgHashMap != null) {
sendingMsgHashMap.clear();
}
// 清理连接客户端
connectionClient = null;
// 关闭线程池
shutdownExecutor();
System.gc();
}
// 清理连接客户端
connectionClient = null;
System.gc();
}
private void closeConnect() {
@ -683,21 +931,29 @@ public class WKConnection {
// 如果已经在关闭过程中直接返回
if (!isClosing.compareAndSet(false, true)) {
WKLoggerUtils.getInstance().i(TAG + ": Close operation already in progress");
WKLoggerUtils.getInstance().i(TAG , " Close operation already in progress");
return;
}
synchronized (connectionLock) {
if (connection == null) {
isClosing.set(false);
WKLoggerUtils.getInstance().i(TAG + ": closeConnect called but connection is already null.");
WKLoggerUtils.getInstance().i(TAG , " closeConnect called but connection is already null.");
return;
}
connectionToCloseActual = connection;
String connId = connectionToCloseActual.getId();
// Mark connection for closure to prevent reconnection attempts
try {
connectionToCloseActual.setAttachment("closing_" + System.currentTimeMillis() + "_" + connId);
} catch (Exception e) {
WKLoggerUtils.getInstance().e(TAG, "Failed to set closing attachment: " + e.getMessage());
}
connection = null;
connectionClient = null;
WKLoggerUtils.getInstance().i(TAG + ": Connection object nulled, preparing for async close of: " + connId);
WKLoggerUtils.getInstance().i(TAG , " Connection object nulled, preparing for async close of: " + connId);
}
// Create a timeout handler to force close after timeout
@ -705,7 +961,7 @@ public class WKConnection {
try {
if (connectionToCloseActual.isOpen()) {
String connId = connectionToCloseActual.getId();
WKLoggerUtils.getInstance().w(TAG + ": Connection close timeout reached for: " + connId);
WKLoggerUtils.getInstance().w(TAG , " Connection close timeout reached for: " + connId);
connectionToCloseActual.close();
}
} catch (Exception e) {
@ -723,21 +979,30 @@ public class WKConnection {
try {
if (connectionToCloseActual.isOpen()) {
String connId = connectionToCloseActual.getId();
WKLoggerUtils.getInstance().i(TAG + ": Attempting to close connection: " + connId);
connectionToCloseActual.setAttachment("closing_" + System.currentTimeMillis() + "_" + connId);
WKLoggerUtils.getInstance().i(TAG , " Attempting to close connection: " + connId);
connectionToCloseActual.close();
// Remove the timeout handler since we closed successfully
mainHandler.removeCallbacks(timeoutRunnable);
WKLoggerUtils.getInstance().i(TAG + ": Successfully closed connection: " + connId);
WKLoggerUtils.getInstance().i(TAG , " Successfully closed connection: " + connId);
} else {
WKLoggerUtils.getInstance().i(TAG + ": Connection was already closed or not open when async close executed: " + connectionToCloseActual.getId());
WKLoggerUtils.getInstance().i(TAG , " Connection was already closed or not open when async close executed: " + connectionToCloseActual.getId());
}
} catch (IOException e) {
WKLoggerUtils.getInstance().e(TAG, "IOException during async connection close for " + connectionToCloseActual.getId() + ": " + e.getMessage());
} catch (Exception e) {
WKLoggerUtils.getInstance().e(TAG, "Exception during async connection close for " + connectionToCloseActual.getId() + ": " + e.getMessage());
} finally {
isClosing.set(false);
synchronized (connectionLock) {
isClosing.set(false);
// Only trigger reconnection if we're still supposed to be connected
if (WKIMApplication.getInstance().isCanConnect && connectStatus != WKConnectStatus.kicked) {
mainHandler.postDelayed(() -> {
if (connectionIsNull() && !isClosing.get()) {
reconnection();
}
}, 1000);
}
}
}
}, "ConnectionCloser");
closeThread.setDaemon(true);

View File

@ -79,22 +79,22 @@ public class WKLoggerUtils {
/**
* log.i
*/
private void info(String msg) {
private void info(String tag, String msg) {
String message = createMessage(msg);
if (WKIM.getInstance().isDebug()) {
Log.i(TAG, message);
Log.i(TAG + " " + tag, message);
}
if (WKIM.getInstance().isDebug()) {
writeLog(message);
}
}
public void i(String msg) {
info(msg);
public void i(String tag, String msg) {
info(tag, msg);
}
public void i(Exception e) {
info(e != null ? e.toString() : "null");
public void i(String tag, Exception e) {
info(tag, e != null ? e.toString() : "null");
}
/**
@ -185,12 +185,12 @@ public class WKLoggerUtils {
/**
* log.warn
*/
private void warn(String msg) {
private void warn(String tag, String msg) {
String message = createMessage(msg);
if (WKIM.getInstance().isDebug()) {
System.out.println(message);
} else {
Log.w(TAG, message);
Log.w(TAG + " " + tag, message);
}
if (WKIM.getInstance().isDebug()) {
writeLog(message);
@ -222,12 +222,12 @@ public class WKLoggerUtils {
/**
* log.w
*/
public void w(String msg) {
warn(msg);
public void w(String tag, String msg) {
warn(tag, msg);
}
public void w(Exception e) {
warn(e != null ? e.toString() : "null");
public void w(String tag,Exception e) {
warn(tag,e != null ? e.toString() : "null");
}
//
// public void resetLogFile() {