OpenConcerto

Dépôt officiel du code source de l'ERP OpenConcerto
sonarqube

svn://code.openconcerto.org/openconcerto

Rev

Rev 17 | Rev 63 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
17 ilm 1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
4
 * Copyright 2011 OpenConcerto, by ILM Informatique. All rights reserved.
5
 *
6
 * The contents of this file are subject to the terms of the GNU General Public License Version 3
7
 * only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
8
 * copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
9
 * language governing permissions and limitations under the License.
10
 *
11
 * When distributing the software, include this License Header Notice in each file.
12
 */
13
 
14
 package org.openconcerto.sql.model;
15
 
16
import org.openconcerto.sql.Log;
17
import org.openconcerto.sql.State;
18
import org.openconcerto.sql.request.SQLCache;
19
import org.openconcerto.utils.ExceptionHandler;
20
import org.openconcerto.utils.ExceptionUtils;
21
import org.openconcerto.utils.RTInterruptedException;
22
import org.openconcerto.utils.ThreadFactory;
23
import org.openconcerto.utils.cache.CacheResult;
24
 
25
import java.sql.Connection;
26
import java.sql.ResultSet;
27
import java.sql.ResultSetMetaData;
28
import java.sql.SQLException;
29
import java.sql.Statement;
30
import java.util.Arrays;
31
import java.util.Collections;
32
import java.util.HashMap;
33
import java.util.HashSet;
34
import java.util.Hashtable;
35
import java.util.List;
36
import java.util.Map;
37
import java.util.NoSuchElementException;
38
import java.util.Set;
39
import java.util.WeakHashMap;
40
import java.util.concurrent.ExecutorService;
41
import java.util.concurrent.LinkedBlockingQueue;
42
import java.util.concurrent.ThreadPoolExecutor;
43
import java.util.concurrent.TimeUnit;
44
import java.util.concurrent.locks.ReentrantLock;
45
import java.util.logging.Level;
46
 
47
import javax.sql.DataSource;
48
 
61 ilm 49
import net.jcip.annotations.GuardedBy;
50
import net.jcip.annotations.ThreadSafe;
51
 
17 ilm 52
import org.apache.commons.dbcp.BasicDataSource;
53
import org.apache.commons.dbcp.PoolingDataSource;
54
import org.apache.commons.dbcp.SQLNestedException;
55
import org.apache.commons.dbutils.BasicRowProcessor;
56
import org.apache.commons.dbutils.ResultSetHandler;
57
import org.apache.commons.dbutils.RowProcessor;
58
import org.apache.commons.dbutils.handlers.ArrayHandler;
59
import org.apache.commons.dbutils.handlers.ArrayListHandler;
60
import org.apache.commons.dbutils.handlers.MapHandler;
61
import org.apache.commons.dbutils.handlers.MapListHandler;
62
import org.apache.commons.dbutils.handlers.ScalarHandler;
63
import org.apache.commons.pool.impl.GenericObjectPool;
64
import org.postgresql.util.GT;
65
import org.postgresql.util.PSQLException;
66
 
67
/**
68
 * Une source de donnée SQL.
69
 *
70
 * @author ILM Informatique 10 juin 2004
71
 */
61 ilm 72
@ThreadSafe
17 ilm 73
public final class SQLDataSource extends BasicDataSource implements Cloneable {
74
 
75
    // MAYBE add a cache, but ATTN synchronized : one connection per thread, but only one shared DS
76
 
77
    /** Supported database systems, each mapped to the class name of its JDBC driver. */
    public static final Map<SQLSystem, String> DRIVERS = new HashMap<SQLSystem, String>();
    static {
        DRIVERS.put(SQLSystem.MYSQL, "com.mysql.jdbc.Driver");
        DRIVERS.put(SQLSystem.POSTGRESQL, "org.postgresql.Driver");
        DRIVERS.put(SQLSystem.DERBY, "org.apache.derby.jdbc.ClientDriver");
        DRIVERS.put(SQLSystem.H2, "org.h2.Driver");
        DRIVERS.put(SQLSystem.MSSQL, "com.microsoft.sqlserver.jdbc.SQLServerDriver");

        // H2 converts database names to upper case by default. This used to be worked around
        // with SQLSystem.getMDName(), but in r2251 an equalsIgnoreCase() was replaced by
        // equals(), see http://code.google.com/p/h2database/issues/detail?id=204
        System.setProperty("h2.databaseToUpper", "false");
    }
92
 
93
    // timeouts in seconds
94
    static public final int loginTimeOut = 15;
95
    static public final int socketTimeOut = 8 * 60;
96
 
97
    static public interface IgnoringRowProcessor extends RowProcessor {
98
 
99
        @Override
100
        public Map<String, Object> toMap(ResultSet rs) throws SQLException;
101
 
102
        /**
103
         * Convert the passed result set to a map, ignoring some columns.
104
         *
105
         * @param rs the result set.
106
         * @param toIgnore which columns' label to ignore.
107
         * @return a map with all columns of <code>rs</code> except <code>toIgnore</code>.
108
         * @throws SQLException if an error occurs while reading <code>rs</code>.
109
         */
110
        public Map<String, Object> toMap(ResultSet rs, Set<String> toIgnore) throws SQLException;
111
    }
112
 
113
    // ignoring case-sensitive processor
114
    static private class IgnoringCSRowProcessor extends BasicRowProcessor implements IgnoringRowProcessor {
115
        @Override
116
        public Map<String, Object> toMap(ResultSet rs) throws SQLException {
117
            return toMap(rs, Collections.<String> emptySet());
118
        }
119
 
120
        // on ne veut pas de CaseInsensitiveMap
121
        @Override
122
        public Map<String, Object> toMap(ResultSet rs, Set<String> toIgnore) throws SQLException {
123
            final Map<String, Object> result = new HashMap<String, Object>();
124
            final ResultSetMetaData rsmd = rs.getMetaData();
125
            final int cols = rsmd.getColumnCount();
126
            for (int i = 1; i <= cols; i++) {
127
                // ATTN use label not base name (eg "des" in "SELECT DESIGNATION as des FROM ...")
128
                final String label = rsmd.getColumnLabel(i);
129
                if (!toIgnore.contains(label))
130
                    result.put(label, rs.getObject(i));
131
            }
132
            return result;
133
        }
134
    }
135
 
136
    static public final IgnoringRowProcessor ROW_PROC = new IgnoringCSRowProcessor();
137
    // all thread safe
138
    public static final ColumnListHandler COLUMN_LIST_HANDLER = new ColumnListHandler();
139
    public static final ArrayListHandler ARRAY_LIST_HANDLER = new ArrayListHandler();
140
    public static final ArrayHandler ARRAY_HANDLER = new ArrayHandler();
141
    public static final ScalarHandler SCALAR_HANDLER = new ScalarHandler();
142
    public static final MapListHandler MAP_LIST_HANDLER = new MapListHandler(ROW_PROC);
143
    public static final MapHandler MAP_HANDLER = new MapHandler(ROW_PROC);
144
 
61 ilm 145
    // Cache, linked to cacheEnable and tables
146
    @GuardedBy("this")
147
    private SQLCache<List<?>, Object> cache;
148
    @GuardedBy("this")
17 ilm 149
    private boolean cacheEnabled;
150
    // tables that can be used in queries (and thus can impact the cache)
61 ilm 151
    @GuardedBy("this")
152
    private Set<SQLTable> tables;
17 ilm 153
 
154
    private static int count = 0; // compteur de requetes
155
 
156
    private final SQLServer server;
157
    // une connexion par thread
61 ilm 158
    @GuardedBy("this")
17 ilm 159
    private final Map<Thread, Connection> connections;
61 ilm 160
    // no need to synchronize multiple call to this attribute since we only access the
161
    // Thread.currentThread() key
162
    @GuardedBy("handlers")
17 ilm 163
    private final Map<Thread, HandlersStack> handlers;
164
 
61 ilm 165
    @GuardedBy("this")
17 ilm 166
    private ExecutorService exec = null;
167
 
61 ilm 168
    @GuardedBy("this")
17 ilm 169
    private CleanUp cleanUp;
61 ilm 170
 
171
    // linked to initialSchema and uptodate
172
    @GuardedBy("this")
17 ilm 173
    private boolean initialShemaSet;
61 ilm 174
    @GuardedBy("this")
17 ilm 175
    private String initialShema;
61 ilm 176
    // which Connection have the right default schema
177
    @GuardedBy("this")
17 ilm 178
    private final Map<Connection, Object> uptodate;
179
 
61 ilm 180
    private volatile int retryWait;
181
 
17 ilm 182
    private final ReentrantLock testLock = new ReentrantLock();
183
 
184
    /**
     * Create a data source for a database of the passed server, initially without any table.
     * 
     * @param server the server to connect to.
     * @param base the name of the database, used to ask <code>server</code> for the URL.
     * @param login the user name.
     * @param pass the password.
     */
    public SQLDataSource(SQLServer server, String base, String login, String pass) {
        this(server, server.getURL(base), login, pass,
                // no table is known yet
                Collections.<SQLTable> emptySet());
    }
187
 
188
    // sets the driver, the URL, the credentials, the tables and the system specific properties
    private SQLDataSource(SQLServer server, String url, String login, String pass, Set<SQLTable> tables) {
        this(server);

        final SQLSystem system = server.getSQLSystem();
        if (!DRIVERS.containsKey(system))
            throw new IllegalArgumentException("unknown database system: " + system);

        final String driverClassName = DRIVERS.get(system);
        this.setDriverClassName(driverClassName);
        final String jdbcUrl = "jdbc:" + system.getJDBCName() + ":" + url;
        this.setUrl(jdbcUrl);

        this.setUsername(login);
        this.setPassword(pass);
        this.setTables(tables);

        // system specific connection properties
        if (system == SQLSystem.MYSQL) {
            this.addConnectionProperty("transformedBitIsBoolean", "true");
        } else if (system == SQLSystem.H2) {
            this.addConnectionProperty("CACHE_SIZE", "32000");
        }
        this.setLoginTimeout(loginTimeOut);
        this.setSocketTimeout(socketTimeOut);
        this.setRetryWait(3);
        // ATTN DO NOT call execute() or any method that might create a connection
        // since at this point dsInit() has not been called and thus connection properties might be
        // missing (eg allowMultiQueries). And the faulty connection will stay in the pool.
    }
214
 
215
    @Override
216
    public final void setLoginTimeout(int timeout) {
217
        if (this.server.getSQLSystem() == SQLSystem.MYSQL) {
218
            this.addConnectionProperty("connectTimeout", timeout + "000");
219
        } else if (this.server.getSQLSystem() == SQLSystem.POSTGRESQL) {
220
            this.addConnectionProperty("loginTimeout", timeout + "");
221
        }
222
    }
223
 
224
    public final void setSocketTimeout(int timeout) {
225
        if (this.server.getSQLSystem() == SQLSystem.MYSQL) {
226
            this.addConnectionProperty("socketTimeout", timeout + "000");
227
        } else if (this.server.getSQLSystem() == SQLSystem.H2) {
228
            this.addConnectionProperty("QUERY_TIMEOUT", timeout + "000");
229
        } else if (this.server.getSQLSystem() == SQLSystem.POSTGRESQL) {
230
            this.addConnectionProperty("socketTimeout", timeout + "");
231
        }
232
    }
233
 
234
    public final void setRetryWait(int retryWait) {
235
        this.retryWait = retryWait;
236
    }
237
 
61 ilm 238
    synchronized void setTables(Set<SQLTable> tables) {
17 ilm 239
        // don't change the cache if we're only adding tables
240
        final boolean update = this.cache == null || !tables.containsAll(this.tables);
61 ilm 241
        this.tables = Collections.unmodifiableSet(new HashSet<SQLTable>(tables));
17 ilm 242
        if (update)
243
            updateCache();
244
    }
245
 
61 ilm 246
    private synchronized void updateCache() {
17 ilm 247
        if (this.cache != null)
248
            this.cache.clear();
249
        if (this.cacheEnabled && this.tables.size() > 0)
61 ilm 250
            this.cache = new SQLCache<List<?>, Object>(30, 30, "results of " + this.getClass().getSimpleName());
17 ilm 251
        else
252
            this.cache = null;
253
    }
254
 
255
    /**
256
     * Enable or disable the cache. ATTN if you enable the cache you must
257
     * {@link SQLTable#fire(SQLTableEvent) fire} table events, or use a class that does like
258
     * {@link SQLRowValues}.
259
     *
260
     * @param b <code>true</code> to enable the cache.
261
     */
61 ilm 262
    public final synchronized void setCacheEnabled(boolean b) {
17 ilm 263
        if (this.cacheEnabled != b) {
264
            this.cacheEnabled = b;
265
            updateCache();
266
        }
267
    }
268
 
269
    /* for cloning */
    private SQLDataSource(SQLServer server) {
        this.server = server;

        // per thread state
        this.connections = new HashMap<Thread, Connection>();
        // a synchronized implementation is needed
        this.handlers = new Hashtable<Thread, HandlersStack>();

        // default schema
        this.initialShemaSet = false;
        this.initialShema = null;
        // weak, since this is only a hint to avoid initializing the connection
        // on each borrowal
        this.uptodate = new WeakHashMap<Connection, Object>();

        this.cleanUp = null;

        // cache
        this.tables = Collections.emptySet();
        this.cache = null;
        this.cacheEnabled = false;

        // see #getNewConnection(boolean)
        this.setValidationQuery("SELECT 1");
        this.setTestOnBorrow(false);

        // pool size
        this.setInitialSize(3);
        this.setMaxActive(48);
        // creating connections is quite costly so make sure we always have a couple free
        this.setMinIdle(2);
        // but not too much as it can lock out other users (the server has a max connection count)
        this.setMaxIdle(16);
        // see #createDataSource() for properties not supported by this class
    }
297
 
298
    /**
299
     * Exécute la requête et retourne le résultat sous forme de liste de map. Si la requete va
300
     * retourner beaucoup de lignes, il est peut-être préférable d'utiliser un ResultSetHandler.
301
     *
302
     * @param query le requête à exécuter.
303
     * @return le résultat de la requête.
304
     * @see MapListHandler
305
     * @see #execute(String, ResultSetHandler)
306
     */
307
    public List execute(String query) {
308
        return (List) this.execute(query, MAP_LIST_HANDLER);
309
    }
310
 
311
    /**
312
     * Exécute la requête et retourne la première colonne uniquement.
313
     *
314
     * @param query le requête à exécuter.
315
     * @return le résultat de la requête.
316
     * @see ColumnListHandler
317
     * @see #execute(String, ResultSetHandler)
318
     */
319
    public List executeCol(String query) {
320
        return (List) this.execute(query, COLUMN_LIST_HANDLER);
321
    }
322
 
323
    /**
324
     * Exécute la requête et retourne le résultat sous forme de liste de tableau. Si la requete va
325
     * retourner beaucoup de lignes, il est peut-être préférable d'utiliser un ResultSetHandler.
326
     *
327
     * @param query le requête à exécuter.
328
     * @return le résultat de la requête.
329
     * @see ArrayListHandler
330
     * @see #execute(String, ResultSetHandler)
331
     */
332
    public List executeA(String query) {
333
        return (List) this.execute(query, ARRAY_LIST_HANDLER);
334
    }
335
 
336
    /**
337
     * Exécute la requête et retourne la première ligne du résultat sous forme de map.
338
     *
339
     * @param query le requête à exécuter.
340
     * @return le résultat de la requête.
341
     * @see MapHandler
342
     * @see #execute(String)
343
     */
344
    public Map execute1(String query) {
345
        return (Map) this.execute(query, MAP_HANDLER);
346
    }
347
 
348
    /**
349
     * Exécute la requête et retourne la première ligne du résultat sous forme de tableau.
350
     *
351
     * @param query le requête à exécuter.
352
     * @return le résultat de la requête.
353
     * @see ArrayHandler
354
     * @see #executeA(String)
355
     */
356
    public Object[] executeA1(String query) {
357
        return (Object[]) this.execute(query, ARRAY_HANDLER);
358
    }
359
 
360
    /**
361
     * Exécute la requête et retourne la valeur de la premiere colonne de la premiere ligne.
362
     *
363
     * @param query le requête à exécuter.
364
     * @return le résultat de la requête.
365
     */
366
    public Object executeScalar(String query) {
367
        return this.execute(query, SCALAR_HANDLER);
368
    }
369
 
370
    /**
371
     * Exécute la requête et passe le résultat au ResultSetHandler.
372
     *
373
     * @param query le requête à exécuter.
374
     * @param rsh le handler à utiliser, ou <code>null</code>.
375
     * @return le résultat du handler, <code>null</code> si rsh est <code>null</code>.
376
     * @see #execute(String)
377
     */
378
    public Object execute(String query, ResultSetHandler rsh) {
379
        return this.execute(query, rsh, null);
380
    }
381
 
382
    /**
383
     * Execute <code>query</code> within <code>c</code>, passing the result set to <code>rsh</code>.
384
     *
385
     * @param query the query to perform.
386
     * @param rsh what to do with the result, can be <code>null</code>.
387
     * @param changeState whether <code>query</code> changes the state of a connection.
388
     * @return the result of <code>rsh</code>, <code>null</code> if rsh or the resultSet is
389
     *         <code>null</code>.
390
     * @throws RTInterruptedException if the current thread is interrupted while waiting for the
391
     *         cache or for the database.
392
     */
393
    public final Object execute(final String query, final ResultSetHandler rsh, final boolean changeState) throws RTInterruptedException {
394
        return this.execute(query, rsh, changeState, null);
395
    }
396
 
397
    private Object execute(final String query, final ResultSetHandler rsh, final Connection c) throws RTInterruptedException {
398
        // false since the vast majority of request do NOT change the state
399
        return this.execute(query, rsh, false, c);
400
    }
401
 
402
    /**
403
     * Execute <code>query</code> within <code>c</code>, passing the result set to <code>rsh</code>.
404
     *
405
     * @param query the query to perform.
406
     * @param rsh what to do with the result, can be <code>null</code>.
407
     * @param changeState whether <code>query</code> changes the state of a connection.
408
     * @param passedConn the sql connection to use.
409
     * @return the result of <code>rsh</code>, <code>null</code> if rsh or the resultSet is
410
     *         <code>null</code>.
411
     * @throws RTInterruptedException if the current thread is interrupted while waiting for the
412
     *         cache or for the database.
413
     */
414
    private Object execute(final String query, final ResultSetHandler rsh, final boolean changeState, final Connection passedConn) throws RTInterruptedException {
415
        final long timeMs = System.currentTimeMillis();
416
        final long time = System.nanoTime();
417
        // some systems refuse to execute nothing
418
        if (query.length() == 0) {
419
            SQLRequestLog.log(query, "Pas de requête.", timeMs, 0);
420
            return null;
421
        }
422
 
423
        final IResultSetHandler irsh = rsh instanceof IResultSetHandler ? (IResultSetHandler) rsh : null;
61 ilm 424
        final SQLCache<List<?>, Object> cache;
425
        synchronized (this) {
426
            cache = this.cache;
427
        }
428
        final List<Object> key = cache != null && query.startsWith("SELECT") ? Arrays.asList(new Object[] { query, rsh }) : null;
17 ilm 429
        if (key != null && (irsh == null || irsh.readCache())) {
61 ilm 430
            final CacheResult<Object> l = cache.check(key);
17 ilm 431
            if (l.getState() == CacheResult.State.INTERRUPTED)
432
                throw new RTInterruptedException("interrupted while waiting for the cache");
433
            else if (l.getState() == CacheResult.State.VALID) {
434
                // cache actif
435
                if (State.DEBUG)
436
                    State.INSTANCE.addCacheHit();
437
                SQLRequestLog.log(query, "En cache.", timeMs, 0);
438
                return l.getRes();
439
            }
440
        }
441
 
442
        Object result = null;
443
        QueryInfo info = null;
444
        long durationSQL = 0;
445
        try {
446
            info = new QueryInfo(query, changeState, passedConn);
447
            try {
448
                final Object[] res = this.executeTwice(info);
449
                final Statement stmt = (Statement) res[0];
450
                ResultSet rs = (ResultSet) res[1];
451
                // TODO 1. rename #execute(String) to #executeN(String)
452
                // and make #execute(String) do #execute(String, null)
453
                // 2. let null rs pass to rsh
454
                // otherwise you write ds.execute("req", new ResultSetHandler() {
455
                // public Object handle(ResultSet rs) throws SQLException {
456
                // return "OK";
457
                // }
458
                // });
459
                // and OK won't be returned if "req" returns a null rs.
460
                durationSQL = System.nanoTime() - time;
461
                if (rsh != null && rs != null) {
462
                    if (this.getSystem() == SQLSystem.DERBY || this.getSystem() == SQLSystem.POSTGRESQL) {
463
                        rs = new SQLResultSet(rs);
464
                    }
465
 
466
                    result = rsh.handle(rs);
467
                }
468
 
469
                stmt.close();
470
                // if key was added to the cache
471
                if (key != null) {
61 ilm 472
                    synchronized (this) {
473
                        putInCache(cache, irsh, key, result, true);
474
                        if (this.cache != cache)
475
                            putInCache(this.cache, irsh, key, result, false);
476
                    }
17 ilm 477
                }
478
                info.releaseConnection();
479
            } catch (SQLException exn) {
480
                // don't usually do a getSchema() as it access the db
481
                throw new IllegalStateException("Impossible d'accéder au résultat de " + query + "\n in " + this, exn);
482
            }
483
        } catch (RuntimeException e) {
484
            // for each #check() there must be a #removeRunning()
485
            // let the cache know we ain't gonna tell it the result
61 ilm 486
            if (cache != null && key != null)
487
                cache.removeRunning(key);
17 ilm 488
            if (info != null)
489
                info.releaseConnection(e);
490
            throw e;
491
        }
492
 
493
        SQLRequestLog.log(query, "", info, timeMs, durationSQL, System.nanoTime() - time);
494
 
495
        return result;
496
    }
497
 
61 ilm 498
    private synchronized void putInCache(final SQLCache<List<?>, Object> cache, final IResultSetHandler irsh, final List<Object> key, Object result, final boolean removeRunning) {
499
        if (irsh != null && irsh.writeCache()) {
500
            cache.put(key, result, irsh.getCacheModifiers() == null ? this.tables : irsh.getCacheModifiers());
501
        } else if (irsh == null && IResultSetHandler.shouldCache(result)) {
502
            cache.put(key, result, this.tables);
503
        } else if (removeRunning) {
504
            cache.removeRunning(key);
505
        }
506
    }
507
 
17 ilm 508
    private synchronized final ExecutorService getExec() {
509
        if (this.exec == null) {
510
            // not daemon since we want the connections to be returned
511
            final ThreadFactory factory = new ThreadFactory(SQLDataSource.class.getSimpleName() + " " + this.toString() + " exec n° ", false);
512
            // a rather larger number of threads since all they do is wait severals seconds
513
            this.exec = new ThreadPoolExecutor(0, 32, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
514
        }
515
        return this.exec;
516
    }
517
 
518
    public final class QueryInfo {
519
        private final String query;
520
        // whether query change the state of our connection
521
        private final boolean changeState;
522
        // can change if private
523
        private Connection c;
524
        // whether we acquired a new connection (and thus can do whatever we want with it)
525
        private final boolean privateConnection;
526
 
527
        QueryInfo(String query, boolean changeState, final Connection passedConn) {
528
            super();
529
            this.query = query;
530
            this.changeState = changeState;
531
 
532
            // if passedConn is provided use it, else we need to find one
533
            boolean acquiredConnection = false;
534
            final Connection foundConn;
535
            if (passedConn != null)
536
                foundConn = passedConn;
537
            else if (!handlingConnection()) {
538
                foundConn = getNewConnection();
539
                acquiredConnection = true;
540
            } else {
541
                final HandlersStack threadHandlers = getHandlersStack();
542
                if (!changeState || threadHandlers.isChangeAllowed()) {
543
                    foundConn = threadHandlers.getConnection();
544
                } else {
545
                    throw new IllegalStateException("the passed query change the connection's state and the current thread has a connection which will thus be changed."
546
                            + " A possible solution is to execute it in the setup() of a ConnectionHandler\n" + query);
547
                }
548
            }
549
 
550
            this.privateConnection = acquiredConnection;
551
            this.c = foundConn;
552
        }
553
 
554
        public final Connection getConnection() {
555
            return this.c;
556
        }
557
 
558
        public final String getQuery() {
559
            return this.query;
560
        }
561
 
562
        void releaseConnection(RuntimeException e) {
563
            // MySQL reste des fois bloqué dans SocketInputStream.socketRead0()
564
            // (le serveur ayant tué la query)
565
            if (e instanceof InterruptedQuery && getSystem() == SQLSystem.MYSQL) {
566
                final ExecutorThread thread = ((InterruptedQuery) e).getThread();
567
 
568
                if (this.privateConnection) {
569
                    if (this.changeState)
570
                        // no need to try to save the connection, it is no longer valid
571
                        this.releaseConnection();
572
                    else {
573
                        // test if the connection is still valid before returning it to the pool
574
                        getExec().execute(new Runnable() {
575
                            public void run() {
576
                                // on attend un peu
577
                                try {
578
                                    thread.join(1500);
579
                                    // pour voir si on meurt
580
                                    if (thread.isAlive()) {
581
                                        Log.get().warning(getFailedCancelMsg());
582
                                        closeConnection(getConnection());
583
                                    } else {
584
                                        // la connexion est ok, on la remet dans le pool
585
                                        returnConnection(getConnection());
586
                                    }
587
                                } catch (InterruptedException e) {
588
                                    // the datasource is closing
589
                                    Log.get().fine("Interrupted while joining " + getQuery());
590
                                    closeConnection(getConnection());
591
                                }
592
                            }
593
                        });
594
                    }
595
                } else {
596
                    // try to save the connection since it is used by others
597
                    try {
598
                        // clear the interrupt status set by InterruptedQuery
599
                        // so that we can wait on thread
600
                        Thread.interrupted();
601
                        thread.join(500);
602
                    } catch (InterruptedException e2) {
603
                        System.err.println("ignore, we are already interrupted");
604
                        e2.printStackTrace();
605
                    }
606
                    // remettre le flag pour les méthodes appelantes.
607
                    Thread.currentThread().interrupt();
608
 
609
                    // connection is still stuck
610
                    if (thread.isAlive()) {
611
                        throw new IllegalStateException(getFailedCancelMsg(), e);
612
                    } else
613
                        this.releaseConnection();
614
                }
615
            } else
616
                this.releaseConnection();
617
        }
618
 
619
        void releaseConnection() {
620
            // have we borrowed a connection, otherwise it is not our responsibility to release it.
621
            if (this.privateConnection) {
622
                if (this.changeState)
623
                    // the connection is no longer in a pristine state so close it
624
                    closeConnection(this.getConnection());
625
                else
626
                    // otherwise we can reuse it
627
                    returnConnection(this.getConnection());
628
            }
629
        }
630
 
631
        private final String getFailedCancelMsg() {
632
            return "cancel of " + System.identityHashCode(getConnection()) + " failed for " + getQuery();
633
        }
634
 
635
        // an error has occured, try within another connection if possible
636
        public final Connection obtainNewConnection() {
637
            if (!this.privateConnection)
638
                return null;
639
            else {
640
                // ATTN should be sure that our connection was not already closed,
641
                // see #closeConnection()
642
                closeConnection(this.getConnection());
643
                this.c = borrowConnection(true);
644
                return this.getConnection();
645
            }
646
        }
647
    }
648
 
649
    private final boolean handlingConnection() {
650
        return this.handlers.containsKey(Thread.currentThread());
651
    }
652
 
653
    private final HandlersStack getHandlersStack() {
654
        return this.handlers.get(Thread.currentThread());
655
    }
656
 
657
    /**
658
     * Use a single connection to execute <code>handler</code>.
659
     *
660
     * @param <T> type of return.
661
     * @param <X> type of exception.
662
     * @param handler what to do with the connection.
663
     * @return what <code>handler</code> returned.
664
     * @throws SQLException if an exception happens in setup() or restore().
665
     * @throws X if handle() throws an exception.
666
     * @see ConnectionHandler
667
     */
668
    public final <T, X extends Exception> T useConnection(ConnectionHandler<T, X> handler) throws SQLException, X {
669
        final HandlersStack h;
670
        if (!this.handlingConnection()) {
671
            h = new HandlersStack(this.getNewConnection(), handler);
672
            this.handlers.put(Thread.currentThread(), h);
673
        } else if (handler.canRestoreState()) {
674
            h = this.getHandlersStack().push(handler);
675
        } else
676
            throw new IllegalStateException("this thread has already called useConnection() and thus expect its state, but the passed handler cannot restore state: " + handler);
677
 
678
        Connection conn = null;
679
        Exception exn = null;
680
        try {
681
            conn = h.getConnection();
682
            h.setChangeAllowed(true);
683
            handler.setup(conn);
684
            h.setChangeAllowed(false);
685
            handler.compute(this);
686
        } catch (Exception e) {
687
            h.setChangeAllowed(false);
688
            exn = e;
689
        }
690
 
691
        // in all cases (thanks to the above catch), try to restore the state
692
        // if conn is null setup() was never called
693
        boolean pristineState = conn == null;
694
        if (!pristineState && handler.canRestoreState()) {
695
            h.setChangeAllowed(true);
696
            try {
697
                handler.restoreState(conn);
698
                pristineState = true;
699
            } catch (Exception e) {
700
                if (exn == null)
701
                    exn = e;
702
                else
703
                    // the original exn as the source
704
                    exn = new SQLException("could not restore state: " + ExceptionUtils.getStackTrace(e), exn);
705
            }
706
            h.setChangeAllowed(false);
707
        }
708
 
709
        // ATTN conn can be null (return/closeConnection() accept it)
710
        if (h.pop()) {
711
            // remove if this thread has no handlers left
712
            this.handlers.remove(Thread.currentThread());
713
            if (pristineState)
714
                this.returnConnection(conn);
715
            else
716
                this.closeConnection(conn);
717
        } else {
718
            // connection is still used
719
            if (!pristineState) {
720
                h.invalidConnection();
721
                this.closeConnection(conn);
722
            }
723
            // else the top handler will release the connection
724
        }
725
        if (exn != null)
726
            if (exn instanceof RuntimeException)
727
                throw (RuntimeException) exn;
728
            else
729
                throw (SQLException) exn;
730
        else
731
            return handler.get();
732
    }
733
 
734
    // this method create a Statement, don't forget to close it when you're done
735
    private Object[] executeTwice(QueryInfo queryInfo) throws SQLException {
736
        final String query = queryInfo.getQuery();
737
        Object[] res;
738
        try {
739
            res = executeOnce(query, queryInfo.getConnection());
740
        } catch (SQLException exn) {
741
            if (State.DEBUG)
742
                State.INSTANCE.addFailedRequest(query);
743
            // maybe this was a network problem, so wait a little
744
            try {
745
                Thread.sleep(1000);
746
            } catch (InterruptedException e) {
747
                throw new RTInterruptedException(e.getMessage() + " : " + query, exn);
748
            }
749
            // and try to obtain a new connection
750
            try {
751
                final Connection otherConn = queryInfo.obtainNewConnection();
752
                if (otherConn != null) {
753
                    res = executeOnce(query, otherConn);
754
                } else
755
                    throw exn;
756
            } catch (Exception e) {
757
                if (e == exn)
758
                    throw exn;
759
                else
760
                    throw new SQLException("second exec failed: " + e.getLocalizedMessage(), exn);
761
            }
762
        }
763
        return res;
764
    }
765
 
766
    private Object[] executeOnce(String query, Connection c) throws SQLException {
767
        final Statement stmt = c.createStatement();
768
        final ResultSet rs = execute(query, stmt);
769
        return new Object[] { stmt, rs };
770
    }
771
 
772
    /**
773
     * Exécute la requête et retourne le résultat. Attention le resultSet peut cesser d'être valide
774
     * a tout moment, de plus cette méthode ne ferme pas le statement qu'elle crée, la méthode
775
     * préférée est execute()
776
     *
777
     * @param query le requête à exécuter.
778
     * @return le résultat de la requête.
779
     * @deprecated replaced by execute().
780
     * @see #execute(String)
781
     */
782
    public ResultSet executeRaw(String query) {
783
        try {
784
            return execute(query, this.getStatement());
785
        } catch (SQLException e) {
786
            try {
787
                return execute(query, this.getStatement());
788
            } catch (SQLException ex) {
789
                ExceptionHandler.handle("Impossible d'executer la query: " + query, ex);
790
                return null;
791
            }
792
        }
793
    }
794
 
795
    /**
796
     * Retourne un nouveau statement. Attention, la fermeture est à la charge de l'appelant.
797
     *
798
     * @return un nouveau statement.
799
     * @throws SQLException if an error occurs.
800
     */
801
    private Statement getStatement() throws SQLException {
802
        return this.getConnection().createStatement();
803
    }
804
 
805
    /**
806
     * Execute la requete avec le statement passé. Attention cette méthode ne peut fermer le
807
     * statement car elle retourne directement le resultSet.
808
     *
809
     * @param query le requête à exécuter.
810
     * @param stmt le statement.
811
     * @return le résultat de la requête, should never be null according to the spec but Derby don't
812
     *         care.
813
     * @throws SQLException si erreur lors de l'exécution de la requête.
814
     */
815
    private ResultSet execute(String query, Statement stmt) throws SQLException, RTInterruptedException {
816
        // System.err.println("\n" + count + "*** " + query + "\n");
817
 
818
        if (State.DEBUG)
819
            State.INSTANCE.beginRequest(query);
820
 
821
        // test before calling JDBC methods and creating threads
822
        if (Thread.currentThread().isInterrupted()) {
823
            throw new RTInterruptedException("request interrupted : " + query);
824
        }
825
 
826
        final long t1 = System.currentTimeMillis();
827
        ResultSet rs = null;
828
        try {
829
            // MAYBE un truc un peu plus formel
830
            if (query.startsWith("INSERT") || query.startsWith("UPDATE") || query.startsWith("DELETE") || query.startsWith("ALTER") || query.startsWith("DROP") || query.startsWith("SET")) {
831
                final boolean returnGenK = (query.startsWith("INSERT") || query.startsWith("UPDATE")) && stmt.getConnection().getMetaData().supportsGetGeneratedKeys();
832
                stmt.executeUpdate(query, returnGenK ? Statement.RETURN_GENERATED_KEYS : Statement.NO_GENERATED_KEYS);
833
                rs = returnGenK ? stmt.getGeneratedKeys() : null;
834
            } else {
835
                // TODO en faire qu'un seul par Connection
836
                final ExecutorThread thr = new ExecutorThread(stmt, query);
837
                // on lance l'exécution
838
                thr.start();
839
                // et on attend soit qu'elle finisse soit qu'on soit interrompu
840
                try {
841
                    rs = thr.getRs();
842
                } catch (InterruptedException e) {
843
                    thr.stopQuery();
844
                    throw new InterruptedQuery("request interrupted : " + query, e, thr);
845
                }
846
            }
847
        } finally {
848
            if (State.DEBUG)
849
                State.INSTANCE.endRequest(query);
850
        }
851
        long t2 = System.currentTimeMillis();
852
        // obviously very long queries tend to last longer, that's normal so don't warn
853
        if (t2 - t1 > 1000 && query.length() < 1000) {
854
            System.err.println("Warning:" + (t2 - t1) + "ms pour :" + query);
855
        }
856
 
857
        count++;
858
        return rs;
859
    }
860
 
861
    private final class InterruptedQuery extends RTInterruptedException {
862
 
863
        private final ExecutorThread thread;
864
 
865
        InterruptedQuery(String message, Throwable cause, ExecutorThread thr) {
866
            super(message, cause);
867
            this.thread = thr;
868
        }
869
 
870
        public final ExecutorThread getThread() {
871
            return this.thread;
872
        }
873
    }
874
 
875
    // Counter used as a prefix for the name of each ExecutorThread. NOTE(review): it is
    // incremented in the ExecutorThread constructor without any synchronization, so two threads
    // could presumably end up with the same number.
    private static int executorSerial = 0;
876
 
877
    private final class ExecutorThread extends Thread {
878
 
879
        private final Statement stmt;
880
        private final String query;
881
 
882
        private ResultSet rs;
883
        private Exception exn;
884
        private boolean canceled;
885
 
886
        public ExecutorThread(Statement stmt, String query) {
887
            super(executorSerial++ + " ExecutorThread on " + query);
888
            this.stmt = stmt;
889
            this.query = query;
890
            this.canceled = false;
891
        }
892
 
893
        public void run() {
894
            synchronized (this) {
895
                if (this.canceled)
896
                    return;
897
            }
898
 
899
            ResultSet rs = null;
900
            try {
901
                // do not use executeQuery since this.query might contain several statements
902
                this.stmt.execute(this.query);
903
                synchronized (this) {
904
                    if (this.canceled)
905
                        return;
906
                }
907
                rs = this.stmt.getResultSet();
908
            } catch (Exception e) {
909
                // can only be SQLException or RuntimeException
910
                // eg MySQLStatementCancelledException if stopQuery() was called
911
                this.exn = e;
912
            }
913
            this.rs = rs;
914
        }
915
 
916
        public void stopQuery() throws SQLException {
917
            this.stmt.cancel();
918
            synchronized (this) {
919
                this.canceled = true;
920
            }
921
        }
922
 
923
        public ResultSet getRs() throws SQLException, InterruptedException {
924
            this.join();
925
            // pas besoin de synchronized puisque seule notre thread ecrit les var
926
            // et qu'elle est maintenant terminée
927
            if (this.exn != null) {
928
                if (this.exn instanceof SQLException)
929
                    throw (SQLException) this.exn;
930
                else
931
                    throw (RuntimeException) this.exn;
932
            }
933
            return this.rs;
934
        }
935
    }
936
 
937
    /**
938
     * All connections obtained with {@link #getConnection()} will be closed immediately, but
939
     * threads in {@link #useConnection(ConnectionHandler)} will get to keep them. After the last
940
     * thread returns from {@link #useConnection(ConnectionHandler)} there won't be any connection
941
     * left open. This instance won't be permanently closed, it can be reused later.
942
     *
943
     * @throws SQLException if a database error occurs
944
     */
945
    public synchronized void close() throws SQLException {
946
        final GenericObjectPool pool = this.connectionPool;
947
        super.close();
948
        // super close and unset our pool, but we need to keep it
949
        // to allow used connections to be closed, see #closeConnection(Connection)
950
        this.connectionPool = pool;
951
        // cleanUp will be recreated if necessary
952
        final CleanUp toStop = this.cleanUp;
953
        this.cleanUp = null;
954
        // happens if the datasource was never used
955
        if (toStop != null)
956
            toStop.interrupt();
957
        // since we're stopping cleanUp we have to close connections
958
        // if you don't want your connections to close at any time: #useConnection()
959
        for (final Connection conn : this.connections.values()) {
960
            this.closeConnection(conn);
961
        }
962
        this.connections.clear();
963
        // interrupt to force waiting threads to close their connections
964
        if (this.exec != null) {
965
            this.exec.shutdownNow();
966
            this.exec = null;
967
        }
968
 
969
        // uptodate was cleared by closeConnection()
970
        // the handlers will clear themselves
971
        // the cache is expected to be cleared (when all connections are closed)
972
        if (this.getBorrowedConnectionCount() == 0)
973
            noConnectionIsOpen();
974
        // ATTN keep tables to be able to reopen
975
    }
976
 
61 ilm 977
    private synchronized void noConnectionIsOpen() {
17 ilm 978
        assert this.connectionPool.getNumIdle() + this.connectionPool.getNumActive() == 0;
979
        if (this.cache != null)
980
            this.cache.clear();
981
    }
982
 
983
    public final synchronized boolean isClosed() {
984
        return this.dataSource == null;
985
    }
986
 
987
    /**
988
     * Invalide la connection actuelle.
989
     */
990
    public void closeConnection() {
991
        this.releaseConnection(Thread.currentThread(), true);
992
    }
993
 
994
    /**
995
     * Retourne la connexion actuelle dans le pool des connexions libres. Attention à partir de ce
996
     * moment cette connexion peut être utilisée par d'autres.
997
     */
998
    public void returnConnection() {
999
        this.releaseConnection(Thread.currentThread(), false);
1000
    }
1001
 
1002
    // remove the connection for the passed thread, the calling method must
1003
    // then return or close it otherwise it will stay borrowed.
1004
    protected synchronized Connection rmConnection(Thread th) {
1005
        return this.connections.remove(th);
1006
    }
1007
 
1008
    public synchronized void releaseConnection(Thread th, final boolean close) {
1009
        if (this.handlers.containsKey(th)) {
1010
            if (close)
1011
                throw new IllegalArgumentException("cannot close the connection, it is in use by " + this.handlers.get(th));
1012
            // else nothing it will be released by useConnection().
1013
            Log.get().fine("ignoring " + close + " for " + th);
1014
        } else if (this.connections.containsKey(th)) {
1015
            if (close)
1016
                this.closeConnection(this.rmConnection(th));
1017
            else
1018
                this.returnConnection(this.rmConnection(th));
1019
        }
1020
    }
1021
 
1022
    protected synchronized boolean isInCharge(final CleanUp cleanUp) {
1023
        return this.cleanUp == cleanUp;
1024
    }
1025
 
1026
    final Set<Thread> getThreads(final Set<Thread> threads) {
1027
        threads.clear();
1028
        synchronized (this) {
1029
            threads.addAll(this.connections.keySet());
1030
        }
1031
        return threads;
1032
    }
1033
 
1034
    /**
1035
     * Retourne une connection à cette source de donnée. Si la connexion échoue cette méthode va
1036
     * réessayer quelques secondes plus tard.
1037
     * <p>
1038
     * Note : il y a une connexion par thread donc attention a ne pas créer d'instance de Thread a
1039
     * la pelle.
1040
     * <p>
1041
     *
1042
     * @return une connection à cette source de donnée.
1043
     */
1044
    public Connection getConnection() {
1045
        return this.getConnection(Thread.currentThread());
1046
    }
1047
 
1048
    private synchronized Connection getConnection(Thread th) {
1049
        if (this.handlers.containsKey(th)) {
1050
            return this.handlers.get(th).getConnection();
1051
        }
1052
        Connection res = this.connections.get(th);
1053
        if (res == null) {
1054
            res = this.getNewConnection();
1055
            this.connections.put(th, res);
1056
        }
1057
        return res;
1058
    }
1059
 
1060
    private final Connection getNewConnection() {
1061
        try {
1062
            return this.borrowConnection(false);
1063
        } catch (RTInterruptedException e) {
1064
            throw e;
1065
        } catch (Exception e) {
1066
            return this.borrowConnection(true);
1067
        }
1068
    }
1069
 
1070
    /**
1071
     * Borrow a new connection from the pool, optionally purging invalid connections with the
1072
     * validation query.
1073
     *
1074
     * @param test if <code>true</code> then testOnBorrow will be set.
1075
     * @return the new connection.
1076
     */
1077
    private final Connection borrowConnection(final boolean test) {
1078
        if (test) {
1079
            this.testLock.lock();
1080
            // invalidate all bad connections
1081
            setTestOnBorrow(true);
1082
        }
1083
        try {
1084
            final Connection res = this.getRawConnection();
1085
            try {
1086
                initConnection(res);
1087
                return res;
1088
            } catch (RuntimeException e) {
1089
                this.closeConnection(res);
1090
                throw e;
1091
            }
1092
        } finally {
1093
            if (test) {
1094
                setTestOnBorrow(false);
1095
                this.testLock.unlock();
1096
            }
1097
        }
1098
    }
1099
 
1100
    // initialize the passed connection if needed
1101
    protected final void initConnection(final Connection res) {
1102
        boolean setSchema = false;
1103
        String schemaToSet = null;
1104
        synchronized (this) {
1105
            if (!this.uptodate.containsKey(res)) {
1106
                if (this.initialShemaSet) {
1107
                    setSchema = true;
1108
                    schemaToSet = this.initialShema;
1109
                }
1110
                // safe to put before setSchema() since res cannot be passed to
1111
                // release/closeConnection()
1112
                this.uptodate.put(res, null);
1113
            }
1114
        }
1115
        // warmup the connection (executing a bogus simple query, like "SELECT 1") could help but in
1116
        // general doesn't since we often do getDS().execute() and thus our warm up thread will run
1117
        // after the execute(), making it useless.
1118
        if (setSchema)
1119
            this.setSchema(schemaToSet, res);
1120
    }
1121
 
1122
    // Message compared with the one of a PSQLException in getRawConnection() to detect that a
    // connection attempt was interrupted ; presumably the text used by the PostgreSQL driver in
    // that case - TODO confirm.
    private static final String pgInterrupted = GT.tr("Interrupted while attempting to connect.");
1123
 
1124
    private Connection getRawConnection() {
1125
        Connection result = null;
1126
        try {
1127
            result = super.getConnection();
1128
        } catch (SQLException e1) {
1129
            // try to know if interrupt, TODO cleanup : patch pg Driver.java to fill the cause
1130
            if (e1.getCause() instanceof InterruptedException || (e1 instanceof PSQLException && e1.getMessage().equals(pgInterrupted))) {
1131
                throw new RTInterruptedException(e1);
1132
            }
61 ilm 1133
            final int retryWait = this.retryWait;
1134
            if (retryWait == 0)
17 ilm 1135
                throw new IllegalStateException("Impossible d'obtenir une connexion sur " + this, e1);
1136
            try {
1137
                // on attend un petit peu
61 ilm 1138
                Thread.sleep(retryWait * 1000);
17 ilm 1139
                // avant de réessayer
1140
                result = super.getConnection();
1141
            } catch (InterruptedException e) {
1142
                throw new RTInterruptedException("interrupted while waiting for a second try", e);
1143
            } catch (Exception e) {
1144
                throw new IllegalStateException("Impossible d'obtenir une connexion sur " + this + ": " + e.getLocalizedMessage(), e1);
1145
            }
1146
        }
1147
        if (State.DEBUG)
1148
            State.INSTANCE.connectionCreated();
1149
        return result;
1150
    }
1151
 
1152
    public final int getBorrowedConnectionCount() {
1153
        return this.connectionPool.getNumActive();
1154
    }
1155
 
61 ilm 1156
    public synchronized final Set<Thread> getThreadsWithConnection() {
17 ilm 1157
        return this.connections.keySet();
1158
    }
1159
 
1160
    @Override
1161
    protected synchronized DataSource createDataSource() throws SQLException {
1162
        if (isClosed()) {
1163
            // initialize lotta things
1164
            super.createDataSource();
1165
            this.connectionPool.setLifo(true);
1166
            // don't block (threads block while owning SQLDataSource lock, thus preventing others
1167
            // from releasing connections)
1168
            this.connectionPool.setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_GROW);
1169
            // after 40s idle connections are closed
1170
            this.connectionPool.setTimeBetweenEvictionRunsMillis(4000);
1171
            this.connectionPool.setNumTestsPerEvictionRun(5);
1172
            // soft meaning only kill connections above minIdle
1173
            this.connectionPool.setSoftMinEvictableIdleTimeMillis(40000);
1174
            // PoolingDataSource returns a PoolGuardConnectionWrapper that complicates a lot of
1175
            // things for nothing, so overload to simply return an object of the pool
1176
            this.dataSource = new PoolingDataSource(this.connectionPool) {
1177
 
1178
                @Override
1179
                public Connection getConnection() throws SQLException {
1180
                    try {
1181
                        return (Connection) this._pool.borrowObject();
1182
                    } catch (SQLException e) {
1183
                        throw e;
1184
                    } catch (NoSuchElementException e) {
1185
                        throw new SQLNestedException("Cannot get a connection, pool exhausted", e);
1186
                    } catch (RuntimeException e) {
1187
                        throw e;
1188
                    } catch (Exception e) {
1189
                        throw new SQLNestedException("Cannot get a connection, general error", e);
1190
                    }
1191
                }
1192
 
1193
                @Override
1194
                public Connection getConnection(String username, String password) throws SQLException {
1195
                    throw new UnsupportedOperationException();
1196
                }
1197
            };
1198
            this.cleanUp = new CleanUp(15000);
1199
        }
1200
        return this.dataSource;
1201
    }
1202
 
1203
    /**
1204
     * To return a connection to the pool.
1205
     *
1206
     * @param con a connection obtained with getRawConnection(), can be <code>null</code>.
1207
     */
1208
    protected void returnConnection(final Connection con) {
1209
        if (con != null) {
1210
            // if !this.initialShemaSet the out of date cannot be brought up to date
1211
            final boolean unrecoverableOutOfDate;
1212
            synchronized (this) {
1213
                unrecoverableOutOfDate = !this.initialShemaSet && !this.uptodate.containsKey(con);
1214
            }
1215
            if (isClosed() || unrecoverableOutOfDate)
1216
                // if closed : don't fill the pool (which will have thrown an exception anyway)
1217
                // if we shouldn't set the schema, we must close all previous connections
1218
                // so that we get new ones from the db with the current setting
1219
                this.closeConnection(con);
1220
            else {
1221
                try {
1222
                    // our connectionPool use PoolableConnectionFactory which creates
1223
                    // PoolableConnection whose close() actually does a returnObject()
1224
                    con.close();
1225
                } catch (Exception e) {
1226
                    /* tant pis */
1227
                    if (Log.get().getLevel().intValue() <= Level.FINE.intValue()) {
1228
                        System.err.println("Could not return " + con);
1229
                        e.printStackTrace();
1230
                    }
1231
                }
1232
                if (State.DEBUG)
1233
                    State.INSTANCE.connectionRemoved();
1234
            }
1235
        }
1236
    }
1237
 
1238
    /**
1239
     * To actually close a connection to the db (and remove it from the pool).
1240
     *
1241
     * @param con a connection obtained with getRawConnection(), can be <code>null</code>.
1242
     */
1243
    protected void closeConnection(final Connection con) {
1244
        // Neither BasicDataSource nor PoolingDataSource provide a closeConnection()
1245
        // so we implement one here
1246
        if (con != null) {
1247
            synchronized (this) {
1248
                this.uptodate.remove(con);
1249
            }
1250
            try {
1251
                // ATTN this always does _numActive--, so we can't call it multiple times
1252
                // with the same object
1253
                this.connectionPool.invalidateObject(con);
1254
            } catch (Exception e) {
1255
                /* tant pis */
1256
                if (Log.get().getLevel().intValue() <= Level.FINE.intValue()) {
1257
                    System.err.println("Could not close " + con);
1258
                    e.printStackTrace();
1259
                }
1260
            }
1261
            // the last connection is being returned
1262
            if (this.isClosed() && this.getBorrowedConnectionCount() == 0) {
1263
                noConnectionIsOpen();
1264
            }
1265
        }
1266
    }
1267
 
1268
    /**
1269
     * From now on, every new connection will have its default schema set to schemaName.
1270
     *
1271
     * @param schemaName the name of the initial default schema, <code>null</code> to remove any
1272
     *        default schema.
1273
     */
1274
    public void setInitialSchema(String schemaName) {
1275
        if (schemaName != null || this.server.getSQLSystem().isClearingPathSupported()) {
1276
            // test if schemaName is valid
1277
            final Connection newConn = this.getNewConnection();
1278
            try {
1279
                this.setSchema(schemaName, newConn);
1280
            } finally {
1281
                this.closeConnection(newConn);
1282
            }
1283
            this.setInitialSchema(true, schemaName);
1284
        } else if (this.server.getSQLSystem().isDBPathEmpty()) {
1285
            this.unsetInitialSchema();
1286
        } else
1287
            throw new IllegalArgumentException(this + " cannot have no default schema");
1288
    }
1289
 
1290
    public void unsetInitialSchema() {
1291
        this.setInitialSchema(false, null);
1292
    }
1293
 
1294
    private final void setInitialSchema(final boolean set, final String schemaName) {
1295
        synchronized (this) {
1296
            this.initialShemaSet = set;
1297
            this.initialShema = set ? schemaName : null;
1298
            this.uptodate.clear();
1299
            if (!set)
1300
                // by definition we don't want to modify the connection,
1301
                // so empty the pool, that way new connections will be created
1302
                // the borrowed ones will be closed when returned
1303
                this.connectionPool.clear();
1304
        }
1305
    }
1306
 
1307
    public synchronized final String getInitialSchema() {
1308
        return this.initialShema;
1309
    }
1310
 
1311
    /**
1312
     * Set the default schema of the current thread's connection. NOTE: pointless if not in
1313
     * {@link #useConnection(ConnectionHandler)} since otherwise a connection will be borrowed then
1314
     * closed.
1315
     *
1316
     * @param schemaName the name of the new default schema.
1317
     */
1318
    public void setSchema(String schemaName) {
1319
        this.setSchema(schemaName, null);
1320
    }
1321
 
1322
    private void setSchema(String schemaName, Connection c) {
1323
        final String q;
1324
        if (this.getSystem() == SQLSystem.MYSQL) {
1325
            if (schemaName == null) {
1326
                if (this.getSchema(c) != null)
1327
                    throw new IllegalArgumentException("cannot unset DATABASE in MySQL");
1328
                else
1329
                    // nothing to do
1330
                    q = null;
1331
            } else
1332
                q = "USE " + schemaName;
1333
        } else if (this.getSystem() == SQLSystem.DERBY)
1334
            q = "SET SCHEMA \"" + schemaName + '"';
1335
        else if (this.getSystem() == SQLSystem.H2) {
1336
            q = "SET SCHEMA " + SQLBase.quoteIdentifier(schemaName);
1337
            // TODO use the line below, but for now it is only used after schema()
1338
            // plus there's no function to read it back
1339
            // q = "set SCHEMA_SEARCH_PATH " + SQLBase.quoteIdentifier(schemaName == null ? "" :
1340
            // schemaName);
1341
        } else if (this.getSystem() == SQLSystem.POSTGRESQL) {
1342
            final String schemasString;
1343
            if (schemaName == null) {
1344
                schemasString = "''";
1345
            } else {
1346
                // ATTN does NOT work if a schema has " in its name
1347
                // current_schemas() instead of current_setting() since the former removes dups and
1348
                // returns an array
1349
                schemasString = SQLSelect.quote(" '\"' || array_to_string( cast (%s as name) || current_schemas(false) , '\", \"')||  '\"'", schemaName);
1350
            }
1351
            q = "select set_config('search_path', " + schemasString + " , false)";
1352
        } else if (this.getSystem() == SQLSystem.MSSQL) {
1353
            if (schemaName == null)
1354
                throw new IllegalArgumentException("cannot unset default schema in " + this.getSystem());
1355
            else
1356
                q = "alter user " + getUsername() + " with default_schema = " + SQLBase.quoteIdentifier(schemaName);
1357
        } else
1358
            throw new UnsupportedOperationException();
1359
 
1360
        if (q != null)
1361
            this.execute(q, null, true, c);
1362
    }
1363
 
1364
    public final String getSchema() {
1365
        return this.getSchema(null);
1366
    }
1367
 
1368
    private String getSchema(Connection c) {
1369
        final String q;
1370
        if (this.getSystem() == SQLSystem.MYSQL)
1371
            q = "select DATABASE(); ";
1372
        else if (this.getSystem() == SQLSystem.DERBY)
1373
            q = "select CURRENT SCHEMA;";
1374
        else if (this.getSystem() == SQLSystem.POSTGRESQL) {
1375
            q = "select (current_schemas(false))[1];";
1376
        } else if (this.getSystem() == SQLSystem.H2) {
1377
            q = "select SCHEMA();";
1378
        } else if (this.getSystem() == SQLSystem.MSSQL) {
1379
            q = "select SCHEMA_NAME();";
1380
        } else
1381
            throw new UnsupportedOperationException();
1382
 
1383
        return (String) this.execute(q, SCALAR_HANDLER, c);
1384
    }
1385
 
1386
    public String toString() {
1387
        return this.getUrl();
1388
    }
1389
 
1390
    private final SQLSystem getSystem() {
1391
        return this.server.getSQLSystem();
1392
    }
1393
 
1394
    public Object clone() {
1395
        SQLDataSource ds = new SQLDataSource(this.server);
1396
        ds.setUrl(this.getUrl());
1397
        ds.setUsername(this.getUsername());
1398
        ds.setPassword(this.getPassword());
1399
        ds.setDriverClassName(this.getDriverClassName());
1400
        return ds;
1401
    }
1402
 
1403
    /**
1404
     * This thread periodically check for dead borrowers and return their connection.
1405
     */
1406
    final private class CleanUp extends Thread {
1407
        private final int period;
1408
 
1409
        public CleanUp(final int period) {
1410
            super("Clean up for " + SQLDataSource.this);
1411
            this.period = period;
1412
            this.setDaemon(true);
1413
            this.start();
1414
        }
1415
 
1416
        public void run() {
1417
            final Set<Thread> threads = new HashSet<Thread>();
1418
            while (SQLDataSource.this.isInCharge(this)) {
1419
                for (final Thread th : getThreads(threads)) {
1420
                    // a thread cannot re-live, so it's safe to return its connection
1421
                    // likewise it's ok if it has already returned its connection
1422
                    // (on its own, or even by another CleanUp instance)
1423
                    if (!th.isAlive()) {
1424
                        SQLDataSource.this.releaseConnection(th, false);
1425
                    }
1426
                }
1427
                try {
1428
                    Thread.sleep(this.period);
1429
                } catch (InterruptedException e) {
1430
                    // ignore, ne s'arrêter que lorsque le while le dit
1431
                    Log.get().fine("Interruption ignored for " + this);
1432
                }
1433
            }
1434
            Log.get().fine("done " + this);
1435
        }
1436
    }
1437
 
1438
}