package cn.xxx.send; import java.net.URL; import java.net.URLClassLoader; import java.sql.*; import java.util.Date; import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong;
public class pkgListTest {
/** * 下个ID. */ final static public AtomicLong seqNum = new AtomicLong(1);
private static final String url = "jdbc:postgresql://192.168.1.10:5432/xxxTest?options=-c%20deadlock_timeout%3D2000"; private static final String user = "postgres"; private static final String password = "123456";
static final boolean isSort = false;
//deadlock_timeout,默认是1秒
//不排序: //300线程,20包,执行10次有10次有deadlock,很多! //同上,deadlock_timeout改成2秒,还是10次deadlock,很多!
//排序: //300线程,20包,执行10次有3次有deadlock,一两个。 //同上,deadlock_timeout改成2秒,则0次deadlock。 //1000线程,20包,deadlock_timeout改成2秒,则0次deadlock。
//综合:排序作用很大,一两个deadlock也不是真的死锁,只是超过1秒没获取锁而已。
public static Connection getConnection() throws SQLException { return DriverManager.getConnection(url, user, password); }
/** * 模拟同时发包. */ public static void main(String[] args) {
for (int i = 0; i < 300; i++) { final int shopId = i; new Thread() { @Override public void run() {
//随机包数,模拟发包回来后,更改状态 int pkgCount = ThreadLocalRandom.current().nextInt(20); for (int i = 0; i < pkgCount; i++) {
//前提,db已经有对应id的记录,要真的更新,否则无法出现deadlock long pkgId = 35700000+seqNum.getAndIncrement(); xxUtil.pkgDownTimeList.add(new Object[]{new Timestamp(new Date().getTime()), pkgId});
//睡一会 int sleepTime = ThreadLocalRandom.current().nextInt(500); try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); }
//假装发包ack回来了,添加包id到List long a = System.currentTimeMillis(); synchronized (xxUtil.pkgIdListLock) { xxUtil.pkgIdList.add(new Object[]{pkgId,shopId}); } long b = System.currentTimeMillis(); //System.out.println("add cost:" + (b - a) + "ms"); }
//都发完了,需要更新DB状态了 List<Object[]> oldList = null; List<Object[]> oldTimeList = null; synchronized (xxUtil.pkgIdListLock) { if (xxUtil.pkgIdList.size() > 0) {
System.out.println("update db..pkgIdList..." + xxUtil.pkgIdList.size());
//切换List(旧List用来更新DB,新List用来存放新Id) oldList = xxUtil.pkgIdList; xxUtil.pkgIdList = new ArrayList<Object[]>(); } //包的下发时间 if (xxUtil.pkgDownTimeList.size() > 0) {
//切换List(旧List用来更新DB,新List用来存放新Id) oldTimeList = xxUtil.pkgDownTimeList; xxUtil.pkgDownTimeList = new ArrayList<Object[]>(); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
//旧List if (oldList != null && oldList.size() > 0) { System.out.println("update db.." + oldList.size());
if(isSort){ Collections.sort(oldList, new Comparator<Object[]>() { @Override public int compare(Object[] o1, Object[] o2) { Long pkgId1 = (Long) o1[0]; Long pkgId2 = (Long) o2[0]; return pkgId1.compareTo(pkgId2); } }); }
StringBuilder sb = new StringBuilder(); for(Object[] row:oldList){ sb.append(row[0]).append(","); } System.out.println("update db:"+sb.toString());
try { Connection conn = getConnection(); // 设置自动提交为 false,以便使用批量更新操作 conn.setAutoCommit(false);
// 准备批量更新语句 String sql = "update xx_download set state=? where id=?"; PreparedStatement pstmt = conn.prepareStatement(sql);
// 添加批量更新操作 for(Object[] row:oldList){ pstmt.setInt(1, 3); pstmt.setLong(2, (Long)row[0]); pstmt.addBatch(); }
// 执行批量更新 int[] updateCounts = pstmt.executeBatch(); int totalCount = 0; for(int count:updateCounts){ totalCount += count; } System.out.println("更新DB完毕,共"+oldList.size()+"条,更新条数:"+totalCount);
// 提交事务 conn.commit(); pstmt.close(); conn.close();
} catch (SQLException e) { e.printStackTrace(); }
// //模拟更新DB耗时 // //睡一会 // int sleepTime = ThreadLocalRandom.current().nextInt(500); // try { // Thread.sleep(200 + sleepTime); // } catch (InterruptedException e) { // e.printStackTrace(); // } System.out.println("update db end"); }
//oldTimeList if (oldTimeList != null && oldTimeList.size() > 0) { System.out.println("update db time .." + oldTimeList.size());
if(isSort){ Collections.sort(oldTimeList, new Comparator<Object[]>() { @Override public int compare(Object[] o1, Object[] o2) { Long pkgId1 = (Long) o1[1]; Long pkgId2 = (Long) o2[1]; return pkgId1.compareTo(pkgId2); } }); }
StringBuilder sb = new StringBuilder(); for(Object[] row:oldTimeList){ sb.append(row[1]).append(","); } System.out.println("update db time:"+sb.toString());
try { Connection conn = getConnection(); // 设置自动提交为 false,以便使用批量更新操作 conn.setAutoCommit(false);
// 准备批量更新语句 String sql = "update xx_download set download_time=? where id=?"; PreparedStatement pstmt = conn.prepareStatement(sql);
// 添加批量更新操作 for(Object[] row:oldTimeList){ pstmt.setTimestamp(1, (Timestamp)row[0]); pstmt.setLong(2, (Long)row[1]); pstmt.addBatch(); }
// 执行批量更新 int[] updateCounts = pstmt.executeBatch(); int totalCount = 0; for(int count:updateCounts){ totalCount += count; } System.out.println("更新DB完毕,共"+oldTimeList.size()+"条,更新条数:"+totalCount);
// 提交事务 conn.commit(); pstmt.close(); conn.close();
} catch (SQLException e) { e.printStackTrace(); } // //模拟更新DB耗时 // //睡一会 // int sleepTime = ThreadLocalRandom.current().nextInt(500); // try { // Thread.sleep(200 + sleepTime); // } catch (InterruptedException e) { // e.printStackTrace(); // } System.out.println("update db time end"); } } }.start(); }
System.out.println("main sleep"); try { Thread.sleep(10000000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
|