diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InternalSession.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InternalSession.java
index e81a87179..79a7f4dbd 100644
--- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InternalSession.java
+++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/InternalSession.java
@@ -39,11 +39,9 @@ public interface InternalSession {
void access();
/**
- * Set the isNew flag for this session.
- *
- * @param isNew The new value for the isNew flag
+ * End the access.
*/
- void setNew(boolean isNew);
+ void endAccess();
/**
* Set the creation time for this session. This method is called by the
diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionImpl.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionImpl.java
index 8ad039ec1..a899190d5 100644
--- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionImpl.java
+++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionImpl.java
@@ -4,6 +4,7 @@ import me.chanjar.weixin.common.util.res.StringManager;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
public class SessionImpl implements WxSession, InternalSession {
@@ -90,11 +91,6 @@ public class SessionImpl implements WxSession, InternalSession {
*/
protected volatile boolean isValid = false;
- /**
- * Flag indicating whether this session is new or not.
- */
- protected boolean isNew = false;
-
/**
* We are currently processing a session expiration, so bypass
* certain IllegalStateException tests. NOTE: This value is not
@@ -123,11 +119,6 @@ public class SessionImpl implements WxSession, InternalSession {
*/
protected volatile long thisAccessedTime = creationTime;
- /**
- * The last accessed time for this Session.
- */
- protected volatile long lastAccessedTime = creationTime;
-
/**
* The default maximum inactive interval for Sessions created by
* this Manager.
@@ -140,9 +131,15 @@ public class SessionImpl implements WxSession, InternalSession {
*/
protected transient InternalSessionFacade facade = null;
+ /**
+ * The access count for this session.
+ */
+ protected transient AtomicInteger accessCount = null;
+
public SessionImpl(InternalSessionManager manager) {
this.manager = manager;
+ this.accessCount = new AtomicInteger();
}
@@ -176,7 +173,28 @@ public class SessionImpl implements WxSession, InternalSession {
@Override
public boolean isValid() {
- return isValid;
+ if (!this.isValid) {
+ return false;
+ }
+
+ if (this.expiring) {
+ return true;
+ }
+
+ if (accessCount.get() > 0) {
+ return true;
+ }
+
+ if (maxInactiveInterval > 0) {
+ long timeNow = System.currentTimeMillis();
+ int timeIdle;
+ timeIdle = (int) ((timeNow - thisAccessedTime) / 1000L);
+ if (timeIdle >= maxInactiveInterval) {
+ expire();
+ }
+ }
+
+ return this.isValid;
}
@Override
@@ -215,6 +233,8 @@ public class SessionImpl implements WxSession, InternalSession {
// Mark this session as "being expired"
expiring = true;
+ accessCount.set(0);
+
// Remove this session from our manager's active sessions
manager.remove(this, true);
@@ -238,23 +258,23 @@ public class SessionImpl implements WxSession, InternalSession {
public void access() {
this.thisAccessedTime = System.currentTimeMillis();
+ accessCount.incrementAndGet();
}
@Override
- public void setNew(boolean isNew) {
+ public void endAccess() {
- this.isNew = isNew;
+ this.thisAccessedTime = System.currentTimeMillis();
+ accessCount.decrementAndGet();
}
-
@Override
public void setCreationTime(long time) {
this.creationTime = time;
- this.lastAccessedTime = time;
this.thisAccessedTime = time;
}
diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionManagerImpl.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionManagerImpl.java
index 0188c6723..e6de713d1 100644
--- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionManagerImpl.java
+++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/SessionManagerImpl.java
@@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
public class SessionManagerImpl implements WxSessionManager, InternalSessionManager {
@@ -104,6 +105,11 @@ public class SessionManagerImpl implements WxSessionManager, InternalSessionMana
*/
protected int processExpiresFrequency = 6;
+ /**
+ * 后台清理线程是否已经开启
+ */
+ private final AtomicBoolean backgroundProcessStarted = new AtomicBoolean(false);
+
@Override
public void remove(InternalSession session) {
remove(session, false);
@@ -155,7 +161,6 @@ public class SessionManagerImpl implements WxSessionManager, InternalSessionMana
InternalSession session = createEmptySession();
// Initialize the properties of the new session and return it
- session.setNew(true);
session.setValid(true);
session.setCreationTime(System.currentTimeMillis());
session.setMaxInactiveInterval(this.maxInactiveInterval);
@@ -191,6 +196,26 @@ public class SessionManagerImpl implements WxSessionManager, InternalSessionMana
@Override
public void add(InternalSession session) {
+ // 当第一次有session创建的时候,开启session清理线程
+ if (!backgroundProcessStarted.getAndSet(true)) {
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ // 每秒清理一次
+ Thread.sleep(1000l);
+ backgroundProcess();
+ } catch (InterruptedException e) {
+ log.error("SessionManagerImpl.backgroundProcess error", e);
+ }
+ }
+ }
+ });
+ t.setDaemon(true);
+ t.start();
+ }
+
sessions.put(session.getIdInternal(), session);
int size = getActiveSessions();
if( size > maxActive ) {
diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/WxSessionManager.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/WxSessionManager.java
index e2b1dd79b..262d9fe4f 100644
--- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/WxSessionManager.java
+++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/session/WxSessionManager.java
@@ -2,9 +2,19 @@ package me.chanjar.weixin.common.session;
public interface WxSessionManager {
-
+ /**
+ * 获取某个sessionId对应的session,如果sessionId没有对应的session,则新建一个并返回。
+ * @param sessionId
+ * @return
+ */
public WxSession getSession(String sessionId);
+ /**
+ * 获取某个sessionId对应的session,如果sessionId没有对应的session,若create为true则新建一个,否则返回null。
+ * @param sessionId
+ * @param create
+ * @return
+ */
public WxSession getSession(String sessionId, boolean create);
diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMsgIdMemoryDuplicateChecker.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMsgIdMemoryDuplicateChecker.java
index 5343ff3cc..461a245a4 100644
--- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMsgIdMemoryDuplicateChecker.java
+++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/WxMsgIdMemoryDuplicateChecker.java
@@ -2,6 +2,7 @@ package me.chanjar.weixin.common.util;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@@ -21,8 +22,16 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
*/
private final Long clearPeriod;
+ /**
+ * 消息id->消息时间戳的map
+ */
private final ConcurrentHashMap msgId2Timestamp = new ConcurrentHashMap();
+ /**
+ * 后台清理线程是否已经开启
+ */
+ private final AtomicBoolean backgroundProcessStarted = new AtomicBoolean(false);
+
/**
* WxMsgIdInMemoryDuplicateChecker构造函数
*
@@ -33,7 +42,6 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
public WxMsgIdMemoryDuplicateChecker() {
this.timeToLive = 15 * 1000l;
this.clearPeriod = 5 * 1000l;
- this.start();
}
/**
@@ -44,10 +52,12 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
public WxMsgIdMemoryDuplicateChecker(Long timeToLive, Long clearPeriod) {
this.timeToLive = timeToLive;
this.clearPeriod = clearPeriod;
- this.start();
}
- private void start() {
+ protected void checkBackgroundProcessStarted() {
+ if (backgroundProcessStarted.getAndSet(true)) {
+ return;
+ }
Thread t = new Thread(new Runnable() {
@Override
public void run() {
@@ -72,6 +82,7 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
@Override
public boolean isDuplicate(Long wxMsgId) {
+ checkBackgroundProcessStarted();
Long timestamp = msgId2Timestamp.putIfAbsent(wxMsgId, System.currentTimeMillis());
if (timestamp == null) {
// 第一次接收到这个消息
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpService.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpService.java
index c3b8b4392..29ec81771 100644
--- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpService.java
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpService.java
@@ -3,6 +3,8 @@ package me.chanjar.weixin.cp.api;
import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException;
+import me.chanjar.weixin.common.session.WxSession;
+import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.http.RequestExecutor;
import me.chanjar.weixin.cp.bean.WxCpDepart;
import me.chanjar.weixin.cp.bean.WxCpMessage;
@@ -371,4 +373,28 @@ public interface WxCpService {
* @param maxRetryTimes
*/
void setMaxRetryTimes(int maxRetryTimes);
+
+ /**
+ * 获取某个sessionId对应的session,如果sessionId没有对应的session,则新建一个并返回。
+ * @param id id可以为任意字符串,建议使用FromUserName作为id
+ * @return
+ */
+ WxSession getSession(String id);
+
+ /**
+ * 获取某个sessionId对应的session,如果sessionId没有对应的session,若create为true则新建一个,否则返回null。
+ * @param id id可以为任意字符串,建议使用FromUserName作为id
+ * @param create
+ * @return
+ */
+ WxSession getSession(String id, boolean create);
+
+ /**
+ *
+ * 设置WxSessionManager,只有当需要使用个性化的WxSessionManager的时候才需要调用此方法,
+ * WxCpService默认使用的是{@link me.chanjar.weixin.common.session.SessionManagerImpl}
+ *
+ * @param sessionManager
+ */
+ void setSessionManager(WxSessionManager sessionManager);
}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpServiceImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpServiceImpl.java
index c49a90a70..473050e6a 100644
--- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpServiceImpl.java
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpServiceImpl.java
@@ -12,6 +12,9 @@ import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxError;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException;
+import me.chanjar.weixin.common.session.SessionManagerImpl;
+import me.chanjar.weixin.common.session.WxSession;
+import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.StringUtils;
import me.chanjar.weixin.common.util.crypto.SHA1;
import me.chanjar.weixin.common.util.fs.FileUtils;
@@ -64,6 +67,8 @@ public class WxCpServiceImpl implements WxCpService {
private int maxRetryTimes = 5;
+ protected WxSessionManager sessionManager = new SessionManagerImpl();
+
public boolean checkSignature(String msgSignature, String timestamp, String nonce, String data) {
try {
return SHA1.gen(wxCpConfigStorage.getToken(), timestamp, nonce, data).equals(msgSignature);
@@ -473,4 +478,26 @@ public class WxCpServiceImpl implements WxCpService {
this.maxRetryTimes = maxRetryTimes;
}
+ @Override
+ public WxSession getSession(String id) {
+ if (sessionManager == null) {
+ return null;
+ }
+ return sessionManager.getSession(id);
+ }
+
+ @Override
+ public WxSession getSession(String id, boolean create) {
+ if (sessionManager == null) {
+ return null;
+ }
+ return sessionManager.getSession(id, create);
+ }
+
+
+ @Override
+ public void setSessionManager(WxSessionManager sessionManager) {
+ this.sessionManager = sessionManager;
+ }
+
}
diff --git a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
index 00b0f80fb..f069fc8da 100644
--- a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
+++ b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
@@ -1,16 +1,25 @@
package me.chanjar.weixin.mp.api;
+import me.chanjar.weixin.common.session.InternalSession;
+import me.chanjar.weixin.common.session.SessionManagerImpl;
+import me.chanjar.weixin.common.session.WxSession;
+import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker;
import me.chanjar.weixin.common.util.WxMsgIdMemoryDuplicateChecker;
import me.chanjar.weixin.mp.bean.WxMpXmlMessage;
import me.chanjar.weixin.mp.bean.WxMpXmlOutMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.swing.text.StyledEditorKit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.regex.Pattern;
/**
@@ -41,6 +50,8 @@ import java.util.regex.Pattern;
*/
public class WxMpMessageRouter {
+ protected final Logger log = LoggerFactory.getLogger(WxMpMessageRouter.class);
+
private static final int DEFAULT_THREAD_POOL_SIZE = 20;
private final List rules = new ArrayList();
@@ -51,6 +62,8 @@ public class WxMpMessageRouter {
private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker;
+ protected WxSessionManager sessionManager = new SessionManagerImpl();
+
public WxMpMessageRouter(WxMpService wxMpService) {
this.wxMpService = wxMpService;
this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
@@ -113,21 +126,54 @@ public class WxMpMessageRouter {
}
WxMpXmlOutMessage res = null;
+ final List futures = new ArrayList();
for (final Rule rule : matchRules) {
// 返回最后一个非异步的rule的执行结果
if(rule.async) {
- executorService.submit(new Runnable() {
- public void run() {
- rule.service(wxMessage);
- }
- });
+ futures.add(
+ executorService.submit(new Runnable() {
+ public void run() {
+ rule.service(wxMessage);
+ }
+ })
+ );
} else {
res = rule.service(wxMessage);
}
}
+
+ // 告诉session,它已经用不着了
+ if (futures.size() > 0) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (Future future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ log.error("Error happened when wait task finish", e);
+ } catch (ExecutionException e) {
+ log.error("Error happened when wait task finish", e);
+ }
+ }
+ // 在这里session再也不会被使用了
+ sessionEndAccess(wxMessage);
+ }
+ });
+ } else {
+ // 在这里session再也不会被使用了
+ sessionEndAccess(wxMessage);
+ }
return res;
}
-
+
+ protected void sessionEndAccess(WxMpXmlMessage wxMessage) {
+ WxSession session = sessionManager.getSession(wxMessage.getFromUserName(), false);
+ if (session != null) {
+ ((InternalSession) session).endAccess();
+ }
+ }
+
public static class Rule {
private final WxMpMessageRouter routerBuilder;
diff --git a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpService.java b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpService.java
index 3ad10bdc5..6bd501f3d 100644
--- a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpService.java
+++ b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpService.java
@@ -3,6 +3,8 @@ package me.chanjar.weixin.mp.api;
import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException;
+import me.chanjar.weixin.common.session.WxSession;
+import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.http.RequestExecutor;
import me.chanjar.weixin.mp.bean.*;
import me.chanjar.weixin.mp.bean.result.*;
@@ -456,10 +458,10 @@ public interface WxMpService {
*/
public T execute(RequestExecutor executor, String uri, E data) throws WxErrorException;
- /**
- * 注入 {@link WxMpConfigStorage} 的实现
- * @param wxConfigProvider
- */
+ /**
+ * 注入 {@link WxMpConfigStorage} 的实现
+ * @param wxConfigProvider
+ */
public void setWxMpConfigStorage(WxMpConfigStorage wxConfigProvider);
/**
@@ -479,4 +481,5 @@ public interface WxMpService {
* @param maxRetryTimes
*/
void setMaxRetryTimes(int maxRetryTimes);
+
}
diff --git a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpServiceImpl.java b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpServiceImpl.java
index 21a02648a..f3c96f5b8 100644
--- a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpServiceImpl.java
+++ b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpServiceImpl.java
@@ -10,6 +10,9 @@ import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxError;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException;
+import me.chanjar.weixin.common.session.SessionManagerImpl;
+import me.chanjar.weixin.common.session.WxSession;
+import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.StringUtils;
import me.chanjar.weixin.common.util.crypto.SHA1;
import me.chanjar.weixin.common.util.fs.FileUtils;
@@ -66,6 +69,8 @@ public class WxMpServiceImpl implements WxMpService {
private int maxRetryTimes = 5;
+ protected WxSessionManager sessionManager = new SessionManagerImpl();
+
public boolean checkSignature(String timestamp, String nonce, String signature) {
try {
return SHA1.gen(wxMpConfigStorage.getToken(), timestamp, nonce).equals(signature);
@@ -545,7 +550,6 @@ public class WxMpServiceImpl implements WxMpService {
}
}
-
@Override
public void setRetrySleepMillis(int retrySleepMillis) {
this.retrySleepMillis = retrySleepMillis;