소스 검색

微信推送不用mq,直接即时推送

shenhao 4 년 전
부모
커밋
231ec03d97

+ 70 - 17
src/main/java/com/ssj/service/weixin/push/service/impl/PushTemplateServiceImpl.java

@@ -3,11 +3,16 @@ package com.ssj.service.weixin.push.service.impl;
 import com.alibaba.fastjson.JSONObject;
 import com.ssj.bean.weixin.push.domain.PushTemplate;
 import com.ssj.bean.weixin.push.domain.SendTemplateShort;
+import com.ssj.bean.weixin.push.domain.TbWxTemplate;
 import com.ssj.framework.core.common.service.BaseServiceImpl;
 import com.ssj.framework.core.rabbitmq.TopicRabbitConfig;
 import com.ssj.framework.core.rabbitmq.producer.BaseRabbitSend;
+import com.ssj.framework.core.security.manager.TokenManager;
+import com.ssj.framework.core.util.SystemResourceLocator;
+import com.ssj.framework.weixin.util.NewsUtil;
 import com.ssj.service.weixin.push.service.PushTemplateService;
 import com.ssj.service.weixin.push.service.dao.PushTemplateDao;
+import com.ssj.service.weixin.push.service.util.WxTemplateCacheUtils;
 import com.ssj.taskthread.Task;
 import com.ssj.taskthread.ThreadPool;
 import org.apache.commons.lang3.StringUtils;
@@ -35,16 +40,20 @@ public class PushTemplateServiceImpl  extends BaseServiceImpl<PushTemplate, Stri
 	
     @Autowired
 	private PushTemplateDao dao;
- 
 
+    @Autowired
+    private TokenManager tokenManager;
+
+	@Override
 	public boolean savePushWxTemplate(SendTemplateShort sendTemplateShort) {
 		boolean is=true;
-		ThreadPool.getInstance().addTask(new SyncSaveAndSendPushDBTask(sendTemplateShort,dao));
+		ThreadPool.getInstance().addTask(new SyncSaveAndSendPushDBTask(sendTemplateShort,dao,tokenManager));
 		return is;
 	}
 
+	@Override
 	public void saveAndSendPushWxTemplate(SendTemplateShort sendTemplateShort) {
-		ThreadPool.getInstance().addTask(new SyncSaveAndSendPushDBTask(sendTemplateShort,dao));
+		ThreadPool.getInstance().addTask(new SyncSaveAndSendPushDBTask(sendTemplateShort,dao,tokenManager));
 	}
 
 	public boolean savePushWxTemplateByDb(SendTemplateShort sendTemplateShort) {
@@ -108,11 +117,14 @@ class SyncSaveAndSendPushDBTask extends Task{
 	private SendTemplateShort sendTemplateShort;
 	
 	private PushTemplateDao dao;
+
+	private TokenManager tokenManager;
 	
-	public SyncSaveAndSendPushDBTask(SendTemplateShort sendTemplateShort,PushTemplateDao dao) {
+	public SyncSaveAndSendPushDBTask(SendTemplateShort sendTemplateShort,PushTemplateDao dao,TokenManager tokenManager) {
 	    super();
 	    this.sendTemplateShort=sendTemplateShort;
 	    this.dao=dao;
+	    this.tokenManager = tokenManager;
 	} 
 	
 	@Override
@@ -122,15 +134,6 @@ class SyncSaveAndSendPushDBTask extends Task{
 		} catch (Exception e) {
 			logger.error("推送信息保存异常...");
 		}
-		try {
-			String uuid=BaseRabbitSend.convertAndTopicSend(
-					TopicRabbitConfig.exchange_task,
-					TopicRabbitConfig.topic_push_messages_wx, 
-					sendTemplateShort);
-			logger.error("已加入推送信息队列数据uuid:"+uuid+" data:"+JSONObject.toJSONString(sendTemplateShort));
-		} catch (Exception e) {
-			logger.error("推送信息队列异常...");
-		}
 	}
 
 	public boolean savePushWxTemplateByDb(SendTemplateShort sendTemplateShort) {
@@ -153,16 +156,66 @@ class SyncSaveAndSendPushDBTask extends Task{
 			pushTemplate.setStat(1);
 			pushTemplate.setTryTime(0);
 			pushTemplate.setRemark("");
-			pushTemplate=dao.save(pushTemplate);
-			sendTemplateShort.setPushTemplateId(pushTemplate.getId());
+			run(pushTemplate,tokenManager);
 		} catch (Exception e) {
 			logger.error("保存推送信息异常...");
 			return false;
 		}
 		return true;
 	}
-	
-	
+
+	public void run(PushTemplate pushTemplate,TokenManager tokenManager) {
+		logger.error("立即微信推送任务开始---------------");
+		try {
+			TbWxTemplate tbWxTemplate= WxTemplateCacheUtils.getTbWxTemplate(pushTemplate.getWxTplSid(),null,null);
+			if(StringUtils.isEmpty(pushTemplate.getOpenid())){
+				pushTemplate.setRemark("OPENID为空,请注意检查来源代码逻辑...");
+				pushTemplate.setTryTime(pushTemplate.getTryTime()+1);
+			}else if(tbWxTemplate!=null && StringUtils.isNotEmpty(tbWxTemplate.getTemplateId())){
+				try {
+					String msg= NewsUtil.WX_TPL_BODY
+							.replace(NewsUtil.TPL_CODE_OPENID, pushTemplate.getOpenid())
+							.replace(NewsUtil.TPL_CODE_TPLID, tbWxTemplate.getTemplateId())
+							.replace(NewsUtil.TPL_CODE_URL, StringUtils.isNotEmpty(pushTemplate.getClickUrl())?pushTemplate.getClickUrl():"")
+							.replace(NewsUtil.TPL_CODE_DATA, pushTemplate.getContent())
+							.replace(NewsUtil.TPL_CODE_TOPCOLOR, StringUtils.isNotEmpty(pushTemplate.getTopColor())?pushTemplate.getTopColor():"");
+					if(StringUtils.isNotEmpty(pushTemplate.getMiniprogram())){
+						msg=msg.replace(NewsUtil.TPL_CODE_MINIPROQRAM_TEXT, NewsUtil.TPL_SUB_MINIPROQRAM_DATA);
+						msg=msg.replace(NewsUtil.MINIPROQRAM_DATA, pushTemplate.getMiniprogram());
+					}else{
+						msg=msg.replace(NewsUtil.TPL_CODE_MINIPROQRAM_TEXT, "");
+					}
+
+					//再去查询,防止重复推送,定时推送数据锁定一分钟
+
+					if(pushTemplate.getIsSuccess()==0){
+						JSONObject result =NewsUtil.sendTemplate(tokenManager.getSSJAccessToken(), msg);
+						logger.error("[PushTemplateServiceImpl]  WxTplSid: " + pushTemplate.getWxTplSid() + ", result: " + result);
+						if(result != null && "0".equals(result.getString("errcode"))){
+							pushTemplate.setIsSuccess(1);
+							pushTemplate.setSuccessTime(new Date());
+							pushTemplate.setCurTotal(pushTemplate.getCurTotal()+1);
+							pushTemplate.setRemark("");
+						}else{
+							pushTemplate.setTryTime(pushTemplate.getTryTime()+1);
+							pushTemplate.setRemark("errcode:"+result.getString("errcode"));
+						}
+					}
+				} catch (Exception e) {
+					pushTemplate.setTryTime(pushTemplate.getTryTime()+1);
+					pushTemplate.setRemark("exception:"+e.getMessage());
+				}
+			}else{
+				pushTemplate.setRemark("模板池无法得到微信模板ID...");
+				pushTemplate.setTryTime(pushTemplate.getTryTime()+1);
+			}
+			dao.save(pushTemplate);
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error("保存推送信息异常...");
+		}
+		logger.error("立即微信推送任务结束---------------");
+	}
 	
 }
 

+ 61 - 0
src/main/java/com/ssj/service/weixin/push/service/util/SyncTemplateInfoSaveTask.java

@@ -0,0 +1,61 @@
+package com.ssj.service.weixin.push.service.util;
+
+import com.ssj.bean.weixin.push.domain.TbWxTemplate;
+import com.ssj.bean.weixin.push.domain.TbWxTemplateInfo;
+import com.ssj.framework.core.util.StringUtil;
+import com.ssj.framework.core.util.SystemResourceLocator;
+import com.ssj.framework.weixin.news.bean.Template;
+import com.ssj.framework.weixin.util.NewsUtil;
+import com.ssj.service.weixin.push.service.WxTemplateInfoService;
+import org.apache.http.client.utils.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+
+
+@Component
+public class SyncTemplateInfoSaveTask {
+	
+	protected Logger logger = LoggerFactory.getLogger(this.getClass());
+	
+	@Async("taskExecutor")
+	public void run(String templateIdShort, TbWxTemplate tbWxTemplate, String accessToken) {
+		
+		synchronized(this){
+			WxTemplateInfoService wxTemplateInfoService = (WxTemplateInfoService) SystemResourceLocator.getBean(WxTemplateInfoService.class);
+		    try {
+		    	if(StringUtil.isNotEmpty(templateIdShort)){
+			    	List<TbWxTemplateInfo> tbWxTemplateInfos= wxTemplateInfoService.queryTbWxTemplateInfoByShort(templateIdShort);
+			    	if(tbWxTemplateInfos==null || tbWxTemplateInfos.size()==0){
+				    	List<Template> templates= NewsUtil.getAllTemplate(accessToken);
+			  		    for (Template template : templates) {
+			  			   if(template.getTemplate_id().equals(tbWxTemplate.getTemplateId())){
+			  				   TbWxTemplateInfo tbWxTemplateInfo=new TbWxTemplateInfo();
+			  				   tbWxTemplateInfo.setContent(template.getContent());
+			  				   tbWxTemplateInfo.setCreateTime(DateUtils.formatDate(new Date(), "yyyy-MM-dd"));
+			  				   tbWxTemplateInfo.setDeputyIndustry(template.getDeputy_industry());
+			  				   tbWxTemplateInfo.setExample(template.getExample());
+			  				   tbWxTemplateInfo.setPrimaryIndustry(template.getTitle());
+			  				   tbWxTemplateInfo.setTemplateIdShort(templateIdShort);
+			  				   tbWxTemplateInfo.setTitle(template.getTitle());
+			  				   wxTemplateInfoService.save(tbWxTemplateInfo);
+			  				   break;
+			  			   }
+			  		   }    
+			    	}
+		    	}
+	        } catch (Exception e) {
+	        	logger.error(" 微信模板明细数据保存,异常---------------");
+	        }
+		}
+		
+	}
+
+	
+	
+
+}

+ 33 - 0
src/main/java/com/ssj/service/weixin/push/service/util/SyncTemplateSaveTask.java

@@ -0,0 +1,33 @@
+package com.ssj.service.weixin.push.service.util;
+
+import com.ssj.bean.weixin.push.domain.TbWxTemplate;
+import com.ssj.framework.core.util.SystemResourceLocator;
+import com.ssj.service.weixin.push.service.WxTemplateService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SyncTemplateSaveTask {
+
+	protected Logger logger = LoggerFactory.getLogger(this.getClass());
+	
+	@Async("taskExecutor")
+	public void run(TbWxTemplate tbWxTemplate) {
+		synchronized(this){
+		    try {
+		    	if(tbWxTemplate!=null){
+		    		WxTemplateService wxTemplateService = (WxTemplateService) SystemResourceLocator.getBean(WxTemplateService.class);
+		    		wxTemplateService.save(tbWxTemplate);
+		    	}
+	        } catch (Exception e) {
+	        	logger.error(" 微信模板运行数据保存,异常---------------");
+	        }
+		}
+		
+	}
+
+	
+	
+}

+ 14 - 0
src/main/java/com/ssj/service/weixin/push/service/util/TaskConstant.java

@@ -0,0 +1,14 @@
+package com.ssj.service.weixin.push.service.util;
+
+public class TaskConstant {
+
+	
+
+	//临时的key,可以用于删减的
+	public static final String TEMP_KEY="WX_NEWTEMP_List";
+	
+	//长久的key,只要设置了start和end时间的,都会放到这里
+	public static final String LONG_KEY="WX_LONG_List";
+	
+	
+}

+ 186 - 0
src/main/java/com/ssj/service/weixin/push/service/util/WxTemplateCacheUtils.java

@@ -0,0 +1,186 @@
+package com.ssj.service.weixin.push.service.util;
+
+import com.alibaba.fastjson.JSONObject;
+import com.ssj.bean.weixin.push.domain.TbWxTemplate;
+import com.ssj.framework.core.security.manager.TokenManager;
+import com.ssj.framework.core.util.RedisUtil;
+import com.ssj.framework.core.util.SystemResourceLocator;
+import com.ssj.framework.weixin.util.NewsUtil;
+import com.ssj.framework.weixin.util.WeixinUtil;
+import com.ssj.service.weixin.push.service.WxTemplateService;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 重构模板池
+ * 2019年4月11日15:00:41
+ * @author wuwen
+ */
+public class WxTemplateCacheUtils {
+	
+	private static Logger logger = LoggerFactory.getLogger(WxTemplateCacheUtils.class);
+	
+	//初始化模板到redis
+	@SuppressWarnings("all")
+	public static synchronized void init() {
+    	WxTemplateService wxTemplateService = (WxTemplateService) SystemResourceLocator.getBean(WxTemplateService.class);
+    	RedisUtil redisUtil= (RedisUtil) SystemResourceLocator.getBean(RedisUtil.class);
+    	List<TbWxTemplate> tbWxTemplates=wxTemplateService.queryTbWxTemplate();
+    	
+    	redisUtil.del(TaskConstant.TEMP_KEY);
+    	redisUtil.del( TaskConstant.LONG_KEY);
+    	//使用List集合,有序。
+    	for (TbWxTemplate tbWxTemplate : tbWxTemplates) {
+    		if(tbWxTemplate.getStartTime()!=null && tbWxTemplate.getEndTime()!=null){
+    			redisUtil.lpush(
+
+        				TaskConstant.LONG_KEY, 
+        				JSONObject.toJSONString(tbWxTemplate));
+    		}else{
+    			redisUtil.lpush(
+
+        				TaskConstant.TEMP_KEY, 
+        				JSONObject.toJSONString(tbWxTemplate));
+        		
+    		}
+		}
+    }
+	
+	//查询redis中所有的模板
+	@SuppressWarnings("all")
+    public static synchronized List<TbWxTemplate> getTbWxTemplates() {
+    	RedisUtil redisUtil= (RedisUtil) SystemResourceLocator.getBean(RedisUtil.class);
+    	List<TbWxTemplate> lists=new ArrayList<TbWxTemplate>();
+    	List<String> longList = redisUtil.lrange(TaskConstant.LONG_KEY,0,-1);
+    	List<String> tempList = redisUtil.lrange(TaskConstant.TEMP_KEY,0,-1);
+    	for (String str : longList) {
+    		lists.add(JSONObject.toJavaObject(JSONObject.parseObject(str), TbWxTemplate.class));
+		}
+    	for (String str : tempList) {
+    		lists.add(JSONObject.toJavaObject(JSONObject.parseObject(str), TbWxTemplate.class));
+		}
+    	return lists;
+	}
+	
+	
+	
+	//查询redis中id的模板
+	@SuppressWarnings("all")
+    public static synchronized TbWxTemplate getTbWxTemplateById(String templateIdShort) {
+    	RedisUtil redisUtil= (RedisUtil) SystemResourceLocator.getBean(RedisUtil.class);
+    	//长久的调用次数高,放前面
+    	List<String> longList = redisUtil.lrange(TaskConstant.LONG_KEY,0,-1);
+    	for (String str : longList) {
+    		JSONObject json= JSONObject.parseObject(str);
+    		if(templateIdShort.equals(json.getString("templateIdShort"))){
+    			return JSONObject.toJavaObject(json, TbWxTemplate.class);
+    		}
+		}
+    	List<String> tempList = redisUtil.lrange(TaskConstant.TEMP_KEY,0,-1);
+    	for (String str : tempList) {
+    		JSONObject json= JSONObject.parseObject(str);
+    		if(templateIdShort.equals(json.getString("templateIdShort"))){
+    			return JSONObject.toJavaObject(json, TbWxTemplate.class);
+    		}
+		}
+    	
+    	return null;
+	}
+	
+	//得到列表的总数量
+	@SuppressWarnings("all")
+    public static synchronized Long getTbWxTemplateSize() {
+    	RedisUtil redisUtil= (RedisUtil) SystemResourceLocator.getBean(RedisUtil.class);
+    	return redisUtil.llen(TaskConstant.LONG_KEY)+redisUtil.llen(TaskConstant.TEMP_KEY);
+	}
+	
+   //查询模板
+   public  static synchronized TbWxTemplate getTbWxTemplate(String templateIdShort, Date startTime, Date endTime) {
+	       //Redis中存在值,直接返回
+		   TbWxTemplate wxTemplate=getTbWxTemplateById(templateIdShort);
+	       if(wxTemplate!=null){
+	    	   wxTemplate.setLastTime(new Date());
+	    	   wxTemplate.setNumber(wxTemplate.getNumber()+1);
+	    	   
+	    	   //更新模板调用数据库,异步任务-----开始
+			   //ThreadPool.getInstance().addTask(new SyncTemplateSaveTask(wxTemplate));
+	    	   SyncTemplateSaveTask syncTemplateSaveTask = (SyncTemplateSaveTask) SystemResourceLocator.getBean(SyncTemplateSaveTask.class);
+	    	   syncTemplateSaveTask.run(wxTemplate);
+			   //更新模板调用数据库,异步任务-----结束
+	    	   return wxTemplate;
+	       }else{//去添加模板
+	    	   /**
+	    	    * 1:如果当前模板池中数量少于25,直接下载添加到池中
+	    	    * 2:如果当前模板池中数量大于等于25,先把池中,上一次生成时间最长的移出池,然后再下载模板添加到Redis池中。
+	    	    */
+	    		return addPoolTbWxTemplate(templateIdShort, startTime, endTime);//添加模板到池中
+	       }  
+    }
+    
+   //在list中最右边删除一个,再从左边添加一个,返回添加值
+    private  static synchronized TbWxTemplate addPoolTbWxTemplate(String templateIdShort, Date startTime, Date endTime){
+    	   TokenManager tokenManager = (TokenManager) SystemResourceLocator.getBean(TokenManager.class);
+    	   String accessToken=tokenManager.getSSJAccessToken();
+    	   WxTemplateService wxTemplateService = (WxTemplateService) SystemResourceLocator.getBean(WxTemplateService.class);
+    	   RedisUtil redisUtil= (RedisUtil) SystemResourceLocator.getBean(RedisUtil.class);
+    	   
+    	   //如果当前的数据大于25条,先删最右边的
+    	   Long countNum=getTbWxTemplateSize();
+    	   logger.info("当前模板数量为"+countNum+"...");
+		   if(countNum>=25){
+			  logger.info("当前模板数量为25,开始启动删除模板任务");
+			  //通过key从list尾部删除一个value,并返回该元素 
+			  String jsonStr=redisUtil.rpop(TaskConstant.TEMP_KEY);
+			  TbWxTemplate temp= JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), TbWxTemplate.class);
+			  JSONObject reqObj= NewsUtil.delTemplate(accessToken,temp.getTemplateId());
+   		      logger.info("当前模板数量为25,开始启动删除模板任务,结束返回值:"+reqObj.toJSONString());
+   		      if("0".equals(reqObj.get("errcode").toString())){
+   		    	  temp.setStatus(2);
+   		    	  wxTemplateService.save(temp); 
+   		      }
+   	       }  
+		   
+		   TbWxTemplate tbWxTemplate=null;
+    	   String templateId= NewsUtil.getTemplate(accessToken, templateIdShort);//添加模板ID
+    	   if(StringUtils.isNotEmpty(templateId)){
+    		       tbWxTemplate=new TbWxTemplate();
+    		       String keyType=TaskConstant.TEMP_KEY;
+				   tbWxTemplate.setAppid(WeixinUtil.APPID);
+				   tbWxTemplate.setLastTime(new Date());
+				   tbWxTemplate.setStatus(1);
+				   tbWxTemplate.setTemplateId(templateId);
+				   tbWxTemplate.setTemplateIdShort(templateIdShort);
+				   tbWxTemplate.setCreateTime(new Date());
+				   tbWxTemplate.setNumber(1);
+				   if(startTime!=null && endTime!=null){
+			    	  tbWxTemplate.setStartTime(startTime);
+			    	  tbWxTemplate.setEndTime(endTime);
+			    	  keyType=TaskConstant.LONG_KEY;
+				   }
+				   tbWxTemplate=wxTemplateService.save(tbWxTemplate);//保存到数据,得到id
+				   
+				   //开始操作redis
+				   //通过key向list头部添加字符串
+				   redisUtil.lpush(
+						   keyType,
+						   JSONObject.toJSONString(tbWxTemplate));
+				
+				   //添加数据库明细数据,异步任务-----开始
+				   //ThreadPool.getInstance().addTask(new SyncTemplateInfoSaveTask(templateIdShort,tbWxTemplate,accessToken));
+				   
+				   SyncTemplateInfoSaveTask syncTemplateInfoSaveTask = (SyncTemplateInfoSaveTask) SystemResourceLocator.getBean(SyncTemplateInfoSaveTask.class);
+				   syncTemplateInfoSaveTask.run(templateIdShort,tbWxTemplate,accessToken);
+				   
+				   
+				   //添加数据库明细数据,异步任务-----结束
+    	    }
+		   return tbWxTemplate;
+    }
+    
+
+}