messages.ex (10348B)
1 defmodule Postgrex.Messages do 2 @moduledoc false 3 4 import Postgrex.BinaryUtils 5 import Record, only: [defrecord: 2] 6 7 @protocol_vsn_major 3 8 @protocol_vsn_minor 0 9 10 @auth_types [ 11 ok: 0, 12 kerberos: 2, 13 cleartext: 3, 14 md5: 5, 15 scm: 6, 16 gss: 7, 17 gss_cont: 8, 18 sspi: 9, 19 sasl: 10, 20 sasl_cont: 11, 21 sasl_fin: 12 22 ] 23 24 @error_fields [ 25 severity: ?S, 26 code: ?C, 27 message: ?M, 28 detail: ?D, 29 hint: ?H, 30 position: ?P, 31 internal_position: ?p, 32 internal_query: ?q, 33 where: ?W, 34 schema: ?s, 35 table: ?t, 36 column: ?c, 37 data_type: ?d, 38 constraint: ?n, 39 file: ?F, 40 line: ?L, 41 routine: ?R 42 ] 43 44 defrecord :msg_auth, [:type, :data] 45 defrecord :msg_startup, [:params] 46 defrecord :msg_password, [:pass] 47 defrecord :msg_error, [:fields] 48 defrecord :msg_parameter, [:name, :value] 49 defrecord :msg_backend_key, [:pid, :key] 50 defrecord :msg_ready, [:status] 51 defrecord :msg_notice, [:fields] 52 defrecord :msg_query, [:statement] 53 defrecord :msg_parse, [:name, :statement, :type_oids] 54 defrecord :msg_describe, [:type, :name] 55 defrecord :msg_flush, [] 56 defrecord :msg_close, [:type, :name] 57 defrecord :msg_parse_complete, [] 58 defrecord :msg_parameter_desc, [:type_oids] 59 defrecord :msg_too_many_parameters, [:len, :max_len] 60 defrecord :msg_row_desc, [:fields] 61 defrecord :msg_no_data, [] 62 defrecord :msg_notify, [:pg_pid, :channel, :payload] 63 defrecord :msg_bind, [:name_port, :name_stat, :param_formats, :params, :result_formats] 64 defrecord :msg_execute, [:name_port, :max_rows] 65 defrecord :msg_sync, [] 66 defrecord :msg_bind_complete, [] 67 defrecord :msg_close_complete, [] 68 defrecord :msg_portal_suspend, [] 69 defrecord :msg_data_row, [:values] 70 defrecord :msg_command_complete, [:tag] 71 defrecord :msg_empty_query, [] 72 defrecord :msg_copy_data, [:data] 73 defrecord :msg_copy_done, [] 74 defrecord :msg_copy_fail, [:message] 75 defrecord :msg_copy_in_response, [:format, :columns] 76 defrecord :msg_copy_both_response, [:format, :columns] 77 defrecord :msg_copy_out_response, [:format, :columns] 78 defrecord :msg_terminate, [] 79 defrecord :msg_ssl_request, [] 80 defrecord :msg_cancel_request, [:pid, :key] 81 82 defrecord :row_field, [:name, :table_oid, :column, :type_oid, :type_size, :type_mod, :format] 83 84 ### decoders ### 85 86 # auth 87 def parse(<<type::int32(), rest::binary>>, ?R, size) do 88 type = decode_auth_type(type) 89 90 data = 91 case type do 92 :md5 -> 93 <<data::binary-size(4)>> = rest 94 data 95 96 :gss_cont -> 97 rest_size = size - 2 98 <<data::size(rest_size)>> = rest 99 data 100 101 :sasl -> 102 rest 103 104 :sasl_cont -> 105 rest 106 107 :sasl_fin -> 108 rest 109 110 _ -> 111 nil 112 end 113 114 msg_auth(type: type, data: data) 115 end 116 117 # backend_key 118 def parse(<<pid::int32(), key::int32()>>, ?K, _size) do 119 msg_backend_key(pid: pid, key: key) 120 end 121 122 # ready 123 def parse(<<status::int8()>>, ?Z, _size) do 124 status = 125 case status do 126 ?I -> :idle 127 ?T -> :transaction 128 ?E -> :error 129 end 130 131 msg_ready(status: status) 132 end 133 134 # parameter_desc 135 def parse(<<len::uint16(), rest::binary(len, 32)>>, ?t, _size) do 136 oids = for <<oid::size(32) <- rest>>, do: oid 137 msg_parameter_desc(type_oids: oids) 138 end 139 140 def parse(<<overflow_len::uint16(), _::binary>>, ?t, size) do 141 len = div(size - 2, 4) 142 143 case <<len::uint16()>> do 144 <<^overflow_len::uint16()>> -> 145 msg_too_many_parameters(len: len, max_len: 0xFFFF) 146 147 _ -> 148 raise "invalid parameter description" 149 end 150 end 151 152 # row_desc 153 def parse(<<len::uint16(), rest::binary>>, ?T, _size) do 154 fields = decode_row_fields(rest, len) 155 msg_row_desc(fields: fields) 156 end 157 158 # data_row 159 def parse(<<_::uint16(), rest::binary>>, ?D, _size) do 160 msg_data_row(values: rest) 161 end 162 163 # notify 164 def parse(<<pg_pid::int32(), rest::binary>>, ?A, _size) do 165 {channel, rest} = decode_string(rest) 166 {payload, ""} = decode_string(rest) 167 msg_notify(pg_pid: pg_pid, channel: channel, payload: payload) 168 end 169 170 # error 171 def parse(rest, ?E, _size) do 172 fields = decode_fields(rest) 173 msg_error(fields: Map.new(fields)) 174 end 175 176 # notice 177 def parse(rest, ?N, _size) do 178 fields = decode_fields(rest) 179 msg_notice(fields: Map.new(fields)) 180 end 181 182 # parameter 183 def parse(rest, ?S, _size) do 184 {name, rest} = decode_string(rest) 185 {value, ""} = decode_string(rest) 186 msg_parameter(name: name, value: value) 187 end 188 189 # parse_complete 190 def parse(_rest, ?1, _size) do 191 msg_parse_complete() 192 end 193 194 # no_data 195 def parse(_rest, ?n, _size) do 196 msg_no_data() 197 end 198 199 # bind_complete 200 def parse(_rest, ?2, _size) do 201 msg_bind_complete() 202 end 203 204 # close_complete 205 def parse(_rest, ?3, _size) do 206 msg_close_complete() 207 end 208 209 # portal_suspended 210 def parse(_rest, ?s, _size) do 211 msg_portal_suspend() 212 end 213 214 # command_complete 215 def parse(rest, ?C, _size) do 216 {tag, ""} = decode_string(rest) 217 msg_command_complete(tag: tag) 218 end 219 220 # empty_query 221 def parse(_rest, ?I, _size) do 222 msg_empty_query() 223 end 224 225 # msg_copy_data 226 def parse(data, ?d, _size) do 227 msg_copy_data(data: data) 228 end 229 230 # msg_copy_done 231 def parse(_rest, ?c, _size) do 232 msg_copy_done() 233 end 234 235 # msg_copy_fail 236 def parse(message, ?f, _size) do 237 msg_copy_fail(message: message) 238 end 239 240 # msg_copy_in_response 241 def parse(rest, ?G, _size) do 242 {format, columns} = decode_copy(rest) 243 msg_copy_in_response(format: format, columns: columns) 244 end 245 246 # msg_copy_out_response 247 def parse(rest, ?H, _size) do 248 {format, columns} = decode_copy(rest) 249 msg_copy_out_response(format: format, columns: columns) 250 end 251 252 # msg_copy_both_response 253 def parse(rest, ?W, _size) do 254 {format, columns} = decode_copy(rest) 255 msg_copy_both_response(format: format, columns: columns) 256 end 257 258 ### encoders ### 259 260 def encode_msg(msg) do 261 {first, data} = encode(msg) 262 size = IO.iodata_length(data) + 4 263 264 if first do 265 [first, <<size::int32()>>, data] 266 else 267 [<<size::int32()>>, data] 268 end 269 end 270 271 # startup 272 defp encode(msg_startup(params: params)) do 273 params = 274 Enum.reduce(params, [], fn {key, value}, acc -> 275 [acc, to_string(key), 0, value, 0] 276 end) 277 278 vsn = <<@protocol_vsn_major::int16(), @protocol_vsn_minor::int16()>> 279 {nil, [vsn, params, 0]} 280 end 281 282 # password 283 defp encode(msg_password(pass: pass)) do 284 {?p, [pass]} 285 end 286 287 # query 288 defp encode(msg_query(statement: statement)) do 289 {?Q, [statement, 0]} 290 end 291 292 # parse 293 defp encode(msg_parse(name: name, statement: statement, type_oids: oids)) do 294 oids = for oid <- oids, into: "", do: <<oid::uint32()>> 295 len = <<div(byte_size(oids), 4)::int16()>> 296 {?P, [name, 0, statement, 0, len, oids]} 297 end 298 299 # describe 300 defp encode(msg_describe(type: type, name: name)) do 301 byte = 302 case type do 303 :statement -> ?S 304 :portal -> ?P 305 end 306 307 {?D, [byte, name, 0]} 308 end 309 310 # flush 311 defp encode(msg_flush()) do 312 {?H, ""} 313 end 314 315 # close 316 defp encode(msg_close(type: type, name: name)) do 317 byte = 318 case type do 319 :statement -> ?S 320 :portal -> ?P 321 end 322 323 {?C, [byte, name, 0]} 324 end 325 326 # bind 327 defp encode( 328 msg_bind( 329 name_port: port, 330 name_stat: stat, 331 param_formats: param_formats, 332 params: params, 333 result_formats: result_formats 334 ) 335 ) do 336 pfs = for format <- param_formats, into: "", do: <<format(format)::int16()>> 337 rfs = for format <- result_formats, into: "", do: <<format(format)::int16()>> 338 339 len_pfs = <<div(byte_size(pfs), 2)::int16()>> 340 len_rfs = <<div(byte_size(rfs), 2)::int16()>> 341 len_params = <<length(params)::int16()>> 342 343 {?B, [port, 0, stat, 0, len_pfs, pfs, len_params, params, len_rfs, rfs]} 344 end 345 346 # execute 347 defp encode(msg_execute(name_port: port, max_rows: rows)) do 348 {?E, [port, 0, <<rows::int32()>>]} 349 end 350 351 # sync 352 defp encode(msg_sync()) do 353 {?S, ""} 354 end 355 356 # terminate 357 defp encode(msg_terminate()) do 358 {?X, ""} 359 end 360 361 # ssl_request 362 defp encode(msg_ssl_request()) do 363 {nil, <<1234::int16(), 5679::int16()>>} 364 end 365 366 # cancel_request 367 defp encode(msg_cancel_request(pid: pid, key: key)) do 368 {nil, <<1234::int16(), 5678::int16(), pid::int32(), key::int32()>>} 369 end 370 371 # copy_data 372 defp encode(msg_copy_data(data: data)) do 373 {?d, data} 374 end 375 376 # copy_done 377 defp encode(msg_copy_done()) do 378 {?c, ""} 379 end 380 381 # copy_fail 382 defp encode(msg_copy_fail(message: msg)) do 383 {?f, [msg, 0]} 384 end 385 386 ### encode helpers ### 387 388 defp format(:text), do: 0 389 defp format(:binary), do: 1 390 391 ### decode helpers ### 392 393 defp decode_fields(<<0>>), do: [] 394 395 defp decode_fields(<<field::int8(), rest::binary>>) do 396 type = decode_field_type(field) 397 {string, rest} = decode_string(rest) 398 [{type, string} | decode_fields(rest)] 399 end 400 401 defp decode_string(bin) do 402 {pos, 1} = :binary.match(bin, <<0>>) 403 {string, <<0, rest::binary>>} = :erlang.split_binary(bin, pos) 404 {string, rest} 405 end 406 407 defp decode_row_fields("", 0), do: [] 408 409 defp decode_row_fields(rest, count) do 410 {field, rest} = decode_row_field(rest) 411 [field | decode_row_fields(rest, count - 1)] 412 end 413 414 defp decode_row_field(rest) do 415 {name, rest} = decode_string(rest) 416 417 <<table_oid::uint32(), column::int16(), type_oid::uint32(), type_size::int16(), 418 type_mod::int32(), format::int16(), rest::binary>> = rest 419 420 field = 421 row_field( 422 name: name, 423 table_oid: table_oid, 424 column: column, 425 type_oid: type_oid, 426 type_size: type_size, 427 type_mod: type_mod, 428 format: format 429 ) 430 431 {field, rest} 432 end 433 434 Enum.each(@auth_types, fn {type, value} -> 435 def decode_auth_type(unquote(value)), do: unquote(type) 436 end) 437 438 Enum.each(@error_fields, fn {field, char} -> 439 def decode_field_type(unquote(char)), do: unquote(field) 440 end) 441 442 def decode_field_type(_), do: :unknown 443 444 defp decode_format(0), do: :text 445 defp decode_format(1), do: :binary 446 447 defp decode_copy(<<format::int8(), len::uint16(), rest::binary(len, 16)>>) do 448 format = decode_format(format) 449 columns = for <<column::uint16() <- rest>>, do: decode_format(column) 450 {format, columns} 451 end 452 end