OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 17 | Rev 61 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
17 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.utils;
15
 
16
import org.openconcerto.utils.cc.IClosure;
17
import org.openconcerto.utils.cc.IPredicate;
18
 
19
import java.beans.PropertyChangeListener;
20
import java.beans.PropertyChangeSupport;
21
import java.util.Collection;
22
import java.util.concurrent.BlockingDeque;
23
import java.util.concurrent.CancellationException;
24
import java.util.concurrent.ExecutionException;
25
import java.util.concurrent.Future;
26
import java.util.concurrent.FutureTask;
27
 
28
/**
29
 * A queue that can be put to sleep. Submitted runnables are converted to FutureTask, that can later
30
 * be cancelled.
31
 *
32
 * @author Sylvain
33
 */
34
public class SleepingQueue {
35
 
36
    private final String name;
37
 
38
    private final PropertyChangeSupport support;
39
    private FutureTask<?> beingRun;
40
 
41
    private final SingleThreadedExecutor tasksQueue;
42
    private boolean canceling;
43
    private IPredicate<FutureTask<?>> cancelPredicate;
44
 
45
    /**
     * Create a queue with a generated name : the name of this class followed by the current time
     * in milliseconds.
     */
    public SleepingQueue() {
        this(SleepingQueue.class.getName() + System.currentTimeMillis());
    }
48
 
49
    public SleepingQueue(String name) {
50
        super();
51
        this.name = name;
52
 
53
        this.canceling = false;
54
        this.cancelPredicate = null;
55
        this.support = new PropertyChangeSupport(this);
56
        this.setBeingRun(null);
57
 
58
        this.tasksQueue = new SingleThreadedExecutor();
59
 
60
        this.tasksQueue.start();
61
    }
62
 
63
    /**
64
     * Customize the thread used to execute the passed runnables. This implementation sets the
65
     * priority to the minimum.
66
     *
67
     * @param thr the thread used by this queue.
68
     */
69
    protected void customizeThread(Thread thr) {
70
        thr.setPriority(Thread.MIN_PRIORITY);
71
    }
72
 
73
    public final FutureTask<?> put(Runnable workRunnable) {
74
        if (this.shallAdd(workRunnable)) {
75
            final IFutureTask<Object> t = this.tasksQueue.newTaskFor(workRunnable);
76
            this.add(t);
77
            return t;
78
        } else
79
            return null;
80
 
81
    }
82
 
83
    public final <F extends FutureTask<?>> F execute(F t) {
84
        if (this.shallAdd(t)) {
85
            this.add(t);
86
            return t;
87
        } else
88
            return null;
89
    }
90
 
91
    private void add(FutureTask<?> t) {
92
        // no need to synchronize, if die() is called after our test, t won't be executed anyway
93
        if (this.isDead())
41 ilm 94
            throw new IllegalStateException("Already dead, cannot exec " + t);
17 ilm 95
 
96
        this.tasksQueue.put(t);
97
    }
98
 
99
    private final boolean shallAdd(Runnable runnable) {
100
        if (runnable == null)
101
            throw new NullPointerException("null runnable");
102
        try {
103
            this.willPut(runnable);
104
            return true;
105
        } catch (InterruptedException e) {
106
            // si on interrompt, ne pas ajouter
107
            return false;
108
        }
109
    }
110
 
111
    /**
112
     * Give subclass the ability to reject runnables.
113
     *
114
     * @param r the runnable that is being added.
115
     * @throws InterruptedException if r should not be added to this queue.
116
     */
117
    protected void willPut(Runnable r) throws InterruptedException {
118
    }
119
 
120
    /**
121
     * Cancel all queued tasks and the current task.
122
     */
123
    protected final void cancel() {
124
        this.cancel(null);
125
    }
126
 
127
    /**
128
     * Cancel only tasks for which pred is <code>true</code>.
129
     *
130
     * @param pred a predicate to know which tasks to cancel.
131
     */
132
    protected final void cancel(final IPredicate<FutureTask<?>> pred) {
133
        this.tasksDo(new IClosure<Collection<FutureTask<?>>>() {
134
            @Override
135
            public void executeChecked(Collection<FutureTask<?>> tasks) {
136
                cancel(pred, tasks);
137
            }
138
        });
139
    }
140
 
141
    private final void cancel(IPredicate<FutureTask<?>> pred, Collection<FutureTask<?>> tasks) {
142
        try {
143
            synchronized (this) {
144
                this.canceling = true;
145
                this.cancelPredicate = pred;
146
                this.cancelCheck(this.getBeingRun());
147
            }
148
 
149
            for (final FutureTask<?> t : tasks) {
150
                this.cancelCheck(t);
151
            }
152
        } finally {
153
            synchronized (this) {
154
                this.canceling = false;
155
                // allow the predicate to be gc'd
156
                this.cancelPredicate = null;
157
            }
158
        }
159
    }
160
 
161
    public final void tasksDo(IClosure<? super BlockingDeque<FutureTask<?>>> c) {
162
        this.tasksQueue.itemsDo(c);
163
    }
164
 
165
    private void cancelCheck(FutureTask<?> t) {
166
        if (t != null)
167
            synchronized (this) {
168
                if (this.canceling && (this.cancelPredicate == null || this.cancelPredicate.evaluateChecked(t)))
169
                    t.cancel(true);
170
            }
171
    }
172
 
173
    private void setBeingRun(final FutureTask<?> beingRun) {
174
        final Future old;
175
        synchronized (this) {
176
            old = this.beingRun;
177
            this.beingRun = beingRun;
178
        }
179
        this.support.firePropertyChange("beingRun", old, beingRun);
180
    }
181
 
182
    protected final synchronized FutureTask<?> getBeingRun() {
183
        return this.beingRun;
184
    }
185
 
186
    public boolean isSleeping() {
187
        return this.tasksQueue.isSleeping();
188
    }
189
 
190
    public void setSleeping(boolean sleeping) {
191
        if (this.tasksQueue.setSleeping(sleeping)) {
192
            this.support.firePropertyChange("sleeping", null, this.isSleeping());
193
        }
194
    }
195
 
196
    /**
197
     * Stops this queue. Once this method returns, it is guaranteed that no other task will be taken
198
     * from the queue to be started, and that this thread will die.
199
     */
200
    public final void die() {
201
        this.tasksQueue.die();
202
        this.dying();
203
    }
204
 
205
    protected void dying() {
206
        // nothing by default
207
    }
208
 
209
    /**
210
     * Whether this queue is dying, ie if die() has been called.
211
     *
212
     * @return <code>true</code> if this queue will not execute any more tasks.
213
     * @see #die()
214
     */
215
    public final boolean isDead() {
216
        return this.tasksQueue.isDead();
217
    }
218
 
219
    public void addPropertyChangeListener(PropertyChangeListener l) {
220
        this.support.addPropertyChangeListener(l);
221
    }
222
 
223
    public void rmPropertyChangeListener(PropertyChangeListener l) {
224
        this.support.removePropertyChangeListener(l);
225
    }
226
 
227
    private final class SingleThreadedExecutor extends DropperQueue<FutureTask<?>> {
228
        private SingleThreadedExecutor() {
229
            super(SleepingQueue.this.name + System.currentTimeMillis());
230
            customizeThread(this);
231
        }
232
 
233
        protected <T> IFutureTask<T> newTaskFor(final Runnable task) {
234
            return this.newTaskFor(task, null);
235
        }
236
 
237
        protected <T> IFutureTask<T> newTaskFor(final Runnable task, T value) {
238
            return new IFutureTask<T>(task, value, " for {" + SleepingQueue.this.name + "}");
239
        }
240
 
241
        @Override
242
        protected void process(FutureTask<?> task) {
243
            if (!task.isDone()) {
244
                /*
245
                 * From ThreadPoolExecutor : Track execution state to ensure that afterExecute is
246
                 * called only if task completed or threw exception. Otherwise, the caught runtime
247
                 * exception will have been thrown by afterExecute itself, in which case we don't
248
                 * want to call it again.
249
                 */
250
                boolean ran = false;
251
                beforeExecute(task);
252
                try {
253
                    task.run();
254
                    ran = true;
255
                    afterExecute(task, null);
256
                } catch (RuntimeException ex) {
257
                    if (!ran)
258
                        afterExecute(task, ex);
259
                    // don't throw ex, afterExecute() can do whatever needs to be done (like killing
260
                    // this queue)
261
                }
262
            }
263
        }
264
 
265
        protected void beforeExecute(final FutureTask<?> f) {
266
            cancelCheck(f);
267
            setBeingRun(f);
268
        }
269
 
270
        protected void afterExecute(final FutureTask<?> f, final Throwable t) {
271
            setBeingRun(null);
272
 
273
            try {
274
                f.get();
275
            } catch (CancellationException e) {
276
                // don't care
277
            } catch (InterruptedException e) {
278
                // f was interrupted : eg we're dying or f was canceled
279
            } catch (ExecutionException e) {
280
                // f.run() raised an exn
281
                e.printStackTrace();
282
            }
283
        }
284
    }
285
 
286
    public String toString() {
287
        return super.toString() + " Queue: " + this.tasksQueue + " run:" + this.getBeingRun();
288
    }
289
 
290
}