Skip to content

Commit

Permalink
Merge pull request #13 from cpiotr/feature/close-socket-on-shutdown-#12
Browse files Browse the repository at this point in the history
Close socket on proxy shutdown
  • Loading branch information
terma authored Mar 22, 2020
2 parents 2800105 + 5c591e6 commit b7e0c99
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 6 deletions.
25 changes: 19 additions & 6 deletions src/com/github/terma/javaniotcpserver/TcpServerWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.github.terma.javaniotcpserver;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
Expand Down Expand Up @@ -62,14 +63,26 @@ public void run() {
LOGGER.log(Level.SEVERE, "Problem with selector, worker will be stopped!", exception);
} finally {
if (selector != null) {
try {
selector.close();
} catch (IOException exception) {
if (LOGGER.isLoggable(Level.WARNING))
LOGGER.log(Level.WARNING, "Could not close selector properly.", exception);
}
closeSelector(selector);
}
}
}

private void closeSelector(Selector selector) {
for (final SelectionKey key : selector.keys()) {
closeOrLog(key.channel(), "Could not selector channel properly.");
}

closeOrLog(selector, "Could not close selector properly.");
}

private void closeOrLog(Closeable closeable, String errorMessage) {
try {
closeable.close();
} catch (final IOException exception) {
if (LOGGER.isLoggable(Level.WARNING))
LOGGER.log(Level.WARNING, errorMessage, exception);
}
}

}
109 changes: 109 additions & 0 deletions test/com/github/terma/javaniotcpserver/TcpServerTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
package com.github.terma.javaniotcpserver;

import com.github.terma.javaniotcpproxy.StaticTcpProxyConfig;
import com.github.terma.javaniotcpproxy.TcpProxy;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class TcpServerTest {

private final TcpServerHandlerFactory handlerFactory = Mockito.mock(TcpServerHandlerFactory.class);
Expand Down Expand Up @@ -57,4 +74,96 @@ public void shouldFailWhenCreateWithNullConfig() {
new TcpServer(null);
}

@Test
public void shouldCloseProxySocketOnShutdown() throws IOException {
EchoServer echoServer = new EchoServer();
int serverPort = echoServer.startAndGetPort();

int proxyPort = 12345;
StaticTcpProxyConfig config = new StaticTcpProxyConfig(proxyPort, "localhost", serverPort);
config.setWorkerCount(1);
TcpProxy proxy = new TcpProxy(config);
proxy.start();

sendTextTo(proxyPort, "First message");

proxy.shutdown();
echoServer.shutdown();

try {
sendTextTo(proxyPort, "Second message");
fail("Proxy was shutdown but server socket is still listening");
} catch (IOException exception) {
assertEquals("Connection refused (Connection refused)", exception.getMessage());
}
}

private void sendTextTo(int localPort, String text) throws IOException {
Socket socket = new Socket("localhost", localPort);
OutputStream outputStream = socket.getOutputStream();
outputStream.write(text.getBytes());
outputStream.flush();
outputStream.close();
socket.close();
}

private static class EchoServer {

private final ExecutorService threadPool = Executors.newSingleThreadExecutor();

int startAndGetPort() throws IOException {
CountDownLatch serverStarted = new CountDownLatch(1);
ServerSocket serverSocket = new ServerSocket(0);
threadPool.submit(new BlockingEcho(serverStarted, serverSocket));

try {
serverStarted.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return serverSocket.getLocalPort();
}

void shutdown() {
threadPool.shutdown();
try {
threadPool.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

}

private static class BlockingEcho implements Runnable {

private final static Logger LOGGER = Logger.getAnonymousLogger();

private final CountDownLatch serverStarted;
private final ServerSocket serverSocket;

public BlockingEcho(CountDownLatch serverStarted, ServerSocket serverSocket) {
this.serverStarted = serverStarted;
this.serverSocket = serverSocket;
}

@Override
public void run() {
try {
serverStarted.countDown();
Socket socket = serverSocket.accept();
int readBytesCount;
byte[] bytes = new byte[1024];
while ((readBytesCount = socket.getInputStream().read(bytes)) > -1) {
OutputStream outputStream = socket.getOutputStream();
outputStream.write(Arrays.copyOfRange(bytes, 0, readBytesCount));
outputStream.flush();
}
} catch (final IOException exception) {
LOGGER.log(Level.WARNING, "Could not echo message.", exception);
}
}
}

}

0 comments on commit b7e0c99

Please sign in to comment.