fix:修改在接受消息时频繁连接断开会导致接受消息数量对不上

This commit is contained in:
SL 2025-05-04 14:10:57 +08:00
parent 275ebfbd47
commit a7984b8ed8
4 changed files with 18 additions and 44 deletions

View File

@ -43,7 +43,7 @@ public class ConnectionManager extends BaseManager {
} }
WKIMApplication.getInstance().isCanConnect = true; WKIMApplication.getInstance().isCanConnect = true;
if (WKConnection.getInstance().connectionIsNull()) { if (WKConnection.getInstance().connectionIsNull()) {
WKConnection.getInstance().reconnection("手动"); WKConnection.getInstance().reconnection();
} }
} }

View File

@ -103,7 +103,7 @@ class ConnectionClient implements IDataHandler, IConnectHandler,
WKLoggerUtils.getInstance().e(TAG, "关闭连接异常"); WKLoggerUtils.getInstance().e(TAG, "关闭连接异常");
} }
if (WKIMApplication.getInstance().isCanConnect) { if (WKIMApplication.getInstance().isCanConnect) {
WKConnection.getInstance().reconnection("错误消息"); WKConnection.getInstance().reconnection();
} }
return true; return true;
} }
@ -115,7 +115,6 @@ class ConnectionClient implements IDataHandler, IConnectHandler,
@Override @Override
public boolean onDisconnect(INonBlockingConnection iNonBlockingConnection) { public boolean onDisconnect(INonBlockingConnection iNonBlockingConnection) {
try { try {
WKLoggerUtils.getInstance().e("收到了断开连接"+iNonBlockingConnection.getId());
if (iNonBlockingConnection != null && !TextUtils.isEmpty(iNonBlockingConnection.getId()) && iNonBlockingConnection.getAttachment() != null) { if (iNonBlockingConnection != null && !TextUtils.isEmpty(iNonBlockingConnection.getId()) && iNonBlockingConnection.getAttachment() != null) {
String id = iNonBlockingConnection.getId(); String id = iNonBlockingConnection.getId();
Object attachmentObject = iNonBlockingConnection.getAttachment(); Object attachmentObject = iNonBlockingConnection.getAttachment();

View File

@ -91,14 +91,8 @@ public class WKConnection {
private final long connAckTimeoutTime = 10; private final long connAckTimeoutTime = 10;
public String socketSingleID; public String socketSingleID;
private String lastRequestId; private String lastRequestId;
private int unReceivePongCount = 0;
public volatile Handler reconnectionHandler = new Handler(Objects.requireNonNull(Looper.myLooper())); public volatile Handler reconnectionHandler = new Handler(Objects.requireNonNull(Looper.myLooper()));
Runnable reconnectionRunnable = new Runnable() { Runnable reconnectionRunnable = this::reconnection;
@Override
public void run() {
reconnection("handler");
}
};
private int connCount = 0; private int connCount = 0;
private HeartbeatManager heartbeatManager; private HeartbeatManager heartbeatManager;
private NetworkChecker networkChecker; private NetworkChecker networkChecker;
@ -112,7 +106,7 @@ public class WKConnection {
if (TextUtils.isEmpty(ip) || port == 0) { if (TextUtils.isEmpty(ip) || port == 0) {
WKLoggerUtils.getInstance().e(TAG, "获取连接地址超时"); WKLoggerUtils.getInstance().e(TAG, "获取连接地址超时");
isReConnecting = false; isReConnecting = false;
reconnection("获取地址超时"); reconnection();
} }
} else { } else {
if (TextUtils.isEmpty(ip) || port == 0) { if (TextUtils.isEmpty(ip) || port == 0) {
@ -132,8 +126,8 @@ public class WKConnection {
if (nowTime - connAckTime > connAckTimeoutTime && connectStatus != WKConnectStatus.success && connectStatus != WKConnectStatus.syncMsg) { if (nowTime - connAckTime > connAckTimeoutTime && connectStatus != WKConnectStatus.success && connectStatus != WKConnectStatus.syncMsg) {
WKLoggerUtils.getInstance().e(TAG, "连接确认超时"); WKLoggerUtils.getInstance().e(TAG, "连接确认超时");
isReConnecting = false; isReConnecting = false;
closeConnect("检查连接ack超时"); closeConnect();
reconnection("检查连接超时"); reconnection();
} else { } else {
if (connectStatus == WKConnectStatus.success || connectStatus == WKConnectStatus.syncMsg) { if (connectStatus == WKConnectStatus.success || connectStatus == WKConnectStatus.syncMsg) {
WKLoggerUtils.getInstance().e(TAG, "连接确认成功"); WKLoggerUtils.getInstance().e(TAG, "连接确认成功");
@ -164,8 +158,7 @@ public class WKConnection {
reconnectionHandler.postDelayed(reconnectionRunnable, connIntervalMillisecond * connCount); reconnectionHandler.postDelayed(reconnectionRunnable, connIntervalMillisecond * connCount);
} }
public synchronized void reconnection(String from) { public synchronized void reconnection() {
Log.e("重连来源",from);
if (!WKIMApplication.getInstance().isCanConnect) { if (!WKIMApplication.getInstance().isCanConnect) {
WKLoggerUtils.getInstance().e(TAG, "断开"); WKLoggerUtils.getInstance().e(TAG, "断开");
stopAll(); stopAll();
@ -185,7 +178,7 @@ public class WKConnection {
reconnectionHandler.removeCallbacks(reconnectionRunnable); reconnectionHandler.removeCallbacks(reconnectionRunnable);
boolean isHaveNetwork = WKIMApplication.getInstance().isNetworkConnected(); boolean isHaveNetwork = WKIMApplication.getInstance().isNetworkConnected();
if (isHaveNetwork) { if (isHaveNetwork) {
closeConnect("重连"); closeConnect();
isReConnecting = true; isReConnecting = true;
requestIPTime = DateUtils.getInstance().getCurrentSeconds(); requestIPTime = DateUtils.getInstance().getCurrentSeconds();
getConnAddress(); getConnAddress();
@ -232,7 +225,7 @@ public class WKConnection {
private synchronized void connSocket() { private synchronized void connSocket() {
synchronized (connectionLock) { // 使用专门的锁 synchronized (connectionLock) { // 使用专门的锁
closeConnect("连接socket"); closeConnect();
socketSingleID = UUID.randomUUID().toString().replace("-", ""); socketSingleID = UUID.randomUUID().toString().replace("-", "");
connectionClient = new ConnectionClient(iNonBlockingConnection -> { connectionClient = new ConnectionClient(iNonBlockingConnection -> {
synchronized (connectionLock) { // 回调中也需要使用相同的锁 synchronized (connectionLock) { // 回调中也需要使用相同的锁
@ -299,7 +292,7 @@ public class WKConnection {
@Override @Override
public void reconnect() { public void reconnect() {
WKIMApplication.getInstance().isCanConnect = true; WKIMApplication.getInstance().isCanConnect = true;
reconnection("消息处理"); reconnection();
} }
@Override @Override
@ -311,7 +304,6 @@ public class WKConnection {
public void pongMsg(WKPongMsg msgHeartbeat) { public void pongMsg(WKPongMsg msgHeartbeat) {
// 心跳消息 // 心跳消息
lastMsgTime = DateUtils.getInstance().getCurrentSeconds(); lastMsgTime = DateUtils.getInstance().getCurrentSeconds();
unReceivePongCount = 0;
} }
@Override @Override
@ -389,7 +381,7 @@ public class WKConnection {
WKIMApplication.getInstance().isCanConnect = false; WKIMApplication.getInstance().isCanConnect = false;
stopAll(); stopAll();
} else { } else {
reconnection("解码未知类型:" + status); reconnection();
WKLoggerUtils.getInstance().e(TAG, "登录状态:" + status); WKLoggerUtils.getInstance().e(TAG, "登录状态:" + status);
stopAll(); stopAll();
@ -405,36 +397,21 @@ public class WKConnection {
return; return;
} }
if (connectStatus != WKConnectStatus.success) { if (connectStatus != WKConnectStatus.success) {
reconnection("发消息发现连接不成功" + mBaseMsg.packetType); reconnection();
return; return;
} }
} }
if (mBaseMsg.packetType == WKMsgType.PING) {
unReceivePongCount++;
}
if (connection == null || !connection.isOpen()) { if (connection == null || !connection.isOpen()) {
reconnection("发消息发现连接为空"); reconnection();
return; return;
} }
int status = MessageHandler.getInstance().sendMessage(connection, mBaseMsg); int status = MessageHandler.getInstance().sendMessage(connection, mBaseMsg);
if (status == 0) { if (status == 0) {
WKLoggerUtils.getInstance().e(TAG, "发消息失败"); WKLoggerUtils.getInstance().e(TAG, "发消息失败");
reconnection("发消息失败"); reconnection();
} }
} }
// 查看心跳是否超时
void checkHeartIsTimeOut() {
if (unReceivePongCount >= 5) {
forcedReconnection();
return;
}
long nowTime = DateUtils.getInstance().getCurrentSeconds();
if (nowTime - lastMsgTime >= 60) {
sendMessage(new WKPingMsg());
}
}
private void removeSendingMsg() { private void removeSendingMsg() {
if (!sendingMsgHashMap.isEmpty()) { if (!sendingMsgHashMap.isEmpty()) {
Iterator<Map.Entry<Integer, WKSendingMsg>> it = sendingMsgHashMap.entrySet().iterator(); Iterator<Map.Entry<Integer, WKSendingMsg>> it = sendingMsgHashMap.entrySet().iterator();
@ -648,7 +625,7 @@ public class WKConnection {
// 先设置连接状态为失败 // 先设置连接状态为失败
WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.fail, ""); WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.fail, "");
// 清理连接相关资源 // 清理连接相关资源
closeConnect("stopall"); closeConnect();
// 关闭定时器管理器 // 关闭定时器管理器
TimerManager.getInstance().shutdown(); TimerManager.getInstance().shutdown();
MessageHandler.getInstance().clearCacheData(); MessageHandler.getInstance().clearCacheData();
@ -668,7 +645,6 @@ public class WKConnection {
isReConnecting = false; isReConnecting = false;
ip = ""; ip = "";
port = 0; port = 0;
unReceivePongCount = 0;
requestIPTime = 0; requestIPTime = 0;
connAckTime = 0; connAckTime = 0;
lastMsgTime = 0; lastMsgTime = 0;
@ -682,9 +658,8 @@ public class WKConnection {
System.gc(); System.gc();
} }
private synchronized void closeConnect(String from) { private synchronized void closeConnect() {
synchronized (connectionLock) { synchronized (connectionLock) {
WKLoggerUtils.getInstance().e("关闭连接来源",from);
if (connection != null) { if (connection != null) {
try { try {
if (connection.isOpen()) { if (connection.isOpen()) {

View File

@ -37,7 +37,7 @@ public class NetworkChecker {
} else { } else {
//有网络 //有网络
if (WKConnection.getInstance().connectionIsNull() || isForcedReconnect) { if (WKConnection.getInstance().connectionIsNull() || isForcedReconnect) {
WKConnection.getInstance().reconnection("网络1"); WKConnection.getInstance().reconnection();
isForcedReconnect = false; isForcedReconnect = false;
} }
} }