OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 156 | Rev 180 | Go to most recent revision | Show entire file | Regard whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 156 Rev 177
Line 22... Line 22...
22
import java.util.Collection;
22
import java.util.Collection;
23
import java.util.Deque;
23
import java.util.Deque;
24
import java.util.concurrent.Callable;
24
import java.util.concurrent.Callable;
25
import java.util.concurrent.CancellationException;
25
import java.util.concurrent.CancellationException;
26
import java.util.concurrent.ExecutionException;
26
import java.util.concurrent.ExecutionException;
-
 
27
import java.util.concurrent.Executor;
27
import java.util.concurrent.Future;
28
import java.util.concurrent.Future;
28
import java.util.concurrent.FutureTask;
29
import java.util.concurrent.FutureTask;
-
 
30
import java.util.concurrent.RunnableFuture;
29
import java.util.concurrent.ScheduledFuture;
31
import java.util.concurrent.ScheduledFuture;
30
import java.util.concurrent.ScheduledThreadPoolExecutor;
32
import java.util.concurrent.ScheduledThreadPoolExecutor;
31
import java.util.concurrent.TimeUnit;
33
import java.util.concurrent.TimeUnit;
32
import java.util.concurrent.atomic.AtomicBoolean;
34
import java.util.concurrent.atomic.AtomicBoolean;
33
import java.util.concurrent.atomic.AtomicReference;
35
import java.util.concurrent.atomic.AtomicReference;
34
import java.util.logging.Level;
36
import java.util.logging.Level;
35
 
37
 
36
import net.jcip.annotations.GuardedBy;
38
import net.jcip.annotations.GuardedBy;
37
 
39
 
38
/**
40
/**
39
 * A queue that can be put to sleep. Submitted runnables are converted to FutureTask, that can later
41
 * A queue that can be put to sleep. Submitted runnables are converted to RunnableFuture, that can
40
 * be cancelled.
42
 * later be cancelled.
41
 * 
43
 * 
42
 * @author Sylvain
44
 * @author Sylvain
43
 */
45
 */
44
public class SleepingQueue {
46
public class SleepingQueue implements Executor {
45
 
47
 
46
    public static enum RunningState {
48
    public static enum RunningState {
47
        NEW, RUNNING, WILL_DIE, DYING, DEAD
49
        NEW, RUNNING, WILL_DIE, DYING, DEAD
48
    }
50
    }
49
 
51
 
Line 144... Line 146...
144
 
146
 
145
            @Override
147
            @Override
146
            public void run() {
148
            public void run() {
147
                final boolean isDone;
149
                final boolean isDone;
148
                final RunningState runningState;
150
                final RunningState runningState;
149
                final FutureTask<?> beingRun;
151
                final RunnableFuture<?> beingRun;
150
                final SleepingQueue q = lethalFuture.getQueue();
152
                final SleepingQueue q = lethalFuture.getQueue();
151
                synchronized (q) {
153
                synchronized (q) {
152
                    runningState = q.getRunningState();
154
                    runningState = q.getRunningState();
153
                    beingRun = q.getBeingRun();
155
                    beingRun = q.getBeingRun();
154
                    isDone = lethalFuture.isDone();
156
                    isDone = lethalFuture.isDone();
Line 199... Line 201...
199
    @GuardedBy("this")
201
    @GuardedBy("this")
200
    private RunningState state;
202
    private RunningState state;
201
 
203
 
202
    private final PropertyChangeSupport support;
204
    private final PropertyChangeSupport support;
203
    @GuardedBy("this")
205
    @GuardedBy("this")
204
    private FutureTask<?> beingRun;
206
    private RunnableFuture<?> beingRun;
205
 
207
 
206
    private final SingleThreadedExecutor tasksQueue;
208
    private final SingleThreadedExecutor tasksQueue;
207
    @GuardedBy("this")
209
    @GuardedBy("this")
208
    private boolean canceling;
210
    private boolean canceling;
209
    @GuardedBy("this")
211
    @GuardedBy("this")
210
    private IPredicate<FutureTask<?>> cancelPredicate;
212
    private IPredicate<? super RunnableFuture<?>> cancelPredicate;
211
 
213
 
212
    public SleepingQueue() {
214
    public SleepingQueue() {
213
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
215
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
214
    }
216
    }
215
 
217
 
Line 285... Line 287...
285
     */
287
     */
286
    protected void customizeThread(Thread thr) {
288
    protected void customizeThread(Thread thr) {
287
        thr.setPriority(Thread.MIN_PRIORITY);
289
        thr.setPriority(Thread.MIN_PRIORITY);
288
    }
290
    }
289
 
291
 
290
    protected final <T> FutureTask<T> newTaskFor(final Runnable task) {
292
    protected final <T> RunnableFuture<T> newTaskFor(final Runnable task) {
291
        return this.newTaskFor(task, null);
293
        return this.newTaskFor(task, null);
292
    }
294
    }
293
 
295
 
294
    protected <T> FutureTask<T> newTaskFor(final Runnable task, T value) {
296
    protected <T> RunnableFuture<T> newTaskFor(final Runnable task, T value) {
295
        return new IFutureTask<T>(task, value, " for {" + this.name + "}");
297
        return new IFutureTask<T>(task, value, " for {" + this.name + "}");
296
    }
298
    }
297
 
299
 
298
    public final FutureTask<?> put(Runnable workRunnable) {
300
    public final RunnableFuture<?> put(Runnable workRunnable) {
-
 
301
        /*
299
        // otherwise if passing a FutureTask, it will itself be wrapped in another FutureTask. The
302
         * Otherwise if passing a RunnableFuture, it will itself be wrapped in another
300
        // outer FutureTask will call the inner one's run(), which just records any exception. So the
303
         * RunnableFuture. The outer RunnableFuture will call the inner one's run(), which just
301
        // outer one's get() won't throw it and the exception will effectively be swallowed.
304
         * records any exception. So the outer one's get() won't throw it and the exception will
-
 
305
         * effectively be swallowed.
-
 
306
         */
302
        final FutureTask<?> t;
307
        final RunnableFuture<?> t;
303
        if (workRunnable instanceof FutureTask) {
308
        if (workRunnable instanceof RunnableFuture) {
304
            t = ((FutureTask<?>) workRunnable);
309
            t = ((RunnableFuture<?>) workRunnable);
305
        } else {
310
        } else {
306
            t = this.newTaskFor(workRunnable);
311
            t = this.newTaskFor(workRunnable);
307
        }
312
        }
308
        return this.execute(t);
313
        return this.add(t);
309
    }
314
    }
310
 
315
 
311
    public final <F extends FutureTask<?>> F execute(F t) {
-
 
312
        if (this.shallAdd(t)) {
-
 
313
            this.add(t);
316
    @Override
314
            return t;
317
    public final void execute(Runnable command) {
315
        } else
-
 
316
            return null;
318
        this.put(command);
317
    }
319
    }
318
 
320
 
319
    private void add(FutureTask<?> t) {
321
    public final <F extends RunnableFuture<?>> F add(F t) {
-
 
322
        if (this.shallAdd(t)) {
320
        // no need to synchronize, if die() is called after our test, t won't be executed anyway
323
            // no need to synchronize, if die() is called after our test, t won't be executed anyway
321
        if (this.dieCalled())
324
            if (this.dieCalled())
322
            throw new IllegalStateException("Already dead, cannot exec " + t);
325
                throw new IllegalStateException("Already dead, cannot exec " + t);
323
 
326
 
324
        this.tasksQueue.put(t);
327
            this.tasksQueue.put(t);
-
 
328
            return t;
-
 
329
        } else {
-
 
330
            return null;
-
 
331
        }
325
    }
332
    }
326
 
333
 
327
    private final boolean shallAdd(FutureTask<?> runnable) {
334
    private final boolean shallAdd(RunnableFuture<?> runnable) {
328
        if (runnable == null)
335
        if (runnable == null)
329
            throw new NullPointerException("null runnable");
336
            throw new NullPointerException("null runnable");
330
        try {
337
        try {
331
            this.willPut(runnable);
338
            this.willPut(runnable);
332
            return true;
339
            return true;
Line 340... Line 347...
340
     * Give subclass the ability to reject runnables.
347
     * Give subclass the ability to reject runnables.
341
     * 
348
     * 
342
     * @param r the runnable that is being added.
349
     * @param r the runnable that is being added.
343
     * @throws InterruptedException if r should not be added to this queue.
350
     * @throws InterruptedException if r should not be added to this queue.
344
     */
351
     */
345
    protected void willPut(FutureTask<?> r) throws InterruptedException {
352
    protected void willPut(RunnableFuture<?> r) throws InterruptedException {
346
    }
353
    }
347
 
354
 
348
    /**
355
    /**
349
     * An exception was thrown by a task. This implementation uses
356
     * An exception was thrown by a task. This implementation uses
350
     * {@link Thread#getUncaughtExceptionHandler()} or
357
     * {@link Thread#getUncaughtExceptionHandler()} or
Line 376... Line 383...
376
    /**
383
    /**
377
     * Cancel only tasks for which pred is <code>true</code>.
384
     * Cancel only tasks for which pred is <code>true</code>.
378
     * 
385
     * 
379
     * @param pred a predicate to know which tasks to cancel.
386
     * @param pred a predicate to know which tasks to cancel.
380
     */
387
     */
381
    protected final void cancel(final IPredicate<FutureTask<?>> pred) {
388
    protected final void cancel(final IPredicate<? super RunnableFuture<?>> pred) {
382
        this.tasksDo(new IClosure<Collection<FutureTask<?>>>() {
389
        this.tasksDo(new IClosure<Collection<RunnableFuture<?>>>() {
383
            @Override
390
            @Override
384
            public void executeChecked(Collection<FutureTask<?>> tasks) {
391
            public void executeChecked(Collection<RunnableFuture<?>> tasks) {
385
                cancel(pred, tasks);
392
                cancel(pred, tasks);
386
            }
393
            }
387
        });
394
        });
388
    }
395
    }
389
 
396
 
390
    private final void cancel(IPredicate<FutureTask<?>> pred, Collection<FutureTask<?>> tasks) {
397
    private final void cancel(IPredicate<? super RunnableFuture<?>> pred, Collection<RunnableFuture<?>> tasks) {
391
        try {
398
        try {
392
            synchronized (this) {
399
            synchronized (this) {
393
                this.canceling = true;
400
                this.canceling = true;
394
                this.cancelPredicate = pred;
401
                this.cancelPredicate = pred;
395
                this.cancelCheck(this.getBeingRun());
402
                this.cancelCheck(this.getBeingRun());
396
            }
403
            }
397
 
404
 
398
            for (final FutureTask<?> t : tasks) {
405
            for (final RunnableFuture<?> t : tasks) {
399
                this.cancelCheck(t);
406
                this.cancelCheck(t);
400
            }
407
            }
401
        } finally {
408
        } finally {
402
            synchronized (this) {
409
            synchronized (this) {
403
                this.canceling = false;
410
                this.canceling = false;
Line 405... Line 412...
405
                this.cancelPredicate = null;
412
                this.cancelPredicate = null;
406
            }
413
            }
407
        }
414
        }
408
    }
415
    }
409
 
416
 
410
    public final void tasksDo(IClosure<? super Deque<FutureTask<?>>> c) {
417
    public final void tasksDo(IClosure<? super Deque<RunnableFuture<?>>> c) {
411
        this.tasksQueue.itemsDo(c);
418
        this.tasksQueue.itemsDo(c);
412
    }
419
    }
413
 
420
 
414
    private void cancelCheck(FutureTask<?> t) {
421
    private void cancelCheck(RunnableFuture<?> t) {
415
        if (t != null)
422
        if (t != null)
416
            synchronized (this) {
423
            synchronized (this) {
417
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
424
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
418
                    t.cancel(true);
425
                    t.cancel(true);
419
            }
426
            }
420
    }
427
    }
421
 
428
 
422
    private void setBeingRun(final FutureTask<?> beingRun) {
429
    private void setBeingRun(final RunnableFuture<?> beingRun) {
423
        final Future<?> old;
430
        final Future<?> old;
424
        synchronized (this) {
431
        synchronized (this) {
425
            old = this.beingRun;
432
            old = this.beingRun;
426
            this.beingRun = beingRun;
433
            this.beingRun = beingRun;
427
        }
434
        }
428
        this.support.firePropertyChange("beingRun", old, beingRun);
435
        this.support.firePropertyChange("beingRun", old, beingRun);
429
    }
436
    }
430
 
437
 
431
    public final synchronized FutureTask<?> getBeingRun() {
438
    public final synchronized RunnableFuture<?> getBeingRun() {
432
        return this.beingRun;
439
        return this.beingRun;
433
    }
440
    }
434
 
441
 
435
    public boolean isSleeping() {
442
    public boolean isSleeping() {
436
        return this.tasksQueue.isSleeping();
443
        return this.tasksQueue.isSleeping();
Line 544... Line 551...
544
                        SleepingQueue.this.tasksQueue.setSleeping(true);
551
                        SleepingQueue.this.tasksQueue.setSleeping(true);
545
                }
552
                }
546
            }
553
            }
547
        });
554
        });
548
        // die as soon as possible, not after all currently queued tasks
555
        // die as soon as possible, not after all currently queued tasks
549
        this.tasksQueue.itemsDo(new IClosure<Deque<FutureTask<?>>>() {
556
        this.tasksQueue.itemsDo(new IClosure<Deque<RunnableFuture<?>>>() {
550
            @Override
557
            @Override
551
            public void executeChecked(Deque<FutureTask<?>> input) {
558
            public void executeChecked(Deque<RunnableFuture<?>> input) {
552
                // since we cancel the current task, we might as well remove all of them since they
559
                // since we cancel the current task, we might as well remove all of them since they
553
                // might depend on the cancelled one
560
                // might depend on the cancelled one
554
                // cancel removed tasks so that callers of get() don't wait forever
561
                // cancel removed tasks so that callers of get() don't wait forever
555
                for (final FutureTask<?> ft : input) {
562
                for (final RunnableFuture<?> ft : input) {
556
                    // by definition tasks in the queue aren't executing, so interrupt parameter is
563
                    // by definition tasks in the queue aren't executing, so interrupt parameter is
557
                    // useless. On the other hand cancel() might return false if already cancelled.
564
                    // useless. On the other hand cancel() might return false if already cancelled.
558
                    ft.cancel(false);
565
                    ft.cancel(false);
559
                }
566
                }
560
                input.clear();
567
                input.clear();
561
 
568
 
562
                input.addFirst(res);
569
                input.addFirst(res);
563
                // die as soon as possible, even if there's a long task already running
570
                // die as soon as possible, even if there's a long task already running
564
                final FutureTask<?> beingRun = getBeingRun();
571
                final RunnableFuture<?> beingRun = getBeingRun();
565
                // since we hold the lock on items
572
                // since we hold the lock on items
566
                assert beingRun != res : "beingRun: " + beingRun + " ; res: " + res;
573
                assert beingRun != res : "beingRun: " + beingRun + " ; res: " + res;
567
                if (beingRun != null)
574
                if (beingRun != null)
568
                    beingRun.cancel(true);
575
                    beingRun.cancel(true);
569
            }
576
            }
Line 631... Line 638...
631
 
638
 
632
    public void rmPropertyChangeListener(PropertyChangeListener l) {
639
    public void rmPropertyChangeListener(PropertyChangeListener l) {
633
        this.support.removePropertyChangeListener(l);
640
        this.support.removePropertyChangeListener(l);
634
    }
641
    }
635
 
642
 
636
    private final class SingleThreadedExecutor extends DropperQueue<FutureTask<?>> {
643
    private final class SingleThreadedExecutor extends DropperQueue<RunnableFuture<?>> {
637
        private SingleThreadedExecutor() {
644
        private SingleThreadedExecutor() {
638
            super(SleepingQueue.this.name + System.currentTimeMillis());
645
            super(SleepingQueue.this.name + System.currentTimeMillis());
639
        }
646
        }
640
 
647
 
641
        @Override
648
        @Override
642
        protected void process(FutureTask<?> task) {
649
        protected void process(RunnableFuture<?> task) {
643
            if (!task.isDone()) {
650
            if (!task.isDone()) {
644
                /*
651
                /*
645
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
652
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
646
                 * called only if task completed or threw exception. Otherwise, the caught runtime
653
                 * called only if task completed or threw exception. Otherwise, the caught runtime
647
                 * exception will have been thrown by afterExecute itself, in which case we don't
654
                 * exception will have been thrown by afterExecute itself, in which case we don't
Line 660... Line 667...
660
                    // this queue)
667
                    // this queue)
661
                }
668
                }
662
            }
669
            }
663
        }
670
        }
664
 
671
 
665
        protected void beforeExecute(final FutureTask<?> f) {
672
        protected void beforeExecute(final RunnableFuture<?> f) {
666
            cancelCheck(f);
673
            cancelCheck(f);
667
            setBeingRun(f);
674
            setBeingRun(f);
668
        }
675
        }
669
 
676
 
670
        protected void afterExecute(final FutureTask<?> f, final Throwable t) {
677
        protected void afterExecute(final RunnableFuture<?> f, final Throwable t) {
671
            setBeingRun(null);
678
            setBeingRun(null);
672
 
679
 
673
            try {
680
            try {
674
                f.get();
681
                f.get();
675
            } catch (CancellationException e) {
682
            } catch (CancellationException e) {