Implement various fixes for sink drains

This commit is contained in:
Christopher Schnick 2022-12-22 03:57:15 +01:00
parent aa32c52284
commit 71b5d2d716
16 changed files with 264 additions and 55 deletions

View file

@ -1,7 +1,7 @@
package io.xpipe.api;
import io.xpipe.api.connector.XPipeApiConnection;
import io.xpipe.api.util.QuietDialogHandler;
import io.xpipe.beacon.util.QuietDialogHandler;
import io.xpipe.beacon.exchange.cli.StoreAddExchange;
import io.xpipe.core.store.DataStore;

View file

@ -0,0 +1,32 @@
package io.xpipe.beacon.exchange;
import io.xpipe.beacon.RequestMessage;
import io.xpipe.beacon.ResponseMessage;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
/**
* Stores a stream of data in a storage.
*/
public class ReadStreamExchange implements MessageExchange {
@Override
public String getId() {
return "readStream";
}
@Jacksonized
@Builder
@Value
public static class Request implements RequestMessage {
@NonNull String name;
}
@Jacksonized
@Builder
@Value
public static class Response implements ResponseMessage {
}
}

View file

@ -0,0 +1,32 @@
package io.xpipe.beacon.exchange;
import io.xpipe.beacon.RequestMessage;
import io.xpipe.beacon.ResponseMessage;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
/**
* Stores a stream of data in a storage.
*/
public class WriteStreamExchange implements MessageExchange {
@Override
public String getId() {
return "writeStream";
}
@Jacksonized
@Builder
@Value
public static class Request implements RequestMessage {
@NonNull String name;
}
@Jacksonized
@Builder
@Value
public static class Response implements ResponseMessage {
}
}

View file

@ -1,4 +1,4 @@
package io.xpipe.api.util;
package io.xpipe.beacon.util;
import io.xpipe.beacon.BeaconConnection;
import io.xpipe.beacon.ClientException;
@ -13,6 +13,10 @@ import java.util.UUID;
public class QuietDialogHandler {
public static void handle(DialogReference ref, BeaconConnection connection) throws ClientException {
new QuietDialogHandler(ref, connection, Map.of()).handle();
}
private final UUID dialogKey;
private final BeaconConnection connection;
private final Map<String, String> overrides;

View file

@ -1,12 +1,12 @@
import com.fasterxml.jackson.databind.Module;
import io.xpipe.beacon.BeaconJacksonModule;
import io.xpipe.beacon.BeaconProxyImpl;
import io.xpipe.core.util.ProxyFunction;
import io.xpipe.beacon.exchange.*;
import io.xpipe.beacon.exchange.api.QueryRawDataExchange;
import io.xpipe.beacon.exchange.api.QueryTableDataExchange;
import io.xpipe.beacon.exchange.api.QueryTextDataExchange;
import io.xpipe.beacon.exchange.cli.*;
import io.xpipe.core.util.ProxyFunction;
import io.xpipe.core.util.ProxyProvider;
module io.xpipe.beacon {
@ -21,6 +21,8 @@ module io.xpipe.beacon {
opens io.xpipe.beacon.exchange.api;
opens io.xpipe.beacon.exchange.data;
opens io.xpipe.beacon.exchange.cli;
exports io.xpipe.beacon.util;
opens io.xpipe.beacon.util;
requires static com.fasterxml.jackson.core;
requires static com.fasterxml.jackson.databind;
@ -37,6 +39,8 @@ module io.xpipe.beacon {
InstanceExchange,
EditStoreExchange,
AddSourceExchange,
WriteStreamExchange,
ReadStreamExchange,
StoreProviderListExchange,
ListCollectionsExchange,
ListEntriesExchange,

View file

@ -25,7 +25,7 @@ public enum NewLine {
.orElseThrow();
}
public static NewLine id(String id) {
public static NewLine byId(String id) {
return Arrays.stream(values())
.filter(n -> n.getId().equalsIgnoreCase(id))
.findFirst()

View file

@ -14,7 +14,7 @@ public abstract class QueryConverter<T> {
public static final QueryConverter<NewLine> NEW_LINE = new QueryConverter<NewLine>() {
@Override
protected NewLine fromString(String s) {
return NewLine.id(s);
return NewLine.byId(s);
}
@Override

View file

@ -1,33 +1,25 @@
package io.xpipe.core.impl;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
import io.xpipe.core.store.DataFlow;
import io.xpipe.core.store.KnownFormatStreamDataStore;
import io.xpipe.core.store.StatefulDataStore;
import io.xpipe.core.util.JacksonizedValue;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
/*
TODO: Properly enter closed State
*/
@JsonTypeName("drain")
@SuperBuilder
@Jacksonized
@Getter
public class SinkDrainStore extends JacksonizedValue implements KnownFormatStreamDataStore {
public class SinkDrainStore extends JacksonizedValue implements KnownFormatStreamDataStore, StatefulDataStore {
public static enum State {
NONE_CONNECTED,
@ -37,29 +29,38 @@ public class SinkDrainStore extends JacksonizedValue implements KnownFormatStrea
CLOSED
}
private final String description;
private final StreamCharset charset;
private final NewLine newLine;
@JsonIgnore
@Setter
@Builder.Default
private State state = State.NONE_CONNECTED;
public State getState() {
return getState("state", State.class, State.NONE_CONNECTED);
}
@JsonIgnore
private Pipe pipe;
private void setState(State n) {
setState("state", n);
}
public Pipe getOrOpenPipe() {
return getOrComputeState("pipe", Pipe.class, () -> {
try {
return Pipe.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
@Override
public DataFlow getFlow() {
if (state == State.NONE_CONNECTED) {
if (getState() == State.NONE_CONNECTED) {
return DataFlow.INPUT_OR_OUTPUT;
}
if (state == State.PRODUCER_CONNECTED) {
if (getState() == State.PRODUCER_CONNECTED) {
return DataFlow.INPUT;
}
if (state == State.CONSUMER_CONNECTED) {
if (getState() == State.CONSUMER_CONNECTED) {
return DataFlow.OUTPUT;
}
@ -67,7 +68,7 @@ public class SinkDrainStore extends JacksonizedValue implements KnownFormatStrea
}
private void waitForOpen() {
while (state != State.OPEN) {
while (getState() != State.OPEN) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
@ -76,29 +77,44 @@ public class SinkDrainStore extends JacksonizedValue implements KnownFormatStrea
}
}
@Override
public boolean shouldPersist() {
return getState() != State.CLOSED;
}
@Override
public boolean shouldSave() {
return false;
}
@Override
public boolean canOpen() throws Exception {
return state == State.PRODUCER_CONNECTED;
return getState() == State.PRODUCER_CONNECTED;
}
@Override
public InputStream openInput() throws Exception {
checkState(false);
if (state == State.PRODUCER_CONNECTED) {
state = State.OPEN;
if (getState() == State.PRODUCER_CONNECTED) {
setState(State.OPEN);
}
if (state == State.NONE_CONNECTED) {
state = State.CONSUMER_CONNECTED;
waitForOpen();
if (getState() == State.NONE_CONNECTED) {
setState(State.CONSUMER_CONNECTED);
//waitForOpen();
}
try {
openPipe();
return Channels.newInputStream(pipe.source());
return new FilterInputStream(Channels.newInputStream(getOrOpenPipe().source())) {
@Override
public void close() throws IOException {
super.close();
setState(State.CLOSED);
}
};
} catch (Exception ex) {
state = State.CLOSED;
setState(State.CLOSED);
throw ex;
}
}
@ -107,44 +123,43 @@ public class SinkDrainStore extends JacksonizedValue implements KnownFormatStrea
public OutputStream openOutput() throws Exception {
checkState(true);
if (state == State.CONSUMER_CONNECTED) {
state = State.OPEN;
if (getState() == State.CONSUMER_CONNECTED) {
setState(State.OPEN);
}
if (state == State.NONE_CONNECTED) {
state = State.PRODUCER_CONNECTED;
waitForOpen();
if (getState() == State.NONE_CONNECTED) {
setState(State.PRODUCER_CONNECTED);
//waitForOpen();
}
try {
openPipe();
return Channels.newOutputStream(pipe.sink());
return new FilterOutputStream(Channels.newOutputStream(getOrOpenPipe().sink())) {
@Override
public void close() throws IOException {
super.close();
setState(State.CLOSED);
}
};
} catch (Exception ex) {
state = State.CLOSED;
setState(State.CLOSED);
throw ex;
}
}
private void openPipe() throws IOException {
if (pipe == null) {
pipe = Pipe.open();
}
}
private void checkState(boolean isProducer) {
if (state == State.CLOSED) {
if (getState() == State.CLOSED) {
throw new IllegalStateException("Drain has already been closed");
}
if (state == State.OPEN) {
if (getState() == State.OPEN) {
throw new IllegalStateException("Drain is already open");
}
if (state == State.PRODUCER_CONNECTED && isProducer) {
if (getState() == State.PRODUCER_CONNECTED && isProducer) {
throw new IllegalStateException("Producer is already connected");
}
if (state == State.CONSUMER_CONNECTED && !isProducer) {
if (getState() == State.CONSUMER_CONNECTED && !isProducer) {
throw new IllegalStateException("Consumer is already connected");
}
}

View file

@ -18,6 +18,14 @@ import java.util.Optional;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface DataStore {
default boolean shouldPersist() {
return true;
}
default boolean shouldSave() {
return true;
}
default boolean isComplete() {
try {
checkComplete();

View file

@ -0,0 +1,19 @@
package io.xpipe.core.store;
import io.xpipe.core.util.DataStateProvider;
import java.util.function.Supplier;
public interface StatefulDataStore extends DataStore {
default <T> T getState(String key, Class<T> c, T def) {
return DataStateProvider.get().getState(this, key, c, () -> def);
}
default <T> T getOrComputeState(String key, Class<T> c, Supplier<T> def) {
return DataStateProvider.get().getState(this, key, c, def);
}
default void setState(String key, Object val) {
DataStateProvider.get().putState(this, key, val);
}
}

View file

@ -12,6 +12,8 @@ import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
import io.xpipe.core.data.type.ArrayType;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.type.ValueType;
@ -55,6 +57,12 @@ public class CoreJacksonModule extends SimpleModule {
addSerializer(Charset.class, new CharsetSerializer());
addDeserializer(Charset.class, new CharsetDeserializer());
addSerializer(StreamCharset.class, new StreamCharsetSerializer());
addDeserializer(StreamCharset.class, new StreamCharsetDeserializer());
addSerializer(NewLine.class, new NewLineSerializer());
addDeserializer(NewLine.class, new NewLineDeserializer());
addSerializer(Path.class, new LocalPathSerializer());
addDeserializer(Path.class, new LocalPathDeserializer());
@ -122,6 +130,38 @@ public class CoreJacksonModule extends SimpleModule {
}
}
public static class NewLineSerializer extends JsonSerializer<NewLine> {
@Override
public void serialize(NewLine value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
jgen.writeString(value.getId());
}
}
public static class NewLineDeserializer extends JsonDeserializer<NewLine> {
@Override
public NewLine deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
return NewLine.byId(p.getValueAsString());
}
}
public static class StreamCharsetSerializer extends JsonSerializer<StreamCharset> {
@Override
public void serialize(StreamCharset value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
jgen.writeString(value.toString());
}
}
public static class StreamCharsetDeserializer extends JsonDeserializer<StreamCharset> {
@Override
public StreamCharset deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
return StreamCharset.get(p.getValueAsString());
}
}
public static class LocalPathSerializer extends JsonSerializer<Path> {
@Override

View file

@ -0,0 +1,25 @@
package io.xpipe.core.util;
import io.xpipe.core.store.DataStore;
import java.util.ServiceLoader;
import java.util.function.Supplier;
public abstract class DataStateProvider {
private static DataStateProvider INSTANCE;
public static DataStateProvider get() {
if (INSTANCE == null) {
INSTANCE = ServiceLoader.load(ModuleLayer.boot(), DataStateProvider.class)
.findFirst()
.orElseThrow();
}
return INSTANCE;
}
public abstract void putState(DataStore store, String key, Object value);
public abstract <T> T getState(DataStore store, String key, Class<T> c, Supplier<T> def);
}

View file

@ -27,6 +27,7 @@ open module io.xpipe.core {
uses LocalProcessControlProvider;
uses io.xpipe.core.util.ProxyProvider;
uses io.xpipe.core.util.ProxyManagerProvider;
uses io.xpipe.core.util.DataStateProvider;
provides WriteMode with WriteMode.Replace, WriteMode.Append, WriteMode.Prepend;
provides com.fasterxml.jackson.databind.Module with

View file

@ -47,7 +47,12 @@ public class DataStoreFormatter {
public static String toName(DataStore input) {
return toName(input, Integer.MAX_VALUE);
}
public static String toName(DataStore input, int length) {
if (input == null) {
return "?";
}
var named = XPipeDaemon.getInstance().getStoreName(input);
if (named.isPresent()) {
return cut(named.get(), length);

View file

@ -2,6 +2,7 @@ package io.xpipe.extension.util;
import io.xpipe.core.impl.FileNames;
import io.xpipe.core.process.ShellProcessControl;
import io.xpipe.core.store.ShellStore;
import io.xpipe.core.util.XPipeTempDirectory;
import lombok.SneakyThrows;
@ -10,9 +11,15 @@ import java.util.Objects;
public class ExecScriptHelper {
public static int getConnectionHash(String command) {
return Objects.hash(command);
return Math.abs(Objects.hash(command));
}
@SneakyThrows
public static String createLocalExecScript(String content) {
try (var l = ShellStore.local().create().start()) {
return createExecScript(l, content);
}
}
@SneakyThrows
public static String createExecScript(ShellProcessControl processControl, String content) {
var fileName = "exec-" + getConnectionHash(content);
@ -35,6 +42,9 @@ public class ExecScriptHelper {
c.getStdin().write(content.getBytes(processControl.getCharset()));
c.closeStdin();
}
processControl.restart();
return file;
}
}

View file

@ -1,5 +1,6 @@
package io.xpipe.extension.util;
import io.xpipe.extension.event.ErrorEvent;
import org.apache.commons.lang3.function.FailableRunnable;
public class ThreadHelper {
@ -18,6 +19,19 @@ public class ThreadHelper {
return t;
}
public static Thread runFailableAsync(FailableRunnable<Throwable> r) {
var t = new Thread(() -> {
try {
r.run();
} catch (Throwable e) {
ErrorEvent.fromThrowable(e).handle();
}
});
t.setDaemon(true);
t.start();
return t;
}
public static Thread create(String name, boolean daemon, Runnable r) {
var t = new Thread(r);
t.setDaemon(daemon);