// ElasticSingleConnectionDataSource.java (6.7 KB)
  1. package org.es.jdbc.api;
  2. import org.elasticsearch.client.Client;
  3. import org.es.jdbc.es.ElasticClientProvider;
  4. import org.es.sql.utils.PersistLogger;
  5. import java.lang.reflect.InvocationHandler;
  6. import java.lang.reflect.InvocationTargetException;
  7. import java.lang.reflect.Method;
  8. import java.lang.reflect.Proxy;
  9. import java.sql.Connection;
  10. import java.sql.SQLException;
  11. public class ElasticSingleConnectionDataSource extends DriverManagerDataSource implements SmartDataSource {
  12. private final Object connectionMonitor = new Object();
  13. private boolean suppressClose;
  14. private Connection target;
  15. private Connection connection;
  16. private Client client;
  17. private ElasticClientProvider elasticClientProvider;
  18. public ElasticSingleConnectionDataSource() {
  19. }
  20. public ElasticSingleConnectionDataSource(String url, boolean suppressClose) {
  21. super(url);
  22. this.suppressClose = suppressClose;
  23. }
  24. public void setElasticClientProvider(ElasticClientProvider elasticClientProvider) {
  25. this.elasticClientProvider = elasticClientProvider;
  26. }
  27. protected boolean isSuppressClose() {
  28. return this.suppressClose;
  29. }
  30. public void setSuppressClose(boolean suppressClose) {
  31. this.suppressClose = suppressClose;
  32. }
  33. @Override
  34. public Connection getConnection() throws SQLException {
  35. synchronized (this.connectionMonitor) {
  36. if (this.connection == null) {
  37. initConnection();
  38. }
  39. if (this.connection.isClosed()) {
  40. throw new SQLException(
  41. "Connection was closed in ElasticSingleConnectionDataSource. Check that user code checks " +
  42. "shouldClose() before closing Connections, or set 'suppressClose' to 'true'");
  43. }
  44. return this.connection;
  45. }
  46. }
  47. @Override
  48. public Connection getConnection(String username, String password) throws SQLException {
  49. return getConnection();
  50. }
  51. public boolean shouldClose(Connection con) {
  52. synchronized (this.connectionMonitor) {
  53. return (con != this.connection && con != this.target);
  54. }
  55. }
  56. public void destroy() {
  57. synchronized (this.connectionMonitor) {
  58. closeConnection();
  59. }
  60. }
  61. public void initConnection() throws SQLException {
  62. if (getUrl() == null) {
  63. throw new IllegalStateException("'url' property is required for lazily initializing a Connection");
  64. }
  65. synchronized (this.connectionMonitor) {
  66. closeConnection();
  67. try {
  68. if (elasticClientProvider != null) {
  69. client = elasticClientProvider.createElasticClientFromUrl(getUrl());
  70. if (client == null) {
  71. throw new SQLException(String.format("Failed to build elastic client for url[%s]", getUrl()));
  72. }
  73. target = new ElasticConnection(getUrl(), null, client);
  74. }
  75. else {
  76. this.target = getConnectionFromDriver();
  77. }
  78. }
  79. catch (Exception exp) {
  80. throw new SQLException(String.format("Failed to create connection for url[%s]", getUrl()), exp);
  81. }
  82. prepareConnection(target);
  83. this.connection = (isSuppressClose() ? getCloseSuppressingConnectionProxy(target) : target);
  84. }
  85. }
  86. public void resetConnection() {
  87. synchronized (this.connectionMonitor) {
  88. closeConnection();
  89. this.target = null;
  90. this.connection = null;
  91. }
  92. }
  93. protected void prepareConnection(Connection con) throws SQLException {
  94. Boolean autoCommit = getAutoCommitValue();
  95. if (autoCommit != null && con.getAutoCommit() != autoCommit) {
  96. con.setAutoCommit(autoCommit);
  97. }
  98. }
  99. private void closeConnection() {
  100. if (this.target != null) {
  101. try {
  102. this.target.close();
  103. }
  104. catch (Throwable ex) {
  105. PersistLogger.warn(this, "Could not close shared JDBC Connection", ex);
  106. }
  107. }
  108. if (client != null) {
  109. try {
  110. client.close();
  111. }
  112. catch (Exception ex) {
  113. PersistLogger.error(this, "Could not close elasticsearch client", ex);
  114. }
  115. }
  116. }
  117. protected Connection getCloseSuppressingConnectionProxy(Connection target) {
  118. return (Connection) Proxy.newProxyInstance(
  119. ConnectionProxy.class.getClassLoader(),
  120. new Class[]{ConnectionProxy.class},
  121. new CloseSuppressingInvocationHandler(target));
  122. }
  123. protected Boolean getAutoCommitValue() {
  124. return Boolean.FALSE;
  125. }
  126. public void setAutoCommit(boolean autoCommit) {
  127. // ignore
  128. }
  129. private static class CloseSuppressingInvocationHandler implements InvocationHandler {
  130. private final Connection target;
  131. public CloseSuppressingInvocationHandler(Connection target) {
  132. this.target = target;
  133. }
  134. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  135. // Invocation on ConnectionProxy interface coming in...
  136. if (method.getName().equals("equals")) {
  137. // Only consider equal when proxies are identical.
  138. return (proxy == args[0]);
  139. }
  140. else if (method.getName().equals("hashCode")) {
  141. // Use hashCode of Connection proxy.
  142. return System.identityHashCode(proxy);
  143. }
  144. else if (method.getName().equals("unwrap")) {
  145. if (((Class) args[0]).isInstance(proxy)) {
  146. return proxy;
  147. }
  148. }
  149. else if (method.getName().equals("isWrapperFor")) {
  150. if (((Class) args[0]).isInstance(proxy)) {
  151. return true;
  152. }
  153. }
  154. else if (method.getName().equals("close")) {
  155. // Handle close method: don't pass the call on.
  156. return null;
  157. }
  158. else if (method.getName().equals("isClosed")) {
  159. return false;
  160. }
  161. else if (method.getName().equals("getTargetConnection")) {
  162. // Handle getTargetConnection method: return underlying Connection.
  163. return this.target;
  164. }
  165. // Invoke method on target Connection.
  166. try {
  167. return method.invoke(this.target, args);
  168. }
  169. catch (InvocationTargetException ex) {
  170. throw ex.getTargetException();
  171. }
  172. }
  173. }
  174. }