diff --git a/app/src/main/java/com/xinbida/wukongdemo/ConversationActivity.kt b/app/src/main/java/com/xinbida/wukongdemo/ConversationActivity.kt index d9672cf..0a41e32 100644 --- a/app/src/main/java/com/xinbida/wukongdemo/ConversationActivity.kt +++ b/app/src/main/java/com/xinbida/wukongdemo/ConversationActivity.kt @@ -98,12 +98,10 @@ class ConversationActivity : AppCompatActivity() { WKIM.getInstance().conversationManager.addOnRefreshMsgListener( "conv" ) { uiConversationMsg, isEnd -> - Log.e("修改的值",uiConversationMsg.channelID) var isAdd = true number++ for (index in adapter.data.indices) { if (adapter.data[index].channelID == uiConversationMsg?.channelID && adapter.data[index].channelType == uiConversationMsg?.channelType) { - Log.e("要修改","-->") isAdd = false adapter.data[index].wkMsg = uiConversationMsg.wkMsg adapter.data[index].lastMsgSeq = diff --git a/app/src/main/java/com/xinbida/wukongdemo/MainActivity.java b/app/src/main/java/com/xinbida/wukongdemo/MainActivity.java index 9bf999e..86eedba 100644 --- a/app/src/main/java/com/xinbida/wukongdemo/MainActivity.java +++ b/app/src/main/java/com/xinbida/wukongdemo/MainActivity.java @@ -162,6 +162,9 @@ public class MainActivity extends AppCompatActivity { // 新消息监听 WKIM.getInstance().getMsgManager().addOnNewMsgListener(channelID, msgList -> { for (WKMsg msg : msgList) { + if (!msg.channelID.equals(channelID)){ + continue; + } if (msg.type == 56) { adapter.addData(new UIMessageEntity(msg, 3)); } else { diff --git a/app/src/main/java/com/xinbida/wukongdemo/WKApplication.kt b/app/src/main/java/com/xinbida/wukongdemo/WKApplication.kt index 875779c..41a40ec 100644 --- a/app/src/main/java/com/xinbida/wukongdemo/WKApplication.kt +++ b/app/src/main/java/com/xinbida/wukongdemo/WKApplication.kt @@ -2,6 +2,7 @@ package com.xinbida.wukongdemo import android.app.Application import android.text.TextUtils +import android.util.Log import com.xinbida.wukongim.WKIM import com.xinbida.wukongim.entity.WKChannelType import com.xinbida.wukongim.entity.WKSyncChat @@ -45,6 +46,10 @@ class WKApplication : Application() { } }] }.start() +// andPortListener.onGetSocketIpAndPort( +// "192.168.3.13", +// 5100 +// ) } // 对接频道资料(群信息/用户信息) WKIM.getInstance().channelManager.addOnGetChannelInfoListener { channelId, channelType, _ -> @@ -129,6 +134,7 @@ class WKApplication : Application() { "/conversation/sync", json ) { code, data -> if (code != 200 || TextUtils.isEmpty(data)) { + Log.e("同步失败","-->") iSyncConvChatBack?.onBack(null) return@post } diff --git a/wkim/src/main/java/com/xinbida/wukongim/manager/ConnectionManager.java b/wkim/src/main/java/com/xinbida/wukongim/manager/ConnectionManager.java index 6da9f56..713e92a 100644 --- a/wkim/src/main/java/com/xinbida/wukongim/manager/ConnectionManager.java +++ b/wkim/src/main/java/com/xinbida/wukongim/manager/ConnectionManager.java @@ -43,14 +43,13 @@ public class ConnectionManager extends BaseManager { } WKIMApplication.getInstance().isCanConnect = true; if (WKConnection.getInstance().connectionIsNull()) { - WKConnection.getInstance().reconnection(); + WKConnection.getInstance().reconnection("手动"); } } public void disconnect(boolean isLogout) { if (TextUtils.isEmpty(WKIMApplication.getInstance().getToken())) return; - WKLoggerUtils.getInstance().e(TAG,"disconnect Disconnect is exit :" + isLogout); if (isLogout) { logoutChat(); } else { diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/ConnectionClient.java b/wkim/src/main/java/com/xinbida/wukongim/message/ConnectionClient.java index 29f6500..7bcbd19 100644 --- a/wkim/src/main/java/com/xinbida/wukongim/message/ConnectionClient.java +++ b/wkim/src/main/java/com/xinbida/wukongim/message/ConnectionClient.java @@ -37,7 +37,7 @@ class ConnectionClient implements IDataHandler, IConnectHandler, @Override public boolean onConnectException(INonBlockingConnection iNonBlockingConnection, IOException e) { - WKLoggerUtils.getInstance().e(TAG, "connection exception"); + WKLoggerUtils.getInstance().e(TAG,"连接异常"); WKConnection.getInstance().forcedReconnection(); close(iNonBlockingConnection); return true; @@ -76,7 +76,7 @@ class ConnectionClient implements IDataHandler, IConnectHandler, @Override public boolean onConnectionTimeout(INonBlockingConnection iNonBlockingConnection) { if (!isConnectSuccess) { - WKLoggerUtils.getInstance().e(TAG, "connection timeout"); + WKLoggerUtils.getInstance().e(TAG, "连接超时"); WKConnection.getInstance().forcedReconnection(); } return true; @@ -84,71 +84,46 @@ class ConnectionClient implements IDataHandler, IConnectHandler, @Override public boolean onData(INonBlockingConnection iNonBlockingConnection) throws BufferUnderflowException { + if (WKConnection.getInstance().connectionIsNull() || WKConnection.getInstance().isReConnecting) { + return true; + } Object id = iNonBlockingConnection.getAttachment(); if (id instanceof String) { if (id.toString().startsWith("close")) { return true; } if (!TextUtils.isEmpty(WKConnection.getInstance().socketSingleID) && !WKConnection.getInstance().socketSingleID.equals(id)) { - WKLoggerUtils.getInstance().e(TAG, "onData method The received message ID does not match the connected ID"); + WKLoggerUtils.getInstance().e(TAG, "非当前连接的消息"); try { iNonBlockingConnection.close(); if (WKConnection.getInstance().connection != null) { WKConnection.getInstance().connection.close(); } } catch (IOException e) { - WKLoggerUtils.getInstance().e(TAG, "onData close connection error"); + WKLoggerUtils.getInstance().e(TAG, "关闭连接异常"); } if (WKIMApplication.getInstance().isCanConnect) { - WKConnection.getInstance().forcedReconnection(); + WKConnection.getInstance().reconnection("错误消息"); } return true; } } - int available_len; - int bufLen = 102400; - try { - available_len = iNonBlockingConnection.available(); - if (available_len == -1) { - return true; - } - int readCount = available_len / bufLen; - if (available_len % bufLen != 0) { - readCount++; - } - - for (int i = 0; i < readCount; i++) { - int readLen = bufLen; - if (i == readCount - 1) { - if (available_len % bufLen != 0) { - readLen = available_len % bufLen; - } - } - byte[] buffBytes = iNonBlockingConnection.readBytesByLength(readLen); - if (buffBytes.length > 0) { - WKConnection.getInstance().receivedData(buffBytes); - } - } - - } catch (IOException e) { - WKLoggerUtils.getInstance().e(TAG, "Handling Received Data Exception:" + e.getMessage()); - } + MessageHandler.getInstance().handlerOnlineBytes(iNonBlockingConnection); return true; } @Override public boolean onDisconnect(INonBlockingConnection iNonBlockingConnection) { - WKLoggerUtils.getInstance().e(TAG, "Connection disconnected"); try { + WKLoggerUtils.getInstance().e("收到了断开连接"+iNonBlockingConnection.getId()); if (iNonBlockingConnection != null && !TextUtils.isEmpty(iNonBlockingConnection.getId()) && iNonBlockingConnection.getAttachment() != null) { String id = iNonBlockingConnection.getId(); Object attachmentObject = iNonBlockingConnection.getAttachment(); if (attachmentObject instanceof String) { String att = (String) attachmentObject; String attStr = "close" + id; - WKLoggerUtils.getInstance().e("手动关闭"); if (att.equals(attStr)) { - WKLoggerUtils.getInstance().e("手动关闭直接返回"); + WKLoggerUtils.getInstance().e("主动断开不重连"); return true; } } @@ -156,8 +131,6 @@ class ConnectionClient implements IDataHandler, IConnectHandler, if (WKIMApplication.getInstance().isCanConnect) { WKLoggerUtils.getInstance().e("手动关闭需要重连"); WKConnection.getInstance().forcedReconnection(); - } else { - WKLoggerUtils.getInstance().e(TAG, "No reconnection allowed"); } close(iNonBlockingConnection); } catch (Exception ignored) { @@ -170,7 +143,6 @@ class ConnectionClient implements IDataHandler, IConnectHandler, @Override public boolean onIdleTimeout(INonBlockingConnection iNonBlockingConnection) { if (!isConnectSuccess) { - WKLoggerUtils.getInstance().e(TAG, "Idle timeout"); WKConnection.getInstance().forcedReconnection(); close(iNonBlockingConnection); } @@ -182,7 +154,7 @@ class ConnectionClient implements IDataHandler, IConnectHandler, if (iNonBlockingConnection != null) iNonBlockingConnection.close(); } catch (IOException e) { - WKLoggerUtils.getInstance().e(TAG, "close connection error"); + WKLoggerUtils.getInstance().e(TAG, "关闭连接异常"); } } } \ No newline at end of file diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/MessageHandler.java b/wkim/src/main/java/com/xinbida/wukongim/message/MessageHandler.java index bde1044..60d7f29 100644 --- a/wkim/src/main/java/com/xinbida/wukongim/message/MessageHandler.java +++ b/wkim/src/main/java/com/xinbida/wukongim/message/MessageHandler.java @@ -1,5 +1,7 @@ package com.xinbida.wukongim.message; +import android.os.Handler; +import android.os.Looper; import android.text.TextUtils; import com.xinbida.wukongim.WKIM; @@ -12,7 +14,6 @@ import com.xinbida.wukongim.entity.WKSyncMsg; import com.xinbida.wukongim.entity.WKUIConversationMsg; import com.xinbida.wukongim.interfaces.IReceivedMsgListener; import com.xinbida.wukongim.manager.CMDManager; -import com.xinbida.wukongim.manager.ConversationManager; import com.xinbida.wukongim.message.type.WKMsgContentType; import com.xinbida.wukongim.message.type.WKMsgType; import com.xinbida.wukongim.protocol.WKBaseMsg; @@ -35,11 +36,10 @@ import java.nio.BufferOverflowException; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; /** * 5/21/21 11:25 AM @@ -59,13 +59,15 @@ public class MessageHandler { return MessageHandlerBinder.handler; } + private final List receivedAckMsgList = Collections.synchronizedList(new ArrayList<>()); + int sendMessage(INonBlockingConnection connection, WKBaseMsg msg) { if (msg == null) { return 1; } byte[] bytes = WKProto.getInstance().encodeMsg(msg); if (bytes == null || bytes.length == 0) { - WKLoggerUtils.getInstance().e(TAG, "Send unknown message packet:" + msg.packetType); + WKLoggerUtils.getInstance().e(TAG, "发送了非法包:" + msg.packetType); return 1; } @@ -75,107 +77,195 @@ public class MessageHandler { connection.flush(); return 1; } catch (BufferOverflowException e) { - WKLoggerUtils.getInstance().e(TAG, "sendMessages Exception BufferOverflowException" + WKLoggerUtils.getInstance().e(TAG, "发消息异常 BufferOverflowException" + e.getMessage()); return 0; } catch (ClosedChannelException e) { - WKLoggerUtils.getInstance().e(TAG, "sendMessages Exception ClosedChannelException" + WKLoggerUtils.getInstance().e(TAG, "发消息异常 ClosedChannelException" + e.getMessage()); return 0; } catch (SocketTimeoutException e) { - WKLoggerUtils.getInstance().e(TAG, "sendMessages Exception SocketTimeoutException" + WKLoggerUtils.getInstance().e(TAG, "发消息异常 SocketTimeoutException" + e.getMessage()); return 0; } catch (IOException e) { - WKLoggerUtils.getInstance().e(TAG, "sendMessages Exception IOException" + e.getMessage()); + WKLoggerUtils.getInstance().e(TAG, "发消息异常 IOException" + e.getMessage()); return 0; } } else { - WKLoggerUtils.getInstance().e("sendMessages Exception sendMessage conn null:" + WKLoggerUtils.getInstance().e("发消息异常:" + connection); return 0; } } - private List receivedMsgList; + private volatile List receivedMsgList; + private final Object receivedMsgListLock = new Object(); + private final Object cacheLock = new Object(); private byte[] cacheData = null; + private int available_len; + + public void clearCacheData() { + synchronized (cacheLock) { + cacheData = null; + available_len = 0; + } + } + + + synchronized void handlerOnlineBytes(INonBlockingConnection iNonBlockingConnection) { + synchronized (cacheLock) { + try { + // 获取可用数据长度 + available_len = iNonBlockingConnection.available(); + + // 安全检查 + if (available_len <= 0) { + return; + } + + // 限制单次最大读取大小为150kb + int bufLen = 1024 / 2; + + // 分批读取数据 + while (available_len > 0) { + // 计算本次应该读取的长度 + int readLen = Math.min(bufLen, available_len); + if (readLen <= 0) break; + // 读取数据前确保连接仍然有效 + if (!iNonBlockingConnection.isOpen()) { + WKLoggerUtils.getInstance().e(TAG, "读取数据时连接关闭"); + break; + } + // 读取数据 + byte[] buffBytes = iNonBlockingConnection.readBytesByLength(readLen); + if (buffBytes != null && buffBytes.length > 0) { + WKConnection.getInstance().receivedData(buffBytes); + available_len -= buffBytes.length; + } else { + WKLoggerUtils.getInstance().e(TAG, "读取数据失败或收到空数据"); + break; + } + // 给一个很小的延迟,避免过快读取 + try { + Thread.sleep(1); + } catch (InterruptedException ignored) { + } + } + } catch (IOException e) { + WKLoggerUtils.getInstance().e(TAG, "处理接收到的数据异常:" + e.getMessage()); + clearCacheData(); + } catch (Exception e) { + WKLoggerUtils.getInstance().e(TAG, "onData 中发生意外错误: " + e.getMessage()); + clearCacheData(); + } + } + } synchronized void cutBytes(byte[] available_bytes, IReceivedMsgListener mIReceivedMsgListener) { - - if (cacheData == null || cacheData.length == 0) cacheData = available_bytes; - else { - //如果上次还存在未解析完的消息将新数据追加到缓存数据中 - byte[] temp = new byte[available_bytes.length + cacheData.length]; - try { - System.arraycopy(cacheData, 0, temp, 0, cacheData.length); - System.arraycopy(available_bytes, 0, temp, cacheData.length, available_bytes.length); - cacheData = temp; - } catch (Exception e) { - WKLoggerUtils.getInstance().e(TAG, "cutBytes Merge message error" + e.getMessage()); - } - - } - byte[] lastMsgBytes = cacheData; - int readLength = 0; - - while (lastMsgBytes.length > 0 && readLength != lastMsgBytes.length) { - - readLength = lastMsgBytes.length; - int packetType = WKTypeUtils.getInstance().getHeight4(lastMsgBytes[0]); - // 是否不持久化:0。 是否显示红点:1。是否只同步一次:0 - //是否持久化[是否保存在数据库] - int no_persist = WKTypeUtils.getInstance().getBit(lastMsgBytes[0], 0); - //是否显示红点 - int red_dot = WKTypeUtils.getInstance().getBit(lastMsgBytes[0], 1); - //是否只同步一次 - int sync_once = WKTypeUtils.getInstance().getBit(lastMsgBytes[0], 2); - WKLoggerUtils.getInstance().e(TAG, "no_persist:" + no_persist + "red_dot:" + red_dot + "sync_once:" + sync_once); - WKLoggerUtils.getInstance().e(TAG, "packet type" + packetType); - if (packetType == WKMsgType.PONG) { - //心跳ack - mIReceivedMsgListener.pongMsg(new WKPongMsg()); - WKLoggerUtils.getInstance().e(TAG, "pong..."); - byte[] bytes = Arrays.copyOfRange(lastMsgBytes, 1, lastMsgBytes.length); - cacheData = lastMsgBytes = bytes; - } else { - if (packetType < 10) { - // 2019-12-21 计算剩余长度 - if (lastMsgBytes.length < 5) { - cacheData = lastMsgBytes; - break; - } - //其他消息类型 - int remainingLength = WKTypeUtils.getInstance().getRemainingLength(Arrays.copyOfRange(lastMsgBytes, 1, lastMsgBytes.length)); - if (remainingLength == -1) { - //剩余长度被分包 - cacheData = lastMsgBytes; - break; - } - if (remainingLength > 1 << 21) { - cacheData = null; - break; - } - byte[] bytes = WKTypeUtils.getInstance().getRemainingLengthByte(remainingLength); - if (remainingLength + 1 + bytes.length > lastMsgBytes.length) { - //半包情况 - cacheData = lastMsgBytes; - } else { - byte[] msg = Arrays.copyOfRange(lastMsgBytes, 0, remainingLength + 1 + bytes.length); - acceptMsg(msg, no_persist, sync_once, red_dot, mIReceivedMsgListener); - byte[] temps = Arrays.copyOfRange(lastMsgBytes, msg.length, lastMsgBytes.length); - cacheData = lastMsgBytes = temps; - } - - } else { - cacheData = null; - mIReceivedMsgListener.reconnect(); - break; + synchronized (cacheLock) { + if (cacheData == null || cacheData.length == 0) cacheData = available_bytes; + else { + //如果上次还存在未解析完的消息将新数据追加到缓存数据中 + byte[] temp = new byte[available_bytes.length + cacheData.length]; + try { + System.arraycopy(cacheData, 0, temp, 0, cacheData.length); + System.arraycopy(available_bytes, 0, temp, cacheData.length, available_bytes.length); + cacheData = temp; + } catch (Exception e) { + WKLoggerUtils.getInstance().e(TAG, "处理粘包消息异常" + e.getMessage()); + clearCacheData(); + return; } } + byte[] lastMsgBytes = cacheData; + int readLength = 0; + + while (lastMsgBytes.length > 0 && readLength != lastMsgBytes.length) { + readLength = lastMsgBytes.length; + int packetType = WKTypeUtils.getInstance().getHeight4(lastMsgBytes[0]); + // 是否不持久化:0。 是否显示红点:1。是否只同步一次:0 + //是否持久化[是否保存在数据库] + int no_persist = WKTypeUtils.getInstance().getBit(lastMsgBytes[0], 0); + //是否显示红点 + int red_dot = WKTypeUtils.getInstance().getBit(lastMsgBytes[0], 1); + //是否只同步一次 + int sync_once = WKTypeUtils.getInstance().getBit(lastMsgBytes[0], 2); + if (WKIM.getInstance().isDebug()) { + String packetTypeStr = "[其他]"; + switch (packetType) { + case WKMsgType.CONNACK: + packetTypeStr = "[连接状态包]"; + break; + case WKMsgType.SEND: + packetTypeStr = "[发送包]"; + break; + case WKMsgType.RECEIVED: + packetTypeStr = "[收到消息包]"; + break; + case WKMsgType.DISCONNECT: + packetTypeStr = "[断开连接包]"; + break; + case WKMsgType.SENDACK: + packetTypeStr = "[发送回执包]"; + break; + case WKMsgType.PONG: + packetTypeStr = "[心跳包]"; + break; + } + String info = "是否不持续化:" + no_persist + ",是否显示红点:" + red_dot + ",是否只同步一次:" + sync_once; + WKLoggerUtils.getInstance().e(TAG, "收到包类型" + packetType + " " + packetTypeStr + "|" + info); + } + if (packetType == WKMsgType.REVACK || packetType == WKMsgType.SEND || packetType == WKMsgType.Reserved) { + WKConnection.getInstance().forcedReconnection(); + return; + } + if (packetType == WKMsgType.PONG) { + //心跳ack + mIReceivedMsgListener.pongMsg(new WKPongMsg()); + WKLoggerUtils.getInstance().e(TAG, "pong..."); + byte[] bytes = Arrays.copyOfRange(lastMsgBytes, 1, lastMsgBytes.length); + cacheData = lastMsgBytes = bytes; + } else { + if (packetType < 10) { + // 2019-12-21 计算剩余长度 + if (lastMsgBytes.length < 5) { + cacheData = lastMsgBytes; + break; + } + //其他消息类型 + int remainingLength = WKTypeUtils.getInstance().getRemainingLength(Arrays.copyOfRange(lastMsgBytes, 1, lastMsgBytes.length)); + if (remainingLength == -1) { + //剩余长度被分包 + cacheData = lastMsgBytes; + break; + } + if (remainingLength > 1 << 21) { + cacheData = null; + break; + } + byte[] bytes = WKTypeUtils.getInstance().getRemainingLengthByte(remainingLength); + if (remainingLength + 1 + bytes.length > lastMsgBytes.length) { + //半包情况 + cacheData = lastMsgBytes; + } else { + byte[] msg = Arrays.copyOfRange(lastMsgBytes, 0, remainingLength + 1 + bytes.length); + acceptMsg(msg, no_persist, sync_once, red_dot, mIReceivedMsgListener); + byte[] temps = Arrays.copyOfRange(lastMsgBytes, msg.length, lastMsgBytes.length); + cacheData = lastMsgBytes = temps; + } + + } else { + cacheData = null; + mIReceivedMsgListener.reconnect(); + break; + } + } + } + saveReceiveMsg(); } - saveReceiveMsg(); } private void acceptMsg(byte[] bytes, int no_persist, int sync_once, int red_dot, @@ -222,6 +312,8 @@ public class MessageHandler { } else if (g_msg.packetType == WKMsgType.PONG) { mIReceivedMsgListener.pongMsg((WKPongMsg) g_msg); } + } else { + mIReceivedMsgListener.reconnect(); } } } @@ -232,7 +324,7 @@ public class MessageHandler { addReceivedMsg(message); } else { WKReceivedAckMsg receivedAckMsg = getReceivedAckMsg(message); - WKConnection.getInstance().sendMessage(receivedAckMsg); + receivedAckMsgList.add(receivedAckMsg); } } @@ -246,49 +338,92 @@ public class MessageHandler { return receivedAckMsg; } - private synchronized void addReceivedMsg(WKMsg msg) { - if (receivedMsgList == null) receivedMsgList = new ArrayList<>(); - WKSyncMsg syncMsg = new WKSyncMsg(); - syncMsg.no_persist = msg.header.noPersist ? 1 : 0; - syncMsg.sync_once = msg.header.syncOnce ? 1 : 0; - syncMsg.red_dot = msg.header.redDot ? 1 : 0; - syncMsg.wkMsg = msg; - receivedMsgList.add(syncMsg); - } - - public synchronized void saveReceiveMsg() { - if (WKCommonUtils.isNotEmpty(receivedMsgList)) { - saveSyncMsg(receivedMsgList); - List list = new ArrayList<>(); - for (int i = 0, size = receivedMsgList.size(); i < size; i++) { - WKReceivedAckMsg receivedAckMsg = getReceivedAckMsg(receivedMsgList.get(i).wkMsg); - list.add(receivedAckMsg); + private void addReceivedMsg(WKMsg msg) { + synchronized (receivedMsgListLock) { + if (receivedMsgList == null) { + receivedMsgList = new ArrayList<>(); } - sendAck(list); - receivedMsgList.clear(); + WKSyncMsg syncMsg = new WKSyncMsg(); + syncMsg.no_persist = msg.header.noPersist ? 1 : 0; + syncMsg.sync_once = msg.header.syncOnce ? 1 : 0; + syncMsg.red_dot = msg.header.redDot ? 1 : 0; + syncMsg.wkMsg = msg; + receivedMsgList.add(syncMsg); } } - //回复消息ack - private void sendAck(List list) { - if (list.size() == 1) { - WKConnection.getInstance().sendMessage(list.get(0)); - return; + public void saveReceiveMsg() { + List tempList = null; + synchronized (receivedMsgListLock) { + if (WKCommonUtils.isNotEmpty(receivedMsgList)) { + tempList = new ArrayList<>(receivedMsgList); + receivedMsgList.clear(); + } } - final Timer sendAckTimer = new Timer(); - sendAckTimer.schedule(new TimerTask() { - @Override - public void run() { - if (WKCommonUtils.isNotEmpty(list)) { - WKConnection.getInstance().sendMessage(list.get(0)); - list.remove(0); - } else { - sendAckTimer.cancel(); + + if (tempList != null) { + saveSyncMsg(tempList); + synchronized (receivedAckMsgList) { + for (WKSyncMsg syncMsg : tempList) { + WKReceivedAckMsg receivedAckMsg = getReceivedAckMsg(syncMsg.wkMsg); + receivedAckMsgList.add(receivedAckMsg); } } - }, 0, 100); + sendAck(); + } } + private final Handler sendAckHandler = new Handler(Looper.getMainLooper()); + private final Runnable sendAckRunnable = new Runnable() { + @Override + public void run() { + // 检查连接状态 + if (WKConnection.getInstance().connectionIsNull() || WKConnection.getInstance().isReConnecting) { + // 连接断开,取消所有待发送的消息 + sendAckHandler.removeCallbacks(this); + return; + } + + synchronized (receivedAckMsgList) { + if (!receivedAckMsgList.isEmpty()) { + WKConnection.getInstance().sendMessage(receivedAckMsgList.get(0)); + receivedAckMsgList.remove(0); + // 如果列表不为空,继续发送下一条 + if (!receivedAckMsgList.isEmpty()) { + sendAckHandler.postDelayed(this, 100); + } + } + } + } + }; + + //回复消息ack + public void sendAck() { + if (WKConnection.getInstance().connectionIsNull() || WKConnection.getInstance().isReConnecting) { + return; + } + synchronized (receivedAckMsgList) { + if (receivedAckMsgList.isEmpty()) { + return; + } + if (receivedAckMsgList.size() == 1) { + WKConnection.getInstance().sendMessage(receivedAckMsgList.get(0)); + receivedAckMsgList.clear(); + return; + } + // 移除所有待发送的消息,避免重复发送 + sendAckHandler.removeCallbacks(sendAckRunnable); + // 开始发送消息 + sendAckHandler.post(sendAckRunnable); + } + } + + // 在需要清理资源的地方(比如onDestroy)调用此方法 + public void destroy() { + if (sendAckHandler != null) { + sendAckHandler.removeCallbacks(sendAckRunnable); + } + } /** * 保存同步消息 @@ -379,7 +514,7 @@ public class MessageHandler { // for (int i = 0, size = refreshList.size(); i < size; i++) { // ConversationManager.getInstance().setOnRefreshMsg(refreshList.get(i), i == refreshList.size() - 1, "groupMsg"); // } - WKIM.getInstance().getConversationManager().setOnRefreshMsg(refreshList,"groupMsg"); + WKIM.getInstance().getConversationManager().setOnRefreshMsg(refreshList, "groupMsg"); } public WKMsg parsingMsg(WKMsg message) { @@ -412,7 +547,7 @@ public class MessageHandler { } } catch (JSONException e) { message.type = WKMsgContentType.WK_CONTENT_FORMAT_ERROR; - WKLoggerUtils.getInstance().e(TAG, "Parsing message error, message is not a JSON structure"); + WKLoggerUtils.getInstance().e(TAG, "消息体非json"); } if (json == null) { diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/WKConnection.java b/wkim/src/main/java/com/xinbida/wukongim/message/WKConnection.java index ea6d3a6..698afed 100644 --- a/wkim/src/main/java/com/xinbida/wukongim/message/WKConnection.java +++ b/wkim/src/main/java/com/xinbida/wukongim/message/WKConnection.java @@ -5,6 +5,7 @@ import android.graphics.BitmapFactory; import android.os.Handler; import android.os.Looper; import android.text.TextUtils; +import android.util.Log; import com.xinbida.wukongim.WKIM; import com.xinbida.wukongim.WKIMApplication; @@ -18,6 +19,9 @@ import com.xinbida.wukongim.entity.WKSyncMsgMode; import com.xinbida.wukongim.entity.WKUIConversationMsg; import com.xinbida.wukongim.interfaces.IReceivedMsgListener; import com.xinbida.wukongim.manager.ConnectionManager; +import com.xinbida.wukongim.message.timer.HeartbeatManager; +import com.xinbida.wukongim.message.timer.NetworkChecker; +import com.xinbida.wukongim.message.timer.TimerManager; import com.xinbida.wukongim.message.type.WKConnectReason; import com.xinbida.wukongim.message.type.WKConnectStatus; import com.xinbida.wukongim.message.type.WKMsgType; @@ -48,8 +52,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.Timer; -import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -75,25 +77,84 @@ public class WKConnection { // 正在发送的消息 private final ConcurrentHashMap sendingMsgHashMap = new ConcurrentHashMap<>(); // 正在重连中 - private boolean isReConnecting = false; + public boolean isReConnecting = false; // 连接状态 private int connectStatus; private long lastMsgTime = 0; private String ip; private int port; - volatile INonBlockingConnection connection; + public volatile INonBlockingConnection connection; volatile ConnectionClient connectionClient; private long requestIPTime; private long connAckTime; private final long requestIPTimeoutTime = 6; - private final long connAckTimeoutTime = 2; + private final long connAckTimeoutTime = 10; public String socketSingleID; private String lastRequestId; private int unReceivePongCount = 0; public volatile Handler reconnectionHandler = new Handler(Objects.requireNonNull(Looper.myLooper())); - // private final Handler mainHandler = new Handler(Looper.getMainLooper()); - Runnable reconnectionRunnable = this::reconnection; + Runnable reconnectionRunnable = new Runnable() { + @Override + public void run() { + reconnection("handler"); + } + }; private int connCount = 0; + private HeartbeatManager heartbeatManager; + private NetworkChecker networkChecker; + + private final Handler checkRequestAddressHandler = new Handler(Looper.getMainLooper()); + private final Runnable checkRequestAddressRunnable = new Runnable() { + @Override + public void run() { + long nowTime = DateUtils.getInstance().getCurrentSeconds(); + if (nowTime - requestIPTime >= requestIPTimeoutTime) { + if (TextUtils.isEmpty(ip) || port == 0) { + WKLoggerUtils.getInstance().e(TAG, "获取连接地址超时"); + isReConnecting = false; + reconnection("获取地址超时"); + } + } else { + if (TextUtils.isEmpty(ip) || port == 0) { + WKLoggerUtils.getInstance().e(TAG, "请求连接地址--->" + (nowTime - requestIPTime)); + // 继续检查 + checkRequestAddressHandler.postDelayed(this, 1000); + } + } + } + }; + + private final Handler checkConnAckHandler = new Handler(Looper.getMainLooper()); + private final Runnable checkConnAckRunnable = new Runnable() { + @Override + public void run() { + long nowTime = DateUtils.getInstance().getCurrentSeconds(); + if (nowTime - connAckTime > connAckTimeoutTime && connectStatus != WKConnectStatus.success && connectStatus != WKConnectStatus.syncMsg) { + WKLoggerUtils.getInstance().e(TAG, "连接确认超时"); + isReConnecting = false; + closeConnect("检查连接ack超时"); + reconnection("检查连接超时"); + } else { + if (connectStatus == WKConnectStatus.success || connectStatus == WKConnectStatus.syncMsg) { + WKLoggerUtils.getInstance().e(TAG, "连接确认成功"); + } else { + WKLoggerUtils.getInstance().e(TAG, "等待连接确认--->" + (nowTime - connAckTime)); + // 继续检查 + checkConnAckHandler.postDelayed(this, 1000); + } + } + } + }; + + // 添加一个专门用于同步connection访问的锁对象 + private final Object connectionLock = new Object(); + + private void startAll() { + heartbeatManager = new HeartbeatManager(); + networkChecker = new NetworkChecker(); + heartbeatManager.startHeartbeat(); + networkChecker.startNetworkCheck(); + } public synchronized void forcedReconnection() { connCount++; @@ -103,12 +164,19 @@ public class WKConnection { reconnectionHandler.postDelayed(reconnectionRunnable, connIntervalMillisecond * connCount); } - public synchronized void reconnection() { + public synchronized void reconnection(String from) { + Log.e("重连来源",from); + if (!WKIMApplication.getInstance().isCanConnect) { + WKLoggerUtils.getInstance().e(TAG, "断开"); + stopAll(); + return; + } ip = ""; port = 0; if (isReConnecting) { long nowTime = DateUtils.getInstance().getCurrentSeconds(); if (nowTime - requestIPTime > requestIPTimeoutTime) { + WKLoggerUtils.getInstance().e("重置了正在连接"); isReConnecting = false; } return; @@ -117,12 +185,17 @@ public class WKConnection { reconnectionHandler.removeCallbacks(reconnectionRunnable); boolean isHaveNetwork = WKIMApplication.getInstance().isNetworkConnected(); if (isHaveNetwork) { - closeConnect(); + closeConnect("重连"); isReConnecting = true; requestIPTime = DateUtils.getInstance().getCurrentSeconds(); getConnAddress(); } else { - if (!WKTimers.getInstance().checkNetWorkTimerIsRunning) { +// if (!WKTimers.getInstance().checkNetWorkTimerIsRunning) { +// WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.noNetwork, WKConnectReason.NoNetwork); +// forcedReconnection(); +// } + + if (networkChecker != null && networkChecker.checkNetWorkTimerIsRunning) { WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.noNetwork, WKConnectReason.NoNetwork); forcedReconnection(); } @@ -130,19 +203,15 @@ public class WKConnection { } private synchronized void getConnAddress() { - if (!WKIMApplication.getInstance().isCanConnect) { - WKLoggerUtils.getInstance().e(TAG, "SDK determines that reconnection is not possible"); - stopAll(); - return; - } + WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.connecting, WKConnectReason.Connecting); // 计算获取IP时长 todo startGetConnAddressTimer(); lastRequestId = UUID.randomUUID().toString().replace("-", ""); ConnectionManager.getInstance().getIpAndPort(lastRequestId, (requestId, ip, port) -> { - WKLoggerUtils.getInstance().e(TAG, "connection address " + ip + ":" + port); + WKLoggerUtils.getInstance().e(TAG, "连接地址 " + ip + ":" + port); if (TextUtils.isEmpty(ip) || port == 0) { - WKLoggerUtils.getInstance().e(TAG, "Return connection IP or port error," + String.format("ip:%s & port:%s", ip, port)); + WKLoggerUtils.getInstance().e(TAG, "连接地址错误" + String.format("ip:%s & port:%s", ip, port)); isReConnecting = false; forcedReconnection(); return; @@ -156,46 +225,54 @@ public class WKConnection { return; } if (connectionIsNull()) { - WKLoggerUtils.getInstance().e(TAG, "The IP number requested is inconsistent, reconnecting"); forcedReconnection(); } }); } private synchronized void connSocket() { - closeConnect(); - socketSingleID = UUID.randomUUID().toString().replace("-", ""); - connectionClient = new ConnectionClient(iNonBlockingConnection -> { - connCount = 0; - if (iNonBlockingConnection == null || connection == null || !connection.getId().equals(iNonBlockingConnection.getId())) { - WKLoggerUtils.getInstance().e(TAG, "重复连接"); - forcedReconnection(); - return; - } - Object att = iNonBlockingConnection.getAttachment(); - if (att == null || !att.equals(socketSingleID)) { - WKLoggerUtils.getInstance().e(TAG, "不属于当前连接"); - forcedReconnection(); - return; - } - connection.setIdleTimeoutMillis(1000 * 3); - connection.setConnectionTimeoutMillis(1000 * 3); - connection.setFlushmode(IConnection.FlushMode.ASYNC); - isReConnecting = false; - if (connection != null) - connection.setAutoflush(true); - WKConnection.getInstance().sendConnectMsg(); - }); - dispatchQueuePool.execute(() -> { - try { - connection = new NonBlockingConnection(ip, port, connectionClient); - connection.setAttachment(socketSingleID); - } catch (IOException e) { - isReConnecting = false; - WKLoggerUtils.getInstance().e(TAG, "connection exception:" + e.getMessage()); - forcedReconnection(); - } - }); + synchronized (connectionLock) { // 使用专门的锁 + closeConnect("连接socket"); + socketSingleID = UUID.randomUUID().toString().replace("-", ""); + connectionClient = new ConnectionClient(iNonBlockingConnection -> { + synchronized (connectionLock) { // 回调中也需要使用相同的锁 + connCount = 0; + if (iNonBlockingConnection == null || connection == null || + !connection.getId().equals(iNonBlockingConnection.getId())) { + WKLoggerUtils.getInstance().e(TAG, "重复连接"); + forcedReconnection(); + return; + } + Object att = iNonBlockingConnection.getAttachment(); + if (att == null || !att.equals(socketSingleID)) { + WKLoggerUtils.getInstance().e(TAG, "不属于当前连接"); + forcedReconnection(); + return; + } + connection.setIdleTimeoutMillis(1000 * 3); + connection.setConnectionTimeoutMillis(1000 * 3); + connection.setFlushmode(IConnection.FlushMode.ASYNC); + isReConnecting = false; + if (connection != null) + connection.setAutoflush(true); + WKConnection.getInstance().sendConnectMsg(); + } + }); + + dispatchQueuePool.execute(() -> { + synchronized (connectionLock) { // 在设置connection时也使用锁 + try { + connection = new NonBlockingConnection(ip, port, connectionClient); + WKLoggerUtils.getInstance().e("当前连接ID",connection.getId()); + connection.setAttachment(socketSingleID); + } catch (IOException e) { + isReConnecting = false; + WKLoggerUtils.getInstance().e(TAG, "connection exception:" + e.getMessage()); + forcedReconnection(); + } + } + }); + } } //发送连接消息 @@ -222,7 +299,7 @@ public class WKConnection { @Override public void reconnect() { WKIMApplication.getInstance().isCanConnect = true; - reconnection(); + reconnection("消息处理"); } @Override @@ -271,7 +348,7 @@ public class WKConnection { //处理登录消息状态 private void handleLoginStatus(short status) { - WKLoggerUtils.getInstance().e(TAG, "connection status:" + status); + WKLoggerUtils.getInstance().e(TAG, "连接状态:" + status); String reason = WKConnectReason.ConnectSuccess; if (status == WKConnectStatus.kicked) { reason = WKConnectReason.ReasonAuthFail; @@ -280,44 +357,55 @@ public class WKConnection { WKIM.getInstance().getConnectionManager().setConnectionStatus(status, reason); if (status == WKConnectStatus.success) { //等待中 - connectStatus = WKConnectStatus.success; - WKTimers.getInstance().startAll(); - resendMsg(); + connectStatus = WKConnectStatus.syncMsg; + // WKTimers.getInstance().startAll(); WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.syncMsg, WKConnectReason.SyncMsg); // 判断同步模式 if (WKIMApplication.getInstance().getSyncMsgMode() == WKSyncMsgMode.WRITE) { WKIM.getInstance().getMsgManager().setSyncOfflineMsg((isEnd, list) -> { if (isEnd) { + connectStatus = WKConnectStatus.success; MessageHandler.getInstance().saveReceiveMsg(); WKIMApplication.getInstance().isCanConnect = true; + MessageHandler.getInstance().sendAck(); + startAll(); + resendMsg(); WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.success, WKConnectReason.ConnectSuccess); } }); } else { WKIM.getInstance().getConversationManager().setSyncConversationListener(syncChat -> { + connectStatus = WKConnectStatus.success; WKIMApplication.getInstance().isCanConnect = true; + MessageHandler.getInstance().sendAck(); + startAll(); + resendMsg(); WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.success, WKConnectReason.ConnectSuccess); }); } } else if (status == WKConnectStatus.kicked) { - WKLoggerUtils.getInstance().e(TAG, "Received kicked message"); + WKLoggerUtils.getInstance().e(TAG, "收到被踢消息"); MessageHandler.getInstance().updateLastSendingMsgFail(); WKIMApplication.getInstance().isCanConnect = false; stopAll(); } else { - WKLoggerUtils.getInstance().e(TAG, "parsing login returns error type:" + status); + reconnection("解码未知类型:" + status); + WKLoggerUtils.getInstance().e(TAG, "登录状态:" + status); stopAll(); - reconnection(); + } } - void sendMessage(WKBaseMsg mBaseMsg) { + public void sendMessage(WKBaseMsg mBaseMsg) { if (mBaseMsg == null) { return; } if (mBaseMsg.packetType != WKMsgType.CONNECT) { + if (connectStatus == WKConnectStatus.syncMsg) { + return; + } if (connectStatus != WKConnectStatus.success) { - reconnection(); + reconnection("发消息发现连接不成功" + mBaseMsg.packetType); return; } } @@ -325,13 +413,13 @@ public class WKConnection { unReceivePongCount++; } if (connection == null || !connection.isOpen()) { - reconnection(); + reconnection("发消息发现连接为空"); return; } int status = MessageHandler.getInstance().sendMessage(connection, mBaseMsg); if (status == 0) { - WKLoggerUtils.getInstance().e(TAG, "send message failed"); - reconnection(); + WKLoggerUtils.getInstance().e(TAG, "发消息失败"); + reconnection("发消息失败"); } } @@ -360,7 +448,7 @@ public class WKConnection { } //检测正在发送的消息 - synchronized void checkSendingMsg() { + public synchronized void checkSendingMsg() { removeSendingMsg(); if (!sendingMsgHashMap.isEmpty()) { Iterator> it = sendingMsgHashMap.entrySet().iterator(); @@ -373,7 +461,6 @@ public class WKConnection { MsgDbManager.getInstance().updateMsgStatus(item.getKey(), WKSendMsgResult.send_fail); it.remove(); wkSendingMsg.isCanResend = false; - WKLoggerUtils.getInstance().e(TAG, "checkSendingMsg send message failed"); } else { long nowTime = DateUtils.getInstance().getCurrentSeconds(); if (nowTime - wkSendingMsg.sendTime > 10) { @@ -381,7 +468,6 @@ public class WKConnection { sendingMsgHashMap.put(item.getKey(), wkSendingMsg); wkSendingMsg.sendCount++; sendMessage(Objects.requireNonNull(sendingMsgHashMap.get(item.getKey())).wkSendMsg); - WKLoggerUtils.getInstance().e(TAG, "checkSendingMsg send message failed"); } } } @@ -511,94 +597,19 @@ public class WKConnection { return connection == null || !connection.isOpen(); } - public void stopAll() { - connectionClient = null; - WKTimers.getInstance().stopAll(); - closeConnect(); - connectStatus = WKConnectStatus.fail; - isReConnecting = false; - System.gc(); - } - - private synchronized void closeConnect() { - if (connection != null && connection.isOpen()) { - try { - WKLoggerUtils.getInstance().e("stop connection:" + connection.getId()); -// connection.flush(); - connection.setAttachment("close" + connection.getId()); - connection.close(); - } catch (IOException e) { - WKLoggerUtils.getInstance().e("stop connection IOException" + e.getMessage()); - } finally { - connection = null; - connectionClient = null; - } - } - } - - private Timer checkNetWorkTimer; - private Timer checkConnectionAckTimer; - private synchronized void startConnAckTimer() { - if (checkConnectionAckTimer != null) { - checkConnectionAckTimer.cancel(); - checkConnectionAckTimer = null; - } - checkConnectionAckTimer = new Timer(); + // 移除之前的回调 + checkConnAckHandler.removeCallbacks(checkConnAckRunnable); connAckTime = DateUtils.getInstance().getCurrentSeconds(); - checkConnectionAckTimer.schedule(new TimerTask() { - @Override - public void run() { - long nowTime = DateUtils.getInstance().getCurrentSeconds(); - if (nowTime - connAckTime > connAckTimeoutTime && connectStatus != WKConnectStatus.success) { - checkConnectionAckTimer.cancel(); - checkConnectionAckTimer.purge(); - checkConnectionAckTimer = null; - isReConnecting = false; - closeConnect(); - reconnection(); - } else { - if (connectStatus == WKConnectStatus.success) { - checkConnectionAckTimer.cancel(); - checkConnectionAckTimer.purge(); - checkConnectionAckTimer = null; - } - } - } - }, 100, 1000); + // 开始新的检查 + checkConnAckHandler.postDelayed(checkConnAckRunnable, 1000); } private synchronized void startGetConnAddressTimer() { - if (checkNetWorkTimer != null) { - checkNetWorkTimer.cancel(); - checkNetWorkTimer = null; - } - checkNetWorkTimer = new Timer(); - checkNetWorkTimer.schedule(new TimerTask() { - @Override - public void run() { - long nowTime = DateUtils.getInstance().getCurrentSeconds(); - if (nowTime - requestIPTime >= requestIPTimeoutTime) { - checkNetWorkTimer.cancel(); - checkNetWorkTimer.purge(); - checkNetWorkTimer = null; - if (TextUtils.isEmpty(ip) || port == 0) { - WKLoggerUtils.getInstance().e(TAG, "Request for IP has timed out"); - isReConnecting = false; - reconnection(); - } - } else { - if (!TextUtils.isEmpty(ip) && port != 0) { - checkNetWorkTimer.cancel(); - checkNetWorkTimer.purge(); - checkNetWorkTimer = null; - WKLoggerUtils.getInstance().e(TAG, "Request IP countdown has been destroyed"); - } else { - WKLoggerUtils.getInstance().e(TAG, "Requesting IP countdown--->" + (nowTime - requestIPTime)); - } - } - } - }, 500, 1000L); + // 移除之前的回调 + checkRequestAddressHandler.removeCallbacks(checkRequestAddressRunnable); + // 开始新的检查 + checkRequestAddressHandler.postDelayed(checkRequestAddressRunnable, 1000); } private void saveSendMsg(WKMsg msg) { @@ -623,4 +634,70 @@ public class WKConnection { } } } + + public void destroy() { + if (checkRequestAddressHandler != null) { + checkRequestAddressHandler.removeCallbacks(checkRequestAddressRunnable); + } + if (checkConnAckHandler != null) { + checkConnAckHandler.removeCallbacks(checkConnAckRunnable); + } + } + + public void stopAll() { + // 先设置连接状态为失败 + WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.fail, ""); + // 清理连接相关资源 + closeConnect("stopall"); + // 关闭定时器管理器 + TimerManager.getInstance().shutdown(); + MessageHandler.getInstance().clearCacheData(); + // 移除所有Handler回调 + if (checkRequestAddressHandler != null) { + checkRequestAddressHandler.removeCallbacks(checkRequestAddressRunnable); + } + if (checkConnAckHandler != null) { + checkConnAckHandler.removeCallbacks(checkConnAckRunnable); + } + if (reconnectionHandler != null) { + reconnectionHandler.removeCallbacks(reconnectionRunnable); + } + + // 重置所有状态 + connectStatus = WKConnectStatus.fail; + isReConnecting = false; + ip = ""; + port = 0; + unReceivePongCount = 0; + requestIPTime = 0; + connAckTime = 0; + lastMsgTime = 0; + connCount = 0; + // 清空发送消息队列 + if (sendingMsgHashMap != null) { + sendingMsgHashMap.clear(); + } + // 清理连接客户端 + connectionClient = null; + System.gc(); + } + + private synchronized void closeConnect(String from) { + synchronized (connectionLock) { + WKLoggerUtils.getInstance().e("关闭连接来源",from); + if (connection != null) { + try { + if (connection.isOpen()) { + connection.setAttachment("close" + connection.getId()); + connection.close(); + } + } catch (IOException e) { + WKLoggerUtils.getInstance().e("关闭连接异常:" + e.getMessage()); + } finally { + connection = null; + } + } + connectionClient = null; + } + } } \ No newline at end of file diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/WKProto.java b/wkim/src/main/java/com/xinbida/wukongim/message/WKProto.java index d71615f..e7dfaeb 100644 --- a/wkim/src/main/java/com/xinbida/wukongim/message/WKProto.java +++ b/wkim/src/main/java/com/xinbida/wukongim/message/WKProto.java @@ -89,7 +89,7 @@ class WKProto { wkWrite.writeLong(connectMsg.clientTimestamp); wkWrite.writeString(CryptoUtils.getInstance().getPublicKey()); } catch (UnsupportedEncodingException e) { - WKLoggerUtils.getInstance().e(TAG, "enConnectMsg error"); + WKLoggerUtils.getInstance().e(TAG, "编码连接包错误"); } return wkWrite.getWriteBytes(); } @@ -138,7 +138,7 @@ class WKProto { wkWrite.writePayload(sendContent); } catch (UnsupportedEncodingException e) { - WKLoggerUtils.getInstance().e(TAG, "enSendMsg error"); + WKLoggerUtils.getInstance().e(TAG, "编码发送包错误"); } return wkWrite.getWriteBytes(); } @@ -163,7 +163,7 @@ class WKProto { connectAckMsg.timeDiff = time; connectAckMsg.reasonCode = reasonCode; } catch (IOException e) { - WKLoggerUtils.getInstance().e(TAG, "Decoding connection ack error"); + WKLoggerUtils.getInstance().e(TAG, "解码连接ack包错误"); } return connectAckMsg; @@ -177,7 +177,7 @@ class WKProto { sendAckMsg.messageSeq = wkRead.readInt(); sendAckMsg.reasonCode = wkRead.readByte(); } catch (IOException e) { - WKLoggerUtils.getInstance().e(TAG, "deSendAckMsg Decoding and sending message ack error"); + WKLoggerUtils.getInstance().e(TAG, "解码发送ack错误"); } return sendAckMsg; } @@ -187,10 +187,10 @@ class WKProto { try { disconnectMsg.reasonCode = wkRead.readByte(); disconnectMsg.reason = wkRead.readString(); - WKLoggerUtils.getInstance().e(TAG, "received kicked reasonCode:" + disconnectMsg.reasonCode + ",reason:" + disconnectMsg.reason); + WKLoggerUtils.getInstance().e(TAG, "断开消息code:" + disconnectMsg.reasonCode + ",reason:" + disconnectMsg.reason); return disconnectMsg; } catch (IOException e) { - WKLoggerUtils.getInstance().e(TAG, "Decoding disconnection error"); + WKLoggerUtils.getInstance().e(TAG, "解码断开包错误"); } return disconnectMsg; } @@ -235,16 +235,16 @@ class WKProto { String base64Result = CryptoUtils.getInstance().base64Encode(result); String localMsgKey = CryptoUtils.getInstance().digestMD5(base64Result); if (!localMsgKey.equals(receivedMsg.msgKey)) { - WKLoggerUtils.getInstance().e("Illegal messages,localMsgKey:" + localMsgKey + ",msgKey:" + msgKey); + WKLoggerUtils.getInstance().e("非法消息,本地消息key:" + localMsgKey + ",期望key:" + msgKey); return null; } receivedMsg.payload = CryptoUtils.getInstance().aesDecrypt(CryptoUtils.getInstance().base64Decode(content)); - WKLoggerUtils.getInstance().e("Receive message:"); WKLoggerUtils.getInstance().e(receivedMsg.toString()); + return receivedMsg; } catch (IOException e) { - WKLoggerUtils.getInstance().e(TAG, "deReceivedMsg Decoding received message error"); + WKLoggerUtils.getInstance().e(TAG, "解码收到的消息错误"); + return null; } - return receivedMsg; } WKBaseMsg decodeMessage(byte[] bytes) { @@ -264,11 +264,11 @@ class WKProto { } else if (packetType == WKMsgType.PONG) { return new WKPongMsg(); } else { - WKLoggerUtils.getInstance().e("Failed to parse protocol type:" + packetType); + WKLoggerUtils.getInstance().e("解码未知消息包类型:" + packetType); return null; } } catch (IOException e) { - WKLoggerUtils.getInstance().e("Parsing data exception:" + e.getMessage()); + WKLoggerUtils.getInstance().e("解码消息错误:" + e.getMessage()); return null; } } @@ -337,7 +337,7 @@ class WKProto { // jsonObject.put("flame", msg.baseContentMsgModel.flame); // } } catch (JSONException e) { - WKLoggerUtils.getInstance().e(TAG, "getSendPayload error"); + WKLoggerUtils.getInstance().e(TAG, "获取消息体错误"); } return jsonObject; } @@ -407,7 +407,7 @@ class WKProto { JSONObject jsonObject = new JSONObject(contentJson); isDelete = WKIM.getInstance().getMsgManager().isDeletedMsg(jsonObject); } catch (JSONException e) { - WKLoggerUtils.getInstance().e(TAG, "isDelete error"); + WKLoggerUtils.getInstance().e(TAG, "获取消息是否删除时发现消息体非json"); } } return isDelete; diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/WKTimers.java b/wkim/src/main/java/com/xinbida/wukongim/message/WKTimers.java index 3f0ebda..1638bfa 100644 --- a/wkim/src/main/java/com/xinbida/wukongim/message/WKTimers.java +++ b/wkim/src/main/java/com/xinbida/wukongim/message/WKTimers.java @@ -1,139 +1,139 @@ -package com.xinbida.wukongim.message; - -import com.xinbida.wukongim.WKIM; -import com.xinbida.wukongim.WKIMApplication; -import com.xinbida.wukongim.message.type.WKConnectReason; -import com.xinbida.wukongim.message.type.WKConnectStatus; -import com.xinbida.wukongim.protocol.WKPingMsg; -import com.xinbida.wukongim.utils.WKLoggerUtils; - -import java.util.Timer; -import java.util.TimerTask; - -/** - * 5/21/21 11:19 AM - */ -class WKTimers { - private WKTimers() { - } - - private static class ConnectionTimerHandlerBinder { - static final WKTimers timeHandle = new WKTimers(); - } - - public static WKTimers getInstance() { - return ConnectionTimerHandlerBinder.timeHandle; - } - - - // 发送心跳定时器 - private Timer heartBeatTimer; - // 检查心跳定时器 - private Timer checkHeartTimer; - // 检查网络状态定时器 - private Timer checkNetWorkTimer; - boolean checkNetWorkTimerIsRunning = false; - - //关闭所有定时器 - void stopAll() { - stopHeartBeatTimer(); - stopCheckHeartTimer(); - stopCheckNetWorkTimer(); - } - - //开启所有定时器 - void startAll() { - startHeartBeatTimer(); - startCheckHeartTimer(); - startCheckNetWorkTimer(); - } - - //检测网络 - private void stopCheckNetWorkTimer() { - if (checkNetWorkTimer != null) { - checkNetWorkTimer.cancel(); - checkNetWorkTimer.purge(); - checkNetWorkTimer = null; - checkNetWorkTimerIsRunning = false; - } - } - - //检测心跳 - private void stopCheckHeartTimer() { - if (checkHeartTimer != null) { - checkHeartTimer.cancel(); - checkHeartTimer.purge(); - checkHeartTimer = null; - } - } - - //停止心跳Timer - private void stopHeartBeatTimer() { - if (heartBeatTimer != null) { - heartBeatTimer.cancel(); - heartBeatTimer.purge(); - heartBeatTimer = null; - } - } - - //开始心跳 - private void startHeartBeatTimer() { - stopHeartBeatTimer(); - heartBeatTimer = new Timer(); - // 心跳时间 - int heart_time = 60 * 2; - heartBeatTimer.schedule(new TimerTask() { - @Override - public void run() { - //发送心跳 - WKConnection.getInstance().sendMessage(new WKPingMsg()); - } - }, 0, heart_time * 1000); - } - - //开始检查心跳Timer - private void startCheckHeartTimer() { - stopCheckHeartTimer(); - checkHeartTimer = new Timer(); - checkHeartTimer.schedule(new TimerTask() { - - @Override - public void run() { - if (WKConnection.getInstance().connection == null || heartBeatTimer == null) { - WKConnection.getInstance().reconnection(); - } - WKConnection.getInstance().checkHeartIsTimeOut(); - } - }, 1000 * 7, 1000 * 7); - } - - boolean isForcedReconnect; - - //开启检测网络定时器 - void startCheckNetWorkTimer() { - stopCheckNetWorkTimer(); - checkNetWorkTimer = new Timer(); - checkNetWorkTimer.schedule(new TimerTask() { - @Override - public void run() { - boolean is_have_network = WKIMApplication.getInstance().isNetworkConnected(); - if (!is_have_network) { - isForcedReconnect = true; - WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.noNetwork, WKConnectReason.NoNetwork); - WKLoggerUtils.getInstance().e("No network connection..."); - WKConnection.getInstance().checkSendingMsg(); - } else { - //有网络 - if (WKConnection.getInstance().connectionIsNull() || isForcedReconnect ) { - WKConnection.getInstance().reconnection(); - isForcedReconnect = false; - } - } - if (WKConnection.getInstance().connection == null || !WKConnection.getInstance().connection.isOpen()) { - WKConnection.getInstance().reconnection(); - } - checkNetWorkTimerIsRunning = true; - } - }, 0, 1000); - } -} +//package com.xinbida.wukongim.message; +// +//import com.xinbida.wukongim.WKIM; +//import com.xinbida.wukongim.WKIMApplication; +//import com.xinbida.wukongim.message.type.WKConnectReason; +//import com.xinbida.wukongim.message.type.WKConnectStatus; +//import com.xinbida.wukongim.protocol.WKPingMsg; +//import com.xinbida.wukongim.utils.WKLoggerUtils; +// +//import java.util.Timer; +//import java.util.TimerTask; +// +///** +// * 5/21/21 11:19 AM +// */ +//class WKTimers { +// private WKTimers() { +// } +// +// private static class ConnectionTimerHandlerBinder { +// static final WKTimers timeHandle = new WKTimers(); +// } +// +// public static WKTimers getInstance() { +// return ConnectionTimerHandlerBinder.timeHandle; +// } +// +// +// // 发送心跳定时器 +// private Timer heartBeatTimer; +// // 检查心跳定时器 +// private Timer checkHeartTimer; +// // 检查网络状态定时器 +// private Timer checkNetWorkTimer; +// boolean checkNetWorkTimerIsRunning = false; +// +// //关闭所有定时器 +// void stopAll() { +// stopHeartBeatTimer(); +// stopCheckHeartTimer(); +// stopCheckNetWorkTimer(); +// } +// +// //开启所有定时器 +// void startAll() { +// startHeartBeatTimer(); +// startCheckHeartTimer(); +// startCheckNetWorkTimer(); +// } +// +// //检测网络 +// private void stopCheckNetWorkTimer() { +// if (checkNetWorkTimer != null) { +// checkNetWorkTimer.cancel(); +// checkNetWorkTimer.purge(); +// checkNetWorkTimer = null; +// checkNetWorkTimerIsRunning = false; +// } +// } +// +// //检测心跳 +// private void stopCheckHeartTimer() { +// if (checkHeartTimer != null) { +// checkHeartTimer.cancel(); +// checkHeartTimer.purge(); +// checkHeartTimer = null; +// } +// } +// +// //停止心跳Timer +// private void stopHeartBeatTimer() { +// if (heartBeatTimer != null) { +// heartBeatTimer.cancel(); +// heartBeatTimer.purge(); +// heartBeatTimer = null; +// } +// } +// +// //开始心跳 +// private void startHeartBeatTimer() { +// stopHeartBeatTimer(); +// heartBeatTimer = new Timer(); +// // 心跳时间 +// int heart_time = 60 * 2; +// heartBeatTimer.schedule(new TimerTask() { +// @Override +// public void run() { +// //发送心跳 +// WKConnection.getInstance().sendMessage(new WKPingMsg()); +// } +// }, 0, heart_time * 1000); +// } +// +// //开始检查心跳Timer +// private void startCheckHeartTimer() { +// stopCheckHeartTimer(); +// checkHeartTimer = new Timer(); +// checkHeartTimer.schedule(new TimerTask() { +// +// @Override +// public void run() { +// if (WKConnection.getInstance().connection == null || heartBeatTimer == null) { +// WKConnection.getInstance().reconnection(); +// } +// WKConnection.getInstance().checkHeartIsTimeOut(); +// } +// }, 1000 * 7, 1000 * 7); +// } +// +// boolean isForcedReconnect; +// +// //开启检测网络定时器 +// void startCheckNetWorkTimer() { +// stopCheckNetWorkTimer(); +// checkNetWorkTimer = new Timer(); +// checkNetWorkTimer.schedule(new TimerTask() { +// @Override +// public void run() { +// boolean is_have_network = WKIMApplication.getInstance().isNetworkConnected(); +// if (!is_have_network) { +// isForcedReconnect = true; +// WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.noNetwork, WKConnectReason.NoNetwork); +// WKLoggerUtils.getInstance().e("No network connection..."); +// WKConnection.getInstance().checkSendingMsg(); +// } else { +// //有网络 +// if (WKConnection.getInstance().connectionIsNull() || isForcedReconnect ) { +// WKConnection.getInstance().reconnection(); +// isForcedReconnect = false; +// } +// } +// if (WKConnection.getInstance().connection == null || !WKConnection.getInstance().connection.isOpen()) { +// WKConnection.getInstance().reconnection(); +// } +// checkNetWorkTimerIsRunning = true; +// } +// }, 0, 1000); +// } +//} diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/timer/HeartbeatManager.java b/wkim/src/main/java/com/xinbida/wukongim/message/timer/HeartbeatManager.java new file mode 100644 index 0000000..dce2ed0 --- /dev/null +++ b/wkim/src/main/java/com/xinbida/wukongim/message/timer/HeartbeatManager.java @@ -0,0 +1,25 @@ +package com.xinbida.wukongim.message.timer; + +import com.xinbida.wukongim.message.WKConnection; +import com.xinbida.wukongim.protocol.WKPingMsg; + +import java.util.concurrent.locks.ReentrantLock; + +public class HeartbeatManager { + private final ReentrantLock heartbeatLock = new ReentrantLock(); + public void startHeartbeat() { + TimerManager.getInstance().addTask( + TimerTasks.HEARTBEAT, + () -> { + heartbeatLock.lock(); + try { + WKConnection.getInstance().sendMessage(new WKPingMsg()); + } finally { + heartbeatLock.unlock(); + } + }, + 0, + 1000 * 60 + ); + } +} diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/timer/NetworkChecker.java b/wkim/src/main/java/com/xinbida/wukongim/message/timer/NetworkChecker.java new file mode 100644 index 0000000..e7a4c05 --- /dev/null +++ b/wkim/src/main/java/com/xinbida/wukongim/message/timer/NetworkChecker.java @@ -0,0 +1,49 @@ +package com.xinbida.wukongim.message.timer; + +import android.util.Log; + +import com.xinbida.wukongim.WKIM; +import com.xinbida.wukongim.WKIMApplication; +import com.xinbida.wukongim.message.WKConnection; +import com.xinbida.wukongim.message.type.WKConnectReason; +import com.xinbida.wukongim.message.type.WKConnectStatus; +import com.xinbida.wukongim.utils.WKLoggerUtils; + +public class NetworkChecker { + private final Object lock = new Object(); // 添加锁对象 + public boolean isForcedReconnect; + public boolean checkNetWorkTimerIsRunning = false; + + public void startNetworkCheck() { + TimerManager.getInstance().addTask( + TimerTasks.NETWORK_CHECK, + () -> { + synchronized (lock) { + checkNetworkStatus(); + } + }, + 0, + 1000 + ); + } + + private void checkNetworkStatus() { + boolean is_have_network = WKIMApplication.getInstance().isNetworkConnected(); + if (!is_have_network) { + isForcedReconnect = true; + WKIM.getInstance().getConnectionManager().setConnectionStatus(WKConnectStatus.noNetwork, WKConnectReason.NoNetwork); + WKLoggerUtils.getInstance().e("无网络连接..."); + WKConnection.getInstance().checkSendingMsg(); + } else { + //有网络 + if (WKConnection.getInstance().connectionIsNull() || isForcedReconnect) { + WKConnection.getInstance().reconnection("网络1"); + isForcedReconnect = false; + } + } +// if (WKConnection.getInstance().connection == null || !WKConnection.getInstance().connection.isOpen()) { +// WKConnection.getInstance().reconnection("网络2"); +// } + checkNetWorkTimerIsRunning = true; + } +} diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/timer/TimerManager.java b/wkim/src/main/java/com/xinbida/wukongim/message/timer/TimerManager.java new file mode 100644 index 0000000..9dc724c --- /dev/null +++ b/wkim/src/main/java/com/xinbida/wukongim/message/timer/TimerManager.java @@ -0,0 +1,186 @@ +package com.xinbida.wukongim.message.timer; + +import android.os.Handler; +import android.os.Looper; +import android.util.Log; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class TimerManager { + private static volatile TimerManager instance; + private final Handler handler; + private final Map taskMap; + private volatile ExecutorService executorService; // 改为 volatile + private final Object executorLock = new Object(); // 添加锁对象 + + private TimerManager() { + handler = new Handler(Looper.getMainLooper()); + taskMap = new ConcurrentHashMap<>(); + initExecutorService(); + } + public static TimerManager getInstance() { + if (instance == null) { + synchronized (TimerManager.class) { + if (instance == null) { + instance = new TimerManager(); + } + } + } + return instance; + } + // 初始化线程池 + private void initExecutorService() { + synchronized (executorLock) { + if (executorService == null || executorService.isShutdown()) { + executorService = new ThreadPoolExecutor( + 3, // 核心线程数 + 5, // 最大线程数 + 60L, // 空闲线程存活时间 + TimeUnit.SECONDS, // 时间单位 + new LinkedBlockingQueue<>(), // 任务队列 + new ThreadFactory() { + private final AtomicInteger count = new AtomicInteger(1); + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName("TimerTask-" + count.getAndIncrement()); + return thread; + } + }, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + // 如果线程池已关闭,尝试重新初始化 + if (executor.isShutdown()) { + initExecutorService(); + // 重新提交任务 + try { + executorService.execute(r); + } catch (Exception e) { + Log.e("TimerManager", "Task execution failed after retry", e); + } + } else { + // 如果是其他原因导致的拒绝,记录日志 + Log.e("TimerManager", "Task rejected: " + r.toString()); + } + } + } + ); + } + } + } + + /** + * 添加定时任务 + */ + public void addTask(String taskId, final Runnable task, long delayMillis, final long periodMillis) { + removeTask(taskId); + + Runnable wrappedTask = new Runnable() { + @Override + public void run() { + // 检查线程池状态 + synchronized (executorLock) { + if (executorService == null || executorService.isShutdown()) { + initExecutorService(); + } + + try { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + task.run(); + } catch (Exception e) { + Log.e("TimerManager", "Task execution failed", e); + } + } + }); + } catch (RejectedExecutionException e) { + Log.e("TimerManager", "Task rejected", e); + // 如果任务被拒绝,可以选择重试或其他处理方式 + initExecutorService(); + try { + executorService.execute(task); + } catch (Exception ex) { + Log.e("TimerManager", "Task retry failed", ex); + } + } + } + + // 继续安排下一次执行 + if (!Thread.currentThread().isInterrupted()) { + handler.postDelayed(this, periodMillis); + } + } + }; + + taskMap.put(taskId, wrappedTask); + handler.postDelayed(wrappedTask, delayMillis); + } + + /** + * 移除定时任务 + */ + public void removeTask(String taskId) { + Runnable task = taskMap.remove(taskId); + if (task != null) { + handler.removeCallbacks(task); + } + } + + /** + * 优雅关闭 + */ + public void shutdown() { + // 先移除所有定时任务 + for (Runnable task : taskMap.values()) { + handler.removeCallbacks(task); + } + taskMap.clear(); + + // 关闭线程池 + synchronized (executorLock) { + if (executorService != null && !executorService.isShutdown()) { + try { + // 尝试优雅关闭 + executorService.shutdown(); + // 等待任务完成 + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + // 如果等待超时,强制关闭 + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + // 如果等待被中断,强制关闭 + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + executorService = null; + } + } + + /** + * 重启定时器管理器 + */ + public void restart() { + shutdown(); + initExecutorService(); + } + + /** + * 检查任务是否正在运行 + */ + public boolean isTaskRunning(String taskId) { + return taskMap.containsKey(taskId); + } +} diff --git a/wkim/src/main/java/com/xinbida/wukongim/message/timer/TimerTasks.java b/wkim/src/main/java/com/xinbida/wukongim/message/timer/TimerTasks.java new file mode 100644 index 0000000..3859975 --- /dev/null +++ b/wkim/src/main/java/com/xinbida/wukongim/message/timer/TimerTasks.java @@ -0,0 +1,7 @@ +package com.xinbida.wukongim.message.timer; + +public class TimerTasks { + public static final String NETWORK_CHECK = "network_check"; +// public static final String MESSAGE_SENDER = "message_sender"; + public static final String HEARTBEAT = "heartbeat"; +}