OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 91 | Rev 156 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
 * 
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each file.
 */
 
 package org.openconcerto.utils;

import org.openconcerto.utils.cc.IClosure;
import org.openconcerto.utils.cc.IPredicate;

import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

import net.jcip.annotations.GuardedBy;

/**
 * A queue that can be put to sleep. Submitted runnables are converted to FutureTask, that can later
 * be cancelled.
 * <p>
 * Tasks are run by the queue's own thread (see {@link #currentlyInQueue()}). The life cycle of a
 * queue is described by {@link RunningState} : it must be {@link #start() started} before it can
 * be {@link #die() killed}.
 * </p>
 * 
 * @author Sylvain
 */
public class SleepingQueue {

    /**
     * The life cycle of a queue, the constants are declared in chronological order (this order is
     * relied upon, e.g. by {@link SleepingQueue#die(boolean, Runnable, Callable)}).
     */
    public static enum RunningState {
        /** Initial state, set by the constructor. */
        NEW,
        /** Set by {@link SleepingQueue#start()}. */
        RUNNING,
        /** Set as soon as {@link SleepingQueue#die(boolean, Runnable, Callable)} is called. */
        WILL_DIE,
        /** Set by the killing task once the underlying queue was told to die. */
        DYING,
        /** Reported by {@link SleepingQueue#getRunningState()} when the thread is no longer alive. */
        DEAD
    }

    /**
     * A task that can kill a queue.
     * 
     * @author Sylvain
     * 
     * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
     */
    public static final class LethalFutureTask<V> extends FutureTask<V> {
        // the queue this task was created for
        private final SleepingQueue q;

        /**
         * Create a new task.
         * 
         * @param q the queue to be killed.
         * @param c the actual work.
         */
        public LethalFutureTask(final SleepingQueue q, final Callable<V> c) {
            super(c);
            this.q = q;
        }

        /**
         * The queue this task was created for.
         * 
         * @return the queue passed to the constructor.
         */
        public final SleepingQueue getQueue() {
            return this.q;
        }

        @Override
        public String toString() {
            // don't includeCurrentTask as it could be us
            return this.getClass().getSimpleName() + " for " + this.getQueue().toString(false);
        }
    }

    // shared by all queues to run the watchers created by watchDying()
    private static final ScheduledThreadPoolExecutor exec;

    static {
        // daemon thread to allow the VM to exit
        exec = new ScheduledThreadPoolExecutor(2, new ThreadFactory("DieMonitor", true).setPriority(Thread.MIN_PRIORITY));
        // allow threads to die
        exec.setKeepAliveTime(30, TimeUnit.SECONDS);
        exec.allowCoreThreadTimeOut(true);
        exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

        assert exec.getPoolSize() == 0 : "Wasting resources";
    }

    /**
     * Watch the passed future until it's done, checking every minute (the first check also happens
     * after one minute).
     * 
     * @param lethalFuture the killing to watch.
     * @return a future representing the watching.
     * @see #watchDying(LethalFutureTask, int, int, TimeUnit)
     */
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture) {
        return watchDying(lethalFuture, 1, 1, TimeUnit.MINUTES);
    }

    /**
     * Watch the passed future until it's done. When
     * {@link SleepingQueue#die(boolean, Runnable, Callable) killing} a queue, the currently running
     * task must first complete then the actual killing (represented by a {@link LethalFutureTask})
     * begins. This involves running methods and passed runnables which can all hang or throw an
     * exception. Therefore this method will periodically report on the status of the killing, and
     * report any exception that was thrown.
     * 
     * @param lethalFuture the killing to watch.
     * @param initialDelay the time to delay first execution.
     * @param delay the delay between the termination of one execution and the commencement of the
     *        next.
     * @param unit the time unit of the initialDelay and delay parameters.
     * @return a future representing the watching.
     */
    public static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit) {
        return watchDying(lethalFuture, initialDelay, delay, unit, null);
    }

    /**
     * Same as {@link #watchDying(LethalFutureTask, int, int, TimeUnit)} but allows to choose what
     * happens to an exception thrown by the watched future.
     * 
     * @param lethalFuture the killing to watch.
     * @param initialDelay the time to delay first execution.
     * @param delay the delay between the termination of one execution and the commencement of the
     *        next.
     * @param unit the time unit of the initialDelay and delay parameters.
     * @param exnHandler called with the exception thrown by <code>lethalFuture</code>,
     *        <code>null</code> to log it as a warning.
     * @return a future representing the watching.
     */
    static final ScheduledFuture<?> watchDying(final LethalFutureTask<?> lethalFuture, final int initialDelay, final int delay, final TimeUnit unit,
            final IClosure<? super ExecutionException> exnHandler) {
        // don't use fixed rate as it might burden our threads and if we just checked the status
        // while being late, there's no need to check sooner the next time.
        final AtomicReference<Future<?>> f = new AtomicReference<Future<?>>();
        // true once the watching is over. Needed since with a small initial delay the watcher can
        // run (and want to cancel itself) before the reference to its own future is set below.
        final AtomicBoolean finished = new AtomicBoolean(false);
        final ScheduledFuture<?> res = exec.scheduleWithFixedDelay(new Runnable() {

            // lethalFuture won't kill the queue, i.e. willDie threw an exception and forceDie was
            // false.
            private void wontKill(final RunningState runningState, final boolean isDone) {
                Log.get().fine("Our watched future won't kill the queue, current state : " + runningState + " " + lethalFuture);
                if (isDone)
                    cancel();
            }

            // report the outcome of the watched future and stop watching
            private void cancel() {
                assert lethalFuture.isDone();
                try {
                    lethalFuture.get();
                } catch (InterruptedException e) {
                    // either we were cancelled or the executor is shutting down (i.e. the VM is
                    // terminating)
                    Log.get().log(Level.FINER, "Interrupted while waiting on a finished future " + lethalFuture, e);
                } catch (ExecutionException e) {
                    if (exnHandler == null)
                        Log.get().log(Level.WARNING, "Threw an exception : " + lethalFuture, e);
                    else
                        exnHandler.executeChecked(e);
                }
                // record that we're done before trying to cancel, so that if our own future isn't
                // available yet, the scheduling thread or our next execution will cancel it
                finished.set(true);
                cancelSelf();
            }

            // cancel the periodic execution, does nothing if our future isn't yet known
            private void cancelSelf() {
                final Future<?> self = f.get();
                if (self != null)
                    self.cancel(true);
            }

            @Override
            public void run() {
                if (finished.get()) {
                    // a previous execution already reported the outcome but couldn't cancel
                    cancelSelf();
                    return;
                }
                final boolean isDone;
                final RunningState runningState;
                final FutureTask<?> beingRun;
                final SleepingQueue q = lethalFuture.getQueue();
                // take a coherent snapshot of the queue
                synchronized (q) {
                    runningState = q.getRunningState();
                    beingRun = q.getBeingRun();
                    isDone = lethalFuture.isDone();
                }
                final Level l = Level.INFO;
                if (runningState == RunningState.RUNNING) {
                    // willDie threw an exception but lethalFuture might not be completely done
                    // in that case, wait for the next execution
                    wontKill(runningState, isDone);
                } else if (runningState == RunningState.WILL_DIE) {
                    if (isDone) {
                        wontKill(runningState, isDone);
                    } else if (beingRun == lethalFuture) {
                        // in willDie() method or Runnable
                        Log.get().log(l, "Pre-death has not yet finished " + lethalFuture);
                    } else {
                        Log.get().log(l, "Death has not yet begun for " + lethalFuture + "\ncurrently running : " + beingRun);
                    }
                } else if (runningState == RunningState.DYING) {
                    assert beingRun == null || beingRun instanceof LethalFutureTask;
                    if (beingRun == null) {
                        // should be dead real soon
                        // just wait for the next execution
                        assert isDone;
                        Log.get().log(l, "Death was carried out but the thread is not yet terminated. Watching " + lethalFuture);
                    } else if (beingRun == lethalFuture) {
                        // in dying() method or Callable
                        Log.get().log(l, "Post-death has not yet finished " + lethalFuture);
                    } else {
                        assert isDone;
                        wontKill(runningState, isDone);
                    }
                } else if (runningState == RunningState.DEAD) {
                    // OK
                    Log.get().log(l, "Death was carried out and the thread is terminated but not necessarily by " + lethalFuture);
                    cancel();
                } else {
                    Log.get().warning("Illegal state " + runningState + " for " + lethalFuture);
                }
            }
        }, initialDelay, delay, unit);
        f.set(res);
        // the watcher might have finished before it could access its own future
        if (finished.get())
            res.cancel(true);
        return res;
    }

    // used to label the tasks and to name the thread
    private final String name;

    // use getRunningState() to take into account a thread killed by an Error
    @GuardedBy("this")
    private RunningState state;

    // fires "beingRun" and "sleeping"
    private final PropertyChangeSupport support;
    // the task currently executed by our thread, null if none
    @GuardedBy("this")
    private FutureTask<?> beingRun;

    // holds the pending tasks and runs them
    private final SingleThreadedExecutor tasksQueue;
    // true while cancel(IPredicate, Collection) is in progress
    @GuardedBy("this")
    private boolean canceling;
    // which tasks to cancel while canceling, null meaning all
    @GuardedBy("this")
    private IPredicate<FutureTask<?>> cancelPredicate;

    /**
     * Create a new queue with a name made of this class name and the current time.
     */
    public SleepingQueue() {
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
    }

    /**
     * Create a new queue, it must be {@link #start() started} to execute tasks.
     * 
     * @param name the name of this queue.
     */
    public SleepingQueue(String name) {
        super();
        this.name = name;

        this.state = RunningState.NEW;

        this.canceling = false;
        this.cancelPredicate = null;
        this.support = new PropertyChangeSupport(this);
        this.setBeingRun(null);

        this.tasksQueue = new SingleThreadedExecutor();
    }

    /**
     * Start the thread of this queue, set the state to {@link RunningState#RUNNING} and call
     * {@link #started()}, all while holding the lock on this.
     */
    public final void start() {
        synchronized (this) {
            this.tasksQueue.start();
            this.setState(RunningState.RUNNING);
            started();
        }
    }

    /**
     * Start this queue only if not already started.
     * 
     * @return <code>true</code> if the queue was started.
     */
    public final boolean startIfNew() {
        // don't use getRunningState() which calls isAlive()
        synchronized (this) {
            final boolean starting = this.state == RunningState.NEW;
            if (starting)
                this.start();
            assert this.state.compareTo(RunningState.NEW) > 0;
            return starting;
        }
    }

    /**
     * Called by {@link #start()} once this queue is running (with the lock on this held). This
     * implementation does nothing.
     */
    protected void started() {
    }

    /**
     * Set the state of this queue.
     * 
     * @param s the new state.
     */
    protected synchronized final void setState(final RunningState s) {
        this.state = s;
    }

    /**
     * The current state of this queue.
     * 
     * @return the state, {@link RunningState#DEAD} if this was started and the thread is no longer
     *         alive.
     */
    public synchronized final RunningState getRunningState() {
        // an Error could have stopped our thread so can't rely on this.state
        if (this.state == RunningState.NEW || this.tasksQueue.isAlive())
            return this.state;
        else
            return RunningState.DEAD;
    }

    /**
     * Whether the caller is executed by this queue.
     * 
     * @return <code>true</code> if the current thread is the thread of this queue.
     */
    public final boolean currentlyInQueue() {
        return Thread.currentThread() == this.tasksQueue;
    }

    /**
     * Customize the thread used to execute the passed runnables. This implementation sets the
     * priority to the minimum.
     * 
     * @param thr the thread used by this queue.
     */
    protected void customizeThread(Thread thr) {
        thr.setPriority(Thread.MIN_PRIORITY);
    }

    /**
     * Wrap the passed runnable in a task with a <code>null</code> result.
     * 
     * @param <T> the type of the result.
     * @param task the runnable to wrap.
     * @return a new task.
     */
    protected final <T> FutureTask<T> newTaskFor(final Runnable task) {
        return this.newTaskFor(task, null);
    }

    /**
     * Wrap the passed runnable in a task labelled with the name of this queue.
     * 
     * @param <T> the type of the result.
     * @param task the runnable to wrap.
     * @param value the result of the task.
     * @return a new task.
     */
    protected <T> FutureTask<T> newTaskFor(final Runnable task, T value) {
        return new IFutureTask<T>(task, value, " for {" + this.name + "}");
    }

    /**
     * Add a runnable to this queue.
     * 
     * @param workRunnable the runnable to add, if it is already a {@link FutureTask} it is added
     *        as is.
     * @return the added task, <code>null</code> if it was rejected by
     *         {@link #willPut(FutureTask)}.
     * @throws IllegalStateException if this queue is dying.
     */
    public final FutureTask<?> put(Runnable workRunnable) {
        // otherwise if passing a FutureTask, it will itself be wrapped in another FutureTask. The
        // outer FutureTask will call the inner one's run(), which just record any exception. So the
        // outer one's get() won't throw it and the exception will effectively be swallowed.
        final FutureTask<?> t;
        if (workRunnable instanceof FutureTask) {
            t = ((FutureTask<?>) workRunnable);
        } else {
            t = this.newTaskFor(workRunnable);
        }
        return this.execute(t);
    }

    /**
     * Add a task to this queue.
     * 
     * @param <F> the type of the task.
     * @param t the task to add, not <code>null</code>.
     * @return <code>t</code> if it was added, <code>null</code> if it was rejected by
     *         {@link #willPut(FutureTask)}.
     * @throws IllegalStateException if this queue is dying.
     */
    public final <F extends FutureTask<?>> F execute(F t) {
        if (this.shallAdd(t)) {
            this.add(t);
            return t;
        } else
            return null;
    }

    private void add(FutureTask<?> t) {
        // no need to synchronize, if die() is called after our test, t won't be executed anyway
        if (this.dieCalled())
            throw new IllegalStateException("Already dead, cannot exec " + t);

        this.tasksQueue.put(t);
    }

    // ask willPut() whether the passed task is accepted
    private final boolean shallAdd(FutureTask<?> runnable) {
        if (runnable == null)
            throw new NullPointerException("null runnable");
        try {
            this.willPut(runnable);
            return true;
        } catch (InterruptedException e) {
            // here the exception is the documented way for willPut() to reject a task : don't add
            return false;
        }
    }

    /**
     * Give subclass the ability to reject runnables.
     * 
     * @param r the runnable that is being added.
     * @throws InterruptedException if r should not be added to this queue.
     */
    protected void willPut(FutureTask<?> r) throws InterruptedException {
    }

    /**
     * An exception was thrown by a task. This implementation merely
     * {@link Exception#printStackTrace()}.
     * 
     * @param exn the exception thrown.
     */
    protected void exceptionThrown(final ExecutionException exn) {
        exn.printStackTrace();
    }

    /**
     * Cancel all queued tasks and the current task.
     */
    protected final void cancel() {
        this.cancel(null);
    }

    /**
     * Cancel only tasks for which pred is <code>true</code>.
     * 
     * @param pred a predicate to know which tasks to cancel.
     */
    protected final void cancel(final IPredicate<FutureTask<?>> pred) {
        this.tasksDo(new IClosure<Collection<FutureTask<?>>>() {
            @Override
            public void executeChecked(Collection<FutureTask<?>> tasks) {
                cancel(pred, tasks);
            }
        });
    }

    private final void cancel(IPredicate<FutureTask<?>> pred, Collection<FutureTask<?>> tasks) {
        try {
            synchronized (this) {
                // from now on, a task about to be executed is also checked, see beforeExecute()
                this.canceling = true;
                this.cancelPredicate = pred;
                this.cancelCheck(this.getBeingRun());
            }

            for (final FutureTask<?> t : tasks) {
                this.cancelCheck(t);
            }
        } finally {
            synchronized (this) {
                this.canceling = false;
                // allow the predicate to be gc'd
                this.cancelPredicate = null;
            }
        }
    }

    /**
     * Pass the pending tasks to the closure.
     * 
     * @param c what to do with the tasks of this queue.
     */
    public final void tasksDo(IClosure<? super Deque<FutureTask<?>>> c) {
        this.tasksQueue.itemsDo(c);
    }

    // cancel the passed task if a cancel is in progress and the task matches
    private void cancelCheck(FutureTask<?> t) {
        if (t != null) {
            synchronized (this) {
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
                    t.cancel(true);
            }
        }
    }

    private void setBeingRun(final FutureTask<?> beingRun) {
        final Future<?> old;
        synchronized (this) {
            old = this.beingRun;
            this.beingRun = beingRun;
        }
        // notify the listeners without holding our lock
        this.support.firePropertyChange("beingRun", old, beingRun);
    }

    /**
     * The task currently executed by this queue.
     * 
     * @return the current task, <code>null</code> if none.
     */
    public final synchronized FutureTask<?> getBeingRun() {
        return this.beingRun;
    }

    /**
     * Whether this queue is sleeping.
     * 
     * @return the value reported by the underlying queue.
     */
    public boolean isSleeping() {
        return this.tasksQueue.isSleeping();
    }

    /**
     * Put this queue to sleep or wake it up. The "sleeping" property is fired if the underlying
     * queue returned <code>true</code>.
     * 
     * @param sleeping <code>true</code> to put this queue to sleep.
     * @return the value returned by the underlying queue.
     */
    public boolean setSleeping(boolean sleeping) {
        final boolean res = this.tasksQueue.setSleeping(sleeping);
        if (res) {
            this.support.firePropertyChange("sleeping", null, sleeping);
        }
        return res;
    }

    /**
     * Stops this queue. Once this method returns, it is guaranteed that no other task will be taken
     * from the queue to be started, and that this queue will die. But the already executing task
     * will complete unless it checks for interrupt.
     * 
     * @return the future killing.
     */
    public final LethalFutureTask<?> die() {
        return this.die(true, null, null);
    }

    /**
     * Stops this queue. All tasks in the queue, including the {@link #getBeingRun() currently
     * running}, will be {@link Future#cancel(boolean) cancelled}. The currently running task will
     * thus complete unless it checks for interrupt. Once the returned future completes successfully
     * then no task is executing ( {@link #isDead()} will happen sometimes later, the time for the
     * thread to terminate). If the returned future throws an exception because of the passed
     * runnables or of {@link #willDie()} or {@link #dying()}, one can check with
     * {@link #dieCalled()} to see if the queue is dying.
     * <p>
     * This method tries to limit the cases where the returned Future will not get executed : it
     * checks that this was {@link #start() started} and is not already {@link RunningState#DYING}
     * or {@link RunningState#DEAD}. It also doesn't allow {@link RunningState#WILL_DIE} as it could
     * cancel the previously passed runnables or never run the passed runnables. But even with these
     * restrictions a number of things can prevent the result from getting executed : the
     * {@link #getBeingRun() currently running} task hangs indefinitely, it throws an {@link Error}
     * ; the passed runnables hang indefinitely.
     * </p>
     * <p>
     * To execute the killing, this queue is woken up. If the killing is not carried out (i.e.
     * <code>force</code> is <code>false</code> and the pre-death actions throw an exception) the
     * state goes back to {@link RunningState#RUNNING} and the queue is put back to sleep if it was
     * sleeping.
     * </p>
     * 
     * @param force <code>true</code> if this is guaranteed to die (even if <code>willDie</code> or
     *        {@link #willDie()} throw an exception).
     * @param willDie the last actions to take before killing this queue.
     * @param dying the last actions to take before this queue is dead.
     * @return the future killing, which will return <code>dying</code> result.
     * @throws IllegalStateException if the state isn't {@link RunningState#RUNNING}.
     * @see #dieCalled()
     */
    public final <V> LethalFutureTask<V> die(final boolean force, final Runnable willDie, final Callable<V> dying) throws IllegalStateException {
        synchronized (this) {
            final RunningState state = this.getRunningState();
            if (state == RunningState.NEW)
                throw new IllegalStateException("Not started");
            if (state.compareTo(RunningState.RUNNING) > 0)
                throw new IllegalStateException("die() already called or thread was killed by an Error : " + state);
            assert state == RunningState.RUNNING;
            this.setState(RunningState.WILL_DIE);
        }
        // reset sleeping to original value if die not effective
        final AtomicBoolean resetSleeping = new AtomicBoolean(false);
        final LethalFutureTask<V> res = new LethalFutureTask<V>(this, new Callable<V>() {
            @Override
            public V call() throws Exception {
                // one try for the whole killing, otherwise sleeping wouldn't be reset in the only
                // case where die is not effective (pre-death failure without force)
                try {
                    Exception willDieExn = null;
                    try {
                        willDie();
                        if (willDie != null) {
                            willDie.run();
                            // handle Future like runnable, i.e. check right away for exception
                            if (willDie instanceof Future) {
                                final Future<?> f = (Future<?>) willDie;
                                assert f.isDone() : "Ran but not done: " + f;
                                try {
                                    f.get();
                                } catch (ExecutionException e) {
                                    final Throwable cause = e.getCause();
                                    // the cause can be an Error, don't hide it behind a
                                    // ClassCastException
                                    if (cause instanceof Exception)
                                        throw (Exception) cause;
                                    throw e;
                                }
                            }
                        }
                    } catch (Exception e) {
                        if (!force) {
                            // the killing is aborted
                            setState(RunningState.RUNNING);
                            throw e;
                        } else {
                            willDieExn = e;
                        }
                    }
                    // don't interrupt ourselves
                    SleepingQueue.this.tasksQueue.die(false);
                    assert SleepingQueue.this.tasksQueue.isDying();
                    setState(RunningState.DYING);
                    // since there's already been an exception, throw it as soon as possible
                    // also dying() might itself throw an exception for the same reason or we now
                    // have 2 exceptions to throw
                    if (willDieExn != null)
                        throw willDieExn;
                    dying();
                    final V result;
                    if (dying != null)
                        result = dying.call();
                    else
                        result = null;

                    return result;
                } finally {
                    // if die is effective, this won't have any consequences
                    if (resetSleeping.get())
                        SleepingQueue.this.tasksQueue.setSleeping(true);
                }
            }
        });
        // die as soon as possible not after all currently queued tasks
        this.tasksQueue.itemsDo(new IClosure<Deque<FutureTask<?>>>() {
            @Override
            public void executeChecked(Deque<FutureTask<?>> input) {
                // since we cancel the current task, we might as well remove all of them since they
                // might depend on the cancelled one
                // cancel removed tasks so that callers of get() don't wait forever
                for (final FutureTask<?> ft : input) {
                    // by definition tasks in the queue aren't executing, so interrupt parameter is
                    // useless. On the other hand cancel() might return false if already cancelled.
                    ft.cancel(false);
                }
                input.clear();

                input.addFirst(res);
                // die as soon as possible, even if there's a long task already running
                final FutureTask<?> beingRun = getBeingRun();
                // since we hold the lock on items
                assert beingRun != res : "beingRun: " + beingRun + " ; res: " + res;
                if (beingRun != null)
                    beingRun.cancel(true);
            }
        });
        // force execution of our task
        resetSleeping.set(this.setSleeping(false));
        return res;
    }

    /**
     * Called by the killing task before anything else (in particular before the passed runnable).
     * This implementation does nothing.
     */
    protected void willDie() {
        // nothing by default
    }

    /**
     * Called by the killing task once the state is {@link RunningState#DYING} (and before the
     * passed callable). This implementation does nothing.
     * 
     * @throws Exception if an error occurs.
     */
    protected void dying() throws Exception {
        // nothing by default
    }

    /**
     * Whether this will die. If this method returns <code>true</code>, it is guaranteed that no
     * other task will be taken from the queue to be started. Note: this method doesn't return
     * <code>true</code> right after {@link #die()} as the method is asynchronous and if
     * {@link #willDie()} fails it may not die at all ; as explained in its comment you may use its
     * returned future to wait for the killing.
     * 
     * @return <code>true</code> if this queue will not execute any more tasks (but it may hang
     *         indefinitely if the dying runnable blocks).
     * @see #isDead()
     */
    public final boolean dieCalled() {
        return this.tasksQueue.dieCalled();
    }

    /**
     * Whether this queue is dead, i.e. if die() has been called and all tasks have completed.
     * 
     * @return <code>true</code> if this queue will not execute any more tasks and isn't executing
     *         any.
     * @see #die()
     */
    public final boolean isDead() {
        return this.tasksQueue.isDead();
    }

    /**
     * Allow to wait for the thread to end. Once this method returns {@link #getRunningState()} will
     * always return {@link RunningState#DEAD}. Useful since the future from
     * {@link #die(boolean, Runnable, Callable)} returns when all tasks are finished but the
     * {@link #getRunningState()} is still {@link RunningState#DYING} since the Thread takes a
     * little time to die.
     * 
     * @throws InterruptedException if interrupted while waiting.
     * @see Thread#join()
     */
    public final void join() throws InterruptedException {
        this.tasksQueue.join();
    }

    /**
     * Wait at most the passed time for the thread to end.
     * 
     * @param millis the time to wait in milliseconds.
     * @param nanos additional nanoseconds to wait.
     * @throws InterruptedException if interrupted while waiting.
     * @see Thread#join(long, int)
     */
    public final void join(long millis, int nanos) throws InterruptedException {
        this.tasksQueue.join(millis, nanos);
    }

    /**
     * Add a listener, the properties fired by this class are "beingRun" and "sleeping".
     * 
     * @param l the listener to add.
     */
    public void addPropertyChangeListener(PropertyChangeListener l) {
        this.support.addPropertyChangeListener(l);
    }

    /**
     * Remove a listener.
     * 
     * @param l the listener to remove.
     */
    public void rmPropertyChangeListener(PropertyChangeListener l) {
        this.support.removePropertyChangeListener(l);
    }

    // the thread running our tasks, keeps getBeingRun() up to date and reports exceptions
    private final class SingleThreadedExecutor extends DropperQueue<FutureTask<?>> {
        private SingleThreadedExecutor() {
            super(SleepingQueue.this.name + System.currentTimeMillis());
            customizeThread(this);
        }

        @Override
        protected void process(FutureTask<?> task) {
            // e.g. skip cancelled tasks
            if (!task.isDone()) {
                /*
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
                 * called only if task completed or threw exception. Otherwise, the caught runtime
                 * exception will have been thrown by afterExecute itself, in which case we don't
                 * want to call it again.
                 */
                boolean ran = false;
                beforeExecute(task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    // don't throw ex, afterExecute() can do whatever needs to be done (like killing
                    // this queue)
                }
            }
        }

        protected void beforeExecute(final FutureTask<?> f) {
            // a cancel might be in progress
            cancelCheck(f);
            setBeingRun(f);
        }

        protected void afterExecute(final FutureTask<?> f, final Throwable t) {
            setBeingRun(null);

            // get() is only called to find out how the task ended
            try {
                f.get();
            } catch (CancellationException e) {
                // don't care
            } catch (InterruptedException e) {
                // f was interrupted : e.g. we're dying or f was cancelled
            } catch (ExecutionException e) {
                // f.run() raised an exception
                exceptionThrown(e);
            }
        }
    }

    @Override
    public String toString() {
        return this.toString(true);
    }

    /**
     * A description of this queue.
     * 
     * @param includeCurrentTask <code>true</code> to include {@link #getBeingRun()}.
     * @return a string describing this queue.
     */
    public String toString(final boolean includeCurrentTask) {
        return super.toString() + " Queue: " + this.tasksQueue + (includeCurrentTask ? " run:" + this.getBeingRun() : "");
    }

}