修复并发问题:改进SDK引用计数机制,使用原子操作避免竞态条件
Co-authored-by: binarywang <1343140+binarywang@users.noreply.github.com>
This commit is contained in:
@@ -137,6 +137,49 @@ public class WxCpMsgAuditServiceImpl implements WxCpMsgAuditService {
|
||||
return sdk;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取SDK并增加引用计数(原子操作)
|
||||
* 如果SDK未初始化或已过期,会自动初始化
|
||||
*
|
||||
* @return sdk id
|
||||
* @throws WxErrorException 初始化失败时抛出异常
|
||||
*/
|
||||
private long acquireSdk() throws WxErrorException {
|
||||
WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage();
|
||||
|
||||
// 尝试获取现有的有效SDK并增加引用计数(原子操作)
|
||||
long sdk = configStorage.acquireMsgAuditSdk();
|
||||
|
||||
if (sdk > 0) {
|
||||
// 成功获取到有效的SDK
|
||||
return sdk;
|
||||
}
|
||||
|
||||
// SDK未初始化或已过期,需要初始化
|
||||
// initSdk()方法已经是synchronized的,确保只有一个线程初始化
|
||||
sdk = this.initSdk();
|
||||
|
||||
// 初始化后增加引用计数
|
||||
int refCount = configStorage.incrementMsgAuditSdkRefCount(sdk);
|
||||
if (refCount < 0) {
|
||||
// SDK已经被替换,需要重新获取
|
||||
return acquireSdk();
|
||||
}
|
||||
|
||||
return sdk;
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放SDK引用计数
|
||||
*
|
||||
* @param sdk sdk id
|
||||
*/
|
||||
private void releaseSdk(long sdk) {
|
||||
if (sdk > 0) {
|
||||
cpService.getWxCpConfigStorage().releaseMsgAuditSdk(sdk);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public WxCpChatModel getDecryptData(@NonNull long sdk, @NonNull WxCpChatDatas.WxCpChatData chatData,
|
||||
@NonNull Integer pkcs1) throws Exception {
|
||||
@@ -283,12 +326,8 @@ public class WxCpMsgAuditServiceImpl implements WxCpMsgAuditService {
|
||||
@Override
|
||||
public List<WxCpChatDatas.WxCpChatData> getChatRecords(long seq, @NonNull long limit, String proxy, String passwd,
|
||||
@NonNull long timeout) throws Exception {
|
||||
// 获取或初始化SDK
|
||||
long sdk = this.initSdk();
|
||||
WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage();
|
||||
|
||||
// 增加引用计数
|
||||
configStorage.incrementMsgAuditSdkRefCount(sdk);
|
||||
// 获取SDK并自动增加引用计数(原子操作)
|
||||
long sdk = this.acquireSdk();
|
||||
|
||||
try {
|
||||
long slice = Finance.NewSlice();
|
||||
@@ -308,66 +347,51 @@ public class WxCpMsgAuditServiceImpl implements WxCpMsgAuditService {
|
||||
|
||||
return chatDatas.getChatData();
|
||||
} finally {
|
||||
// 减少引用计数
|
||||
configStorage.decrementMsgAuditSdkRefCount(sdk);
|
||||
// 释放SDK引用计数(原子操作)
|
||||
this.releaseSdk(sdk);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public WxCpChatModel getDecryptChatData(@NonNull WxCpChatDatas.WxCpChatData chatData,
|
||||
@NonNull Integer pkcs1) throws Exception {
|
||||
// 获取或初始化SDK
|
||||
long sdk = this.initSdk();
|
||||
WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage();
|
||||
|
||||
// 增加引用计数
|
||||
configStorage.incrementMsgAuditSdkRefCount(sdk);
|
||||
// 获取SDK并自动增加引用计数(原子操作)
|
||||
long sdk = this.acquireSdk();
|
||||
|
||||
try {
|
||||
String plainText = this.decryptChatData(sdk, chatData, pkcs1);
|
||||
return WxCpChatModel.fromJson(plainText);
|
||||
} finally {
|
||||
// 减少引用计数
|
||||
configStorage.decrementMsgAuditSdkRefCount(sdk);
|
||||
// 释放SDK引用计数(原子操作)
|
||||
this.releaseSdk(sdk);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getChatRecordPlainText(@NonNull WxCpChatDatas.WxCpChatData chatData,
|
||||
@NonNull Integer pkcs1) throws Exception {
|
||||
// 获取或初始化SDK
|
||||
long sdk = this.initSdk();
|
||||
WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage();
|
||||
|
||||
// 增加引用计数
|
||||
configStorage.incrementMsgAuditSdkRefCount(sdk);
|
||||
// 获取SDK并自动增加引用计数(原子操作)
|
||||
long sdk = this.acquireSdk();
|
||||
|
||||
try {
|
||||
return this.decryptChatData(sdk, chatData, pkcs1);
|
||||
} finally {
|
||||
// 减少引用计数
|
||||
configStorage.decrementMsgAuditSdkRefCount(sdk);
|
||||
// 释放SDK引用计数(原子操作)
|
||||
this.releaseSdk(sdk);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void downloadMediaFile(@NonNull String sdkfileid, String proxy, String passwd, @NonNull long timeout,
|
||||
@NonNull String targetFilePath) throws WxErrorException {
|
||||
// 获取或初始化SDK
|
||||
// 获取SDK并自动增加引用计数(原子操作)
|
||||
long sdk;
|
||||
try {
|
||||
sdk = this.initSdk();
|
||||
} catch (WxErrorException e) {
|
||||
throw e;
|
||||
sdk = this.acquireSdk();
|
||||
} catch (Exception e) {
|
||||
throw new WxErrorException(e);
|
||||
}
|
||||
|
||||
WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage();
|
||||
|
||||
// 增加引用计数
|
||||
configStorage.incrementMsgAuditSdkRefCount(sdk);
|
||||
|
||||
try {
|
||||
File targetFile = new File(targetFilePath);
|
||||
if (!targetFile.getParentFile().exists()) {
|
||||
@@ -384,34 +408,27 @@ public class WxCpMsgAuditServiceImpl implements WxCpMsgAuditService {
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
// 减少引用计数
|
||||
configStorage.decrementMsgAuditSdkRefCount(sdk);
|
||||
// 释放SDK引用计数(原子操作)
|
||||
this.releaseSdk(sdk);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void downloadMediaFile(@NonNull String sdkfileid, String proxy, String passwd, @NonNull long timeout,
|
||||
@NonNull Consumer<byte[]> action) throws WxErrorException {
|
||||
// 获取或初始化SDK
|
||||
// 获取SDK并自动增加引用计数(原子操作)
|
||||
long sdk;
|
||||
try {
|
||||
sdk = this.initSdk();
|
||||
} catch (WxErrorException e) {
|
||||
throw e;
|
||||
sdk = this.acquireSdk();
|
||||
} catch (Exception e) {
|
||||
throw new WxErrorException(e);
|
||||
}
|
||||
|
||||
WxCpConfigStorage configStorage = cpService.getWxCpConfigStorage();
|
||||
|
||||
// 增加引用计数
|
||||
configStorage.incrementMsgAuditSdkRefCount(sdk);
|
||||
|
||||
try {
|
||||
this.getMediaFile(sdk, sdkfileid, proxy, passwd, timeout, action);
|
||||
} finally {
|
||||
// 减少引用计数
|
||||
configStorage.decrementMsgAuditSdkRefCount(sdk);
|
||||
// 释放SDK引用计数(原子操作)
|
||||
this.releaseSdk(sdk);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -298,7 +298,7 @@ public interface WxCpConfigStorage {
|
||||
* 用于支持多线程安全的SDK生命周期管理
|
||||
*
|
||||
* @param sdk sdk id
|
||||
* @return 增加后的引用计数
|
||||
* @return 增加后的引用计数,如果SDK不匹配返回-1
|
||||
*/
|
||||
int incrementMsgAuditSdkRefCount(long sdk);
|
||||
|
||||
@@ -307,7 +307,7 @@ public interface WxCpConfigStorage {
|
||||
* 当引用计数降为0时,自动销毁SDK以释放资源
|
||||
*
|
||||
* @param sdk sdk id
|
||||
* @return 减少后的引用计数,如果返回0表示SDK已被销毁
|
||||
* @return 减少后的引用计数,如果返回0表示SDK已被销毁,如果SDK不匹配返回-1
|
||||
*/
|
||||
int decrementMsgAuditSdkRefCount(long sdk);
|
||||
|
||||
@@ -315,7 +315,24 @@ public interface WxCpConfigStorage {
|
||||
* 获取会话存档SDK的引用计数
|
||||
*
|
||||
* @param sdk sdk id
|
||||
* @return 当前引用计数
|
||||
* @return 当前引用计数,如果SDK不匹配返回-1
|
||||
*/
|
||||
int getMsgAuditSdkRefCount(long sdk);
|
||||
|
||||
/**
|
||||
* 获取当前SDK并增加引用计数(原子操作)
|
||||
* 如果SDK未初始化或已过期,返回0而不增加引用计数
|
||||
* 此方法用于在获取SDK后立即增加引用计数,避免并发问题
|
||||
*
|
||||
* @return 当前有效的SDK id并已增加引用计数,如果SDK无效返回0
|
||||
*/
|
||||
long acquireMsgAuditSdk();
|
||||
|
||||
/**
|
||||
* 减少SDK引用计数并在必要时释放(原子操作)
|
||||
* 此方法确保引用计数递减和SDK检查在同一个同步块内完成
|
||||
*
|
||||
* @param sdk sdk id
|
||||
*/
|
||||
void releaseMsgAuditSdk(long sdk);
|
||||
}
|
||||
|
||||
@@ -475,6 +475,10 @@ public class WxCpDefaultConfigImpl implements WxCpConfigStorage, Serializable {
|
||||
|
||||
@Override
|
||||
public synchronized void updateMsgAuditSdk(long sdk, int expiresInSeconds) {
|
||||
// 如果有旧的SDK且引用计数为0,先销毁旧的SDK
|
||||
if (this.msgAuditSdk > 0 && this.msgAuditSdk != sdk && this.msgAuditSdkRefCount == 0) {
|
||||
Finance.DestroySdk(this.msgAuditSdk);
|
||||
}
|
||||
this.msgAuditSdk = sdk;
|
||||
// 预留200秒的时间
|
||||
this.msgAuditSdkExpiresTime = System.currentTimeMillis() + (expiresInSeconds - 200) * 1000L;
|
||||
@@ -489,10 +493,10 @@ public class WxCpDefaultConfigImpl implements WxCpConfigStorage, Serializable {
|
||||
|
||||
@Override
|
||||
public synchronized int incrementMsgAuditSdkRefCount(long sdk) {
|
||||
if (this.msgAuditSdk == sdk) {
|
||||
if (this.msgAuditSdk == sdk && sdk > 0) {
|
||||
return ++this.msgAuditSdkRefCount;
|
||||
}
|
||||
return 0;
|
||||
return -1; // SDK不匹配,返回-1表示错误
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -500,21 +504,46 @@ public class WxCpDefaultConfigImpl implements WxCpConfigStorage, Serializable {
|
||||
if (this.msgAuditSdk == sdk && this.msgAuditSdkRefCount > 0) {
|
||||
int newCount = --this.msgAuditSdkRefCount;
|
||||
// 当引用计数降为0时,自动销毁SDK以释放资源
|
||||
if (newCount == 0) {
|
||||
// 再次检查SDK是否仍然是当前缓存的SDK(防止并发重新初始化)
|
||||
if (newCount == 0 && this.msgAuditSdk == sdk) {
|
||||
Finance.DestroySdk(sdk);
|
||||
this.msgAuditSdk = 0;
|
||||
this.msgAuditSdkExpiresTime = 0;
|
||||
}
|
||||
return newCount;
|
||||
}
|
||||
return 0;
|
||||
return -1; // SDK不匹配或引用计数已为0,返回-1表示错误
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getMsgAuditSdkRefCount(long sdk) {
|
||||
if (this.msgAuditSdk == sdk) {
|
||||
if (this.msgAuditSdk == sdk && sdk > 0) {
|
||||
return this.msgAuditSdkRefCount;
|
||||
}
|
||||
return 0;
|
||||
return -1; // SDK不匹配,返回-1表示错误
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long acquireMsgAuditSdk() {
|
||||
// 检查SDK是否有效(已初始化且未过期)
|
||||
if (this.msgAuditSdk > 0 && !isMsgAuditSdkExpired()) {
|
||||
this.msgAuditSdkRefCount++;
|
||||
return this.msgAuditSdk;
|
||||
}
|
||||
return 0; // SDK未初始化或已过期
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void releaseMsgAuditSdk(long sdk) {
|
||||
if (this.msgAuditSdk == sdk && this.msgAuditSdkRefCount > 0) {
|
||||
int newCount = --this.msgAuditSdkRefCount;
|
||||
// 当引用计数降为0时,自动销毁SDK以释放资源
|
||||
// 再次检查SDK是否仍然是当前缓存的SDK(防止并发重新初始化)
|
||||
if (newCount == 0 && this.msgAuditSdk == sdk) {
|
||||
Finance.DestroySdk(sdk);
|
||||
this.msgAuditSdk = 0;
|
||||
this.msgAuditSdkExpiresTime = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -493,6 +493,10 @@ public class WxCpRedisConfigImpl implements WxCpConfigStorage {
|
||||
|
||||
@Override
|
||||
public synchronized void updateMsgAuditSdk(long sdk, int expiresInSeconds) {
|
||||
// 如果有旧的SDK且引用计数为0,先销毁旧的SDK
|
||||
if (this.msgAuditSdk > 0 && this.msgAuditSdk != sdk && this.msgAuditSdkRefCount == 0) {
|
||||
Finance.DestroySdk(this.msgAuditSdk);
|
||||
}
|
||||
this.msgAuditSdk = sdk;
|
||||
// 预留200秒的时间
|
||||
this.msgAuditSdkExpiresTime = System.currentTimeMillis() + (expiresInSeconds - 200) * 1000L;
|
||||
@@ -507,10 +511,10 @@ public class WxCpRedisConfigImpl implements WxCpConfigStorage {
|
||||
|
||||
@Override
|
||||
public synchronized int incrementMsgAuditSdkRefCount(long sdk) {
|
||||
if (this.msgAuditSdk == sdk) {
|
||||
if (this.msgAuditSdk == sdk && sdk > 0) {
|
||||
return ++this.msgAuditSdkRefCount;
|
||||
}
|
||||
return 0;
|
||||
return -1; // SDK不匹配,返回-1表示错误
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -518,21 +522,46 @@ public class WxCpRedisConfigImpl implements WxCpConfigStorage {
|
||||
if (this.msgAuditSdk == sdk && this.msgAuditSdkRefCount > 0) {
|
||||
int newCount = --this.msgAuditSdkRefCount;
|
||||
// 当引用计数降为0时,自动销毁SDK以释放资源
|
||||
if (newCount == 0) {
|
||||
// 再次检查SDK是否仍然是当前缓存的SDK(防止并发重新初始化)
|
||||
if (newCount == 0 && this.msgAuditSdk == sdk) {
|
||||
Finance.DestroySdk(sdk);
|
||||
this.msgAuditSdk = 0;
|
||||
this.msgAuditSdkExpiresTime = 0;
|
||||
}
|
||||
return newCount;
|
||||
}
|
||||
return 0;
|
||||
return -1; // SDK不匹配或引用计数已为0,返回-1表示错误
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getMsgAuditSdkRefCount(long sdk) {
|
||||
if (this.msgAuditSdk == sdk) {
|
||||
if (this.msgAuditSdk == sdk && sdk > 0) {
|
||||
return this.msgAuditSdkRefCount;
|
||||
}
|
||||
return 0;
|
||||
return -1; // SDK不匹配,返回-1表示错误
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long acquireMsgAuditSdk() {
|
||||
// 检查SDK是否有效(已初始化且未过期)
|
||||
if (this.msgAuditSdk > 0 && !isMsgAuditSdkExpired()) {
|
||||
this.msgAuditSdkRefCount++;
|
||||
return this.msgAuditSdk;
|
||||
}
|
||||
return 0; // SDK未初始化或已过期
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void releaseMsgAuditSdk(long sdk) {
|
||||
if (this.msgAuditSdk == sdk && this.msgAuditSdkRefCount > 0) {
|
||||
int newCount = --this.msgAuditSdkRefCount;
|
||||
// 当引用计数降为0时,自动销毁SDK以释放资源
|
||||
// 再次检查SDK是否仍然是当前缓存的SDK(防止并发重新初始化)
|
||||
if (newCount == 0 && this.msgAuditSdk == sdk) {
|
||||
Finance.DestroySdk(sdk);
|
||||
this.msgAuditSdk = 0;
|
||||
this.msgAuditSdkExpiresTime = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user