zf

zenflows testing
git clone https://s.sonu.ch/~srfsh/zf.git
Log | Files | Refs | Submodules | README | LICENSE

messages.ex (10348B)


      1 defmodule Postgrex.Messages do
      2   @moduledoc false
      3 
      4   import Postgrex.BinaryUtils
      5   import Record, only: [defrecord: 2]
      6 
      7   @protocol_vsn_major 3
      8   @protocol_vsn_minor 0
      9 
     10   @auth_types [
     11     ok: 0,
     12     kerberos: 2,
     13     cleartext: 3,
     14     md5: 5,
     15     scm: 6,
     16     gss: 7,
     17     gss_cont: 8,
     18     sspi: 9,
     19     sasl: 10,
     20     sasl_cont: 11,
     21     sasl_fin: 12
     22   ]
     23 
     24   @error_fields [
     25     severity: ?S,
     26     code: ?C,
     27     message: ?M,
     28     detail: ?D,
     29     hint: ?H,
     30     position: ?P,
     31     internal_position: ?p,
     32     internal_query: ?q,
     33     where: ?W,
     34     schema: ?s,
     35     table: ?t,
     36     column: ?c,
     37     data_type: ?d,
     38     constraint: ?n,
     39     file: ?F,
     40     line: ?L,
     41     routine: ?R
     42   ]
     43 
     44   defrecord :msg_auth, [:type, :data]
     45   defrecord :msg_startup, [:params]
     46   defrecord :msg_password, [:pass]
     47   defrecord :msg_error, [:fields]
     48   defrecord :msg_parameter, [:name, :value]
     49   defrecord :msg_backend_key, [:pid, :key]
     50   defrecord :msg_ready, [:status]
     51   defrecord :msg_notice, [:fields]
     52   defrecord :msg_query, [:statement]
     53   defrecord :msg_parse, [:name, :statement, :type_oids]
     54   defrecord :msg_describe, [:type, :name]
     55   defrecord :msg_flush, []
     56   defrecord :msg_close, [:type, :name]
     57   defrecord :msg_parse_complete, []
     58   defrecord :msg_parameter_desc, [:type_oids]
     59   defrecord :msg_too_many_parameters, [:len, :max_len]
     60   defrecord :msg_row_desc, [:fields]
     61   defrecord :msg_no_data, []
     62   defrecord :msg_notify, [:pg_pid, :channel, :payload]
     63   defrecord :msg_bind, [:name_port, :name_stat, :param_formats, :params, :result_formats]
     64   defrecord :msg_execute, [:name_port, :max_rows]
     65   defrecord :msg_sync, []
     66   defrecord :msg_bind_complete, []
     67   defrecord :msg_close_complete, []
     68   defrecord :msg_portal_suspend, []
     69   defrecord :msg_data_row, [:values]
     70   defrecord :msg_command_complete, [:tag]
     71   defrecord :msg_empty_query, []
     72   defrecord :msg_copy_data, [:data]
     73   defrecord :msg_copy_done, []
     74   defrecord :msg_copy_fail, [:message]
     75   defrecord :msg_copy_in_response, [:format, :columns]
     76   defrecord :msg_copy_both_response, [:format, :columns]
     77   defrecord :msg_copy_out_response, [:format, :columns]
     78   defrecord :msg_terminate, []
     79   defrecord :msg_ssl_request, []
     80   defrecord :msg_cancel_request, [:pid, :key]
     81 
     82   defrecord :row_field, [:name, :table_oid, :column, :type_oid, :type_size, :type_mod, :format]
     83 
     84   ### decoders ###
     85 
     86   # auth
     87   def parse(<<type::int32(), rest::binary>>, ?R, size) do
     88     type = decode_auth_type(type)
     89 
     90     data =
     91       case type do
     92         :md5 ->
     93           <<data::binary-size(4)>> = rest
     94           data
     95 
     96         :gss_cont ->
     97           rest_size = size - 2
     98           <<data::size(rest_size)>> = rest
     99           data
    100 
    101         :sasl ->
    102           rest
    103 
    104         :sasl_cont ->
    105           rest
    106 
    107         :sasl_fin ->
    108           rest
    109 
    110         _ ->
    111           nil
    112       end
    113 
    114     msg_auth(type: type, data: data)
    115   end
    116 
    117   # backend_key
    118   def parse(<<pid::int32(), key::int32()>>, ?K, _size) do
    119     msg_backend_key(pid: pid, key: key)
    120   end
    121 
    122   # ready
    123   def parse(<<status::int8()>>, ?Z, _size) do
    124     status =
    125       case status do
    126         ?I -> :idle
    127         ?T -> :transaction
    128         ?E -> :error
    129       end
    130 
    131     msg_ready(status: status)
    132   end
    133 
    134   # parameter_desc
    135   def parse(<<len::uint16(), rest::binary(len, 32)>>, ?t, _size) do
    136     oids = for <<oid::size(32) <- rest>>, do: oid
    137     msg_parameter_desc(type_oids: oids)
    138   end
    139 
    140   def parse(<<overflow_len::uint16(), _::binary>>, ?t, size) do
    141     len = div(size - 2, 4)
    142 
    143     case <<len::uint16()>> do
    144       <<^overflow_len::uint16()>> ->
    145         msg_too_many_parameters(len: len, max_len: 0xFFFF)
    146 
    147       _ ->
    148         raise "invalid parameter description"
    149     end
    150   end
    151 
    152   # row_desc
    153   def parse(<<len::uint16(), rest::binary>>, ?T, _size) do
    154     fields = decode_row_fields(rest, len)
    155     msg_row_desc(fields: fields)
    156   end
    157 
    158   # data_row
    159   def parse(<<_::uint16(), rest::binary>>, ?D, _size) do
    160     msg_data_row(values: rest)
    161   end
    162 
    163   # notify
    164   def parse(<<pg_pid::int32(), rest::binary>>, ?A, _size) do
    165     {channel, rest} = decode_string(rest)
    166     {payload, ""} = decode_string(rest)
    167     msg_notify(pg_pid: pg_pid, channel: channel, payload: payload)
    168   end
    169 
    170   # error
    171   def parse(rest, ?E, _size) do
    172     fields = decode_fields(rest)
    173     msg_error(fields: Map.new(fields))
    174   end
    175 
    176   # notice
    177   def parse(rest, ?N, _size) do
    178     fields = decode_fields(rest)
    179     msg_notice(fields: Map.new(fields))
    180   end
    181 
    182   # parameter
    183   def parse(rest, ?S, _size) do
    184     {name, rest} = decode_string(rest)
    185     {value, ""} = decode_string(rest)
    186     msg_parameter(name: name, value: value)
    187   end
    188 
    189   # parse_complete
    190   def parse(_rest, ?1, _size) do
    191     msg_parse_complete()
    192   end
    193 
    194   # no_data
    195   def parse(_rest, ?n, _size) do
    196     msg_no_data()
    197   end
    198 
    199   # bind_complete
    200   def parse(_rest, ?2, _size) do
    201     msg_bind_complete()
    202   end
    203 
    204   # close_complete
    205   def parse(_rest, ?3, _size) do
    206     msg_close_complete()
    207   end
    208 
    209   # portal_suspended
    210   def parse(_rest, ?s, _size) do
    211     msg_portal_suspend()
    212   end
    213 
    214   # command_complete
    215   def parse(rest, ?C, _size) do
    216     {tag, ""} = decode_string(rest)
    217     msg_command_complete(tag: tag)
    218   end
    219 
    220   # empty_query
    221   def parse(_rest, ?I, _size) do
    222     msg_empty_query()
    223   end
    224 
    225   # msg_copy_data
    226   def parse(data, ?d, _size) do
    227     msg_copy_data(data: data)
    228   end
    229 
    230   # msg_copy_done
    231   def parse(_rest, ?c, _size) do
    232     msg_copy_done()
    233   end
    234 
    235   # msg_copy_fail
    236   def parse(message, ?f, _size) do
    237     msg_copy_fail(message: message)
    238   end
    239 
    240   # msg_copy_in_response
    241   def parse(rest, ?G, _size) do
    242     {format, columns} = decode_copy(rest)
    243     msg_copy_in_response(format: format, columns: columns)
    244   end
    245 
    246   # msg_copy_out_response
    247   def parse(rest, ?H, _size) do
    248     {format, columns} = decode_copy(rest)
    249     msg_copy_out_response(format: format, columns: columns)
    250   end
    251 
    252   # msg_copy_both_response
    253   def parse(rest, ?W, _size) do
    254     {format, columns} = decode_copy(rest)
    255     msg_copy_both_response(format: format, columns: columns)
    256   end
    257 
    258   ### encoders ###
    259 
    260   def encode_msg(msg) do
    261     {first, data} = encode(msg)
    262     size = IO.iodata_length(data) + 4
    263 
    264     if first do
    265       [first, <<size::int32()>>, data]
    266     else
    267       [<<size::int32()>>, data]
    268     end
    269   end
    270 
    271   # startup
    272   defp encode(msg_startup(params: params)) do
    273     params =
    274       Enum.reduce(params, [], fn {key, value}, acc ->
    275         [acc, to_string(key), 0, value, 0]
    276       end)
    277 
    278     vsn = <<@protocol_vsn_major::int16(), @protocol_vsn_minor::int16()>>
    279     {nil, [vsn, params, 0]}
    280   end
    281 
    282   # password
    283   defp encode(msg_password(pass: pass)) do
    284     {?p, [pass]}
    285   end
    286 
    287   # query
    288   defp encode(msg_query(statement: statement)) do
    289     {?Q, [statement, 0]}
    290   end
    291 
    292   # parse
    293   defp encode(msg_parse(name: name, statement: statement, type_oids: oids)) do
    294     oids = for oid <- oids, into: "", do: <<oid::uint32()>>
    295     len = <<div(byte_size(oids), 4)::int16()>>
    296     {?P, [name, 0, statement, 0, len, oids]}
    297   end
    298 
    299   # describe
    300   defp encode(msg_describe(type: type, name: name)) do
    301     byte =
    302       case type do
    303         :statement -> ?S
    304         :portal -> ?P
    305       end
    306 
    307     {?D, [byte, name, 0]}
    308   end
    309 
    310   # flush
    311   defp encode(msg_flush()) do
    312     {?H, ""}
    313   end
    314 
    315   # close
    316   defp encode(msg_close(type: type, name: name)) do
    317     byte =
    318       case type do
    319         :statement -> ?S
    320         :portal -> ?P
    321       end
    322 
    323     {?C, [byte, name, 0]}
    324   end
    325 
    326   # bind
    327   defp encode(
    328          msg_bind(
    329            name_port: port,
    330            name_stat: stat,
    331            param_formats: param_formats,
    332            params: params,
    333            result_formats: result_formats
    334          )
    335        ) do
    336     pfs = for format <- param_formats, into: "", do: <<format(format)::int16()>>
    337     rfs = for format <- result_formats, into: "", do: <<format(format)::int16()>>
    338 
    339     len_pfs = <<div(byte_size(pfs), 2)::int16()>>
    340     len_rfs = <<div(byte_size(rfs), 2)::int16()>>
    341     len_params = <<length(params)::int16()>>
    342 
    343     {?B, [port, 0, stat, 0, len_pfs, pfs, len_params, params, len_rfs, rfs]}
    344   end
    345 
    346   # execute
    347   defp encode(msg_execute(name_port: port, max_rows: rows)) do
    348     {?E, [port, 0, <<rows::int32()>>]}
    349   end
    350 
    351   # sync
    352   defp encode(msg_sync()) do
    353     {?S, ""}
    354   end
    355 
    356   # terminate
    357   defp encode(msg_terminate()) do
    358     {?X, ""}
    359   end
    360 
    361   # ssl_request
    362   defp encode(msg_ssl_request()) do
    363     {nil, <<1234::int16(), 5679::int16()>>}
    364   end
    365 
    366   # cancel_request
    367   defp encode(msg_cancel_request(pid: pid, key: key)) do
    368     {nil, <<1234::int16(), 5678::int16(), pid::int32(), key::int32()>>}
    369   end
    370 
    371   # copy_data
    372   defp encode(msg_copy_data(data: data)) do
    373     {?d, data}
    374   end
    375 
    376   # copy_done
    377   defp encode(msg_copy_done()) do
    378     {?c, ""}
    379   end
    380 
    381   # copy_fail
    382   defp encode(msg_copy_fail(message: msg)) do
    383     {?f, [msg, 0]}
    384   end
    385 
    386   ### encode helpers ###
    387 
    388   defp format(:text), do: 0
    389   defp format(:binary), do: 1
    390 
    391   ### decode helpers ###
    392 
    393   defp decode_fields(<<0>>), do: []
    394 
    395   defp decode_fields(<<field::int8(), rest::binary>>) do
    396     type = decode_field_type(field)
    397     {string, rest} = decode_string(rest)
    398     [{type, string} | decode_fields(rest)]
    399   end
    400 
    401   defp decode_string(bin) do
    402     {pos, 1} = :binary.match(bin, <<0>>)
    403     {string, <<0, rest::binary>>} = :erlang.split_binary(bin, pos)
    404     {string, rest}
    405   end
    406 
    407   defp decode_row_fields("", 0), do: []
    408 
    409   defp decode_row_fields(rest, count) do
    410     {field, rest} = decode_row_field(rest)
    411     [field | decode_row_fields(rest, count - 1)]
    412   end
    413 
    414   defp decode_row_field(rest) do
    415     {name, rest} = decode_string(rest)
    416 
    417     <<table_oid::uint32(), column::int16(), type_oid::uint32(), type_size::int16(),
    418       type_mod::int32(), format::int16(), rest::binary>> = rest
    419 
    420     field =
    421       row_field(
    422         name: name,
    423         table_oid: table_oid,
    424         column: column,
    425         type_oid: type_oid,
    426         type_size: type_size,
    427         type_mod: type_mod,
    428         format: format
    429       )
    430 
    431     {field, rest}
    432   end
    433 
    434   Enum.each(@auth_types, fn {type, value} ->
    435     def decode_auth_type(unquote(value)), do: unquote(type)
    436   end)
    437 
    438   Enum.each(@error_fields, fn {field, char} ->
    439     def decode_field_type(unquote(char)), do: unquote(field)
    440   end)
    441 
    442   def decode_field_type(_), do: :unknown
    443 
    444   defp decode_format(0), do: :text
    445   defp decode_format(1), do: :binary
    446 
    447   defp decode_copy(<<format::int8(), len::uint16(), rest::binary(len, 16)>>) do
    448     format = decode_format(format)
    449     columns = for <<column::uint16() <- rest>>, do: decode_format(column)
    450     {format, columns}
    451   end
    452 end