/**
 * Spring Boot entry point that benchmarks Curator's {@link InterProcessMutex}.
 *
 * <p>After the application context is up, a fixed pool of worker threads repeatedly
 * acquires and immediately releases the same distributed lock. Every
 * {@code REPORT_INTERVAL} lock round-trips (counted across all workers) the elapsed
 * wall-clock time since the previous report is printed to standard output.
 *
 * <p>The workers run until they are interrupted, which happens when the bean is
 * destroyed (see {@link #destroy()}).
 */
@SpringBootApplication
public class CuratorApplication implements InitializingBean, DisposableBean {

    /** Number of concurrent workers competing for the lock (also the pool size). */
    private static final int WORKER_COUNT = 15;

    /** A timing line is printed each time the shared call counter hits a multiple of this. */
    private static final int REPORT_INTERVAL = 1000;

    /** ZooKeeper path used as the lock node by every worker. */
    private static final String LOCK_PATH = "/lock-path";

    /** Total number of lock round-trips started, shared by all workers. */
    private AtomicInteger atomicInteger;

    /** Pool that runs the benchmark workers; created in {@link #afterPropertiesSet()}. */
    private ExecutorService executorService;

    @Autowired
    private CuratorFramework curatorFramework;

    public static void main(String[] args) {
        CuratorApplication app = SpringApplication.run(CuratorApplication.class, args).getBean(CuratorApplication.class);
        app.tryCuratorLock();
    }

    /**
     * Submits {@code WORKER_COUNT} copies of the benchmark loop to the executor and returns
     * immediately; the loops keep running in the pool threads.
     */
    private void tryCuratorLock() {
        // Start of the current reporting window. Shared by all workers, so it must be a
        // thread-safe holder rather than a plain map or field.
        final java.util.concurrent.atomic.AtomicLong windowStart =
                new java.util.concurrent.atomic.AtomicLong(System.currentTimeMillis());

        Runnable worker = () -> {
            // Exit once the pool interrupts us (shutdownNow) instead of spinning forever.
            while (!Thread.currentThread().isInterrupted()) {
                int calls = atomicInteger.incrementAndGet();
                try {
                    interProcessMutex();
                } catch (InterruptedException e) {
                    // Restore the flag so the executor sees the interruption, then stop.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // Benchmark keeps going after a failed round-trip; just report it.
                    e.printStackTrace();
                }
                if (calls % REPORT_INTERVAL == 0) {
                    long now = System.currentTimeMillis();
                    // Atomically read the previous window start and open the next window.
                    long previous = windowStart.getAndSet(now);
                    System.out.println("call " + calls + " times, consume: " + (now - previous));
                }
            }
        };

        for (int i = 0; i < WORKER_COUNT; i++) {
            executorService.submit(worker);
        }
    }

    /**
     * Performs one lock round-trip: acquires the mutex at {@code LOCK_PATH} and releases it
     * without doing any work while holding it.
     *
     * @throws Exception if acquiring or releasing the lock fails
     */
    private void interProcessMutex() throws Exception {
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH);
        // Acquire OUTSIDE the try block: if acquire() fails the lock is not held, and calling
        // release() from finally would throw and hide the original failure.
        lock.acquire();
        try {
            // Intentionally empty: only the lock overhead is being measured.
        } finally {
            lock.release();
        }
    }

    /** Creates the shared call counter and the worker pool. */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.atomicInteger = new AtomicInteger();
        this.executorService = Executors.newFixedThreadPool(WORKER_COUNT);
    }

    /**
     * Stops the benchmark workers. The worker loops never finish on their own, so a plain
     * {@code shutdown()} would leave them running; {@code shutdownNow()} interrupts them.
     */
    @Override
    public void destroy() throws Exception {
        this.executorService.shutdownNow();
        // Give in-flight lock round-trips a short grace period to release and exit.
        this.executorService.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
    }
}
/**
 * Spring configuration that exposes a {@link CuratorFramework} client built from
 * {@link CuratorConfigurationProperties}.
 */
@Configuration
public class CuratorConfiguration {

    @Autowired
    CuratorConfigurationProperties curatorConfiguration;

    /**
     * Builds the Curator client from the configured connect string, timeouts and retry policy.
     *
     * <p>A Curator client must be started before it can be used, and closed when no longer
     * needed. Both are delegated to the Spring bean lifecycle via {@code initMethod} and
     * {@code destroyMethod}, so the client is started once after creation and closed when
     * the application context shuts down.
     *
     * @return the client; it is not yet started when returned, Spring invokes
     *         {@code start()} on it afterwards
     */
    @Bean(initMethod = "start", destroyMethod = "close")
    public CuratorFramework curatorFramework() {
        // Retry a failed operation a fixed number of times, sleeping a fixed interval between attempts.
        RetryPolicy retryPolicy = new RetryNTimes(
                curatorConfiguration.getRetryCount(),
                curatorConfiguration.getElapsedTimeMs());

        return CuratorFrameworkFactory.builder()
                .connectString(curatorConfiguration.getConnectString())
                .sessionTimeoutMs(curatorConfiguration.getSessionTimeoutMs())
                .connectionTimeoutMs(curatorConfiguration.getConnectionTimeoutMs())
                .retryPolicy(retryPolicy)
                .build();
    }
}
# Settings for the Curator client (presumably bound to CuratorConfigurationProperties - confirm against that class).
# Retry policy: these two values are read via getRetryCount() / getElapsedTimeMs() and passed to RetryNTimes.
curator.retryCount=5
curator.elapsedTimeMs=5000
# ZooKeeper connect string; the commented-out line below is a single local server alternative.
#curator.connectString=localhost:2181
curator.connectString=172.22.122.21:2181,172.22.122.21:2182,172.22.122.21:2183
# Session and connection timeouts in milliseconds, handed to the CuratorFrameworkFactory builder.
curator.sessionTimeoutMs=60000
curator.connectionTimeoutMs=5000
我们用一个空业务逻辑来进行锁的性能测试:
call 1000 times, consume: 6273
call 2000 times, consume: 6073
call 3000 times, consume: 6184
call 4000 times, consume: 6200
call 5000 times, consume: 5996
call 6000 times, consume: 6333
call 7000 times, consume: 6185
call 8000 times, consume: 6159
call 9000 times, consume: 7171
平均每一千次调用需要花费 6 秒左右,而这还仅仅是 3 节点的 ZooKeeper 集群;如果节点数更多,同步时间会更长,吞吐量也会更低。
网友评论