[cooja/serialsocket] SeriaSocketServer: Moved Observer and Handler to inner classes and let handler thread join before notifying server terminated

This commit is contained in:
Enrico Joerns 2014-04-16 03:42:41 +02:00
parent dea03493bd
commit 894a88d08e
1 changed files with 86 additions and 70 deletions

View File

@ -111,8 +111,6 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
private ServerSocket serverSocket;
private Socket clientSocket;
private DataInputStream in;
private DataOutputStream out;
private Mote mote;
@ -282,13 +280,15 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
@Override
public void run() {
socketStatusLabel.setForeground(COLOR_POSITIVE);
socketStatusLabel.setText(String.format("Client " + client.getInetAddress() + " connected."));
socketStatusLabel.setText("Client "
+ client.getInetAddress() + ":" + client.getPort()
+ " connected.");
}
});
}
@Override
public void onClientDisconnected(final Socket client) {
public void onClientDisconnected() {
SwingUtilities.invokeLater(new Runnable() {
@Override
@ -338,7 +338,7 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
public interface ServerListener {
void onServerStarted(int port);
void onClientConnected(Socket client);
void onClientDisconnected(Socket client);
void onClientDisconnected();
void onServerStopped();
void onServerError(String msg);
}
@ -359,9 +359,9 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
}
}
public void notifyClientDisconnected(Socket client) {
public void notifyClientDisconnected() {
for (ServerListener listener : listeners) {
listener.onClientDisconnected(client);
listener.onClientDisconnected();
}
}
@ -393,6 +393,7 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
}
new Thread() {
private Thread incomingDataHandler;
@Override
public void run() {
while (!serverSocket.isClosed()) {
@ -409,31 +410,13 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
clientSocket = candidateSocket;
in = new DataInputStream(clientSocket.getInputStream());
out = new DataOutputStream(clientSocket.getOutputStream());
out.flush();
startSocketReadThread(in);
/* Start handler for data input from socket */
incomingDataHandler = new Thread(new IncomingDataHandler());
incomingDataHandler.start();
/* Observe serial port for outgoing data */
serialPort.addSerialDataObserver(serialDataObserver = new Observer() {
@Override
public void update(Observable obs, Object obj) {
try {
if (out == null) {
/*logger.debug("out is null");*/
return;
}
out.write(serialPort.getLastSerialData());
out.flush();
outBytes++;
} catch (IOException ex) {
logger.error(ex);
cleanupClient();
}
}
});
serialDataObserver = new SerialDataObserver();
serialPort.addSerialDataObserver(serialDataObserver);
inBytes = outBytes = 0;
@ -450,11 +433,19 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
}
}
cleanupClient();
if (incomingDataHandler != null) {
// Wait for reader thread to terminate
try {
incomingDataHandler.join(500);
} catch (InterruptedException ex) {
logger.warn(ex);
}
}
notifyServerStopped();
}
}.start();
}
/**
* Stops server by closing server listen socket.
*/
@ -466,32 +457,72 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
}
}
private void startSocketReadThread(final DataInputStream in) {
/* Forward data: virtual port -> mote */
Thread incomingDataThread = new Thread(new Runnable() {
@Override
public void run() {
int numRead = 0;
byte[] data = new byte[1024];
logger.info("Forwarder: socket -> serial port");
while (numRead >= 0) {
for (int i = 0; i < numRead; i++) {
serialPort.writeByte(data[i]);
}
inBytes += numRead;
/* Forward data: virtual port -> mote */
private class IncomingDataHandler implements Runnable {
try {
numRead = in.read(data);
} catch (IOException e) {
logger.info(e.getMessage());
numRead = -1;
}
DataInputStream in;
@Override
public void run() {
int numRead = 0;
byte[] data = new byte[1024];
try {
in = new DataInputStream(clientSocket.getInputStream());
} catch (IOException ex) {
logger.error(ex);
return;
}
logger.info("Forwarder: socket -> serial port");
while (numRead >= 0) {
for (int i = 0; i < numRead; i++) {
serialPort.writeByte(data[i]);
}
logger.info("End of Stream");
inBytes += numRead;
try {
numRead = in.read(data);
} catch (IOException e) {
logger.info(e.getMessage());
numRead = -1;
}
}
logger.info("End of Stream");
cleanupClient();
}
}
private class SerialDataObserver implements Observer {
DataOutputStream out;
public SerialDataObserver() {
try {
out = new DataOutputStream(clientSocket.getOutputStream());
} catch (IOException ex) {
logger.error(ex);
out = null;
}
}
@Override
public void update(Observable obs, Object obj) {
try {
if (out == null) {
/*logger.debug("out is null");*/
return;
}
out.write(serialPort.getLastSerialData());
out.flush();
outBytes++;
} catch (IOException ex) {
logger.error(ex);
cleanupClient();
}
});
incomingDataThread.start();
}
}
@Override
@ -572,25 +603,10 @@ public class SerialSocketServer extends VisPlugin implements MotePlugin {
} catch (IOException e1) {
logger.error(e1.getMessage());
}
try {
if (in != null) {
in.close();
in = null;
}
} catch (IOException e) {
logger.error(e.getMessage());
}
try {
if (out != null) {
out.close();
out = null;
}
} catch (IOException e) {
logger.error(e.getMessage());
}
serialPort.deleteSerialDataObserver(serialDataObserver);
notifyClientDisconnected(null);
notifyClientDisconnected();
}
private boolean closed = false;