zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

transaction.exs (7607B)


      1 defmodule Ecto.Integration.TransactionTest do
      2   # We can keep this test async as long as it
      3   # is the only one access the transactions table
      4   use Ecto.Integration.Case, async: true
      5 
      6   import Ecto.Query
      7   alias Ecto.Integration.PoolRepo # Used for writes
      8   alias Ecto.Integration.TestRepo # Used for reads
      9 
     10   @moduletag :capture_log
     11 
     12   defmodule UniqueError do
     13     defexception message: "unique error"
     14   end
     15 
     16   setup do
     17     PoolRepo.delete_all "transactions"
     18     :ok
     19   end
     20 
     21   defmodule Trans do
     22     use Ecto.Schema
     23 
     24     schema "transactions" do
     25       field :num, :integer
     26     end
     27   end
     28 
     29   test "transaction returns value" do
     30     refute PoolRepo.in_transaction?()
     31     {:ok, val} = PoolRepo.transaction(fn ->
     32       assert PoolRepo.in_transaction?()
     33       {:ok, val} =
     34         PoolRepo.transaction(fn ->
     35           assert PoolRepo.in_transaction?()
     36           42
     37         end)
     38       assert PoolRepo.in_transaction?()
     39       val
     40     end)
     41     refute PoolRepo.in_transaction?()
     42     assert val == 42
     43   end
     44 
     45   test "transaction re-raises" do
     46     assert_raise UniqueError, fn ->
     47       PoolRepo.transaction(fn ->
     48         PoolRepo.transaction(fn ->
     49           raise UniqueError
     50         end)
     51       end)
     52     end
     53   end
     54 
     55   # tag is required for TestRepo, since it is checkout in
     56   # Ecto.Integration.Case setup
     57   @tag isolation_level: :snapshot
     58   test "transaction commits" do
     59     # mssql requires that all transactions that use same shared lock are set
     60     # to :snapshot isolation level
     61     opts = [isolation_level: :snapshot]
     62 
     63     PoolRepo.transaction(fn ->
     64       e = PoolRepo.insert!(%Trans{num: 1})
     65       assert [^e] = PoolRepo.all(Trans)
     66       assert [] = TestRepo.all(Trans)
     67     end, opts)
     68 
     69     assert [%Trans{num: 1}] = PoolRepo.all(Trans)
     70   end
     71 
     72   @tag isolation_level: :snapshot
     73   test "transaction rolls back" do
     74     opts = [isolation_level: :snapshot]
     75     try do
     76       PoolRepo.transaction(fn ->
     77         e = PoolRepo.insert!(%Trans{num: 2})
     78         assert [^e] = PoolRepo.all(Trans)
     79         assert [] = TestRepo.all(Trans)
     80         raise UniqueError
     81       end, opts)
     82     rescue
     83       UniqueError -> :ok
     84     end
     85 
     86     assert [] = TestRepo.all(Trans)
     87   end
     88 
     89   test "transaction rolls back per repository" do
     90     message = "cannot call rollback outside of transaction"
     91 
     92     assert_raise RuntimeError, message, fn ->
     93       PoolRepo.rollback(:done)
     94     end
     95 
     96     assert_raise RuntimeError, message, fn ->
     97       TestRepo.transaction fn ->
     98         PoolRepo.rollback(:done)
     99       end
    100     end
    101   end
    102 
    103   @tag :assigns_id_type
    104   test "transaction rolls back with reason on aborted transaction" do
    105     e1 = PoolRepo.insert!(%Trans{num: 13})
    106 
    107     assert_raise Ecto.ConstraintError, fn ->
    108       TestRepo.transaction fn ->
    109         PoolRepo.insert!(%Trans{id: e1.id, num: 14})
    110       end
    111     end
    112   end
    113 
    114   test "nested transaction partial rollback" do
    115     assert PoolRepo.transaction(fn ->
    116       e1 = PoolRepo.insert!(%Trans{num: 3})
    117       assert [^e1] = PoolRepo.all(Trans)
    118 
    119       try do
    120         PoolRepo.transaction(fn ->
    121           e2 = PoolRepo.insert!(%Trans{num: 4})
    122           assert [^e1, ^e2] = PoolRepo.all(from(t in Trans, order_by: t.num))
    123           raise UniqueError
    124         end)
    125       rescue
    126         UniqueError -> :ok
    127       end
    128 
    129       assert_raise DBConnection.ConnectionError, "transaction rolling back",
    130         fn() -> PoolRepo.insert!(%Trans{num: 5}) end
    131     end) == {:error, :rollback}
    132 
    133     assert TestRepo.all(Trans) == []
    134   end
    135 
    136   test "manual rollback doesn't bubble up" do
    137     x = PoolRepo.transaction(fn ->
    138       e = PoolRepo.insert!(%Trans{num: 6})
    139       assert [^e] = PoolRepo.all(Trans)
    140       PoolRepo.rollback(:oops)
    141     end)
    142 
    143     assert x == {:error, :oops}
    144     assert [] = TestRepo.all(Trans)
    145   end
    146 
    147   test "manual rollback bubbles up on nested transaction" do
    148     assert PoolRepo.transaction(fn ->
    149       e = PoolRepo.insert!(%Trans{num: 7})
    150       assert [^e] = PoolRepo.all(Trans)
    151       assert {:error, :oops} = PoolRepo.transaction(fn ->
    152         PoolRepo.rollback(:oops)
    153       end)
    154       assert_raise DBConnection.ConnectionError, "transaction rolling back",
    155         fn() -> PoolRepo.insert!(%Trans{num: 8}) end
    156     end) == {:error, :rollback}
    157 
    158     assert [] = TestRepo.all(Trans)
    159   end
    160 
    161   test "transactions are not shared in repo" do
    162     pid = self()
    163     opts = [isolation_level: :snapshot]
    164 
    165     new_pid = spawn_link fn ->
    166       PoolRepo.transaction(fn ->
    167         e = PoolRepo.insert!(%Trans{num: 9})
    168         assert [^e] = PoolRepo.all(Trans)
    169         send(pid, :in_transaction)
    170         receive do
    171           :commit -> :ok
    172         after
    173           5000 -> raise "timeout"
    174         end
    175       end, opts)
    176       send(pid, :committed)
    177     end
    178 
    179     receive do
    180       :in_transaction -> :ok
    181     after
    182       5000 -> raise "timeout"
    183     end
    184 
    185     # mssql requires that all transactions that use same shared lock
    186     # set transaction isolation level to "snapshot" so this must be wrapped into
    187     # explicit transaction
    188     PoolRepo.transaction(fn ->
    189       assert [] = PoolRepo.all(Trans)
    190     end, opts)
    191 
    192     send(new_pid, :commit)
    193     receive do
    194       :committed -> :ok
    195     after
    196       5000 -> raise "timeout"
    197     end
    198 
    199     assert [%Trans{num: 9}] = PoolRepo.all(Trans)
    200   end
    201 
    202   ## Checkout
    203 
    204   describe "with checkouts" do
    205     test "transaction inside checkout" do
    206       PoolRepo.checkout(fn ->
    207         refute PoolRepo.in_transaction?()
    208         PoolRepo.transaction(fn ->
    209           assert PoolRepo.in_transaction?()
    210         end)
    211         refute PoolRepo.in_transaction?()
    212       end)
    213     end
    214 
    215     test "checkout inside transaction" do
    216       PoolRepo.transaction(fn ->
    217         assert PoolRepo.in_transaction?()
    218         PoolRepo.checkout(fn ->
    219           assert PoolRepo.in_transaction?()
    220         end)
    221         assert PoolRepo.in_transaction?()
    222       end)
    223     end
    224 
    225     @tag :transaction_checkout_raises
    226     test "checkout raises on transaction attempt" do
    227       assert_raise DBConnection.ConnectionError, ~r"connection was checked out with status", fn ->
    228         PoolRepo.checkout(fn -> PoolRepo.query!("BEGIN") end)
    229       end
    230     end
    231   end
    232 
    233   ## Logging
    234 
    235   defp register_telemetry() do
    236     Process.put(:telemetry, fn _, measurements, event -> send(self(), {measurements, event}) end)
    237   end
    238 
    239   test "log begin, commit and rollback" do
    240     register_telemetry()
    241 
    242     PoolRepo.transaction(fn ->
    243       assert_received {measurements, %{params: [], result: {:ok, _res}}}
    244       assert is_integer(measurements.query_time) and measurements.query_time >= 0
    245       assert is_integer(measurements.queue_time) and measurements.queue_time >= 0
    246 
    247       refute_received %{}
    248       register_telemetry()
    249     end)
    250 
    251     assert_received {measurements, %{params: [], result: {:ok, _res}}}
    252     assert is_integer(measurements.query_time) and measurements.query_time >= 0
    253     refute Map.has_key?(measurements, :queue_time)
    254 
    255     assert PoolRepo.transaction(fn ->
    256       refute_received %{}
    257       register_telemetry()
    258       PoolRepo.rollback(:log_rollback)
    259     end) == {:error, :log_rollback}
    260 
    261     assert_received {measurements, %{params: [], result: {:ok, _res}}}
    262     assert is_integer(measurements.query_time) and measurements.query_time >= 0
    263     refute Map.has_key?(measurements, :queue_time)
    264   end
    265 
    266   test "log queries inside transactions" do
    267     PoolRepo.transaction(fn ->
    268       register_telemetry()
    269       assert [] = PoolRepo.all(Trans)
    270 
    271       assert_received {measurements, %{params: [], result: {:ok, _res}}}
    272       assert is_integer(measurements.query_time) and measurements.query_time >= 0
    273       assert is_integer(measurements.decode_time) and measurements.query_time >= 0
    274       refute Map.has_key?(measurements, :queue_time)
    275     end)
    276   end
    277 end