OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 156 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
17 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.sql.view.list;
15
 
93 ilm 16
import org.openconcerto.sql.Log;
73 ilm 17
import org.openconcerto.sql.model.SQLRow;
18
import org.openconcerto.sql.model.SQLRowValues;
93 ilm 19
import org.openconcerto.sql.model.SQLRowValues.CreateMode;
73 ilm 20
import org.openconcerto.sql.model.SQLRowValuesCluster.State;
17 ilm 21
import org.openconcerto.sql.model.SQLTable;
22
import org.openconcerto.sql.model.SQLTableEvent;
83 ilm 23
import org.openconcerto.sql.model.SQLTableModifiedListener;
80 ilm 24
import org.openconcerto.sql.model.graph.Link.Direction;
93 ilm 25
import org.openconcerto.sql.model.graph.Path;
17 ilm 26
import org.openconcerto.sql.view.list.UpdateRunnable.RmAllRunnable;
93 ilm 27
import org.openconcerto.sql.view.list.search.SearchOne;
28
import org.openconcerto.sql.view.list.search.SearchOne.Mode;
29
import org.openconcerto.sql.view.list.search.SearchQueue;
30
import org.openconcerto.sql.view.list.search.SearchQueue.SetStateRunnable;
142 ilm 31
import org.openconcerto.utils.CollectionUtils;
93 ilm 32
import org.openconcerto.utils.ListMap;
73 ilm 33
import org.openconcerto.utils.RecursionType;
17 ilm 34
import org.openconcerto.utils.SleepingQueue;
93 ilm 35
import org.openconcerto.utils.Tuple2;
17 ilm 36
import org.openconcerto.utils.cc.IClosure;
93 ilm 37
import org.openconcerto.utils.cc.IPredicate;
73 ilm 38
import org.openconcerto.utils.cc.ITransformer;
17 ilm 39
 
40
import java.beans.PropertyChangeEvent;
41
import java.beans.PropertyChangeListener;
93 ilm 42
import java.util.ArrayList;
43
import java.util.Collections;
61 ilm 44
import java.util.Deque;
73 ilm 45
import java.util.HashSet;
93 ilm 46
import java.util.Iterator;
47
import java.util.List;
73 ilm 48
import java.util.Set;
177 ilm 49
import java.util.concurrent.RunnableFuture;
93 ilm 50
import java.util.logging.Level;
17 ilm 51
 
93 ilm 52
import net.jcip.annotations.GuardedBy;
53
 
54
public final class UpdateQueue extends SleepingQueue {
17 ilm 55
 
56
    /**
57
     * Whether the passed future performs an update.
58
     *
59
     * @param f a task in this queue, can be <code>null</code>.
60
     * @return <code>true</code> if <code>f</code> loads from the db.
61
     */
177 ilm 62
    static boolean isUpdate(RunnableFuture<?> f) {
93 ilm 63
        return isUpdate(SearchQueue.getRunnable(f));
17 ilm 64
    }
65
 
93 ilm 66
    static boolean isUpdate(Runnable r) {
67
        return r instanceof UpdateRunnable;
68
    }
69
 
70
    private static boolean isCancelableUpdate(Runnable r) {
17 ilm 71
        // don't cancel RmAll so we can put an UpdateAll right after it (the UpdateAll won't be
72
        // executed since RmAll put the queue to sleep)
93 ilm 73
        return isUpdate(r) && !(r instanceof RmAllRunnable);
17 ilm 74
    }
75
 
76
    private final class TableListener implements SQLTableModifiedListener, PropertyChangeListener {
93 ilm 77
        @Override
17 ilm 78
        public void tableModified(SQLTableEvent evt) {
79
            if (UpdateQueue.this.alwaysUpdateAll)
80
                putUpdateAll();
93 ilm 81
            else if (evt.getMode() == SQLTableEvent.Mode.ROW_UPDATED) {
17 ilm 82
                rowModified(evt);
73 ilm 83
            } else {
84
                rowAddedOrDeleted(evt);
17 ilm 85
            }
86
        }
87
 
88
        @Override
89
        public void propertyChange(PropertyChangeEvent evt) {
90
            // where changed
93 ilm 91
            stateChanged(null, getModel().getReq().createState());
17 ilm 92
        }
93
    }
94
 
95
    private final ITableModel tableModel;
93 ilm 96
    // thread-confined
97
    private SQLTableModelSourceState state;
98
    @GuardedBy("itself")
99
    private final List<ListSQLLine> fullList;
100
    @GuardedBy("fullList")
101
    private SQLTableModelColumns columns;
17 ilm 102
    private final TableListener tableListener;
73 ilm 103
    // TODO rm : needed for now since our optimizations are false if there's a where not on the
104
    // primary table, see http://192.168.1.10:3000/issues/show/22
17 ilm 105
    private boolean alwaysUpdateAll = false;
177 ilm 106
    private final IClosure<Deque<RunnableFuture<?>>> cancelClosure;
17 ilm 107
 
108
    public UpdateQueue(ITableModel model) {
109
        super(UpdateQueue.class.getSimpleName() + " on " + model);
110
        this.tableModel = model;
93 ilm 111
        this.fullList = new ArrayList<ListSQLLine>();
177 ilm 112
        this.cancelClosure = createCancelClosure(this, new ITransformer<RunnableFuture<?>, TaskType>() {
93 ilm 113
            @Override
177 ilm 114
            public TaskType transformChecked(RunnableFuture<?> input) {
93 ilm 115
                final Runnable r = SearchQueue.getRunnable(input);
116
                if (isCancelableUpdate(r))
117
                    return TaskType.COMPUTE;
118
                else if (r instanceof SetStateRunnable)
119
                    return TaskType.SET_STATE;
120
                else
121
                    return TaskType.USER;
122
            }
123
        });
17 ilm 124
        this.tableListener = new TableListener();
93 ilm 125
    }
126
 
127
    private final ITableModel getModel() {
128
        return this.tableModel;
129
    }
130
 
131
    @Override
132
    protected void started() {
17 ilm 133
        // savoir quand les tables qu'on affiche changent
142 ilm 134
        addSourceListener();
93 ilm 135
        stateChanged(null, this.getModel().getReq().createState());
136
        // Only starts once there's something to search, that way the runnable passed to
137
        // ITableModel.search() will be meaningful
138
        // SetStateRunnable since this must not be cancelled by an updateAll, but it shouldn't
139
        // prevent earlier updates to be cancelled
140
        this.put(new SetStateRunnable() {
141
            @Override
142
            public void run() {
156 ilm 143
                getModel().startSearchQueue();
93 ilm 144
            }
145
        });
17 ilm 146
    }
147
 
93 ilm 148
    @Override
149
    protected void dying() throws Exception {
150
        super.dying();
151
        assert currentlyInQueue();
152
 
153
        // only kill searchQ once updateQ is really dead, otherwise the currently executing
154
        // update might finish once the searchQ is already dead.
155
 
156
        final SearchQueue searchQueue = getModel().getSearchQueue();
157
        // state cannot change since we only change it in this thread (except for an Error being
158
        // thrown and killing the queue)
159
        final RunningState state = searchQueue.getRunningState();
160
        // not started or there was an Error
161
        if (state == RunningState.NEW || state == RunningState.DEAD)
162
            return;
163
        if (state == RunningState.WILL_DIE || state == RunningState.DYING)
164
            throw new IllegalStateException("Someone else already called die()");
165
        try {
166
            searchQueue.die().get();
167
        } catch (Exception e) {
168
            if (searchQueue.getRunningState() != RunningState.DEAD)
169
                throw e;
170
            // there was an Error in the last run task or while in die(), but it's OK we wanted the
171
            // queue dead
172
            Log.get().log(Level.CONFIG, "Exception while killing search queue", e);
173
        }
174
        assert searchQueue.getRunningState().compareTo(RunningState.DYING) >= 0;
175
        searchQueue.join();
176
        assert searchQueue.getRunningState() == RunningState.DEAD;
177
    }
178
 
179
    final List<ListSQLLine> getFullList() {
180
        return this.fullList;
181
    }
182
 
183
    public final Tuple2<List<ListSQLLine>, SQLTableModelColumns> copyFullList() {
184
        final Tuple2<List<ListSQLLine>, SQLTableModelColumns> res;
185
        final List<ListSQLLine> fullList = this.getFullList();
186
        synchronized (fullList) {
187
            res = Tuple2.<List<ListSQLLine>, SQLTableModelColumns> create(new ArrayList<ListSQLLine>(fullList), this.columns);
188
        }
189
        return res;
190
    }
191
 
192
    public final ListSQLLine getLine(final Number id) {
193
        final ListSQLLine res;
194
        final List<ListSQLLine> fullList = this.getFullList();
195
        synchronized (fullList) {
196
            res = ListSQLLine.fromID(fullList, id.intValue());
197
        }
198
        return res;
199
    }
200
 
201
    /**
202
     * The lines and their path affected by a change of the passed row.
203
     *
204
     * @param r the row that has changed.
205
     * @return the refreshed lines and their changed paths.
206
     */
207
    protected final ListMap<ListSQLLine, Path> getAffectedLines(final SQLRow r) {
208
        return this.getAffected(r, new ListMap<ListSQLLine, Path>(), true);
209
    }
210
 
211
    protected final ListMap<Path, ListSQLLine> getAffectedPaths(final SQLRow r) {
212
        return this.getAffected(r, new ListMap<Path, ListSQLLine>(), false);
213
    }
214
 
215
    // must be called from within this queue, as this method use fullList
216
    private <K, V> ListMap<K, V> getAffected(final SQLRow r, final ListMap<K, V> res, final boolean byLine) {
217
        final List<ListSQLLine> fullList = this.getFullList();
218
        synchronized (fullList) {
219
            final SQLTable t = r.getTable();
220
            final int id = r.getID();
221
            if (id < SQLRow.MIN_VALID_ID)
222
                throw new IllegalArgumentException("invalid ID: " + id);
223
            if (!fullList.isEmpty()) {
142 ilm 224
                final SQLRowValues proto = this.getState().getReq().getGraphToFetch();
93 ilm 225
                final List<Path> pathsToT = new ArrayList<Path>();
226
                proto.getGraph().walk(proto, pathsToT, new ITransformer<State<List<Path>>, List<Path>>() {
227
                    @Override
228
                    public List<Path> transformChecked(State<List<Path>> input) {
229
                        if (input.getCurrent().getTable() == t) {
230
                            input.getAcc().add(input.getPath());
231
                        }
232
                        return input.getAcc();
233
                    }
234
                }, RecursionType.BREADTH_FIRST, Direction.ANY);
235
                for (final Path p : pathsToT) {
236
                    final String lastReferentField = SearchQueue.getLastReferentField(p);
237
                    for (final ListSQLLine line : fullList) {
238
                        boolean put = false;
239
                        for (final SQLRowValues current : line.getRow().followPath(p, CreateMode.CREATE_NONE, false)) {
240
                            // works for rowValues w/o any ID
241
                            if (current != null && current.getID() == id) {
242
                                put = true;
243
                            }
244
                        }
245
                        // if the modified row isn't in the existing line, it might still affect it
246
                        // if it's a referent row insertion
156 ilm 247
                        if (!put && lastReferentField != null && r.exists() && !r.isForeignEmpty(lastReferentField)) {
248
                            // no NPE, even without an undefined ID since we tested isForeignEmpty()
93 ilm 249
                            final int foreignID = r.getInt(lastReferentField);
250
                            for (final SQLRowValues current : line.getRow().followPath(p.minusLast(), CreateMode.CREATE_NONE, false)) {
251
                                if (current.getID() == foreignID) {
252
                                    put = true;
253
                                }
254
                            }
255
                        }
256
                        if (put) {
257
                            // add to the list of paths that have been refreshed
258
                            add(byLine, res, p, line);
259
                        }
260
                    }
261
                }
262
            }
263
        }
264
        return res;
265
    }
266
 
267
    @SuppressWarnings("unchecked")
268
    <V, K> void add(boolean byLine, ListMap<K, V> res, final Path p, final ListSQLLine line) {
269
        if (byLine)
270
            res.add((K) line, (V) p);
271
        else
272
            res.add((K) p, (V) line);
273
    }
274
 
275
    final void setFullList(final List<ListSQLLine> tmp, final SQLTableModelColumns cols) {
276
        final List<ListSQLLine> fullList = this.getFullList();
277
        synchronized (fullList) {
278
            fullList.clear();
279
            fullList.addAll(tmp);
280
            // MAYBE only sort() if it can't be done by the SELECT
281
            // but comparing ints (field ORDRE) is quite fast : 170ms for 100,000 items
282
            Collections.sort(fullList);
283
            if (cols != null)
284
                this.columns = cols;
285
        }
286
        this.tableModel.getSearchQueue().fullListChanged();
287
    }
288
 
142 ilm 289
    final void reorder(final List<Integer> idsOrder) {
93 ilm 290
        final List<ListSQLLine> fullList = this.getFullList();
291
        synchronized (fullList) {
292
            for (final ListSQLLine l : fullList) {
293
                final Number newOrder;
294
                if (idsOrder == null) {
295
                    newOrder = null;
296
                } else {
297
                    final int index = idsOrder.indexOf(l.getID());
298
                    if (index < 0)
299
                        throw new IllegalArgumentException("Missing id " + l.getID() + " in " + idsOrder);
300
                    newOrder = index;
301
                }
302
                l.setOrder(newOrder);
303
            }
304
            Collections.sort(fullList);
305
        }
306
        this.tableModel.getSearchQueue().orderChanged();
307
    }
308
 
309
    // vals can be null if we're removing a referent row
310
    final void updateLine(ListSQLLine line, Path p, int valsID, SQLRowValues vals) {
311
        final Set<Integer> modifiedCols = line.loadAt(valsID, vals, p);
312
        this.tableModel.getSearchQueue().changeFullList(line.getID(), line, modifiedCols, SearchOne.Mode.CHANGE);
313
    }
314
 
315
    final ListSQLLine replaceLine(final int id, final ListSQLLine newLine) {
316
        final Mode mode;
317
        final List<ListSQLLine> fullList = this.getFullList();
318
        final ListSQLLine oldLine;
319
        synchronized (fullList) {
320
            final int modifiedIndex = ListSQLLine.indexFromID(fullList, id);
321
            oldLine = modifiedIndex < 0 ? null : fullList.get(modifiedIndex);
322
 
323
            if (modifiedIndex < 0) {
324
                // la ligne n'était dans notre liste
325
                if (newLine != null) {
326
                    // mais elle existe : ajout
327
                    // ATTN on ajoute à la fin, sans se soucier de l'ordre
328
                    fullList.add(newLine);
329
                    Collections.sort(fullList);
330
                    mode = Mode.ADD;
331
                } else {
332
                    // et elle n'y est toujours pas
333
                    mode = Mode.NO_CHANGE;
334
                }
335
            } else {
336
                // la ligne était dans notre liste
337
                if (newLine != null) {
338
                    // mettre à jour
339
                    fullList.set(modifiedIndex, newLine);
340
                    Collections.sort(fullList);
341
                    mode = Mode.CHANGE;
342
                } else {
343
                    // elle est effacée ou filtrée
344
                    fullList.remove(modifiedIndex);
345
                    mode = Mode.REMOVE;
346
                }
347
            }
348
        }
349
 
350
        // notify search queue
351
        this.tableModel.getSearchQueue().changeFullList(id, newLine, null, mode);
352
        return oldLine;
353
    }
354
 
355
    public final int getFullListSize() {
356
        final List<ListSQLLine> fullList = this.getFullList();
357
        synchronized (fullList) {
358
            return fullList.size();
359
        }
360
    }
361
 
17 ilm 362
    void setAlwaysUpdateAll(boolean b) {
363
        this.alwaysUpdateAll = b;
364
    }
365
 
366
    // *** listeners
367
 
93 ilm 368
    void stateChanged(final SQLTableModelSourceState beforeState, final SQLTableModelSourceState afterState) {
369
        if (afterState == null)
370
            throw new NullPointerException("Null state");
371
 
372
        // As in SearchQueue :
373
        // needs to be 2 different runnables, that way if the source is changed and then the table
374
        // is updated : the queue would naively contain setState, updateAll, updateAll and thus we
375
        // can cancel one updateAll. Whereas if the setState was contained in updateAll, we
376
        // couldn't cancel it.
377
        // use tasksDo() so that no other runnable can come between setState and updateAll.
378
        // Otherwise an updateOne might use new columns and add a line with different columns than
379
        // the full list.
177 ilm 380
        this.tasksDo(new IClosure<Deque<RunnableFuture<?>>>() {
93 ilm 381
            @Override
177 ilm 382
            public void executeChecked(Deque<RunnableFuture<?>> input) {
93 ilm 383
                put(new SetStateRunnable() {
384
                    @Override
385
                    public void run() {
142 ilm 386
                        setState(afterState);
93 ilm 387
                    }
388
                });
389
                // TODO if request didn't change and the new graph is smaller, copy and prune the
390
                // rows
391
                putUpdateAll();
392
            }
393
        });
394
    }
395
 
142 ilm 396
    protected final void setState(final SQLTableModelSourceState newState) {
397
        if (this.state != null)
398
            this.rmTableListener();
399
        this.state = newState;
400
        if (this.state != null)
401
            this.addTableListener();
402
    }
403
 
93 ilm 404
    protected final SQLTableModelSourceState getState() {
405
        assert this.currentlyInQueue();
406
        if (this.state == null)
407
            throw new IllegalStateException("Not yet started");
408
        return this.state;
409
    }
410
 
17 ilm 411
    @Override
83 ilm 412
    protected void willDie() {
142 ilm 413
        this.rmTableListener();
414
        this.removeSourceListener();
83 ilm 415
        super.willDie();
17 ilm 416
    }
417
 
93 ilm 418
    protected final void addTableListener() {
142 ilm 419
        this.getState().getReq().addTableListener(this.tableListener);
93 ilm 420
    }
421
 
142 ilm 422
    private void addSourceListener() {
17 ilm 423
        this.tableModel.getLinesSource().addListener(this.tableListener);
424
    }
425
 
93 ilm 426
    protected final void rmTableListener() {
142 ilm 427
        this.getState().getReq().removeTableListener(this.tableListener);
93 ilm 428
    }
429
 
142 ilm 430
    private void removeSourceListener() {
17 ilm 431
        this.tableModel.getLinesSource().rmListener(this.tableListener);
432
    }
433
 
434
    // *** une des tables que l'on affiche a changé
435
 
436
    void rowModified(final SQLTableEvent evt) {
437
        final int id = evt.getId();
73 ilm 438
        if (id < SQLRow.MIN_VALID_ID) {
17 ilm 439
            this.putUpdateAll();
440
        } else if (CollectionUtils.containsAny(this.tableModel.getReq().getLineFields(), evt.getFields())) {
441
            this.put(evt);
442
        }
443
        // si on n'affiche pas le champ ignorer
444
    }
445
 
73 ilm 446
    // takes 1-2ms, perhaps cache
447
    final Set<SQLTable> getNotForeignTables() {
448
        final Set<SQLTable> res = new HashSet<SQLTable>();
449
        final SQLRowValues maxGraph = this.tableModel.getReq().getMaxGraph();
450
        maxGraph.getGraph().walk(maxGraph, res, new ITransformer<State<Set<SQLTable>>, Set<SQLTable>>() {
451
            @Override
452
            public Set<SQLTable> transformChecked(State<Set<SQLTable>> input) {
453
                if (input.getPath().length() == 0 || input.isBackwards())
454
                    input.getAcc().add(input.getCurrent().getTable());
455
                return input.getAcc();
456
            }
80 ilm 457
        }, RecursionType.BREADTH_FIRST, Direction.ANY);
73 ilm 458
        return res;
17 ilm 459
    }
460
 
73 ilm 461
    void rowAddedOrDeleted(final SQLTableEvent evt) {
462
        if (evt.getId() < SQLRow.MIN_VALID_ID)
463
            this.putUpdateAll();
464
        // if a row of a table that we point to is added, we will care when the referent table will
465
        // point to it
466
        else if (this.getNotForeignTables().contains(evt.getTable()))
467
            this.put(evt);
17 ilm 468
    }
469
 
470
    // *** puts
471
 
93 ilm 472
    public final void putExternalUpdated(final String externalID, final IPredicate<ListSQLLine> affectedPredicate) {
473
        this.put(new Runnable() {
474
            @Override
475
            public void run() {
476
                externalUpdated(externalID, affectedPredicate);
477
            }
478
        });
479
    }
480
 
481
    protected final void externalUpdated(final String externalID, final IPredicate<ListSQLLine> affectedPredicate) {
482
        final List<ListSQLLine> fullList = this.getFullList();
483
        synchronized (fullList) {
484
            final Set<Integer> indexes = new HashSet<Integer>();
485
            int i = 0;
486
            for (final SQLTableModelColumn col : this.columns.getAllColumns()) {
487
                if (col.getUsedExternals().contains(externalID)) {
488
                    indexes.add(i);
489
                }
490
                i++;
491
            }
492
            if (indexes.isEmpty()) {
493
                Log.get().log(Level.INFO, "No columns use " + externalID + " in " + this);
494
                return;
495
            }
496
 
497
            for (final ListSQLLine line : fullList) {
498
                if (affectedPredicate.evaluateChecked(line)) {
499
                    this.tableModel.getSearchQueue().changeFullList(line.getID(), line, indexes, SearchOne.Mode.CHANGE);
500
                }
501
            }
502
        }
503
    }
504
 
17 ilm 505
    private void put(SQLTableEvent evt) {
506
        this.put(UpdateRunnable.create(this.tableModel, evt));
507
    }
508
 
509
    public void putUpdateAll() {
510
        this.put(UpdateRunnable.create(this.tableModel));
511
    }
512
 
513
    /**
514
     * If this is sleeping, empty the list and call {@link #putUpdateAll()} so that the list reload
515
     * itself when this wakes up.
516
     *
517
     * @throws IllegalStateException if not sleeping.
518
     */
519
    void putRemoveAll() {
520
        if (!this.isSleeping())
521
            throw new IllegalStateException("not sleeping");
522
        // no user runnables can come between the RmAll and the UpdateAll since runnableAdded()
523
        // is blocked by our lock, so there won't be any incoherence for them
524
        this.put(UpdateRunnable.createRmAll(this, this.tableModel));
525
        this.setSleeping(false);
526
        // reload the empty list when waking up
527
        this.putUpdateAll();
528
    }
529
 
93 ilm 530
    @Override
177 ilm 531
    protected void willPut(final RunnableFuture<?> qr) throws InterruptedException {
93 ilm 532
        if (SearchQueue.getRunnable(qr) instanceof ChangeAllRunnable) {
17 ilm 533
            // si on met tout à jour, ne sert à rien de garder les maj précédentes.
93 ilm 534
            this.tasksDo(this.cancelClosure);
535
        }
536
    }
537
 
538
    static public enum TaskType {
539
        USER(false, true), COMPUTE(true, false), SET_STATE(false, false);
540
 
541
        private final boolean cancelable, dependsOnPrevious;
542
 
543
        private TaskType(boolean cancelable, boolean dependsOnPrevious) {
544
            this.cancelable = cancelable;
545
            this.dependsOnPrevious = dependsOnPrevious;
546
        }
547
    }
548
 
177 ilm 549
    static public final IClosure<Deque<RunnableFuture<?>>> createCancelClosure(final SleepingQueue q, final ITransformer<? super RunnableFuture<?>, TaskType> cancelablePred) {
550
        return new IClosure<Deque<RunnableFuture<?>>>() {
93 ilm 551
            @Override
177 ilm 552
            public void executeChecked(final Deque<RunnableFuture<?>> tasks) {
93 ilm 553
                // on part de la fin et on supprime toutes les maj jusqu'a ce qu'on trouve
554
                // un runnable qui n'est pas annulable
177 ilm 555
                final Iterator<RunnableFuture<?>> iter = tasks.descendingIterator();
93 ilm 556
                boolean needsPrevious = false;
557
                while (iter.hasNext() && !needsPrevious) {
177 ilm 558
                    final RunnableFuture<?> current = iter.next();
93 ilm 559
                    final TaskType type = cancelablePred.transformChecked(current);
560
                    needsPrevious = type.dependsOnPrevious;
561
                    if (type.cancelable)
562
                        iter.remove();
563
                }
564
                // if we stop only because we ran out of items, continue with beingRun
565
                if (!needsPrevious) {
566
                    // before trying to cancel being run we should have been through all the backlog
567
                    assert !iter.hasNext();
177 ilm 568
                    final RunnableFuture<?> br = q.getBeingRun();
93 ilm 569
                    if (br != null && cancelablePred.transformChecked(br).cancelable) {
570
                        // might already be done by now, but it's OK cancel() will just return false
571
                        br.cancel(true);
17 ilm 572
                    }
573
                }
93 ilm 574
            }
575
        };
17 ilm 576
    }
577
 
578
}