transaction.exs (7607B)
1 defmodule Ecto.Integration.TransactionTest do 2 # We can keep this test async as long as it 3 # is the only one access the transactions table 4 use Ecto.Integration.Case, async: true 5 6 import Ecto.Query 7 alias Ecto.Integration.PoolRepo # Used for writes 8 alias Ecto.Integration.TestRepo # Used for reads 9 10 @moduletag :capture_log 11 12 defmodule UniqueError do 13 defexception message: "unique error" 14 end 15 16 setup do 17 PoolRepo.delete_all "transactions" 18 :ok 19 end 20 21 defmodule Trans do 22 use Ecto.Schema 23 24 schema "transactions" do 25 field :num, :integer 26 end 27 end 28 29 test "transaction returns value" do 30 refute PoolRepo.in_transaction?() 31 {:ok, val} = PoolRepo.transaction(fn -> 32 assert PoolRepo.in_transaction?() 33 {:ok, val} = 34 PoolRepo.transaction(fn -> 35 assert PoolRepo.in_transaction?() 36 42 37 end) 38 assert PoolRepo.in_transaction?() 39 val 40 end) 41 refute PoolRepo.in_transaction?() 42 assert val == 42 43 end 44 45 test "transaction re-raises" do 46 assert_raise UniqueError, fn -> 47 PoolRepo.transaction(fn -> 48 PoolRepo.transaction(fn -> 49 raise UniqueError 50 end) 51 end) 52 end 53 end 54 55 # tag is required for TestRepo, since it is checkout in 56 # Ecto.Integration.Case setup 57 @tag isolation_level: :snapshot 58 test "transaction commits" do 59 # mssql requires that all transactions that use same shared lock are set 60 # to :snapshot isolation level 61 opts = [isolation_level: :snapshot] 62 63 PoolRepo.transaction(fn -> 64 e = PoolRepo.insert!(%Trans{num: 1}) 65 assert [^e] = PoolRepo.all(Trans) 66 assert [] = TestRepo.all(Trans) 67 end, opts) 68 69 assert [%Trans{num: 1}] = PoolRepo.all(Trans) 70 end 71 72 @tag isolation_level: :snapshot 73 test "transaction rolls back" do 74 opts = [isolation_level: :snapshot] 75 try do 76 PoolRepo.transaction(fn -> 77 e = PoolRepo.insert!(%Trans{num: 2}) 78 assert [^e] = PoolRepo.all(Trans) 79 assert [] = TestRepo.all(Trans) 80 raise UniqueError 81 end, opts) 82 rescue 83 UniqueError -> :ok 84 end 85 86 assert [] = TestRepo.all(Trans) 87 end 88 89 test "transaction rolls back per repository" do 90 message = "cannot call rollback outside of transaction" 91 92 assert_raise RuntimeError, message, fn -> 93 PoolRepo.rollback(:done) 94 end 95 96 assert_raise RuntimeError, message, fn -> 97 TestRepo.transaction fn -> 98 PoolRepo.rollback(:done) 99 end 100 end 101 end 102 103 @tag :assigns_id_type 104 test "transaction rolls back with reason on aborted transaction" do 105 e1 = PoolRepo.insert!(%Trans{num: 13}) 106 107 assert_raise Ecto.ConstraintError, fn -> 108 TestRepo.transaction fn -> 109 PoolRepo.insert!(%Trans{id: e1.id, num: 14}) 110 end 111 end 112 end 113 114 test "nested transaction partial rollback" do 115 assert PoolRepo.transaction(fn -> 116 e1 = PoolRepo.insert!(%Trans{num: 3}) 117 assert [^e1] = PoolRepo.all(Trans) 118 119 try do 120 PoolRepo.transaction(fn -> 121 e2 = PoolRepo.insert!(%Trans{num: 4}) 122 assert [^e1, ^e2] = PoolRepo.all(from(t in Trans, order_by: t.num)) 123 raise UniqueError 124 end) 125 rescue 126 UniqueError -> :ok 127 end 128 129 assert_raise DBConnection.ConnectionError, "transaction rolling back", 130 fn() -> PoolRepo.insert!(%Trans{num: 5}) end 131 end) == {:error, :rollback} 132 133 assert TestRepo.all(Trans) == [] 134 end 135 136 test "manual rollback doesn't bubble up" do 137 x = PoolRepo.transaction(fn -> 138 e = PoolRepo.insert!(%Trans{num: 6}) 139 assert [^e] = PoolRepo.all(Trans) 140 PoolRepo.rollback(:oops) 141 end) 142 143 assert x == {:error, :oops} 144 assert [] = TestRepo.all(Trans) 145 end 146 147 test "manual rollback bubbles up on nested transaction" do 148 assert PoolRepo.transaction(fn -> 149 e = PoolRepo.insert!(%Trans{num: 7}) 150 assert [^e] = PoolRepo.all(Trans) 151 assert {:error, :oops} = PoolRepo.transaction(fn -> 152 PoolRepo.rollback(:oops) 153 end) 154 assert_raise DBConnection.ConnectionError, "transaction rolling back", 155 fn() -> PoolRepo.insert!(%Trans{num: 8}) end 156 end) == {:error, :rollback} 157 158 assert [] = TestRepo.all(Trans) 159 end 160 161 test "transactions are not shared in repo" do 162 pid = self() 163 opts = [isolation_level: :snapshot] 164 165 new_pid = spawn_link fn -> 166 PoolRepo.transaction(fn -> 167 e = PoolRepo.insert!(%Trans{num: 9}) 168 assert [^e] = PoolRepo.all(Trans) 169 send(pid, :in_transaction) 170 receive do 171 :commit -> :ok 172 after 173 5000 -> raise "timeout" 174 end 175 end, opts) 176 send(pid, :committed) 177 end 178 179 receive do 180 :in_transaction -> :ok 181 after 182 5000 -> raise "timeout" 183 end 184 185 # mssql requires that all transactions that use same shared lock 186 # set transaction isolation level to "snapshot" so this must be wrapped into 187 # explicit transaction 188 PoolRepo.transaction(fn -> 189 assert [] = PoolRepo.all(Trans) 190 end, opts) 191 192 send(new_pid, :commit) 193 receive do 194 :committed -> :ok 195 after 196 5000 -> raise "timeout" 197 end 198 199 assert [%Trans{num: 9}] = PoolRepo.all(Trans) 200 end 201 202 ## Checkout 203 204 describe "with checkouts" do 205 test "transaction inside checkout" do 206 PoolRepo.checkout(fn -> 207 refute PoolRepo.in_transaction?() 208 PoolRepo.transaction(fn -> 209 assert PoolRepo.in_transaction?() 210 end) 211 refute PoolRepo.in_transaction?() 212 end) 213 end 214 215 test "checkout inside transaction" do 216 PoolRepo.transaction(fn -> 217 assert PoolRepo.in_transaction?() 218 PoolRepo.checkout(fn -> 219 assert PoolRepo.in_transaction?() 220 end) 221 assert PoolRepo.in_transaction?() 222 end) 223 end 224 225 @tag :transaction_checkout_raises 226 test "checkout raises on transaction attempt" do 227 assert_raise DBConnection.ConnectionError, ~r"connection was checked out with status", fn -> 228 PoolRepo.checkout(fn -> PoolRepo.query!("BEGIN") end) 229 end 230 end 231 end 232 233 ## Logging 234 235 defp register_telemetry() do 236 Process.put(:telemetry, fn _, measurements, event -> send(self(), {measurements, event}) end) 237 end 238 239 test "log begin, commit and rollback" do 240 register_telemetry() 241 242 PoolRepo.transaction(fn -> 243 assert_received {measurements, %{params: [], result: {:ok, _res}}} 244 assert is_integer(measurements.query_time) and measurements.query_time >= 0 245 assert is_integer(measurements.queue_time) and measurements.queue_time >= 0 246 247 refute_received %{} 248 register_telemetry() 249 end) 250 251 assert_received {measurements, %{params: [], result: {:ok, _res}}} 252 assert is_integer(measurements.query_time) and measurements.query_time >= 0 253 refute Map.has_key?(measurements, :queue_time) 254 255 assert PoolRepo.transaction(fn -> 256 refute_received %{} 257 register_telemetry() 258 PoolRepo.rollback(:log_rollback) 259 end) == {:error, :log_rollback} 260 261 assert_received {measurements, %{params: [], result: {:ok, _res}}} 262 assert is_integer(measurements.query_time) and measurements.query_time >= 0 263 refute Map.has_key?(measurements, :queue_time) 264 end 265 266 test "log queries inside transactions" do 267 PoolRepo.transaction(fn -> 268 register_telemetry() 269 assert [] = PoolRepo.all(Trans) 270 271 assert_received {measurements, %{params: [], result: {:ok, _res}}} 272 assert is_integer(measurements.query_time) and measurements.query_time >= 0 273 assert is_integer(measurements.decode_time) and measurements.query_time >= 0 274 refute Map.has_key?(measurements, :queue_time) 275 end) 276 end 277 end