用法: 创建多个seqService,分别指定SeqNextRangeCall, 然后把seqSerivce缓存起来,使用。
package seq; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong;
public class SeqService {
private AtomicLong seqNum = new AtomicLong(1); private long seqMax = -1; AtomicBoolean seqFirst = new AtomicBoolean(false); CountDownLatch seqLatch = new CountDownLatch(1); Object x = new Object();
SeqNextRangeCall nextCall; int rangeNum = 0; public SeqService(SeqNextRangeCall nextCall,int rangeNum){ this.nextCall = nextCall; this.rangeNum = rangeNum; } public static void main(String[] args) {
final SeqService service = new SeqService(new SeqNextRangeCall() {
@Override public long getNextRangeStart() { // 从DB获取序列 long begin = System.currentTimeMillis(); try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return begin; } },10000); for (int i = 0; i < 10; i++) { final int x = i; new Thread() {
@Override public void run() { currentThread().setName("main " + x); for(int j=0;j<1000000;j++) { System.out.println(Thread.currentThread().getName() + " 获取到:" + service.getId()); try { Thread.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } xx2(); } public static void xx2() { final SeqService service = new SeqService(new SeqNextRangeCall() {
@Override public long getNextRangeStart() { // 从DB获取序列 String s = ""+System.currentTimeMillis(); long begin = Long.valueOf(s.substring(s.length()-5)); try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return begin; } },10000); for (int i = 0; i < 3; i++) { final int x = i; new Thread() {
@Override public void run() { currentThread().setName("sub " + x); for(int j=0;j<1000000;j++) { System.out.println(Thread.currentThread().getName() + " 获取到:" + service.getId()); try { Thread.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
public long getId() { long nextId = seqNum.getAndIncrement(); if (nextId <= seqMax) { return nextId; }
//极端情况下,100个线程在此处等待,一个执行完,下一个执行。 // if(Thread.currentThread().getName().contains("2") || Thread.currentThread().getName().contains("1")) { // System.out.println(Thread.currentThread().getName() + " x.wait"); // try { // synchronized(x) { // x.wait(); // } // } catch (InterruptedException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } // System.out.println(Thread.currentThread().getName() + " up"); // } // 这里得排他一下,只有一个线程去获取,其他等待。 // 策略是初始化取max, 然后程序内部+1. if (!seqFirst.getAndSet(true)) { //再判断一次(对应极端情况。第一个线程执行完,肯定已经更新了id范围,后面的线程直接获取返回即可) nextId = seqNum.getAndIncrement(); if (nextId <= seqMax) { System.out.println(Thread.currentThread().getName() + " 二次检查,获取到ID,无需从DB获取。"); //其他线程可能在wait,所以要通知 //通知其他线程,可以获取ID了 seqLatch.countDown(); //清理变量 seqLatch = new CountDownLatch(1); seqFirst.set(false); //最后重置它 return nextId; } // 第一个线程 System.out.println(Thread.currentThread().getName() + " 获取新ID");
long begin = this.nextCall.getNextRangeStart();
// 更新序列 //可能A线程刚执行完这句,之前max是100,现在begin是201,外面B线程恰好来获取,不符合<Max的条件,会往下执行 //<1>A线程未执行完,B线程执行wait等待,没有问题 //<2>恰好A线程获取完,变量都重置了。那么B线程会再次执行DB获取 seqNum.set(begin+1); //先更新它,因为它是开始值 try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } seqMax = begin + this.rangeNum; //后更新它(如果先更新它,之前max是100,nextId是101,现在max是500,因为101<500,则会返回101,而开始值现在是400) System.out.println(Thread.currentThread().getName() + " 获取完毕。"); //通知其他线程,可以获取ID了 seqLatch.countDown(); //执行完这句,在等待的线程被唤醒 //清理变量 seqLatch = new CountDownLatch(1); //可能刚执行完这句,其他进程恰好跑到等待分支,要等下次获取才会唤醒,所以等待加了超时。 seqFirst.set(false); //最后重置它
// synchronized(x) { // x.notifyAll(); // } } else { // 其他线程来晚了,等待 System.out.println(Thread.currentThread().getName() + " 等待"); try { seqLatch.await(3, TimeUnit.SECONDS); //最多等3秒 } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println(Thread.currentThread().getName() + " InterruptedException"); e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 等待结束"); } return getId(); } }
package seq;
public interface SeqNextRangeCall {
public long getNextRangeStart(); }
package seq;
import java.util.Random; import java.util.concurrent.ConcurrentHashMap;
public class SeqCenter {
static ConcurrentHashMap<String,SeqService> serviceMap = new ConcurrentHashMap<>(); public static long getNextId(String key) { SeqService service = serviceMap.get(key); if(service!=null) { return service.getId(); }
service = new SeqService(new SeqNextRangeCall() {
@Override public long getNextRangeStart() { // 从DB获取序列 String s = ""+System.currentTimeMillis(); long begin = Long.valueOf(s.substring(s.length()-5)); try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return begin; } },10000); //如果不存在(新的entry),那么会向map中添加该键值对,并返回null。 //如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。 SeqService x = serviceMap.putIfAbsent(key, service); if(x==null) { //新的放进去了 return service.getId(); }else { //别的线程放进去了 return x.getId(); } } public static void main(String[] args) { for (int i = 0; i < 3; i++) { final int x = i; new Thread() {
@Override public void run() { currentThread().setName("sub " + x); for(int j=0;j<1000000;j++) { System.out.println(Thread.currentThread().getName() + " 获取到:" + SeqCenter.getNextId("aaaa")); try { Thread.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } for (int i = 0; i < 3; i++) { final int x = i; new Thread() {
@Override public void run() { currentThread().setName("mmm " + x); for(int j=0;j<1000000;j++) { System.out.println(Thread.currentThread().getName() + " 获取到:" + SeqCenter.getNextId("bbbb")); try { Thread.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
}
调用SeqCenter.getNextId("bbbb")这样就行了。 nextCall也传的话,感觉有点累赘呢。 要是SeqCenter里写死,又感觉不灵活。 不完美啊~~
|