OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 93 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
 * 
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each file.
 */
 
 package org.openconcerto.utils;

import org.openconcerto.utils.cc.IClosure;

import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

/**
 * Holds items and gives them one at a time to {@link #process(Object)}, on the thread represented
 * by this object. At any time this processing can be put on hold by {@link #setSleeping(boolean)
 * setting it to sleep}. All actions in a thread modifying the queue ({@link #put(Object)},
 * {@link #itemsDo(IClosure)}) happen-before the processing of this modification.
 * <p>
 * State changes ({@link #setSleeping(boolean)}, {@link #die(boolean)}) are notified to this thread
 * by {@link #interrupt() interrupting} it, the thread then checks its flags again.
 * </p>
 * 
 * @author Sylvain
 * @param <T> type of item
 */
@ThreadSafe
public abstract class DropperQueue<T> extends Thread {

    @GuardedBy("itemsLock")
    private final Deque<T> items;
    private final Lock itemsLock;
    // signalled each time the queue might have become non-empty
    private final Condition notEmpty;
    // true once die() has been called, never reset
    @GuardedBy("this")
    private boolean stop;
    // true while no item should be taken from the queue
    @GuardedBy("this")
    private boolean sleeping;
    // true from just before process() is called until it has returned or thrown
    @GuardedBy("this")
    private boolean executing;

    /**
     * Construct a new instance. The thread is not started.
     * 
     * @param name name of this thread.
     */
    public DropperQueue(String name) {
        super(name);
        this.items = new LinkedList<>();
        this.stop = false;
        this.sleeping = false;
        this.executing = false;
        this.itemsLock = new ReentrantLock();
        this.notEmpty = this.itemsLock.newCondition();
    }

    // *** boolean

    /**
     * Whether this queue should stop temporarily.
     * 
     * @param b <code>true</code> to put this to sleep, <code>false</code> to wake it.
     * @return <code>true</code> if sleeping has changed.
     */
    public synchronized final boolean setSleeping(boolean b) {
        if (this.sleeping == b) {
            return false;
        }
        this.sleeping = b;
        this.signalChange();
        return true;
    }

    /**
     * Whether this queue is currently asked to sleep.
     * 
     * @return the last value passed to {@link #setSleeping(boolean)}, <code>false</code> initially.
     */
    public synchronized boolean isSleeping() {
        return this.sleeping;
    }

    /**
     * Signal a state change without interrupting {@link #process(Object)}.
     */
    private synchronized void signalChange() {
        this.signalChange(false);
    }

    /**
     * Make this thread check its flags again by interrupting it.
     * 
     * @param signalClosure <code>true</code> to interrupt even if {@link #process(Object)} is
     *        executing.
     */
    private synchronized void signalChange(boolean signalClosure) {
        // interrupt the thread: if it is waiting in await() or for an item, it will check the
        // booleans again. ATTN it might have just taken an item and not yet be marked as
        // executing, that's why the flag is cleared before process().
        // In general there's no need to interrupt process(), since while it is executing we
        // neither can nor need to take from the queue.
        if (signalClosure || !this.isExecuting()) {
            this.interrupt();
        }
    }

    private synchronized void setExecuting(boolean b) {
        this.executing = b;
    }

    private synchronized boolean isExecuting() {
        return this.executing;
    }

    /**
     * Clear the interrupted status of the current thread and mark this queue as executing, as one
     * atomic step with regard to {@link #setSleeping(boolean)} and {@link #die(boolean)}. Thus an
     * interrupt sent before this method is discarded (it was only meant to stop waiting), and an
     * interrupt sent by <code>die(true)</code> after this method is left for
     * {@link #process(Object)} to see. Must be called by the processing thread.
     */
    private synchronized void beginExecuting() {
        Thread.interrupted();
        this.executing = true;
    }

    /**
     * Block while this queue is sleeping.
     * 
     * @throws InterruptedException if this thread is interrupted, which is how
     *         {@link #setSleeping(boolean)} and {@link #die(boolean)} wake it.
     */
    private void await() throws InterruptedException {
        synchronized (this) {
            // loop since wait() can return without any interrupt (spurious wake-up, or a
            // notify() on this object), in which case we must keep sleeping
            while (this.sleeping) {
                this.wait();
            }
        }
    }

    /**
     * Signal that this thread must stop indefinitely. This method interrupts
     * {@link #process(Object)}.
     * 
     * @see #die(boolean)
     */
    public final void die() {
        this.die(true);
    }

    /**
     * Signal that this thread must stop indefinitely. Once this method returns, this thread will
     * stop waiting for items and will {@link #isDead() die}. At most one more item can begin to be
     * processed : the one that this thread was already taking from the queue when this method was
     * called (the interrupt sent before its processing started is discarded). If this thread is
     * currently {@link #process(Object) processing} an item, then that call will finish normally
     * if :
     * <ul>
     * <li><code>mayInterruptIfRunning</code> is <code>false</code></li>
     * <li><code>mayInterruptIfRunning</code> is <code>true</code> but the {@link #interrupt()}
     * isn't checked by the implementing subclass</li>
     * </ul>
     * 
     * @param mayInterruptIfRunning <code>true</code> to interrupt while in {@link #process(Object)}
     * @see #isDying()
     */
    public synchronized final void die(boolean mayInterruptIfRunning) {
        this.stop = true;
        this.signalChange(mayInterruptIfRunning);
    }

    /**
     * Whether this queue is dying.
     * 
     * @return <code>true</code> if {@link #die(boolean)} has been called and
     *         {@link #process(Object)} is still executing.
     * @see #isDead()
     */
    public synchronized final boolean isDying() {
        return this.dieCalled() && this.isExecuting();
    }

    /**
     * Whether this queue is dead, i.e. its thread has terminated.
     * 
     * @return <code>true</code> if {@link #process(Object)} isn't executed and won't ever be again.
     * @see #isDying()
     */
    public final boolean isDead() {
        // either we're dead because die() has been called, or because process() threw an Error
        return this.getState() == State.TERMINATED;
    }

    /**
     * Whether {@link #die(boolean)} has been called.
     * 
     * @return <code>true</code> if this queue was asked to stop indefinitely.
     */
    public synchronized final boolean dieCalled() {
        return this.stop;
    }

    // *** Run

    /**
     * Until {@link #die(boolean)} is called : wait while sleeping, wait for an item, remove it
     * from the head of the queue and pass it to {@link #process(Object)}. A
     * {@link RuntimeException} thrown by <code>process()</code> is printed and the loop goes on
     * with the next item, any other throwable terminates this thread.
     */
    @Override
    public void run() {
        while (!this.dieCalled()) {
            try {
                this.await();
                final T item;
                // lockInterruptibly() to avoid taking another item after being put to sleep
                this.itemsLock.lockInterruptibly();
                try {
                    while (this.items.isEmpty()) {
                        this.notEmpty.await();
                    }
                    item = this.items.removeFirst();
                } finally {
                    this.itemsLock.unlock();
                }
                // we should not carry the interrupted status in process()
                // we only use it to stop waiting and check variables again, but if we're here we
                // have removed an item and must process it
                this.beginExecuting();
                process(item);
            } catch (InterruptedException e) {
                // nothing to do, our state has changed : loop to check it again
            } catch (RuntimeException e) {
                // an exception occurred while processing, we consider that we can go on with the
                // next item
                e.printStackTrace();
            } finally {
                this.setExecuting(false);
            }
        }
    }

    /**
     * Process one item removed from this queue, called by this thread.
     * 
     * @param item the item to process.
     */
    abstract protected void process(final T item);

    // *** items

    /**
     * Adds an item to the end of this queue. Actions in the thread prior to calling this method
     * happen-before the passed argument is {@link #process(Object) processed}.
     * 
     * @param item the item to add.
     */
    public final void put(T item) {
        this.itemsLock.lock();
        try {
            this.items.add(item);
            this.notEmpty.signal();
        } finally {
            this.itemsLock.unlock();
        }
    }

    /**
     * Pass each item of this queue, in iteration order, to <code>c</code>. The queue is locked
     * during this method.
     * 
     * @param c what to do with each item.
     */
    public final void eachItemDo(final IClosure<T> c) {
        this.itemsDo((q) -> {
            for (final T t : q) {
                c.executeChecked(t);
            }
            return null;
        });
    }

    /**
     * Allows <code>c</code> to arbitrarily modify our queue as it is locked during this method.
     * I.e. no items will be removed (passed to the closure) nor added.
     * 
     * @param c what to do with our queue.
     */
    public final void itemsDo(IClosure<? super Deque<T>> c) {
        this.itemsDo((q) -> {
            c.executeChecked(q);
            return null;
        });
    }

    /**
     * Allows <code>c</code> to arbitrarily modify our queue as it is locked during this method,
     * and to return a result.
     * 
     * @param c what to do with our queue.
     * @param <R> type of result.
     * @return the value returned by <code>c</code>.
     */
    public final <R> R itemsDo(Function<? super Deque<T>, R> c) {
        this.itemsLock.lock();
        try {
            final R res = c.apply(this.items);
            // c might have added items, wake the processing thread if it was waiting for some
            if (!this.items.isEmpty()) {
                this.notEmpty.signal();
            }
            return res;
        } finally {
            this.itemsLock.unlock();
        }
    }

}