OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 93 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
 *
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
 * language governing permissions and limitations under the License.
 *
 * When distributing the software, include this License Header Notice in each file.
 */
13
 
14
 package org.openconcerto.utils;
15
 
16
import org.openconcerto.utils.cc.IClosure;
17
 
61 ilm 18
import java.util.Deque;
19
import java.util.LinkedList;
20
import java.util.concurrent.locks.Condition;
17 ilm 21
import java.util.concurrent.locks.Lock;
22
import java.util.concurrent.locks.ReentrantLock;
180 ilm 23
import java.util.function.Function;
17 ilm 24
 
93 ilm 25
import net.jcip.annotations.GuardedBy;
26
import net.jcip.annotations.ThreadSafe;
27
 
17 ilm 28
/**
29
 * Holds items and give them one at a time to {@link #process(Object)}. At any time this process can
93 ilm 30
 * be put on hold by setting it to sleep. All actions in a thread modifying ({@link #put(Object)},
31
 * {@link #itemsDo(IClosure)}) the queue happens-before the processing of this modification.
17 ilm 32
 *
33
 * @author Sylvain
34
 * @param <T> type of item
35
 */
93 ilm 36
@ThreadSafe
17 ilm 37
public abstract class DropperQueue<T> extends Thread {
38
 
93 ilm 39
    @GuardedBy("itemsLock")
61 ilm 40
    private final Deque<T> items;
41
    private final Lock itemsLock;
42
    private final Condition notEmpty;
93 ilm 43
    @GuardedBy("this")
17 ilm 44
    private boolean stop;
93 ilm 45
    @GuardedBy("this")
17 ilm 46
    private boolean sleeping;
93 ilm 47
    @GuardedBy("this")
17 ilm 48
    private boolean executing;
49
 
50
    /**
51
     * Construct a new instance.
52
     *
53
     * @param name name of this thread.
54
     */
55
    public DropperQueue(String name) {
56
        super(name);
61 ilm 57
        this.items = new LinkedList<T>();
17 ilm 58
        this.stop = false;
59
        this.sleeping = false;
60
        this.executing = false;
61 ilm 61
        this.itemsLock = new ReentrantLock();
62
        this.notEmpty = this.itemsLock.newCondition();
17 ilm 63
    }
64
 
65
    // *** boolean
66
 
67
    /**
68
     * Whether this queue should stop temporarily.
69
     *
70
     * @param b <code>true</code> to put this to sleep, <code>false</code> to wake it.
71
     * @return <code>true</code> if sleeping has changed.
72
     */
73
    public synchronized final boolean setSleeping(boolean b) {
74
        if (this.sleeping != b) {
75
            this.sleeping = b;
76
            this.signalChange();
77
            return true;
78
        } else
79
            return false;
80
    }
81
 
82
    public synchronized boolean isSleeping() {
83
        return this.sleeping;
84
    }
85
 
86
    private synchronized void signalChange() {
87
        this.signalChange(false);
88
    }
89
 
90
    private synchronized void signalChange(boolean signalClosure) {
91
        // interrompt la thread, si elle est dans le await() ou le take()
92
        // elle recheckera les booleens, ATTN elle peut-être en attente du lock juste après take(),
93
        // c'est pourquoi on efface le flag avant process()
94
        // en général pas besoin d'interrompre la closure, puisque tant qu'on l'exécute
95
        // on ne peut, ni on n'a besoin de prendre depuis la queue
96
        if (signalClosure || !this.isExecuting())
97
            this.interrupt();
98
    }
99
 
100
    private synchronized void setExecuting(boolean b) {
101
        this.executing = b;
102
    }
103
 
104
    private synchronized boolean isExecuting() {
105
        return this.executing;
106
    }
107
 
108
    private void await() throws InterruptedException {
109
        synchronized (this) {
61 ilm 110
            if (this.sleeping) {
17 ilm 111
                this.wait();
112
            }
113
        }
114
    }
115
 
116
    /**
83 ilm 117
     * Signal that this thread must stop indefinitely. This method interrupts
118
     * {@link #process(Object)}.
119
     *
120
     * @see #die(boolean)
17 ilm 121
     */
83 ilm 122
    public final void die() {
123
        this.die(true);
124
    }
125
 
126
    /**
127
     * Signal that this thread must stop indefinitely. Once this method returns, it is guaranteed
128
     * that no new item will be processed, and that this thread will {@link #isDead() die}. But if
129
     * this thread is currently {@link #process(Object) processing} an item, then the method will
130
     * finish normally if :
131
     * <ul>
132
     * <li><code>mayInterruptIfRunning</code> is <code>false</code></li>
133
     * <li><code>mayInterruptIfRunning</code> is <code>true</code> but the {@link #interrupt()}
134
     * isn't checked by the implementing subclass</li>
135
     * </ul>
136
     *
137
     * @param mayInterruptIfRunning <code>true</code> to interrupt while in {@link #process(Object)}
138
     * @see #isDying()
139
     */
140
    public synchronized final void die(boolean mayInterruptIfRunning) {
17 ilm 141
        this.stop = true;
83 ilm 142
        this.signalChange(mayInterruptIfRunning);
17 ilm 143
    }
144
 
83 ilm 145
    /**
146
     * Whether this queue is dying.
147
     *
148
     * @return <code>true</code> if {@link #die(boolean)} has been called and
149
     *         {@link #process(Object)} is still executing.
150
     * @see #isDead()
151
     */
152
    public synchronized final boolean isDying() {
153
        return this.dieCalled() && this.isExecuting();
154
    }
155
 
156
    /**
157
     * Whether this queue is active.
158
     *
159
     * @return <code>true</code> if {@link #process(Object)} isn't executed and won't ever be again.
160
     * @see #isDying()
161
     */
162
    public final boolean isDead() {
163
        // either we're dead because die() has been called, or because process() threw an Error
93 ilm 164
        return this.getState().equals(State.TERMINATED);
83 ilm 165
    }
166
 
167
    public synchronized final boolean dieCalled() {
17 ilm 168
        return this.stop;
169
    }
170
 
171
    // *** Run
172
 
173
    @Override
174
    public void run() {
83 ilm 175
        while (!this.dieCalled()) {
17 ilm 176
            try {
177
                this.await();
61 ilm 178
                final T item;
179
                // lockInterruptibly() to avoid taking another item after being put to sleep
180
                this.itemsLock.lockInterruptibly();
181
                try {
182
                    while (this.items.isEmpty())
183
                        this.notEmpty.await();
184
                    item = this.items.removeFirst();
185
                } finally {
186
                    this.itemsLock.unlock();
187
                }
17 ilm 188
                this.setExecuting(true);
189
                // we should not carry the interrupted status in process()
190
                // we only use it to stop waiting and check variables again, but if we're here we
191
                // have removed an item and must process it
192
                Thread.interrupted();
193
                process(item);
194
            } catch (InterruptedException e) {
195
                // rien a faire, on recommence la boucle
196
            } catch (RuntimeException e) {
197
                e.printStackTrace();
198
                // une exn s'est produite, on considère qu'on peut passer à la suite
199
            } finally {
200
                this.setExecuting(false);
201
            }
202
        }
203
    }
204
 
205
    abstract protected void process(final T item);
206
 
207
    // *** items
208
 
209
    /**
93 ilm 210
     * Adds an item to this queue. Actions in the thread prior to calling this method happen-before
211
     * the passed argument is {@link #process(Object) processed}.
17 ilm 212
     *
213
     * @param item the item to add.
214
     */
215
    public final void put(T item) {
61 ilm 216
        this.itemsLock.lock();
17 ilm 217
        try {
218
            this.items.add(item);
61 ilm 219
            this.notEmpty.signal();
17 ilm 220
        } finally {
61 ilm 221
            this.itemsLock.unlock();
17 ilm 222
        }
223
    }
224
 
225
    public final void eachItemDo(final IClosure<T> c) {
180 ilm 226
        this.itemsDo((items) -> {
227
            for (final T t : items) {
228
                c.executeChecked(t);
17 ilm 229
            }
180 ilm 230
            return null;
17 ilm 231
        });
232
    }
233
 
234
    /**
235
     * Allows <code>c</code> to arbitrarily modify our queue as it is locked during this method.
236
     * I.e. no items will be removed (passed to the closure) nor added.
237
     *
238
     * @param c what to do with our queue.
239
     */
61 ilm 240
    public final void itemsDo(IClosure<? super Deque<T>> c) {
180 ilm 241
        this.itemsDo((q) -> {
242
            c.executeChecked(q);
243
            return null;
244
        });
245
    }
246
 
247
    public final <R> R itemsDo(Function<? super Deque<T>, R> c) {
61 ilm 248
        this.itemsLock.lock();
17 ilm 249
        try {
180 ilm 250
            final R res = c.apply(this.items);
61 ilm 251
            if (!this.items.isEmpty())
252
                this.notEmpty.signal();
180 ilm 253
            return res;
17 ilm 254
        } finally {
61 ilm 255
            this.itemsLock.unlock();
17 ilm 256
        }
257
    }
258
 
259
}