From f5cccd568750ed816c9ad5cec23c6cddb232204f Mon Sep 17 00:00:00 2001 From: Christopher Schnick Date: Mon, 7 Mar 2022 22:59:48 +0100 Subject: [PATCH] Rework beacon connection and implement various improvements --- api/build.gradle | 17 ++-- .../io/xpipe/api/DataTableAccumulator.java | 18 +++- .../xpipe/api/connector/XPipeConnection.java | 4 + .../io/xpipe/api/impl/DataSourceImpl.java | 2 +- .../api/impl/DataTableAccumulatorImpl.java | 30 +++++- .../io/xpipe/api/util/TypeDescriptor.java | 13 +++ .../io/xpipe/api/test/ConnectionFactory.java | 11 +++ .../java/io/xpipe/api/test/DaemonControl.java | 4 +- .../api/test/DataTableAccumulatorTest.java | 33 +++++++ .../java/io/xpipe/api/test/DataTableTest.java | 2 +- .../java/io/xpipe/beacon/BeaconClient.java | 16 +-- .../io/xpipe/beacon/BeaconConnection.java | 57 ++++++----- .../java/io/xpipe/beacon/BeaconFormat.java | 99 +++++++++++++++++++ .../java/io/xpipe/beacon/BeaconHandler.java | 6 +- .../java/io/xpipe/beacon/BeaconServer.java | 23 ++++- .../exchange/StoreResourceExchange.java | 47 --------- .../exchange/api/QueryTableDataExchange.java | 3 +- beacon/src/main/java/module-info.java | 1 - .../io/xpipe/core/data/type/ArrayType.java | 26 +++++ .../io/xpipe/core/data/type/DataType.java | 7 ++ .../io/xpipe/core/data/type/TupleType.java | 39 +++++++- .../io/xpipe/core/data/type/ValueType.java | 16 +++ .../io/xpipe/core/data/type/WildcardType.java | 7 ++ .../data/typed/TypedDataStreamWriter.java | 10 +- .../typed/TypedDataStructureNodeReader.java | 3 + .../io/xpipe/core/util/CoreJacksonModule.java | 2 + .../io/xpipe/core/util/JacksonHelper.java | 1 + .../xpipe/extension/DataSourceProvider.java | 4 +- .../xpipe/extension/DataSourceProviders.java | 2 +- 29 files changed, 386 insertions(+), 117 deletions(-) create mode 100644 api/src/main/java/io/xpipe/api/util/TypeDescriptor.java create mode 100644 api/src/test/java/io/xpipe/api/test/DataTableAccumulatorTest.java create mode 100644 beacon/src/main/java/io/xpipe/beacon/BeaconFormat.java delete mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/StoreResourceExchange.java diff --git a/api/build.gradle b/api/build.gradle index 58ba1f48..d20f4606 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -31,12 +31,17 @@ test { } workingDir = rootDir - systemProperty "io.xpipe.storage.dir", "$projectDir/local/storage" - systemProperty "io.xpipe.storage.persist", "false" - systemProperty 'io.xpipe.app.writeSysOut', "true" - systemProperty 'io.xpipe.app.logLevel', "trace" + // Daemon properties + systemProperty "io.xpipe.beacon.exec", "cmd.exe /c \"$rootDir\\gradlew.bat\" :app:run" + + " -Dio.xpipe.app.mode=tray" + + " -Dio.xpipe.beacon.port=21722" + + " -Dio.xpipe.app.dataDir=$projectDir/local/" + + " -Dio.xpipe.storage.persist=false" + + " -Dio.xpipe.app.writeSysOut=true" + + " -Dio.xpipe.beacon.debugOutput=true" + + " -Dio.xpipe.app.logLevel=trace" - systemProperty "io.xpipe.beacon.exec", "cmd.exe /c \"$rootDir\\gradlew.bat\" :app:run -Dio.xpipe.daemon.mode=tray -Dio.xpipe.beacon.port=21722 -Dio.xpipe.app.dataDir=$projectDir/local/" - systemProperty 'io.xpipe.beacon.debugOutput', "true" + // API properties + // systemProperty 'io.xpipe.beacon.debugOutput', "true" systemProperty "io.xpipe.beacon.port", "21722" } diff --git a/api/src/main/java/io/xpipe/api/DataTableAccumulator.java b/api/src/main/java/io/xpipe/api/DataTableAccumulator.java index c9581fce..09df3b21 100644 --- a/api/src/main/java/io/xpipe/api/DataTableAccumulator.java +++ b/api/src/main/java/io/xpipe/api/DataTableAccumulator.java @@ -1,7 +1,10 @@ package io.xpipe.api; +import io.xpipe.api.impl.DataTableAccumulatorImpl; +import io.xpipe.core.data.node.DataStructureNode; import io.xpipe.core.data.node.DataStructureNodeAcceptor; import io.xpipe.core.data.node.TupleNode; +import io.xpipe.core.data.type.TupleType; import io.xpipe.core.source.DataSourceId; /** @@ -13,6 +16,17 @@ import io.xpipe.core.source.DataSourceId; */ public interface DataTableAccumulator { + public static DataTableAccumulator create(TupleType type) { + return new DataTableAccumulatorImpl(type); + } + + /** + * Wrapper for {@link #finish(DataSourceId)}. + */ + default DataTable finish(String id) { + return finish(DataSourceId.fromString(id)); + } + /** * Finishes the construction process and returns the data source reference. * @@ -25,12 +39,12 @@ public interface DataTableAccumulator { * * @param row the row to add */ - void add(TupleNode row); + void add(DataStructureNode row); /** * Creates a tuple acceptor that adds all accepted tuples to the table. */ - DataStructureNodeAcceptor acceptor(); + DataStructureNodeAcceptor acceptor(); /** * Returns the current amount of rows added to the table. diff --git a/api/src/main/java/io/xpipe/api/connector/XPipeConnection.java b/api/src/main/java/io/xpipe/api/connector/XPipeConnection.java index c9c37e1f..b5bc5911 100644 --- a/api/src/main/java/io/xpipe/api/connector/XPipeConnection.java +++ b/api/src/main/java/io/xpipe/api/connector/XPipeConnection.java @@ -17,6 +17,8 @@ public final class XPipeConnection extends BeaconConnection { try (var con = new XPipeConnection()) { con.constructSocket(); handler.handle(con); + } catch (BeaconException e) { + throw e; } catch (Exception e) { throw new BeaconException(e); } @@ -26,6 +28,8 @@ public final class XPipeConnection extends BeaconConnection { try (var con = new XPipeConnection()) { con.constructSocket(); return mapper.handle(con); + } catch (BeaconException e) { + throw e; } catch (Exception e) { throw new BeaconException(e); } diff --git a/api/src/main/java/io/xpipe/api/impl/DataSourceImpl.java b/api/src/main/java/io/xpipe/api/impl/DataSourceImpl.java index feca4ef7..f305388f 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataSourceImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataSourceImpl.java @@ -44,7 +44,7 @@ public abstract class DataSourceImpl implements DataSource { public static DataSource create(DataSourceId id, String type, Map config, InputStream in) { var res = XPipeConnection.execute(con -> { var req = PreStoreExchange.Request.builder().build(); - PreStoreExchange.Response r = con.performOutputExchange(req, in::transferTo); + PreStoreExchange.Response r = con.performOutputExchange(req, out -> in.transferTo(out)); return r; }); diff --git a/api/src/main/java/io/xpipe/api/impl/DataTableAccumulatorImpl.java b/api/src/main/java/io/xpipe/api/impl/DataTableAccumulatorImpl.java index 4c6b01f2..3a5716a4 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataTableAccumulatorImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataTableAccumulatorImpl.java @@ -4,8 +4,10 @@ import io.xpipe.api.DataSource; import io.xpipe.api.DataTable; import io.xpipe.api.DataTableAccumulator; import io.xpipe.api.connector.XPipeConnection; +import io.xpipe.api.util.TypeDescriptor; import io.xpipe.beacon.exchange.PreStoreExchange; import io.xpipe.beacon.exchange.ReadExecuteExchange; +import io.xpipe.core.data.node.DataStructureNode; import io.xpipe.core.data.node.DataStructureNodeAcceptor; import io.xpipe.core.data.node.TupleNode; import io.xpipe.core.data.type.TupleType; @@ -14,21 +16,26 @@ import io.xpipe.core.source.DataSourceConfigInstance; import io.xpipe.core.source.DataSourceId; import io.xpipe.core.source.DataSourceReference; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + public class DataTableAccumulatorImpl implements DataTableAccumulator { private final XPipeConnection connection; private final TupleType type; private int rows; + private TupleType writtenDescriptor; public DataTableAccumulatorImpl(TupleType type) { this.type = type; connection = XPipeConnection.open(); connection.sendRequest(PreStoreExchange.Request.builder().build()); - connection.sendBodyStart(); + connection.sendBody(); } @Override public synchronized DataTable finish(DataSourceId id) { + connection.withOutputStream(OutputStream::close); PreStoreExchange.Response res = connection.receiveResponse(); connection.close(); @@ -40,16 +47,29 @@ public class DataTableAccumulatorImpl implements DataTableAccumulator { return DataSource.get(DataSourceReference.id(id)).asTable(); } - @Override - public synchronized void add(TupleNode row) { + private void writeDescriptor() { + if (writtenDescriptor != null) { + return; + } + writtenDescriptor = TupleType.tableType(type.getNames()); + connection.withOutputStream(out -> { - TypedDataStreamWriter.writeStructure(connection.getOutputStream(), row, type); + out.write((TypeDescriptor.create(type.getNames())).getBytes(StandardCharsets.UTF_8)); + }); + } + + @Override + public synchronized void add(DataStructureNode row) { + TupleNode toUse = type.matches(row) ? row.asTuple() : type.convert(row).orElseThrow().asTuple(); + connection.withOutputStream(out -> { + writeDescriptor(); + TypedDataStreamWriter.writeStructure(out, toUse, writtenDescriptor); rows++; }); } @Override - public synchronized DataStructureNodeAcceptor acceptor() { + public synchronized DataStructureNodeAcceptor acceptor() { return node -> { add(node); return true; diff --git a/api/src/main/java/io/xpipe/api/util/TypeDescriptor.java b/api/src/main/java/io/xpipe/api/util/TypeDescriptor.java new file mode 100644 index 00000000..42e8bf90 --- /dev/null +++ b/api/src/main/java/io/xpipe/api/util/TypeDescriptor.java @@ -0,0 +1,13 @@ +package io.xpipe.api.util; + +import java.util.List; +import java.util.stream.Collectors; + +public class TypeDescriptor { + + public static String create(List names) { + return "[" + names.stream() + .map(n -> n != null ? "\"" + n + "\"" : null) + .collect(Collectors.joining(",")) + "]\n"; + } +} diff --git a/api/src/test/java/io/xpipe/api/test/ConnectionFactory.java b/api/src/test/java/io/xpipe/api/test/ConnectionFactory.java index 6f5bb728..6790da80 100644 --- a/api/src/test/java/io/xpipe/api/test/ConnectionFactory.java +++ b/api/src/test/java/io/xpipe/api/test/ConnectionFactory.java @@ -6,7 +6,14 @@ import io.xpipe.beacon.BeaconServer; public class ConnectionFactory { + private static boolean alreadyStarted; + public static void start() throws Exception { + if (BeaconServer.isRunning()) { + alreadyStarted = true; + return; + } + if (!BeaconServer.tryStart()) { throw new AssertionError(); } @@ -18,6 +25,10 @@ public class ConnectionFactory { } public static void stop() throws Exception { + if (alreadyStarted) { + return; + } + if (!BeaconServer.isRunning()) { return; } diff --git a/api/src/test/java/io/xpipe/api/test/DaemonControl.java b/api/src/test/java/io/xpipe/api/test/DaemonControl.java index 5c8bdf52..37e20a13 100644 --- a/api/src/test/java/io/xpipe/api/test/DaemonControl.java +++ b/api/src/test/java/io/xpipe/api/test/DaemonControl.java @@ -6,12 +6,12 @@ import org.junit.jupiter.api.BeforeAll; public class DaemonControl { @BeforeAll - static void setup() throws Exception { + public static void setup() throws Exception { ConnectionFactory.start(); } @AfterAll - static void teardown() throws Exception { + public static void teardown() throws Exception { ConnectionFactory.stop(); } } diff --git a/api/src/test/java/io/xpipe/api/test/DataTableAccumulatorTest.java b/api/src/test/java/io/xpipe/api/test/DataTableAccumulatorTest.java new file mode 100644 index 00000000..745d2c6d --- /dev/null +++ b/api/src/test/java/io/xpipe/api/test/DataTableAccumulatorTest.java @@ -0,0 +1,33 @@ +package io.xpipe.api.test; + +import io.xpipe.api.DataTableAccumulator; +import io.xpipe.core.data.node.TupleNode; +import io.xpipe.core.data.node.ValueNode; +import io.xpipe.core.data.type.TupleType; +import io.xpipe.core.data.type.ValueType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.OptionalInt; + +public class DataTableAccumulatorTest extends DaemonControl { + + @Test + public void test() { + var type = TupleType.of( + List.of("col1", "col2"), + List.of(ValueType.of(), ValueType.of())); + var acc = DataTableAccumulator.create(type); + + var val = type.convert( + TupleNode.of(List.of(ValueNode.of("val1"), ValueNode.of("val2")))).orElseThrow(); + acc.add(val); + var table = acc.finish(":test"); + + Assertions.assertEquals(table.getInfo().getDataType(), TupleType.tableType(List.of("col1", "col2"))); + Assertions.assertEquals(table.getInfo().getRowCountIfPresent(), OptionalInt.empty()); + var read = table.read(1).at(0); + Assertions.assertEquals(val, read); + } +} diff --git a/api/src/test/java/io/xpipe/api/test/DataTableTest.java b/api/src/test/java/io/xpipe/api/test/DataTableTest.java index c882a9f8..c5f1b80f 100644 --- a/api/src/test/java/io/xpipe/api/test/DataTableTest.java +++ b/api/src/test/java/io/xpipe/api/test/DataTableTest.java @@ -10,7 +10,7 @@ import java.util.Map; public class DataTableTest extends DaemonControl { @BeforeAll - static void setup() throws Exception { + public static void setupStorage() throws Exception { DataSource.create(DataSourceId.fromString(":usernames"), "csv", Map.of(), DataTableTest.class.getResource("username.csv")); } diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java index f0f2ebc5..ede304c5 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java @@ -104,31 +104,27 @@ public class BeaconClient implements AutoCloseable { } } - public void receiveBody() throws ConnectorException { + public InputStream receiveBody() throws ConnectorException { try { var sep = in.readNBytes(BODY_SEPARATOR.length); if (sep.length != 0 && !Arrays.equals(BODY_SEPARATOR, sep)) { throw new ConnectorException("Invalid body separator"); } + return BeaconFormat.readBlocks(socket); } catch (IOException ex) { throw new ConnectorException(ex); } } - public void startBody() throws ConnectorException { + public OutputStream sendBody() throws ConnectorException { try { out.write(BODY_SEPARATOR); + return BeaconFormat.writeBlocks(socket); } catch (IOException ex) { throw new ConnectorException(ex); } } - public RES simpleExchange(REQ req) - throws ServerException, ConnectorException, ClientException { - sendRequest(req); - return this.receiveResponse(); - } - public void sendRequest(T req) throws ClientException, ConnectorException { ObjectNode json = JacksonHelper.newMapper().valueToTree(req); var prov = MessageExchanges.byRequest(req); @@ -245,4 +241,8 @@ public class BeaconClient implements AutoCloseable { public OutputStream getOutputStream() { return out; } + + public Socket getSocket() { + return socket; + } } diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconConnection.java b/beacon/src/main/java/io/xpipe/beacon/BeaconConnection.java index 99af1153..ca0d2e97 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconConnection.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconConnection.java @@ -11,6 +11,9 @@ public abstract class BeaconConnection implements AutoCloseable { protected BeaconClient socket; + private InputStream bodyInput; + private OutputStream bodyOutput; + protected abstract void constructSocket(); @Override @@ -26,14 +29,6 @@ public abstract class BeaconConnection implements AutoCloseable { } } - public void closeOutput() { - try { - socket.getOutputStream().close(); - } catch (Exception e) { - throw new BeaconException("Could not close beacon output stream", e); - } - } - public void withOutputStream(BeaconClient.FailableConsumer ex) { try { ex.accept(getOutputStream()); @@ -59,13 +54,21 @@ public abstract class BeaconConnection implements AutoCloseable { public OutputStream getOutputStream() { checkClosed(); - return socket.getOutputStream(); + if (bodyOutput == null) { + throw new IllegalStateException("Body output has not started yet"); + } + + return bodyOutput; } public InputStream getInputStream() { checkClosed(); - return socket.getInputStream(); + if (bodyInput == null) { + throw new IllegalStateException("Body input has not started yet"); + } + + return bodyInput; } public void performInputExchange( @@ -83,7 +86,16 @@ public abstract class BeaconConnection implements AutoCloseable { checkClosed(); try { - socket.exchange(req, reqWriter, responseConsumer); + socket.sendRequest(req); + if (reqWriter != null) { + try (var out = socket.sendBody()) { + reqWriter.accept(out); + } + } + RES res = socket.receiveResponse(); + try (var in = socket.receiveBody()) { + responseConsumer.accept(res, in); + } } catch (Exception e) { throw new BeaconException("Could not communicate with beacon", e); } @@ -110,21 +122,23 @@ public abstract class BeaconConnection implements AutoCloseable { } } - public void sendBodyStart() { + public OutputStream sendBody() { checkClosed(); try { - socket.startBody(); + bodyOutput = socket.sendBody(); + return bodyOutput; } catch (Exception e) { throw new BeaconException("Could not communicate with beacon", e); } } - public void receiveBody() { + public InputStream receiveBody() { checkClosed(); try { - socket.receiveBody(); + bodyInput = socket.receiveBody(); + return bodyInput; } catch (Exception e) { throw new BeaconException("Could not communicate with beacon", e); } @@ -137,25 +151,22 @@ public abstract class BeaconConnection implements AutoCloseable { try { socket.sendRequest(req); - socket.startBody(); - reqWriter.accept(socket.getOutputStream()); + try (var out = socket.sendBody()) { + reqWriter.accept(out); + } return socket.receiveResponse(); } catch (Exception e) { throw new BeaconException("Could not communicate with beacon", e); } } -// public void writeLength(int bytes) throws IOException { -// checkClosed(); -// socket.getOutputStream().write(ByteBuffer.allocate(4).putInt(bytes).array()); -// } - public RES performSimpleExchange( REQ req) { checkClosed(); try { - return socket.simpleExchange(req); + socket.sendRequest(req); + return socket.receiveResponse(); } catch (Exception e) { throw new BeaconException("Could not communicate with beacon", e); } diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconFormat.java b/beacon/src/main/java/io/xpipe/beacon/BeaconFormat.java new file mode 100644 index 00000000..b5b0c89e --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconFormat.java @@ -0,0 +1,99 @@ +package io.xpipe.beacon; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; + +public class BeaconFormat { + + public static OutputStream writeBlocks(Socket socket) throws IOException { + int size = 65536 - 4; + var out = socket.getOutputStream(); + return new OutputStream() { + private final byte[] currentBytes = new byte[size]; + private int index; + + @Override + public void close() throws IOException { + finishBlock(); + out.flush(); + } + + @Override + public void write(int b) throws IOException { + if (index == currentBytes.length) { + finishBlock(); + } + + currentBytes[index] = (byte) b; + index++; + } + + private void finishBlock() throws IOException { + if (BeaconConfig.debugEnabled()) { + System.out.println("Sending data block of length " + index); + } + + int length = index; + var lengthBuffer = ByteBuffer.allocate(4).putInt(length); + out.write(lengthBuffer.array()); + out.write(currentBytes, 0, length); + index = 0; + } + }; +// while (true) { +// var bytes = in.readNBytes(size); +// int length = bytes.length; +// var lengthBuffer = ByteBuffer.allocate(4).putInt(length); +// socket.getOutputStream().write(lengthBuffer.array()); +// socket.getOutputStream().write(bytes); +// +// if (length == 0) { +// return; +// } +// } + } + + public static InputStream readBlocks(Socket socket) throws IOException { + int size = 65536 - 4; + var in = socket.getInputStream(); + return new InputStream() { + + private byte[] currentBytes; + private int index; + private boolean finished; + + @Override + public int read() throws IOException { + if ((currentBytes == null || index == currentBytes.length) && !finished) { + readBlock(); + } + + if (currentBytes != null && index == currentBytes.length && finished) { + return -1; + } + + int out = currentBytes[index]; + index++; + return out; + } + + private void readBlock() throws IOException { + var length = in.readNBytes(4); + var lengthInt = ByteBuffer.wrap(length).getInt(); + + if (BeaconConfig.debugEnabled()) { + System.out.println("Receiving data block of length " + lengthInt); + } + + currentBytes = in.readNBytes(lengthInt); + index = 0; + if (lengthInt < size) { + finished = true; + } + } + }; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconHandler.java b/beacon/src/main/java/io/xpipe/beacon/BeaconHandler.java index 83b41367..44317c08 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconHandler.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconHandler.java @@ -8,9 +8,7 @@ public interface BeaconHandler { void postResponse(BeaconClient.FailableRunnable r); - void prepareBody() throws IOException; + OutputStream sendBody() throws IOException; - InputStream startBodyRead() throws IOException; - - OutputStream getOutputStream() throws Exception; + InputStream receiveBody() throws IOException; } diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java b/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java index 79f24b56..00e69eaf 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java @@ -2,7 +2,9 @@ package io.xpipe.beacon; import io.xpipe.beacon.exchange.StopExchange; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.net.DatagramSocket; import java.net.ServerSocket; import java.nio.file.Files; @@ -24,10 +26,26 @@ public class BeaconServer { return !isPortAvailable(port); } + private static void startFork(String custom) throws IOException { + boolean print = true; + var proc = Runtime.getRuntime().exec(custom); + new Thread(null, () -> { + try { + InputStreamReader isr = new InputStreamReader(proc.getInputStream()); + BufferedReader br = new BufferedReader(isr); + String line = null; + while ((line = br.readLine()) != null) + System.out.println("[xpiped] " + line); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + }, "daemon fork").start(); + } + public static boolean tryStart() throws Exception { var custom = BeaconConfig.getCustomExecCommand(); if (custom != null) { - Runtime.getRuntime().exec(custom); + startFork(custom); return true; } @@ -45,7 +63,8 @@ public class BeaconServer { } public static boolean tryStop(BeaconClient client) throws Exception { - StopExchange.Response res = client.simpleExchange(StopExchange.Request.builder().build()); + client.sendRequest(StopExchange.Request.builder().build()); + StopExchange.Response res =client.receiveResponse(); return res.isSuccess(); } diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/StoreResourceExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/StoreResourceExchange.java deleted file mode 100644 index 9641547d..00000000 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/StoreResourceExchange.java +++ /dev/null @@ -1,47 +0,0 @@ -package io.xpipe.beacon.exchange; - -import io.xpipe.beacon.message.RequestMessage; -import io.xpipe.beacon.message.ResponseMessage; -import io.xpipe.core.source.DataSourceConfigOptions; -import io.xpipe.core.source.DataSourceId; -import io.xpipe.core.source.DataSourceInfo; -import lombok.Builder; -import lombok.Value; -import lombok.extern.jackson.Jacksonized; - -import java.net.URL; - -public class StoreResourceExchange implements MessageExchange { - - @Override - public String getId() { - return "storeResource"; - } - - @Override - public Class getRequestClass() { - return StoreResourceExchange.Request.class; - } - - @Override - public Class getResponseClass() { - return StoreResourceExchange.Response.class; - } - - @Jacksonized - @Builder - @Value - public static class Request implements RequestMessage { - URL url; - String providerId; - } - - @Jacksonized - @Builder - @Value - public static class Response implements ResponseMessage { - DataSourceId sourceId; - DataSourceConfigOptions config; - DataSourceInfo info; - } -} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryTableDataExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryTableDataExchange.java index 4c13f7bd..3734cef0 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryTableDataExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryTableDataExchange.java @@ -33,8 +33,7 @@ public class QueryTableDataExchange implements MessageExchange convert(DataStructureNode node) { + if (matches(node)) { + return Optional.of(node); + } + + if (node.isValue()) { + return Optional.of(ArrayNode.of(node)); + } + + List nodes = new ArrayList<>(node.size()); + for (int i = 0; i < node.size(); i++) { + var converted = sharedType.convert(node.at(i)); + if (converted.isEmpty()) { + return Optional.empty(); + } + + nodes.add(converted.get()); + } + + return Optional.of(ArrayNode.of(nodes)); + } + @Override public boolean matches(DataStructureNode node) { if (!node.isArray()) { diff --git a/core/src/main/java/io/xpipe/core/data/type/DataType.java b/core/src/main/java/io/xpipe/core/data/type/DataType.java index fda2d19b..e51be181 100644 --- a/core/src/main/java/io/xpipe/core/data/type/DataType.java +++ b/core/src/main/java/io/xpipe/core/data/type/DataType.java @@ -3,6 +3,8 @@ package io.xpipe.core.data.type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.xpipe.core.data.node.DataStructureNode; +import java.util.Optional; + /** * Represents the type of a {@link DataStructureNode} object. * To check whether a {@link DataStructureNode} instance conforms to the specified type, @@ -16,6 +18,11 @@ public abstract class DataType { */ public abstract String getName(); + /** + * Checks whether a node can be converted to this data type. + */ + public abstract Optional convert(DataStructureNode node); + /** * Checks whether a node conforms to this data type. */ diff --git a/core/src/main/java/io/xpipe/core/data/type/TupleType.java b/core/src/main/java/io/xpipe/core/data/type/TupleType.java index 555283fe..3dd9d00f 100644 --- a/core/src/main/java/io/xpipe/core/data/type/TupleType.java +++ b/core/src/main/java/io/xpipe/core/data/type/TupleType.java @@ -3,14 +3,13 @@ package io.xpipe.core.data.type; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonTypeName; import io.xpipe.core.data.node.DataStructureNode; +import io.xpipe.core.data.node.TupleNode; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Value; -import java.util.Collections; -import java.util.List; -import java.util.Objects; +import java.util.*; /** * A tuple type in the context of XPipe is defined as an ordered, @@ -26,6 +25,13 @@ public class TupleType extends DataType { List names; List types; + /** + * Creates a new tuple type that represents a table data type. + */ + public static TupleType tableType(List names) { + return TupleType.of(names, Collections.nCopies(names.size(), WildcardType.of())); + } + /** * Creates a new tuple type that contains no entries. */ @@ -59,6 +65,33 @@ public class TupleType extends DataType { return "tuple"; } + @Override + public Optional convert(DataStructureNode node) { + if (matches(node)) { + return Optional.of(node); + } + + if (node.isValue() && types.size() == 1) { + return types.get(0).convert(node); + } + + if (node.size() != types.size()) { + return Optional.empty(); + } + + List nodes = new ArrayList<>(node.size()); + for (int i = 0; i < node.size(); i++) { + var converted = types.get(i).convert(node.at(i)); + if (converted.isEmpty()) { + return Optional.empty(); + } + + nodes.add(converted.get()); + } + + return Optional.of(TupleNode.of(names, nodes)); + } + @Override public boolean matches(DataStructureNode node) { if (!node.isTuple()) { diff --git a/core/src/main/java/io/xpipe/core/data/type/ValueType.java b/core/src/main/java/io/xpipe/core/data/type/ValueType.java index 45c4dacb..ab2f9a28 100644 --- a/core/src/main/java/io/xpipe/core/data/type/ValueType.java +++ b/core/src/main/java/io/xpipe/core/data/type/ValueType.java @@ -7,6 +7,8 @@ import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Value; +import java.util.Optional; + /** * A value type represents any node that holds some atomic value, i.e. it has no subtypes. */ @@ -28,6 +30,20 @@ public class ValueType extends DataType { return "value"; } + @Override + public Optional convert(DataStructureNode node) { + if (matches(node)) { + return Optional.of(node); + } + + if (node.size() == 1) { + var n = node.at(0); + return convert(n); + } + + return Optional.empty(); + } + @Override public boolean matches(DataStructureNode node) { return node.isValue(); diff --git a/core/src/main/java/io/xpipe/core/data/type/WildcardType.java b/core/src/main/java/io/xpipe/core/data/type/WildcardType.java index 1cf8a141..127e096a 100644 --- a/core/src/main/java/io/xpipe/core/data/type/WildcardType.java +++ b/core/src/main/java/io/xpipe/core/data/type/WildcardType.java @@ -5,6 +5,8 @@ import io.xpipe.core.data.node.DataStructureNode; import lombok.EqualsAndHashCode; import lombok.Value; +import java.util.Optional; + /** * A wildcard type matches any {@link DataStructureNode} instance. * For simplicity reasons it is not possible to further specify a wildcard instance to only match a certain @@ -29,6 +31,11 @@ public class WildcardType extends DataType { return "wildcard"; } + @Override + public Optional convert(DataStructureNode node) { + return Optional.of(node); + } + @Override public boolean matches(DataStructureNode node) { return true; diff --git a/core/src/main/java/io/xpipe/core/data/typed/TypedDataStreamWriter.java b/core/src/main/java/io/xpipe/core/data/typed/TypedDataStreamWriter.java index 0a143375..c749e624 100644 --- a/core/src/main/java/io/xpipe/core/data/typed/TypedDataStreamWriter.java +++ b/core/src/main/java/io/xpipe/core/data/typed/TypedDataStreamWriter.java @@ -1,11 +1,7 @@ package io.xpipe.core.data.typed; -import io.xpipe.core.data.node.DataStructureNode; -import io.xpipe.core.data.node.DataStructureNodeIO; import io.xpipe.core.data.generic.GenericDataStreamWriter; -import io.xpipe.core.data.node.ArrayNode; -import io.xpipe.core.data.node.SimpleTupleNode; -import io.xpipe.core.data.node.ValueNode; +import io.xpipe.core.data.node.*; import io.xpipe.core.data.type.ArrayType; import io.xpipe.core.data.type.DataType; import io.xpipe.core.data.type.TupleType; @@ -22,7 +18,7 @@ public class TypedDataStreamWriter { private static void write(OutputStream out, DataStructureNode node, DataType type) throws IOException { if (type.isTuple() && node.isTuple()) { - writeTuple(out, (SimpleTupleNode) node, (TupleType) type); + writeTuple(out, (TupleNode) node, (TupleType) type); } else if (node.isArray() && type.isArray()) { writeArray(out, (ArrayNode) node, (ArrayType) type); } else if (node.isValue() && type.isValue()) { @@ -40,7 +36,7 @@ public class TypedDataStreamWriter { out.write(n.getRawData()); } - private static void writeTuple(OutputStream out, SimpleTupleNode tuple, TupleType type) throws IOException { + private static void writeTuple(OutputStream out, TupleNode tuple, TupleType type) throws IOException { if (tuple.size() != type.getSize()) { throw new IllegalArgumentException("Tuple size mismatch"); } diff --git a/core/src/main/java/io/xpipe/core/data/typed/TypedDataStructureNodeReader.java b/core/src/main/java/io/xpipe/core/data/typed/TypedDataStructureNodeReader.java index 2c8abd4d..0df133ce 100644 --- a/core/src/main/java/io/xpipe/core/data/typed/TypedDataStructureNodeReader.java +++ b/core/src/main/java/io/xpipe/core/data/typed/TypedDataStructureNodeReader.java @@ -67,6 +67,9 @@ public class TypedDataStructureNodeReader implements TypedAbstractReader { if (nodes.size() != 0 || children.size() != 0 || readNode == null) { throw new IllegalStateException("Reader is not finished yet"); } + + expectedType = flattened.get(0); + currentExpectedTypeIndex = 0; } private void finishNode(DataStructureNode node) { diff --git a/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java b/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java index 35c53071..4477f57b 100644 --- a/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java +++ b/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import io.xpipe.core.data.type.ArrayType; import io.xpipe.core.data.type.TupleType; import io.xpipe.core.data.type.ValueType; +import io.xpipe.core.data.type.WildcardType; import io.xpipe.core.source.DataSourceInfo; import io.xpipe.core.source.DataSourceReference; import io.xpipe.core.store.LocalFileDataStore; @@ -29,6 +30,7 @@ public class CoreJacksonModule extends SimpleModule { new NamedType(ValueType.class), new NamedType(TupleType.class), new NamedType(ArrayType.class), + new NamedType(WildcardType.class), new NamedType(DataSourceInfo.Table.class) ); diff --git a/core/src/main/java/io/xpipe/core/util/JacksonHelper.java b/core/src/main/java/io/xpipe/core/util/JacksonHelper.java index 4be99919..7eb40c47 100644 --- a/core/src/main/java/io/xpipe/core/util/JacksonHelper.java +++ b/core/src/main/java/io/xpipe/core/util/JacksonHelper.java @@ -21,6 +21,7 @@ public class JacksonHelper { public static synchronized void initModularized(ModuleLayer layer) { ObjectMapper objectMapper = INSTANCE; objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); objectMapper.registerModules(findModules(layer)); objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker() diff --git a/extension/src/main/java/io/xpipe/extension/DataSourceProvider.java b/extension/src/main/java/io/xpipe/extension/DataSourceProvider.java index 16efffcc..545e32bc 100644 --- a/extension/src/main/java/io/xpipe/extension/DataSourceProvider.java +++ b/extension/src/main/java/io/xpipe/extension/DataSourceProvider.java @@ -33,7 +33,7 @@ public interface DataSourceProvider { Supplier getDescription(DataSourceDescriptor source); } - interface CliProvider { + interface ConfigProvider { static String booleanName(String name) { return name + " (y/n)"; @@ -82,7 +82,7 @@ public interface DataSourceProvider { GuiProvider getGuiProvider(); - CliProvider getCliProvider(); + ConfigProvider getConfigProvider(); String getId(); diff --git a/extension/src/main/java/io/xpipe/extension/DataSourceProviders.java b/extension/src/main/java/io/xpipe/extension/DataSourceProviders.java index fbc3cd82..001a57a8 100644 --- a/extension/src/main/java/io/xpipe/extension/DataSourceProviders.java +++ b/extension/src/main/java/io/xpipe/extension/DataSourceProviders.java @@ -55,7 +55,7 @@ public class DataSourceProviders { throw new IllegalStateException("Not initialized"); } - return ALL.stream().filter(d -> d.getCliProvider() != null && d.getCliProvider().getPossibleNames().stream() + return ALL.stream().filter(d -> d.getConfigProvider().getPossibleNames().stream() .anyMatch(s -> s.equalsIgnoreCase(name))).findAny(); }