Line 44... |
Line 44... |
44 |
import java.util.Deque;
|
44 |
import java.util.Deque;
|
45 |
import java.util.HashSet;
|
45 |
import java.util.HashSet;
|
46 |
import java.util.Iterator;
|
46 |
import java.util.Iterator;
|
47 |
import java.util.List;
|
47 |
import java.util.List;
|
48 |
import java.util.Set;
|
48 |
import java.util.Set;
|
49 |
import java.util.concurrent.FutureTask;
|
49 |
import java.util.concurrent.RunnableFuture;
|
50 |
import java.util.logging.Level;
|
50 |
import java.util.logging.Level;
|
51 |
|
51 |
|
52 |
import net.jcip.annotations.GuardedBy;
|
52 |
import net.jcip.annotations.GuardedBy;
|
53 |
|
53 |
|
54 |
public final class UpdateQueue extends SleepingQueue {
|
54 |
public final class UpdateQueue extends SleepingQueue {
|
Line 57... |
Line 57... |
57 |
* Whether the passed future performs an update.
|
57 |
* Whether the passed future performs an update.
|
58 |
*
|
58 |
*
|
59 |
* @param f a task in this queue, can be <code>null</code>.
|
59 |
* @param f a task in this queue, can be <code>null</code>.
|
60 |
* @return <code>true</code> if <code>f</code> loads from the db.
|
60 |
* @return <code>true</code> if <code>f</code> loads from the db.
|
61 |
*/
|
61 |
*/
|
62 |
static boolean isUpdate(FutureTask<?> f) {
|
62 |
static boolean isUpdate(RunnableFuture<?> f) {
|
63 |
return isUpdate(SearchQueue.getRunnable(f));
|
63 |
return isUpdate(SearchQueue.getRunnable(f));
|
64 |
}
|
64 |
}
|
65 |
|
65 |
|
66 |
static boolean isUpdate(Runnable r) {
|
66 |
static boolean isUpdate(Runnable r) {
|
67 |
return r instanceof UpdateRunnable;
|
67 |
return r instanceof UpdateRunnable;
|
Line 101... |
Line 101... |
101 |
private SQLTableModelColumns columns;
|
101 |
private SQLTableModelColumns columns;
|
102 |
private final TableListener tableListener;
|
102 |
private final TableListener tableListener;
|
103 |
// TODO rm : needed for now since our optimizations are false if there's a where not on the
|
103 |
// TODO rm : needed for now since our optimizations are false if there's a where not on the
|
104 |
// primary table, see http://192.168.1.10:3000/issues/show/22
|
104 |
// primary table, see http://192.168.1.10:3000/issues/show/22
|
105 |
private boolean alwaysUpdateAll = false;
|
105 |
private boolean alwaysUpdateAll = false;
|
106 |
private final IClosure<Deque<FutureTask<?>>> cancelClosure;
|
106 |
private final IClosure<Deque<RunnableFuture<?>>> cancelClosure;
|
107 |
|
107 |
|
108 |
public UpdateQueue(ITableModel model) {
|
108 |
public UpdateQueue(ITableModel model) {
|
109 |
super(UpdateQueue.class.getSimpleName() + " on " + model);
|
109 |
super(UpdateQueue.class.getSimpleName() + " on " + model);
|
110 |
this.tableModel = model;
|
110 |
this.tableModel = model;
|
111 |
this.fullList = new ArrayList<ListSQLLine>();
|
111 |
this.fullList = new ArrayList<ListSQLLine>();
|
112 |
this.cancelClosure = createCancelClosure(this, new ITransformer<FutureTask<?>, TaskType>() {
|
112 |
this.cancelClosure = createCancelClosure(this, new ITransformer<RunnableFuture<?>, TaskType>() {
|
113 |
@Override
|
113 |
@Override
|
114 |
public TaskType transformChecked(FutureTask<?> input) {
|
114 |
public TaskType transformChecked(RunnableFuture<?> input) {
|
115 |
final Runnable r = SearchQueue.getRunnable(input);
|
115 |
final Runnable r = SearchQueue.getRunnable(input);
|
116 |
if (isCancelableUpdate(r))
|
116 |
if (isCancelableUpdate(r))
|
117 |
return TaskType.COMPUTE;
|
117 |
return TaskType.COMPUTE;
|
118 |
else if (r instanceof SetStateRunnable)
|
118 |
else if (r instanceof SetStateRunnable)
|
119 |
return TaskType.SET_STATE;
|
119 |
return TaskType.SET_STATE;
|
Line 375... |
Line 375... |
375 |
// can cancel one updateAll. Whereas if the setState was contained in updateAll, we
|
375 |
// can cancel one updateAll. Whereas if the setState was contained in updateAll, we
|
376 |
// couldn't cancel it.
|
376 |
// couldn't cancel it.
|
377 |
// use tasksDo() so that no other runnable can come between setState and updateAll.
|
377 |
// use tasksDo() so that no other runnable can come between setState and updateAll.
|
378 |
// Otherwise an updateOne might use new columns and add a line with different columns than
|
378 |
// Otherwise an updateOne might use new columns and add a line with different columns than
|
379 |
// the full list.
|
379 |
// the full list.
|
380 |
this.tasksDo(new IClosure<Deque<FutureTask<?>>>() {
|
380 |
this.tasksDo(new IClosure<Deque<RunnableFuture<?>>>() {
|
381 |
@Override
|
381 |
@Override
|
382 |
public void executeChecked(Deque<FutureTask<?>> input) {
|
382 |
public void executeChecked(Deque<RunnableFuture<?>> input) {
|
383 |
put(new SetStateRunnable() {
|
383 |
put(new SetStateRunnable() {
|
384 |
@Override
|
384 |
@Override
|
385 |
public void run() {
|
385 |
public void run() {
|
386 |
setState(afterState);
|
386 |
setState(afterState);
|
387 |
}
|
387 |
}
|
Line 526... |
Line 526... |
526 |
// reload the empty list when waking up
|
526 |
// reload the empty list when waking up
|
527 |
this.putUpdateAll();
|
527 |
this.putUpdateAll();
|
528 |
}
|
528 |
}
|
529 |
|
529 |
|
530 |
@Override
|
530 |
@Override
|
531 |
protected void willPut(final FutureTask<?> qr) throws InterruptedException {
|
531 |
protected void willPut(final RunnableFuture<?> qr) throws InterruptedException {
|
532 |
if (SearchQueue.getRunnable(qr) instanceof ChangeAllRunnable) {
|
532 |
if (SearchQueue.getRunnable(qr) instanceof ChangeAllRunnable) {
|
533 |
// si on met tout à jour, ne sert à rien de garder les maj précédentes.
|
533 |
// si on met tout à jour, ne sert à rien de garder les maj précédentes.
|
534 |
this.tasksDo(this.cancelClosure);
|
534 |
this.tasksDo(this.cancelClosure);
|
535 |
}
|
535 |
}
|
536 |
}
|
536 |
}
|
Line 544... |
Line 544... |
544 |
this.cancelable = cancelable;
|
544 |
this.cancelable = cancelable;
|
545 |
this.dependsOnPrevious = dependsOnPrevious;
|
545 |
this.dependsOnPrevious = dependsOnPrevious;
|
546 |
}
|
546 |
}
|
547 |
}
|
547 |
}
|
548 |
|
548 |
|
549 |
static public final IClosure<Deque<FutureTask<?>>> createCancelClosure(final SleepingQueue q, final ITransformer<? super FutureTask<?>, TaskType> cancelablePred) {
|
549 |
static public final IClosure<Deque<RunnableFuture<?>>> createCancelClosure(final SleepingQueue q, final ITransformer<? super RunnableFuture<?>, TaskType> cancelablePred) {
|
550 |
return new IClosure<Deque<FutureTask<?>>>() {
|
550 |
return new IClosure<Deque<RunnableFuture<?>>>() {
|
551 |
@Override
|
551 |
@Override
|
552 |
public void executeChecked(final Deque<FutureTask<?>> tasks) {
|
552 |
public void executeChecked(final Deque<RunnableFuture<?>> tasks) {
|
553 |
// on part de la fin et on supprime toutes les maj jusqu'a ce qu'on trouve
|
553 |
// on part de la fin et on supprime toutes les maj jusqu'a ce qu'on trouve
|
554 |
// un runnable qui n'est pas annulable
|
554 |
// un runnable qui n'est pas annulable
|
555 |
final Iterator<FutureTask<?>> iter = tasks.descendingIterator();
|
555 |
final Iterator<RunnableFuture<?>> iter = tasks.descendingIterator();
|
556 |
boolean needsPrevious = false;
|
556 |
boolean needsPrevious = false;
|
557 |
while (iter.hasNext() && !needsPrevious) {
|
557 |
while (iter.hasNext() && !needsPrevious) {
|
558 |
final FutureTask<?> current = iter.next();
|
558 |
final RunnableFuture<?> current = iter.next();
|
559 |
final TaskType type = cancelablePred.transformChecked(current);
|
559 |
final TaskType type = cancelablePred.transformChecked(current);
|
560 |
needsPrevious = type.dependsOnPrevious;
|
560 |
needsPrevious = type.dependsOnPrevious;
|
561 |
if (type.cancelable)
|
561 |
if (type.cancelable)
|
562 |
iter.remove();
|
562 |
iter.remove();
|
563 |
}
|
563 |
}
|
564 |
// if we stop only because we ran out of items, continue with beingRun
|
564 |
// if we stop only because we ran out of items, continue with beingRun
|
565 |
if (!needsPrevious) {
|
565 |
if (!needsPrevious) {
|
566 |
// before trying to cancel being run we should have been through all the backlog
|
566 |
// before trying to cancel being run we should have been through all the backlog
|
567 |
assert !iter.hasNext();
|
567 |
assert !iter.hasNext();
|
568 |
final FutureTask<?> br = q.getBeingRun();
|
568 |
final RunnableFuture<?> br = q.getBeingRun();
|
569 |
if (br != null && cancelablePred.transformChecked(br).cancelable) {
|
569 |
if (br != null && cancelablePred.transformChecked(br).cancelable) {
|
570 |
// might already be done by now, but it's OK cancel() will just return false
|
570 |
// might already be done by now, but it's OK cancel() will just return false
|
571 |
br.cancel(true);
|
571 |
br.cancel(true);
|
572 |
}
|
572 |
}
|
573 |
}
|
573 |
}
|