TCP 서버는 ServerSocketChannel, TCP 클라이언트는 SocketChannel 클래스를 사용해서 논블로킹(non-blocking) 소켓을 구현한다.
* 서버 종료 시 현재 연결된 클라이언트들에 대해 4-way handshake(FIN/ACK 교환)를 통해 자연스럽게 연결을 종료시키는 것에 주목 (stopServer 메소드)
< TcpServer.java >
package com.kimdh.dxmediaplayer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import android.os.StrictMode;
/**
 * Small non-blocking TCP server built on {@link ServerSocketChannel} and a
 * single {@link Selector} thread.
 *
 * <p>Accepted clients are tracked by the selector thread and events are
 * reported through {@link ReceiveEventHandler}. All handler callbacks are
 * invoked on the selector thread.
 */
public class TcpServer {
    /** Listening channel; {@code null} while the server is stopped. */
    protected ServerSocketChannel mChannel;
    /** Selector thread serving {@link #mChannel}; {@code null} while the server is stopped. */
    protected ServerThread mThread;
    /** Capacity of the buffer allocated for every read. */
    private static final int BUFFER_SIZE = 1024 * 1024;
    /** Upper bound on how long {@link #stopServer()} waits for clients to complete the graceful close. */
    private static final long CLIENT_DRAIN_TIMEOUT_MS = 5 * 1000L;

    /**
     * @return {@code true} when the listening channel exists and is open
     */
    public boolean isOpened() {
        return mChannel != null && mChannel.isOpen();
    }

    /** Installs a permissive StrictMode thread policy so socket calls may run on the calling thread. */
    protected void setNetworkThreadPolicy() {
        StrictMode.ThreadPolicy policy = new StrictMode.ThreadPolicy.Builder().permitAll().build();
        StrictMode.setThreadPolicy(policy);
    }

    /**
     * Binds to {@code port} and starts the selector thread.
     *
     * @param port    local TCP port to listen on
     * @param handler receiver of connection/data events; may be {@code null}
     * @return {@code true} when the server started; {@code false} when it is
     *         already running or any setup step failed
     */
    public boolean startServer(int port, ReceiveEventHandler handler) {
        setNetworkThreadPolicy();
        if (mChannel != null) {
            return false;
        }
        ServerSocketChannel channel = null;
        Selector selector = null;
        try {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            channel.socket().bind(new InetSocketAddress(port));
            selector = Selector.open();
            channel.register(selector, SelectionKey.OP_ACCEPT);
            ServerThread thread = new ServerThread(channel, selector, handler);
            // Publish the fields only once everything is set up, so a failed
            // start never leaves a half-initialised server behind.
            mChannel = channel;
            mThread = thread;
            thread.start();
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            closeQuietly(selector);
            closeQuietly(channel);
            mChannel = null;
            mThread = null;
            return false;
        }
    }

    /**
     * Stops the server. Connected clients are first asked to disconnect by
     * shutting down the output side of their sockets; the method then waits
     * (bounded by an internal timeout) for them to go away, stops the selector
     * thread and closes the listening channel. Clients that are still connected
     * after the timeout are closed by the selector thread when it exits.
     */
    public void stopServer() {
        setNetworkThreadPolicy();
        if (mChannel == null) {
            return;
        }
        try {
            ServerThread thread = mThread;
            if (thread != null) {
                // When invoked from a handler callback we are ON the selector
                // thread: waiting for it or joining it would block forever.
                boolean onServerThread = Thread.currentThread() == thread;
                thread.closeAllClient();
                if (!onServerThread) {
                    long deadline = System.currentTimeMillis() + CLIENT_DRAIN_TIMEOUT_MS;
                    while (thread.isAlive()
                            && thread.getClientCount() > 0
                            && System.currentTimeMillis() < deadline) {
                        Thread.sleep(100);
                    }
                }
                thread.requestStop();
                if (!onServerThread) {
                    thread.join();
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            closeQuietly(mChannel);
            mChannel = null;
            mThread = null;
            System.out.println("tcp server channel closed");
        }
    }

    /** Asks every connected client to disconnect without stopping the server. */
    public void closeAllClient() {
        setNetworkThreadPolicy();
        if (mChannel != null && mThread != null) {
            mThread.closeAllClient();
        }
    }

    /**
     * Writes the remaining bytes of {@code buffer} to {@code client} with a
     * single non-blocking write.
     *
     * @param client destination channel
     * @param buffer data to send, from its position to its limit
     * @return {@code true} only when every remaining byte was written;
     *         {@code false} on a partial write (the buffer position then marks
     *         how much was sent) or on any error
     */
    public boolean send(SocketChannel client, ByteBuffer buffer) {
        try {
            // Compare against remaining(), not limit(): the buffer may not start at position 0.
            int expected = buffer.remaining();
            return client.write(buffer) == expected;
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return false;
    }

    /** Closes a selector, reporting (but not propagating) failures. */
    private static void closeQuietly(Selector selector) {
        if (selector == null) {
            return;
        }
        try {
            selector.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Closes a listening channel, reporting (but not propagating) failures. */
    private static void closeQuietly(ServerSocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Closes a client channel, reporting (but not propagating) failures. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Callbacks delivered on the selector thread. */
    public interface ReceiveEventHandler {
        /** Called after a client was accepted and registered for reading. */
        public void onClientConnected(SocketChannel client);

        /**
         * Called when data arrived.
         *
         * @param buffer freshly allocated buffer positioned at 0; only the
         *               first {@code len} bytes are valid
         * @param len    number of bytes read
         */
        public void onReceived(SocketChannel client, ByteBuffer buffer, int len);

        /** Called when a client reached end-of-stream or a read failed. */
        public void onClientDisconnected(SocketChannel client);
    }

    /** Selector loop that accepts clients and dispatches their reads. */
    protected class ServerThread extends Thread {
        private ServerSocketChannel mChannel;
        private Selector mSelector;
        private List<SocketChannel> mClientList = new ArrayList<SocketChannel>();
        /** Loop flag; volatile because it is cleared from another thread. */
        public volatile boolean mIsRunning = false;
        private ReceiveEventHandler mHandler;

        public ServerThread(ServerSocketChannel channel, Selector selector, ReceiveEventHandler handler) {
            mChannel = channel;
            mSelector = selector;
            mIsRunning = true;
            mHandler = handler;
        }

        /**
         * Shuts down the output side of every tracked client socket. The
         * clients stay in the list until the selector loop observes their
         * end-of-stream (or a read error) and drops them.
         */
        public void closeAllClient() {
            synchronized (mClientList) {
                for (SocketChannel client : mClientList) {
                    try {
                        client.socket().shutdownOutput();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }

        /** @return number of clients currently tracked */
        public int getClientCount() {
            synchronized (mClientList) {
                return mClientList.size();
            }
        }

        /** Clears the loop flag and wakes the selector so the loop exits without waiting for the select timeout. */
        void requestStop() {
            mIsRunning = false;
            if (mSelector.isOpen()) {
                mSelector.wakeup();
            }
        }

        @Override
        public void run() {
            System.out.println("server thread start");
            try {
                while (mIsRunning) {
                    mSelector.select(2 * 1000);
                    Set<SelectionKey> keys = mSelector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            acceptClient();
                            continue;
                        }
                        if (key.isReadable()) {
                            readClient(key);
                        }
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                releaseResources();
            }
            System.out.println("server thread end");
        }

        /** Accepts one pending connection, registers it for reads and notifies the handler. */
        private void acceptClient() throws IOException {
            SocketChannel client = mChannel.accept();
            if (client == null) {
                // Non-blocking accept: the pending connection is already gone.
                return;
            }
            try {
                client.configureBlocking(false);
                client.register(mSelector, SelectionKey.OP_READ);
            } catch (IOException ex) {
                // A single bad client must not take the whole server down.
                ex.printStackTrace();
                closeQuietly(client);
                return;
            }
            synchronized (mClientList) {
                mClientList.add(client);
            }
            System.out.println("client connected : " + client.socket().getRemoteSocketAddress());
            if (mHandler != null) mHandler.onClientConnected(client);
        }

        /** Reads once from the key's channel and forwards the data, or drops the client on EOF/error. */
        private void readClient(SelectionKey key) {
            SocketChannel client = (SocketChannel) key.channel();
            // A fresh buffer per read: the handler is handed the buffer and may keep it.
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            int ret;
            try {
                ret = client.read(buffer);
            } catch (IOException ex) {
                ex.printStackTrace();
                dropClient(key, client, "client disconnected : ");
                return;
            }
            if (ret < 0) {
                // -1 is end-of-stream: the peer closed its side.
                dropClient(key, client, "socket read : " + ret + ", client disconnected : ");
                return;
            }
            if (ret == 0) {
                // Nothing available right now; this is not a disconnect.
                return;
            }
            buffer.rewind();
            if (mHandler != null) mHandler.onReceived(client, buffer, ret);
        }

        /** Deregisters a client, notifies the handler and finally closes the channel. */
        private void dropClient(SelectionKey key, SocketChannel client, String logPrefix) {
            key.cancel();
            synchronized (mClientList) {
                mClientList.remove(client);
            }
            try {
                System.out.println(logPrefix + client.socket().getRemoteSocketAddress());
                if (mHandler != null) mHandler.onClientDisconnected(client);
            } finally {
                closeQuietly(client);
            }
        }

        /** Closes any clients still tracked and the selector when the loop ends. */
        private void releaseResources() {
            synchronized (mClientList) {
                for (SocketChannel client : mClientList) {
                    closeQuietly(client);
                }
                mClientList.clear();
            }
            closeQuietly(mSelector);
        }
    }
}
< TcpClient.java >
package com.kimdh.dxmediaplayer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import android.os.StrictMode;
/**
 * Small non-blocking TCP client built on {@link SocketChannel}.
 *
 * <p>After a successful {@link #connect}, a background thread waits for
 * incoming data with a {@link Selector} and reports it through
 * {@link ReceiveEventHandler}. All handler callbacks are invoked on that
 * background thread.
 */
public class TcpClient {
    /** Current channel; {@code null} when no connection attempt has succeeded or after {@link #close()}. */
    protected SocketChannel mChannel;
    /** Receive thread of the current connection; {@code null} when not connected. */
    protected ReceiveThread mThread;
    /** Socket receive buffer size requested for new connections. */
    public int RECV_BUFFER_SIZE = 1024 * 1024;

    /**
     * @return {@code true} when a channel exists and reports itself connected
     */
    public boolean isConnected() {
        return mChannel != null && mChannel.isConnected();
    }

    /** Installs a permissive StrictMode thread policy so socket calls may run on the calling thread. */
    protected void setNetworkThreadPolicy() {
        StrictMode.ThreadPolicy policy = new StrictMode.ThreadPolicy.Builder().permitAll().build();
        StrictMode.setThreadPolicy(policy);
    }

    /**
     * Connects to a server and starts the receive thread.
     *
     * @param ipAddress host name or address of the server
     * @param port      server port; the value is interpreted as unsigned, so
     *                  ports above 32767 may be passed as {@code (short) port}
     * @param timeout   connect timeout in seconds; {@code 0} waits indefinitely
     * @param handler   receiver of data/close events; may be {@code null}
     * @return {@code true} when connected; {@code false} when already
     *         connected, on timeout, or on any error
     */
    public boolean connect(String ipAddress, short port, int timeout, ReceiveEventHandler handler) {
        setNetworkThreadPolicy();
        if (mChannel != null) {
            if (mChannel.isConnected()) {
                return false;
            }
            // Left over from an earlier failed attempt: release it instead of leaking it.
            releaseConnection();
        }
        SocketChannel channel = null;
        Selector selector = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.socket().setReceiveBufferSize(RECV_BUFFER_SIZE);
            // Mask to 16 bits: a short above 32767 is negative and would be rejected as a port.
            boolean connected = channel.connect(new InetSocketAddress(ipAddress, port & 0xFFFF));
            if (!connected) {
                // The usual non-blocking case: wait for the connect to become finishable.
                selector = Selector.open();
                SelectionKey clientKey = channel.register(selector, SelectionKey.OP_CONNECT);
                if (selector.select(timeout * 1000L) > 0 && clientKey.isConnectable()) {
                    connected = channel.finishConnect();
                }
            }
            // The connect selector is no longer needed; the receive thread uses its own.
            closeQuietly(selector);
            selector = null;
            if (!connected) {
                closeQuietly(channel);
                return false;
            }
            mChannel = channel;
            mThread = new ReceiveThread(channel, handler);
            mThread.start();
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            closeQuietly(selector);
            closeQuietly(channel);
            mChannel = null;
            mThread = null;
            return false;
        }
    }

    /**
     * Stops the receive thread and closes the channel. Safe to call from a
     * handler callback: in that case the receive thread is not joined (it
     * would be joining itself) and exits on its own once the callback returns.
     */
    public void close() {
        setNetworkThreadPolicy();
        if (mChannel == null) {
            return;
        }
        releaseConnection();
        System.out.println("tcp client channel closed");
    }

    /**
     * Writes the remaining bytes of {@code buffer} with a single non-blocking write.
     *
     * @param buffer data to send
     * @return number of bytes written (possibly fewer than remaining), or
     *         {@code -1} when not connected or on an I/O error
     */
    public int send(ByteBuffer buffer) {
        setNetworkThreadPolicy();
        if (mChannel == null || !mChannel.isConnected()) return -1;
        try {
            return mChannel.write(buffer);
        } catch (IOException ex) {
            ex.printStackTrace();
            return -1;
        }
    }

    /** Stops the receive thread (if any), closes the channel and clears both fields. */
    private void releaseConnection() {
        ReceiveThread thread = mThread;
        if (thread != null) {
            thread.requestStop();
            if (Thread.currentThread() != thread) {
                try {
                    thread.join();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    ex.printStackTrace();
                }
            }
        }
        closeQuietly(mChannel);
        mChannel = null;
        mThread = null;
    }

    /** Closes a selector, reporting (but not propagating) failures. */
    private static void closeQuietly(Selector selector) {
        if (selector == null) {
            return;
        }
        try {
            selector.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Closes a channel, reporting (but not propagating) failures. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Callbacks delivered on the receive thread. */
    public interface ReceiveEventHandler {
        /**
         * Called when data arrived.
         *
         * @param buffer freshly allocated buffer positioned at 0; only the
         *               first {@code len} bytes are valid
         * @param len    number of bytes read
         */
        public void onReceived(ByteBuffer buffer, int len);

        /** Called when the peer closed the connection or a read failed. */
        public void onClosed();

        /** Called once per loop iteration, i.e. after every select that returned or timed out. */
        public void onThreadEvent();
    }

    /** Background loop that waits for readable data on the connected channel. */
    protected class ReceiveThread extends Thread {
        private SocketChannel mChannel;
        /** Loop flag; volatile because it is cleared from another thread. */
        public volatile boolean mIsRunning = false;
        private ReceiveEventHandler mHandler;
        /** Selector used by {@link #run()}; kept so a stop request can wake it. */
        private volatile Selector mSelector;
        private static final int BUFFER_SIZE = 1024 * 4;

        public ReceiveThread(SocketChannel channel, ReceiveEventHandler handler) {
            mChannel = channel;
            mHandler = handler;
            mIsRunning = true;
        }

        /** Clears the loop flag and wakes the selector so the loop exits without waiting for the select timeout. */
        void requestStop() {
            mIsRunning = false;
            Selector selector = mSelector;
            if (selector != null && selector.isOpen()) {
                selector.wakeup();
            }
        }

        @Override
        public void run() {
            System.out.println("receive thread start");
            Selector selector = null;
            try {
                selector = Selector.open();
                mSelector = selector;
                mChannel.register(selector, SelectionKey.OP_READ);
                while (mIsRunning) {
                    if (selector.select(2 * 1000) > 0) {
                        Set<SelectionKey> selectedKeys = selector.selectedKeys();
                        Iterator<SelectionKey> it = selectedKeys.iterator();
                        while (it.hasNext()) {
                            SelectionKey key = it.next();
                            it.remove();
                            if (key.isValid() && key.isReadable()) {
                                handleReadable(key);
                            }
                        }
                    }
                    if (mHandler != null) mHandler.onThreadEvent();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                closeQuietly(selector);
            }
            System.out.println("receive thread end");
        }

        /** Reads once from the key's channel and forwards the data, or reports the close on EOF/error. */
        private void handleReadable(SelectionKey key) {
            SocketChannel channel = (SocketChannel) key.channel();
            // A fresh buffer per read: the handler is handed the buffer and may keep it.
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            int ret;
            try {
                ret = channel.read(buffer);
            } catch (IOException ex) {
                ex.printStackTrace();
                key.cancel();
                if (mHandler != null) mHandler.onClosed();
                return;
            }
            if (ret < 0) {
                // -1 is end-of-stream: the peer closed its side.
                System.out.println("SocketChannel.read returned " + ret);
                key.cancel();
                if (mHandler != null) mHandler.onClosed();
                return;
            }
            if (ret == 0) {
                // Nothing available right now; this is not a close.
                return;
            }
            buffer.rewind();
            if (mHandler != null) mHandler.onReceived(buffer, ret);
        }
    }
}
< TCP 서버 사용 >
/**
 * Usage example: an activity that runs a {@code TcpServer} and handles its
 * events. Lines consisting of {@code ...} stand for code elided from the example.
 */
public class MainActivity extends Activity implements TcpServer.ReceiveEventHandler {
    private TcpServer mServer = new TcpServer();
    // Accumulates received bytes; grown on demand in onReceived().
    private byte[] mReceiveBuffer = new byte[1024*4];
    // Number of valid bytes currently held in mReceiveBuffer.
    private int mReceiveBufferIndex = 0;
    ...
    public void onClick(View v) {
        ...
        mServer.startServer(8112, this);
    }
    protected void onDestroy() {
        ...
        mServer.stopServer();
    }
    @Override
    public void onClientConnected(SocketChannel client) {
        // do something when client connected
    }
    /**
     * Appends the received bytes to the accumulation buffer and sends back a
     * length-prefixed response. Runs on the server's selector thread.
     */
    @Override
    public void onReceived(SocketChannel client, ByteBuffer buffer, int len) {
        System.out.println("onReceived : " + len);
        // The server may deliver far more than the initial 4 KB in one call,
        // so make room before copying instead of overflowing the array.
        if (mReceiveBufferIndex + len > mReceiveBuffer.length) {
            byte[] grown = new byte[Math.max(mReceiveBuffer.length * 2, mReceiveBufferIndex + len)];
            System.arraycopy(mReceiveBuffer, 0, grown, 0, mReceiveBufferIndex);
            mReceiveBuffer = grown;
        }
        buffer.get(mReceiveBuffer, mReceiveBufferIndex, len);
        mReceiveBufferIndex += len;
        // process receive buffer
        ...
        // Build the reply in `response` (not in the receive buffer):
        // a 4-byte length header followed by the payload.
        // NOTE(review): `payload` comes from the elided code above; assumed to
        // hold `len` bytes - confirm.
        ByteBuffer response = ByteBuffer.allocate(len+4);
        response.putInt(len);
        response.put(payload);
        // flip() makes exactly the bytes written so far available for sending.
        response.flip();
        mServer.send(client, response);
    }
    @Override
    public void onClientDisconnected(SocketChannel client) {
        // do something when client disconnected
    }
}