背景:公众号换主体,要迁移,粉丝(openId)的业务数据要做处理.
第一步:参照我的另一篇文章,Java 导出微信公众号粉丝。
第二部:数据处理(master-worker模式)
程序主入口:Main
我导出来的粉丝文件格式是:
{ "info":[ { "openId":"ogVous494ltuNmO4zHb1seHeGLSk"} ..... 1万条 ]}
package changeOpenId;import java.util.List;import java.util.Map;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.odao.weixin.api.support.AccessTokenKit;import com.odao.weixin.site.cases2017.change.service.ChangeService;import com.odao.weixin.site.cases2017.push.entity.JsonDataReadUtil;/** * 多线程转换openId * @author wangfj */public class ChangeMain { @SuppressWarnings({ "unchecked", "static-access", "resource" }) public static void main(String[] args) throws Exception { ApplicationContext appContext = new ClassPathXmlApplicationContext(new String[] {"odao-weixin-site-servlet.xml"}); String token = AccessTokenKit.getTokenNew("APPID", "APP秘钥"); String accesstoken = (String) ((Map) JSON.parseObject(token, Map.class)).get("access_token"); //根据粉丝文件来读取数据 JSONObject openIdJson = JsonDataReadUtil.getReadJsonByPath("第一步导出来的文件"); String info= openIdJson.get("info").toString(); JSONArray jsonArr = JSONObject.parseArray(info); List
Master:
package changeOpenId;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import org.springframework.context.ApplicationContext;/** * Master任务指派者,分发者 * @author wangfj */public class ChangeMaster{ private ConcurrentLinkedQueue
>> workerQueue = new ConcurrentLinkedQueue
>>(); //存放所有的工作者 private HashMap workers = new HashMap (); //存放数据的结果集 private ConcurrentHashMap resultMap = new ConcurrentHashMap (); //构造master public ChangeMaster(Worker worker,int workerCount,String accessToken,ApplicationContext appContext){ worker.setApplicationContext(appContext); worker.setWorkerQueue(this.workerQueue); worker.setAccessToken(accessToken); worker.setResultMap(resultMap); for(int i=0;i > list){ this.workerQueue.add(list); } //执行 public void execute(){ for(Map.Entry me :workers.entrySet()){ me.getValue().start(); } } //获得执行结果集 public long getResult(){ long result = 0l; for(Map.Entry me :resultMap.entrySet()){ result += (Long)me.getValue(); } return result; } //所有子线程是否执行完毕 public boolean isComplate() { for(Map.Entry me :workers.entrySet()){ if(Thread.State.TERMINATED != me.getValue().getState()){ return false; } } return true; }}
Worker(转换openId核心代码):
package changeOpenId;import java.io.BufferedReader;import java.io.DataOutputStream;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.OutputStreamWriter;import java.net.HttpURLConnection;import java.net.URL;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Map;import org.apache.commons.httpclient.HttpStatus;import org.apache.http.HttpResponse;import org.apache.http.client.methods.HttpPost;import org.apache.http.entity.StringEntity;import org.apache.http.impl.client.DefaultHttpClient;import org.apache.http.message.BasicHeader;import org.apache.http.protocol.HTTP;import org.springframework.context.ApplicationContext;import org.springframework.stereotype.Service;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.odao.weixin.site.cases2017.change.service.ChangeService;@Servicepublic class ChangeWorker extends Worker{ public static String url = "http://api.weixin.qq.com/cgi-bin/changeopenid?access_token="; //微信提供中的openId转换接口 public static long handle(List> list,String accessToken,ApplicationContext appContext) throws Exception { ChangeService changeService = (ChangeService) appContext.getBean("changeService");//我处理数据的业务类 long end = System.currentTimeMillis(); JSONObject params = new JSONObject(); params.put("from_appid", "xxxx");//此处from_appid为原帐号的appid ArrayList openIds = new ArrayList (); for(int i=0;i > obj = arr.toJavaObject(arr, List.class); if(!obj.get(0).get("err_msg").equals("ori_openid error")){ List > openIdObj = arr.toJavaObject(arr, List.class); changeService.batchUpdateAccountsMapping(openIdObj); } }else{ System.out.println("请求微信转换接口返回异常"); } return System.currentTimeMillis()-end; } public static String JsonSMS(String postData, String token) { String result = ""; try { //发送POST请求 URL urls = new URL(url.concat(token)); HttpURLConnection conn = (HttpURLConnection) urls.openConnection(); conn.setRequestMethod("POST"); conn.setRequestProperty("Content-Type", "application/json"); conn.setRequestProperty("Connection", "Keep-Alive"); conn.setUseCaches(false); conn.setDoOutput(true); conn.setRequestProperty("Content-Length", "" + postData.length()); OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream(), "UTF-8"); out.write(postData); out.flush(); out.close(); //获取响应状态 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { System.out.println("connect failed!"); return ""; } //获取响应内容体 String line; BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf-8")); while ((line = in.readLine()) != null) { result += line + "\n"; } in.close(); } catch (IOException e) { e.printStackTrace(System.out); } return result; }}
package changeOpenId;import java.util.List;import java.util.Map;import java.util.Random;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import org.springframework.context.ApplicationContext;public class Worker implements Runnable{ private ConcurrentLinkedQueue
>> workerQueue; private ConcurrentHashMap resultMap; private ApplicationContext appContext; private String accessToken; public void setWorkerQueue(ConcurrentLinkedQueue
>> workerQueue) { this.workerQueue = workerQueue; } public void setResultMap(ConcurrentHashMap resultMap) { this.resultMap = resultMap; } public void setAccessToken(String accessToken) { this.accessToken = accessToken; } @Override public void run() { while(true){ List > input = this.workerQueue.poll(); if(input==null) break; try { Random random = new Random(); long time = ChangeWorker.handle(input,accessToken,appContext); resultMap.put(String.valueOf(random.nextInt(100)), time); } catch (Exception e) { e.printStackTrace(); } } } @SuppressWarnings("unused") private static void handle(List list) {} public void setApplicationContext(ApplicationContext appContext) { this.appContext = appContext; }}
处理数据的业务类:
package com.odao.weixin.site.cases2017.change.service;import java.sql.SQLException;import java.util.List;import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.jdbc.core.BatchPreparedStatementSetter;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.stereotype.Service;@Servicepublic class ChangeService { static final Logger logger = LoggerFactory.getLogger(ChangeService.class); @Autowired private JdbcTemplate jdbcTemplateWebSiteActivityDB; /** * 批量更新玩家openId * @param list */ public void batchUpdateAccountsMapping(final List> list) { String sql = "update 表名 set newOpenId=? ,createTime=getdate() where openId=?"; jdbcTemplateWebSiteActivityDB.batchUpdate(sql, new BatchPreparedStatementSetter() { public int getBatchSize() { return list.size(); //这个方法设定更新记录数,通常List里面存放的都是我们要更新的,所以返回list.size(); } public void setValues(java.sql.PreparedStatement ps, int i) throws SQLException { try{ ps.setString(1, list.get(i).get("new_openid").toString()); ps.setString(2, list.get(i).get("ori_openid").toString()); }catch(Exception e){ } } }); System.out.println(Thread.currentThread().getName()+"成功改变条数:"+list.size()); }}
业务类操作的表字段最好加上索引,基本上几秒钟几万数据就跑完了。