(1)Maven工程,pom增加:
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.11.0</version> </dependency>
会自动下载curator-framework、curator-client、zookeeper等依赖的jar包
(2)spring配置增加:
<!-- 重连策略 --> <bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry"> <constructor-arg index="0" value="1000" /> <!-- 间隔时间基数 --> <constructor-arg index="1" value="3" /><!-- 最多重试几次 --> </bean>
<bean id="curatorClient" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start"> <constructor-arg index="0" value="${curator.server.list}" /> <constructor-arg index="1" value="${curator.session.timeout}" /><!-- sessionTimeoutMs会话超时时间,单位为毫秒。默认是60000ms --> <constructor-arg index="2" value="${curator.connection.timeout}" /><!-- connectionTimeoutMs连接创建超时时间,单位毫秒,默认15000ms --> <constructor-arg index="3" ref="retryPolicy" /> </bean>
(3)properties文件增加:
curator.server.list=localhost:2181 curator.session.timeout=15000 curator.connection.timeout=10000
这里zookeeper地址只写了一个,实际环境应该部署多个,这里逗号分隔写上就行。
(4)定义job父类文件:
package com.xxx.quartzJob;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired;
/** * 执行前,需要抢锁的Job. * <p> * 抽象类,不用声明为Service. * </p> * */ public abstract class NeedLockJob {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired private CuratorFramework curatorClient;
/** * 锁路径. */ private static final String LOCK_BASE_PATH = "/lock/batch/";
/** * 执行入口. */ public void execute() {
// 可重入共享锁 InterProcessMutex sharedLock = new InterProcessMutex(curatorClient, LOCK_BASE_PATH + this.getLockName()); try {
// 尝试获取锁 boolean isGetLock = sharedLock.acquire(1000, TimeUnit.MILLISECONDS); if (isGetLock) { this.executeInternel(); }
} catch (Exception e) { // Zookeeper错误或连接中断 logger.error("抢锁时出错", e);
} finally { try {
// 持有锁,则释放锁 if (sharedLock.isAcquiredInThisProcess()) { sharedLock.release(); } } catch (Exception e) { logger.error("释放锁时出错", e); }
} }
/** * 获取锁名称. * * @return */ protected abstract String getLockName();
/** * 执行任务. */ protected abstract void executeInternel();
}
(5)然后子类继承它即可。 这样工程可以部署多个服务器,执行定时任务时,先抢锁,抢锁成功的那个机器执行任务。 挂掉一台机器,无所谓,因为每台机器都可以跑所有任务。
不过有个前提吧: (1)每台机器定时任务时间应该一致。 (2)每台机器的时间应该大概一致。 (3)每个任务至少要运行1秒钟以上。 否则,一台机器抢锁成功后,比如100毫秒就结束了,锁就释放了, 另一台机器又来抢锁,也能抢到了,导致任务重复执行。
不过,相信100毫秒能跑完的任务,应该是啥都没干,重复了也无所谓。
另外一个重要点: 最好能保证操作的幂等性,即可以重复操作多次,不影响。 这样就啥都不怕了。 ------------------------------------ 另外,curator也支持选举机制,就是多台机器,只有一个Leader, 这样可以由Leader的执行任务。 Leader挂了后,zookeeper会自动选举一个新Leader,这样由新Leader执行任务。 利用这个可以实现主备机制。
|