OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 93 | Rev 177 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
17 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.utils;
15
 
16
import org.openconcerto.utils.cc.IClosure;
17
import org.openconcerto.utils.cc.IPredicate;
18
 
19
import java.beans.PropertyChangeListener;
20
import java.beans.PropertyChangeSupport;
156 ilm 21
import java.lang.Thread.UncaughtExceptionHandler;
17 ilm 22
import java.util.Collection;
61 ilm 23
import java.util.Deque;
83 ilm 24
import java.util.concurrent.Callable;
17 ilm 25
import java.util.concurrent.CancellationException;
26
import java.util.concurrent.ExecutionException;
27
import java.util.concurrent.Future;
28
import java.util.concurrent.FutureTask;
93 ilm 29
import java.util.concurrent.ScheduledFuture;
30
import java.util.concurrent.ScheduledThreadPoolExecutor;
31
import java.util.concurrent.TimeUnit;
83 ilm 32
import java.util.concurrent.atomic.AtomicBoolean;
93 ilm 33
import java.util.concurrent.atomic.AtomicReference;
34
import java.util.logging.Level;
17 ilm 35
 
93 ilm 36
import net.jcip.annotations.GuardedBy;
37
 
17 ilm 38
/**
39
 * A queue that can be put to sleep. Submitted runnables are converted to FutureTask, that can later
40
 * be cancelled.
41
 *
42
 * @author Sylvain
43
 */
44
public class SleepingQueue {
45
 
93 ilm 46
    public static enum RunningState {
47
        NEW, RUNNING, WILL_DIE, DYING, DEAD
48
    }
49
 
50
    /**
51
     * A task that can kill a queue.
52
     *
53
     * @author Sylvain
54
     *
55
     * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
56
     */
57
    public static final class LethalFutureTask<V> extends FutureTask<V> {
58
        private final SleepingQueue q;
59
 
60
        public LethalFutureTask(final SleepingQueue q, final Callable<V> c) {
61
            super(c);
62
            this.q = q;
63
        }
64
 
65
        public final SleepingQueue getQueue() {
66
            return this.q;
67
        }
68
 
69
        @Override
70
        public String toString() {
71
            // don't includeCurrentTask as it could be us
72
            return this.getClass().getSimpleName() + " for " + this.getQueue().toString(false);
73
        }
74
    }
75
 
76
    private static final ScheduledThreadPoolExecutor exec;
77
 
78
    static {
79
        // daemon thread to allow the VM to exit
80
        exec = new ScheduledThreadPoolExecutor(2, new ThreadFactory("DieMonitor", true).setPriority(Thread.MIN_PRIORITY));
81
        // allow threads to die
82
        exec.setKeepAliveTime(30, TimeUnit.SECONDS);
83
        exec.allowCoreThreadTimeOut(true);
84
        exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
85
        exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
86
 
87
        assert exec.getPoolSize() == 0 : "Wasting resources";
88
    }
89
 
90
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture) {
91
        return watchDying(lethalFuture, 1, 1, TimeUnit.MINUTES);
92
    }
93
 
94
    /**
95
     * Watch the passed future until it's done. When
96
     * {@link SleepingQueue#die(boolean, Runnable, Callable) killing} a queue, the currently running
97
     * task must first complete then the actual killing (represented by a {@link LethalFutureTask})
98
     * begins. This involves running methods and passed runnables which can all hang or throw an
99
     * exception. Therefore this method will periodically report on the status of the killing, and
100
     * report any exception that was thrown.
101
     *
102
     * @param lethalFuture the killing to watch.
103
     * @param initialDelay the time to delay first execution.
104
     * @param delay the delay between the termination of one execution and the commencement of the
105
     *        next.
106
     * @param unit the time unit of the initialDelay and delay parameters.
107
     * @return a future representing the watching.
108
     */
109
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit) {
110
        return watchDying(lethalFuture, initialDelay, delay, unit, null);
111
    }
112
 
113
    static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit,
114
            final IClosure<? super ExecutionException> exnHandler) {
115
        // don't use fixed rate as it might burden our threads and if we just checked the status
116
        // while being late, there's no need to check sooner the next time.
117
        final AtomicReference<Future<?>> f = new AtomicReference<Future<?>>();
118
        final ScheduledFuture<?> res = exec.scheduleWithFixedDelay(new Runnable() {
119
 
120
            // lethalFuture won't kill the queue, i.e. willDie threw an exception and forceDie was
121
            // false.
122
            private void wontKill(final RunningState runningState, final boolean isDone) {
123
                Log.get().fine("Our watched future won't kill the queue, current state : " + runningState + " " + lethalFuture);
124
                if (isDone)
125
                    cancel();
126
            }
127
 
128
            private void cancel() {
129
                assert lethalFuture.isDone();
130
                try {
131
                    lethalFuture.get();
132
                } catch (InterruptedException e) {
133
                    // either we were cancelled or the executor is shutting down (i.e. the VM is
134
                    // terminating)
135
                    Log.get().log(Level.FINER, "Interrupted while waiting on a finished future " + lethalFuture, e);
136
                } catch (ExecutionException e) {
137
                    if (exnHandler == null)
138
                        Log.get().log(Level.WARNING, "Threw an exception : " + lethalFuture, e);
139
                    else
140
                        exnHandler.executeChecked(e);
141
                }
142
                f.get().cancel(true);
143
            }
144
 
145
            @Override
146
            public void run() {
147
                final boolean isDone;
148
                final RunningState runningState;
149
                final FutureTask<?> beingRun;
150
                final SleepingQueue q = lethalFuture.getQueue();
151
                synchronized (q) {
152
                    runningState = q.getRunningState();
153
                    beingRun = q.getBeingRun();
154
                    isDone = lethalFuture.isDone();
155
                }
156
                final Level l = Level.INFO;
157
                if (runningState == RunningState.RUNNING) {
158
                    // willDie threw an exception but lethalFuture might not be completely done
159
                    // in that case, wait for the next execution
160
                    wontKill(runningState, isDone);
161
                } else if (runningState == RunningState.WILL_DIE) {
162
                    if (isDone) {
163
                        wontKill(runningState, isDone);
164
                    } else if (beingRun == lethalFuture) {
165
                        // in willDie() method or Runnable
166
                        Log.get().log(l, "Pre-death has not yet finished " + lethalFuture);
167
                    } else {
168
                        Log.get().log(l, "Death has not yet begun for " + lethalFuture + "\ncurrently running : " + beingRun);
169
                    }
170
                } else if (runningState == RunningState.DYING) {
171
                    assert beingRun == null || beingRun instanceof LethalFutureTask;
172
                    if (beingRun == null) {
173
                        // should be dead real soon
174
                        // just wait for the next execution
175
                        assert isDone;
176
                        Log.get().log(l, "Death was carried out but the thread is not yet terminated. Watching " + lethalFuture);
177
                    } else if (beingRun == lethalFuture) {
178
                        // in dying() method or Callable
179
                        Log.get().log(l, "Post-death has not yet finished " + lethalFuture);
180
                    } else {
181
                        assert isDone;
182
                        wontKill(runningState, isDone);
183
                    }
184
                } else if (runningState == RunningState.DEAD) {
185
                    // OK
186
                    Log.get().log(l, "Death was carried out and the thread is terminated but not necessarily by " + lethalFuture);
187
                    cancel();
188
                } else {
189
                    Log.get().warning("Illegal state " + runningState + " for " + lethalFuture);
190
                }
191
            }
192
        }, initialDelay, delay, unit);
193
        f.set(res);
194
        return res;
195
    }
196
 
17 ilm 197
    private final String name;
198
 
93 ilm 199
    @GuardedBy("this")
200
    private RunningState state;
201
 
17 ilm 202
    private final PropertyChangeSupport support;
93 ilm 203
    @GuardedBy("this")
17 ilm 204
    private FutureTask<?> beingRun;
205
 
206
    private final SingleThreadedExecutor tasksQueue;
93 ilm 207
    @GuardedBy("this")
17 ilm 208
    private boolean canceling;
93 ilm 209
    @GuardedBy("this")
17 ilm 210
    private IPredicate<FutureTask<?>> cancelPredicate;
211
 
212
    public SleepingQueue() {
213
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
214
    }
215
 
216
    public SleepingQueue(String name) {
217
        super();
218
        this.name = name;
219
 
93 ilm 220
        this.state = RunningState.NEW;
221
 
17 ilm 222
        this.canceling = false;
223
        this.cancelPredicate = null;
224
        this.support = new PropertyChangeSupport(this);
225
        this.setBeingRun(null);
226
 
227
        this.tasksQueue = new SingleThreadedExecutor();
93 ilm 228
    }
17 ilm 229
 
93 ilm 230
    public final void start() {
156 ilm 231
        this.start(null);
232
    }
233
 
234
    public final void start(final IClosure<Thread> customizeThread) {
235
        customizeThread(this.tasksQueue);
236
        if (customizeThread != null)
237
            customizeThread.executeChecked(this.tasksQueue);
93 ilm 238
        synchronized (this) {
239
            this.tasksQueue.start();
240
            this.setState(RunningState.RUNNING);
241
            started();
242
        }
17 ilm 243
    }
244
 
245
    /**
93 ilm 246
     * Start this queue only if not already started.
247
     *
248
     * @return <code>true</code> if the queue was started.
249
     */
250
    public final boolean startIfNew() {
251
        // don't use getRunningState() which calls isAlive()
252
        synchronized (this) {
253
            final boolean starting = this.state == RunningState.NEW;
254
            if (starting)
255
                this.start();
256
            assert this.state.compareTo(RunningState.NEW) > 0;
257
            return starting;
258
        }
259
    }
260
 
261
    protected void started() {
262
    }
263
 
264
    protected synchronized final void setState(final RunningState s) {
265
        this.state = s;
266
    }
267
 
268
    public synchronized final RunningState getRunningState() {
269
        // an Error could have stopped our thread so can't rely on this.state
270
        if (this.state == RunningState.NEW || this.tasksQueue.isAlive())
271
            return this.state;
272
        else
273
            return RunningState.DEAD;
274
    }
275
 
276
    public final boolean currentlyInQueue() {
277
        return Thread.currentThread() == this.tasksQueue;
278
    }
279
 
280
    /**
17 ilm 281
     * Customize the thread used to execute the passed runnables. This implementation sets the
282
     * priority to the minimum.
283
     *
284
     * @param thr the thread used by this queue.
285
     */
286
    protected void customizeThread(Thread thr) {
287
        thr.setPriority(Thread.MIN_PRIORITY);
288
    }
289
 
93 ilm 290
    protected final <T> FutureTask<T> newTaskFor(final Runnable task) {
291
        return this.newTaskFor(task, null);
292
    }
17 ilm 293
 
93 ilm 294
    protected <T> FutureTask<T> newTaskFor(final Runnable task, T value) {
295
        return new IFutureTask<T>(task, value, " for {" + this.name + "}");
17 ilm 296
    }
297
 
93 ilm 298
    public final FutureTask<?> put(Runnable workRunnable) {
299
        // otherwise if passing a FutureTask, it will itself be wrapped in another FutureTask. The
300
        // outer FutureTask will call the inner one's run(), which just record any exception. So the
301
        // outer one's get() won't throw it and the exception will effectively be swallowed.
302
        final FutureTask<?> t;
303
        if (workRunnable instanceof FutureTask) {
304
            t = ((FutureTask<?>) workRunnable);
305
        } else {
306
            t = this.newTaskFor(workRunnable);
307
        }
308
        return this.execute(t);
309
    }
310
 
17 ilm 311
    public final <F extends FutureTask<?>> F execute(F t) {
312
        if (this.shallAdd(t)) {
313
            this.add(t);
314
            return t;
315
        } else
316
            return null;
317
    }
318
 
319
    private void add(FutureTask<?> t) {
320
        // no need to synchronize, if die() is called after our test, t won't be executed anyway
83 ilm 321
        if (this.dieCalled())
41 ilm 322
            throw new IllegalStateException("Already dead, cannot exec " + t);
17 ilm 323
 
324
        this.tasksQueue.put(t);
325
    }
326
 
93 ilm 327
    private final boolean shallAdd(FutureTask<?> runnable) {
17 ilm 328
        if (runnable == null)
329
            throw new NullPointerException("null runnable");
330
        try {
331
            this.willPut(runnable);
332
            return true;
333
        } catch (InterruptedException e) {
334
            // si on interrompt, ne pas ajouter
335
            return false;
336
        }
337
    }
338
 
339
    /**
340
     * Give subclass the ability to reject runnables.
341
     *
342
     * @param r the runnable that is being added.
343
     * @throws InterruptedException if r should not be added to this queue.
344
     */
93 ilm 345
    protected void willPut(FutureTask<?> r) throws InterruptedException {
17 ilm 346
    }
347
 
348
    /**
156 ilm 349
     * An exception was thrown by a task. This implementation uses
350
     * {@link Thread#getUncaughtExceptionHandler()} or
351
     * {@link Thread#getDefaultUncaughtExceptionHandler()} if available, otherwise falls back to
352
     * just {@link Exception#printStackTrace()}. To set the handler, {@link #start(IClosure)} can be
353
     * used.
83 ilm 354
     *
355
     * @param exn the exception thrown.
356
     */
357
    protected void exceptionThrown(final ExecutionException exn) {
156 ilm 358
        final Thread thr = this.tasksQueue;
359
        UncaughtExceptionHandler h = thr.getUncaughtExceptionHandler();
360
        if (h == null)
361
            h = Thread.getDefaultUncaughtExceptionHandler();
362
        if (h != null) {
363
            h.uncaughtException(thr, exn);
364
        } else {
365
            exn.printStackTrace();
366
        }
83 ilm 367
    }
368
 
369
    /**
17 ilm 370
     * Cancel all queued tasks and the current task.
371
     */
372
    protected final void cancel() {
373
        this.cancel(null);
374
    }
375
 
376
    /**
377
     * Cancel only tasks for which pred is <code>true</code>.
378
     *
379
     * @param pred a predicate to know which tasks to cancel.
380
     */
381
    protected final void cancel(final IPredicate<FutureTask<?>> pred) {
382
        this.tasksDo(new IClosure<Collection<FutureTask<?>>>() {
383
            @Override
384
            public void executeChecked(Collection<FutureTask<?>> tasks) {
385
                cancel(pred, tasks);
386
            }
387
        });
388
    }
389
 
390
    private final void cancel(IPredicate<FutureTask<?>> pred, Collection<FutureTask<?>> tasks) {
391
        try {
392
            synchronized (this) {
393
                this.canceling = true;
394
                this.cancelPredicate = pred;
395
                this.cancelCheck(this.getBeingRun());
396
            }
397
 
398
            for (final FutureTask<?> t : tasks) {
399
                this.cancelCheck(t);
400
            }
401
        } finally {
402
            synchronized (this) {
403
                this.canceling = false;
404
                // allow the predicate to be gc'd
405
                this.cancelPredicate = null;
406
            }
407
        }
408
    }
409
 
61 ilm 410
    public final void tasksDo(IClosure<? super Deque<FutureTask<?>>> c) {
17 ilm 411
        this.tasksQueue.itemsDo(c);
412
    }
413
 
414
    private void cancelCheck(FutureTask<?> t) {
415
        if (t != null)
416
            synchronized (this) {
417
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
418
                    t.cancel(true);
419
            }
420
    }
421
 
422
    private void setBeingRun(final FutureTask<?> beingRun) {
83 ilm 423
        final Future<?> old;
17 ilm 424
        synchronized (this) {
425
            old = this.beingRun;
426
            this.beingRun = beingRun;
427
        }
428
        this.support.firePropertyChange("beingRun", old, beingRun);
429
    }
430
 
91 ilm 431
    public final synchronized FutureTask<?> getBeingRun() {
17 ilm 432
        return this.beingRun;
433
    }
434
 
435
    public boolean isSleeping() {
436
        return this.tasksQueue.isSleeping();
437
    }
438
 
83 ilm 439
    public boolean setSleeping(boolean sleeping) {
440
        final boolean res = this.tasksQueue.setSleeping(sleeping);
441
        if (res) {
93 ilm 442
            this.support.firePropertyChange("sleeping", null, sleeping);
17 ilm 443
        }
83 ilm 444
        return res;
17 ilm 445
    }
446
 
447
    /**
448
     * Stops this queue. Once this method returns, it is guaranteed that no other task will be taken
83 ilm 449
     * from the queue to be started, and that this queue will die. But the already executing task
450
     * will complete unless it checks for interrupt.
451
     *
452
     * @return the future killing.
17 ilm 453
     */
93 ilm 454
    public final LethalFutureTask<?> die() {
83 ilm 455
        return this.die(true, null, null);
17 ilm 456
    }
457
 
83 ilm 458
    /**
93 ilm 459
     * Stops this queue. All tasks in the queue, including the {@link #getBeingRun() currently
460
     * running}, will be {@link Future#cancel(boolean) cancelled}. The currently running task will
461
     * thus complete unless it checks for interrupt. Once the returned future completes successfully
462
     * then no task is executing ( {@link #isDead()} will happen sometimes later, the time for the
463
     * thread to terminate). If the returned future throws an exception because of the passed
464
     * runnables or of {@link #willDie()} or {@link #dying()}, one can check with
465
     * {@link #dieCalled()} to see if the queue is dying.
466
     * <p>
467
     * This method tries to limit the cases where the returned Future will not get executed : it
468
     * checks that this was {@link #start() started} and is not already {@link RunningState#DYING}
469
     * or {@link RunningState#DEAD}. It also doesn't allow {@link RunningState#WILL_DIE} as it could
470
     * cancel the previously passed runnables or never run the passed runnables. But even with these
471
     * restrictions a number of things can prevent the result from getting executed : the
472
     * {@link #getBeingRun() currently running} task hangs indefinitely, it throws an {@link Error}
473
     * ; the passed runnables hang indefinitely.
474
     * </p>
83 ilm 475
     *
476
     * @param force <code>true</code> if this is guaranteed to die (even if <code>willDie</code> or
477
     *        {@link #willDie()} throw an exception).
478
     * @param willDie the last actions to take before killing this queue.
479
     * @param dying the last actions to take before this queue is dead.
480
     * @return the future killing, which will return <code>dying</code> result.
93 ilm 481
     * @throws IllegalStateException if the state isn't {@link RunningState#RUNNING}.
83 ilm 482
     * @see #dieCalled()
483
     */
93 ilm 484
    public final <V> LethalFutureTask<V> die(final boolean force, final Runnable willDie, final Callable<V> dying) throws IllegalStateException {
485
        synchronized (this) {
486
            final RunningState state = this.getRunningState();
487
            if (state == RunningState.NEW)
488
                throw new IllegalStateException("Not started");
489
            if (state.compareTo(RunningState.RUNNING) > 0)
490
                throw new IllegalStateException("die() already called or thread was killed by an Error : " + state);
491
            assert state == RunningState.RUNNING;
492
            this.setState(RunningState.WILL_DIE);
493
        }
83 ilm 494
        // reset sleeping to original value if die not effective
495
        final AtomicBoolean resetSleeping = new AtomicBoolean(false);
93 ilm 496
        final LethalFutureTask<V> res = new LethalFutureTask<V>(this, new Callable<V>() {
83 ilm 497
            @Override
498
            public V call() throws Exception {
499
                Exception willDieExn = null;
500
                try {
501
                    willDie();
502
                    if (willDie != null) {
503
                        willDie.run();
504
                        // handle Future like runnable, i.e. check right away for exception
505
                        if (willDie instanceof Future) {
506
                            final Future<?> f = (Future<?>) willDie;
507
                            assert f.isDone() : "Ran but not done: " + f;
508
                            try {
509
                                f.get();
510
                            } catch (ExecutionException e) {
511
                                throw (Exception) e.getCause();
512
                            }
513
                        }
514
                    }
515
                } catch (Exception e) {
93 ilm 516
                    if (!force) {
517
                        setState(RunningState.RUNNING);
83 ilm 518
                        throw e;
93 ilm 519
                    } else {
83 ilm 520
                        willDieExn = e;
93 ilm 521
                    }
83 ilm 522
                }
523
                try {
524
                    // don't interrupt ourselves
525
                    SleepingQueue.this.tasksQueue.die(false);
526
                    assert SleepingQueue.this.tasksQueue.isDying();
93 ilm 527
                    setState(RunningState.DYING);
83 ilm 528
                    // since there's already been an exception, throw it as soon as possible
529
                    // also dying() might itself throw an exception for the same reason or we now
530
                    // have 2 exceptions to throw
531
                    if (willDieExn != null)
532
                        throw willDieExn;
533
                    dying();
534
                    final V res;
535
                    if (dying != null)
536
                        res = dying.call();
537
                    else
538
                        res = null;
539
 
540
                    return res;
541
                } finally {
542
                    // if die is effective, this won't have any consequences
543
                    if (resetSleeping.get())
544
                        SleepingQueue.this.tasksQueue.setSleeping(true);
545
                }
546
            }
547
        });
548
        // die as soon as possible not after all currently queued tasks
549
        this.tasksQueue.itemsDo(new IClosure<Deque<FutureTask<?>>>() {
550
            @Override
551
            public void executeChecked(Deque<FutureTask<?>> input) {
552
                // since we cancel the current task, we might as well remove all of them since they
553
                // might depend on the cancelled one
93 ilm 554
                // cancel removed tasks so that callers of get() don't wait forever
555
                for (final FutureTask<?> ft : input) {
556
                    // by definition tasks in the queue aren't executing, so interrupt parameter is
557
                    // useless. On the other hand cancel() might return false if already cancelled.
558
                    ft.cancel(false);
559
                }
83 ilm 560
                input.clear();
93 ilm 561
 
83 ilm 562
                input.addFirst(res);
563
                // die as soon as possible, even if there's a long task already running
564
                final FutureTask<?> beingRun = getBeingRun();
565
                // since we hold the lock on items
566
                assert beingRun != res : "beingRun: " + beingRun + " ; res: " + res;
567
                if (beingRun != null)
568
                    beingRun.cancel(true);
569
            }
570
        });
571
        // force execution of our task
572
        resetSleeping.set(this.setSleeping(false));
573
        return res;
574
    }
575
 
576
    protected void willDie() {
577
        // nothing by default
578
    }
579
 
93 ilm 580
    protected void dying() throws Exception {
17 ilm 581
        // nothing by default
582
    }
583
 
584
    /**
83 ilm 585
     * Whether this will die. If this method returns <code>true</code>, it is guaranteed that no
93 ilm 586
     * other task will be taken from the queue to be started. Note: this method doesn't return
587
     * <code>true</code> right after {@link #die()} as the method is asynchronous and if
588
     * {@link #willDie()} fails it may not die at all ; as explained in its comment you may use its
589
     * returned future to wait for the killing.
17 ilm 590
     *
93 ilm 591
     * @return <code>true</code> if this queue will not execute any more tasks (but it may hang
592
     *         indefinitely if the dying runnable blocks).
83 ilm 593
     * @see #isDead()
594
     */
595
    public final boolean dieCalled() {
596
        return this.tasksQueue.dieCalled();
597
    }
598
 
599
    /**
600
     * Whether this queue is dead, i.e. if die() has been called and all tasks have completed.
601
     *
602
     * @return <code>true</code> if this queue will not execute any more tasks and isn't executing
603
     *         any.
17 ilm 604
     * @see #die()
605
     */
606
    public final boolean isDead() {
607
        return this.tasksQueue.isDead();
608
    }
609
 
93 ilm 610
    /**
611
     * Allow to wait for the thread to end. Once this method returns {@link #getRunningState()} will
612
     * always return {@link RunningState#DEAD}. Useful since the future from
613
     * {@link #die(boolean, Runnable, Callable)} returns when all tasks are finished but the
614
     * {@link #getRunningState()} is still {@link RunningState#DYING} since the Thread takes a
615
     * little time to die.
616
     *
617
     * @throws InterruptedException if interrupted while waiting.
618
     * @see Thread#join()
619
     */
620
    public final void join() throws InterruptedException {
621
        this.tasksQueue.join();
622
    }
623
 
624
    public final void join(long millis, int nanos) throws InterruptedException {
625
        this.tasksQueue.join(millis, nanos);
626
    }
627
 
17 ilm 628
    public void addPropertyChangeListener(PropertyChangeListener l) {
629
        this.support.addPropertyChangeListener(l);
630
    }
631
 
632
    public void rmPropertyChangeListener(PropertyChangeListener l) {
633
        this.support.removePropertyChangeListener(l);
634
    }
635
 
636
    private final class SingleThreadedExecutor extends DropperQueue<FutureTask<?>> {
637
        private SingleThreadedExecutor() {
638
            super(SleepingQueue.this.name + System.currentTimeMillis());
639
        }
640
 
641
        @Override
642
        protected void process(FutureTask<?> task) {
643
            if (!task.isDone()) {
644
                /*
645
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
646
                 * called only if task completed or threw exception. Otherwise, the caught runtime
647
                 * exception will have been thrown by afterExecute itself, in which case we don't
648
                 * want to call it again.
649
                 */
650
                boolean ran = false;
651
                beforeExecute(task);
652
                try {
653
                    task.run();
654
                    ran = true;
655
                    afterExecute(task, null);
656
                } catch (RuntimeException ex) {
657
                    if (!ran)
658
                        afterExecute(task, ex);
659
                    // don't throw ex, afterExecute() can do whatever needs to be done (like killing
660
                    // this queue)
661
                }
662
            }
663
        }
664
 
665
        protected void beforeExecute(final FutureTask<?> f) {
666
            cancelCheck(f);
667
            setBeingRun(f);
668
        }
669
 
670
        protected void afterExecute(final FutureTask<?> f, final Throwable t) {
671
            setBeingRun(null);
672
 
673
            try {
674
                f.get();
675
            } catch (CancellationException e) {
676
                // don't care
677
            } catch (InterruptedException e) {
83 ilm 678
                // f was interrupted : e.g. we're dying or f was cancelled
17 ilm 679
            } catch (ExecutionException e) {
83 ilm 680
                // f.run() raised an exception
681
                exceptionThrown(e);
17 ilm 682
            }
683
        }
684
    }
685
 
93 ilm 686
    @Override
17 ilm 687
    public String toString() {
93 ilm 688
        return this.toString(true);
17 ilm 689
    }
690
 
93 ilm 691
    public String toString(final boolean includeCurrentTask) {
692
        return super.toString() + " Queue: " + this.tasksQueue + (includeCurrentTask ? " run:" + this.getBeingRun() : "");
693
    }
694
 
17 ilm 695
}