[心缘地方]同学录
首页 | 功能说明 | 站长通知 | 最近更新 | 编码查看转换 | 代码下载 | 常见问题及讨论 | 《深入解析ASP核心技术》 | 王小鸭自动发工资条VBA版
登录系统:用户名: 密码: 如果要讨论问题,请先注册。

[备忘]测试并发批量update,导致deadlock的代码

上一篇:[备忘]写个bat自动恢复被暂停(挂起)的程序
下一篇:[备忘]arthas简单使用方法

添加日期:2024/9/18 10:50:06 快速返回   返回列表 阅读161次


package cn.xxx.send;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.*;
import java.util.Date;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;


public class pkgListTest {

    /**
     * 下个ID.
     */
    final static public AtomicLong seqNum = new AtomicLong(1);

    private static final String url = "jdbc:postgresql://192.168.1.10:5432/xxxTest?options=-c%20deadlock_timeout%3D2000";
    private static final String user = "postgres";
    private static final String password = "123456";

    static final boolean isSort = false;

    //deadlock_timeout,默认是1秒

    //不排序:
    //300线程,20包,执行10次有10次有deadlock,很多!
    //同上,deadlock_timeout改成2秒,还是10次deadlock,很多!

    //排序:
    //300线程,20包,执行10次有3次有deadlock,一两个。
    //同上,deadlock_timeout改成2秒,则0次deadlock。
    //1000线程,20包,deadlock_timeout改成2秒,则0次deadlock。

    //综合:排序作用很大,一两个deadlock也不是真的死锁,只是超过1秒没获取锁而已。

    public static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(url, user, password);
    }

    /**
     * 模拟同时发包.
     */
    public static void main(String[] args) {

        for (int i = 0; i < 300; i++) {
            final int shopId = i;
            new Thread() {
                @Override
                public void run() {

                    //随机包数,模拟发包回来后,更改状态
                    int pkgCount = ThreadLocalRandom.current().nextInt(20);
                    for (int i = 0; i < pkgCount; i++) {

                        //前提,db已经有对应id的记录,要真的更新,否则无法出现deadlock
                        long pkgId = 35700000+seqNum.getAndIncrement();
                        xxUtil.pkgDownTimeList.add(new Object[]{new Timestamp(new Date().getTime()), pkgId});

                        //睡一会
                        int sleepTime = ThreadLocalRandom.current().nextInt(500);
                        try {
                            Thread.sleep(sleepTime);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        //假装发包ack回来了,添加包id到List
                        long a = System.currentTimeMillis();
                        synchronized (xxUtil.pkgIdListLock) {
                            xxUtil.pkgIdList.add(new Object[]{pkgId,shopId});
                        }
                        long b = System.currentTimeMillis();
                        //System.out.println("add cost:" + (b - a) + "ms");
                    }

                    //都发完了,需要更新DB状态了
                    List<Object[]> oldList = null;
                    List<Object[]> oldTimeList = null;
                    synchronized (xxUtil.pkgIdListLock) {
                        if (xxUtil.pkgIdList.size() > 0) {

                            System.out.println("update db..pkgIdList..." + xxUtil.pkgIdList.size());

                            //切换List(旧List用来更新DB,新List用来存放新Id)
                            oldList = xxUtil.pkgIdList;
                            xxUtil.pkgIdList = new ArrayList<Object[]>();
                        }
                        //包的下发时间
                        if (xxUtil.pkgDownTimeList.size() > 0) {

                            //切换List(旧List用来更新DB,新List用来存放新Id)
                            oldTimeList = xxUtil.pkgDownTimeList;
                            xxUtil.pkgDownTimeList = new ArrayList<Object[]>();
                        }
                    }
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    //旧List
                    if (oldList != null && oldList.size() > 0) {
                        System.out.println("update db.." + oldList.size());

                        if(isSort){
                            Collections.sort(oldList, new Comparator<Object[]>() {
                                @Override
                                public int compare(Object[] o1, Object[] o2) {
                                    Long pkgId1 = (Long) o1[0];
                                    Long pkgId2 = (Long) o2[0];
                                    return pkgId1.compareTo(pkgId2);
                                }
                            });
                        }

                        StringBuilder sb = new StringBuilder();
                        for(Object[] row:oldList){
                            sb.append(row[0]).append(",");
                        }
                        System.out.println("update db:"+sb.toString());

                        try {
                            Connection conn = getConnection();
                            // 设置自动提交为 false,以便使用批量更新操作
                            conn.setAutoCommit(false);

                            // 准备批量更新语句
                            String sql = "update xx_download set state=? where id=?";
                            PreparedStatement pstmt = conn.prepareStatement(sql);

                            // 添加批量更新操作
                            for(Object[] row:oldList){
                                pstmt.setInt(1, 3);
                                pstmt.setLong(2, (Long)row[0]);
                                pstmt.addBatch();
                            }

                            // 执行批量更新
                            int[] updateCounts = pstmt.executeBatch();
                            int totalCount = 0;
                            for(int count:updateCounts){
                                totalCount += count;
                            }
                            System.out.println("更新DB完毕,共"+oldList.size()+"条,更新条数:"+totalCount);

                            // 提交事务
                            conn.commit();
                            pstmt.close();
                            conn.close();

                        } catch (SQLException e) {
                            e.printStackTrace();
                        }

//                        //模拟更新DB耗时
//                        //睡一会
//                        int sleepTime = ThreadLocalRandom.current().nextInt(500);
//                        try {
//                            Thread.sleep(200 + sleepTime);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }
                        System.out.println("update db end");
                    }

                    //oldTimeList
                    if (oldTimeList != null && oldTimeList.size() > 0) {
                        System.out.println("update db time .." + oldTimeList.size());

                        if(isSort){
                            Collections.sort(oldTimeList, new Comparator<Object[]>() {
                                @Override
                                public int compare(Object[] o1, Object[] o2) {
                                    Long pkgId1 = (Long) o1[1];
                                    Long pkgId2 = (Long) o2[1];
                                    return pkgId1.compareTo(pkgId2);
                                }
                            });
                        }

                        StringBuilder sb = new StringBuilder();
                        for(Object[] row:oldTimeList){
                            sb.append(row[1]).append(",");
                        }
                        System.out.println("update db time:"+sb.toString());


                        try {
                            Connection conn = getConnection();
                            // 设置自动提交为 false,以便使用批量更新操作
                            conn.setAutoCommit(false);

                            // 准备批量更新语句
                            String sql = "update xx_download set download_time=? where id=?";
                            PreparedStatement pstmt = conn.prepareStatement(sql);

                            // 添加批量更新操作
                            for(Object[] row:oldTimeList){
                                pstmt.setTimestamp(1, (Timestamp)row[0]);
                                pstmt.setLong(2, (Long)row[1]);
                                pstmt.addBatch();
                            }

                            // 执行批量更新
                            int[] updateCounts = pstmt.executeBatch();
                            int totalCount = 0;
                            for(int count:updateCounts){
                                totalCount += count;
                            }
                            System.out.println("更新DB完毕,共"+oldTimeList.size()+"条,更新条数:"+totalCount);

                            // 提交事务
                            conn.commit();
                            pstmt.close();
                            conn.close();

                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
//                        //模拟更新DB耗时
//                        //睡一会
//                        int sleepTime = ThreadLocalRandom.current().nextInt(500);
//                        try {
//                            Thread.sleep(200 + sleepTime);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }
                        System.out.println("update db time end");
                    }
                }
            }.start();
        }

        System.out.println("main sleep");
        try {
            Thread.sleep(10000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

 

评论 COMMENTS
没有评论 No Comments.

添加评论 Add new comment.
昵称 Name:
评论内容 Comment:
验证码(不区分大小写)
Validation Code:
(not case sensitive)
看不清?点这里换一张!(Change it here!)
 
评论由管理员查看后才能显示。the comment will be showed after it is checked by admin.
CopyRight © 心缘地方 2005-2999. All Rights Reserved