Basic protocol

This commit is contained in:
MrLetsplay 2023-12-03 21:31:27 +01:00
parent 1e1d861d91
commit 72b4e547fb
Signed by: mr
SSH Key Fingerprint: SHA256:92jBH80vpXyaZHjaIl47pjRq+Yt7XGTArqQg1V7hSqg
17 changed files with 427 additions and 103 deletions

View File

@ -0,0 +1,25 @@
package me.mrletsplay.shareclientcore.connection;
public class ConnectionException extends Exception {
private static final long serialVersionUID = -6133726852202889620L;
public ConnectionException() {
super();
}
public ConnectionException(String message, Throwable cause) {
super(message, cause);
}
public ConnectionException(String message) {
super(message);
}
public ConnectionException(Throwable cause) {
super(cause);
}
}

View File

@ -1,31 +1,31 @@
package me.mrletsplay.shareclientcore.connection;
import java.io.IOException;
import me.mrletsplay.shareclientcore.connection.message.Message;
public class DummyConnection implements RemoteConnection {
@Override
public void connect() throws IOException, InterruptedException {
public void connect(String sessionID) throws ConnectionException {
}
@Override
public int retrieveSiteID() {
public int getSiteID() {
return 0;
}
@Override
public void send(Change... changes) {
public void send(Message message) {
}
@Override
public void addListener(RemoteListener listener) {
public void addListener(MessageListener listener) {
}
@Override
public void removeListener(RemoteListener listener) {
public void removeListener(MessageListener listener) {
}

View File

@ -0,0 +1,12 @@
package me.mrletsplay.shareclientcore.connection;
import me.mrletsplay.shareclientcore.connection.message.Message;
/**
* A message listener that can receive remote messages
*/
public interface MessageListener {
public void onMessage(Message message);
}

View File

@ -1,20 +1,22 @@
package me.mrletsplay.shareclientcore.connection;
import java.io.IOException;
import me.mrletsplay.shareclientcore.connection.message.Message;
/**
* Represents a connection to a remote user or server
*/
public interface RemoteConnection {
public void connect() throws IOException, InterruptedException;
public static final int PROTOCOL_VERSION = 1;
public int retrieveSiteID();
public void connect(String sessionID) throws ConnectionException;
public void send(Change... changes);
public int getSiteID();
public void addListener(RemoteListener listener);
public void send(Message message) throws ConnectionException;
public void removeListener(RemoteListener listener);
public void addListener(MessageListener listener);
public void removeListener(MessageListener listener);
}

View File

@ -1,10 +0,0 @@
package me.mrletsplay.shareclientcore.connection;
/**
* Represents something that can receive remote changes
*/
public interface RemoteListener {
public void onRemoteChange(Change... changes);
}

View File

@ -0,0 +1,10 @@
package me.mrletsplay.shareclientcore.connection;
import java.io.DataOutputStream;
import java.io.IOException;
public interface SerializableObject {
public void serialize(DataOutputStream out) throws IOException;
}

View File

@ -1,109 +1,86 @@
package me.mrletsplay.shareclientcore.connection;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import me.mrletsplay.shareclientcore.document.Char;
import me.mrletsplay.shareclientcore.document.Identifier;
import me.mrletsplay.shareclientcore.connection.message.ClientHelloMessage;
import me.mrletsplay.shareclientcore.connection.message.Message;
import me.mrletsplay.shareclientcore.connection.message.ServerHelloMessage;
public class WebSocketConnection implements RemoteConnection {
private WSClient client;
private Set<RemoteListener> listeners;
private String username;
private Set<MessageListener> listeners;
private int siteID;
public WebSocketConnection(URI uri, Map<String, String> httpHeaders) {
private Object wait = new Object();
private boolean helloReceived;
private ConnectionException connectException;
public WebSocketConnection(URI uri, String username, Map<String, String> httpHeaders) {
this.client = new WSClient(uri, httpHeaders);
this.username = username;
this.listeners = new HashSet<>();
}
public WebSocketConnection(URI uri) {
this(uri, null);
public WebSocketConnection(URI uri, String username) {
this(uri, username, null);
}
@Override
public void connect() throws IOException, InterruptedException {
if(!client.connectBlocking()) throw new IOException("Failed to connect to WebSocket server");
}
@Override
public int retrieveSiteID() {
// TODO: implement
return new Random().nextInt();
}
@Override
public void send(Change... changes) {
for(Change c : changes) {
client.send(serialize(c));
public void connect(String sessionID) throws ConnectionException {
try {
if(!client.connectBlocking(30, TimeUnit.SECONDS)) throw new IOException("Failed to connect to WebSocket server");
send(new ClientHelloMessage(username, sessionID));
wait.wait(30_000L);
if(!helloReceived) throw new ConnectionException("Server did not send hello");
if(connectException != null) throw connectException;
} catch (InterruptedException | IOException e) {
throw new ConnectionException("Failed to establish connection", e);
}
}
@Override
public void addListener(RemoteListener listener) {
public int getSiteID() {
return siteID;
}
@Override
public void send(Message message) throws ConnectionException {
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
DataOutputStream dOut = new DataOutputStream(bOut);
try {
dOut.writeUTF(message.getType().name());
message.serialize(dOut);
} catch (IOException e) {
throw new ConnectionException("Failed to serialize message", e);
}
client.send(bOut.toByteArray());
}
@Override
public void addListener(MessageListener listener) {
listeners.add(listener);
}
@Override
public void removeListener(RemoteListener listener) {
public void removeListener(MessageListener listener) {
listeners.remove(listener);
}
private byte[] serialize(Change change) {
try {
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
DataOutputStream dOut = new DataOutputStream(bOut);
dOut.writeInt(change.document());
dOut.writeUTF(change.type().name());
Char ch = change.character();
dOut.writeInt(ch.position().length);
for(int i = 0; i < ch.position().length; i++) {
Identifier id = ch.position()[i];
dOut.writeInt(id.digit());
dOut.writeInt(id.site());
}
dOut.writeInt(ch.lamport());
dOut.writeChar(ch.value());
return bOut.toByteArray();
}catch(IOException e) {
throw new RuntimeException("Something went very wrong", e);
}
}
private Change deserialize(byte[] bytes) {
try {
DataInputStream dIn = new DataInputStream(new ByteArrayInputStream(bytes));
int document = dIn.readInt();
ChangeType type = ChangeType.valueOf(dIn.readUTF());
Identifier[] pos = new Identifier[dIn.readInt()];
for(int i = 0; i < pos.length; i++) {
pos[i] = new Identifier(dIn.readInt(), dIn.readInt());
}
int lamport = dIn.readInt();
char value = dIn.readChar();
return new Change(document, type, new Char(pos, lamport, value));
}catch(IllegalArgumentException e) {
throw new IllegalArgumentException("Failed to deserialize change", e);
}catch(IOException e) {
throw new RuntimeException("Something went very wrong", e);
}
}
private class WSClient extends WebSocketClient {
public WSClient(URI serverUri) {
@ -116,7 +93,7 @@ public class WebSocketConnection implements RemoteConnection {
@Override
public void onOpen(ServerHandshake handshakedata) {
// TODO: request site id
}
@Override
@ -126,9 +103,28 @@ public class WebSocketConnection implements RemoteConnection {
@Override
public void onMessage(ByteBuffer bytes) {
byte[] bytesArray = new byte[bytes.remaining()];
bytes.get(bytesArray);
listeners.forEach(l -> l.onRemoteChange(deserialize(bytesArray)));
Message m;
try {
m = Message.deserialize(bytes);
}catch(IOException e) {
e.printStackTrace(); // TODO: custom logging (e.g. via error callback)
return;
}
if(m instanceof ServerHelloMessage hello) {
helloReceived = true;
siteID = hello.siteID();
if(hello.protocolVersion() != PROTOCOL_VERSION) {
connectException = new ConnectionException(String.format("Protocol version mismatch: (Server has %s, Client has %s)", hello.protocolVersion(), PROTOCOL_VERSION));
close();
}
wait.notifyAll();
return;
}
listeners.forEach(l -> l.onMessage(m));
}
@Override

View File

@ -0,0 +1,16 @@
package me.mrletsplay.shareclientcore.connection.message;
import java.io.DataOutputStream;
import java.io.IOException;
public record BasicMessage(MessageType type) implements Message {
@Override
public MessageType getType() {
return type;
}
@Override
public void serialize(DataOutputStream out) throws IOException {}
}

View File

@ -0,0 +1,33 @@
package me.mrletsplay.shareclientcore.connection.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import me.mrletsplay.shareclientcore.connection.Change;
import me.mrletsplay.shareclientcore.connection.ChangeType;
import me.mrletsplay.shareclientcore.document.Char;
public record ChangeMessage(Change change) implements Message {
@Override
public MessageType getType() {
return MessageType.CHANGE;
}
@Override
public void serialize(DataOutputStream out) throws IOException {
out.writeInt(change.document());
out.writeUTF(change.type().name());
change.character().serialize(out);
}
public static ChangeMessage deserialize(DataInputStream in) throws IOException {
try {
return new ChangeMessage(new Change(in.readInt(), ChangeType.valueOf(in.readUTF()), Char.deserialize(in)));
}catch(IllegalArgumentException e) {
throw new IOException("Invalid change type", e);
}
}
}

View File

@ -0,0 +1,24 @@
package me.mrletsplay.shareclientcore.connection.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public record ClientHelloMessage(String username, String sessionID) implements Message {
@Override
public MessageType getType() {
return MessageType.CLIENT_HELLO;
}
@Override
public void serialize(DataOutputStream out) throws IOException {
out.writeUTF(username);
out.writeUTF(sessionID);
}
public static ClientHelloMessage deserialize(DataInputStream in) throws IOException {
return new ClientHelloMessage(in.readUTF(), in.readUTF());
}
}

View File

@ -0,0 +1,38 @@
package me.mrletsplay.shareclientcore.connection.message;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import me.mrletsplay.shareclientcore.connection.SerializableObject;
public interface Message extends SerializableObject {
public MessageType getType();
public static Message deserialize(ByteBuffer buffer) throws IOException {
byte[] bytesArray = new byte[buffer.remaining()];
buffer.get(bytesArray);
try {
DataInputStream dIn = new DataInputStream(new ByteArrayInputStream(bytesArray));
MessageType type = MessageType.valueOf(dIn.readUTF());
Message m;
switch(type) {
case CLIENT_HELLO -> m = ClientHelloMessage.deserialize(dIn);
case SERVER_HELLO -> m = ServerHelloMessage.deserialize(dIn);
case PEER_JOIN -> m = PeerJoinMessage.deserialize(dIn);
case PEER_LEAVE -> m = PeerLeaveMessage.deserialize(dIn);
case CHANGE -> m = ChangeMessage.deserialize(dIn);
default -> m = new BasicMessage(type);
}
return m;
}catch(IllegalArgumentException e) {
throw new IOException("Invalid message type", e);
}
}
}

View File

@ -0,0 +1,61 @@
package me.mrletsplay.shareclientcore.connection.message;
public enum MessageType {
/**
* Client hello containing user information
*/
CLIENT_HELLO,
/**
* Server hello containing protocol information
*/
SERVER_HELLO,
/**
* Peer has joined
*/
PEER_JOIN,
/**
* Peer has left
*/
PEER_LEAVE,
/**
* Full synchronization message, containing the full contents of a file
*/
FULL_SYNC,
/**
* Request for the synchronization of a particular file or all shared files
*/
REQUEST_FULL_SYNC,
/**
* A single change made by one peer
*/
CHANGE,
/**
* Request for the checksum of a file
*/
REQUEST_CHECKSUM,
/**
* Checksum of a file
*/
CHECKSUM,
/**
* Creation of a file
*/
CREATE_FILE,
/**
* Deletion of a file
*/
DELETE_FILE,
;
}

View File

@ -0,0 +1,24 @@
package me.mrletsplay.shareclientcore.connection.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public record PeerJoinMessage(String peerName, int peerSiteID) implements Message {
@Override
public MessageType getType() {
return MessageType.PEER_JOIN;
}
@Override
public void serialize(DataOutputStream out) throws IOException {
out.writeUTF(peerName);
out.writeInt(peerSiteID);
}
public static PeerJoinMessage deserialize(DataInputStream in) throws IOException {
return new PeerJoinMessage(in.readUTF(), in.readInt());
}
}

View File

@ -0,0 +1,23 @@
package me.mrletsplay.shareclientcore.connection.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public record PeerLeaveMessage(int peerSiteID) implements Message {
@Override
public MessageType getType() {
return MessageType.PEER_LEAVE;
}
@Override
public void serialize(DataOutputStream out) throws IOException {
out.writeInt(peerSiteID);
}
public static PeerLeaveMessage deserialize(DataInputStream in) throws IOException {
return new PeerLeaveMessage(in.readInt());
}
}

View File

@ -0,0 +1,24 @@
package me.mrletsplay.shareclientcore.connection.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public record ServerHelloMessage(int protocolVersion, int siteID) implements Message {
@Override
public MessageType getType() {
return MessageType.SERVER_HELLO;
}
@Override
public void serialize(DataOutputStream out) throws IOException {
out.writeInt(protocolVersion);
out.writeInt(siteID);
}
public static ServerHelloMessage deserialize(DataInputStream in) throws IOException {
return new ServerHelloMessage(in.readInt(), in.readInt());
}
}

View File

@ -1,9 +1,14 @@
package me.mrletsplay.shareclientcore.document;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
public record Char(Identifier[] position, int lamport, char value) {
import me.mrletsplay.shareclientcore.connection.SerializableObject;
public record Char(Identifier[] position, int lamport, char value) implements SerializableObject {
public static final Char START_OF_DOCUMENT = new Char(new Identifier[] { new Identifier(1, 0) }, 0, '^');
public static final Char END_OF_DOCUMENT = new Char(new Identifier[] { new Identifier(Util.BASE - 1, 0) }, 0, '$');
@ -28,4 +33,28 @@ public record Char(Identifier[] position, int lamport, char value) {
return lamport == other.lamport && Arrays.equals(position, other.position) && value == other.value;
}
@Override
public void serialize(DataOutputStream out) throws IOException {
out.writeInt(position.length);
for(int i = 0; i < position.length; i++) {
Identifier id = position[i];
out.writeInt(id.digit());
out.writeInt(id.site());
}
out.writeInt(lamport);
out.writeChar(value);
}
public static Char deserialize(DataInputStream in) throws IOException {
Identifier[] pos = new Identifier[in.readInt()];
for(int i = 0; i < pos.length; i++) {
pos[i] = new Identifier(in.readInt(), in.readInt());
}
int lamport = in.readInt();
char value = in.readChar();
return new Char(pos, lamport, value);
}
}

View File

@ -6,10 +6,13 @@ import java.util.Set;
import me.mrletsplay.shareclientcore.connection.Change;
import me.mrletsplay.shareclientcore.connection.ChangeType;
import me.mrletsplay.shareclientcore.connection.ConnectionException;
import me.mrletsplay.shareclientcore.connection.MessageListener;
import me.mrletsplay.shareclientcore.connection.RemoteConnection;
import me.mrletsplay.shareclientcore.connection.RemoteListener;
import me.mrletsplay.shareclientcore.connection.message.ChangeMessage;
import me.mrletsplay.shareclientcore.connection.message.Message;
public class SharedDocument implements RemoteListener {
public class SharedDocument implements MessageListener {
private RemoteConnection connection;
private CharBag charBag;
@ -27,7 +30,7 @@ public class SharedDocument implements RemoteListener {
charBag.add(Char.END_OF_DOCUMENT);
this.document = 0; // TODO: implement
this.site = connection.retrieveSiteID();
this.site = connection.getSiteID();
this.listeners = new HashSet<>();
}
@ -53,7 +56,13 @@ public class SharedDocument implements RemoteListener {
charBefore = ch;
}
connection.send(changes);
for(Change c : changes) {
try {
connection.send(new ChangeMessage(c));
} catch (ConnectionException e) {
e.printStackTrace(); // TODO: throw error
}
}
}
/**
@ -73,7 +82,14 @@ public class SharedDocument implements RemoteListener {
charBag.remove(toRemove);
}
connection.send(changes);
for(Change c : changes) {
try {
connection.send(new ChangeMessage(c));
} catch (ConnectionException e) {
e.printStackTrace(); // TODO: throw error
}
}
}
/**
@ -120,8 +136,9 @@ public class SharedDocument implements RemoteListener {
}
@Override
public void onRemoteChange(Change... changes) {
for(Change c : changes) {
public void onMessage(Message message) {
if(message instanceof ChangeMessage change) {
Change c = change.change();
System.out.println("Change: " + c + " | " + Arrays.toString(c.character().position()));
switch(c.type()) {
case ADD -> remoteInsert(c.character());