OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 180 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
17 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
185 ilm 4
 * Copyright 2011-2019 OpenConcerto, by ILM Informatique. All rights reserved.
17 ilm 5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.utils;
15
 
16
import org.openconcerto.utils.cc.IClosure;
17
import org.openconcerto.utils.cc.IPredicate;
18
 
19
import java.beans.PropertyChangeListener;
20
import java.beans.PropertyChangeSupport;
156 ilm 21
import java.lang.Thread.UncaughtExceptionHandler;
17 ilm 22
import java.util.Collection;
61 ilm 23
import java.util.Deque;
83 ilm 24
import java.util.concurrent.Callable;
17 ilm 25
import java.util.concurrent.CancellationException;
26
import java.util.concurrent.ExecutionException;
177 ilm 27
import java.util.concurrent.Executor;
17 ilm 28
import java.util.concurrent.Future;
29
import java.util.concurrent.FutureTask;
177 ilm 30
import java.util.concurrent.RunnableFuture;
93 ilm 31
import java.util.concurrent.ScheduledFuture;
32
import java.util.concurrent.ScheduledThreadPoolExecutor;
33
import java.util.concurrent.TimeUnit;
83 ilm 34
import java.util.concurrent.atomic.AtomicBoolean;
93 ilm 35
import java.util.concurrent.atomic.AtomicReference;
180 ilm 36
import java.util.function.Function;
93 ilm 37
import java.util.logging.Level;
17 ilm 38
 
93 ilm 39
import net.jcip.annotations.GuardedBy;
40
 
17 ilm 41
/**
177 ilm 42
 * A queue that can be put to sleep. Submitted runnables are converted to RunnableFuture, that can
43
 * later be cancelled.
17 ilm 44
 *
45
 * @author Sylvain
46
 */
177 ilm 47
public class SleepingQueue implements Executor {
17 ilm 48
 
93 ilm 49
    public static enum RunningState {
50
        NEW, RUNNING, WILL_DIE, DYING, DEAD
51
    }
52
 
53
    /**
54
     * A task that can kill a queue.
55
     *
56
     * @author Sylvain
57
     *
58
     * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
59
     */
60
    public static final class LethalFutureTask<V> extends FutureTask<V> {
61
        private final SleepingQueue q;
62
 
63
        public LethalFutureTask(final SleepingQueue q, final Callable<V> c) {
64
            super(c);
65
            this.q = q;
66
        }
67
 
68
        public final SleepingQueue getQueue() {
69
            return this.q;
70
        }
71
 
72
        @Override
73
        public String toString() {
74
            // don't includeCurrentTask as it could be us
75
            return this.getClass().getSimpleName() + " for " + this.getQueue().toString(false);
76
        }
77
    }
78
 
79
    private static final ScheduledThreadPoolExecutor exec;
80
 
81
    static {
82
        // daemon thread to allow the VM to exit
83
        exec = new ScheduledThreadPoolExecutor(2, new ThreadFactory("DieMonitor", true).setPriority(Thread.MIN_PRIORITY));
84
        // allow threads to die
85
        exec.setKeepAliveTime(30, TimeUnit.SECONDS);
86
        exec.allowCoreThreadTimeOut(true);
87
        exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
88
        exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
89
 
90
        assert exec.getPoolSize() == 0 : "Wasting resources";
91
    }
92
 
93
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture) {
94
        return watchDying(lethalFuture, 1, 1, TimeUnit.MINUTES);
95
    }
96
 
97
    /**
98
     * Watch the passed future until it's done. When
99
     * {@link SleepingQueue#die(boolean, Runnable, Callable) killing} a queue, the currently running
100
     * task must first complete then the actual killing (represented by a {@link LethalFutureTask})
101
     * begins. This involves running methods and passed runnables which can all hang or throw an
102
     * exception. Therefore this method will periodically report on the status of the killing, and
103
     * report any exception that was thrown.
104
     *
105
     * @param lethalFuture the killing to watch.
106
     * @param initialDelay the time to delay first execution.
107
     * @param delay the delay between the termination of one execution and the commencement of the
108
     *        next.
109
     * @param unit the time unit of the initialDelay and delay parameters.
110
     * @return a future representing the watching.
111
     */
112
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit) {
113
        return watchDying(lethalFuture, initialDelay, delay, unit, null);
114
    }
115
 
116
    static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit,
117
            final IClosure<? super ExecutionException> exnHandler) {
118
        // don't use fixed rate as it might burden our threads and if we just checked the status
119
        // while being late, there's no need to check sooner the next time.
120
        final AtomicReference<Future<?>> f = new AtomicReference<Future<?>>();
121
        final ScheduledFuture<?> res = exec.scheduleWithFixedDelay(new Runnable() {
122
 
123
            // lethalFuture won't kill the queue, i.e. willDie threw an exception and forceDie was
124
            // false.
125
            private void wontKill(final RunningState runningState, final boolean isDone) {
126
                Log.get().fine("Our watched future won't kill the queue, current state : " + runningState + " " + lethalFuture);
127
                if (isDone)
128
                    cancel();
129
            }
130
 
131
            private void cancel() {
132
                assert lethalFuture.isDone();
133
                try {
134
                    lethalFuture.get();
135
                } catch (InterruptedException e) {
136
                    // either we were cancelled or the executor is shutting down (i.e. the VM is
137
                    // terminating)
138
                    Log.get().log(Level.FINER, "Interrupted while waiting on a finished future " + lethalFuture, e);
139
                } catch (ExecutionException e) {
140
                    if (exnHandler == null)
141
                        Log.get().log(Level.WARNING, "Threw an exception : " + lethalFuture, e);
142
                    else
143
                        exnHandler.executeChecked(e);
144
                }
145
                f.get().cancel(true);
146
            }
147
 
148
            @Override
149
            public void run() {
150
                final boolean isDone;
151
                final RunningState runningState;
177 ilm 152
                final RunnableFuture<?> beingRun;
93 ilm 153
                final SleepingQueue q = lethalFuture.getQueue();
154
                synchronized (q) {
155
                    runningState = q.getRunningState();
156
                    beingRun = q.getBeingRun();
157
                    isDone = lethalFuture.isDone();
158
                }
159
                final Level l = Level.INFO;
160
                if (runningState == RunningState.RUNNING) {
161
                    // willDie threw an exception but lethalFuture might not be completely done
162
                    // in that case, wait for the next execution
163
                    wontKill(runningState, isDone);
164
                } else if (runningState == RunningState.WILL_DIE) {
165
                    if (isDone) {
166
                        wontKill(runningState, isDone);
167
                    } else if (beingRun == lethalFuture) {
168
                        // in willDie() method or Runnable
169
                        Log.get().log(l, "Pre-death has not yet finished " + lethalFuture);
170
                    } else {
171
                        Log.get().log(l, "Death has not yet begun for " + lethalFuture + "\ncurrently running : " + beingRun);
172
                    }
173
                } else if (runningState == RunningState.DYING) {
174
                    assert beingRun == null || beingRun instanceof LethalFutureTask;
175
                    if (beingRun == null) {
176
                        // should be dead real soon
177
                        // just wait for the next execution
178
                        assert isDone;
179
                        Log.get().log(l, "Death was carried out but the thread is not yet terminated. Watching " + lethalFuture);
180
                    } else if (beingRun == lethalFuture) {
181
                        // in dying() method or Callable
182
                        Log.get().log(l, "Post-death has not yet finished " + lethalFuture);
183
                    } else {
184
                        assert isDone;
185
                        wontKill(runningState, isDone);
186
                    }
187
                } else if (runningState == RunningState.DEAD) {
188
                    // OK
189
                    Log.get().log(l, "Death was carried out and the thread is terminated but not necessarily by " + lethalFuture);
190
                    cancel();
191
                } else {
192
                    Log.get().warning("Illegal state " + runningState + " for " + lethalFuture);
193
                }
194
            }
195
        }, initialDelay, delay, unit);
196
        f.set(res);
197
        return res;
198
    }
199
 
17 ilm 200
    // Name given at construction, used to label the tasks and the underlying queue
    private final String name;
201
 
93 ilm 202
    // Last state set by this class, see getRunningState() for the effective one
    @GuardedBy("this")
203
    private RunningState state;
204
 
17 ilm 205
    // Fires the "beingRun" and "sleeping" property changes
    private final PropertyChangeSupport support;
93 ilm 206
    // The task currently being executed, null if none
    @GuardedBy("this")
177 ilm 207
    private RunnableFuture<?> beingRun;
17 ilm 208
 
209
    // The queue holding the tasks, also the thread executing them
    private final SingleThreadedExecutor tasksQueue;
93 ilm 210
    // True only while a cancel() is in progress
    @GuardedBy("this")
17 ilm 211
    private boolean canceling;
93 ilm 212
    // Which tasks the cancel() in progress applies to, null meaning all of them
    @GuardedBy("this")
177 ilm 213
    private IPredicate<? super RunnableFuture<?>> cancelPredicate;
17 ilm 214
 
215
    /**
     * Create a queue named after this class and the current time in milliseconds.
     */
    public SleepingQueue() {
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
    }
218
 
219
    /**
     * Create a queue which is not yet started.
     *
     * @param name the name of this queue.
     * @see #start()
     */
    public SleepingQueue(String name) {
        // must be set first : the underlying queue reads it to build its own name
        this.name = name;

        state = RunningState.NEW;

        canceling = false;
        cancelPredicate = null;
        // needed by setBeingRun()
        support = new PropertyChangeSupport(this);
        setBeingRun(null);

        tasksQueue = new SingleThreadedExecutor();
    }
17 ilm 232
 
93 ilm 233
    public final void start() {
156 ilm 234
        this.start(null);
235
    }
236
 
237
    public final void start(final IClosure<Thread> customizeThread) {
238
        customizeThread(this.tasksQueue);
239
        if (customizeThread != null)
240
            customizeThread.executeChecked(this.tasksQueue);
93 ilm 241
        synchronized (this) {
242
            this.tasksQueue.start();
243
            this.setState(RunningState.RUNNING);
244
            started();
245
        }
17 ilm 246
    }
247
 
248
    /**
93 ilm 249
     * Start this queue only if not already started.
250
     *
251
     * @return <code>true</code> if the queue was started.
252
     */
253
    public final boolean startIfNew() {
254
        // don't use getRunningState() which calls isAlive()
255
        synchronized (this) {
256
            final boolean starting = this.state == RunningState.NEW;
257
            if (starting)
258
                this.start();
259
            assert this.state.compareTo(RunningState.NEW) > 0;
260
            return starting;
261
        }
262
    }
263
 
264
    protected void started() {
265
    }
266
 
267
    protected synchronized final void setState(final RunningState s) {
268
        this.state = s;
269
    }
270
 
271
    public synchronized final RunningState getRunningState() {
272
        // an Error could have stopped our thread so can't rely on this.state
273
        if (this.state == RunningState.NEW || this.tasksQueue.isAlive())
274
            return this.state;
275
        else
276
            return RunningState.DEAD;
277
    }
278
 
279
    public final boolean currentlyInQueue() {
280
        return Thread.currentThread() == this.tasksQueue;
281
    }
282
 
283
    /**
17 ilm 284
     * Customize the thread used to execute the passed runnables. This implementation sets the
285
     * priority to the minimum.
286
     *
287
     * @param thr the thread used by this queue.
288
     */
289
    protected void customizeThread(Thread thr) {
290
        thr.setPriority(Thread.MIN_PRIORITY);
291
    }
292
 
177 ilm 293
    protected final <T> RunnableFuture<T> newTaskFor(final Runnable task) {
93 ilm 294
        return this.newTaskFor(task, null);
295
    }
17 ilm 296
 
177 ilm 297
    protected <T> RunnableFuture<T> newTaskFor(final Runnable task, T value) {
93 ilm 298
        return new IFutureTask<T>(task, value, " for {" + this.name + "}");
17 ilm 299
    }
300
 
177 ilm 301
    public final RunnableFuture<?> put(Runnable workRunnable) {
302
        /*
303
         * Otherwise if passing a RunnableFuture, it will itself be wrapped in another
304
         * RunnableFuture. The outer RunnableFuture will call the inner one's run(), which just
305
         * record any exception. So the outer one's get() won't throw it and the exception will
306
         * effectively be swallowed.
307
         */
308
        final RunnableFuture<?> t;
309
        if (workRunnable instanceof RunnableFuture) {
310
            t = ((RunnableFuture<?>) workRunnable);
93 ilm 311
        } else {
312
            t = this.newTaskFor(workRunnable);
313
        }
177 ilm 314
        return this.add(t);
93 ilm 315
    }
316
 
177 ilm 317
    @Override
318
    public final void execute(Runnable command) {
319
        this.put(command);
320
    }
321
 
322
    public final <F extends RunnableFuture<?>> F add(F t) {
17 ilm 323
        if (this.shallAdd(t)) {
177 ilm 324
            // no need to synchronize, if die() is called after our test, t won't be executed anyway
325
            if (this.dieCalled())
326
                throw new IllegalStateException("Already dead, cannot exec " + t);
327
 
328
            this.tasksQueue.put(t);
17 ilm 329
            return t;
177 ilm 330
        } else {
17 ilm 331
            return null;
177 ilm 332
        }
17 ilm 333
    }
334
 
177 ilm 335
    private final boolean shallAdd(RunnableFuture<?> runnable) {
17 ilm 336
        if (runnable == null)
337
            throw new NullPointerException("null runnable");
338
        try {
339
            this.willPut(runnable);
340
            return true;
341
        } catch (InterruptedException e) {
342
            // si on interrompt, ne pas ajouter
343
            return false;
344
        }
345
    }
346
 
347
    /**
348
     * Give subclass the ability to reject runnables.
349
     *
350
     * @param r the runnable that is being added.
351
     * @throws InterruptedException if r should not be added to this queue.
352
     */
177 ilm 353
    protected void willPut(RunnableFuture<?> r) throws InterruptedException {
17 ilm 354
    }
355
 
356
    /**
156 ilm 357
     * An exception was thrown by a task. This implementation uses
358
     * {@link Thread#getUncaughtExceptionHandler()} or
359
     * {@link Thread#getDefaultUncaughtExceptionHandler()} if available, otherwise falls back to
360
     * just {@link Exception#printStackTrace()}. To set the handler, {@link #start(IClosure)} can be
361
     * used.
83 ilm 362
     *
363
     * @param exn the exception thrown.
364
     */
365
    protected void exceptionThrown(final ExecutionException exn) {
156 ilm 366
        final Thread thr = this.tasksQueue;
367
        UncaughtExceptionHandler h = thr.getUncaughtExceptionHandler();
368
        if (h == null)
369
            h = Thread.getDefaultUncaughtExceptionHandler();
370
        if (h != null) {
371
            h.uncaughtException(thr, exn);
372
        } else {
373
            exn.printStackTrace();
374
        }
83 ilm 375
    }
376
 
377
    /**
17 ilm 378
     * Cancel all queued tasks and the current task.
379
     */
380
    protected final void cancel() {
381
        this.cancel(null);
382
    }
383
 
384
    /**
385
     * Cancel only tasks for which pred is <code>true</code>.
386
     *
387
     * @param pred a predicate to know which tasks to cancel.
388
     */
177 ilm 389
    protected final void cancel(final IPredicate<? super RunnableFuture<?>> pred) {
390
        this.tasksDo(new IClosure<Collection<RunnableFuture<?>>>() {
17 ilm 391
            @Override
177 ilm 392
            public void executeChecked(Collection<RunnableFuture<?>> tasks) {
17 ilm 393
                cancel(pred, tasks);
394
            }
395
        });
396
    }
397
 
177 ilm 398
    private final void cancel(IPredicate<? super RunnableFuture<?>> pred, Collection<RunnableFuture<?>> tasks) {
17 ilm 399
        try {
400
            synchronized (this) {
401
                this.canceling = true;
402
                this.cancelPredicate = pred;
403
                this.cancelCheck(this.getBeingRun());
404
            }
405
 
177 ilm 406
            for (final RunnableFuture<?> t : tasks) {
17 ilm 407
                this.cancelCheck(t);
408
            }
409
        } finally {
410
            synchronized (this) {
411
                this.canceling = false;
412
                // allow the predicate to be gc'd
413
                this.cancelPredicate = null;
414
            }
415
        }
416
    }
417
 
177 ilm 418
    public final void tasksDo(IClosure<? super Deque<RunnableFuture<?>>> c) {
17 ilm 419
        this.tasksQueue.itemsDo(c);
420
    }
421
 
180 ilm 422
    public final <R> R tasksDo(Function<? super Deque<RunnableFuture<?>>, R> c) {
423
        return this.tasksQueue.itemsDo(c);
424
    }
425
 
177 ilm 426
    private void cancelCheck(RunnableFuture<?> t) {
17 ilm 427
        if (t != null)
428
            synchronized (this) {
429
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
430
                    t.cancel(true);
431
            }
432
    }
433
 
177 ilm 434
    private void setBeingRun(final RunnableFuture<?> beingRun) {
83 ilm 435
        final Future<?> old;
17 ilm 436
        synchronized (this) {
437
            old = this.beingRun;
438
            this.beingRun = beingRun;
439
        }
440
        this.support.firePropertyChange("beingRun", old, beingRun);
441
    }
442
 
177 ilm 443
    public final synchronized RunnableFuture<?> getBeingRun() {
17 ilm 444
        return this.beingRun;
445
    }
446
 
447
    public boolean isSleeping() {
448
        return this.tasksQueue.isSleeping();
449
    }
450
 
83 ilm 451
    public boolean setSleeping(boolean sleeping) {
452
        final boolean res = this.tasksQueue.setSleeping(sleeping);
453
        if (res) {
93 ilm 454
            this.support.firePropertyChange("sleeping", null, sleeping);
17 ilm 455
        }
83 ilm 456
        return res;
17 ilm 457
    }
458
 
459
    /**
460
     * Stops this queue. Once this method returns, it is guaranteed that no other task will be taken
83 ilm 461
     * from the queue to be started, and that this queue will die. But the already executing task
462
     * will complete unless it checks for interrupt.
463
     *
464
     * @return the future killing.
17 ilm 465
     */
93 ilm 466
    public final LethalFutureTask<?> die() {
83 ilm 467
        return this.die(true, null, null);
17 ilm 468
    }
469
 
83 ilm 470
    /**
93 ilm 471
     * Stops this queue. All tasks in the queue, including the {@link #getBeingRun() currently
472
     * running}, will be {@link Future#cancel(boolean) cancelled}. The currently running task will
473
     * thus complete unless it checks for interrupt. Once the returned future completes successfully
474
     * then no task is executing ( {@link #isDead()} will happen sometimes later, the time for the
475
     * thread to terminate). If the returned future throws an exception because of the passed
476
     * runnables or of {@link #willDie()} or {@link #dying()}, one can check with
477
     * {@link #dieCalled()} to see if the queue is dying.
478
     * <p>
479
     * This method tries to limit the cases where the returned Future will not get executed : it
480
     * checks that this was {@link #start() started} and is not already {@link RunningState#DYING}
481
     * or {@link RunningState#DEAD}. It also doesn't allow {@link RunningState#WILL_DIE} as it could
482
     * cancel the previously passed runnables or never run the passed runnables. But even with these
483
     * restrictions a number of things can prevent the result from getting executed : the
484
     * {@link #getBeingRun() currently running} task hangs indefinitely, it throws an {@link Error}
485
     * ; the passed runnables hang indefinitely.
486
     * </p>
83 ilm 487
     *
488
     * @param force <code>true</code> if this is guaranteed to die (even if <code>willDie</code> or
489
     *        {@link #willDie()} throw an exception).
490
     * @param willDie the last actions to take before killing this queue.
491
     * @param dying the last actions to take before this queue is dead.
492
     * @return the future killing, which will return <code>dying</code> result.
93 ilm 493
     * @throws IllegalStateException if the state isn't {@link RunningState#RUNNING}.
83 ilm 494
     * @see #dieCalled()
495
     */
93 ilm 496
    public final <V> LethalFutureTask<V> die(final boolean force, final Runnable willDie, final Callable<V> dying) throws IllegalStateException {
497
        synchronized (this) {
498
            final RunningState state = this.getRunningState();
499
            if (state == RunningState.NEW)
500
                throw new IllegalStateException("Not started");
501
            if (state.compareTo(RunningState.RUNNING) > 0)
502
                throw new IllegalStateException("die() already called or thread was killed by an Error : " + state);
503
            assert state == RunningState.RUNNING;
504
            this.setState(RunningState.WILL_DIE);
505
        }
83 ilm 506
        // reset sleeping to original value if die not effective
507
        final AtomicBoolean resetSleeping = new AtomicBoolean(false);
93 ilm 508
        final LethalFutureTask<V> res = new LethalFutureTask<V>(this, new Callable<V>() {
83 ilm 509
            @Override
510
            public V call() throws Exception {
511
                Exception willDieExn = null;
512
                try {
513
                    willDie();
514
                    if (willDie != null) {
515
                        willDie.run();
516
                        // handle Future like runnable, i.e. check right away for exception
517
                        if (willDie instanceof Future) {
518
                            final Future<?> f = (Future<?>) willDie;
519
                            assert f.isDone() : "Ran but not done: " + f;
520
                            try {
521
                                f.get();
522
                            } catch (ExecutionException e) {
523
                                throw (Exception) e.getCause();
524
                            }
525
                        }
526
                    }
527
                } catch (Exception e) {
93 ilm 528
                    if (!force) {
529
                        setState(RunningState.RUNNING);
83 ilm 530
                        throw e;
93 ilm 531
                    } else {
83 ilm 532
                        willDieExn = e;
93 ilm 533
                    }
83 ilm 534
                }
535
                try {
536
                    // don't interrupt ourselves
537
                    SleepingQueue.this.tasksQueue.die(false);
538
                    assert SleepingQueue.this.tasksQueue.isDying();
93 ilm 539
                    setState(RunningState.DYING);
83 ilm 540
                    // since there's already been an exception, throw it as soon as possible
541
                    // also dying() might itself throw an exception for the same reason or we now
542
                    // have 2 exceptions to throw
543
                    if (willDieExn != null)
544
                        throw willDieExn;
545
                    dying();
546
                    final V res;
547
                    if (dying != null)
548
                        res = dying.call();
549
                    else
550
                        res = null;
551
 
552
                    return res;
553
                } finally {
554
                    // if die is effective, this won't have any consequences
555
                    if (resetSleeping.get())
556
                        SleepingQueue.this.tasksQueue.setSleeping(true);
557
                }
558
            }
559
        });
560
        // die as soon as possible not after all currently queued tasks
177 ilm 561
        this.tasksQueue.itemsDo(new IClosure<Deque<RunnableFuture<?>>>() {
83 ilm 562
            @Override
177 ilm 563
            public void executeChecked(Deque<RunnableFuture<?>> input) {
83 ilm 564
                // since we cancel the current task, we might as well remove all of them since they
565
                // might depend on the cancelled one
93 ilm 566
                // cancel removed tasks so that callers of get() don't wait forever
177 ilm 567
                for (final RunnableFuture<?> ft : input) {
93 ilm 568
                    // by definition tasks in the queue aren't executing, so interrupt parameter is
569
                    // useless. On the other hand cancel() might return false if already cancelled.
570
                    ft.cancel(false);
571
                }
83 ilm 572
                input.clear();
93 ilm 573
 
83 ilm 574
                input.addFirst(res);
575
                // die as soon as possible, even if there's a long task already running
177 ilm 576
                final RunnableFuture<?> beingRun = getBeingRun();
83 ilm 577
                // since we hold the lock on items
578
                assert beingRun != res : "beingRun: " + beingRun + " ; res: " + res;
579
                if (beingRun != null)
580
                    beingRun.cancel(true);
581
            }
582
        });
583
        // force execution of our task
584
        resetSleeping.set(this.setSleeping(false));
585
        return res;
586
    }
587
 
588
    protected void willDie() {
589
        // nothing by default
590
    }
591
 
93 ilm 592
    protected void dying() throws Exception {
17 ilm 593
        // nothing by default
594
    }
595
 
596
    /**
83 ilm 597
     * Whether this will die. If this method returns <code>true</code>, it is guaranteed that no
93 ilm 598
     * other task will be taken from the queue to be started. Note: this method doesn't return
599
     * <code>true</code> right after {@link #die()} as the method is asynchronous and if
600
     * {@link #willDie()} fails it may not die at all ; as explained in its comment you may use its
601
     * returned future to wait for the killing.
17 ilm 602
     *
93 ilm 603
     * @return <code>true</code> if this queue will not execute any more tasks (but it may hang
604
     *         indefinitely if the dying runnable blocks).
83 ilm 605
     * @see #isDead()
606
     */
607
    public final boolean dieCalled() {
608
        return this.tasksQueue.dieCalled();
609
    }
610
 
611
    /**
612
     * Whether this queue is dead, i.e. if die() has been called and all tasks have completed.
613
     *
614
     * @return <code>true</code> if this queue will not execute any more tasks and isn't executing
615
     *         any.
17 ilm 616
     * @see #die()
617
     */
618
    public final boolean isDead() {
619
        return this.tasksQueue.isDead();
620
    }
621
 
93 ilm 622
    /**
623
     * Allow to wait for the thread to end. Once this method returns {@link #getRunningState()} will
624
     * always return {@link RunningState#DEAD}. Useful since the future from
625
     * {@link #die(boolean, Runnable, Callable)} returns when all tasks are finished but the
626
     * {@link #getRunningState()} is still {@link RunningState#DYING} since the Thread takes a
627
     * little time to die.
628
     *
629
     * @throws InterruptedException if interrupted while waiting.
630
     * @see Thread#join()
631
     */
632
    public final void join() throws InterruptedException {
633
        this.tasksQueue.join();
634
    }
635
 
636
    public final void join(long millis, int nanos) throws InterruptedException {
637
        this.tasksQueue.join(millis, nanos);
638
    }
639
 
17 ilm 640
    public void addPropertyChangeListener(PropertyChangeListener l) {
641
        this.support.addPropertyChangeListener(l);
642
    }
643
 
644
    public void rmPropertyChangeListener(PropertyChangeListener l) {
645
        this.support.removePropertyChangeListener(l);
646
    }
647
 
177 ilm 648
    private final class SingleThreadedExecutor extends DropperQueue<RunnableFuture<?>> {
17 ilm 649
        private SingleThreadedExecutor() {
650
            super(SleepingQueue.this.name + System.currentTimeMillis());
651
        }
652
 
653
        @Override
177 ilm 654
        protected void process(RunnableFuture<?> task) {
17 ilm 655
            if (!task.isDone()) {
656
                /*
657
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
658
                 * called only if task completed or threw exception. Otherwise, the caught runtime
659
                 * exception will have been thrown by afterExecute itself, in which case we don't
660
                 * want to call it again.
661
                 */
662
                boolean ran = false;
663
                beforeExecute(task);
664
                try {
665
                    task.run();
666
                    ran = true;
667
                    afterExecute(task, null);
668
                } catch (RuntimeException ex) {
669
                    if (!ran)
670
                        afterExecute(task, ex);
671
                    // don't throw ex, afterExecute() can do whatever needs to be done (like killing
672
                    // this queue)
673
                }
674
            }
675
        }
676
 
177 ilm 677
        protected void beforeExecute(final RunnableFuture<?> f) {
17 ilm 678
            cancelCheck(f);
679
            setBeingRun(f);
680
        }
681
 
177 ilm 682
        protected void afterExecute(final RunnableFuture<?> f, final Throwable t) {
17 ilm 683
            setBeingRun(null);
684
 
685
            try {
686
                f.get();
687
            } catch (CancellationException e) {
688
                // don't care
689
            } catch (InterruptedException e) {
83 ilm 690
                // f was interrupted : e.g. we're dying or f was cancelled
17 ilm 691
            } catch (ExecutionException e) {
83 ilm 692
                // f.run() raised an exception
693
                exceptionThrown(e);
17 ilm 694
            }
695
        }
696
    }
697
 
93 ilm 698
    @Override
17 ilm 699
    public String toString() {
93 ilm 700
        return this.toString(true);
17 ilm 701
    }
702
 
93 ilm 703
    public String toString(final boolean includeCurrentTask) {
704
        return super.toString() + " Queue: " + this.tasksQueue + (includeCurrentTask ? " run:" + this.getBeingRun() : "");
705
    }
706
 
17 ilm 707
}