OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 142 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
67 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
182 ilm 4
 * Copyright 2011-2019 OpenConcerto, by ILM Informatique. All rights reserved.
67 ilm 5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.sql.replication;
15
 
16
import org.openconcerto.sql.model.ConnectionHandlerNoSetup;
17
import org.openconcerto.sql.model.DBRoot;
18
import org.openconcerto.sql.model.DBSystemRoot;
19
import org.openconcerto.sql.model.IResultSetHandler;
20
import org.openconcerto.sql.model.SQLDataSource;
21
import org.openconcerto.sql.model.SQLName;
22
import org.openconcerto.sql.model.SQLSchema;
23
import org.openconcerto.sql.model.SQLSelect;
24
import org.openconcerto.sql.model.SQLServer;
25
import org.openconcerto.sql.model.SQLSyntax;
26
import org.openconcerto.sql.model.SQLSystem;
27
import org.openconcerto.sql.model.SQLTable;
28
import org.openconcerto.sql.model.graph.TablesMap;
29
import org.openconcerto.sql.utils.CSVHandler;
30
import org.openconcerto.sql.utils.ChangeTable;
182 ilm 31
import org.openconcerto.sql.utils.ChangeTable.FCSpec;
67 ilm 32
import org.openconcerto.sql.utils.SQLCreateMoveableTable;
33
import org.openconcerto.sql.utils.SQLCreateRoot;
34
import org.openconcerto.sql.utils.SQLCreateTableBase;
35
import org.openconcerto.sql.utils.SQLUtils;
36
import org.openconcerto.utils.FileUtils;
37
import org.openconcerto.utils.RTInterruptedException;
38
import org.openconcerto.utils.ThreadFactory;
39
import org.openconcerto.utils.cc.IClosure;
40
 
41
import java.io.File;
42
import java.io.IOException;
43
import java.sql.ResultSet;
44
import java.sql.SQLException;
45
import java.util.ArrayList;
46
import java.util.HashMap;
47
import java.util.List;
48
import java.util.Map;
182 ilm 49
import java.util.Map.Entry;
67 ilm 50
import java.util.Set;
51
import java.util.concurrent.Callable;
52
import java.util.concurrent.CancellationException;
53
import java.util.concurrent.ExecutionException;
54
import java.util.concurrent.Executors;
55
import java.util.concurrent.Future;
56
import java.util.concurrent.ScheduledExecutorService;
57
import java.util.concurrent.ScheduledFuture;
58
import java.util.concurrent.TimeUnit;
59
import java.util.concurrent.atomic.AtomicInteger;
60
 
182 ilm 61
import org.apache.commons.dbutils.ResultSetHandler;
62
 
67 ilm 63
import net.jcip.annotations.GuardedBy;
64
import net.jcip.annotations.ThreadSafe;
65
 
66
/**
67
 * Allow to replicate some tables in memory.
68
 *
69
 * @author Sylvain
70
 */
71
@ThreadSafe
72
public class MemoryRep {
73
 
74
    static final short MAX_CANCELED = 10;
75
    static private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(MemoryRep.class.getName(), true));
76
 
77
    // final thread-safe objects
78
    private final DBSystemRoot master, slave;
79
    // final immutable
80
    private final TablesMap tables;
81
    // final immutable
82
    private final String singleRootName;
83
    @GuardedBy("this")
84
    private ScheduledFuture<?> future;
85
    @GuardedBy("this")
86
    private Future<?> manualFuture;
87
    @GuardedBy("this")
88
    private short canceledCount;
89
    // final thread-safe object
90
    private final AtomicInteger count;
91
 
92
    public MemoryRep(final SQLTable table) {
93
        this(table.getDBSystemRoot(), TablesMap.createByRootFromTable(table));
94
    }
95
 
96
    public MemoryRep(final DBSystemRoot master, final TablesMap tables) {
97
        this.master = master;
98
        this.tables = TablesMap.create(tables);
99
        if (this.tables.size() == 1) {
100
            this.singleRootName = this.tables.keySet().iterator().next();
101
            if (this.singleRootName == null)
102
                throw new IllegalStateException();
103
        } else {
104
            this.singleRootName = null;
105
        }
106
        // private in-memory database
182 ilm 107
        this.slave = new SQLServer(SQLSystem.H2, SQLSystem.H2_IN_MEMORY, null, null, null, new IClosure<DBSystemRoot>() {
67 ilm 108
            @Override
109
            public void executeChecked(DBSystemRoot input) {
110
                input.setRootsToMap(tables.keySet());
111
                // don't waste time on cache for transient data
112
                input.initUseCache(false);
113
            }
114
        }, new IClosure<SQLDataSource>() {
115
            @Override
116
            public void executeChecked(SQLDataSource input) {
117
                // one and only one connection since base is private
118
                input.setInitialSize(1);
119
                input.setMaxActive(1);
120
                input.setMinIdle(0);
121
                input.setMaxIdle(1);
122
                input.setTimeBetweenEvictionRunsMillis(-1);
123
                input.setBlockWhenExhausted(true);
124
                // allow to break free (by throwing an exception) of deadlocks :
125
                // * in replicateData() we take the one and only connection and eventually need the
126
                // lock on structure items (schema and tables)
127
                // * some other thread takes the lock on a structure item and then tries to execute
128
                // a query, waiting on the one and only connection.
129
                // Three minutes should be enough, since we only load data from files into memory,
130
                // and our clients can only do SELECTs
131
                input.setMaxWait(TimeUnit.MINUTES.toMillis(3));
132
            }
133
        }).getSystemRoot("");
134
        // slave is a copy so it needn't have checks (and it simplify replicate())
142 ilm 135
        this.slave.getDataSource().execute(this.slave.getSyntax().disableFKChecks(null));
67 ilm 136
        this.count = new AtomicInteger(0);
137
        this.canceledCount = 0;
138
    }
139
 
140
    /**
141
     * Start the automatic replication. When this method returns the structure is copied (
142
     * {@link #getSlaveTable(String, String)} works), but not the data : use the returned future
143
     * before accessing the data.
144
     *
145
     * @param period the period between successive replications.
146
     * @param unit the time unit of the period parameter.
147
     * @return a future representing the pending data replication.
148
     * @throws InterruptedException if the creation of structure was interrupted.
149
     * @throws ExecutionException if the creation of structure has failed.
150
     */
151
    public synchronized final Future<?> start(final long period, final TimeUnit unit) throws InterruptedException, ExecutionException {
152
        if (this.future != null) {
153
            if (this.future.isCancelled())
154
                throw new IllegalStateException("Already stopped");
155
            else
156
                throw new IllegalStateException("Already started");
157
        }
158
        exec.submit(new Callable<Object>() {
159
            @Override
160
            public Object call() throws Exception {
161
                replicateStruct();
162
                return null;
163
            }
164
        }).get();
165
        final Future<?> res = submitReplicate();
166
        // start after period since we just submitted a replicate()
167
        this.future = exec.scheduleAtFixedRate(getRunnable(), period, period, unit);
168
        return res;
169
    }
170
 
171
    public synchronized final boolean hasStopped() {
172
        return this.future != null && this.future.isCancelled();
173
    }
174
 
175
    /**
176
     * Stop the replication. It can not be started again.
177
     *
178
     * @return a future representing the pending structure deletion, or <code>null</code> if this
179
     *         wasn't started or already stopped.
180
     */
181
    public final Future<?> stop() {
182
        synchronized (this) {
183
            if (this.future == null || this.future.isCancelled())
184
                return null;
185
            this.future.cancel(true);
186
            this.manualFuture.cancel(true);
187
        }
188
        // use exec to be sure not to destroy the server before replicate() notices the interruption
189
        return exec.submit(new Runnable() {
190
            @Override
191
            public void run() {
192
                MemoryRep.this.slave.getServer().destroy();
193
            }
194
        });
195
    }
196
 
197
    public final DBSystemRoot getSlave() {
198
        return this.slave;
199
    }
200
 
201
    private final void checkTable(final String root, final String tableName) {
202
        if (!this.tables.containsKey(root))
203
            throw new IllegalArgumentException("Root not replicated : " + root + " " + tableName);
204
        if (!this.tables.get(root).contains(tableName))
205
            throw new IllegalArgumentException("Table not replicated : " + root + " " + tableName);
206
    }
207
 
208
    public final SQLTable getMasterTable(final String tableName) {
209
        return this.getMasterTable(this.singleRootName, tableName);
210
    }
211
 
212
    public final SQLTable getMasterTable(final String root, final String tableName) {
213
        checkTable(root, tableName);
214
        return this.master.getRoot(root).getTable(tableName);
215
    }
216
 
217
    public final SQLTable getSlaveTable(final String tableName) {
218
        return this.getSlaveTable(this.singleRootName, tableName);
219
    }
220
 
221
    public final SQLTable getSlaveTable(final String root, final String tableName) {
222
        checkTable(root, tableName);
223
        return this.slave.getRoot(root).getTable(tableName);
224
    }
225
 
226
    private final Runnable getRunnable() {
227
        return new Runnable() {
228
            @Override
229
            public void run() {
230
                try {
231
                    replicateData();
232
                } catch (Exception e) {
233
                    // TODO keep it to throw it elsewhere (scheduleAtFixedRate() cannot use
234
                    // Callable)
235
                    e.printStackTrace();
236
                }
237
 
238
            }
239
        };
240
    }
241
 
242
    /**
243
     * Force a manual replication.
244
     *
245
     * @return a future representing the pending data replication.
246
     */
247
    public synchronized final Future<?> submitReplicate() {
248
        final boolean canceled;
249
        // make sure we don't cancel all tasks
250
        if (this.manualFuture != null && this.canceledCount < MAX_CANCELED) {
251
            // false if already canceled or done
252
            canceled = this.manualFuture.cancel(true);
253
        } else {
254
            canceled = false;
255
        }
256
        if (canceled)
257
            this.canceledCount++;
258
        else
259
            this.canceledCount = 0;
260
        this.manualFuture = exec.submit(new Callable<Object>() {
261
            @Override
262
            public Object call() throws Exception {
263
                replicateData();
264
                return null;
265
            }
266
        });
267
        return this.manualFuture;
268
    }
269
 
270
    private final synchronized Future<?> getManualFuture() {
271
        return this.manualFuture;
272
    }
273
 
274
    /**
275
     * Wait on the last submitted manual replication.
276
     *
277
     * @throws ExecutionException if the computation threw an exception.
278
     * @throws InterruptedException if the current thread was interrupted while waiting.
279
     * @see #submitReplicate()
280
     * @see #executeModification(IClosure)
281
     */
282
    public final void waitOnLastManualFuture() throws InterruptedException, ExecutionException {
283
        // don't return the future, that way only the caller of submitReplicate() can cancel it
284
        Future<?> f = getManualFuture();
285
        boolean done = false;
286
        while (!done) {
287
            try {
288
                f.get();
289
                done = true;
290
            } catch (CancellationException e) {
291
                if (hasStopped()) {
292
                    done = true;
293
                } else {
294
                    // canceled by the caller of submitReplicate() or by a new submitReplicate()
295
                    // since f was canceled, data isn't up to date, so see if we can wait on a more
296
                    // recent update
297
                    final Future<?> old = f;
298
                    f = getManualFuture();
299
                    done = old == f;
300
                }
301
                if (done)
302
                    throw e;
303
            }
304
        }
305
    }
306
 
307
    protected final void replicateStruct() throws SQLException, IOException {
142 ilm 308
        final SQLSyntax slaveSyntax = this.slave.getSyntax();
67 ilm 309
        final SQLDataSource slaveDS = this.slave.getDataSource();
310
        final List<SQLCreateTableBase<?>> createTables = new ArrayList<SQLCreateTableBase<?>>();
311
        // undefined IDs by table by root
312
        final Map<String, Map<String, Number>> undefIDs = new HashMap<String, Map<String, Number>>();
313
        for (final Entry<String, Set<String>> e : this.tables.entrySet()) {
314
            final String rootName = e.getKey();
315
            final Set<String> tableNames = e.getValue();
142 ilm 316
            slaveDS.execute(new SQLCreateRoot(this.slave.getSyntax(), rootName).asString());
67 ilm 317
            final DBRoot root = this.master.getRoot(rootName);
318
 
319
            final Map<String, Number> rootUndefIDs = new HashMap<String, Number>(tableNames.size());
320
            undefIDs.put(rootName, rootUndefIDs);
321
 
322
            for (final String tableName : tableNames) {
323
                final SQLTable masterTable = root.getTable(tableName);
142 ilm 324
                final SQLCreateMoveableTable ct = masterTable.getCreateTable(slaveSyntax);
67 ilm 325
                // remove constraints towards non-copied tables
326
                for (final FCSpec fc : new ArrayList<FCSpec>(ct.getForeignConstraints())) {
327
                    final SQLName refTable = new SQLName(rootName, tableName).resolve(fc.getRefTable());
328
                    final String refTableName = refTable.getItem(-1);
329
                    final String refRootName = refTable.getItem(-2);
330
                    if (!this.tables.containsKey(refRootName) || !this.tables.get(refRootName).contains(refTableName))
331
                        ct.removeForeignConstraint(fc);
332
                }
333
                createTables.add(ct);
334
                rootUndefIDs.put(tableName, masterTable.getUndefinedIDNumber());
335
            }
336
        }
337
        // refresh empty roots
338
        this.slave.refetch();
339
        // set undefined IDs
340
        for (final Entry<String, Map<String, Number>> e : undefIDs.entrySet()) {
341
            final SQLSchema schema = this.slave.getRoot(e.getKey()).getSchema();
342
            SQLTable.setUndefIDs(schema, e.getValue());
343
        }
344
        // create tables
345
        for (final String s : ChangeTable.cat(createTables))
346
            slaveDS.execute(s);
347
        // final refresh
348
        this.slave.refetch();
349
    }
350
 
351
    // only called from the executor
352
    protected final void replicateData() throws SQLException, IOException, InterruptedException {
353
        final SQLSyntax slaveSyntax = SQLSyntax.get(this.slave);
354
        final File tempDir = FileUtils.createTempDir(getClass().getCanonicalName() + "_StoreData");
355
        try {
356
            final List<String> queries = new ArrayList<String>();
357
            final List<ResultSetHandler> handlers = new ArrayList<ResultSetHandler>();
358
            final Map<File, SQLTable> files = new HashMap<File, SQLTable>();
359
            for (final Entry<String, Set<String>> e : this.tables.entrySet()) {
360
                if (Thread.interrupted())
361
                    throw new InterruptedException("While creating handlers");
362
                final String rootName = e.getKey();
363
                final File rootDir = new File(tempDir, rootName);
364
                FileUtils.mkdir_p(rootDir);
365
                final DBRoot root = this.master.getRoot(rootName);
366
                final DBRoot slaveRoot = this.slave.getRoot(rootName);
367
                for (final String tableName : e.getValue()) {
368
                    final SQLTable masterT = root.getTable(tableName);
369
                    final SQLSelect select = new SQLSelect(true).addSelectStar(masterT);
370
                    queries.add(select.asString());
371
                    // don't use cache to be sure to have up to date data
372
                    handlers.add(new IResultSetHandler(new ResultSetHandler() {
373
 
374
                        private final CSVHandler csvH = new CSVHandler(masterT.getOrderedFields());
375
 
376
                        @Override
377
                        public Object handle(ResultSet rs) throws SQLException {
378
                            final File tempFile = new File(rootDir, FileUtils.FILENAME_ESCAPER.escape(tableName) + ".csv");
379
                            assert !tempFile.exists();
380
                            try {
381
                                FileUtils.write(this.csvH.handle(rs), tempFile);
382
                                files.put(tempFile, slaveRoot.getTable(tableName));
383
                            } catch (IOException e) {
384
                                throw new SQLException(e);
385
                            }
386
                            return null;
387
                        }
388
                    }, false));
389
                }
390
            }
391
            try {
392
                SQLUtils.executeAtomic(this.master.getDataSource(), new ConnectionHandlerNoSetup<Object, SQLException>() {
393
                    @Override
394
                    public Object handle(SQLDataSource ds) throws SQLException {
395
                        SQLUtils.executeMultiple(MemoryRep.this.master, queries, handlers);
396
                        return null;
397
                    }
398
                });
399
            } catch (RTInterruptedException e) {
400
                final InterruptedException exn = new InterruptedException("Interrupted while querying the master");
401
                exn.initCause(e);
402
                throw exn;
403
            }
404
            SQLUtils.executeAtomic(this.slave.getDataSource(), new ConnectionHandlerNoSetup<Object, IOException>() {
405
                @Override
406
                public Object handle(SQLDataSource ds) throws SQLException, IOException {
407
                    for (final Entry<File, SQLTable> e : files.entrySet()) {
408
                        final SQLTable slaveT = e.getValue();
409
                        // loadData() fires table modified
410
                        slaveSyntax.loadData(e.getKey(), slaveT, true);
411
                    }
412
                    return null;
413
                }
414
            });
415
            this.count.incrementAndGet();
416
        } finally {
417
            FileUtils.rm_R(tempDir);
418
        }
419
    }
420
 
421
    final int getCount() {
422
        return this.count.get();
423
    }
424
 
425
    public Future<?> executeModification(final IClosure<SQLDataSource> cl) {
426
        // change master
427
        cl.executeChecked(this.master.getDataSource());
428
        // update slave
429
        return submitReplicate();
430
    }
431
}