lock.exs (1743B)
1 defmodule Ecto.Integration.LockTest do 2 # We can keep this test async as long as it 3 # is the only one accessing the lock_test table. 4 use ExUnit.Case, async: true 5 6 import Ecto.Query 7 alias Ecto.Integration.PoolRepo 8 9 defmodule LockCounter do 10 use Ecto.Schema 11 12 schema "lock_counters" do 13 field :count, :integer 14 end 15 end 16 17 setup do 18 PoolRepo.delete_all(LockCounter) 19 :ok 20 end 21 22 test "lock for update" do 23 %{id: id} = PoolRepo.insert!(%LockCounter{count: 1}) 24 pid = self() 25 26 lock_for_update = 27 Application.get_env(:ecto_sql, :lock_for_update) || 28 raise ":lock_for_update not set in :ecto application" 29 30 # Here we are manually inserting the lock in the query 31 # to test multiple adapters. Never do this in actual 32 # application code: it is not safe and not public. 33 query = from(lc in LockCounter, where: lc.id == ^id) 34 query = %{query | lock: lock_for_update} 35 36 {:ok, new_pid} = 37 Task.start_link fn -> 38 assert_receive :select_for_update, 5000 39 40 PoolRepo.transaction(fn -> 41 [post] = PoolRepo.all(query) # this should block until the other trans. commit 42 post |> Ecto.Changeset.change(count: post.count + 1) |> PoolRepo.update! 43 end) 44 45 send pid, :updated 46 end 47 48 PoolRepo.transaction(fn -> 49 [post] = PoolRepo.all(query) # select and lock the row 50 send new_pid, :select_for_update # signal second process to begin a transaction 51 post |> Ecto.Changeset.change(count: post.count + 1) |> PoolRepo.update! 52 end) 53 54 assert_receive :updated, 5000 55 56 # Final count will be 3 if SELECT ... FOR UPDATE worked and 2 otherwise 57 assert [%LockCounter{count: 3}] = PoolRepo.all(LockCounter) 58 end 59 end