diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java index b7df5c68..ebac921c 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java @@ -23,7 +23,7 @@ import java.util.Optional; import static io.xpipe.beacon.BeaconConfig.BODY_SEPARATOR; -public class BeaconClient { +public class BeaconClient implements AutoCloseable { @FunctionalInterface public interface FailableBiConsumer { @@ -76,7 +76,7 @@ public class BeaconClient { public void exchange( REQ req, FailableConsumer reqWriter, - FailableBiPredicate resReader) + FailableBiConsumer resReader) throws ConnectorException, ClientException, ServerException { try { sendRequest(req); @@ -91,23 +91,16 @@ public class BeaconClient { throw new ConnectorException("Invalid body separator"); } - if (resReader.test(res, in)) { - close(); - } + resReader.accept(res, in); } catch (IOException ex) { - close(); throw new ConnectorException("Couldn't communicate with socket", ex); } } public RES simpleExchange(REQ req) - throws ServerException, ConnectorException, ClientException { - try { - sendRequest(req); - return this.receiveResponse(); - } finally { - close(); - } + throws ServerException, ConnectorException, ClientException { + sendRequest(req); + return this.receiveResponse(); } private void sendRequest(T req) throws ClientException, ConnectorException { diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java b/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java index 4685e24c..5e6e96ab 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java @@ -16,7 +16,7 @@ public abstract class BeaconConnector { protected void performInputExchange( BeaconClient socket, REQ req, - BeaconClient.FailableBiPredicate responseConsumer) throws ServerException, ConnectorException, ClientException { + BeaconClient.FailableBiConsumer responseConsumer) throws ServerException, ConnectorException, ClientException { performInputOutputExchange(socket, req, null, responseConsumer); } @@ -24,7 +24,7 @@ public abstract class BeaconConnector { BeaconClient socket, REQ req, BeaconClient.FailableConsumer reqWriter, - BeaconClient.FailableBiPredicate responseConsumer) + BeaconClient.FailableBiConsumer responseConsumer) throws ServerException, ConnectorException, ClientException { socket.exchange(req, reqWriter, responseConsumer); } @@ -37,7 +37,6 @@ public abstract class BeaconConnector { AtomicReference response = new AtomicReference<>(); socket.exchange(req, reqWriter, (RES res, InputStream in) -> { response.set(res); - return true; }); return response.get(); } diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/DialogExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/DialogExchange.java new file mode 100644 index 00000000..5b46bf2f --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/DialogExchange.java @@ -0,0 +1,43 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceConfigInstance; +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class DialogExchange implements MessageExchange { + + @Override + public String getId() { + return "dialog"; + } + + @Override + public Class getRequestClass() { + return DialogExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return DialogExchange.Response.class; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + DataSourceConfigInstance instance; + String key; + String value; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + DataSourceConfigInstance instance; + String errorMsg; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/InfoExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/InfoExchange.java new file mode 100644 index 00000000..6a6a3509 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/InfoExchange.java @@ -0,0 +1,45 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceConfigInstance; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.core.source.DataSourceInfo; +import io.xpipe.core.store.DataStore; +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class InfoExchange implements MessageExchange { + + @Override + public String getId() { + return "info"; + } + + @Override + public Class getRequestClass() { + return InfoExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return InfoExchange.Response.class; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + DataSourceId id; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + DataSourceInfo info; + DataStore store; + DataSourceConfigInstance config; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/ReadExecuteExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadExecuteExchange.java new file mode 100644 index 00000000..4acaf24b --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadExecuteExchange.java @@ -0,0 +1,47 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceConfigInstance; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.core.store.DataStore; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class ReadExecuteExchange implements MessageExchange { + + @Override + public String getId() { + return "readExecute"; + } + + @Override + public Class getRequestClass() { + return ReadExecuteExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return ReadExecuteExchange.Response.class; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + DataStore dataStore; + @NonNull + DataSourceConfigInstance config; + @NonNull + DataSourceId targetId; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/ReadPreparationExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadPreparationExchange.java new file mode 100644 index 00000000..4390125e --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadPreparationExchange.java @@ -0,0 +1,43 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceConfigInstance; +import io.xpipe.core.store.DataStore; +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class ReadPreparationExchange implements MessageExchange { + + @Override + public String getId() { + return "readPreparation"; + } + + @Override + public Class getRequestClass() { + return ReadPreparationExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return ReadPreparationExchange.Response.class; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + String providerType; + String dataStore; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + DataSourceConfigInstance config; + DataStore dataStore; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/SelectExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/SelectExchange.java new file mode 100644 index 00000000..3c749582 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/SelectExchange.java @@ -0,0 +1,39 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceId; +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class SelectExchange implements MessageExchange { + + @Override + public String getId() { + return "select"; + } + + @Override + public Class getRequestClass() { + return SelectExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return SelectExchange.Response.class; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + DataSourceId id; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/WriteExecuteExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/WriteExecuteExchange.java new file mode 100644 index 00000000..5c7cbde2 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/WriteExecuteExchange.java @@ -0,0 +1,47 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceConfigInstance; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.core.store.DataStore; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class WriteExecuteExchange implements MessageExchange { + + @Override + public String getId() { + return "writeExecute"; + } + + @Override + public Class getRequestClass() { + return WriteExecuteExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return WriteExecuteExchange.Response.class; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + DataSourceId sourceId; + @NonNull + DataStore dataStore; + @NonNull + DataSourceConfigInstance config; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/WritePreparationExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/WritePreparationExchange.java new file mode 100644 index 00000000..a40dd4cf --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/WritePreparationExchange.java @@ -0,0 +1,50 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceConfigInstance; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.core.store.DataStore; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class WritePreparationExchange implements MessageExchange { + + @Override + public String getId() { + return "writePreparation"; + } + + @Override + public Class getRequestClass() { + return WritePreparationExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return WritePreparationExchange.Response.class; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + String providerType; + String output; + @NonNull + DataSourceId sourceId; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + @NonNull + DataStore dataStore; + + @NonNull + DataSourceConfigInstance config; + } +} diff --git a/beacon/src/main/java/module-info.java b/beacon/src/main/java/module-info.java index d2887a0f..4cb627a0 100644 --- a/beacon/src/main/java/module-info.java +++ b/beacon/src/main/java/module-info.java @@ -25,5 +25,12 @@ module io.xpipe.beacon { StatusExchange, StopExchange, StoreResourceExchange, + WritePreparationExchange, + WriteExecuteExchange, + SelectExchange, + ReadPreparationExchange, + ReadExecuteExchange, + DialogExchange, + InfoExchange, VersionExchange; } \ No newline at end of file diff --git a/core/src/main/java/io/xpipe/core/data/type/TupleType.java b/core/src/main/java/io/xpipe/core/data/type/TupleType.java index a7192c6d..555283fe 100644 --- a/core/src/main/java/io/xpipe/core/data/type/TupleType.java +++ b/core/src/main/java/io/xpipe/core/data/type/TupleType.java @@ -50,6 +50,10 @@ public class TupleType extends DataType { return new TupleType(Collections.nCopies(types.size(), null), types); } + public boolean hasAllNames() { + return names.stream().allMatch(Objects::nonNull); + } + @Override public String getName() { return "tuple"; diff --git a/core/src/main/java/io/xpipe/core/source/DataSourceConfig.java b/core/src/main/java/io/xpipe/core/source/DataSourceConfig.java index 550237b8..f4b26fd4 100644 --- a/core/src/main/java/io/xpipe/core/source/DataSourceConfig.java +++ b/core/src/main/java/io/xpipe/core/source/DataSourceConfig.java @@ -1,49 +1,29 @@ package io.xpipe.core.source; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Singular; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + import java.util.List; +@Value +@Builder +@Jacksonized public class DataSourceConfig { + String description; - private String description; - private List> options; + @Singular + List