Skip to content

Commit

Permalink
Merge branch '3.4' of https://github.com/Parth-Brahmbhatt/zkclient in…
Browse files Browse the repository at this point in the history
…to Parth-Brahmbhatt-3.4

* '3.4' of https://github.com/Parth-Brahmbhatt/zkclient:
  upgrading ivy.xml to zk 3.4.3
  upgrading to zookeeper 3.4.3, supporting multi
  • Loading branch information
jzillmann committed Apr 15, 2015
2 parents e2b7560 + 55b5cf4 commit d85741e
Show file tree
Hide file tree
Showing 15 changed files with 90 additions and 26 deletions.
14 changes: 6 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ repositories {
//}
}
dependencies {
//compile 'com.101tec:zkclient:0.2'
compile ('log4j:log4j:1.2.14') {
exclude group: "com.sun.jdmk", module: "jmxtools"
exclude group: "com.sun.jmx", module: "jmxri"
exclude group: "javax.jms", module: "jms"
}
compile ('org.apache.zookeeper:zookeeper:3.3.1') {
exclude group: "com.sun.jdmk", module: "jmxtools"
compile ('log4j:log4j:1.2.15') {
exclude group: "com.sun.jdmk", module: "jmxtools"
exclude group: "com.sun.jmx", module: "jmxri"
exclude group: "javax.jms", module: "jms"
}
compile 'org.apache.zookeeper:zookeeper:3.4.3'
compile 'org.slf4j:slf4j-api:1.6.1'
compile 'org.slf4j:slf4j-log4j12:1.6.1'

testCompile 'junit:junit:4.7'
testCompile 'commons-io:commons-io:1.4'
testCompile 'org.mockito:mockito-core:1.8.0'
Expand Down
Binary file removed lib/log4j-1.2.14.jar
Binary file not shown.
Binary file added lib/log4j-1.2.15.jar
Binary file not shown.
Binary file added lib/slf4j-api-1.6.1.jar
Binary file not shown.
Binary file added lib/slf4j-log4j12-1.6.1.jar
Binary file not shown.
Binary file removed lib/zookeeper-3.3.1.jar
Binary file not shown.
Binary file added lib/zookeeper-3.4.3.jar
Binary file not shown.
4 changes: 4 additions & 0 deletions src/main/java/org/I0Itec/zkclient/IZkConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
Expand Down Expand Up @@ -52,5 +54,7 @@ public interface IZkConnection {

public String getServers();

public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException;

public void addAuthInfo(String scheme, byte[] auth);
}
30 changes: 30 additions & 0 deletions src/main/java/org/I0Itec/zkclient/InMemoryConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
Expand All @@ -43,6 +45,10 @@
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CheckVersionRequest;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetDataRequest;

/**
* Emulating a ZooKeeper server with few hash tables. Basically a mock class used for testing. Please avoid using this
Expand Down Expand Up @@ -347,6 +353,30 @@ public String getServers() {
return "mem";
}

public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
List<OpResult> opResults = new ArrayList<OpResult>();
for (Op op : ops) {
if (Op.Check.class.isAssignableFrom(op.getClass())) {
CheckVersionRequest check = (CheckVersionRequest) op.toRequestRecord();
exists(check.getPath(), false);
opResults.add(new OpResult.CheckResult());
} else if (Op.Create.class.isAssignableFrom(op.getClass())) {
CreateRequest create = (CreateRequest) op.toRequestRecord();
String path = create(create.getPath(), create.getData(), CreateMode.fromFlag(create.getFlags()));
opResults.add(new OpResult.CreateResult(path));
} else if (Op.Delete.class.isAssignableFrom(op.getClass())) {
DeleteRequest delete = (DeleteRequest) op.toRequestRecord();
delete(delete.getPath());
opResults.add(new OpResult.DeleteResult());
} else if (Op.SetData.class.isAssignableFrom(op.getClass())) {
SetDataRequest setData = (SetDataRequest) op.toRequestRecord();
writeData(setData.getPath(), setData.getData(), setData.getVersion());
opResults.add(new OpResult.SetDataResult(null));
}
}
return opResults;
}

@Override
public void addAuthInfo(String scheme, byte[] auth) {
_ids.add(new Id(scheme, new String(auth)));
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/I0Itec/zkclient/ZkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
Expand Down Expand Up @@ -1188,4 +1190,18 @@ public int numberOfListeners() {

return listeners;
}

public List<OpResult> multi(final Iterable<Op> ops) throws ZkException {
if (ops == null) {
throw new NullPointerException("ops must not be null.");
}

return retryUntilConnected(new Callable<List<OpResult>>() {

@Override
public List<OpResult> call() throws Exception {
return _connection.multi(ops);
}
});
}
}
6 changes: 6 additions & 0 deletions src/main/java/org/I0Itec/zkclient/ZkConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -143,6 +145,10 @@ public String getServers() {
return _servers;
}

public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
return _zk.multi(ops);
}

@Override
public void addAuthInfo(String scheme, byte[] auth) {
_zk.addAuthInfo(scheme, auth);
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/org/I0Itec/zkclient/ZkServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;

public class ZkServer {

Expand All @@ -44,7 +43,7 @@ public class ZkServer {
private IDefaultNameSpace _defaultNameSpace;

private ZooKeeperServer _zk;
private Factory _nioFactory;
private NIOServerCnxnFactory _nioFactory;
private ZkClient _zkClient;
private int _port;
private int _tickTime;
Expand All @@ -57,17 +56,18 @@ public ZkServer(String dataDir, String logDir, IDefaultNameSpace defaultNameSpac
public ZkServer(String dataDir, String logDir, IDefaultNameSpace defaultNameSpace, int port) {
this(dataDir, logDir, defaultNameSpace, port, DEFAULT_TICK_TIME);
}
public ZkServer(String dataDir, String logDir, IDefaultNameSpace defaultNameSpace, int port, int tickTime) {
this(dataDir, logDir, defaultNameSpace, port, tickTime, DEFAULT_MIN_SESSION_TIMEOUT);
}

public ZkServer(String dataDir, String logDir, IDefaultNameSpace defaultNameSpace, int port, int tickTime) {
this(dataDir, logDir, defaultNameSpace, port, tickTime, DEFAULT_MIN_SESSION_TIMEOUT);
}

public ZkServer(String dataDir, String logDir, IDefaultNameSpace defaultNameSpace, int port, int tickTime, int minSessionTimeout) {
_dataDir = dataDir;
_logDir = logDir;
_defaultNameSpace = defaultNameSpace;
_port = port;
_tickTime = tickTime;
_minSessionTimeout = minSessionTimeout;
_minSessionTimeout = minSessionTimeout;
}

public int getPort() {
Expand Down Expand Up @@ -131,8 +131,10 @@ private void startZooKeeperServer() {
private void startSingleZkServer(final int tickTime, final File dataDir, final File dataLogDir, final int port) {
try {
_zk = new ZooKeeperServer(dataDir, dataLogDir, tickTime);
_zk.setMinSessionTimeout(_minSessionTimeout);
_nioFactory = new NIOServerCnxn.Factory(new InetSocketAddress(port));
_zk.setMinSessionTimeout(_minSessionTimeout);
_nioFactory = new NIOServerCnxnFactory();
int maxClientConnections = 0; // 0 means unlimited
_nioFactory.configure(new InetSocketAddress(port), maxClientConnections);
_nioFactory.startup(_zk);
} catch (IOException e) {
throw new ZkException("Unable to start single ZooKeeper server.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public void tearDown() throws Exception {
LOG.info("------------ AFTER -------------");
}

@Test(expected = ZkTimeoutException.class, timeout = 5000)
@Test(expected = ZkTimeoutException.class, timeout = 15000)
public void testUnableToConnect() throws Exception {
LOG.info("--- testUnableToConnect");
// we are using port 4711 to avoid conflicts with the zk server that is
// started by the Spring context
new ZkClient("localhost:4712", 1000);
new ZkClient("localhost:4712", 5000);
}

@Test
Expand Down
15 changes: 8 additions & 7 deletions src/test/java/org/I0Itec/zkclient/ServerZkClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@
import org.junit.Test;

public class ServerZkClientTest extends AbstractBaseZkClientTest {
private static final int CONNECTION_TIMEOUT = 15000;
private AtomicInteger _counter = new AtomicInteger();

@Override
@Before
public void setUp() throws Exception {
super.setUp();
_zkServer = TestUtil.startZkServer("ZkClientTest_" + _counter.addAndGet(1), 4711);
_client = new ZkClient("localhost:4711", 5000);
_client = new ZkClient("localhost:4711", CONNECTION_TIMEOUT);
}

@Override
Expand All @@ -62,7 +63,7 @@ public void testRetryUntilConnected() throws Exception {
Gateway gateway = new Gateway(4712, 4711);
gateway.start();
final ZkConnection zkConnection = new ZkConnection("localhost:4712");
final ZkClient zkClient = new ZkClient(zkConnection, 1000);
final ZkClient zkClient = new ZkClient(zkConnection, CONNECTION_TIMEOUT);

gateway.stop();

Expand All @@ -86,7 +87,7 @@ public Object call() throws Exception {

@Test(timeout = 10000)
public void testReadWithTimeout() throws Exception {
final ZkClient zkClient = new ZkClient("localhost:4711", 2000, 2000, new SerializableSerializer(), 5000);
final ZkClient zkClient = new ZkClient("localhost:4711", 5000, 5000, new SerializableSerializer(), 5000);
// shutdown the server
LOG.info("Shutting down zookeeper server " + _zkServer);
_zkServer.shutdown();
Expand Down Expand Up @@ -126,7 +127,7 @@ public Boolean call() throws Exception {
@Test(timeout = 15000)
public void testWaitUntilConnected() throws Exception {
LOG.info("--- testWaitUntilConnected");
ZkClient _client = new ZkClient("localhost:4711", 5000);
ZkClient _client = new ZkClient("localhost:4711", CONNECTION_TIMEOUT);

_zkServer.shutdown();

Expand All @@ -147,7 +148,7 @@ public void testRetryUntilConnected_SessionExpiredException() {
gateway.start();

// Use a session timeout of 200ms
final ZkClient zkClient = new ZkClient("localhost:4712", 200, 5000);
final ZkClient zkClient = new ZkClient("localhost:4712", 200, CONNECTION_TIMEOUT);

gateway.stop();

Expand Down Expand Up @@ -181,7 +182,7 @@ public void testChildListenerAfterSessionExpiredException() throws Exception {
Gateway gateway = new Gateway(4712, 4711);
gateway.start();

final ZkClient disconnectedZkClient = new ZkClient("localhost:4712", sessionTimeout, 5000);
final ZkClient disconnectedZkClient = new ZkClient("localhost:4712", sessionTimeout, CONNECTION_TIMEOUT);
final Holder<List<String>> children = new Holder<List<String>>();
disconnectedZkClient.subscribeChildChanges("/root", new IZkChildListener() {

Expand Down Expand Up @@ -220,7 +221,7 @@ public void testZkClientConnectedToGatewayClosesQuickly() throws Exception {
final Gateway gateway = new Gateway(4712, 4711);
gateway.start();

ZkClient zkClient = new ZkClient("localhost:4712", 5000);
ZkClient zkClient = new ZkClient("localhost:4712", CONNECTION_TIMEOUT);
zkClient.close();

gateway.stop();
Expand Down
7 changes: 7 additions & 0 deletions src/test/java/org/I0Itec/zkclient/ZkStateChangeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -180,6 +182,11 @@ public String getServers() {
return "test";
}

@Override
public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public void addAuthInfo(String scheme, byte[] auth) {
throw new RuntimeException("not implemented");
Expand Down

0 comments on commit d85741e

Please sign in to comment.