OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 156 | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
 * 
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each file.
 */
 
 package org.openconcerto.sql.view.list;

import org.openconcerto.sql.Log;
import org.openconcerto.sql.model.SQLRow;
import org.openconcerto.sql.model.SQLRowValues;
import org.openconcerto.sql.model.SQLRowValues.CreateMode;
import org.openconcerto.sql.model.SQLRowValuesCluster.State;
import org.openconcerto.sql.model.SQLTable;
import org.openconcerto.sql.model.SQLTableEvent;
import org.openconcerto.sql.model.SQLTableModifiedListener;
import org.openconcerto.sql.model.graph.Link.Direction;
import org.openconcerto.sql.model.graph.Path;
import org.openconcerto.sql.view.list.UpdateRunnable.RmAllRunnable;
import org.openconcerto.sql.view.list.search.SearchOne;
import org.openconcerto.sql.view.list.search.SearchOne.Mode;
import org.openconcerto.sql.view.list.search.SearchQueue;
import org.openconcerto.sql.view.list.search.SearchQueue.SetStateRunnable;
import org.openconcerto.utils.CollectionUtils;
import org.openconcerto.utils.ListMap;
import org.openconcerto.utils.RecursionType;
import org.openconcerto.utils.SleepingQueue;
import org.openconcerto.utils.Tuple2;
import org.openconcerto.utils.cc.IClosure;
import org.openconcerto.utils.cc.IPredicate;
import org.openconcerto.utils.cc.ITransformer;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import java.util.logging.Level;

import net.jcip.annotations.GuardedBy;

public final class UpdateQueue extends SleepingQueue {

    /**
     * Whether the passed future performs an update, i.e. whether the runnable extracted from it
     * by <code>SearchQueue.getRunnable()</code> is an {@link UpdateRunnable}.
     * 
     * @param f a task in this queue, can be <code>null</code>.
     * @return <code>true</code> if <code>f</code> is an update.
     */
    static boolean isUpdate(RunnableFuture<?> f) {
        final Runnable wrapped = SearchQueue.getRunnable(f);
        return isUpdate(wrapped);
    }

    /**
     * Whether the passed runnable is an update.
     * 
     * @param r a runnable, <code>null</code> is not an update.
     * @return <code>true</code> if <code>r</code> is an {@link UpdateRunnable}.
     */
    static boolean isUpdate(Runnable r) {
        return UpdateRunnable.class.isInstance(r);
    }

    /**
     * Whether the passed runnable is an update that can be dropped from the queue.
     * 
     * @param r a runnable.
     * @return <code>true</code> if <code>r</code> is an update other than a {@link RmAllRunnable}.
     */
    private static boolean isCancelableUpdate(Runnable r) {
        // RmAll is never cancelled : that way an UpdateAll can be put right after it (that
        // UpdateAll won't be executed since RmAll put the queue to sleep)
        if (r instanceof RmAllRunnable) {
            return false;
        }
        return isUpdate(r);
    }

    /**
     * Listener registered both on the tables of the request and on the lines source : it turns
     * the received events into tasks of the enclosing queue.
     */
    private final class TableListener implements SQLTableModifiedListener, PropertyChangeListener {
        @Override
        public void tableModified(SQLTableEvent evt) {
            // optimizations disabled : reload everything, whatever the event
            if (UpdateQueue.this.alwaysUpdateAll) {
                putUpdateAll();
                return;
            }
            final boolean rowUpdated = evt.getMode() == SQLTableEvent.Mode.ROW_UPDATED;
            if (rowUpdated)
                rowModified(evt);
            else
                rowAddedOrDeleted(evt);
        }

        @Override
        public void propertyChange(PropertyChangeEvent evt) {
            // the where changed : ask for a fresh state
            final SQLTableModelSourceState freshState = getModel().getReq().createState();
            stateChanged(null, freshState);
        }
    }

    private final ITableModel tableModel;
    // thread-confined
    private SQLTableModelSourceState state;
    @GuardedBy("itself")
    private final List<ListSQLLine> fullList;
    @GuardedBy("fullList")
    private SQLTableModelColumns columns;
    private final TableListener tableListener;
    // TODO rm : needed for now since our optimizations are false if there's a where not on the
    // primary table, see http://192.168.1.10:3000/issues/show/22
    private boolean alwaysUpdateAll = false;
    private final IClosure<Deque<RunnableFuture<?>>> cancelClosure;

    /**
     * Create a queue for the passed model.
     * 
     * @param model the table model this queue works for, it is used to name the queue.
     */
    public UpdateQueue(ITableModel model) {
        super(UpdateQueue.class.getSimpleName() + " on " + model);
        this.tableModel = model;
        this.fullList = new ArrayList<ListSQLLine>();
        // tells the cancel closure the type of each task of this queue
        final ITransformer<RunnableFuture<?>, TaskType> classifier = new ITransformer<RunnableFuture<?>, TaskType>() {
            @Override
            public TaskType transformChecked(RunnableFuture<?> input) {
                final Runnable task = SearchQueue.getRunnable(input);
                if (isCancelableUpdate(task)) {
                    return TaskType.COMPUTE;
                }
                if (task instanceof SetStateRunnable) {
                    return TaskType.SET_STATE;
                }
                return TaskType.USER;
            }
        };
        this.cancelClosure = createCancelClosure(this, classifier);
        this.tableListener = new TableListener();
    }

    /**
     * The model passed to the constructor.
     * 
     * @return the table model of this queue.
     */
    private final ITableModel getModel() {
        final ITableModel res = this.tableModel;
        return res;
    }

    /**
     * Listen to the lines source, queue the initial state (followed by an update all) and then
     * queue the start of the search queue.
     */
    @Override
    protected void started() {
        // be notified when the tables we display change
        addSourceListener();
        final SQLTableModelSourceState initialState = this.getModel().getReq().createState();
        stateChanged(null, initialState);
        // The search queue is only started once there's something to search, that way the
        // runnable passed to ITableModel.search() will be meaningful.
        // A SetStateRunnable is used since this must not be cancelled by an updateAll, but it
        // shouldn't prevent earlier updates to be cancelled
        final SetStateRunnable startSearch = new SetStateRunnable() {
            @Override
            public void run() {
                getModel().startSearchQueue();
            }
        };
        this.put(startSearch);
    }

    /**
     * Kill the search queue of the model and join it, unless it was never started or is already
     * dead.
     * 
     * @throws IllegalStateException if the search queue is already in the process of dying.
     * @throws Exception if killing the search queue failed and it isn't dead.
     */
    @Override
    protected void dying() throws Exception {
        super.dying();
        assert currentlyInQueue();

        // the searchQ is only killed once the updateQ is really dead, otherwise the currently
        // executing update might finish once the searchQ is already dead.

        final SearchQueue searchQ = getModel().getSearchQueue();
        // the running state cannot change since we only change it in this thread (except for an
        // Error being thrown and killing the queue)
        final RunningState searchState = searchQ.getRunningState();
        // not started or there was an Error : nothing to kill
        final boolean nothingToKill = searchState == RunningState.NEW || searchState == RunningState.DEAD;
        if (nothingToKill) {
            return;
        }
        final boolean alreadyKilled = searchState == RunningState.WILL_DIE || searchState == RunningState.DYING;
        if (alreadyKilled) {
            throw new IllegalStateException("Someone else already called die()");
        }
        try {
            searchQ.die().get();
        } catch (Exception e) {
            final boolean dead = searchQ.getRunningState() == RunningState.DEAD;
            if (!dead) {
                throw e;
            }
            // there was an Error in the last run task or while in die(), but it's OK since we
            // wanted the queue dead
            Log.get().log(Level.CONFIG, "Exception while killing search queue", e);
        }
        assert searchQ.getRunningState().compareTo(RunningState.DYING) >= 0;
        searchQ.join();
        assert searchQ.getRunningState() == RunningState.DEAD;
    }

    /**
     * The live list of all lines, callers must synchronize on it to access it.
     * 
     * @return the full list itself, not a copy.
     */
    final List<ListSQLLine> getFullList() {
        final List<ListSQLLine> res = this.fullList;
        return res;
    }

    /**
     * Take a snapshot of the full list along with the current columns.
     * 
     * @return a copy of the full list and the columns, both read under the same lock.
     */
    public final Tuple2<List<ListSQLLine>, SQLTableModelColumns> copyFullList() {
        final List<ListSQLLine> lines = this.getFullList();
        synchronized (lines) {
            final List<ListSQLLine> snapshot = new ArrayList<ListSQLLine>(lines);
            return Tuple2.<List<ListSQLLine>, SQLTableModelColumns> create(snapshot, this.columns);
        }
    }

    /**
     * Search the full list for a line by its ID.
     * 
     * @param id the ID of the wanted line, its <code>int</code> value is used.
     * @return the result of {@link ListSQLLine#fromID(List, int)} on the full list.
     */
    public final ListSQLLine getLine(final Number id) {
        final List<ListSQLLine> lines = this.getFullList();
        synchronized (lines) {
            return ListSQLLine.fromID(lines, id.intValue());
        }
    }

    /**
     * The lines affected by a change of the passed row, each one with its changed paths.
     * 
     * @param r the row that has changed.
     * @return the refreshed lines and their changed paths.
     */
    protected final ListMap<ListSQLLine, Path> getAffectedLines(final SQLRow r) {
        final ListMap<ListSQLLine, Path> pathsByLine = new ListMap<ListSQLLine, Path>();
        return this.getAffected(r, pathsByLine, true);
    }

    /**
     * The paths affected by a change of the passed row, each one with the lines it affects.
     * 
     * @param r the row that has changed.
     * @return the changed paths and their refreshed lines.
     */
    protected final ListMap<Path, ListSQLLine> getAffectedPaths(final SQLRow r) {
        final ListMap<Path, ListSQLLine> linesByPath = new ListMap<Path, ListSQLLine>();
        return this.getAffected(r, linesByPath, false);
    }

    /**
     * Find which lines of the full list are affected by a change of the passed row, and through
     * which paths of the fetched graph. Must be called from within this queue, as this method
     * uses the full list and the current state.
     * 
     * @param r the row that has changed.
     * @param res the map to fill.
     * @param byLine <code>true</code> to index the result by line, <code>false</code> to index it
     *        by path.
     * @return <code>res</code>.
     * @throws IllegalArgumentException if the ID of <code>r</code> is invalid.
     */
    private <K, V> ListMap<K, V> getAffected(final SQLRow r, final ListMap<K, V> res, final boolean byLine) {
        final List<ListSQLLine> fullList = this.getFullList();
        synchronized (fullList) {
            final SQLTable t = r.getTable();
            final int id = r.getID();
            if (id < SQLRow.MIN_VALID_ID)
                throw new IllegalArgumentException("invalid ID: " + id);
            if (!fullList.isEmpty()) {
                final SQLRowValues proto = this.getState().getReq().getGraphToFetch();
                // all the paths of the fetched graph leading to the table of the changed row
                final List<Path> pathsToT = new ArrayList<Path>();
                proto.getGraph().walk(proto, pathsToT, new ITransformer<State<List<Path>>, List<Path>>() {
                    @Override
                    public List<Path> transformChecked(State<List<Path>> input) {
                        if (input.getCurrent().getTable() == t) {
                            input.getAcc().add(input.getPath());
                        }
                        return input.getAcc();
                    }
                }, RecursionType.BREADTH_FIRST, Direction.ANY);
                for (final Path p : pathsToT) {
                    final String lastReferentField = SearchQueue.getLastReferentField(p);
                    for (final ListSQLLine line : fullList) {
                        // works for rowValues w/o any ID
                        boolean put = hasRowWithID(line.getRow().followPath(p, CreateMode.CREATE_NONE, false), id);
                        // if the modified row isn't in the existing line, it might still affect it
                        // if it's a referent row insertion
                        if (!put && lastReferentField != null && r.exists() && !r.isForeignEmpty(lastReferentField)) {
                            // no NPE, even without an undefined ID since we tested isForeignEmpty()
                            final int foreignID = r.getInt(lastReferentField);
                            put = hasRowWithID(line.getRow().followPath(p.minusLast(), CreateMode.CREATE_NONE, false), foreignID);
                        }
                        if (put) {
                            // add to the list of paths that have been refreshed
                            add(byLine, res, p, line);
                        }
                    }
                }
            }
        }
        return res;
    }

    // Whether one of the passed rows has the passed ID. Null items are skipped for both callers
    // and the search stops at the first match.
    private static boolean hasRowWithID(final Iterable<? extends SQLRowValues> rows, final int id) {
        for (final SQLRowValues current : rows) {
            if (current != null && current.getID() == id) {
                return true;
            }
        }
        return false;
    }

    /**
     * Add a (line, path) couple to the passed map, in the order chosen by <code>byLine</code>.
     * The casts are unchecked : the caller must pass a map whose key and value types match
     * <code>byLine</code>.
     * 
     * @param byLine <code>true</code> if the line is the key and the path the value,
     *        <code>false</code> for the opposite.
     * @param res the map to add to.
     * @param p the path.
     * @param line the line.
     */
    @SuppressWarnings("unchecked")
    <V, K> void add(boolean byLine, ListMap<K, V> res, final Path p, final ListSQLLine line) {
        if (!byLine) {
            res.add((K) p, (V) line);
            return;
        }
        res.add((K) line, (V) p);
    }

    /**
     * Replace the content of the full list, sort it and notify the search queue.
     * 
     * @param tmp the new lines.
     * @param cols the new columns, <code>null</code> to keep the current ones.
     */
    final void setFullList(final List<ListSQLLine> tmp, final SQLTableModelColumns cols) {
        final List<ListSQLLine> lines = this.getFullList();
        synchronized (lines) {
            lines.clear();
            lines.addAll(tmp);
            // MAYBE only sort() if it can't be done by the SELECT
            // but comparing ints (field ORDRE) is quite fast : 170ms for 100,000 items
            Collections.sort(lines);
            if (cols != null) {
                this.columns = cols;
            }
        }
        getModel().getSearchQueue().fullListChanged();
    }

    /**
     * Change the order of the lines of the full list, sort it again and notify the search queue.
     * 
     * @param idsOrder the IDs of the lines in the wanted order : each line gets the index of the
     *        first occurrence of its ID as its order ; <code>null</code> to set the order of
     *        every line to <code>null</code>.
     * @throws IllegalArgumentException if the ID of a line is missing from <code>idsOrder</code>.
     */
    final void reorder(final List<Integer> idsOrder) {
        final List<ListSQLLine> fullList = this.getFullList();
        synchronized (fullList) {
            // Index the wanted positions once, instead of calling List.indexOf() for every line
            // which is quadratic. Only the first occurrence of an ID is kept, as indexOf() does.
            final Map<Integer, Integer> indexByID;
            if (idsOrder == null) {
                indexByID = null;
            } else {
                indexByID = new HashMap<Integer, Integer>();
                int index = 0;
                for (final Integer id : idsOrder) {
                    if (!indexByID.containsKey(id))
                        indexByID.put(id, index);
                    index++;
                }
            }
            for (final ListSQLLine l : fullList) {
                final Number newOrder;
                if (indexByID == null) {
                    newOrder = null;
                } else {
                    newOrder = indexByID.get(l.getID());
                    if (newOrder == null)
                        throw new IllegalArgumentException("Missing id " + l.getID() + " in " + idsOrder);
                }
                l.setOrder(newOrder);
            }
            Collections.sort(fullList);
        }
        this.tableModel.getSearchQueue().orderChanged();
    }

    /**
     * Load new values into an existing line and tell the search queue which columns changed.
     * 
     * @param line the line to update.
     * @param p the path at which to load the values.
     * @param valsID the ID of the values.
     * @param vals the new values, can be <code>null</code> if we're removing a referent row.
     */
    final void updateLine(ListSQLLine line, Path p, int valsID, SQLRowValues vals) {
        final Set<Integer> changedCols = line.loadAt(valsID, vals, p);
        final SearchQueue searchQ = getModel().getSearchQueue();
        searchQ.changeFullList(line.getID(), line, changedCols, Mode.CHANGE);
    }

    /**
     * Add, replace or remove the line with the passed ID in the full list and notify the search
     * queue of the kind of change.
     * 
     * @param id the ID of the line.
     * @param newLine the new line, <code>null</code> if the line is deleted or filtered out.
     * @return the line previously in the full list for this ID, <code>null</code> if there was
     *         none.
     */
    final ListSQLLine replaceLine(final int id, final ListSQLLine newLine) {
        final List<ListSQLLine> lines = this.getFullList();
        final ListSQLLine oldLine;
        final Mode mode;
        synchronized (lines) {
            final int index = ListSQLLine.indexFromID(lines, id);
            final boolean wasListed = index >= 0;
            oldLine = wasListed ? lines.get(index) : null;

            if (newLine == null) {
                if (wasListed) {
                    // the line was in our list and is now deleted or filtered out
                    lines.remove(index);
                    mode = Mode.REMOVE;
                } else {
                    // the line wasn't in our list and still isn't
                    mode = Mode.NO_CHANGE;
                }
            } else {
                if (wasListed) {
                    // the line was in our list : update it
                    lines.set(index, newLine);
                    mode = Mode.CHANGE;
                } else {
                    // the line wasn't in our list but now exists : append it
                    lines.add(newLine);
                    mode = Mode.ADD;
                }
                // in both cases the new line must be put at its place
                Collections.sort(lines);
            }
        }

        // notify search queue
        getModel().getSearchQueue().changeFullList(id, newLine, null, mode);
        return oldLine;
    }

    /**
     * The number of lines of the full list.
     * 
     * @return the size of the full list, read while holding its lock.
     */
    public final int getFullListSize() {
        final List<ListSQLLine> lines = this.getFullList();
        final int size;
        synchronized (lines) {
            size = lines.size();
        }
        return size;
    }

    /**
     * Set whether every table modification should trigger an update all, instead of handling the
     * modified row only.
     * 
     * @param b <code>true</code> to always update all.
     */
    void setAlwaysUpdateAll(boolean b) {
        UpdateQueue.this.alwaysUpdateAll = b;
    }

    // *** listeners

    /**
     * Queue the change to a new state, immediately followed by an update all.
     * 
     * @param beforeState the previous state, not used.
     * @param afterState the new state, cannot be <code>null</code>.
     * @throws NullPointerException if <code>afterState</code> is <code>null</code>.
     */
    void stateChanged(final SQLTableModelSourceState beforeState, final SQLTableModelSourceState afterState) {
        if (afterState == null) {
            throw new NullPointerException("Null state");
        }

        // As in SearchQueue :
        // setState and updateAll need to be 2 different runnables. That way if the source is
        // changed and then the table is updated, the queue would naively contain setState,
        // updateAll, updateAll and thus one updateAll can be cancelled. Whereas if the setState
        // was contained in updateAll, it couldn't be cancelled.
        // Both are put from within tasksDo() so that no other runnable can come between them.
        // Otherwise an updateOne might use new columns and add a line with different columns
        // than the full list.
        final IClosure<Deque<RunnableFuture<?>>> putStateThenReload = new IClosure<Deque<RunnableFuture<?>>>() {
            @Override
            public void executeChecked(Deque<RunnableFuture<?>> input) {
                final SetStateRunnable applyState = new SetStateRunnable() {
                    @Override
                    public void run() {
                        setState(afterState);
                    }
                };
                put(applyState);
                // TODO if request didn't change and the new graph is smaller, copy and prune the
                // rows
                putUpdateAll();
            }
        };
        this.tasksDo(putStateThenReload);
    }

    /**
     * Change the current state, moving the table listener from the old request to the new one.
     * 
     * @param newState the new state, <code>null</code> to only stop listening.
     */
    protected final void setState(final SQLTableModelSourceState newState) {
        final boolean wasListening = this.state != null;
        if (wasListening) {
            // must be removed while the old state is still the current one
            this.rmTableListener();
        }
        this.state = newState;
        if (newState != null) {
            this.addTableListener();
        }
    }

    /**
     * The current state, to be called from within this queue.
     * 
     * @return the current state, never <code>null</code>.
     * @throws IllegalStateException if no state has been set yet.
     */
    protected final SQLTableModelSourceState getState() {
        assert this.currentlyInQueue();
        final SQLTableModelSourceState current = this.state;
        if (current == null) {
            throw new IllegalStateException("Not yet started");
        }
        return current;
    }

    /**
     * Stop listening to the tables and to the lines source before letting the superclass proceed.
     */
    @Override
    protected void willDie() {
        rmTableListener();
        removeSourceListener();
        super.willDie();
    }

    /**
     * Register our listener on the request of the current state.
     */
    protected final void addTableListener() {
        final SQLTableModelSourceState current = this.getState();
        current.getReq().addTableListener(this.tableListener);
    }

    /**
     * Register our listener on the lines source of the model.
     */
    private void addSourceListener() {
        getModel().getLinesSource().addListener(this.tableListener);
    }

    /**
     * Unregister our listener from the request of the current state.
     */
    protected final void rmTableListener() {
        final SQLTableModelSourceState current = this.getState();
        current.getReq().removeTableListener(this.tableListener);
    }

    /**
     * Unregister our listener from the lines source of the model.
     */
    private void removeSourceListener() {
        getModel().getLinesSource().rmListener(this.tableListener);
    }

    // *** une des tables que l'on affiche a changé

    /**
     * Handle the modification of a row : reload everything if the event has no valid ID,
     * otherwise queue an update for the event if one of its fields is used by the lines.
     * 
     * @param evt the modification.
     */
    void rowModified(final SQLTableEvent evt) {
        final int id = evt.getId();
        if (id < SQLRow.MIN_VALID_ID) {
            this.putUpdateAll();
            return;
        }
        final boolean fieldDisplayed = CollectionUtils.containsAny(this.tableModel.getReq().getLineFields(), evt.getFields());
        // if the field isn't displayed, ignore the event
        if (fieldDisplayed) {
            this.put(evt);
        }
    }

    /**
     * The tables of the max graph that are either its root or reached backwards. Takes 1-2ms,
     * perhaps cache.
     * 
     * @return a new set of tables.
     */
    final Set<SQLTable> getNotForeignTables() {
        final SQLRowValues maxGraph = this.tableModel.getReq().getMaxGraph();
        // keep the table of the start of the walk and of every item reached backwards
        final ITransformer<State<Set<SQLTable>>, Set<SQLTable>> collector = new ITransformer<State<Set<SQLTable>>, Set<SQLTable>>() {
            @Override
            public Set<SQLTable> transformChecked(State<Set<SQLTable>> input) {
                final Set<SQLTable> acc = input.getAcc();
                final boolean atRoot = input.getPath().length() == 0;
                if (atRoot || input.isBackwards()) {
                    acc.add(input.getCurrent().getTable());
                }
                return acc;
            }
        };
        final Set<SQLTable> res = new HashSet<SQLTable>();
        maxGraph.getGraph().walk(maxGraph, res, collector, RecursionType.BREADTH_FIRST, Direction.ANY);
        return res;
    }

    /**
     * Handle the addition or deletion of a row : reload everything if the event has no valid ID,
     * otherwise queue an update for the event if its table is one of
     * {@link #getNotForeignTables()}.
     * 
     * @param evt the addition or deletion.
     */
    void rowAddedOrDeleted(final SQLTableEvent evt) {
        if (evt.getId() < SQLRow.MIN_VALID_ID) {
            this.putUpdateAll();
            return;
        }
        // if a row of a table that we point to is added, we will care when the referent table
        // will point to it
        final Set<SQLTable> notForeign = this.getNotForeignTables();
        if (notForeign.contains(evt.getTable())) {
            this.put(evt);
        }
    }

    // *** puts

    /**
     * Queue a call to {@link #externalUpdated(String, IPredicate)}.
     * 
     * @param externalID the ID of the external that changed.
     * @param affectedPredicate which lines are affected.
     */
    public final void putExternalUpdated(final String externalID, final IPredicate<ListSQLLine> affectedPredicate) {
        final Runnable task = new Runnable() {
            @Override
            public void run() {
                externalUpdated(externalID, affectedPredicate);
            }
        };
        this.put(task);
    }

    /**
     * Tell the search queue that the columns using the passed external have changed for the
     * affected lines. Only logs if no column uses the external.
     * 
     * @param externalID the ID of the external that changed.
     * @param affectedPredicate which lines are affected.
     */
    protected final void externalUpdated(final String externalID, final IPredicate<ListSQLLine> affectedPredicate) {
        final List<ListSQLLine> lines = this.getFullList();
        synchronized (lines) {
            // indexes, among all columns, of the ones using the external
            final Set<Integer> usingCols = new HashSet<Integer>();
            int colIndex = 0;
            for (final SQLTableModelColumn col : this.columns.getAllColumns()) {
                final boolean used = col.getUsedExternals().contains(externalID);
                if (used) {
                    usingCols.add(colIndex);
                }
                colIndex++;
            }

            if (usingCols.isEmpty()) {
                Log.get().log(Level.INFO, "No columns use " + externalID + " in " + this);
            } else {
                for (final ListSQLLine line : lines) {
                    if (affectedPredicate.evaluateChecked(line)) {
                        this.tableModel.getSearchQueue().changeFullList(line.getID(), line, usingCols, Mode.CHANGE);
                    }
                }
            }
        }
    }

    /**
     * Queue the update created by {@link UpdateRunnable} for the passed event.
     * 
     * @param evt a table event.
     */
    private void put(SQLTableEvent evt) {
        this.put(UpdateRunnable.create(getModel(), evt));
    }

    /**
     * Queue the update created by {@link UpdateRunnable} for the whole model.
     */
    public void putUpdateAll() {
        this.put(UpdateRunnable.create(getModel()));
    }

    /**
     * If this is sleeping, empty the list and call {@link #putUpdateAll()} so that the list reload
     * itself when this wakes up. The three steps are order-dependent : the remove all is queued,
     * then the sleeping state is set to <code>false</code>, then the update all is queued.
     * 
     * @throws IllegalStateException if not sleeping.
     */
    void putRemoveAll() {
        // fail fast : this method is only meaningful on a sleeping queue
        if (!this.isSleeping())
            throw new IllegalStateException("not sleeping");
        // no user runnables can come between the RmAll and the UpdateAll since runnableAdded()
        // is blocked by our lock, so there won't be any incoherence for them
        this.put(UpdateRunnable.createRmAll(this, this.tableModel));
        // NOTE(review): presumably the RmAll puts the queue back to sleep once run (see the
        // comment in isCancelableUpdate()) - confirm in UpdateRunnable
        this.setSleeping(false);
        // reload the empty list when waking up
        this.putUpdateAll();
    }

    /**
     * Before a {@link ChangeAllRunnable} is put, run the cancel closure on the tasks of this
     * queue.
     * 
     * @param qr the task about to be put.
     */
    @Override
    protected void willPut(final RunnableFuture<?> qr) throws InterruptedException {
        final Runnable toPut = SearchQueue.getRunnable(qr);
        if (!(toPut instanceof ChangeAllRunnable)) {
            return;
        }
        // since everything will be updated, no need to keep the previous updates.
        this.tasksDo(this.cancelClosure);
    }

    /**
     * The type of a task, as needed by the closure of
     * {@link #createCancelClosure(SleepingQueue, ITransformer)} : whether it can be cancelled and
     * whether it depends on the tasks before it.
     */
    static public enum TaskType {
        /** Not cancelable, depends on previous tasks. */
        USER(false, true),
        /** Cancelable, doesn't depend on previous tasks. */
        COMPUTE(true, false),
        /** Not cancelable, doesn't depend on previous tasks. */
        SET_STATE(false, false);

        private final boolean cancelable;
        private final boolean dependsOnPrevious;

        private TaskType(boolean canBeCancelled, boolean needsPrevious) {
            this.cancelable = canBeCancelled;
            this.dependsOnPrevious = needsPrevious;
        }
    }

    /**
     * Create a closure that cancels the cancelable tasks of a queue. It goes through the passed
     * tasks from the last one, removing the cancelable ones, and stops after a task that depends
     * on the previous ones. If it went through all the tasks, it also cancels the task being run
     * by the queue if it is cancelable.
     * 
     * @param q the queue, used to find the task being run.
     * @param cancelablePred gives the type of each task.
     * @return a closure to be executed on the tasks of <code>q</code>.
     */
    static public final IClosure<Deque<RunnableFuture<?>>> createCancelClosure(final SleepingQueue q, final ITransformer<? super RunnableFuture<?>, TaskType> cancelablePred) {
        return new IClosure<Deque<RunnableFuture<?>>>() {
            @Override
            public void executeChecked(final Deque<RunnableFuture<?>> tasks) {
                // start from the end and remove all updates until we find a runnable that needs
                // the ones before it
                final Iterator<RunnableFuture<?>> backwards = tasks.descendingIterator();
                boolean needsPrevious = false;
                while (backwards.hasNext() && !needsPrevious) {
                    final RunnableFuture<?> task = backwards.next();
                    final TaskType type = cancelablePred.transformChecked(task);
                    if (type.cancelable) {
                        backwards.remove();
                    }
                    needsPrevious = type.dependsOnPrevious;
                }
                if (needsPrevious) {
                    // a task depends on what's before it : the task being run must be left alone
                    return;
                }
                // we only stopped because we ran out of items, so continue with the task being
                // run ; before trying to cancel it we should have been through all the backlog
                assert !backwards.hasNext();
                final RunnableFuture<?> running = q.getBeingRun();
                if (running == null) {
                    return;
                }
                if (cancelablePred.transformChecked(running).cancelable) {
                    // might already be done by now, but it's OK cancel() will just return false
                    running.cancel(true);
                }
            }
        };
    }

}