OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 83 | Rev 93 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
17 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.utils;
15
 
16
import org.openconcerto.utils.cc.IClosure;
17
import org.openconcerto.utils.cc.IPredicate;
18
 
19
import java.beans.PropertyChangeListener;
20
import java.beans.PropertyChangeSupport;
21
import java.util.Collection;
61 ilm 22
import java.util.Deque;
83 ilm 23
import java.util.concurrent.Callable;
17 ilm 24
import java.util.concurrent.CancellationException;
25
import java.util.concurrent.ExecutionException;
26
import java.util.concurrent.Future;
27
import java.util.concurrent.FutureTask;
83 ilm 28
import java.util.concurrent.atomic.AtomicBoolean;
17 ilm 29
 
30
/**
31
 * A queue that can be put to sleep. Submitted runnables are converted to FutureTask, that can later
32
 * be cancelled.
33
 *
34
 * @author Sylvain
35
 */
36
public class SleepingQueue {
37
 
38
    private final String name;
39
 
40
    private final PropertyChangeSupport support;
41
    private FutureTask<?> beingRun;
42
 
43
    private final SingleThreadedExecutor tasksQueue;
44
    private boolean canceling;
45
    private IPredicate<FutureTask<?>> cancelPredicate;
46
 
47
    public SleepingQueue() {
48
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
49
    }
50
 
51
    public SleepingQueue(String name) {
52
        super();
53
        this.name = name;
54
 
55
        this.canceling = false;
56
        this.cancelPredicate = null;
57
        this.support = new PropertyChangeSupport(this);
58
        this.setBeingRun(null);
59
 
60
        this.tasksQueue = new SingleThreadedExecutor();
61
 
62
        this.tasksQueue.start();
63
    }
64
 
65
    /**
66
     * Customize the thread used to execute the passed runnables. This implementation sets the
67
     * priority to the minimum.
68
     *
69
     * @param thr the thread used by this queue.
70
     */
71
    protected void customizeThread(Thread thr) {
72
        thr.setPriority(Thread.MIN_PRIORITY);
73
    }
74
 
75
    public final FutureTask<?> put(Runnable workRunnable) {
76
        if (this.shallAdd(workRunnable)) {
77
            final IFutureTask<Object> t = this.tasksQueue.newTaskFor(workRunnable);
78
            this.add(t);
79
            return t;
80
        } else
81
            return null;
82
 
83
    }
84
 
85
    public final <F extends FutureTask<?>> F execute(F t) {
86
        if (this.shallAdd(t)) {
87
            this.add(t);
88
            return t;
89
        } else
90
            return null;
91
    }
92
 
93
    private void add(FutureTask<?> t) {
94
        // no need to synchronize, if die() is called after our test, t won't be executed anyway
83 ilm 95
        if (this.dieCalled())
41 ilm 96
            throw new IllegalStateException("Already dead, cannot exec " + t);
17 ilm 97
 
98
        this.tasksQueue.put(t);
99
    }
100
 
101
    private final boolean shallAdd(Runnable runnable) {
102
        if (runnable == null)
103
            throw new NullPointerException("null runnable");
104
        try {
105
            this.willPut(runnable);
106
            return true;
107
        } catch (InterruptedException e) {
108
            // si on interrompt, ne pas ajouter
109
            return false;
110
        }
111
    }
112
 
113
    /**
114
     * Give subclass the ability to reject runnables.
115
     *
116
     * @param r the runnable that is being added.
117
     * @throws InterruptedException if r should not be added to this queue.
118
     */
119
    protected void willPut(Runnable r) throws InterruptedException {
120
    }
121
 
122
    /**
83 ilm 123
     * An exception was thrown by a task. This implementation merely
124
     * {@link Exception#printStackTrace()}.
125
     *
126
     * @param exn the exception thrown.
127
     */
128
    protected void exceptionThrown(final ExecutionException exn) {
129
        exn.printStackTrace();
130
    }
131
 
132
    /**
17 ilm 133
     * Cancel all queued tasks and the current task.
134
     */
135
    protected final void cancel() {
136
        this.cancel(null);
137
    }
138
 
139
    /**
140
     * Cancel only tasks for which pred is <code>true</code>.
141
     *
142
     * @param pred a predicate to know which tasks to cancel.
143
     */
144
    protected final void cancel(final IPredicate<FutureTask<?>> pred) {
145
        this.tasksDo(new IClosure<Collection<FutureTask<?>>>() {
146
            @Override
147
            public void executeChecked(Collection<FutureTask<?>> tasks) {
148
                cancel(pred, tasks);
149
            }
150
        });
151
    }
152
 
153
    private final void cancel(IPredicate<FutureTask<?>> pred, Collection<FutureTask<?>> tasks) {
154
        try {
155
            synchronized (this) {
156
                this.canceling = true;
157
                this.cancelPredicate = pred;
158
                this.cancelCheck(this.getBeingRun());
159
            }
160
 
161
            for (final FutureTask<?> t : tasks) {
162
                this.cancelCheck(t);
163
            }
164
        } finally {
165
            synchronized (this) {
166
                this.canceling = false;
167
                // allow the predicate to be gc'd
168
                this.cancelPredicate = null;
169
            }
170
        }
171
    }
172
 
61 ilm 173
    public final void tasksDo(IClosure<? super Deque<FutureTask<?>>> c) {
17 ilm 174
        this.tasksQueue.itemsDo(c);
175
    }
176
 
177
    private void cancelCheck(FutureTask<?> t) {
178
        if (t != null)
179
            synchronized (this) {
180
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
181
                    t.cancel(true);
182
            }
183
    }
184
 
185
    private void setBeingRun(final FutureTask<?> beingRun) {
83 ilm 186
        final Future<?> old;
17 ilm 187
        synchronized (this) {
188
            old = this.beingRun;
189
            this.beingRun = beingRun;
190
        }
191
        this.support.firePropertyChange("beingRun", old, beingRun);
192
    }
193
 
91 ilm 194
    public final synchronized FutureTask<?> getBeingRun() {
17 ilm 195
        return this.beingRun;
196
    }
197
 
198
    public boolean isSleeping() {
199
        return this.tasksQueue.isSleeping();
200
    }
201
 
83 ilm 202
    public boolean setSleeping(boolean sleeping) {
203
        final boolean res = this.tasksQueue.setSleeping(sleeping);
204
        if (res) {
17 ilm 205
            this.support.firePropertyChange("sleeping", null, this.isSleeping());
206
        }
83 ilm 207
        return res;
17 ilm 208
    }
209
 
210
    /**
211
     * Stops this queue. Once this method returns, it is guaranteed that no other task will be taken
83 ilm 212
     * from the queue to be started, and that this queue will die. But the already executing task
213
     * will complete unless it checks for interrupt.
214
     *
215
     * @return the future killing.
17 ilm 216
     */
83 ilm 217
    public final Future<?> die() {
218
        return this.die(true, null, null);
17 ilm 219
    }
220
 
83 ilm 221
    /**
222
     * Stops this queue. Once the returned future completes successfully then no task is executing (
223
     * {@link #isDead()} will happen sometimes later, the time for the thread to terminate). If the
224
     * returned future throws an exception because of the passed runnables or of {@link #willDie()}
225
     * or {@link #dying()}, one can check with {@link #dieCalled()} to see if the queue is dying.
226
     *
227
     * @param force <code>true</code> if this is guaranteed to die (even if <code>willDie</code> or
228
     *        {@link #willDie()} throw an exception).
229
     * @param willDie the last actions to take before killing this queue.
230
     * @param dying the last actions to take before this queue is dead.
231
     * @return the future killing, which will return <code>dying</code> result.
232
     * @see #dieCalled()
233
     */
234
    public final <V> Future<V> die(final boolean force, final Runnable willDie, final Callable<V> dying) {
235
        // reset sleeping to original value if die not effective
236
        final AtomicBoolean resetSleeping = new AtomicBoolean(false);
237
        final FutureTask<V> res = new FutureTask<V>(new Callable<V>() {
238
            @Override
239
            public V call() throws Exception {
240
                Exception willDieExn = null;
241
                try {
242
                    willDie();
243
                    if (willDie != null) {
244
                        willDie.run();
245
                        // handle Future like runnable, i.e. check right away for exception
246
                        if (willDie instanceof Future) {
247
                            final Future<?> f = (Future<?>) willDie;
248
                            assert f.isDone() : "Ran but not done: " + f;
249
                            try {
250
                                f.get();
251
                            } catch (ExecutionException e) {
252
                                throw (Exception) e.getCause();
253
                            }
254
                        }
255
                    }
256
                } catch (Exception e) {
257
                    if (!force)
258
                        throw e;
259
                    else
260
                        willDieExn = e;
261
                }
262
                try {
263
                    // don't interrupt ourselves
264
                    SleepingQueue.this.tasksQueue.die(false);
265
                    assert SleepingQueue.this.tasksQueue.isDying();
266
                    // since there's already been an exception, throw it as soon as possible
267
                    // also dying() might itself throw an exception for the same reason or we now
268
                    // have 2 exceptions to throw
269
                    if (willDieExn != null)
270
                        throw willDieExn;
271
                    dying();
272
                    final V res;
273
                    if (dying != null)
274
                        res = dying.call();
275
                    else
276
                        res = null;
277
 
278
                    return res;
279
                } finally {
280
                    // if die is effective, this won't have any consequences
281
                    if (resetSleeping.get())
282
                        SleepingQueue.this.tasksQueue.setSleeping(true);
283
                }
284
            }
285
        });
286
        // die as soon as possible not after all currently queued tasks
287
        this.tasksQueue.itemsDo(new IClosure<Deque<FutureTask<?>>>() {
288
            @Override
289
            public void executeChecked(Deque<FutureTask<?>> input) {
290
                // since we cancel the current task, we might as well remove all of them since they
291
                // might depend on the cancelled one
292
                input.clear();
293
                input.addFirst(res);
294
                // die as soon as possible, even if there's a long task already running
295
                final FutureTask<?> beingRun = getBeingRun();
296
                // since we hold the lock on items
297
                assert beingRun != res : "beingRun: " + beingRun + " ; res: " + res;
298
                if (beingRun != null)
299
                    beingRun.cancel(true);
300
            }
301
        });
302
        // force execution of our task
303
        resetSleeping.set(this.setSleeping(false));
304
        return res;
305
    }
306
 
307
    protected void willDie() {
308
        // nothing by default
309
    }
310
 
17 ilm 311
    protected void dying() {
312
        // nothing by default
313
    }
314
 
315
    /**
83 ilm 316
     * Whether this will die. If this method returns <code>true</code>, it is guaranteed that no
317
     * other task will be taken from the queue to be started, and that this queue will die. But the
318
     * already executing task will complete unless it checks for interrupt. Note: this method
319
     * doesn't return <code>true</code> right after {@link #die()} as the method is asynchronous and
320
     * if {@link #willDie()} fails it may not die at all ; as explained in its comment you may use
321
     * its returned future to wait for the killing.
17 ilm 322
     *
83 ilm 323
     * @return <code>true</code> if this queue will not execute any more tasks (but it may finish
324
     *         one last task).
325
     * @see #isDead()
326
     */
327
    public final boolean dieCalled() {
328
        return this.tasksQueue.dieCalled();
329
    }
330
 
331
    /**
332
     * Whether this queue is dead, i.e. if die() has been called and all tasks have completed.
333
     *
334
     * @return <code>true</code> if this queue will not execute any more tasks and isn't executing
335
     *         any.
17 ilm 336
     * @see #die()
337
     */
338
    public final boolean isDead() {
339
        return this.tasksQueue.isDead();
340
    }
341
 
342
    public void addPropertyChangeListener(PropertyChangeListener l) {
343
        this.support.addPropertyChangeListener(l);
344
    }
345
 
346
    public void rmPropertyChangeListener(PropertyChangeListener l) {
347
        this.support.removePropertyChangeListener(l);
348
    }
349
 
350
    private final class SingleThreadedExecutor extends DropperQueue<FutureTask<?>> {
351
        private SingleThreadedExecutor() {
352
            super(SleepingQueue.this.name + System.currentTimeMillis());
353
            customizeThread(this);
354
        }
355
 
356
        protected <T> IFutureTask<T> newTaskFor(final Runnable task) {
357
            return this.newTaskFor(task, null);
358
        }
359
 
360
        protected <T> IFutureTask<T> newTaskFor(final Runnable task, T value) {
361
            return new IFutureTask<T>(task, value, " for {" + SleepingQueue.this.name + "}");
362
        }
363
 
364
        @Override
365
        protected void process(FutureTask<?> task) {
366
            if (!task.isDone()) {
367
                /*
368
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
369
                 * called only if task completed or threw exception. Otherwise, the caught runtime
370
                 * exception will have been thrown by afterExecute itself, in which case we don't
371
                 * want to call it again.
372
                 */
373
                boolean ran = false;
374
                beforeExecute(task);
375
                try {
376
                    task.run();
377
                    ran = true;
378
                    afterExecute(task, null);
379
                } catch (RuntimeException ex) {
380
                    if (!ran)
381
                        afterExecute(task, ex);
382
                    // don't throw ex, afterExecute() can do whatever needs to be done (like killing
383
                    // this queue)
384
                }
385
            }
386
        }
387
 
388
        protected void beforeExecute(final FutureTask<?> f) {
389
            cancelCheck(f);
390
            setBeingRun(f);
391
        }
392
 
393
        protected void afterExecute(final FutureTask<?> f, final Throwable t) {
394
            setBeingRun(null);
395
 
396
            try {
397
                f.get();
398
            } catch (CancellationException e) {
399
                // don't care
400
            } catch (InterruptedException e) {
83 ilm 401
                // f was interrupted : e.g. we're dying or f was cancelled
17 ilm 402
            } catch (ExecutionException e) {
83 ilm 403
                // f.run() raised an exception
404
                exceptionThrown(e);
17 ilm 405
            }
406
        }
407
    }
408
 
409
    public String toString() {
410
        return super.toString() + " Queue: " + this.tasksQueue + " run:" + this.getBeingRun();
411
    }
412
 
413
}