OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 93 | Go to most recent revision | Only display areas with differences | Regard whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 93 Rev 180
1
/*
1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 * 
3
 * 
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
5
 * 
5
 * 
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
9
 * language governing permissions and limitations under the License.
10
 * 
10
 * 
11
 * When distributing the software, include this License Header Notice in each file.
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
12
 */
13
 
13
 
14
 package org.openconcerto.utils;
14
 package org.openconcerto.utils;
15
 
15
 
16
import org.openconcerto.utils.cc.IClosure;
16
import org.openconcerto.utils.cc.IClosure;
17
 
17
 
18
import java.util.Collection;
-
 
19
import java.util.Deque;
18
import java.util.Deque;
20
import java.util.LinkedList;
19
import java.util.LinkedList;
21
import java.util.concurrent.locks.Condition;
20
import java.util.concurrent.locks.Condition;
22
import java.util.concurrent.locks.Lock;
21
import java.util.concurrent.locks.Lock;
23
import java.util.concurrent.locks.ReentrantLock;
22
import java.util.concurrent.locks.ReentrantLock;
-
 
23
import java.util.function.Function;
24
 
24
 
25
import net.jcip.annotations.GuardedBy;
25
import net.jcip.annotations.GuardedBy;
26
import net.jcip.annotations.ThreadSafe;
26
import net.jcip.annotations.ThreadSafe;
27
 
27
 
28
/**
28
/**
29
 * Holds items and give them one at a time to {@link #process(Object)}. At any time this process can
29
 * Holds items and give them one at a time to {@link #process(Object)}. At any time this process can
30
 * be put on hold by setting it to sleep. All actions in a thread modifying ({@link #put(Object)},
30
 * be put on hold by setting it to sleep. All actions in a thread modifying ({@link #put(Object)},
31
 * {@link #itemsDo(IClosure)}) the queue happens-before the processing of this modification.
31
 * {@link #itemsDo(IClosure)}) the queue happens-before the processing of this modification.
32
 * 
32
 * 
33
 * @author Sylvain
33
 * @author Sylvain
34
 * @param <T> type of item
34
 * @param <T> type of item
35
 */
35
 */
36
@ThreadSafe
36
@ThreadSafe
37
public abstract class DropperQueue<T> extends Thread {
37
public abstract class DropperQueue<T> extends Thread {
38
 
38
 
39
    @GuardedBy("itemsLock")
39
    @GuardedBy("itemsLock")
40
    private final Deque<T> items;
40
    private final Deque<T> items;
41
    private final Lock itemsLock;
41
    private final Lock itemsLock;
42
    private final Condition notEmpty;
42
    private final Condition notEmpty;
43
    @GuardedBy("this")
43
    @GuardedBy("this")
44
    private boolean stop;
44
    private boolean stop;
45
    @GuardedBy("this")
45
    @GuardedBy("this")
46
    private boolean sleeping;
46
    private boolean sleeping;
47
    @GuardedBy("this")
47
    @GuardedBy("this")
48
    private boolean executing;
48
    private boolean executing;
49
 
49
 
50
    /**
50
    /**
51
     * Construct a new instance.
51
     * Construct a new instance.
52
     * 
52
     * 
53
     * @param name name of this thread.
53
     * @param name name of this thread.
54
     */
54
     */
55
    public DropperQueue(String name) {
55
    public DropperQueue(String name) {
56
        super(name);
56
        super(name);
57
        this.items = new LinkedList<T>();
57
        this.items = new LinkedList<T>();
58
        this.stop = false;
58
        this.stop = false;
59
        this.sleeping = false;
59
        this.sleeping = false;
60
        this.executing = false;
60
        this.executing = false;
61
        this.itemsLock = new ReentrantLock();
61
        this.itemsLock = new ReentrantLock();
62
        this.notEmpty = this.itemsLock.newCondition();
62
        this.notEmpty = this.itemsLock.newCondition();
63
    }
63
    }
64
 
64
 
65
    // *** boolean
65
    // *** boolean
66
 
66
 
67
    /**
67
    /**
68
     * Whether this queue should stop temporarily.
68
     * Whether this queue should stop temporarily.
69
     * 
69
     * 
70
     * @param b <code>true</code> to put this to sleep, <code>false</code> to wake it.
70
     * @param b <code>true</code> to put this to sleep, <code>false</code> to wake it.
71
     * @return <code>true</code> if sleeping has changed.
71
     * @return <code>true</code> if sleeping has changed.
72
     */
72
     */
73
    public synchronized final boolean setSleeping(boolean b) {
73
    public synchronized final boolean setSleeping(boolean b) {
74
        if (this.sleeping != b) {
74
        if (this.sleeping != b) {
75
            this.sleeping = b;
75
            this.sleeping = b;
76
            this.signalChange();
76
            this.signalChange();
77
            return true;
77
            return true;
78
        } else
78
        } else
79
            return false;
79
            return false;
80
    }
80
    }
81
 
81
 
82
    public synchronized boolean isSleeping() {
82
    public synchronized boolean isSleeping() {
83
        return this.sleeping;
83
        return this.sleeping;
84
    }
84
    }
85
 
85
 
86
    private synchronized void signalChange() {
86
    private synchronized void signalChange() {
87
        this.signalChange(false);
87
        this.signalChange(false);
88
    }
88
    }
89
 
89
 
90
    private synchronized void signalChange(boolean signalClosure) {
90
    private synchronized void signalChange(boolean signalClosure) {
91
        // interrompt la thread, si elle est dans le await() ou le take()
91
        // interrompt la thread, si elle est dans le await() ou le take()
92
        // elle recheckera les booleens, ATTN elle peut-être en attente du lock juste après take(),
92
        // elle recheckera les booleens, ATTN elle peut-être en attente du lock juste après take(),
93
        // c'est pourquoi on efface le flag avant process()
93
        // c'est pourquoi on efface le flag avant process()
94
        // en général pas besoin d'interrompre la closure, puisque tant qu'on l'exécute
94
        // en général pas besoin d'interrompre la closure, puisque tant qu'on l'exécute
95
        // on ne peut, ni on n'a besoin de prendre depuis la queue
95
        // on ne peut, ni on n'a besoin de prendre depuis la queue
96
        if (signalClosure || !this.isExecuting())
96
        if (signalClosure || !this.isExecuting())
97
            this.interrupt();
97
            this.interrupt();
98
    }
98
    }
99
 
99
 
100
    private synchronized void setExecuting(boolean b) {
100
    private synchronized void setExecuting(boolean b) {
101
        this.executing = b;
101
        this.executing = b;
102
    }
102
    }
103
 
103
 
104
    private synchronized boolean isExecuting() {
104
    private synchronized boolean isExecuting() {
105
        return this.executing;
105
        return this.executing;
106
    }
106
    }
107
 
107
 
108
    private void await() throws InterruptedException {
108
    private void await() throws InterruptedException {
109
        synchronized (this) {
109
        synchronized (this) {
110
            if (this.sleeping) {
110
            if (this.sleeping) {
111
                this.wait();
111
                this.wait();
112
            }
112
            }
113
        }
113
        }
114
    }
114
    }
115
 
115
 
116
    /**
116
    /**
117
     * Signal that this thread must stop indefinitely. This method interrupts
117
     * Signal that this thread must stop indefinitely. This method interrupts
118
     * {@link #process(Object)}.
118
     * {@link #process(Object)}.
119
     * 
119
     * 
120
     * @see #die(boolean)
120
     * @see #die(boolean)
121
     */
121
     */
122
    public final void die() {
122
    public final void die() {
123
        this.die(true);
123
        this.die(true);
124
    }
124
    }
125
 
125
 
126
    /**
126
    /**
127
     * Signal that this thread must stop indefinitely. Once this method returns, it is guaranteed
127
     * Signal that this thread must stop indefinitely. Once this method returns, it is guaranteed
128
     * that no new item will be processed, and that this thread will {@link #isDead() die}. But if
128
     * that no new item will be processed, and that this thread will {@link #isDead() die}. But if
129
     * this thread is currently {@link #process(Object) processing} an item, then the method will
129
     * this thread is currently {@link #process(Object) processing} an item, then the method will
130
     * finish normally if :
130
     * finish normally if :
131
     * <ul>
131
     * <ul>
132
     * <li><code>mayInterruptIfRunning</code> is <code>false</code></li>
132
     * <li><code>mayInterruptIfRunning</code> is <code>false</code></li>
133
     * <li><code>mayInterruptIfRunning</code> is <code>true</code> but the {@link #interrupt()}
133
     * <li><code>mayInterruptIfRunning</code> is <code>true</code> but the {@link #interrupt()}
134
     * isn't checked by the implementing subclass</li>
134
     * isn't checked by the implementing subclass</li>
135
     * </ul>
135
     * </ul>
136
     * 
136
     * 
137
     * @param mayInterruptIfRunning <code>true</code> to interrupt while in {@link #process(Object)}
137
     * @param mayInterruptIfRunning <code>true</code> to interrupt while in {@link #process(Object)}
138
     * @see #isDying()
138
     * @see #isDying()
139
     */
139
     */
140
    public synchronized final void die(boolean mayInterruptIfRunning) {
140
    public synchronized final void die(boolean mayInterruptIfRunning) {
141
        this.stop = true;
141
        this.stop = true;
142
        this.signalChange(mayInterruptIfRunning);
142
        this.signalChange(mayInterruptIfRunning);
143
    }
143
    }
144
 
144
 
145
    /**
145
    /**
146
     * Whether this queue is dying.
146
     * Whether this queue is dying.
147
     * 
147
     * 
148
     * @return <code>true</code> if {@link #die(boolean)} has been called and
148
     * @return <code>true</code> if {@link #die(boolean)} has been called and
149
     *         {@link #process(Object)} is still executing.
149
     *         {@link #process(Object)} is still executing.
150
     * @see #isDead()
150
     * @see #isDead()
151
     */
151
     */
152
    public synchronized final boolean isDying() {
152
    public synchronized final boolean isDying() {
153
        return this.dieCalled() && this.isExecuting();
153
        return this.dieCalled() && this.isExecuting();
154
    }
154
    }
155
 
155
 
156
    /**
156
    /**
157
     * Whether this queue is active.
157
     * Whether this queue is active.
158
     * 
158
     * 
159
     * @return <code>true</code> if {@link #process(Object)} isn't executed and won't ever be again.
159
     * @return <code>true</code> if {@link #process(Object)} isn't executed and won't ever be again.
160
     * @see #isDying()
160
     * @see #isDying()
161
     */
161
     */
162
    public final boolean isDead() {
162
    public final boolean isDead() {
163
        // either we're dead because die() has been called, or because process() threw an Error
163
        // either we're dead because die() has been called, or because process() threw an Error
164
        return this.getState().equals(State.TERMINATED);
164
        return this.getState().equals(State.TERMINATED);
165
    }
165
    }
166
 
166
 
167
    public synchronized final boolean dieCalled() {
167
    public synchronized final boolean dieCalled() {
168
        return this.stop;
168
        return this.stop;
169
    }
169
    }
170
 
170
 
171
    // *** Run
171
    // *** Run
172
 
172
 
173
    @Override
173
    @Override
174
    public void run() {
174
    public void run() {
175
        while (!this.dieCalled()) {
175
        while (!this.dieCalled()) {
176
            try {
176
            try {
177
                this.await();
177
                this.await();
178
                final T item;
178
                final T item;
179
                // lockInterruptibly() to avoid taking another item after being put to sleep
179
                // lockInterruptibly() to avoid taking another item after being put to sleep
180
                this.itemsLock.lockInterruptibly();
180
                this.itemsLock.lockInterruptibly();
181
                try {
181
                try {
182
                    while (this.items.isEmpty())
182
                    while (this.items.isEmpty())
183
                        this.notEmpty.await();
183
                        this.notEmpty.await();
184
                    item = this.items.removeFirst();
184
                    item = this.items.removeFirst();
185
                } finally {
185
                } finally {
186
                    this.itemsLock.unlock();
186
                    this.itemsLock.unlock();
187
                }
187
                }
188
                this.setExecuting(true);
188
                this.setExecuting(true);
189
                // we should not carry the interrupted status in process()
189
                // we should not carry the interrupted status in process()
190
                // we only use it to stop waiting and check variables again, but if we're here we
190
                // we only use it to stop waiting and check variables again, but if we're here we
191
                // have removed an item and must process it
191
                // have removed an item and must process it
192
                Thread.interrupted();
192
                Thread.interrupted();
193
                process(item);
193
                process(item);
194
            } catch (InterruptedException e) {
194
            } catch (InterruptedException e) {
195
                // rien a faire, on recommence la boucle
195
                // rien a faire, on recommence la boucle
196
            } catch (RuntimeException e) {
196
            } catch (RuntimeException e) {
197
                e.printStackTrace();
197
                e.printStackTrace();
198
                // une exn s'est produite, on considère qu'on peut passer à la suite
198
                // une exn s'est produite, on considère qu'on peut passer à la suite
199
            } finally {
199
            } finally {
200
                this.setExecuting(false);
200
                this.setExecuting(false);
201
            }
201
            }
202
        }
202
        }
203
    }
203
    }
204
 
204
 
205
    abstract protected void process(final T item);
205
    abstract protected void process(final T item);
206
 
206
 
207
    // *** items
207
    // *** items
208
 
208
 
209
    /**
209
    /**
210
     * Adds an item to this queue. Actions in the thread prior to calling this method happen-before
210
     * Adds an item to this queue. Actions in the thread prior to calling this method happen-before
211
     * the passed argument is {@link #process(Object) processed}.
211
     * the passed argument is {@link #process(Object) processed}.
212
     * 
212
     * 
213
     * @param item the item to add.
213
     * @param item the item to add.
214
     */
214
     */
215
    public final void put(T item) {
215
    public final void put(T item) {
216
        this.itemsLock.lock();
216
        this.itemsLock.lock();
217
        try {
217
        try {
218
            this.items.add(item);
218
            this.items.add(item);
219
            this.notEmpty.signal();
219
            this.notEmpty.signal();
220
        } finally {
220
        } finally {
221
            this.itemsLock.unlock();
221
            this.itemsLock.unlock();
222
        }
222
        }
223
    }
223
    }
224
 
224
 
225
    public final void eachItemDo(final IClosure<T> c) {
225
    public final void eachItemDo(final IClosure<T> c) {
226
        this.itemsDo(new IClosure<Collection<T>>() {
226
        this.itemsDo((items) -> {
227
            @Override
-
 
228
            public void executeChecked(Collection<T> items) {
-
 
229
                for (final T t : items) {
227
            for (final T t : items) {
230
                    c.executeChecked(t);
228
                c.executeChecked(t);
231
                }
229
            }
232
            }
230
            return null;
233
        });
231
        });
234
    }
232
    }
235
 
233
 
236
    /**
234
    /**
237
     * Allows <code>c</code> to arbitrarily modify our queue as it is locked during this method.
235
     * Allows <code>c</code> to arbitrarily modify our queue as it is locked during this method.
238
     * I.e. no items will be removed (passed to the closure) nor added.
236
     * I.e. no items will be removed (passed to the closure) nor added.
239
     * 
237
     * 
240
     * @param c what to do with our queue.
238
     * @param c what to do with our queue.
241
     */
239
     */
242
    public final void itemsDo(IClosure<? super Deque<T>> c) {
240
    public final void itemsDo(IClosure<? super Deque<T>> c) {
-
 
241
        this.itemsDo((q) -> {
-
 
242
            c.executeChecked(q);
-
 
243
            return null;
-
 
244
        });
-
 
245
    }
-
 
246
 
-
 
247
    public final <R> R itemsDo(Function<? super Deque<T>, R> c) {
243
        this.itemsLock.lock();
248
        this.itemsLock.lock();
244
        try {
249
        try {
245
            c.executeChecked(this.items);
250
            final R res = c.apply(this.items);
246
            if (!this.items.isEmpty())
251
            if (!this.items.isEmpty())
247
                this.notEmpty.signal();
252
                this.notEmpty.signal();
-
 
253
            return res;
248
        } finally {
254
        } finally {
249
            this.itemsLock.unlock();
255
            this.itemsLock.unlock();
250
        }
256
        }
251
    }
257
    }
252
 
258
 
253
}
259
}