博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java 微信公众号迁移
阅读量:5214 次
发布时间:2019-06-14

本文共 9563 字,大约阅读时间需要 31 分钟。

背景:公众号换主体,要迁移,粉丝(openId)的业务数据要做处理.

第一步:参照我的另一篇文章,Java 导出微信公众号粉丝。

第二部:数据处理(master-worker模式)

程序主入口:Main

我导出来的粉丝文件格式是:

{    "info":[        {
"openId":"ogVous494ltuNmO4zHb1seHeGLSk"}       ..... 1万条 ]}
package changeOpenId;import java.util.List;import java.util.Map;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.odao.weixin.api.support.AccessTokenKit;import com.odao.weixin.site.cases2017.change.service.ChangeService;import com.odao.weixin.site.cases2017.push.entity.JsonDataReadUtil;/** * 多线程转换openId * @author wangfj */public class ChangeMain {      @SuppressWarnings({ "unchecked", "static-access", "resource" })    public static void main(String[] args) throws Exception {          ApplicationContext appContext = new ClassPathXmlApplicationContext(new String[] {"odao-weixin-site-servlet.xml"});        String token = AccessTokenKit.getTokenNew("APPID", "APP秘钥");        String accesstoken = (String) ((Map) JSON.parseObject(token, Map.class)).get("access_token");        //根据粉丝文件来读取数据        JSONObject openIdJson = JsonDataReadUtil.getReadJsonByPath("第一步导出来的文件");        String info= openIdJson.get("info").toString();        JSONArray jsonArr = JSONObject.parseArray(info);        List
> list = jsonArr.toJavaObject(jsonArr, List.class);    //如果你们是直接操作数据库,可以直接写一个业务类,从数据库获取要转换的数据,如下: /*ChangeService changeService = (ChangeService) appContext.getBean("changeService"); List
> list = changeService.queryRecord(40000,50000);*/ //构造master ChangeMaster master = new ChangeMaster(new ChangeWorker(),100,accesstoken,appContext);//第二个参数100(worker工作线程数),这个根据你们自己的需求定 for(int i=0;i
> newList = list.subList(i, (i+100)>list.size()?list.size():(i+100)); master.submit(newList); } //多线程执行 master.execute(); while(true){ if(master.isComplate()){ long allTime = master.getResult(); System.out.println("主线程执行完毕,总耗时:"+allTime+"毫秒"); break; } } } }

 

 Master:

package changeOpenId;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import org.springframework.context.ApplicationContext;/** * Master任务指派者,分发者  * @author wangfj */public class ChangeMaster{        private ConcurrentLinkedQueue
>> workerQueue = new ConcurrentLinkedQueue
>>(); //存放所有的工作者 private HashMap
workers = new HashMap
(); //存放数据的结果集 private ConcurrentHashMap
resultMap = new ConcurrentHashMap
(); //构造master public ChangeMaster(Worker worker,int workerCount,String accessToken,ApplicationContext appContext){ worker.setApplicationContext(appContext); worker.setWorkerQueue(this.workerQueue); worker.setAccessToken(accessToken); worker.setResultMap(resultMap); for(int i=0;i
> list){ this.workerQueue.add(list); } //执行 public void execute(){ for(Map.Entry
me :workers.entrySet()){ me.getValue().start(); } } //获得执行结果集 public long getResult(){ long result = 0l; for(Map.Entry
me :resultMap.entrySet()){ result += (Long)me.getValue(); } return result; } //所有子线程是否执行完毕 public boolean isComplate() { for(Map.Entry
me :workers.entrySet()){ if(Thread.State.TERMINATED != me.getValue().getState()){ return false; } } return true; }}

 Worker(转换openId核心代码):

package changeOpenId;import java.io.BufferedReader;import java.io.DataOutputStream;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.OutputStreamWriter;import java.net.HttpURLConnection;import java.net.URL;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Map;import org.apache.commons.httpclient.HttpStatus;import org.apache.http.HttpResponse;import org.apache.http.client.methods.HttpPost;import org.apache.http.entity.StringEntity;import org.apache.http.impl.client.DefaultHttpClient;import org.apache.http.message.BasicHeader;import org.apache.http.protocol.HTTP;import org.springframework.context.ApplicationContext;import org.springframework.stereotype.Service;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.odao.weixin.site.cases2017.change.service.ChangeService;@Servicepublic class ChangeWorker extends Worker{        public static String url = "http://api.weixin.qq.com/cgi-bin/changeopenid?access_token=";  //微信提供中的openId转换接口        public static long handle(List
> list,String accessToken,ApplicationContext appContext) throws Exception { ChangeService changeService = (ChangeService) appContext.getBean("changeService");//我处理数据的业务类 long end = System.currentTimeMillis(); JSONObject params = new JSONObject(); params.put("from_appid", "xxxx");//此处from_appid为原帐号的appid ArrayList
openIds = new ArrayList
(); for(int i=0;i
> obj = arr.toJavaObject(arr, List.class); if(!obj.get(0).get("err_msg").equals("ori_openid error")){ List
> openIdObj = arr.toJavaObject(arr, List.class); changeService.batchUpdateAccountsMapping(openIdObj); } }else{ System.out.println("请求微信转换接口返回异常"); } return System.currentTimeMillis()-end; } public static String JsonSMS(String postData, String token) { String result = ""; try { //发送POST请求 URL urls = new URL(url.concat(token)); HttpURLConnection conn = (HttpURLConnection) urls.openConnection(); conn.setRequestMethod("POST"); conn.setRequestProperty("Content-Type", "application/json"); conn.setRequestProperty("Connection", "Keep-Alive"); conn.setUseCaches(false); conn.setDoOutput(true); conn.setRequestProperty("Content-Length", "" + postData.length()); OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream(), "UTF-8"); out.write(postData); out.flush(); out.close(); //获取响应状态 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { System.out.println("connect failed!"); return ""; } //获取响应内容体 String line; BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf-8")); while ((line = in.readLine()) != null) { result += line + "\n"; } in.close(); } catch (IOException e) { e.printStackTrace(System.out); } return result; }}

 

package changeOpenId;import java.util.List;import java.util.Map;import java.util.Random;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import org.springframework.context.ApplicationContext;public class Worker implements Runnable{        private ConcurrentLinkedQueue
>> workerQueue; private ConcurrentHashMap
resultMap; private ApplicationContext appContext; private String accessToken; public void setWorkerQueue(ConcurrentLinkedQueue
>> workerQueue) { this.workerQueue = workerQueue; } public void setResultMap(ConcurrentHashMap
resultMap) { this.resultMap = resultMap; } public void setAccessToken(String accessToken) { this.accessToken = accessToken; } @Override public void run() { while(true){ List
> input = this.workerQueue.poll(); if(input==null) break; try { Random random = new Random(); long time = ChangeWorker.handle(input,accessToken,appContext); resultMap.put(String.valueOf(random.nextInt(100)), time); } catch (Exception e) { e.printStackTrace(); } } } @SuppressWarnings("unused") private static void handle(List
list) {} public void setApplicationContext(ApplicationContext appContext) { this.appContext = appContext; }}

处理数据的业务类:

package com.odao.weixin.site.cases2017.change.service;import java.sql.SQLException;import java.util.List;import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.jdbc.core.BatchPreparedStatementSetter;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.stereotype.Service;@Servicepublic class ChangeService {        static final Logger logger = LoggerFactory.getLogger(ChangeService.class);        @Autowired    private JdbcTemplate jdbcTemplateWebSiteActivityDB;        /**     * 批量更新玩家openId     * @param list     */    public void batchUpdateAccountsMapping(final List
> list) { String sql = "update 表名 set newOpenId=? ,createTime=getdate() where openId=?"; jdbcTemplateWebSiteActivityDB.batchUpdate(sql, new BatchPreparedStatementSetter() { public int getBatchSize() { return list.size(); //这个方法设定更新记录数,通常List里面存放的都是我们要更新的,所以返回list.size(); } public void setValues(java.sql.PreparedStatement ps, int i) throws SQLException { try{ ps.setString(1, list.get(i).get("new_openid").toString()); ps.setString(2, list.get(i).get("ori_openid").toString()); }catch(Exception e){ } } }); System.out.println(Thread.currentThread().getName()+"成功改变条数:"+list.size()); }}

 业务类操作的表字段最好加上索引,基本上几秒钟几万数据就跑完了。

 

转载于:https://www.cnblogs.com/wangfajun/p/8399677.html

你可能感兴趣的文章
CSS笔记-文本缩略显示
查看>>
S7-200PLC间的PPI通信
查看>>
第三章家庭作业3.65
查看>>
javascript有哪些优秀的库,把你喜欢的都说出来吧
查看>>
Web后端 JAVA学习之路
查看>>
Arc076_E Connected?
查看>>
Java线程:新特征-锁(上)(转)
查看>>
MySQL Troubleshoting:Waiting on query cache mutex
查看>>
盒子模型&position定位
查看>>
docker容器里设置中文时区
查看>>
微服务应用日志处理与组件封装
查看>>
springmvc的异常处理
查看>>
Python+Selenium与Chrome如何进行完美结合
查看>>
Windows10一周年庆典壁纸
查看>>
kibana对logstash监控获取不到数据
查看>>
UPC 2224 Boring Counting ★(山东省第四届ACM程序设计竞赛 tag:线段树)
查看>>
IIS7上设置MIME让其支持android和Iphone的更新下载
查看>>
数据库系统原理
查看>>
leetcode 947. Most Stones Removed with Same Row or Column
查看>>
mong 按 geometry 搜索 地理位置信息
查看>>