db_connection.ex (60135B)
1 defmodule DBConnection.Stream do 2 defstruct [:conn, :query, :params, :opts] 3 4 @type t :: %__MODULE__{conn: DBConnection.conn(), query: any, params: any, opts: Keyword.t()} 5 end 6 7 defimpl Enumerable, for: DBConnection.Stream do 8 def count(_), do: {:error, __MODULE__} 9 10 def member?(_, _), do: {:error, __MODULE__} 11 12 def slice(_), do: {:error, __MODULE__} 13 14 def reduce(stream, acc, fun), do: DBConnection.reduce(stream, acc, fun) 15 end 16 17 defmodule DBConnection.PrepareStream do 18 defstruct [:conn, :query, :params, :opts] 19 20 @type t :: %__MODULE__{conn: DBConnection.conn(), query: any, params: any, opts: Keyword.t()} 21 end 22 23 defimpl Enumerable, for: DBConnection.PrepareStream do 24 def count(_), do: {:error, __MODULE__} 25 26 def member?(_, _), do: {:error, __MODULE__} 27 28 def slice(_), do: {:error, __MODULE__} 29 30 def reduce(stream, acc, fun), do: DBConnection.reduce(stream, acc, fun) 31 end 32 33 defmodule DBConnection do 34 @moduledoc """ 35 A behaviour module for implementing efficient database connection 36 client processes, pools and transactions. 37 38 `DBConnection` handles callbacks differently to most behaviours. Some 39 callbacks will be called in the calling process, with the state 40 copied to and from the calling process. This is useful when the data 41 for a request is large and means that a calling process can interact 42 with a socket directly. 43 44 A side effect of this is that query handling can be written in a 45 simple blocking fashion, while the connection process itself will 46 remain responsive to OTP messages and can enqueue and cancel queued 47 requests. 48 49 If a request or series of requests takes too long to handle in the 50 client process a timeout will trigger and the socket can be cleanly 51 disconnected by the connection process. 52 53 If a calling process waits too long to start its request it will 54 timeout and its request will be cancelled. This prevents requests 55 building up when the database can not keep up. 56 57 If no requests are received for an idle interval, the pool will 58 ping all stale connections which can then ping the database to keep 59 the connection alive. 60 61 Should the connection be lost, attempts will be made to reconnect with 62 (configurable) exponential random backoff to reconnect. All state is 63 lost when a connection disconnects but the process is reused. 64 65 The `DBConnection.Query` protocol provide utility functions so that 66 queries can be encoded and decoded without blocking the connection or pool. 67 """ 68 require Logger 69 70 alias DBConnection.Holder 71 72 require Holder 73 74 defstruct [:pool_ref, :conn_ref, :conn_mode] 75 76 defmodule EncodeError do 77 defexception [:message] 78 end 79 80 defmodule TransactionError do 81 defexception [:status, :message] 82 83 def exception(:idle), 84 do: %__MODULE__{status: :idle, message: "transaction is not started"} 85 86 def exception(:transaction), 87 do: %__MODULE__{status: :transaction, message: "transaction is already started"} 88 89 def exception(:error), 90 do: %__MODULE__{status: :error, message: "transaction is aborted"} 91 end 92 93 @typedoc """ 94 Run or transaction connection reference. 95 """ 96 @type t :: %__MODULE__{pool_ref: any, conn_ref: reference} 97 @type conn :: GenServer.server() | t 98 @type query :: DBConnection.Query.t() 99 @type params :: any 100 @type result :: any 101 @type cursor :: any 102 @type status :: :idle | :transaction | :error 103 104 @type start_option :: 105 {:after_connect, (t -> any) | {module, atom, [any]} | nil} 106 | {:after_connect_timeout, timeout} 107 | {:connection_listeners, list(Process.dest()) | nil} 108 | {:backoff_max, non_neg_integer} 109 | {:backoff_min, non_neg_integer} 110 | {:backoff_type, :stop | :exp | :rand | :rand_exp} 111 | {:configure, (keyword -> keyword) | {module, atom, [any]} | nil} 112 | {:idle_interval, non_neg_integer} 113 | {:max_restarts, non_neg_integer} 114 | {:max_seconds, pos_integer} 115 | {:name, GenServer.name()} 116 | {:pool, module} 117 | {:pool_size, pos_integer} 118 | {:queue_interval, non_neg_integer} 119 | {:queue_target, non_neg_integer} 120 | {:show_sensitive_data_on_connection_error, boolean} 121 122 @type option :: 123 {:log, (DBConnection.LogEntry.t() -> any) | {module, atom, [any]} | nil} 124 | {:queue, boolean} 125 | {:timeout, timeout} 126 | {:deadline, integer | nil} 127 128 @doc """ 129 Connect to the database. Return `{:ok, state}` on success or 130 `{:error, exception}` on failure. 131 132 If an error is returned it will be logged and another 133 connection attempt will be made after a backoff interval. 134 135 This callback is called in the connection process. 136 """ 137 @callback connect(opts :: Keyword.t()) :: 138 {:ok, state :: any} | {:error, Exception.t()} 139 140 @doc """ 141 Checkouts the state from the connection process. Return `{:ok, state}` 142 to allow the checkout or `{:disconnect, exception, state}` to disconnect. 143 144 This callback is called immediately after the connection is established 145 and the state is never effetively checked in again. That's because 146 DBConnection keeps the connection state in an ETS table that is moved 147 between the different clients checking out connections. There is no 148 `checkin` callback. The state is only handed back to the connection 149 process during pings and (re)connects. 150 151 This callback is called in the connection process. 152 """ 153 @callback checkout(state :: any) :: 154 {:ok, new_state :: any} | {:disconnect, Exception.t(), new_state :: any} 155 156 @doc """ 157 Called when the connection has been idle for a period of time. Return 158 `{:ok, state}` to continue or `{:disconnect, exception, state}` to 159 disconnect. 160 161 This callback is called if no callbacks have been called after the 162 idle timeout and a client process is not using the state. The idle 163 timeout can be configured by the `:idle_interval` option. This function 164 can be called whether the connection is checked in or checked out. 165 166 This callback is called in the connection process. 167 """ 168 @callback ping(state :: any) :: 169 {:ok, new_state :: any} | {:disconnect, Exception.t(), new_state :: any} 170 171 @doc """ 172 Handle the beginning of a transaction. 173 174 Return `{:ok, result, state}` to continue, `{status, state}` to notify caller 175 that the transaction can not begin due to the transaction status `status`, 176 `{:error, exception, state}` (deprecated) to error without beginning the 177 transaction, or `{:disconnect, exception, state}` to error and disconnect. 178 179 A callback implementation should only return `status` if it 180 can determine the database's transaction status without side effect. 181 182 This callback is called in the client process. 183 """ 184 @callback handle_begin(opts :: Keyword.t(), state :: any) :: 185 {:ok, result, new_state :: any} 186 | {status, new_state :: any} 187 | {:disconnect, Exception.t(), new_state :: any} 188 189 @doc """ 190 Handle committing a transaction. Return `{:ok, result, state}` on successfully 191 committing transaction, `{status, state}` to notify caller that the 192 transaction can not commit due to the transaction status `status`, 193 `{:error, exception, state}` (deprecated) to error and no longer be inside 194 transaction, or `{:disconnect, exception, state}` to error and disconnect. 195 196 A callback implementation should only return `status` if it 197 can determine the database's transaction status without side effect. 198 199 This callback is called in the client process. 200 """ 201 @callback handle_commit(opts :: Keyword.t(), state :: any) :: 202 {:ok, result, new_state :: any} 203 | {status, new_state :: any} 204 | {:disconnect, Exception.t(), new_state :: any} 205 206 @doc """ 207 Handle rolling back a transaction. Return `{:ok, result, state}` on successfully 208 rolling back transaction, `{status, state}` to notify caller that the 209 transaction can not rollback due to the transaction status `status`, 210 `{:error, exception, state}` (deprecated) to 211 error and no longer be inside transaction, or 212 `{:disconnect, exception, state}` to error and disconnect. 213 214 A callback implementation should only return `status` if it 215 can determine the database' transaction status without side effect. 216 217 This callback is called in the client and connection process. 218 """ 219 @callback handle_rollback(opts :: Keyword.t(), state :: any) :: 220 {:ok, result, new_state :: any} 221 | {status, new_state :: any} 222 | {:disconnect, Exception.t(), new_state :: any} 223 224 @doc """ 225 Handle getting the transaction status. Return `{:idle, state}` if outside a 226 transaction, `{:transaction, state}` if inside a transaction, 227 `{:error, state}` if inside an aborted transaction, or 228 `{:disconnect, exception, state}` to error and disconnect. 229 230 If the callback returns a `:disconnect` tuples then `status/2` will return 231 `:error`. 232 """ 233 @callback handle_status(opts :: Keyword.t(), state :: any) :: 234 {status, new_state :: any} 235 | {:disconnect, Exception.t(), new_state :: any} 236 237 @doc """ 238 Prepare a query with the database. Return `{:ok, query, state}` where 239 `query` is a query to pass to `execute/4` or `close/3`, 240 `{:error, exception, state}` to return an error and continue or 241 `{:disconnect, exception, state}` to return an error and disconnect. 242 243 This callback is intended for cases where the state of a connection is 244 needed to prepare a query and/or the query can be saved in the 245 database to call later. 246 247 This callback is called in the client process. 248 """ 249 @callback handle_prepare(query, opts :: Keyword.t(), state :: any) :: 250 {:ok, query, new_state :: any} 251 | {:error | :disconnect, Exception.t(), new_state :: any} 252 253 @doc """ 254 Execute a query prepared by `c:handle_prepare/3`. Return 255 `{:ok, query, result, state}` to return altered query `query` and result 256 `result` and continue, `{:error, exception, state}` to return an error and 257 continue or `{:disconnect, exception, state}` to return an error and 258 disconnect. 259 260 This callback is called in the client process. 261 """ 262 @callback handle_execute(query, params, opts :: Keyword.t(), state :: any) :: 263 {:ok, query, result, new_state :: any} 264 | {:error | :disconnect, Exception.t(), new_state :: any} 265 266 @doc """ 267 Close a query prepared by `c:handle_prepare/3` with the database. Return 268 `{:ok, result, state}` on success and to continue, 269 `{:error, exception, state}` to return an error and continue, or 270 `{:disconnect, exception, state}` to return an error and disconnect. 271 272 This callback is called in the client process. 273 """ 274 @callback handle_close(query, opts :: Keyword.t(), state :: any) :: 275 {:ok, result, new_state :: any} 276 | {:error | :disconnect, Exception.t(), new_state :: any} 277 278 @doc """ 279 Declare a cursor using a query prepared by `c:handle_prepare/3`. Return 280 `{:ok, query, cursor, state}` to return altered query `query` and cursor 281 `cursor` for a stream and continue, `{:error, exception, state}` to return an 282 error and continue or `{:disconnect, exception, state}` to return an error 283 and disconnect. 284 285 This callback is called in the client process. 286 """ 287 @callback handle_declare(query, params, opts :: Keyword.t(), state :: any) :: 288 {:ok, query, cursor, new_state :: any} 289 | {:error | :disconnect, Exception.t(), new_state :: any} 290 291 @doc """ 292 Fetch the next result from a cursor declared by `c:handle_declare/4`. Return 293 `{:cont, result, state}` to return the result `result` and continue using 294 cursor, `{:halt, result, state}` to return the result `result` and close the 295 cursor, `{:error, exception, state}` to return an error and close the 296 cursor, `{:disconnect, exception, state}` to return an error and disconnect. 297 298 This callback is called in the client process. 299 """ 300 @callback handle_fetch(query, cursor, opts :: Keyword.t(), state :: any) :: 301 {:cont | :halt, result, new_state :: any} 302 | {:error | :disconnect, Exception.t(), new_state :: any} 303 304 @doc """ 305 Deallocate a cursor declared by `c:handle_declare/4` with the database. Return 306 `{:ok, result, state}` on success and to continue, 307 `{:error, exception, state}` to return an error and continue, or 308 `{:disconnect, exception, state}` to return an error and disconnect. 309 310 This callback is called in the client process. 311 """ 312 @callback handle_deallocate(query, cursor, opts :: Keyword.t(), state :: any) :: 313 {:ok, result, new_state :: any} 314 | {:error | :disconnect, Exception.t(), new_state :: any} 315 316 @doc """ 317 Disconnect from the database. Return `:ok`. 318 319 The exception as first argument is the exception from a `:disconnect` 320 3-tuple returned by a previous callback. 321 322 If the state is controlled by a client and it exits or takes too long 323 to process a request the state will be last known state. In these 324 cases the exception will be a `DBConnection.ConnectionError`. 325 326 This callback is called in the connection process. 327 """ 328 @callback disconnect(err :: Exception.t(), state :: any) :: :ok 329 330 @connection_module_key :connection_module 331 332 @doc """ 333 Use `DBConnection` to set the behaviour. 334 """ 335 defmacro __using__(_) do 336 quote location: :keep do 337 @behaviour DBConnection 338 end 339 end 340 341 @doc """ 342 Starts and links to a database connection process. 343 344 By default the `DBConnection` starts a pool with a single connection. 345 The size of the pool can be increased with `:pool_size`. A separate 346 pool can be given with the `:pool` option. 347 348 ### Options 349 350 * `:backoff_min` - The minimum backoff interval (default: `1_000`) 351 * `:backoff_max` - The maximum backoff interval (default: `30_000`) 352 * `:backoff_type` - The backoff strategy, `:stop` for no backoff and 353 to stop, `:exp` for exponential, `:rand` for random and `:rand_exp` for 354 random exponential (default: `:rand_exp`) 355 * `:configure` - A function to run before every connect attempt to 356 dynamically configure the options, either a 1-arity fun, 357 `{module, function, args}` with options prepended to `args` or `nil` where 358 only returned options are passed to connect callback (default: `nil`) 359 * `:after_connect` - A function to run on connect using `run/3`, either 360 a 1-arity fun, `{module, function, args}` with `t:DBConnection.t/0` prepended 361 to `args` or `nil` (default: `nil`) 362 * `:after_connect_timeout` - The maximum time allowed to perform 363 function specified by `:after_connect` option (default: `15_000`) 364 * `:connection_listeners` - A list of process destinations to send 365 notification messages whenever a connection is connected or disconnected. 366 See "Connection listeners" below 367 * `:name` - A name to register the started process (see the `:name` option 368 in `GenServer.start_link/3`) 369 * `:pool` - Chooses the pool to be started (default: `DBConnection.ConnectionPool`) 370 * `:pool_size` - Chooses the size of the pool 371 * `:idle_interval` - Controls the frequency we check for idle connections 372 in the pool. We then notify each idle connection to ping the database. 373 In practice, the ping happens within `idle_interval <= ping < 2 * idle_interval`. 374 Defaults to 1000ms. 375 * `:queue_target` and `:queue_interval` - See "Queue config" below 376 * `:max_restarts` and `:max_seconds` - Configures the `:max_restarts` and 377 `:max_seconds` for the connection pool supervisor (see the `Supervisor` docs). 378 Typically speaking the connection process doesn't terminate, except due to 379 faults in DBConnection. However, if backoff has been disabled, then they 380 also terminate whenever a connection is disconnected (for instance, due to 381 client or server errors) 382 * `:show_sensitive_data_on_connection_error` - By default, `DBConnection` 383 hides all information during connection errors to avoid leaking credentials 384 or other sensitive information. You can set this option if you wish to 385 see complete errors and stacktraces during connection errors 386 387 ### Example 388 389 {:ok, conn} = DBConnection.start_link(mod, [idle_interval: 5_000]) 390 391 ## Queue config 392 393 Handling requests is done through a queue. When DBConnection is 394 started, there are two relevant options to control the queue: 395 396 * `:queue_target` in milliseconds, defaults to 50ms 397 * `:queue_interval` in milliseconds, defaults to 1000ms 398 399 Our goal is to wait at most `:queue_target` for a connection. 400 If all connections checked out during a `:queue_interval` takes 401 more than `:queue_target`, then we double the `:queue_target`. 402 If checking out connections take longer than the new target, 403 then we start dropping messages. 404 405 For example, by default our target is 50ms. If all connections 406 checkouts take longer than 50ms for a whole second, we double 407 the target to 100ms and we start dropping messages if the 408 time to checkout goes above the new limit. 409 410 This allows us to better plan for overloads as we can refuse 411 requests before they are sent to the database, which would 412 otherwise increase the burden on the database, making the 413 overload worse. 414 415 ## Connection listeners 416 417 The `:connection_listeners` option allows one or more processes to be notified 418 whenever a connection is connected or disconnected. A listener may be a remote 419 or local PID, a locally registered name, or a tuple in the form of 420 `{registered_name, node}` for a registered name at another node. 421 422 Each listener process may receive the following messages where `pid` 423 identifies the connection process: 424 425 * `{:connected, pid}` 426 * `{:disconnected, pid}` 427 428 ## Telemetry 429 430 A `[:db_connection, :connection_error]` event is published whenever a connection checkout 431 receives a `%DBConnection.ConnectionError{}`. 432 433 Measurements: 434 435 * `:count` - A fixed-value measurement which always measures 1. 436 437 Metadata 438 439 * `:error` - The `DBConnection.ConnectionError` struct which triggered the event. 440 441 * `:opts` - All options given to the pool operation 442 443 """ 444 @spec start_link(module, opts :: Keyword.t()) :: GenServer.on_start() 445 def start_link(conn_mod, opts) do 446 case child_spec(conn_mod, opts) do 447 {_, {m, f, args}, _, _, _, _} -> apply(m, f, args) 448 %{start: {m, f, args}} -> apply(m, f, args) 449 end 450 end 451 452 @doc """ 453 Creates a supervisor child specification for a pool of connections. 454 455 See `start_link/2` for options. 456 """ 457 @spec child_spec(module, opts :: Keyword.t()) :: :supervisor.child_spec() 458 def child_spec(conn_mod, opts) do 459 pool = Keyword.get(opts, :pool, DBConnection.ConnectionPool) 460 pool.child_spec({conn_mod, opts}) 461 end 462 463 @doc """ 464 Forces all connections in the pool to disconnect within the given interval. 465 466 Once this function is called, the pool will disconnect all of its connections 467 as they are checked in or as they are pinged. Checked in connections will be 468 randomly disconnected within the given time interval. Pinged connections are 469 immediately disconnected - as they are idle (according to `:idle_interval`). 470 471 If the connection has a backoff configured (which is the case by default), 472 disconnecting means an attempt at a new connection will be done immediately 473 after, without starting a new process for each connection. However, if backoff 474 has been disabled, the connection process will terminate. In such cases, 475 disconnecting all connections may cause the pool supervisor to restart 476 depending on the max_restarts/max_seconds configuration of the pool, 477 so you will want to set those carefully. 478 """ 479 @spec disconnect_all(conn, non_neg_integer, opts :: Keyword.t()) :: :ok 480 def disconnect_all(conn, interval, opts \\ []) when interval >= 0 do 481 pool = Keyword.get(opts, :pool, DBConnection.ConnectionPool) 482 interval = System.convert_time_unit(interval, :millisecond, :native) 483 pool.disconnect_all(conn, interval, opts) 484 end 485 486 @doc """ 487 Prepare a query with a database connection for later execution. 488 489 It returns `{:ok, query}` on success or `{:error, exception}` if there was 490 an error. 491 492 The returned `query` can then be passed to `execute/4` and/or `close/3` 493 494 ### Options 495 496 * `:queue` - Whether to block waiting in an internal queue for the 497 connection's state (boolean, default: `true`). See "Queue config" in 498 `start_link/2` docs 499 * `:timeout` - The maximum time that the caller is allowed to perform 500 this operation (default: `15_000`) 501 * `:deadline` - If set, overrides `:timeout` option and specifies absolute 502 monotonic time in milliseconds by which caller must perform operation. 503 See `System` module documentation for more information on monotonic time 504 (default: `nil`) 505 * `:log` - A function to log information about a call, either 506 a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0` 507 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 508 509 The pool and connection module may support other options. All options 510 are passed to `c:handle_prepare/3`. 511 512 ### Example 513 514 DBConnection.transaction(pool, fn conn -> 515 query = %Query{statement: "SELECT * FROM table"} 516 query = DBConnection.prepare!(conn, query) 517 try do 518 DBConnection.execute!(conn, query, []) 519 after 520 DBConnection.close(conn, query) 521 end 522 end) 523 524 """ 525 @spec prepare(conn, query, opts :: Keyword.t()) :: 526 {:ok, query} | {:error, Exception.t()} 527 def prepare(conn, query, opts \\ []) do 528 meter = meter(opts) 529 530 result = 531 with {:ok, query, meter} <- parse(query, meter, opts) do 532 run(conn, &run_prepare/4, query, meter, opts) 533 end 534 535 log(result, :prepare, query, nil) 536 end 537 538 @doc """ 539 Prepare a query with a database connection and return the prepared 540 query. An exception is raised on error. 541 542 See `prepare/3`. 543 """ 544 @spec prepare!(conn, query, opts :: Keyword.t()) :: query 545 def prepare!(conn, query, opts \\ []) do 546 case prepare(conn, query, opts) do 547 {:ok, result} -> result 548 {:error, err} -> raise err 549 end 550 end 551 552 @doc """ 553 Prepare a query and execute it with a database connection and return both the 554 prepared query and the result, `{:ok, query, result}` on success or 555 `{:error, exception}` if there was an error. 556 557 The returned `query` can be passed to `execute/4` and `close/3`. 558 559 ### Options 560 561 * `:queue` - Whether to block waiting in an internal queue for the 562 connection's state (boolean, default: `true`). See "Queue config" in 563 `start_link/2` docs 564 * `:timeout` - The maximum time that the caller is allowed to perform 565 this operation (default: `15_000`) 566 * `:deadline` - If set, overrides `:timeout` option and specifies absolute 567 monotonic time in milliseconds by which caller must perform operation. 568 See `System` module documentation for more information on monotonic time 569 (default: `nil`) 570 * `:log` - A function to log information about a call, either 571 a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0` 572 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 573 574 ### Example 575 576 query = %Query{statement: "SELECT id FROM table WHERE id=$1"} 577 {:ok, query, result} = DBConnection.prepare_execute(conn, query, [1]) 578 {:ok, result2} = DBConnection.execute(conn, query, [2]) 579 :ok = DBConnection.close(conn, query) 580 """ 581 @spec prepare_execute(conn, query, params, Keyword.t()) :: 582 {:ok, query, result} 583 | {:error, Exception.t()} 584 def prepare_execute(conn, query, params, opts \\ []) do 585 result = 586 with {:ok, query, meter} <- parse(query, meter(opts), opts) do 587 parsed_prepare_execute(conn, query, params, meter, opts) 588 end 589 590 log(result, :prepare_execute, query, params) 591 end 592 593 defp parsed_prepare_execute(conn, query, params, meter, opts) do 594 with {:ok, query, result, meter} <- 595 run(conn, &run_prepare_execute/5, query, params, meter, opts), 596 {:ok, result, meter} <- decode(query, result, meter, opts) do 597 {:ok, query, result, meter} 598 end 599 end 600 601 @doc """ 602 Prepare a query and execute it with a database connection and return both the 603 prepared query and result. An exception is raised on error. 604 605 See `prepare_execute/4`. 606 """ 607 @spec prepare_execute!(conn, query, Keyword.t()) :: {query, result} 608 def prepare_execute!(conn, query, params, opts \\ []) do 609 case prepare_execute(conn, query, params, opts) do 610 {:ok, query, result} -> {query, result} 611 {:error, err} -> raise err 612 end 613 end 614 615 @doc """ 616 Execute a prepared query with a database connection and return 617 `{:ok, query, result}` on success or `{:error, exception}` if there was an error. 618 619 If the query is not prepared on the connection an attempt may be made to 620 prepare it and then execute again. 621 622 ### Options 623 624 * `:queue` - Whether to block waiting in an internal queue for the 625 connection's state (boolean, default: `true`). See "Queue config" in 626 `start_link/2` docs 627 * `:timeout` - The maximum time that the caller is allowed to perform 628 this operation (default: `15_000`) 629 * `:deadline` - If set, overrides `:timeout` option and specifies absolute 630 monotonic time in milliseconds by which caller must perform operation. 631 See `System` module documentation for more information on monotonic time 632 (default: `nil`) 633 * `:log` - A function to log information about a call, either 634 a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0` 635 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 636 637 The pool and connection module may support other options. All options 638 are passed to `handle_execute/4`. 639 640 See `prepare/3`. 641 """ 642 @spec execute(conn, query, params, opts :: Keyword.t()) :: 643 {:ok, query, result} | {:error, Exception.t()} 644 def execute(conn, query, params, opts \\ []) do 645 result = 646 case maybe_encode(query, params, meter(opts), opts) do 647 {:prepare, meter} -> 648 parsed_prepare_execute(conn, query, params, meter, opts) 649 650 {:ok, params, meter} -> 651 with {:ok, query, result, meter} <- 652 run(conn, &run_execute/5, query, params, meter, opts), 653 {:ok, result, meter} <- decode(query, result, meter, opts) do 654 {:ok, query, result, meter} 655 end 656 657 {_, _, _, _} = error -> 658 error 659 end 660 661 log(result, :execute, query, params) 662 end 663 664 @doc """ 665 Execute a prepared query with a database connection and return the 666 result. Raises an exception on error. 667 668 See `execute/4` 669 """ 670 @spec execute!(conn, query, params, opts :: Keyword.t()) :: result 671 def execute!(conn, query, params, opts \\ []) do 672 case execute(conn, query, params, opts) do 673 {:ok, _query, result} -> result 674 {:error, err} -> raise err 675 end 676 end 677 678 @doc """ 679 Close a prepared query on a database connection and return `{:ok, result}` on 680 success or `{:error, exception}` on error. 681 682 This function should be used to free resources held by the connection 683 process and/or the database server. 684 685 ## Options 686 687 * `:queue` - Whether to block waiting in an internal queue for the 688 connection's state (boolean, default: `true`). See "Queue config" in 689 `start_link/2` docs 690 * `:timeout` - The maximum time that the caller is allowed to perform 691 this operation (default: `15_000`) 692 * `:deadline` - If set, overrides `:timeout` option and specifies absolute 693 monotonic time in milliseconds by which caller must perform operation. 694 See `System` module documentation for more information on monotonic time 695 (default: `nil`) 696 * `:log` - A function to log information about a call, either 697 a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0` 698 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 699 700 The pool and connection module may support other options. All options 701 are passed to `c:handle_close/3`. 702 703 See `prepare/3`. 704 """ 705 @spec close(conn, query, opts :: Keyword.t()) :: 706 {:ok, result} | {:error, Exception.t()} 707 def close(conn, query, opts \\ []) do 708 conn 709 |> run_cleanup(&run_close/4, [query], meter(opts), opts) 710 |> log(:close, query, nil) 711 end 712 713 @doc """ 714 Close a prepared query on a database connection and return the result. Raises 715 an exception on error. 716 717 See `close/3`. 718 """ 719 @spec close!(conn, query, opts :: Keyword.t()) :: result 720 def close!(conn, query, opts \\ []) do 721 case close(conn, query, opts) do 722 {:ok, result} -> result 723 {:error, err} -> raise err 724 end 725 end 726 727 @doc """ 728 Acquire a lock on a connection and run a series of requests on it. 729 730 The return value of this function is the return value of `fun`. 731 732 To use the locked connection call the request with the connection 733 reference passed as the single argument to the `fun`. If the 734 connection disconnects all future calls using that connection 735 reference will fail. 736 737 `run/3` and `transaction/3` can be nested multiple times but a 738 `transaction/3` call inside another `transaction/3` will be treated 739 the same as `run/3`. 740 741 ### Options 742 743 * `:queue` - Whether to block waiting in an internal queue for the 744 connection's state (boolean, default: `true`). See "Queue config" in 745 `start_link/2` docs 746 * `:timeout` - The maximum time that the caller is allowed to perform 747 this operation (default: `15_000`) 748 * `:deadline` - If set, overrides `:timeout` option and specifies absolute 749 monotonic time in milliseconds by which caller must perform operation. 750 See `System` module documentation for more information on monotonic time 751 (default: `nil`) 752 753 The pool may support other options. 754 755 ### Example 756 757 {:ok, res} = DBConnection.run(conn, fn conn -> 758 DBConnection.execute!(conn, query, []) 759 end) 760 761 """ 762 @spec run(conn, (t -> result), opts :: Keyword.t()) :: result when result: var 763 def run(conn, fun, opts \\ []) 764 765 def run(%DBConnection{} = conn, fun, _) do 766 fun.(conn) 767 end 768 769 def run(pool, fun, opts) do 770 case checkout(pool, nil, opts) do 771 {:ok, conn, _} -> 772 old_status = status(conn, opts) 773 774 try do 775 result = fun.(conn) 776 {result, run(conn, &run_status/3, nil, opts)} 777 catch 778 kind, error -> 779 checkin(conn) 780 :erlang.raise(kind, error, __STACKTRACE__) 781 else 782 {result, {:error, _, _}} -> 783 checkin(conn) 784 result 785 786 {result, {^old_status, _meter}} -> 787 checkin(conn) 788 result 789 790 {_result, {new_status, _meter}} -> 791 err = 792 DBConnection.ConnectionError.exception( 793 "connection was checked out with status #{inspect(old_status)} " <> 794 "but it was checked in with status #{inspect(new_status)}" 795 ) 796 797 disconnect(conn, err) 798 raise err 799 800 {_result, {kind, reason, stack, _meter}} -> 801 :erlang.raise(kind, reason, stack) 802 end 803 804 {:error, err, _} -> 805 raise err 806 807 {kind, reason, stack, _} -> 808 :erlang.raise(kind, reason, stack) 809 end 810 end 811 812 @doc """ 813 Acquire a lock on a connection and run a series of requests inside a 814 transaction. The result of the transaction fun is return inside an `:ok` 815 tuple: `{:ok, result}`. 816 817 To use the locked connection call the request with the connection 818 reference passed as the single argument to the `fun`. If the 819 connection disconnects all future calls using that connection 820 reference will fail. 821 822 `run/3` and `transaction/3` can be nested multiple times. If a transaction is 823 rolled back or a nested transaction `fun` raises the transaction is marked as 824 failed. All calls except `run/3`, `transaction/3`, `rollback/2`, `close/3` and 825 `close!/3` will raise an exception inside a failed transaction until the outer 826 transaction call returns. All `transaction/3` calls will return 827 `{:error, :rollback}` if the transaction failed or connection closed and 828 `rollback/2` is not called for that `transaction/3`. 829 830 ### Options 831 832 * `:queue` - Whether to block waiting in an internal queue for the 833 connection's state (boolean, default: `true`). See "Queue config" in 834 `start_link/2` docs 835 * `:timeout` - The maximum time that the caller is allowed to perform 836 this operation (default: `15_000`) 837 * `:deadline` - If set, overrides `:timeout` option and specifies absolute 838 monotonic time in milliseconds by which caller must perform operation. 839 See `System` module documentation for more information on monotonic time 840 (default: `nil`) 841 * `:log` - A function to log information about begin, commit and rollback 842 calls made as part of the transaction, either a 1-arity fun, 843 `{module, function, args}` with `t:DBConnection.LogEntry.t/0` prepended to 844 `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 845 846 The pool and connection module may support other options. All options 847 are passed to `c:handle_begin/2`, `c:handle_commit/2` and 848 `c:handle_rollback/2`. 849 850 ### Example 851 852 {:ok, res} = DBConnection.transaction(conn, fn conn -> 853 DBConnection.execute!(conn, query, []) 854 end) 855 """ 856 @spec transaction(conn, (t -> result), opts :: Keyword.t()) :: 857 {:ok, result} | {:error, reason :: any} 858 when result: var 859 def transaction(conn, fun, opts \\ []) 860 861 def transaction(%DBConnection{conn_mode: :transaction} = conn, fun, _opts) do 862 %DBConnection{conn_ref: conn_ref} = conn 863 864 try do 865 result = fun.(conn) 866 conclude(conn, result) 867 catch 868 :throw, {__MODULE__, ^conn_ref, reason} -> 869 fail(conn) 870 {:error, reason} 871 872 kind, reason -> 873 stack = __STACKTRACE__ 874 fail(conn) 875 :erlang.raise(kind, reason, stack) 876 else 877 result -> 878 {:ok, result} 879 end 880 end 881 882 def transaction(%DBConnection{} = conn, fun, opts) do 883 case begin(conn, &run/4, opts) do 884 {:ok, _} -> 885 run_transaction(conn, fun, &run/4, opts) 886 887 {:error, %DBConnection.TransactionError{}} -> 888 {:error, :rollback} 889 890 {:error, err} -> 891 raise err 892 end 893 end 894 895 def transaction(pool, fun, opts) do 896 case begin(pool, &checkout/4, opts) do 897 {:ok, conn, _} -> 898 run_transaction(conn, fun, &checkin/4, opts) 899 900 {:error, %DBConnection.TransactionError{}} -> 901 {:error, :rollback} 902 903 {:error, err} -> 904 raise err 905 end 906 end 907 908 @doc """ 909 Rollback a database transaction and release lock on connection. 910 911 When inside of a `transaction/3` call does a non-local return, using a 912 `throw/1` to cause the transaction to enter a failed state and the 913 `transaction/3` call returns `{:error, reason}`. If `transaction/3` calls are 914 nested the connection is marked as failed until the outermost transaction call 915 does the database rollback. 916 917 ### Example 918 919 {:error, :oops} = DBConnection.transaction(pool, fun(conn) -> 920 DBConnection.rollback(conn, :oops) 921 end) 922 """ 923 @spec rollback(t, reason :: any) :: no_return 924 def rollback(conn, reason) 925 926 def rollback(%DBConnection{conn_mode: :transaction} = conn, reason) do 927 %DBConnection{conn_ref: conn_ref} = conn 928 throw({__MODULE__, conn_ref, reason}) 929 end 930 931 def rollback(%DBConnection{} = _conn, _reason) do 932 raise "not inside transaction" 933 end 934 935 @doc """ 936 Return the transaction status of a connection. 937 938 The callback implementation should return the transaction status according to 939 the database, and not make assumptions based on client-side state. 940 941 This function will raise a `DBConnection.ConnectionError` when called inside a 942 deprecated `transaction/3`. 943 944 ### Options 945 946 See module documentation. The pool and connection module may support other 947 options. All options are passed to `c:handle_status/2`. 948 949 ### Example 950 951 # outside of the transaction, the status is `:idle` 952 DBConnection.status(conn) #=> :idle 953 954 DBConnection.transaction(conn, fn conn -> 955 DBConnection.status(conn) #=> :transaction 956 957 # run a query that will cause the transaction to rollback, e.g. 958 # uniqueness constraint violation 959 DBConnection.execute(conn, bad_query, []) 960 961 DBConnection.status(conn) #=> :error 962 end) 963 964 DBConnection.status(conn) #=> :idle 965 """ 966 @spec status(conn, opts :: Keyword.t()) :: status 967 def status(conn, opts \\ []) do 968 case run(conn, &run_status/3, nil, opts) do 969 {status, _meter} -> 970 status 971 972 {:error, _err, _meter} -> 973 :error 974 975 {kind, reason, stack, _meter} -> 976 :erlang.raise(kind, reason, stack) 977 end 978 end 979 980 @doc """ 981 Create a stream that will prepare a query, execute it and stream results 982 using a cursor. 983 984 ### Options 985 986 * `:queue` - Whether to block waiting in an internal queue for the 987 connection's state (boolean, default: `true`). See "Queue config" in 988 `start_link/2` docs 989 * `:timeout` - The maximum time that the caller is allowed to perform 990 this operation (default: `15_000`) 991 * `:deadline` - If set, overrides `:timeout` option and specifies absolute 992 monotonic time in milliseconds by which caller must perform operation. 993 See `System` module documentation for more information on monotonic time 994 (default: `nil`) 995 * `:log` - A function to log information about a call, either 996 a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0` 997 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 998 999 The pool and connection module may support other options. All options 1000 are passed to `c:handle_prepare/3`, `c:handle_close/3`, `c:handle_declare/4`, 1001 and `c:handle_deallocate/4`. 1002 1003 ### Example 1004 1005 {:ok, results} = DBConnection.transaction(conn, fn conn -> 1006 query = %Query{statement: "SELECT id FROM table"} 1007 stream = DBConnection.prepare_stream(conn, query, []) 1008 Enum.to_list(stream) 1009 end) 1010 """ 1011 @spec prepare_stream(t, query, params, opts :: Keyword.t()) :: 1012 DBConnection.PrepareStream.t() 1013 def prepare_stream(%DBConnection{} = conn, query, params, opts \\ []) do 1014 %DBConnection.PrepareStream{conn: conn, query: query, params: params, opts: opts} 1015 end 1016 1017 @doc """ 1018 Create a stream that will execute a prepared query and stream results using a 1019 cursor. 1020 1021 ### Options 1022 1023 * `:queue` - Whether to block waiting in an internal queue for the 1024 connection's state (boolean, default: `true`). See "Queue config" in 1025 `start_link/2` docs 1026 * `:timeout` - The maximum time that the caller is allowed to perform 1027 this operation (default: `15_000`) 1028 * `:deadline` - If set, overrides `:timeout` option and specifies absolute 1029 monotonic time in milliseconds by which caller must perform operation. 1030 See `System` module documentation for more information on monotonic time 1031 (default: `nil`) 1032 * `:log` - A function to log information about a call, either 1033 a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0` 1034 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 1035 1036 The pool and connection module may support other options. All options 1037 are passed to `c:handle_declare/4` and `c:handle_deallocate/4`. 1038 1039 ### Example 1040 1041 DBConnection.transaction(pool, fn conn -> 1042 query = %Query{statement: "SELECT id FROM table"} 1043 query = DBConnection.prepare!(conn, query) 1044 try do 1045 stream = DBConnection.stream(conn, query, []) 1046 Enum.to_list(stream) 1047 after 1048 # Make sure query is closed! 1049 DBConnection.close(conn, query) 1050 end 1051 end) 1052 """ 1053 @spec stream(t, query, params, opts :: Keyword.t()) :: DBConnection.Stream.t() 1054 def stream(%DBConnection{} = conn, query, params, opts \\ []) do 1055 %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} 1056 end 1057 1058 @doc """ 1059 Reduces a previously built stream or prepared stream. 1060 """ 1061 def reduce(%DBConnection.PrepareStream{} = stream, acc, fun) do 1062 %DBConnection.PrepareStream{conn: conn, query: query, params: params, opts: opts} = stream 1063 1064 declare = fn conn, opts -> 1065 {query, cursor} = prepare_declare!(conn, query, params, opts) 1066 {:cont, query, cursor} 1067 end 1068 1069 enum = resource(conn, declare, &stream_fetch/3, &stream_deallocate/3, opts) 1070 enum.(acc, fun) 1071 end 1072 1073 def reduce(%DBConnection.Stream{} = stream, acc, fun) do 1074 %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} = stream 1075 1076 declare = fn conn, opts -> 1077 case declare(conn, query, params, opts) do 1078 {:ok, query, cursor} -> 1079 {:cont, query, cursor} 1080 1081 {:ok, cursor} -> 1082 {:cont, query, cursor} 1083 1084 {:error, err} -> 1085 raise err 1086 end 1087 end 1088 1089 enum = resource(conn, declare, &stream_fetch/3, &stream_deallocate/3, opts) 1090 enum.(acc, fun) 1091 end 1092 1093 @doc false 1094 def register_as_pool(conn_module) do 1095 Process.put(@connection_module_key, conn_module) 1096 end 1097 1098 @doc """ 1099 Returns connection module used by the given connection pool. 1100 1101 When given a process that is not a connection pool, returns an `:error`. 1102 """ 1103 @spec connection_module(conn) :: {:ok, module} | :error 1104 def connection_module(conn) do 1105 with pid when pid != nil <- pool_pid(conn), 1106 {:dictionary, dictionary} <- Process.info(pid, :dictionary), 1107 {:ok, module} <- fetch_from_dictionary(dictionary, @connection_module_key), 1108 do: {:ok, module}, 1109 else: (_ -> :error) 1110 end 1111 1112 defp pool_pid(%DBConnection{pool_ref: Holder.pool_ref(pool: pid)}), do: pid 1113 defp pool_pid(conn), do: GenServer.whereis(conn) 1114 1115 defp fetch_from_dictionary(dictionary, key) do 1116 Enum.find_value(dictionary, :error, fn 1117 {^key, value} -> {:ok, value} 1118 _pair -> nil 1119 end) 1120 end 1121 1122 ## Helpers 1123 1124 defp checkout(pool, meter, opts) do 1125 checkout = System.monotonic_time() 1126 pool_mod = Keyword.get(opts, :pool, DBConnection.ConnectionPool) 1127 1128 caller = Keyword.get(opts, :caller, self()) 1129 callers = [caller | Process.get(:"$callers") || []] 1130 1131 try do 1132 pool_mod.checkout(pool, callers, opts) 1133 catch 1134 kind, reason -> 1135 stack = __STACKTRACE__ 1136 {kind, reason, stack, past_event(meter, :checkout, checkout)} 1137 else 1138 {:ok, pool_ref, _conn_mod, checkin, _conn_state} -> 1139 conn = %DBConnection{pool_ref: pool_ref, conn_ref: make_ref()} 1140 meter = meter |> past_event(:checkin, checkin) |> past_event(:checkout, checkout) 1141 {:ok, conn, meter} 1142 1143 {:error, err} -> 1144 {:error, err, past_event(meter, :checkout, checkout)} 1145 end 1146 end 1147 1148 defp checkout(%DBConnection{} = conn, fun, meter, opts) do 1149 with {:ok, result, meter} <- fun.(conn, meter, opts) do 1150 {:ok, conn, result, meter} 1151 end 1152 end 1153 1154 defp checkout(pool, fun, meter, opts) do 1155 with {:ok, conn, meter} <- checkout(pool, meter, opts) do 1156 case fun.(conn, meter, opts) do 1157 {:ok, result, meter} -> 1158 {:ok, conn, result, meter} 1159 1160 error -> 1161 checkin(conn) 1162 error 1163 end 1164 end 1165 end 1166 1167 defp checkin(%DBConnection{pool_ref: pool_ref}) do 1168 Holder.checkin(pool_ref) 1169 end 1170 1171 defp checkin(%DBConnection{} = conn, fun, meter, opts) do 1172 return = fun.(conn, meter, opts) 1173 checkin(conn) 1174 return 1175 end 1176 1177 defp checkin(pool, fun, meter, opts) do 1178 run(pool, fun, meter, opts) 1179 end 1180 1181 defp disconnect(%DBConnection{pool_ref: pool_ref}, err) do 1182 _ = Holder.disconnect(pool_ref, err) 1183 :ok 1184 end 1185 1186 defp stop(%DBConnection{pool_ref: pool_ref}, kind, reason, stack) do 1187 msg = "client #{inspect(self())} stopped: " <> Exception.format(kind, reason, stack) 1188 exception = DBConnection.ConnectionError.exception(msg) 1189 _ = Holder.stop(pool_ref, exception) 1190 :ok 1191 end 1192 1193 defp handle_common_result(return, conn, meter) do 1194 case return do 1195 {:ok, result, _conn_state} -> 1196 {:ok, result, meter} 1197 1198 {:error, err, _conn_state} -> 1199 {:error, err, meter} 1200 1201 {:disconnect, err, _conn_state} -> 1202 disconnect(conn, err) 1203 {:error, err, meter} 1204 1205 {:catch, kind, reason, stack} -> 1206 stop(conn, kind, reason, stack) 1207 {kind, reason, stack, meter} 1208 1209 other -> 1210 bad_return!(other, conn, meter) 1211 end 1212 end 1213 1214 @compile {:inline, bad_return!: 3} 1215 1216 defp bad_return!(other, conn, meter) do 1217 try do 1218 raise DBConnection.ConnectionError, "bad return value: #{inspect(other)}" 1219 catch 1220 :error, reason -> 1221 stack = __STACKTRACE__ 1222 stop(conn, :error, reason, stack) 1223 {:error, reason, stack, meter} 1224 end 1225 end 1226 1227 defp parse(query, meter, opts) do 1228 try do 1229 DBConnection.Query.parse(query, opts) 1230 catch 1231 kind, reason -> 1232 stack = __STACKTRACE__ 1233 {kind, reason, stack, meter} 1234 else 1235 query -> 1236 {:ok, query, meter} 1237 end 1238 end 1239 1240 defp describe(conn, query, meter, opts) do 1241 try do 1242 DBConnection.Query.describe(query, opts) 1243 catch 1244 kind, reason -> 1245 stack = __STACKTRACE__ 1246 raised_close(conn, query, meter, opts, kind, reason, stack) 1247 else 1248 query -> 1249 {:ok, query, meter} 1250 end 1251 end 1252 1253 defp encode(conn, query, params, meter, opts) do 1254 try do 1255 DBConnection.Query.encode(query, params, opts) 1256 catch 1257 kind, reason -> 1258 stack = __STACKTRACE__ 1259 raised_close(conn, query, meter, opts, kind, reason, stack) 1260 else 1261 params -> 1262 {:ok, params, meter} 1263 end 1264 end 1265 1266 defp maybe_encode(query, params, meter, opts) do 1267 try do 1268 DBConnection.Query.encode(query, params, opts) 1269 rescue 1270 DBConnection.EncodeError -> {:prepare, meter} 1271 catch 1272 kind, reason -> 1273 stack = __STACKTRACE__ 1274 {kind, reason, stack, meter} 1275 else 1276 params -> 1277 {:ok, params, meter} 1278 end 1279 end 1280 1281 defp decode(query, result, meter, opts) do 1282 meter = event(meter, :decode) 1283 1284 try do 1285 DBConnection.Query.decode(query, result, opts) 1286 catch 1287 kind, reason -> 1288 stack = __STACKTRACE__ 1289 {kind, reason, stack, meter} 1290 else 1291 result -> 1292 {:ok, result, meter} 1293 end 1294 end 1295 1296 defp prepare_declare(conn, query, params, opts) do 1297 result = 1298 with {:ok, query, meter} <- parse(query, meter(opts), opts) do 1299 parsed_prepare_declare(conn, query, params, meter, opts) 1300 end 1301 1302 log(result, :prepare_declare, query, params) 1303 end 1304 1305 defp parsed_prepare_declare(conn, query, params, meter, opts) do 1306 run(conn, &run_prepare_declare/5, query, params, meter, opts) 1307 end 1308 1309 defp prepare_declare!(conn, query, params, opts) do 1310 case prepare_declare(conn, query, params, opts) do 1311 {:ok, query, cursor} -> 1312 {query, cursor} 1313 1314 {:error, err} -> 1315 raise err 1316 end 1317 end 1318 1319 defp declare(conn, query, params, opts) do 1320 result = 1321 case maybe_encode(query, params, meter(opts), opts) do 1322 {:prepare, meter} -> 1323 parsed_prepare_declare(conn, query, params, meter, opts) 1324 1325 {:ok, params, meter} -> 1326 run(conn, &run_declare/5, query, params, meter, opts) 1327 1328 {_, _, _, _} = error -> 1329 error 1330 end 1331 1332 log(result, :declare, query, params) 1333 end 1334 1335 defp deallocate(conn, query, cursor, opts) do 1336 conn 1337 |> run_cleanup(&run_deallocate/4, [query, cursor], meter(opts), opts) 1338 |> log(:deallocate, query, cursor) 1339 end 1340 1341 defp run_prepare(conn, query, meter, opts) do 1342 with {:ok, query, meter} <- prepare(conn, query, meter, opts) do 1343 describe(conn, query, meter, opts) 1344 end 1345 end 1346 1347 defp prepare(%DBConnection{pool_ref: pool_ref} = conn, query, meter, opts) do 1348 pool_ref 1349 |> Holder.handle(:handle_prepare, [query], opts) 1350 |> handle_common_result(conn, event(meter, :prepare)) 1351 end 1352 1353 defp run_prepare_execute(conn, query, params, meter, opts) do 1354 with {:ok, query, meter} <- run_prepare(conn, query, meter, opts), 1355 {:ok, params, meter} <- encode(conn, query, params, meter, opts) do 1356 run_execute(conn, query, params, meter, opts) 1357 end 1358 end 1359 1360 defp run_execute(conn, query, params, meter, opts) do 1361 %DBConnection{pool_ref: pool_ref} = conn 1362 meter = event(meter, :execute) 1363 1364 case Holder.handle(pool_ref, :handle_execute, [query, params], opts) do 1365 {:ok, query, result, _conn_state} -> 1366 {:ok, query, result, meter} 1367 1368 {:ok, _, _} = other -> 1369 bad_return!(other, conn, meter) 1370 1371 other -> 1372 handle_common_result(other, conn, meter) 1373 end 1374 end 1375 1376 defp raised_close(conn, query, meter, opts, kind, reason, stack) do 1377 with {:ok, _, meter} <- run_close(conn, [query], meter, opts) do 1378 {kind, reason, stack, meter} 1379 end 1380 end 1381 1382 defp run_close(conn, args, meter, opts) do 1383 meter = event(meter, :close) 1384 cleanup(conn, :handle_close, args, meter, opts) 1385 end 1386 1387 defp run_cleanup(%DBConnection{} = conn, fun, args, meter, opts) do 1388 fun.(conn, args, meter, opts) 1389 end 1390 1391 defp run_cleanup(pool, fun, args, meter, opts) do 1392 with {:ok, conn, meter} <- checkout(pool, meter, opts) do 1393 try do 1394 fun.(conn, args, meter, opts) 1395 after 1396 checkin(conn) 1397 end 1398 end 1399 end 1400 1401 defp cleanup(conn, fun, args, meter, opts) do 1402 %DBConnection{pool_ref: pool_ref} = conn 1403 1404 case Holder.cleanup(pool_ref, fun, args, opts) do 1405 {:ok, result, _conn_state} -> 1406 {:ok, result, meter} 1407 1408 {:error, err, _conn_state} -> 1409 {:error, err, meter} 1410 1411 {:disconnect, err, _conn_state} -> 1412 disconnect(conn, err) 1413 {:error, err, meter} 1414 1415 {:catch, kind, reason, stack} -> 1416 stop(conn, kind, reason, stack) 1417 {kind, reason, stack, meter} 1418 1419 other -> 1420 bad_return!(other, conn, meter) 1421 end 1422 end 1423 1424 defp run(%DBConnection{} = conn, fun, meter, opts) do 1425 fun.(conn, meter, opts) 1426 end 1427 1428 defp run(pool, fun, meter, opts) do 1429 with {:ok, conn, meter} <- checkout(pool, meter, opts) do 1430 try do 1431 fun.(conn, meter, opts) 1432 after 1433 checkin(conn) 1434 end 1435 end 1436 end 1437 1438 defp run(%DBConnection{} = conn, fun, arg, meter, opts) do 1439 fun.(conn, arg, meter, opts) 1440 end 1441 1442 defp run(pool, fun, arg, meter, opts) do 1443 with {:ok, conn, meter} <- checkout(pool, meter, opts) do 1444 try do 1445 fun.(conn, arg, meter, opts) 1446 after 1447 checkin(conn) 1448 end 1449 end 1450 end 1451 1452 defp run(%DBConnection{} = conn, fun, arg1, arg2, meter, opts) do 1453 fun.(conn, arg1, arg2, meter, opts) 1454 end 1455 1456 defp run(pool, fun, arg1, arg2, meter, opts) do 1457 with {:ok, conn, meter} <- checkout(pool, meter, opts) do 1458 try do 1459 fun.(conn, arg1, arg2, meter, opts) 1460 after 1461 checkin(conn) 1462 end 1463 end 1464 end 1465 1466 defp meter(opts) do 1467 case Keyword.get(opts, :log) do 1468 nil -> nil 1469 log -> {log, []} 1470 end 1471 end 1472 1473 defp event(nil, _), 1474 do: nil 1475 1476 defp event({log, events}, event), 1477 do: {log, [{event, System.monotonic_time()} | events]} 1478 1479 defp past_event(nil, _, _), 1480 do: nil 1481 1482 defp past_event(log_events, _, nil), 1483 do: log_events 1484 1485 defp past_event({log, events}, event, time), 1486 do: {log, [{event, time} | events]} 1487 1488 defp log({:ok, res, meter}, call, query, params), 1489 do: log(meter, call, query, params, {:ok, res}) 1490 1491 defp log({:ok, res1, res2, meter}, call, query, params), 1492 do: log(meter, call, query, params, {:ok, res1, res2}) 1493 1494 defp log({ok, res, meter}, call, query, cursor) when ok in [:cont, :halt], 1495 do: log(meter, call, query, cursor, {ok, res}) 1496 1497 defp log({:error, err, meter}, call, query, params), 1498 do: log(meter, call, query, params, {:error, err}) 1499 1500 defp log({kind, reason, stack, meter}, call, query, params), 1501 do: log(meter, call, query, params, {kind, reason, stack}) 1502 1503 defp log(nil, _, _, _, result), 1504 do: log_result(result) 1505 1506 defp log({log, times}, call, query, params, result) do 1507 entry = DBConnection.LogEntry.new(call, query, params, times, entry_result(result)) 1508 1509 try do 1510 log(log, entry) 1511 catch 1512 kind, reason -> 1513 stack = __STACKTRACE__ 1514 log_raised(entry, kind, reason, stack) 1515 end 1516 1517 log_result(result) 1518 end 1519 1520 defp entry_result({kind, reason, stack}) 1521 when kind in [:error, :exit, :throw] do 1522 msg = "an exception was raised: " <> Exception.format(kind, reason, stack) 1523 {:error, %DBConnection.ConnectionError{message: msg}} 1524 end 1525 1526 defp entry_result({ok, res}) when ok in [:cont, :halt], 1527 do: {:ok, res} 1528 1529 defp entry_result(other), do: other 1530 1531 defp log({mod, fun, args}, entry), do: apply(mod, fun, [entry | args]) 1532 defp log(fun, entry), do: fun.(entry) 1533 1534 defp log_result({kind, reason, stack}) when kind in [:error, :exit, :throw] do 1535 :erlang.raise(kind, reason, stack) 1536 end 1537 1538 defp log_result(other), do: other 1539 1540 defp log_raised(entry, kind, reason, stack) do 1541 reason = Exception.normalize(kind, reason, stack) 1542 1543 Logger.error( 1544 fn -> 1545 "an exception was raised logging #{inspect(entry)}: " <> 1546 Exception.format(kind, reason, stack) 1547 end, 1548 crash_reason: {crash_reason(kind, reason), stack} 1549 ) 1550 catch 1551 _, _ -> 1552 :ok 1553 end 1554 1555 defp crash_reason(:throw, value), do: {:nocatch, value} 1556 defp crash_reason(_, value), do: value 1557 1558 defp run_transaction(conn, fun, run, opts) do 1559 %DBConnection{conn_ref: conn_ref} = conn 1560 1561 try do 1562 result = fun.(%{conn | conn_mode: :transaction}) 1563 conclude(conn, result) 1564 catch 1565 :throw, {__MODULE__, ^conn_ref, reason} -> 1566 reset(conn) 1567 1568 case rollback(conn, run, opts) do 1569 {:ok, _} -> 1570 {:error, reason} 1571 1572 {:error, %DBConnection.TransactionError{}} -> 1573 {:error, reason} 1574 1575 {:error, %DBConnection.ConnectionError{}} -> 1576 {:error, reason} 1577 1578 {:error, err} -> 1579 raise err 1580 end 1581 1582 kind, reason -> 1583 stack = __STACKTRACE__ 1584 reset(conn) 1585 _ = rollback(conn, run, opts) 1586 :erlang.raise(kind, reason, stack) 1587 else 1588 result -> 1589 case commit(conn, run, opts) do 1590 {:ok, _} -> 1591 {:ok, result} 1592 1593 {:error, %DBConnection.TransactionError{}} -> 1594 {:error, :rollback} 1595 1596 {:error, err} -> 1597 raise err 1598 end 1599 after 1600 reset(conn) 1601 end 1602 end 1603 1604 defp fail(%DBConnection{pool_ref: pool_ref}) do 1605 case Holder.status?(pool_ref, :ok) do 1606 true -> Holder.put_status(pool_ref, :aborted) 1607 false -> :ok 1608 end 1609 end 1610 1611 defp conclude(%DBConnection{pool_ref: pool_ref, conn_ref: conn_ref}, result) do 1612 case Holder.status?(pool_ref, :ok) do 1613 true -> result 1614 false -> throw({__MODULE__, conn_ref, :rollback}) 1615 end 1616 end 1617 1618 defp reset(%DBConnection{pool_ref: pool_ref}) do 1619 case Holder.status?(pool_ref, :aborted) do 1620 true -> Holder.put_status(pool_ref, :ok) 1621 false -> :ok 1622 end 1623 end 1624 1625 defp begin(conn, run, opts) do 1626 conn 1627 |> run.(&run_begin/3, meter(opts), opts) 1628 |> log(:begin, :begin, nil) 1629 end 1630 1631 defp run_begin(conn, meter, opts) do 1632 %DBConnection{pool_ref: pool_ref} = conn 1633 meter = event(meter, :begin) 1634 1635 case Holder.handle(pool_ref, :handle_begin, [], opts) do 1636 {status, _conn_state} when status in [:idle, :transaction, :error] -> 1637 status_disconnect(conn, status, meter) 1638 1639 other -> 1640 handle_common_result(other, conn, meter) 1641 end 1642 end 1643 1644 defp rollback(conn, run, opts) do 1645 conn 1646 |> run.(&run_rollback/3, meter(opts), opts) 1647 |> log(:rollback, :rollback, nil) 1648 end 1649 1650 defp run_rollback(conn, meter, opts) do 1651 %DBConnection{pool_ref: pool_ref} = conn 1652 meter = event(meter, :rollback) 1653 1654 case Holder.handle(pool_ref, :handle_rollback, [], opts) do 1655 {status, _conn_state} when status in [:idle, :transaction, :error] -> 1656 status_disconnect(conn, status, meter) 1657 1658 other -> 1659 handle_common_result(other, conn, meter) 1660 end 1661 end 1662 1663 defp commit(conn, run, opts) do 1664 case run.(conn, &run_commit/3, meter(opts), opts) do 1665 {:rollback, {:ok, result, meter}} -> 1666 log(meter, :commit, :rollback, nil, {:ok, result}) 1667 err = DBConnection.TransactionError.exception(:error) 1668 {:error, err} 1669 1670 {query, other} -> 1671 log(other, :commit, query, nil) 1672 1673 {:error, err, meter} -> 1674 log(meter, :commit, :commit, nil, {:error, err}) 1675 1676 {kind, reason, stack, meter} -> 1677 log(meter, :commit, :commit, nil, {kind, reason, stack}) 1678 end 1679 end 1680 1681 defp run_commit(conn, meter, opts) do 1682 %DBConnection{pool_ref: pool_ref} = conn 1683 meter = event(meter, :commit) 1684 1685 case Holder.handle(pool_ref, :handle_commit, [], opts) do 1686 {:error, _conn_state} -> 1687 {:rollback, run_rollback(conn, meter, opts)} 1688 1689 {status, _conn_state} when status in [:idle, :transaction] -> 1690 {:commit, status_disconnect(conn, status, meter)} 1691 1692 other -> 1693 {:commit, handle_common_result(other, conn, meter)} 1694 end 1695 end 1696 1697 defp status_disconnect(conn, status, meter) do 1698 err = DBConnection.TransactionError.exception(status) 1699 disconnect(conn, err) 1700 {:error, err, meter} 1701 end 1702 1703 defp run_status(conn, meter, opts) do 1704 %DBConnection{pool_ref: pool_ref} = conn 1705 1706 case Holder.handle(pool_ref, :handle_status, [], opts) do 1707 {status, _conn_state} when status in [:idle, :transaction, :error] -> 1708 {status, meter} 1709 1710 {:disconnect, err, _conn_state} -> 1711 disconnect(conn, err) 1712 {:error, err, meter} 1713 1714 {:catch, kind, reason, stack} -> 1715 stop(conn, kind, reason, stack) 1716 {kind, reason, stack, meter} 1717 1718 other -> 1719 bad_return!(other, conn, meter) 1720 end 1721 end 1722 1723 defp run_prepare_declare(conn, query, params, meter, opts) do 1724 with {:ok, query, meter} <- prepare(conn, query, meter, opts), 1725 {:ok, query, meter} <- describe(conn, query, meter, opts), 1726 {:ok, params, meter} <- encode(conn, query, params, meter, opts), 1727 {:ok, query, cursor, meter} <- run_declare(conn, query, params, meter, opts) do 1728 {:ok, query, cursor, meter} 1729 end 1730 end 1731 1732 defp run_declare(conn, query, params, meter, opts) do 1733 %DBConnection{pool_ref: pool_ref} = conn 1734 meter = event(meter, :declare) 1735 1736 case Holder.handle(pool_ref, :handle_declare, [query, params], opts) do 1737 {:ok, query, result, _conn_state} -> 1738 {:ok, query, result, meter} 1739 1740 {:ok, _, _} = other -> 1741 bad_return!(other, conn, meter) 1742 1743 other -> 1744 handle_common_result(other, conn, meter) 1745 end 1746 end 1747 1748 defp stream_fetch(conn, {:cont, query, cursor}, opts) do 1749 conn 1750 |> run(&run_stream_fetch/4, [query, cursor], meter(opts), opts) 1751 |> log(:fetch, query, cursor) 1752 |> case do 1753 {ok, result} when ok in [:cont, :halt] -> 1754 {[result], {ok, query, cursor}} 1755 1756 {:error, err} -> 1757 raise err 1758 end 1759 end 1760 1761 defp stream_fetch(_, {:halt, _, _} = state, _) do 1762 {:halt, state} 1763 end 1764 1765 defp run_stream_fetch(conn, args, meter, opts) do 1766 [query, _] = args 1767 1768 with {ok, result, meter} when ok in [:cont, :halt] <- run_fetch(conn, args, meter, opts), 1769 {:ok, result, meter} <- decode(query, result, meter, opts) do 1770 {ok, result, meter} 1771 end 1772 end 1773 1774 defp run_fetch(conn, args, meter, opts) do 1775 %DBConnection{pool_ref: pool_ref} = conn 1776 meter = event(meter, :fetch) 1777 1778 case Holder.handle(pool_ref, :handle_fetch, args, opts) do 1779 {:cont, result, _conn_state} -> 1780 {:cont, result, meter} 1781 1782 {:halt, result, _conn_state} -> 1783 {:halt, result, meter} 1784 1785 other -> 1786 handle_common_result(other, conn, meter) 1787 end 1788 end 1789 1790 defp stream_deallocate(conn, {_status, query, cursor}, opts), 1791 do: deallocate(conn, query, cursor, opts) 1792 1793 defp run_deallocate(conn, args, meter, opts) do 1794 meter = event(meter, :deallocate) 1795 cleanup(conn, :handle_deallocate, args, meter, opts) 1796 end 1797 1798 defp resource(%DBConnection{} = conn, start, next, stop, opts) do 1799 start = fn -> start.(conn, opts) end 1800 next = fn state -> next.(conn, state, opts) end 1801 stop = fn state -> stop.(conn, state, opts) end 1802 Stream.resource(start, next, stop) 1803 end 1804 end