diff --git a/api/src/main/java/io/xpipe/api/XPipeApiConnector.java b/api/src/main/java/io/xpipe/api/XPipeApiConnector.java index db0cdfef..8ebf2e4a 100644 --- a/api/src/main/java/io/xpipe/api/XPipeApiConnector.java +++ b/api/src/main/java/io/xpipe/api/XPipeApiConnector.java @@ -1,9 +1,9 @@ package io.xpipe.api; import io.xpipe.beacon.*; -import io.xpipe.beacon.socket.SocketClient; +import io.xpipe.beacon.BeaconClient; -public abstract class XPipeApiConnector extends XPipeConnector { +public abstract class XPipeApiConnector extends BeaconConnector { public void execute() { try { @@ -20,7 +20,7 @@ public abstract class XPipeApiConnector extends XPipeConnector { } } - protected abstract void handle(SocketClient sc) throws Exception; + protected abstract void handle(BeaconClient sc) throws Exception; @Override protected void waitForStartup() { @@ -34,6 +34,6 @@ public abstract class XPipeApiConnector extends XPipeConnector { @FunctionalInterface public static interface Handler { - void handle(SocketClient sc) throws ClientException, ServerException; + void handle(BeaconClient sc) throws ClientException, ServerException; } } diff --git a/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java b/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java index 4525a7f5..19fd970e 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java @@ -5,7 +5,7 @@ import io.xpipe.api.XPipeApiConnector; import io.xpipe.beacon.ClientException; import io.xpipe.beacon.ConnectorException; import io.xpipe.beacon.ServerException; -import io.xpipe.beacon.socket.SocketClient; +import io.xpipe.beacon.BeaconClient; import io.xpipe.beacon.exchange.ReadTableDataExchange; import io.xpipe.beacon.exchange.ReadTableInfoExchange; import io.xpipe.core.data.DataStructureNode; @@ -32,7 +32,7 @@ public class DataTableImpl implements DataTable { var ds = DataSourceId.fromString(s); new XPipeApiConnector() { @Override - protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException { + protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException { var req = new ReadTableInfoExchange.Request(ds); ReadTableInfoExchange.Response res = performSimpleExchange(sc, req); table[0] = new DataTableImpl(res.sourceId(), res.rowCount(), res.dataType()); @@ -92,7 +92,7 @@ public class DataTableImpl implements DataTable { List nodes = new ArrayList<>(); new XPipeApiConnector() { @Override - protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException { + protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException { var req = new ReadTableDataExchange.Request(id, maxToRead); performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> { TypedDataStreamReader.readStructures(in, new TypedDataStructureNodeCallback(dataType, nodes::add)); @@ -115,7 +115,7 @@ public class DataTableImpl implements DataTable { { new XPipeApiConnector() { @Override - protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException { + protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException { var req = new ReadTableDataExchange.Request(id, Integer.MAX_VALUE); performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> { input = in; diff --git a/api/src/test/java/io/xpipe/api/test/XPipeConfig.java b/api/src/test/java/io/xpipe/api/test/XPipeConfig.java index 0d4cd005..b8328c15 100644 --- a/api/src/test/java/io/xpipe/api/test/XPipeConfig.java +++ b/api/src/test/java/io/xpipe/api/test/XPipeConfig.java @@ -1,6 +1,6 @@ package io.xpipe.api.test; -import io.xpipe.beacon.XPipeDaemon; +import io.xpipe.beacon.BeaconServer; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; @@ -17,7 +17,7 @@ public class XPipeConfig implements BeforeAllCallback, ExtensionContext.Store.Cl // Your "before all tests" startup logic goes here // The following line registers a callback hook when the root test context is shut down context.getRoot().getStore(GLOBAL).put("any unique name", this); - XPipeDaemon.startDaemon(); + BeaconServer.start(); } } diff --git a/beacon/src/main/java/io/xpipe/beacon/socket/SocketClient.java b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java similarity index 95% rename from beacon/src/main/java/io/xpipe/beacon/socket/SocketClient.java rename to beacon/src/main/java/io/xpipe/beacon/BeaconClient.java index a7df9c19..49d899e1 100644 --- a/beacon/src/main/java/io/xpipe/beacon/socket/SocketClient.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java @@ -1,4 +1,4 @@ -package io.xpipe.beacon.socket; +package io.xpipe.beacon; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; @@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; -import io.xpipe.beacon.*; import io.xpipe.beacon.exchange.MessageExchanges; import io.xpipe.beacon.message.ClientErrorMessage; import io.xpipe.beacon.message.RequestMessage; @@ -26,18 +25,18 @@ import java.util.Arrays; import java.util.Optional; import java.util.function.Consumer; -import static io.xpipe.beacon.socket.Sockets.BODY_SEPARATOR; +import static io.xpipe.beacon.BeaconConfig.BODY_SEPARATOR; -public class SocketClient { +public class BeaconClient { - private static final Logger logger = LoggerFactory.getLogger(SocketClient.class); + private static final Logger logger = LoggerFactory.getLogger(BeaconClient.class); private final Socket socket; private final InputStream in; private final OutputStream out; - public SocketClient() throws IOException { - socket = new Socket(InetAddress.getLoopbackAddress(), SocketServer.determineUsedPort()); + public BeaconClient() throws IOException { + socket = new Socket(InetAddress.getLoopbackAddress(), BeaconConfig.getUsedPort()); in = socket.getInputStream(); out = socket.getOutputStream(); } @@ -120,7 +119,7 @@ public class SocketClient { throw new ConnectorException("Couldn't read from socket", ex); } - if (Sockets.debugEnabled()) { + if (BeaconConfig.debugEnabled()) { System.out.println("Recieved response:"); System.out.println(read.toPrettyString()); } diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconConfig.java b/beacon/src/main/java/io/xpipe/beacon/BeaconConfig.java new file mode 100644 index 00000000..207f59ae --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconConfig.java @@ -0,0 +1,52 @@ +package io.xpipe.beacon; + +import java.nio.charset.StandardCharsets; + +public class BeaconConfig { + + public static final byte[] BODY_SEPARATOR = "\n\n".getBytes(StandardCharsets.UTF_8); + private static final String DEBUG_PROP = "io.xpipe.beacon.debugOutput"; + + public static boolean debugEnabled() { + if (System.getProperty(DEBUG_PROP) != null) { + return Boolean.parseBoolean(System.getProperty(DEBUG_PROP)); + } + return false; + } + + + + private static final String BEACON_PORT_PROP = "io.xpipe.beacon.port"; + private static final int DEFAULT_PORT = 21721; + + public static int getUsedPort() { + if (System.getProperty(BEACON_PORT_PROP) != null) { + return Integer.parseInt(System.getProperty(BEACON_PORT_PROP)); + } + + return DEFAULT_PORT; + } + + + + private static final String IN_PROCESS_PROP = "io.xpipe.beacon.startInProcess"; + + public static boolean shouldStartInProcess() { + if (System.getProperty(IN_PROCESS_PROP) != null) { + return Boolean.parseBoolean(System.getProperty(IN_PROCESS_PROP)); + } + return false; + } + + + + private static final String EXEC_PROCESS_PROP = "io.xpipe.beacon.exec"; + + public static String getCustomExecCommand() { + if (System.getProperty(EXEC_PROCESS_PROP) != null) { + return System.getProperty(EXEC_PROCESS_PROP); + } + + return null; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/XPipeConnector.java b/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java similarity index 81% rename from beacon/src/main/java/io/xpipe/beacon/XPipeConnector.java rename to beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java index 923d6227..bc80a676 100644 --- a/beacon/src/main/java/io/xpipe/beacon/XPipeConnector.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java @@ -2,7 +2,6 @@ package io.xpipe.beacon; import io.xpipe.beacon.message.RequestMessage; import io.xpipe.beacon.message.ResponseMessage; -import io.xpipe.beacon.socket.SocketClient; import org.apache.commons.lang3.function.FailableBiConsumer; import java.io.IOException; @@ -10,16 +9,16 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.function.Consumer; -public abstract class XPipeConnector { +public abstract class BeaconConnector { protected abstract void waitForStartup(); - protected SocketClient constructSocket() throws ConnectorException { - if (!XPipeDaemon.isDaemonRunning()) { + protected BeaconClient constructSocket() throws ConnectorException { + if (!BeaconServer.isRunning()) { try { - XPipeDaemon.startDaemon(); + BeaconServer.start(); waitForStartup(); - if (!XPipeDaemon.isDaemonRunning()) { + if (!BeaconServer.isRunning()) { throw new ConnectorException("Unable to start xpipe daemon"); } } catch (Exception ex) { @@ -28,14 +27,14 @@ public abstract class XPipeConnector { } try { - return new SocketClient(); + return new BeaconClient(); } catch (Exception ex) { throw new ConnectorException("Unable to connect to running xpipe daemon: " + ex.getMessage()); } } protected void performExchange( - SocketClient socket, + BeaconClient socket, REQ req, FailableBiConsumer responseConsumer, boolean keepOpen) throws ServerException, ConnectorException, ClientException { @@ -43,7 +42,7 @@ public abstract class XPipeConnector { } protected void performExchange( - SocketClient socket, + BeaconClient socket, REQ req, Consumer output, FailableBiConsumer responseConsumer, @@ -52,7 +51,7 @@ public abstract class XPipeConnector { } protected RES performSimpleExchange( - SocketClient socket, + BeaconClient socket, REQ req) throws ServerException, ConnectorException, ClientException { return socket.simpleExchange(req); } diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconHandler.java b/beacon/src/main/java/io/xpipe/beacon/BeaconHandler.java new file mode 100644 index 00000000..240b049e --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconHandler.java @@ -0,0 +1,22 @@ +package io.xpipe.beacon; + +import io.xpipe.beacon.message.ResponseMessage; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public interface BeaconHandler { + + void prepareBody() throws IOException; + + public void sendResponse(T obj) throws Exception; + + public void sendClientErrorResponse(String message) throws Exception; + + public void sendServerErrorResponse(Throwable ex) throws Exception; + + InputStream getInputStream() throws Exception; + + OutputStream getOutputStream() throws Exception; +} diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java b/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java new file mode 100644 index 00000000..fc203345 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java @@ -0,0 +1,49 @@ +package io.xpipe.beacon; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.DatagramSocket; +import java.net.ServerSocket; + +public class BeaconServer { + + private static boolean isPortAvailable(int port) { + try (var ss = new ServerSocket(port); var ds = new DatagramSocket(port)) { + return true; + } catch (IOException e) { + return false; + } + } + + public static boolean isRunning() { + var port = BeaconConfig.getUsedPort(); + return !isPortAvailable(port); + } + + public static void start() throws Exception { + if (BeaconConfig.shouldStartInProcess()) { + startInProcess(); + return; + } + + var custom = BeaconConfig.getCustomExecCommand(); + if (custom != null) { + Runtime.getRuntime().exec(System.getenv(custom)); + return; + } + + throw new IllegalArgumentException("Unable to start xpipe daemon"); + } + + private static void startInProcess() throws Exception { + var mainClass = Class.forName("io.xpipe.app.Main"); + var method = mainClass.getDeclaredMethod("main", String[].class); + new Thread(() -> { + try { + method.invoke(null, (Object) new String[0]); + } catch (IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + } + }).start(); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/XPipeDaemon.java b/beacon/src/main/java/io/xpipe/beacon/XPipeDaemon.java deleted file mode 100644 index 53e404cd..00000000 --- a/beacon/src/main/java/io/xpipe/beacon/XPipeDaemon.java +++ /dev/null @@ -1,65 +0,0 @@ -package io.xpipe.beacon; - -import io.xpipe.beacon.socket.SocketServer; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.net.DatagramSocket; -import java.net.ServerSocket; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Optional; - -public class XPipeDaemon { - - private static final String IN_PROCESS_PROP = "io.xpipe.beacon.startInProcess"; - - public static Path getUserDir() { - return Path.of(System.getProperty("user.home"), ".xpipe"); - } - - private static boolean isPortAvailable(int port) { - try (var ss = new ServerSocket(port); var ds = new DatagramSocket(port)) { - return true; - } catch (IOException e) { - return false; - } - } - - public static boolean isDaemonRunning() { - var port = SocketServer.determineUsedPort(); - return !isPortAvailable(port); - } - - public static void startDaemon() throws Exception { - if (Optional.ofNullable(System.getProperty("io.xpipe.beacon.startInProcess")) - .map(Boolean::parseBoolean).orElse(false)) { - startInProcess(); - return; - } - -// if (System.getenv().containsKey(EXEC_PROPERTY)) { -// Runtime.getRuntime().exec(System.getenv(EXEC_PROPERTY)); -// return; -// } - - var file = getUserDir().resolve("run"); - if (Files.exists(file)) { - Runtime.getRuntime().exec(Files.readString(file)); - } - - throw new IllegalArgumentException("Unable to find xpipe daemon installation"); - } - - private static void startInProcess() throws Exception { - var mainClass = Class.forName("io.xpipe.app.Main"); - var method = mainClass.getDeclaredMethod("main", String[].class); - new Thread(() -> { - try { - method.invoke(null, (Object) new String[0]); - } catch (IllegalAccessException | InvocationTargetException e) { - e.printStackTrace(); - } - }).start(); - } -} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/ListCollectionsExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/ListCollectionsExchange.java index 843e01ee..4a92046b 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/ListCollectionsExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/ListCollectionsExchange.java @@ -27,7 +27,7 @@ public abstract class ListCollectionsExchange implements MessageExchange { @@ -15,5 +14,5 @@ public interface MessageExchange getResponseClass(); - void handleRequest(SocketServer server, RQ msg, InputStream body, Socket clientSocket) throws Exception; + void handleRequest(BeaconHandler handler, RQ msg, InputStream body) throws Exception; } diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/ReadTableDataExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadTableDataExchange.java index 051a11dd..f4b56bee 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/ReadTableDataExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadTableDataExchange.java @@ -8,7 +8,7 @@ public abstract class ReadTableDataExchange implements MessageExchange { - while (running) { - try { - var clientSocket = socket.accept(); - handleClientConnection(clientSocket); - } catch (Exception ex) { - ex.printStackTrace(); - } - connectionCounter++; - } - }, "socket server"); - t.setDaemon(true); - t.start(); - } - - private void handleClientConnection(Socket clientSocket) { - var t = new Thread(() -> { - try { - var in = clientSocket.getInputStream(); - var read = JacksonHelper.newMapper().disable(JsonParser.Feature.AUTO_CLOSE_SOURCE).readTree(in); - logger.debug("Received request: \n" + read.toPrettyString()); - - var req = parseRequest(read); - var prov = MessageExchanges.byRequest(req).get(); - prov.handleRequest(this, req, in, clientSocket); - } catch (SocketException ex) { - try { - ex.printStackTrace(); - } catch (Exception ioex) { - ioex.printStackTrace(); - } - } catch (Exception ex) { - try { - ex.printStackTrace(); - sendServerErrorResponse(clientSocket, ex); - } catch (Exception ioex) { - ioex.printStackTrace(); - } - } finally { - try { - clientSocket.close(); - } catch (Exception ioex) { - ioex.printStackTrace(); - } - } - }, "socket connection #" + connectionCounter); - t.setDaemon(true); - t.start(); - } - - public void prepareBody(Socket outSocket) throws IOException { - outSocket.getOutputStream().write(Sockets.BODY_SEPARATOR); - } - - public void sendResponse(Socket outSocket, T obj) throws Exception { - ObjectNode json = JacksonHelper.newMapper().valueToTree(obj); - var prov = MessageExchanges.byResponse(obj).get(); - json.set("type", new TextNode(prov.getId())); - json.set("phase", new TextNode("response")); - var msg = JsonNodeFactory.instance.objectNode(); - msg.set("xPipeMessage", json); - - var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); - var gen = mapper.createGenerator(outSocket.getOutputStream()); - gen.writeTree(msg); - } - - public void sendClientErrorResponse(Socket outSocket, String message) throws Exception { - var err = new ClientErrorMessage(message); - ObjectNode json = JacksonHelper.newMapper().valueToTree(err); - var msg = JsonNodeFactory.instance.objectNode(); - msg.set("xPipeClientError", json); - - var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); - var gen = mapper.createGenerator(outSocket.getOutputStream()); - gen.writeTree(msg); - } - - public void sendServerErrorResponse(Socket outSocket, Throwable ex) throws Exception { - var err = new ServerErrorMessage(UUID.randomUUID(), ex); - ObjectNode json = JacksonHelper.newMapper().valueToTree(err); - var msg = JsonNodeFactory.instance.objectNode(); - msg.set("xPipeServerError", json); - - var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); - var gen = mapper.createGenerator(outSocket.getOutputStream()); - gen.writeTree(msg); - } - - private T parseRequest(JsonNode header) throws Exception { - ObjectNode content = (ObjectNode) header.required("xPipeMessage"); - - var type = content.required("type").textValue(); - var phase = content.required("phase").textValue(); - if (!phase.equals("request")) { - throw new IllegalArgumentException(); - } - content.remove("type"); - content.remove("phase"); - - var prov = MessageExchanges.byId(type); - if (prov.isEmpty()) { - throw new IllegalArgumentException(); - } - - var reader = JacksonHelper.newMapper().readerFor(prov.get().getRequestClass()); - return reader.readValue(content); - } -} diff --git a/beacon/src/main/java/io/xpipe/beacon/socket/Sockets.java b/beacon/src/main/java/io/xpipe/beacon/socket/Sockets.java deleted file mode 100644 index afe81499..00000000 --- a/beacon/src/main/java/io/xpipe/beacon/socket/Sockets.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.xpipe.beacon.socket; - -import java.nio.charset.StandardCharsets; - -public class Sockets { - - public static final byte[] BODY_SEPARATOR = "\n\n".getBytes(StandardCharsets.UTF_8); - private static final String DEBUG_PROP = "io.xpipe.beacon.debugOutput"; - - public static boolean debugEnabled() { - if (System.getProperty(DEBUG_PROP) != null) { - return Boolean.parseBoolean(System.getProperty(DEBUG_PROP)); - } - return false; - } -} diff --git a/beacon/src/main/java/module-info.java b/beacon/src/main/java/module-info.java index 32551315..5df4876f 100644 --- a/beacon/src/main/java/module-info.java +++ b/beacon/src/main/java/module-info.java @@ -14,8 +14,6 @@ module io.xpipe.beacon { opens io.xpipe.beacon; opens io.xpipe.beacon.exchange; - exports io.xpipe.beacon.socket; - opens io.xpipe.beacon.socket; opens io.xpipe.beacon.message; requires org.apache.commons.lang; diff --git a/core/src/main/java/io/xpipe/core/source/DataSourceId.java b/core/src/main/java/io/xpipe/core/source/DataSourceId.java index 6f6bc01a..d30dd3c3 100644 --- a/core/src/main/java/io/xpipe/core/source/DataSourceId.java +++ b/core/src/main/java/io/xpipe/core/source/DataSourceId.java @@ -6,34 +6,51 @@ public class DataSourceId { public static final char SEPARATOR = ':'; + public static DataSourceId create(String collectionName, String entryName) { + if (collectionName == null) { + throw new IllegalArgumentException("Collection name is null"); + } + if (collectionName.contains("" + SEPARATOR)) { + throw new IllegalArgumentException("Separator character " + SEPARATOR + " is not allowed in the collection name"); + } + + if (entryName == null) { + throw new IllegalArgumentException("Collection name is null"); + } + if (entryName.contains("" + SEPARATOR)) { + throw new IllegalArgumentException("Separator character " + SEPARATOR + " is not allowed in the entry name"); + } + + return new DataSourceId(collectionName, entryName); + } + private final String collectionName; private final String entryName; @JsonCreator - public DataSourceId(String collectionName, String entryName) { + private DataSourceId(String collectionName, String entryName) { this.collectionName = collectionName; this.entryName = entryName; } - public DataSourceId withEntryName(String newName) { - return new DataSourceId(collectionName, newName); - } - public static DataSourceId fromString(String s) { + if (s == null) { + throw new IllegalArgumentException("String is null"); + } + var split = s.split(String.valueOf(SEPARATOR)); if (split.length != 2) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Data source id must contain exactly one " + SEPARATOR); } + if (split[0].length() == 0) { + throw new IllegalArgumentException("Collection name must not be empty"); + } if (split[1].length() == 0) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Entry name must not be empty"); } - return new DataSourceId(split[0].length() > 0 ? split[0] : null, split[1]); - } - - public boolean hasCollection() { - return collectionName != null; + return new DataSourceId(split[0], split[1]); } @Override