OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 156 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
17 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.utils;
15
 
16
import org.openconcerto.utils.cc.IClosure;
17
import org.openconcerto.utils.cc.IPredicate;
18
 
19
import java.beans.PropertyChangeListener;
20
import java.beans.PropertyChangeSupport;
156 ilm 21
import java.lang.Thread.UncaughtExceptionHandler;
17 ilm 22
import java.util.Collection;
61 ilm 23
import java.util.Deque;
83 ilm 24
import java.util.concurrent.Callable;
17 ilm 25
import java.util.concurrent.CancellationException;
26
import java.util.concurrent.ExecutionException;
177 ilm 27
import java.util.concurrent.Executor;
17 ilm 28
import java.util.concurrent.Future;
29
import java.util.concurrent.FutureTask;
177 ilm 30
import java.util.concurrent.RunnableFuture;
93 ilm 31
import java.util.concurrent.ScheduledFuture;
32
import java.util.concurrent.ScheduledThreadPoolExecutor;
33
import java.util.concurrent.TimeUnit;
83 ilm 34
import java.util.concurrent.atomic.AtomicBoolean;
93 ilm 35
import java.util.concurrent.atomic.AtomicReference;
36
import java.util.logging.Level;
17 ilm 37
 
93 ilm 38
import net.jcip.annotations.GuardedBy;
39
 
17 ilm 40
/**
177 ilm 41
 * A queue that can be put to sleep. Submitted runnables are converted to RunnableFuture, that can
42
 * later be cancelled.
17 ilm 43
 *
44
 * @author Sylvain
45
 */
177 ilm 46
public class SleepingQueue implements Executor {
17 ilm 47
 
93 ilm 48
    public static enum RunningState {
49
        NEW, RUNNING, WILL_DIE, DYING, DEAD
50
    }
51
 
52
    /**
53
     * A task that can kill a queue.
54
     *
55
     * @author Sylvain
56
     *
57
     * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
58
     */
59
    public static final class LethalFutureTask<V> extends FutureTask<V> {
60
        private final SleepingQueue q;
61
 
62
        public LethalFutureTask(final SleepingQueue q, final Callable<V> c) {
63
            super(c);
64
            this.q = q;
65
        }
66
 
67
        public final SleepingQueue getQueue() {
68
            return this.q;
69
        }
70
 
71
        @Override
72
        public String toString() {
73
            // don't includeCurrentTask as it could be us
74
            return this.getClass().getSimpleName() + " for " + this.getQueue().toString(false);
75
        }
76
    }
77
 
78
    private static final ScheduledThreadPoolExecutor exec;
79
 
80
    static {
81
        // daemon thread to allow the VM to exit
82
        exec = new ScheduledThreadPoolExecutor(2, new ThreadFactory("DieMonitor", true).setPriority(Thread.MIN_PRIORITY));
83
        // allow threads to die
84
        exec.setKeepAliveTime(30, TimeUnit.SECONDS);
85
        exec.allowCoreThreadTimeOut(true);
86
        exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
87
        exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
88
 
89
        assert exec.getPoolSize() == 0 : "Wasting resources";
90
    }
91
 
92
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture) {
93
        return watchDying(lethalFuture, 1, 1, TimeUnit.MINUTES);
94
    }
95
 
96
    /**
97
     * Watch the passed future until it's done. When
98
     * {@link SleepingQueue#die(boolean, Runnable, Callable) killing} a queue, the currently running
99
     * task must first complete then the actual killing (represented by a {@link LethalFutureTask})
100
     * begins. This involves running methods and passed runnables which can all hang or throw an
101
     * exception. Therefore this method will periodically report on the status of the killing, and
102
     * report any exception that was thrown.
103
     *
104
     * @param lethalFuture the killing to watch.
105
     * @param initialDelay the time to delay first execution.
106
     * @param delay the delay between the termination of one execution and the commencement of the
107
     *        next.
108
     * @param unit the time unit of the initialDelay and delay parameters.
109
     * @return a future representing the watching.
110
     */
111
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit) {
112
        return watchDying(lethalFuture, initialDelay, delay, unit, null);
113
    }
114
 
115
    static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit,
116
            final IClosure<? super ExecutionException> exnHandler) {
117
        // don't use fixed rate as it might burden our threads and if we just checked the status
118
        // while being late, there's no need to check sooner the next time.
119
        final AtomicReference<Future<?>> f = new AtomicReference<Future<?>>();
120
        final ScheduledFuture<?> res = exec.scheduleWithFixedDelay(new Runnable() {
121
 
122
            // lethalFuture won't kill the queue, i.e. willDie threw an exception and forceDie was
123
            // false.
124
            private void wontKill(final RunningState runningState, final boolean isDone) {
125
                Log.get().fine("Our watched future won't kill the queue, current state : " + runningState + " " + lethalFuture);
126
                if (isDone)
127
                    cancel();
128
            }
129
 
130
            private void cancel() {
131
                assert lethalFuture.isDone();
132
                try {
133
                    lethalFuture.get();
134
                } catch (InterruptedException e) {
135
                    // either we were cancelled or the executor is shutting down (i.e. the VM is
136
                    // terminating)
137
                    Log.get().log(Level.FINER, "Interrupted while waiting on a finished future " + lethalFuture, e);
138
                } catch (ExecutionException e) {
139
                    if (exnHandler == null)
140
                        Log.get().log(Level.WARNING, "Threw an exception : " + lethalFuture, e);
141
                    else
142
                        exnHandler.executeChecked(e);
143
                }
144
                f.get().cancel(true);
145
            }
146
 
147
            @Override
148
            public void run() {
149
                final boolean isDone;
150
                final RunningState runningState;
177 ilm 151
                final RunnableFuture<?> beingRun;
93 ilm 152
                final SleepingQueue q = lethalFuture.getQueue();
153
                synchronized (q) {
154
                    runningState = q.getRunningState();
155
                    beingRun = q.getBeingRun();
156
                    isDone = lethalFuture.isDone();
157
                }
158
                final Level l = Level.INFO;
159
                if (runningState == RunningState.RUNNING) {
160
                    // willDie threw an exception but lethalFuture might not be completely done
161
                    // in that case, wait for the next execution
162
                    wontKill(runningState, isDone);
163
                } else if (runningState == RunningState.WILL_DIE) {
164
                    if (isDone) {
165
                        wontKill(runningState, isDone);
166
                    } else if (beingRun == lethalFuture) {
167
                        // in willDie() method or Runnable
168
                        Log.get().log(l, "Pre-death has not yet finished " + lethalFuture);
169
                    } else {
170
                        Log.get().log(l, "Death has not yet begun for " + lethalFuture + "\ncurrently running : " + beingRun);
171
                    }
172
                } else if (runningState == RunningState.DYING) {
173
                    assert beingRun == null || beingRun instanceof LethalFutureTask;
174
                    if (beingRun == null) {
175
                        // should be dead real soon
176
                        // just wait for the next execution
177
                        assert isDone;
178
                        Log.get().log(l, "Death was carried out but the thread is not yet terminated. Watching " + lethalFuture);
179
                    } else if (beingRun == lethalFuture) {
180
                        // in dying() method or Callable
181
                        Log.get().log(l, "Post-death has not yet finished " + lethalFuture);
182
                    } else {
183
                        assert isDone;
184
                        wontKill(runningState, isDone);
185
                    }
186
                } else if (runningState == RunningState.DEAD) {
187
                    // OK
188
                    Log.get().log(l, "Death was carried out and the thread is terminated but not necessarily by " + lethalFuture);
189
                    cancel();
190
                } else {
191
                    Log.get().warning("Illegal state " + runningState + " for " + lethalFuture);
192
                }
193
            }
194
        }, initialDelay, delay, unit);
195
        f.set(res);
196
        return res;
197
    }
198
 
17 ilm 199
    // name of this queue, used in the description of tasks and to name the underlying queue
    private final String name;
200
 
93 ilm 201
    // last state that was set, see getRunningState() for the effective state
    @GuardedBy("this")
202
    private RunningState state;
203
 
17 ilm 204
    // fires "beingRun" and "sleeping" changes
    private final PropertyChangeSupport support;
93 ilm 205
    // the task currently executing, null if none
    @GuardedBy("this")
177 ilm 206
    private RunnableFuture<?> beingRun;
17 ilm 207
 
208
    // holds the submitted tasks and executes them
    private final SingleThreadedExecutor tasksQueue;
93 ilm 209
    // true while cancel() is in progress
    @GuardedBy("this")
17 ilm 210
    private boolean canceling;
93 ilm 211
    // which tasks to cancel while canceling, null meaning all
    @GuardedBy("this")
177 ilm 212
    private IPredicate<? super RunnableFuture<?>> cancelPredicate;
17 ilm 213
 
214
    /**
     * Create a queue named after this class and the current time. The queue must then be
     * {@link #start() started}.
     */
    public SleepingQueue() {
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
    }
217
 
218
    /**
     * Create a queue in the {@link RunningState#NEW} state. The queue must then be
     * {@link #start() started}.
     *
     * @param name the name of this queue.
     */
    public SleepingQueue(String name) {
        super();
        this.name = name;
        // needed by setBeingRun() below
        this.support = new PropertyChangeSupport(this);

        // not started, nothing running and no cancellation in progress
        this.state = RunningState.NEW;
        this.cancelPredicate = null;
        this.canceling = false;
        this.setBeingRun(null);

        // its constructor reads this.name
        this.tasksQueue = new SingleThreadedExecutor();
    }
17 ilm 231
 
93 ilm 232
    public final void start() {
156 ilm 233
        this.start(null);
234
    }
235
 
236
    public final void start(final IClosure<Thread> customizeThread) {
237
        customizeThread(this.tasksQueue);
238
        if (customizeThread != null)
239
            customizeThread.executeChecked(this.tasksQueue);
93 ilm 240
        synchronized (this) {
241
            this.tasksQueue.start();
242
            this.setState(RunningState.RUNNING);
243
            started();
244
        }
17 ilm 245
    }
246
 
247
    /**
93 ilm 248
     * Start this queue only if not already started.
249
     *
250
     * @return <code>true</code> if the queue was started.
251
     */
252
    public final boolean startIfNew() {
253
        // don't use getRunningState() which calls isAlive()
254
        synchronized (this) {
255
            final boolean starting = this.state == RunningState.NEW;
256
            if (starting)
257
                this.start();
258
            assert this.state.compareTo(RunningState.NEW) > 0;
259
            return starting;
260
        }
261
    }
262
 
263
    protected void started() {
264
    }
265
 
266
    protected synchronized final void setState(final RunningState s) {
267
        this.state = s;
268
    }
269
 
270
    public synchronized final RunningState getRunningState() {
271
        // an Error could have stopped our thread so can't rely on this.state
272
        if (this.state == RunningState.NEW || this.tasksQueue.isAlive())
273
            return this.state;
274
        else
275
            return RunningState.DEAD;
276
    }
277
 
278
    public final boolean currentlyInQueue() {
279
        return Thread.currentThread() == this.tasksQueue;
280
    }
281
 
282
    /**
17 ilm 283
     * Customize the thread used to execute the passed runnables. This implementation sets the
284
     * priority to the minimum.
285
     *
286
     * @param thr the thread used by this queue.
287
     */
288
    protected void customizeThread(Thread thr) {
289
        thr.setPriority(Thread.MIN_PRIORITY);
290
    }
291
 
177 ilm 292
    protected final <T> RunnableFuture<T> newTaskFor(final Runnable task) {
93 ilm 293
        return this.newTaskFor(task, null);
294
    }
17 ilm 295
 
177 ilm 296
    protected <T> RunnableFuture<T> newTaskFor(final Runnable task, T value) {
93 ilm 297
        return new IFutureTask<T>(task, value, " for {" + this.name + "}");
17 ilm 298
    }
299
 
177 ilm 300
    public final RunnableFuture<?> put(Runnable workRunnable) {
301
        /*
302
         * Otherwise if passing a RunnableFuture, it will itself be wrapped in another
303
         * RunnableFuture. The outer RunnableFuture will call the inner one's run(), which just
304
         * record any exception. So the outer one's get() won't throw it and the exception will
305
         * effectively be swallowed.
306
         */
307
        final RunnableFuture<?> t;
308
        if (workRunnable instanceof RunnableFuture) {
309
            t = ((RunnableFuture<?>) workRunnable);
93 ilm 310
        } else {
311
            t = this.newTaskFor(workRunnable);
312
        }
177 ilm 313
        return this.add(t);
93 ilm 314
    }
315
 
177 ilm 316
    @Override
317
    public final void execute(Runnable command) {
318
        this.put(command);
319
    }
320
 
321
    public final <F extends RunnableFuture<?>> F add(F t) {
17 ilm 322
        if (this.shallAdd(t)) {
177 ilm 323
            // no need to synchronize, if die() is called after our test, t won't be executed anyway
324
            if (this.dieCalled())
325
                throw new IllegalStateException("Already dead, cannot exec " + t);
326
 
327
            this.tasksQueue.put(t);
17 ilm 328
            return t;
177 ilm 329
        } else {
17 ilm 330
            return null;
177 ilm 331
        }
17 ilm 332
    }
333
 
177 ilm 334
    private final boolean shallAdd(RunnableFuture<?> runnable) {
17 ilm 335
        if (runnable == null)
336
            throw new NullPointerException("null runnable");
337
        try {
338
            this.willPut(runnable);
339
            return true;
340
        } catch (InterruptedException e) {
341
            // si on interrompt, ne pas ajouter
342
            return false;
343
        }
344
    }
345
 
346
    /**
347
     * Give subclass the ability to reject runnables.
348
     *
349
     * @param r the runnable that is being added.
350
     * @throws InterruptedException if r should not be added to this queue.
351
     */
177 ilm 352
    protected void willPut(RunnableFuture<?> r) throws InterruptedException {
17 ilm 353
    }
354
 
355
    /**
156 ilm 356
     * An exception was thrown by a task. This implementation uses
357
     * {@link Thread#getUncaughtExceptionHandler()} or
358
     * {@link Thread#getDefaultUncaughtExceptionHandler()} if available, otherwise falls back to
359
     * just {@link Exception#printStackTrace()}. To set the handler, {@link #start(IClosure)} can be
360
     * used.
83 ilm 361
     *
362
     * @param exn the exception thrown.
363
     */
364
    protected void exceptionThrown(final ExecutionException exn) {
156 ilm 365
        final Thread thr = this.tasksQueue;
366
        UncaughtExceptionHandler h = thr.getUncaughtExceptionHandler();
367
        if (h == null)
368
            h = Thread.getDefaultUncaughtExceptionHandler();
369
        if (h != null) {
370
            h.uncaughtException(thr, exn);
371
        } else {
372
            exn.printStackTrace();
373
        }
83 ilm 374
    }
375
 
376
    /**
17 ilm 377
     * Cancel all queued tasks and the current task.
378
     */
379
    protected final void cancel() {
380
        this.cancel(null);
381
    }
382
 
383
    /**
384
     * Cancel only tasks for which pred is <code>true</code>.
385
     *
386
     * @param pred a predicate to know which tasks to cancel.
387
     */
177 ilm 388
    protected final void cancel(final IPredicate<? super RunnableFuture<?>> pred) {
389
        this.tasksDo(new IClosure<Collection<RunnableFuture<?>>>() {
17 ilm 390
            @Override
177 ilm 391
            public void executeChecked(Collection<RunnableFuture<?>> tasks) {
17 ilm 392
                cancel(pred, tasks);
393
            }
394
        });
395
    }
396
 
177 ilm 397
    private final void cancel(IPredicate<? super RunnableFuture<?>> pred, Collection<RunnableFuture<?>> tasks) {
17 ilm 398
        try {
399
            synchronized (this) {
400
                this.canceling = true;
401
                this.cancelPredicate = pred;
402
                this.cancelCheck(this.getBeingRun());
403
            }
404
 
177 ilm 405
            for (final RunnableFuture<?> t : tasks) {
17 ilm 406
                this.cancelCheck(t);
407
            }
408
        } finally {
409
            synchronized (this) {
410
                this.canceling = false;
411
                // allow the predicate to be gc'd
412
                this.cancelPredicate = null;
413
            }
414
        }
415
    }
416
 
177 ilm 417
    public final void tasksDo(IClosure<? super Deque<RunnableFuture<?>>> c) {
17 ilm 418
        this.tasksQueue.itemsDo(c);
419
    }
420
 
177 ilm 421
    private void cancelCheck(RunnableFuture<?> t) {
17 ilm 422
        if (t != null)
423
            synchronized (this) {
424
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
425
                    t.cancel(true);
426
            }
427
    }
428
 
177 ilm 429
    private void setBeingRun(final RunnableFuture<?> beingRun) {
83 ilm 430
        final Future<?> old;
17 ilm 431
        synchronized (this) {
432
            old = this.beingRun;
433
            this.beingRun = beingRun;
434
        }
435
        this.support.firePropertyChange("beingRun", old, beingRun);
436
    }
437
 
177 ilm 438
    public final synchronized RunnableFuture<?> getBeingRun() {
17 ilm 439
        return this.beingRun;
440
    }
441
 
442
    public boolean isSleeping() {
443
        return this.tasksQueue.isSleeping();
444
    }
445
 
83 ilm 446
    public boolean setSleeping(boolean sleeping) {
447
        final boolean res = this.tasksQueue.setSleeping(sleeping);
448
        if (res) {
93 ilm 449
            this.support.firePropertyChange("sleeping", null, sleeping);
17 ilm 450
        }
83 ilm 451
        return res;
17 ilm 452
    }
453
 
454
    /**
455
     * Stops this queue. Once this method returns, it is guaranteed that no other task will be taken
83 ilm 456
     * from the queue to be started, and that this queue will die. But the already executing task
457
     * will complete unless it checks for interrupt.
458
     *
459
     * @return the future killing.
17 ilm 460
     */
93 ilm 461
    public final LethalFutureTask<?> die() {
83 ilm 462
        return this.die(true, null, null);
17 ilm 463
    }
464
 
83 ilm 465
    /**
93 ilm 466
     * Stops this queue. All tasks in the queue, including the {@link #getBeingRun() currently
467
     * running}, will be {@link Future#cancel(boolean) cancelled}. The currently running task will
468
     * thus complete unless it checks for interrupt. Once the returned future completes successfully
469
     * then no task is executing ( {@link #isDead()} will happen sometimes later, the time for the
470
     * thread to terminate). If the returned future throws an exception because of the passed
471
     * runnables or of {@link #willDie()} or {@link #dying()}, one can check with
472
     * {@link #dieCalled()} to see if the queue is dying.
473
     * <p>
474
     * This method tries to limit the cases where the returned Future will not get executed : it
475
     * checks that this was {@link #start() started} and is not already {@link RunningState#DYING}
476
     * or {@link RunningState#DEAD}. It also doesn't allow {@link RunningState#WILL_DIE} as it could
477
     * cancel the previously passed runnables or never run the passed runnables. But even with these
478
     * restrictions a number of things can prevent the result from getting executed : the
479
     * {@link #getBeingRun() currently running} task hangs indefinitely, it throws an {@link Error}
480
     * ; the passed runnables hang indefinitely.
481
     * </p>
83 ilm 482
     *
483
     * @param force <code>true</code> if this is guaranteed to die (even if <code>willDie</code> or
484
     *        {@link #willDie()} throw an exception).
485
     * @param willDie the last actions to take before killing this queue.
486
     * @param dying the last actions to take before this queue is dead.
487
     * @return the future killing, which will return <code>dying</code> result.
93 ilm 488
     * @throws IllegalStateException if the state isn't {@link RunningState#RUNNING}.
83 ilm 489
     * @see #dieCalled()
490
     */
93 ilm 491
    public final <V> LethalFutureTask<V> die(final boolean force, final Runnable willDie, final Callable<V> dying) throws IllegalStateException {
492
        synchronized (this) {
493
            final RunningState state = this.getRunningState();
494
            if (state == RunningState.NEW)
495
                throw new IllegalStateException("Not started");
496
            if (state.compareTo(RunningState.RUNNING) > 0)
497
                throw new IllegalStateException("die() already called or thread was killed by an Error : " + state);
498
            assert state == RunningState.RUNNING;
499
            this.setState(RunningState.WILL_DIE);
500
        }
83 ilm 501
        // reset sleeping to original value if die not effective
502
        final AtomicBoolean resetSleeping = new AtomicBoolean(false);
93 ilm 503
        final LethalFutureTask<V> res = new LethalFutureTask<V>(this, new Callable<V>() {
83 ilm 504
            @Override
505
            public V call() throws Exception {
506
                Exception willDieExn = null;
507
                try {
508
                    willDie();
509
                    if (willDie != null) {
510
                        willDie.run();
511
                        // handle Future like runnable, i.e. check right away for exception
512
                        if (willDie instanceof Future) {
513
                            final Future<?> f = (Future<?>) willDie;
514
                            assert f.isDone() : "Ran but not done: " + f;
515
                            try {
516
                                f.get();
517
                            } catch (ExecutionException e) {
518
                                throw (Exception) e.getCause();
519
                            }
520
                        }
521
                    }
522
                } catch (Exception e) {
93 ilm 523
                    if (!force) {
524
                        setState(RunningState.RUNNING);
83 ilm 525
                        throw e;
93 ilm 526
                    } else {
83 ilm 527
                        willDieExn = e;
93 ilm 528
                    }
83 ilm 529
                }
530
                try {
531
                    // don't interrupt ourselves
532
                    SleepingQueue.this.tasksQueue.die(false);
533
                    assert SleepingQueue.this.tasksQueue.isDying();
93 ilm 534
                    setState(RunningState.DYING);
83 ilm 535
                    // since there's already been an exception, throw it as soon as possible
536
                    // also dying() might itself throw an exception for the same reason or we now
537
                    // have 2 exceptions to throw
538
                    if (willDieExn != null)
539
                        throw willDieExn;
540
                    dying();
541
                    final V res;
542
                    if (dying != null)
543
                        res = dying.call();
544
                    else
545
                        res = null;
546
 
547
                    return res;
548
                } finally {
549
                    // if die is effective, this won't have any consequences
550
                    if (resetSleeping.get())
551
                        SleepingQueue.this.tasksQueue.setSleeping(true);
552
                }
553
            }
554
        });
555
        // die as soon as possible not after all currently queued tasks
177 ilm 556
        this.tasksQueue.itemsDo(new IClosure<Deque<RunnableFuture<?>>>() {
83 ilm 557
            @Override
177 ilm 558
            public void executeChecked(Deque<RunnableFuture<?>> input) {
83 ilm 559
                // since we cancel the current task, we might as well remove all of them since they
560
                // might depend on the cancelled one
93 ilm 561
                // cancel removed tasks so that callers of get() don't wait forever
177 ilm 562
                for (final RunnableFuture<?> ft : input) {
93 ilm 563
                    // by definition tasks in the queue aren't executing, so interrupt parameter is
564
                    // useless. On the other hand cancel() might return false if already cancelled.
565
                    ft.cancel(false);
566
                }
83 ilm 567
                input.clear();
93 ilm 568
 
83 ilm 569
                input.addFirst(res);
570
                // die as soon as possible, even if there's a long task already running
177 ilm 571
                final RunnableFuture<?> beingRun = getBeingRun();
83 ilm 572
                // since we hold the lock on items
573
                assert beingRun != res : "beingRun: " + beingRun + " ; res: " + res;
574
                if (beingRun != null)
575
                    beingRun.cancel(true);
576
            }
577
        });
578
        // force execution of our task
579
        resetSleeping.set(this.setSleeping(false));
580
        return res;
581
    }
582
 
583
    protected void willDie() {
584
        // nothing by default
585
    }
586
 
93 ilm 587
    protected void dying() throws Exception {
17 ilm 588
        // nothing by default
589
    }
590
 
591
    /**
83 ilm 592
     * Whether this will die. If this method returns <code>true</code>, it is guaranteed that no
93 ilm 593
     * other task will be taken from the queue to be started. Note: this method doesn't return
594
     * <code>true</code> right after {@link #die()} as the method is asynchronous and if
595
     * {@link #willDie()} fails it may not die at all ; as explained in its comment you may use its
596
     * returned future to wait for the killing.
17 ilm 597
     *
93 ilm 598
     * @return <code>true</code> if this queue will not execute any more tasks (but it may hang
599
     *         indefinitely if the dying runnable blocks).
83 ilm 600
     * @see #isDead()
601
     */
602
    public final boolean dieCalled() {
603
        return this.tasksQueue.dieCalled();
604
    }
605
 
606
    /**
607
     * Whether this queue is dead, i.e. if die() has been called and all tasks have completed.
608
     *
609
     * @return <code>true</code> if this queue will not execute any more tasks and isn't executing
610
     *         any.
17 ilm 611
     * @see #die()
612
     */
613
    public final boolean isDead() {
614
        return this.tasksQueue.isDead();
615
    }
616
 
93 ilm 617
    /**
618
     * Allow to wait for the thread to end. Once this method returns {@link #getRunningState()} will
619
     * always return {@link RunningState#DEAD}. Useful since the future from
620
     * {@link #die(boolean, Runnable, Callable)} returns when all tasks are finished but the
621
     * {@link #getRunningState()} is still {@link RunningState#DYING} since the Thread takes a
622
     * little time to die.
623
     *
624
     * @throws InterruptedException if interrupted while waiting.
625
     * @see Thread#join()
626
     */
627
    public final void join() throws InterruptedException {
628
        this.tasksQueue.join();
629
    }
630
 
631
    public final void join(long millis, int nanos) throws InterruptedException {
632
        this.tasksQueue.join(millis, nanos);
633
    }
634
 
17 ilm 635
    public void addPropertyChangeListener(PropertyChangeListener l) {
636
        this.support.addPropertyChangeListener(l);
637
    }
638
 
639
    public void rmPropertyChangeListener(PropertyChangeListener l) {
640
        this.support.removePropertyChangeListener(l);
641
    }
642
 
177 ilm 643
    private final class SingleThreadedExecutor extends DropperQueue<RunnableFuture<?>> {
17 ilm 644
        private SingleThreadedExecutor() {
645
            super(SleepingQueue.this.name + System.currentTimeMillis());
646
        }
647
 
648
        @Override
177 ilm 649
        protected void process(RunnableFuture<?> task) {
17 ilm 650
            if (!task.isDone()) {
651
                /*
652
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
653
                 * called only if task completed or threw exception. Otherwise, the caught runtime
654
                 * exception will have been thrown by afterExecute itself, in which case we don't
655
                 * want to call it again.
656
                 */
657
                boolean ran = false;
658
                beforeExecute(task);
659
                try {
660
                    task.run();
661
                    ran = true;
662
                    afterExecute(task, null);
663
                } catch (RuntimeException ex) {
664
                    if (!ran)
665
                        afterExecute(task, ex);
666
                    // don't throw ex, afterExecute() can do whatever needs to be done (like killing
667
                    // this queue)
668
                }
669
            }
670
        }
671
 
177 ilm 672
        protected void beforeExecute(final RunnableFuture<?> f) {
17 ilm 673
            cancelCheck(f);
674
            setBeingRun(f);
675
        }
676
 
177 ilm 677
        protected void afterExecute(final RunnableFuture<?> f, final Throwable t) {
17 ilm 678
            setBeingRun(null);
679
 
680
            try {
681
                f.get();
682
            } catch (CancellationException e) {
683
                // don't care
684
            } catch (InterruptedException e) {
83 ilm 685
                // f was interrupted : e.g. we're dying or f was cancelled
17 ilm 686
            } catch (ExecutionException e) {
83 ilm 687
                // f.run() raised an exception
688
                exceptionThrown(e);
17 ilm 689
            }
690
        }
691
    }
692
 
93 ilm 693
    @Override
17 ilm 694
    public String toString() {
93 ilm 695
        return this.toString(true);
17 ilm 696
    }
697
 
93 ilm 698
    public String toString(final boolean includeCurrentTask) {
699
        return super.toString() + " Queue: " + this.tasksQueue + (includeCurrentTask ? " run:" + this.getBeingRun() : "");
700
    }
701
 
17 ilm 702
}