OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 180 | Go to most recent revision | Show entire file | Regard whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 180 Rev 182
Line 19... Line 19...
19
import java.io.InputStreamReader;
19
import java.io.InputStreamReader;
20
import java.io.OutputStream;
20
import java.io.OutputStream;
21
import java.io.PrintStream;
21
import java.io.PrintStream;
22
import java.lang.ProcessBuilder.Redirect;
22
import java.lang.ProcessBuilder.Redirect;
23
import java.util.concurrent.Callable;
23
import java.util.concurrent.Callable;
-
 
24
import java.util.concurrent.CancellationException;
24
import java.util.concurrent.CountDownLatch;
25
import java.util.concurrent.CompletableFuture;
-
 
26
import java.util.concurrent.ExecutionException;
25
import java.util.concurrent.ExecutorService;
27
import java.util.concurrent.ExecutorService;
26
import java.util.concurrent.Executors;
28
import java.util.concurrent.Executors;
27
import java.util.concurrent.Future;
29
import java.util.concurrent.Future;
-
 
30
import java.util.concurrent.TimeUnit;
-
 
31
import java.util.concurrent.TimeoutException;
28
import java.util.function.Supplier;
32
import java.util.function.Supplier;
29
 
33
 
30
/**
34
/**
31
 * Redirect streams of a process to System.out and System.err.
35
 * Redirect streams of a process to System.out and System.err.
32
 *
36
 * 
33
 * @author Sylvain
37
 * @author Sylvain
34
 */
38
 */
35
public class ProcessStreams {
39
public class ProcessStreams implements AutoCloseable {
36
 
40
 
37
    static public enum Action {
-
 
38
        /**
-
 
39
         * Redirect process streams to ours.
-
 
40
         *
-
 
41
         * @deprecated use {@link ProcessStreams#redirect(ProcessBuilder)} (or
-
 
42
         *             {@link Redirect#INHERIT} directly) as it makes sure that
-
 
43
         *             {@link Process#waitFor()} only returns once all streams are flushed.
41
    // Don't set too low, this is a maximum, it would only fail on slow computers.
44
         */
-
 
45
        REDIRECT,
-
 
46
        /**
-
 
47
         * Consume streams.
-
 
48
         */
-
 
49
        CONSUME,
-
 
50
        /**
-
 
51
         * Close process streams. NOTE : some programs might fail (e.g. route on FreeBSD), in those
-
 
52
         * cases use {@link #CONSUME}.
42
    private static final int MAX_SHUTTING_DOWN_DELAY = 500;
53
         */
-
 
54
        CLOSE,
-
 
55
        /**
-
 
56
         * Do nothing, which is dangerous as the process will hang until its output is read.
-
 
57
         */
-
 
58
        DO_NOTHING
-
 
59
    }
-
 
60
 
43
 
61
    // Added to Java 9
44
    // Added to Java 9
62
    public static final Redirect DISCARD = Redirect.to(StreamUtils.NULL_FILE);
45
    public static final Redirect DISCARD = Redirect.to(StreamUtils.NULL_FILE);
63
 
46
 
64
    static public final ProcessBuilder redirect(final ProcessBuilder pb) {
47
    static public final ProcessBuilder redirect(final ProcessBuilder pb) {
-
 
48
        // ATTN don't use redirectErrorStream(true) as this would merge the error and output of pb
-
 
49
        // into the VM output.
65
        return pb.redirectErrorStream(true).redirectOutput(Redirect.INHERIT);
50
        return pb.redirectError(Redirect.INHERIT).redirectOutput(Redirect.INHERIT);
66
    }
51
    }
67
 
52
 
-
 
53
    /**
68
    static public final Process handle(final Process p, final Action action) throws IOException {
54
     * Consume all streams. Needed since some programs might fail (e.g. route on FreeBSD) if the
69
        if (action == Action.CLOSE) {
55
     * streams are just closed.
-
 
56
     * 
70
            p.getInputStream().close();
57
     * @param proc the process.
71
            p.getErrorStream().close();
58
     * @return the passed process.
-
 
59
     */
72
        } else if (action == Action.REDIRECT) {
60
    static public final Process consume(final Process proc) {
73
            new ProcessStreams(p, System.out, System.err);
61
        try (final ProcessStreams streams = new ProcessStreams(proc)) {
-
 
62
            streams.start(StreamUtils.NULL_OS, StreamUtils.NULL_OS);
-
 
63
            streams.awaitTermination();
74
        } else if (action == Action.CONSUME) {
64
        } catch (Exception e) {
75
            new ProcessStreams(p, StreamUtils.NULL_OS, StreamUtils.NULL_OS);
65
            throw new IllegalStateException("Couldn't consume output of " + proc, e);
76
        }
66
        }
77
        return p;
67
        return proc;
78
    }
68
    }
79
 
69
 
80
    private final ExecutorService exec = Executors.newFixedThreadPool(2);
70
    private final ExecutorService exec = Executors.newFixedThreadPool(2);
81
    private final CountDownLatch latch;
71
    private final Process process;
82
    private final Future<?> out;
72
    private Future<?> out;
83
    private final Future<?> err;
73
    private Future<?> err;
84
 
74
 
85
    /**
75
    /**
86
     * Create a new instance and start reading from the passed process. If a passed
76
     * Create a new instance and start reading from the passed process. If a passed
87
     * {@link OutputStream} is <code>null</code>, then the corresponding {@link InputStream} is not
77
     * {@link OutputStream} is <code>null</code>, then the corresponding {@link InputStream} is not
88
     * used at all, so the caller should handle it. If the output must be discarded, use
78
     * used at all, so the caller should handle it. If the output must be discarded, use
Line 90... Line 80...
90
     *
80
     * 
91
     * @param p the process to read from.
81
     * @param p the process to read from.
92
     * @param out where to write the {@link Process#getInputStream() standard output}.
82
     * @param out where to write the {@link Process#getInputStream() standard output}.
93
     * @param err where to write the {@link Process#getErrorStream() standard error}.
83
     * @param err where to write the {@link Process#getErrorStream() standard error}.
94
     */
84
     */
-
 
85
    public ProcessStreams(final Process p) {
-
 
86
        this.process = p;
-
 
87
    }
-
 
88
 
95
    public ProcessStreams(final Process p, final OutputStream out, final OutputStream err) {
89
    public final ProcessStreams start(final OutputStream out, final OutputStream err) {
96
        this.latch = new CountDownLatch(2);
90
        if (this.out != null)
-
 
91
            throw new IllegalStateException("Already started");
97
        this.out = writeToAsync(p::getInputStream, out);
92
        this.out = writeToAsync(this.process::getInputStream, out);
98
        this.err = writeToAsync(p::getErrorStream, err);
93
        this.err = writeToAsync(this.process::getErrorStream, err);
-
 
94
        assert this.out != null && this.err != null;
99
        this.exec.submit(new Runnable() {
95
        this.exec.submit(new Runnable() {
100
            @Override
96
            @Override
101
            public void run() {
97
            public void run() {
102
                try {
98
                try {
-
 
99
                    // OK even if the future is cancelled before having started
-
 
100
                    try {
103
                    ProcessStreams.this.latch.await();
101
                        ProcessStreams.this.out.get();
104
                } catch (final InterruptedException e) {
102
                    } catch (Exception e) {
-
 
103
                    }
105
                    // ne rien faire
104
                    try {
-
 
105
                        ProcessStreams.this.err.get();
106
                    e.printStackTrace();
106
                    } catch (Exception e) {
-
 
107
                    }
107
                } finally {
108
                } finally {
108
                    ProcessStreams.this.exec.shutdown();
109
                    ProcessStreams.this.exec.shutdown();
109
                }
110
                }
110
            }
111
            }
111
        });
112
        });
-
 
113
        return this;
112
    }
114
    }
113
 
115
 
114
    protected final void stopOut() {
116
    protected final void stopOut() {
115
        this.stop(this.out);
117
        this.stop(this.out);
116
    }
118
    }
Line 119... Line 121...
119
        this.stop(this.err);
121
        this.stop(this.err);
120
    }
122
    }
121
 
123
 
122
    private final void stop(final Future<?> f) {
124
    private final void stop(final Future<?> f) {
123
        if (f == null)
125
        if (f == null)
-
 
126
            throw new IllegalStateException("Not started");
-
 
127
        // ATTN Process returns InputStream which are not interruptible.
-
 
128
        f.cancel(true);
-
 
129
    }
-
 
130
 
-
 
131
    // From AutoCloseable : close() shouldn't throw InterruptedException
-
 
132
    @Override
-
 
133
    public void close() {
-
 
134
        this.exec.shutdownNow();
-
 
135
    }
-
 
136
 
-
 
137
    public final void awaitTermination() throws InterruptedException, ExecutionException {
-
 
138
        this.out.get();
-
 
139
        this.err.get();
-
 
140
        if (!this.exec.awaitTermination(MAX_SHUTTING_DOWN_DELAY, TimeUnit.MILLISECONDS))
-
 
141
            throw new IllegalStateException("Executor still not terminated after " + MAX_SHUTTING_DOWN_DELAY);
-
 
142
    }
-
 
143
 
-
 
144
    public final void awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-
 
145
        try {
-
 
146
            this.out.get(timeout, unit);
-
 
147
        } catch (CancellationException e) {
124
            return;
148
            // OK
-
 
149
        }
125
        // TODO
150
        try {
126
        // ATTN don't interrupt, hangs in readLine()
151
            this.err.get(timeout, unit);
-
 
152
        } catch (CancellationException e) {
127
        f.cancel(false);
153
            // OK
-
 
154
        }
-
 
155
        if (!this.exec.awaitTermination(MAX_SHUTTING_DOWN_DELAY, TimeUnit.MILLISECONDS))
-
 
156
            throw new TimeoutException("Executor still not terminated after " + MAX_SHUTTING_DOWN_DELAY);
128
    }
157
    }
129
 
158
 
130
    private final Future<?> writeToAsync(final Supplier<InputStream> insSupplier, final Object outs) {
159
    private final Future<?> writeToAsync(final Supplier<InputStream> insSupplier, final Object outs) {
131
        if (outs == null) {
160
        if (outs == null) {
132
            this.latch.countDown();
161
            return CompletableFuture.completedFuture(null);
133
            return null;
-
 
134
        }
162
        }
135
        return this.exec.submit(new Callable<Object>() {
163
        return this.exec.submit(new Callable<Object>() {
136
            @Override
164
            @Override
137
            public Void call() throws InterruptedException, IOException {
165
            public Void call() throws InterruptedException, IOException {
138
                try (final InputStream ins = insSupplier.get()) {
166
                try (final InputStream ins = insSupplier.get()) {
Line 140... Line 168...
140
                    if (outs instanceof PrintStream)
168
                    if (outs instanceof PrintStream)
141
                        writeTo(ins, (PrintStream) outs);
169
                        writeTo(ins, (PrintStream) outs);
142
                    else
170
                    else
143
                        StreamUtils.copy(ins, (OutputStream) outs);
171
                        StreamUtils.copy(ins, (OutputStream) outs);
144
                    return null;
172
                    return null;
145
                } finally {
-
 
146
                    ProcessStreams.this.latch.countDown();
-
 
147
                }
173
                }
148
            }
174
            }
149
        });
175
        });
150
    }
176
    }
151
 
177