OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 91 | Rev 156 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
17 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.utils;
15
 
16
import org.openconcerto.utils.cc.IClosure;
17
import org.openconcerto.utils.cc.IPredicate;
18
 
19
import java.beans.PropertyChangeListener;
20
import java.beans.PropertyChangeSupport;
21
import java.util.Collection;
61 ilm 22
import java.util.Deque;
83 ilm 23
import java.util.concurrent.Callable;
17 ilm 24
import java.util.concurrent.CancellationException;
25
import java.util.concurrent.ExecutionException;
26
import java.util.concurrent.Future;
27
import java.util.concurrent.FutureTask;
93 ilm 28
import java.util.concurrent.ScheduledFuture;
29
import java.util.concurrent.ScheduledThreadPoolExecutor;
30
import java.util.concurrent.TimeUnit;
83 ilm 31
import java.util.concurrent.atomic.AtomicBoolean;
93 ilm 32
import java.util.concurrent.atomic.AtomicReference;
33
import java.util.logging.Level;
17 ilm 34
 
93 ilm 35
import net.jcip.annotations.GuardedBy;
36
 
17 ilm 37
/**
38
 * A queue that can be put to sleep. Submitted runnables are converted to FutureTask, that can later
39
 * be cancelled.
40
 *
41
 * @author Sylvain
42
 */
43
public class SleepingQueue {
44
 
93 ilm 45
    /**
     * The life cycle of a queue. The declaration order is significant : callers compare states
     * with {@link Enum#compareTo(Enum)} (e.g. "anything after {@link #RUNNING}"), so constants must
     * stay ordered from the earliest to the latest stage.
     */
    public enum RunningState {
        /** Created but not yet started. */
        NEW,
        /** Started and accepting tasks. */
        RUNNING,
        /** A killing was requested but has not yet been carried out. */
        WILL_DIE,
        /** The killing is being carried out. */
        DYING,
        /** The thread of the queue is terminated. */
        DEAD
    }
48
 
49
    /**
50
     * A task that can kill a queue.
51
     *
52
     * @author Sylvain
53
     *
54
     * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
55
     */
56
    public static final class LethalFutureTask<V> extends FutureTask<V> {
57
        private final SleepingQueue q;
58
 
59
        public LethalFutureTask(final SleepingQueue q, final Callable<V> c) {
60
            super(c);
61
            this.q = q;
62
        }
63
 
64
        public final SleepingQueue getQueue() {
65
            return this.q;
66
        }
67
 
68
        @Override
69
        public String toString() {
70
            // don't includeCurrentTask as it could be us
71
            return this.getClass().getSimpleName() + " for " + this.getQueue().toString(false);
72
        }
73
    }
74
 
75
    private static final ScheduledThreadPoolExecutor exec;
76
 
77
    static {
78
        // daemon thread to allow the VM to exit
79
        exec = new ScheduledThreadPoolExecutor(2, new ThreadFactory("DieMonitor", true).setPriority(Thread.MIN_PRIORITY));
80
        // allow threads to die
81
        exec.setKeepAliveTime(30, TimeUnit.SECONDS);
82
        exec.allowCoreThreadTimeOut(true);
83
        exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
84
        exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
85
 
86
        assert exec.getPoolSize() == 0 : "Wasting resources";
87
    }
88
 
89
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture) {
90
        return watchDying(lethalFuture, 1, 1, TimeUnit.MINUTES);
91
    }
92
 
93
    /**
94
     * Watch the passed future until it's done. When
95
     * {@link SleepingQueue#die(boolean, Runnable, Callable) killing} a queue, the currently running
96
     * task must first complete then the actual killing (represented by a {@link LethalFutureTask})
97
     * begins. This involves running methods and passed runnables which can all hang or throw an
98
     * exception. Therefore this method will periodically report on the status of the killing, and
99
     * report any exception that was thrown.
100
     *
101
     * @param lethalFuture the killing to watch.
102
     * @param initialDelay the time to delay first execution.
103
     * @param delay the delay between the termination of one execution and the commencement of the
104
     *        next.
105
     * @param unit the time unit of the initialDelay and delay parameters.
106
     * @return a future representing the watching.
107
     */
108
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit) {
109
        return watchDying(lethalFuture, initialDelay, delay, unit, null);
110
    }
111
 
112
    static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit,
113
            final IClosure<? super ExecutionException> exnHandler) {
114
        // don't use fixed rate as it might burden our threads and if we just checked the status
115
        // while being late, there's no need to check sooner the next time.
116
        final AtomicReference<Future<?>> f = new AtomicReference<Future<?>>();
117
        final ScheduledFuture<?> res = exec.scheduleWithFixedDelay(new Runnable() {
118
 
119
            // lethalFuture won't kill the queue, i.e. willDie threw an exception and forceDie was
120
            // false.
121
            private void wontKill(final RunningState runningState, final boolean isDone) {
122
                Log.get().fine("Our watched future won't kill the queue, current state : " + runningState + " " + lethalFuture);
123
                if (isDone)
124
                    cancel();
125
            }
126
 
127
            private void cancel() {
128
                assert lethalFuture.isDone();
129
                try {
130
                    lethalFuture.get();
131
                } catch (InterruptedException e) {
132
                    // either we were cancelled or the executor is shutting down (i.e. the VM is
133
                    // terminating)
134
                    Log.get().log(Level.FINER, "Interrupted while waiting on a finished future " + lethalFuture, e);
135
                } catch (ExecutionException e) {
136
                    if (exnHandler == null)
137
                        Log.get().log(Level.WARNING, "Threw an exception : " + lethalFuture, e);
138
                    else
139
                        exnHandler.executeChecked(e);
140
                }
141
                f.get().cancel(true);
142
            }
143
 
144
            @Override
145
            public void run() {
146
                final boolean isDone;
147
                final RunningState runningState;
148
                final FutureTask<?> beingRun;
149
                final SleepingQueue q = lethalFuture.getQueue();
150
                synchronized (q) {
151
                    runningState = q.getRunningState();
152
                    beingRun = q.getBeingRun();
153
                    isDone = lethalFuture.isDone();
154
                }
155
                final Level l = Level.INFO;
156
                if (runningState == RunningState.RUNNING) {
157
                    // willDie threw an exception but lethalFuture might not be completely done
158
                    // in that case, wait for the next execution
159
                    wontKill(runningState, isDone);
160
                } else if (runningState == RunningState.WILL_DIE) {
161
                    if (isDone) {
162
                        wontKill(runningState, isDone);
163
                    } else if (beingRun == lethalFuture) {
164
                        // in willDie() method or Runnable
165
                        Log.get().log(l, "Pre-death has not yet finished " + lethalFuture);
166
                    } else {
167
                        Log.get().log(l, "Death has not yet begun for " + lethalFuture + "\ncurrently running : " + beingRun);
168
                    }
169
                } else if (runningState == RunningState.DYING) {
170
                    assert beingRun == null || beingRun instanceof LethalFutureTask;
171
                    if (beingRun == null) {
172
                        // should be dead real soon
173
                        // just wait for the next execution
174
                        assert isDone;
175
                        Log.get().log(l, "Death was carried out but the thread is not yet terminated. Watching " + lethalFuture);
176
                    } else if (beingRun == lethalFuture) {
177
                        // in dying() method or Callable
178
                        Log.get().log(l, "Post-death has not yet finished " + lethalFuture);
179
                    } else {
180
                        assert isDone;
181
                        wontKill(runningState, isDone);
182
                    }
183
                } else if (runningState == RunningState.DEAD) {
184
                    // OK
185
                    Log.get().log(l, "Death was carried out and the thread is terminated but not necessarily by " + lethalFuture);
186
                    cancel();
187
                } else {
188
                    Log.get().warning("Illegal state " + runningState + " for " + lethalFuture);
189
                }
190
            }
191
        }, initialDelay, delay, unit);
192
        f.set(res);
193
        return res;
194
    }
195
 
17 ilm 196
    private final String name;
197
 
93 ilm 198
    @GuardedBy("this")
199
    private RunningState state;
200
 
17 ilm 201
    private final PropertyChangeSupport support;
93 ilm 202
    @GuardedBy("this")
17 ilm 203
    private FutureTask<?> beingRun;
204
 
205
    private final SingleThreadedExecutor tasksQueue;
93 ilm 206
    @GuardedBy("this")
17 ilm 207
    private boolean canceling;
93 ilm 208
    @GuardedBy("this")
17 ilm 209
    private IPredicate<FutureTask<?>> cancelPredicate;
210
 
211
    /**
     * Create a queue named after this class and the current time in milliseconds.
     */
    public SleepingQueue() {
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
    }
214
 
215
    /**
     * Create a queue in the {@link RunningState#NEW} state, it must be {@link #start() started}
     * to execute tasks.
     *
     * @param name the name of this queue.
     */
    public SleepingQueue(String name) {
        this.name = name;
        this.support = new PropertyChangeSupport(this);

        this.state = RunningState.NEW;

        this.cancelPredicate = null;
        this.canceling = false;
        // uses this.support
        this.setBeingRun(null);

        // uses this.name
        this.tasksQueue = new SingleThreadedExecutor();
    }
17 ilm 228
 
93 ilm 229
    public final void start() {
230
        synchronized (this) {
231
            this.tasksQueue.start();
232
            this.setState(RunningState.RUNNING);
233
            started();
234
        }
17 ilm 235
    }
236
 
237
    /**
93 ilm 238
     * Start this queue only if not already started.
239
     *
240
     * @return <code>true</code> if the queue was started.
241
     */
242
    public final boolean startIfNew() {
243
        // don't use getRunningState() which calls isAlive()
244
        synchronized (this) {
245
            final boolean starting = this.state == RunningState.NEW;
246
            if (starting)
247
                this.start();
248
            assert this.state.compareTo(RunningState.NEW) > 0;
249
            return starting;
250
        }
251
    }
252
 
253
    protected void started() {
254
    }
255
 
256
    protected synchronized final void setState(final RunningState s) {
257
        this.state = s;
258
    }
259
 
260
    public synchronized final RunningState getRunningState() {
261
        // an Error could have stopped our thread so can't rely on this.state
262
        if (this.state == RunningState.NEW || this.tasksQueue.isAlive())
263
            return this.state;
264
        else
265
            return RunningState.DEAD;
266
    }
267
 
268
    public final boolean currentlyInQueue() {
269
        return Thread.currentThread() == this.tasksQueue;
270
    }
271
 
272
    /**
17 ilm 273
     * Customize the thread used to execute the passed runnables. This implementation sets the
274
     * priority to the minimum.
275
     *
276
     * @param thr the thread used by this queue.
277
     */
278
    protected void customizeThread(Thread thr) {
279
        thr.setPriority(Thread.MIN_PRIORITY);
280
    }
281
 
93 ilm 282
    protected final <T> FutureTask<T> newTaskFor(final Runnable task) {
283
        return this.newTaskFor(task, null);
284
    }
17 ilm 285
 
93 ilm 286
    protected <T> FutureTask<T> newTaskFor(final Runnable task, T value) {
287
        return new IFutureTask<T>(task, value, " for {" + this.name + "}");
17 ilm 288
    }
289
 
93 ilm 290
    public final FutureTask<?> put(Runnable workRunnable) {
291
        // otherwise if passing a FutureTask, it will itself be wrapped in another FutureTask. The
292
        // outer FutureTask will call the inner one's run(), which just record any exception. So the
293
        // outer one's get() won't throw it and the exception will effectively be swallowed.
294
        final FutureTask<?> t;
295
        if (workRunnable instanceof FutureTask) {
296
            t = ((FutureTask<?>) workRunnable);
297
        } else {
298
            t = this.newTaskFor(workRunnable);
299
        }
300
        return this.execute(t);
301
    }
302
 
17 ilm 303
    public final <F extends FutureTask<?>> F execute(F t) {
304
        if (this.shallAdd(t)) {
305
            this.add(t);
306
            return t;
307
        } else
308
            return null;
309
    }
310
 
311
    private void add(FutureTask<?> t) {
312
        // no need to synchronize, if die() is called after our test, t won't be executed anyway
83 ilm 313
        if (this.dieCalled())
41 ilm 314
            throw new IllegalStateException("Already dead, cannot exec " + t);
17 ilm 315
 
316
        this.tasksQueue.put(t);
317
    }
318
 
93 ilm 319
    private final boolean shallAdd(FutureTask<?> runnable) {
17 ilm 320
        if (runnable == null)
321
            throw new NullPointerException("null runnable");
322
        try {
323
            this.willPut(runnable);
324
            return true;
325
        } catch (InterruptedException e) {
326
            // si on interrompt, ne pas ajouter
327
            return false;
328
        }
329
    }
330
 
331
    /**
332
     * Give subclass the ability to reject runnables.
333
     *
334
     * @param r the runnable that is being added.
335
     * @throws InterruptedException if r should not be added to this queue.
336
     */
93 ilm 337
    protected void willPut(FutureTask<?> r) throws InterruptedException {
17 ilm 338
    }
339
 
340
    /**
83 ilm 341
     * An exception was thrown by a task. This implementation merely
342
     * {@link Exception#printStackTrace()}.
343
     *
344
     * @param exn the exception thrown.
345
     */
346
    protected void exceptionThrown(final ExecutionException exn) {
347
        exn.printStackTrace();
348
    }
349
 
350
    /**
17 ilm 351
     * Cancel all queued tasks and the current task.
352
     */
353
    protected final void cancel() {
354
        this.cancel(null);
355
    }
356
 
357
    /**
358
     * Cancel only tasks for which pred is <code>true</code>.
359
     *
360
     * @param pred a predicate to know which tasks to cancel.
361
     */
362
    protected final void cancel(final IPredicate<FutureTask<?>> pred) {
363
        this.tasksDo(new IClosure<Collection<FutureTask<?>>>() {
364
            @Override
365
            public void executeChecked(Collection<FutureTask<?>> tasks) {
366
                cancel(pred, tasks);
367
            }
368
        });
369
    }
370
 
371
    private final void cancel(IPredicate<FutureTask<?>> pred, Collection<FutureTask<?>> tasks) {
372
        try {
373
            synchronized (this) {
374
                this.canceling = true;
375
                this.cancelPredicate = pred;
376
                this.cancelCheck(this.getBeingRun());
377
            }
378
 
379
            for (final FutureTask<?> t : tasks) {
380
                this.cancelCheck(t);
381
            }
382
        } finally {
383
            synchronized (this) {
384
                this.canceling = false;
385
                // allow the predicate to be gc'd
386
                this.cancelPredicate = null;
387
            }
388
        }
389
    }
390
 
61 ilm 391
    public final void tasksDo(IClosure<? super Deque<FutureTask<?>>> c) {
17 ilm 392
        this.tasksQueue.itemsDo(c);
393
    }
394
 
395
    private void cancelCheck(FutureTask<?> t) {
396
        if (t != null)
397
            synchronized (this) {
398
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
399
                    t.cancel(true);
400
            }
401
    }
402
 
403
    private void setBeingRun(final FutureTask<?> beingRun) {
83 ilm 404
        final Future<?> old;
17 ilm 405
        synchronized (this) {
406
            old = this.beingRun;
407
            this.beingRun = beingRun;
408
        }
409
        this.support.firePropertyChange("beingRun", old, beingRun);
410
    }
411
 
91 ilm 412
    public final synchronized FutureTask<?> getBeingRun() {
17 ilm 413
        return this.beingRun;
414
    }
415
 
416
    public boolean isSleeping() {
417
        return this.tasksQueue.isSleeping();
418
    }
419
 
83 ilm 420
    public boolean setSleeping(boolean sleeping) {
421
        final boolean res = this.tasksQueue.setSleeping(sleeping);
422
        if (res) {
93 ilm 423
            this.support.firePropertyChange("sleeping", null, sleeping);
17 ilm 424
        }
83 ilm 425
        return res;
17 ilm 426
    }
427
 
428
    /**
429
     * Stops this queue. Once this method returns, it is guaranteed that no other task will be taken
83 ilm 430
     * from the queue to be started, and that this queue will die. But the already executing task
431
     * will complete unless it checks for interrupt.
432
     *
433
     * @return the future killing.
17 ilm 434
     */
93 ilm 435
    public final LethalFutureTask<?> die() {
83 ilm 436
        return this.die(true, null, null);
17 ilm 437
    }
438
 
83 ilm 439
    /**
93 ilm 440
     * Stops this queue. All tasks in the queue, including the {@link #getBeingRun() currently
441
     * running}, will be {@link Future#cancel(boolean) cancelled}. The currently running task will
442
     * thus complete unless it checks for interrupt. Once the returned future completes successfully
443
     * then no task is executing ( {@link #isDead()} will happen sometimes later, the time for the
444
     * thread to terminate). If the returned future throws an exception because of the passed
445
     * runnables or of {@link #willDie()} or {@link #dying()}, one can check with
446
     * {@link #dieCalled()} to see if the queue is dying.
447
     * <p>
448
     * This method tries to limit the cases where the returned Future will not get executed : it
449
     * checks that this was {@link #start() started} and is not already {@link RunningState#DYING}
450
     * or {@link RunningState#DEAD}. It also doesn't allow {@link RunningState#WILL_DIE} as it could
451
     * cancel the previously passed runnables or never run the passed runnables. But even with these
452
     * restrictions a number of things can prevent the result from getting executed : the
453
     * {@link #getBeingRun() currently running} task hangs indefinitely, it throws an {@link Error}
454
     * ; the passed runnables hang indefinitely.
455
     * </p>
83 ilm 456
     *
457
     * @param force <code>true</code> if this is guaranteed to die (even if <code>willDie</code> or
458
     *        {@link #willDie()} throw an exception).
459
     * @param willDie the last actions to take before killing this queue.
460
     * @param dying the last actions to take before this queue is dead.
461
     * @return the future killing, which will return <code>dying</code> result.
93 ilm 462
     * @throws IllegalStateException if the state isn't {@link RunningState#RUNNING}.
83 ilm 463
     * @see #dieCalled()
464
     */
93 ilm 465
    public final <V> LethalFutureTask<V> die(final boolean force, final Runnable willDie, final Callable<V> dying) throws IllegalStateException {
466
        synchronized (this) {
467
            final RunningState state = this.getRunningState();
468
            if (state == RunningState.NEW)
469
                throw new IllegalStateException("Not started");
470
            if (state.compareTo(RunningState.RUNNING) > 0)
471
                throw new IllegalStateException("die() already called or thread was killed by an Error : " + state);
472
            assert state == RunningState.RUNNING;
473
            this.setState(RunningState.WILL_DIE);
474
        }
83 ilm 475
        // reset sleeping to original value if die not effective
476
        final AtomicBoolean resetSleeping = new AtomicBoolean(false);
93 ilm 477
        final LethalFutureTask<V> res = new LethalFutureTask<V>(this, new Callable<V>() {
83 ilm 478
            @Override
479
            public V call() throws Exception {
480
                Exception willDieExn = null;
481
                try {
482
                    willDie();
483
                    if (willDie != null) {
484
                        willDie.run();
485
                        // handle Future like runnable, i.e. check right away for exception
486
                        if (willDie instanceof Future) {
487
                            final Future<?> f = (Future<?>) willDie;
488
                            assert f.isDone() : "Ran but not done: " + f;
489
                            try {
490
                                f.get();
491
                            } catch (ExecutionException e) {
492
                                throw (Exception) e.getCause();
493
                            }
494
                        }
495
                    }
496
                } catch (Exception e) {
93 ilm 497
                    if (!force) {
498
                        setState(RunningState.RUNNING);
83 ilm 499
                        throw e;
93 ilm 500
                    } else {
83 ilm 501
                        willDieExn = e;
93 ilm 502
                    }
83 ilm 503
                }
504
                try {
505
                    // don't interrupt ourselves
506
                    SleepingQueue.this.tasksQueue.die(false);
507
                    assert SleepingQueue.this.tasksQueue.isDying();
93 ilm 508
                    setState(RunningState.DYING);
83 ilm 509
                    // since there's already been an exception, throw it as soon as possible
510
                    // also dying() might itself throw an exception for the same reason or we now
511
                    // have 2 exceptions to throw
512
                    if (willDieExn != null)
513
                        throw willDieExn;
514
                    dying();
515
                    final V res;
516
                    if (dying != null)
517
                        res = dying.call();
518
                    else
519
                        res = null;
520
 
521
                    return res;
522
                } finally {
523
                    // if die is effective, this won't have any consequences
524
                    if (resetSleeping.get())
525
                        SleepingQueue.this.tasksQueue.setSleeping(true);
526
                }
527
            }
528
        });
529
        // die as soon as possible not after all currently queued tasks
530
        this.tasksQueue.itemsDo(new IClosure<Deque<FutureTask<?>>>() {
531
            @Override
532
            public void executeChecked(Deque<FutureTask<?>> input) {
533
                // since we cancel the current task, we might as well remove all of them since they
534
                // might depend on the cancelled one
93 ilm 535
                // cancel removed tasks so that callers of get() don't wait forever
536
                for (final FutureTask<?> ft : input) {
537
                    // by definition tasks in the queue aren't executing, so interrupt parameter is
538
                    // useless. On the other hand cancel() might return false if already cancelled.
539
                    ft.cancel(false);
540
                }
83 ilm 541
                input.clear();
93 ilm 542
 
83 ilm 543
                input.addFirst(res);
544
                // die as soon as possible, even if there's a long task already running
545
                final FutureTask<?> beingRun = getBeingRun();
546
                // since we hold the lock on items
547
                assert beingRun != res : "beingRun: " + beingRun + " ; res: " + res;
548
                if (beingRun != null)
549
                    beingRun.cancel(true);
550
            }
551
        });
552
        // force execution of our task
553
        resetSleeping.set(this.setSleeping(false));
554
        return res;
555
    }
556
 
557
    protected void willDie() {
558
        // nothing by default
559
    }
560
 
93 ilm 561
    protected void dying() throws Exception {
17 ilm 562
        // nothing by default
563
    }
564
 
565
    /**
83 ilm 566
     * Whether this will die. If this method returns <code>true</code>, it is guaranteed that no
93 ilm 567
     * other task will be taken from the queue to be started. Note: this method doesn't return
568
     * <code>true</code> right after {@link #die()} as the method is asynchronous and if
569
     * {@link #willDie()} fails it may not die at all ; as explained in its comment you may use its
570
     * returned future to wait for the killing.
17 ilm 571
     *
93 ilm 572
     * @return <code>true</code> if this queue will not execute any more tasks (but it may hang
573
     *         indefinitely if the dying runnable blocks).
83 ilm 574
     * @see #isDead()
575
     */
576
    public final boolean dieCalled() {
577
        return this.tasksQueue.dieCalled();
578
    }
579
 
580
    /**
581
     * Whether this queue is dead, i.e. if die() has been called and all tasks have completed.
582
     *
583
     * @return <code>true</code> if this queue will not execute any more tasks and isn't executing
584
     *         any.
17 ilm 585
     * @see #die()
586
     */
587
    public final boolean isDead() {
588
        return this.tasksQueue.isDead();
589
    }
590
 
93 ilm 591
    /**
592
     * Allow to wait for the thread to end. Once this method returns {@link #getRunningState()} will
593
     * always return {@link RunningState#DEAD}. Useful since the future from
594
     * {@link #die(boolean, Runnable, Callable)} returns when all tasks are finished but the
595
     * {@link #getRunningState()} is still {@link RunningState#DYING} since the Thread takes a
596
     * little time to die.
597
     *
598
     * @throws InterruptedException if interrupted while waiting.
599
     * @see Thread#join()
600
     */
601
    public final void join() throws InterruptedException {
602
        this.tasksQueue.join();
603
    }
604
 
605
    public final void join(long millis, int nanos) throws InterruptedException {
606
        this.tasksQueue.join(millis, nanos);
607
    }
608
 
17 ilm 609
    public void addPropertyChangeListener(PropertyChangeListener l) {
610
        this.support.addPropertyChangeListener(l);
611
    }
612
 
613
    public void rmPropertyChangeListener(PropertyChangeListener l) {
614
        this.support.removePropertyChangeListener(l);
615
    }
616
 
617
    private final class SingleThreadedExecutor extends DropperQueue<FutureTask<?>> {
618
        private SingleThreadedExecutor() {
619
            super(SleepingQueue.this.name + System.currentTimeMillis());
620
            customizeThread(this);
621
        }
622
 
623
        @Override
624
        protected void process(FutureTask<?> task) {
625
            if (!task.isDone()) {
626
                /*
627
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
628
                 * called only if task completed or threw exception. Otherwise, the caught runtime
629
                 * exception will have been thrown by afterExecute itself, in which case we don't
630
                 * want to call it again.
631
                 */
632
                boolean ran = false;
633
                beforeExecute(task);
634
                try {
635
                    task.run();
636
                    ran = true;
637
                    afterExecute(task, null);
638
                } catch (RuntimeException ex) {
639
                    if (!ran)
640
                        afterExecute(task, ex);
641
                    // don't throw ex, afterExecute() can do whatever needs to be done (like killing
642
                    // this queue)
643
                }
644
            }
645
        }
646
 
647
        protected void beforeExecute(final FutureTask<?> f) {
648
            cancelCheck(f);
649
            setBeingRun(f);
650
        }
651
 
652
        protected void afterExecute(final FutureTask<?> f, final Throwable t) {
653
            setBeingRun(null);
654
 
655
            try {
656
                f.get();
657
            } catch (CancellationException e) {
658
                // don't care
659
            } catch (InterruptedException e) {
83 ilm 660
                // f was interrupted : e.g. we're dying or f was cancelled
17 ilm 661
            } catch (ExecutionException e) {
83 ilm 662
                // f.run() raised an exception
663
                exceptionThrown(e);
17 ilm 664
            }
665
        }
666
    }
667
 
93 ilm 668
    @Override
17 ilm 669
    public String toString() {
93 ilm 670
        return this.toString(true);
17 ilm 671
    }
672
 
93 ilm 673
    public String toString(final boolean includeCurrentTask) {
674
        return super.toString() + " Queue: " + this.tasksQueue + (includeCurrentTask ? " run:" + this.getBeingRun() : "");
675
    }
676
 
17 ilm 677
}