[心缘地方]同学录
首页 | 功能说明 | 站长通知 | 最近更新 | 编码查看转换 | 代码下载 | 常见问题及讨论 | 《深入解析ASP核心技术》 | 王小鸭自动发工资条VBA版
登录系统:用户名: 密码: 如果要讨论问题,请先注册。

[备忘]java缓存id范围,并发获取只有一个执行。

上一篇:[备忘]FileInputStream和FileOutputStream对象,引起的大量Finalizer对象,内存溢出?
下一篇:[备忘]netty的DirectByteBuffer备忘

添加日期:2021/9/10 10:09:03 快速返回   返回列表 阅读783次
用法:
创建多个seqService,分别指定SeqNextRangeCall,
然后把seqSerivce缓存起来,使用。


package seq;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class SeqService {

    private AtomicLong seqNum = new AtomicLong(1);
    private long seqMax = -1;
    AtomicBoolean seqFirst = new AtomicBoolean(false);
    CountDownLatch seqLatch = new CountDownLatch(1);
    Object x = new Object();

    SeqNextRangeCall nextCall;
    int rangeNum = 0;
    
    public SeqService(SeqNextRangeCall nextCall,int rangeNum){
        this.nextCall = nextCall;
        this.rangeNum = rangeNum;
    }
    public static void main(String[] args) {

        final SeqService service = new SeqService(new SeqNextRangeCall() {

            @Override
            public long getNextRangeStart() {
                
                // 从DB获取序列
                long begin = System.currentTimeMillis();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return begin;
            }
            
        },10000);
        for (int i = 0; i < 10; i++) {
            final int x = i;
            new Thread() {

                @Override
                public void run() {
                    currentThread().setName("main " + x);
                    
                    for(int j=0;j<1000000;j++) {
                        
                        System.out.println(Thread.currentThread().getName() + " 获取到:" + service.getId());
                        try {
                            Thread.sleep(new Random().nextInt(10));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    
                }
            }.start();
        }
        
        xx2();
    }
    
    public static void xx2() {
        final SeqService service = new SeqService(new SeqNextRangeCall() {

            @Override
            public long getNextRangeStart() {
                
                // 从DB获取序列
                String s = ""+System.currentTimeMillis();
                long begin = Long.valueOf(s.substring(s.length()-5));
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return begin;
            }
            
        },10000);
        for (int i = 0; i < 3; i++) {
            final int x = i;
            new Thread() {

                @Override
                public void run() {
                    currentThread().setName("sub " + x);
                    
                    for(int j=0;j<1000000;j++) {
                        
                        System.out.println(Thread.currentThread().getName() + " 获取到:" + service.getId());
                        try {
                            Thread.sleep(new Random().nextInt(10));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    
                }
            }.start();
        }
    }

    public long getId() {
        long nextId = seqNum.getAndIncrement();
        if (nextId <= seqMax) {
            return nextId;
        }

        //极端情况下,100个线程在此处等待,一个执行完,下一个执行。
//        if(Thread.currentThread().getName().contains("2") || Thread.currentThread().getName().contains("1")) {
//            System.out.println(Thread.currentThread().getName() + " x.wait");
//            try {
//                synchronized(x) {
//                    x.wait();
//                }
//            } catch (InterruptedException e) {
//                // TODO Auto-generated catch block
//                e.printStackTrace();
//            }
//            System.out.println(Thread.currentThread().getName() + " up");
//        }
        
        // 这里得排他一下,只有一个线程去获取,其他等待。
        // 策略是初始化取max, 然后程序内部+1.
        if (!seqFirst.getAndSet(true)) {
            
            //再判断一次(对应极端情况。第一个线程执行完,肯定已经更新了id范围,后面的线程直接获取返回即可)
            nextId = seqNum.getAndIncrement();
            if (nextId <= seqMax) {
                System.out.println(Thread.currentThread().getName() + " 二次检查,获取到ID,无需从DB获取。");
                
                //其他线程可能在wait,所以要通知
                
                //通知其他线程,可以获取ID了
                seqLatch.countDown();
                
                //清理变量
                seqLatch = new CountDownLatch(1);
                seqFirst.set(false); //最后重置它
                
                return nextId;
            }
            
            
            // 第一个线程
            System.out.println(Thread.currentThread().getName() + " 获取新ID");

            long begin = this.nextCall.getNextRangeStart();

            // 更新序列
            //可能A线程刚执行完这句,之前max是100,现在begin是201,外面B线程恰好来获取,不符合<Max的条件,会往下执行
            //<1>A线程未执行完,B线程执行wait等待,没有问题
            //<2>恰好A线程获取完,变量都重置了。那么B线程会再次执行DB获取
            seqNum.set(begin+1);  //先更新它,因为它是开始值
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            seqMax = begin + this.rangeNum; //后更新它(如果先更新它,之前max是100,nextId是101,现在max是500,因为101<500,则会返回101,而开始值现在是400)
            System.out.println(Thread.currentThread().getName() + " 获取完毕。");
            
            //通知其他线程,可以获取ID了
            seqLatch.countDown(); //执行完这句,在等待的线程被唤醒
            
            //清理变量
            seqLatch = new CountDownLatch(1); //可能刚执行完这句,其他进程恰好跑到等待分支,要等下次获取才会唤醒,所以等待加了超时。
            seqFirst.set(false); //最后重置它

//            synchronized(x) {
//                x.notifyAll();
//            }
        } else {
            // 其他线程来晚了,等待
            System.out.println(Thread.currentThread().getName() + " 等待");
            try {
                seqLatch.await(3, TimeUnit.SECONDS); //最多等3秒
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                System.out.println(Thread.currentThread().getName() + " InterruptedException");
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 等待结束");
        }
        return getId();
    }
}




package seq;

public interface SeqNextRangeCall {

    public long getNextRangeStart();
}




package seq;

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

public class SeqCenter {

    static ConcurrentHashMap<String,SeqService> serviceMap = new ConcurrentHashMap<>();
    
    public static long getNextId(String key) {
        SeqService service = serviceMap.get(key);
        if(service!=null) {
            return service.getId();
        }
        

        service = new SeqService(new SeqNextRangeCall() {

            @Override
            public long getNextRangeStart() {
                
                // 从DB获取序列
                String s = ""+System.currentTimeMillis();
                long begin = Long.valueOf(s.substring(s.length()-5));
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return begin;
            }
            
        },10000); 
        
        //如果不存在(新的entry),那么会向map中添加该键值对,并返回null。
        //如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。
        SeqService x = serviceMap.putIfAbsent(key, service);
        if(x==null) {
            //新的放进去了
            return service.getId();
        }else {
            //别的线程放进去了
            return x.getId();
        }
        
    }
    
    
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            final int x = i;
            new Thread() {

                @Override
                public void run() {
                    currentThread().setName("sub " + x);
                    
                    for(int j=0;j<1000000;j++) {
                        
                        System.out.println(Thread.currentThread().getName() + " 获取到:" + SeqCenter.getNextId("aaaa"));
                        try {
                            Thread.sleep(new Random().nextInt(10));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    
                }
            }.start();
        }
        
        for (int i = 0; i < 3; i++) {
            final int x = i;
            new Thread() {

                @Override
                public void run() {
                    currentThread().setName("mmm " + x);
                    
                    for(int j=0;j<1000000;j++) {
                        
                        System.out.println(Thread.currentThread().getName() + " 获取到:" + SeqCenter.getNextId("bbbb"));
                        try {
                            Thread.sleep(new Random().nextInt(10));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    
                }
            }.start();
        }
        
    }
    

}




调用SeqCenter.getNextId("bbbb")这样就行了。
nextCall也传的话,感觉有点累赘呢。
要是SeqCenter里写死,又感觉不灵活。
不完美啊~~
 

评论 COMMENTS
没有评论 No Comments.

添加评论 Add new comment.
昵称 Name:
评论内容 Comment:
验证码(不区分大小写)
Validation Code:
(not case sensitive)
看不清?点这里换一张!(Change it here!)
 
评论由管理员查看后才能显示。the comment will be showed after it is checked by admin.
CopyRight © 心缘地方 2005-2999. All Rights Reserved