class Mongo::Session
A logical session representing a set of sequential operations executed
by an application that are related in some way.
@since 2.5.0
Constants
- MISMATCHED_CLUSTER_ERROR_MSG
Error message indicating that the session was retrieved from a client with a different cluster than that of the client through which it is currently being used.
@since 2.5.0
- NO_TRANSACTION_STATE
The state of a session in which the last operation was not related to any transaction or no operations have yet occurred.
@since 2.6.0
- SESSIONS_NOT_SUPPORTED
Error message describing that sessions are not supported by the server version.
@since 2.5.0
- SESSION_ENDED_ERROR_MSG
Error message describing that the session cannot be used because it has already been ended.
@since 2.5.0
- STARTING_TRANSACTION_STATE
The state of a session in which a user has initiated a transaction but no operations within the transaction have occurred yet.
@since 2.6.0
- TRANSACTION_ABORTED_STATE
The state of a session in which the last operation executed was a transaction abort.
@since 2.6.0
- TRANSACTION_COMMITTED_STATE
The state of a session in which the last operation executed was a transaction commit.
@since 2.6.0
- TRANSACTION_IN_PROGRESS_STATE
The state of a session in which a transaction has been started and at least one operation has occurred, but the transaction has not yet been committed or aborted.
@since 2.6.0
- UNLABELED_WRITE_CONCERN_CODES
Attributes
Get the cluster through which this session was created.
@since 2.5.1
The cluster time for this session.
@since 2.5.0
The latest seen operation time for this session.
@since 2.5.0
Get the options for this session.
@since 2.5.0
The options for the transaction currently being executed on the session.
@since 2.6.0
Public Class Methods
Initialize a Session.
@example
Session.new(server_session, client, options)
@param [ ServerSession ] server_session The server session this session is associated with.
@param [ Client ] client The client through which this session is created.
@param [ Hash ] options The options for this session.
@option options [ Hash ] :read_preference The read preference options hash,
with the following optional keys: - *:mode* -- the read preference as a string or symbol; valid values are *:primary*, *:primary_preferred*, *:secondary*, *:secondary_preferred* and *:nearest*.
@since 2.5.0
# File lib/mongo/session.rb, line 120
# Initialize a Session.
#
# @param [ ServerSession ] server_session The server session this session is associated with.
# @param [ Client ] client The client through which this session is created; the session
#   stores the result of client.use(:admin).
# @param [ Hash ] options The options for this session. The hash is copied and the copy is
#   frozen; the caller's hash (including a nested :read_preference hash) is left untouched.
def initialize(server_session, client, options = {})
  @server_session = server_session
  options = options.dup

  # Because the read preference will need to be inserted into a command as a string, we
  # convert it from a symbol immediately upon receiving it. The dup above is shallow, so the
  # nested read preference hash is replaced with a modified copy rather than edited in place;
  # otherwise the caller's hash would be mutated (or a frozen hash would raise).
  read_preference = options[:read_preference]
  if read_preference && read_preference[:mode]
    options[:read_preference] = read_preference.merge(mode: read_preference[:mode].to_s)
  end

  @client = client.use(:admin)
  @options = options.freeze
  @cluster_time = nil
  @state = NO_TRANSACTION_STATE
end
Public Instance Methods
Abort the currently active transaction without making any changes to the database.
@example Abort the transaction.
session.abort_transaction
@raise [ Mongo::Error::InvalidTransactionOperation ] If a transaction was just committed or aborted and no new one was started.
@since 2.6.0
# File lib/mongo/session.rb, line 578 def abort_transaction check_if_ended! check_if_no_transaction! if within_states?(TRANSACTION_COMMITTED_STATE) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation.cannot_call_after_msg( :commitTransaction, :abortTransaction)) end if within_states?(TRANSACTION_ABORTED_STATE) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation.cannot_call_twice_msg(:abortTransaction)) end begin unless starting_transaction? write_with_retry(self, txn_options[:write_concern], true) do |server, txn_num| Operation::Command.new( selector: { abortTransaction: 1 }, db_name: 'admin', session: self, txn_num: txn_num ).execute(server) end end @state = TRANSACTION_ABORTED_STATE rescue Mongo::Error::InvalidTransactionOperation raise rescue Mongo::Error @state = TRANSACTION_ABORTED_STATE end end
Add the autocommit field to a command document if applicable.
@example
session.add_autocommit!(cmd)
@return [ Hash, BSON::Document ] The command document.
@since 2.6.0
# File lib/mongo/session.rb, line 185
# Set autocommit: false on the command when the session is in a transaction.
#
# @param [ Hash, BSON::Document ] command The command document (modified in place).
# @return [ Hash, BSON::Document ] The same command document.
def add_autocommit!(command)
  command[:autocommit] = false if in_transaction?
  command
end
Add this session's id to a command document.
@example
session.add_id!(cmd)
@return [ Hash, BSON::Document ] The command document.
@since 2.5.0
# File lib/mongo/session.rb, line 199
# Merge this session's id into the command under the :lsid key.
#
# @param [ Hash, BSON::Document ] command The command document (modified in place).
# @return [ Hash, BSON::Document ] The result of merge! on the command.
def add_id!(command)
  id_field = { lsid: session_id }
  command.merge!(id_field)
end
Add the startTransaction field to a command document if applicable.
@example
session.add_start_transaction!(cmd)
@return [ Hash, BSON::Document ] The command document.
@since 2.6.0
# File lib/mongo/session.rb, line 211
# Set startTransaction: true on the command when the session is in the starting state.
#
# @param [ Hash, BSON::Document ] command The command document (modified in place).
# @return [ Hash, BSON::Document ] The same command document.
def add_start_transaction!(command)
  command[:startTransaction] = true if starting_transaction?
  command
end
Add the transaction number to a command document if applicable.
@example
session.add_txn_num!(cmd)
@return [ Hash, BSON::Document ] The command document.
@since 2.6.0
# File lib/mongo/session.rb, line 225
# Set txnNumber on the command (as a BSON::Int64 built from the server session's
# transaction number) when the session is in a transaction.
#
# @param [ Hash, BSON::Document ] command The command document (modified in place).
# @return [ Hash, BSON::Document ] The same command document.
def add_txn_num!(command)
  return command unless in_transaction?

  command[:txnNumber] = BSON::Int64.new(@server_session.txn_num)
  command
end
Add the transaction options if applicable.
@example
session.add_txn_opts!(cmd)
@return [ Hash, BSON::Document ] The command document.
@since 2.6.0
# File lib/mongo/session.rb, line 239
# Add the transaction options (read preference, read concern, write concern) to a command.
#
# @param [ Hash, BSON::Document ] command The command document (modified in place).
# @param [ true, false ] read Whether the read preference should be considered for this
#   command.
# @return [ Hash, BSON::Document ] The same command document.
def add_txn_opts!(command, read)
  command.tap do |c|
    # The read preference should be added for all read operations.
    if read && txn_read_pref = txn_read_preference
      Mongo::Lint.validate_underscore_read_preference(txn_read_pref)
      # Work on a copy so the stored preference keeps its underscore-style mode.
      txn_read_pref = txn_read_pref.dup
      # Convert the underscore mode to camel case, e.g. secondary_preferred -> secondaryPreferred.
      txn_read_pref[:mode] = txn_read_pref[:mode].to_s.gsub(/(_\w)/) { |match| match[1].upcase }
      Mongo::Lint.validate_camel_case_read_preference(txn_read_pref)
      c['$readPreference'] = txn_read_pref
    end

    # The read concern should be added to any command that starts a transaction.
    if starting_transaction? && txn_read_concern
      c[:readConcern] ||= {}
      c[:readConcern].merge!(txn_read_concern)
    end

    # We need to send the read concern level as a string rather than a symbol.
    if c[:readConcern] && c[:readConcern][:level]
      c[:readConcern][:level] = c[:readConcern][:level].to_s
    end

    # The write concern should be added to any abortTransaction or commitTransaction command.
    if (c[:abortTransaction] || c[:commitTransaction]) && txn_write_concern
      c[:writeConcern] = txn_write_concern
    end

    # A non-numeric write concern w value needs to be sent as a string rather than a symbol.
    # The write concern in the command may be the very hash held by the transaction options
    # or the client, so the converted value goes into a copy instead of being written back
    # into that shared (and possibly frozen) hash.
    write_concern = c[:writeConcern]
    if write_concern && write_concern[:w].is_a?(Symbol)
      c[:writeConcern] = write_concern.merge(w: write_concern[:w].to_s)
    end
  end
end
Advance the cached cluster time document for this session.
@example Advance the cluster time.
session.advance_cluster_time(doc)
@param [ BSON::Document, Hash ] new_cluster_time The new cluster time.
@return [ BSON::Document, Hash ] The new cluster time.
@since 2.5.0
# File lib/mongo/session.rb, line 365
# Advance the cached cluster time document.
#
# With no cached document the new one is stored as-is; otherwise whichever of the cached
# and new documents has the greater Cluster::CLUSTER_TIME value is kept.
#
# @param [ BSON::Document, Hash ] new_cluster_time The new cluster time.
# @return [ BSON::Document, Hash ] The cluster time document now cached.
def advance_cluster_time(new_cluster_time)
  return @cluster_time = new_cluster_time unless @cluster_time

  @cluster_time = [@cluster_time, new_cluster_time].max_by do |time_doc|
    time_doc[Cluster::CLUSTER_TIME]
  end
end
Advance the cached operation time for this session.
@example Advance the operation time.
session.advance_operation_time(timestamp)
@param [ BSON::Timestamp ] new_operation_time The new operation time.
@return [ BSON::Timestamp ] The max operation time, considering the current and new times.
@since 2.5.0
# File lib/mongo/session.rb, line 383
# Advance the cached operation time.
#
# With no cached time the new one is stored as-is; otherwise the greater of the cached and
# new times is kept.
#
# @param [ BSON::Timestamp ] new_operation_time The new operation time.
# @return [ BSON::Timestamp ] The operation time now cached.
def advance_operation_time(new_operation_time)
  return @operation_time = new_operation_time unless @operation_time

  @operation_time = [@operation_time, new_operation_time].max
end
# File lib/mongo/session.rb, line 643
# Get the cluster of the client held by this session.
#
# @return The value of @client.cluster.
def cluster
  owning_client = @client
  owning_client.cluster
end
Commit the currently active transaction on the session.
@example Commits the transaction.
session.commit_transaction
@raise [ InvalidTransactionOperation ] If a transaction was just aborted and no new one was
started.
@since 2.6.0
# File lib/mongo/session.rb, line 521 def commit_transaction check_if_ended! check_if_no_transaction! if within_states?(TRANSACTION_ABORTED_STATE) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation.cannot_call_after_msg( :abortTransaction, :commitTransaction)) end begin # If commitTransaction is called twice, we need to run the same commit operation again, so # we revert the session to the previous state. if within_states?(TRANSACTION_COMMITTED_STATE) @state = @last_commit_skipped ? STARTING_TRANSACTION_STATE : TRANSACTION_IN_PROGRESS_STATE end if starting_transaction? @last_commit_skipped = true else @last_commit_skipped = false write_with_retry(self, txn_options[:write_concern], true) do |server, txn_num| Operation::Command.new( selector: { commitTransaction: 1 }, db_name: 'admin', session: self, txn_num: txn_num ).execute(server) end end rescue Mongo::Error::NoServerAvailable, Mongo::Error::SocketError => e e.send(:add_label, Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) raise e rescue Mongo::Error::OperationFailure => e err_doc = e.instance_variable_get(:@result).send(:first_document) if e.write_retryable? || (err_doc['writeConcernError'] && !UNLABELED_WRITE_CONCERN_CODES.include?(err_doc['writeConcernError']['code'])) e.send(:add_label, Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) end raise e ensure @state = TRANSACTION_COMMITTED_STATE end end
End this session.
@example
session.end_session
@return [ nil ] Always nil.
@since 2.5.0
# File lib/mongo/session.rb, line 156
# End this session.
#
# If the session has not already ended and has a client, any in-progress transaction is
# aborted (errors of type Mongo::Error raised by the abort are ignored) and the server
# session is checked back into the cluster's session pool. The server session reference is
# always cleared, even when an error propagates.
#
# @return [ nil ] Always nil.
def end_session
  if !ended? && @client
    if within_states?(TRANSACTION_IN_PROGRESS_STATE)
      begin
        abort_transaction
      rescue Mongo::Error
        # Aborting is best-effort while ending the session. Only driver errors are
        # ignored; a modifier-style `rescue` here would swallow every StandardError.
      end
    end

    @client.cluster.session_pool.checkin(@server_session)
  end

  nil
ensure
  @server_session = nil
end
Whether this session has ended.
@example
session.ended?
@return [ true, false ] Whether the session has ended.
@since 2.5.0
# File lib/mongo/session.rb, line 173
# Whether this session has ended, i.e. it no longer holds a server session.
#
# @return [ true, false ] true when @server_session is nil.
def ended?
  held_session = @server_session
  held_session.nil?
end
Is this session an explicit one (i.e. user-created).
@example Is the session explicit?
session.explicit?
@return [ true, false ] Whether this session is explicit.
@since 2.5.2
# File lib/mongo/session.rb, line 462
# Whether this session is explicit, i.e. not implicit.
#
# A truthy result is cached in @explicit; a false result is recomputed on each call.
#
# @return [ true, false ] Whether this session is explicit.
def explicit?
  return @explicit if @explicit

  @explicit = !implicit?
end
Is this session an implicit one (not user-created).
@example Is the session implicit?
session.implicit?
@return [ true, false ] Whether this session is implicit.
@since 2.5.1
# File lib/mongo/session.rb, line 450
# Whether this session is implicit: the options contain the :implicit key and its value
# is exactly true.
#
# A truthy result is cached in @implicit; a false result is recomputed on each call.
#
# @return [ true, false ] Whether this session is implicit.
def implicit?
  return @implicit if @implicit

  flagged = @options.key?(:implicit) && @options[:implicit] == true
  @implicit = flagged ? true : false
end
Whether or not the session is currently in a transaction.
@example Is the session in a transaction?
session.in_transaction?
@return [ true | false ] Whether or not the session is in a transaction.
@since 2.6.0
# File lib/mongo/session.rb, line 621
# Whether the session is in a transaction: its state is either "starting" or "in progress".
#
# @return [ true, false ] Whether the session is in a transaction.
def in_transaction?
  active_states = [STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE]
  within_states?(*active_states)
end
Get a formatted string for use in inspection.
@example Inspect the session object.
session.inspect
@return [ String ] The session inspection.
@since 2.5.0
# File lib/mongo/session.rb, line 144
# Build a string for inspection containing the object id, the session id and the options.
#
# @return [ String ] The session inspection.
def inspect
  description = "#<Mongo::Session:0x#{object_id} session_id=#{session_id} options=#{@options}>"
  description
end
Increment and return the next transaction number.
@example Get the next transaction number.
session.next_txn_num
@return [ Integer ] The next transaction number.
@since 2.5.0
# File lib/mongo/session.rb, line 426
# Ask the server session for its next transaction number.
#
# @return [ Integer, nil ] The result of @server_session.next_txn_num, or nil when the
#   session holds no server session.
def next_txn_num
  return unless @server_session

  @server_session.next_txn_num
end
Process a response from the server that used this session.
@example Process a response from the server.
session.process(result)
@param [ Operation::Result ] result The result from the operation.
@return [ Operation::Result ] The result.
@since 2.5.0
# File lib/mongo/session.rb, line 346
# Process a response from the server that used this session.
#
# For sessions that are not implicit, the operation time and cluster time are updated
# from the result. The server session's last-use marker is updated in every case.
#
# @param [ Operation::Result ] result The result from the operation.
# @return [ Operation::Result ] The same result.
def process(result)
  track_times = !implicit?

  if track_times
    set_operation_time(result)
    set_cluster_time(result)
  end

  @server_session.set_last_use!
  result
end
Will writes executed with this session be retried.
@example Will writes be retried.
session.retry_writes?
@return [ true, false ] If writes will be retried.
@note Retryable writes are only available on server versions at least 3.6 and with sharded clusters or replica sets.
@since 2.5.0
# File lib/mongo/session.rb, line 402
# Whether writes executed with this session will be retried: the cluster's :retry_writes
# option must be truthy and the cluster must be a replica set or sharded.
#
# @return [ true, false ] If writes will be retried.
def retry_writes?
  return false unless cluster.options[:retry_writes]

  cluster.replica_set? || cluster.sharded?
end
Get the session id.
@example Get the session id.
session.session_id
@return [ BSON::Document ] The session id.
@since 2.5.0
# File lib/mongo/session.rb, line 414
# Get the id of the server session held by this session.
#
# @return [ BSON::Document, nil ] The session id, or nil when the session holds no server
#   session.
def session_id
  return unless @server_session

  @server_session.session_id
end
Start a new transaction.
Note that the transaction will not be started on the server until an operation is performed after start_transaction is called.
@example Start a new transaction
session.start_transaction(options)
@param [ Hash ] options The options for the transaction being started.
@option options [ Hash ] :read_concern The read concern options hash, with the following
optional keys: - *:level* -- the read concern level as a symbol; valid values are *:local*, *:majority*, and *:snapshot* - *:after_cluster_time* -- the cluster time BSON::Document or hash specifying which cluster time reads should occur after
@option options [ Hash ] :write_concern The write concern options. Can be :w =>
Integer|String, :fsync => Boolean, :j => Boolean.
@option options [ Hash ] :read_preference The read preference options. The hash may have the following
items: - *:mode* -- read preference specified as a symbol; the only valid value is *:primary*.
@raise [ InvalidTransactionOperation ] If a transaction is already in progress or if the write concern is unacknowledged.
@since 2.6.0
# File lib/mongo/session.rb, line 493
# Start a new transaction on this session.
#
# The transaction options are taken from the argument, falling back to the session's
# :default_transaction_options and then to an empty hash. The transaction number is
# advanced before the write concern is validated.
#
# @param [ Hash ] options The options for the transaction being started.
# @raise [ InvalidTransactionOperation ] If a transaction is already in progress or if the
#   write concern is unacknowledged.
def start_transaction(options = nil)
  check_if_ended!

  already_active = within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE)
  if already_active
    raise Mongo::Error::InvalidTransactionOperation,
          Mongo::Error::InvalidTransactionOperation::TRANSACTION_ALREADY_IN_PROGRESS
  end

  next_txn_num

  @txn_options = options || @options[:default_transaction_options] || {}

  write_concern = txn_write_concern
  if write_concern && WriteConcern.send(:unacknowledged?, write_concern)
    raise Mongo::Error::InvalidTransactionOperation,
          Mongo::Error::InvalidTransactionOperation::UNACKNOWLEDGED_WRITE_CONCERN
  end

  @state = STARTING_TRANSACTION_STATE
end
Remove the read concern and/or write concern from the command if not applicable.
@example
session.suppress_read_write_concern!(cmd)
@return [ Hash, BSON::Document ] The command document.
@since 2.6.0
# File lib/mongo/session.rb, line 281
# Remove the read concern and/or write concern from a command sent inside a transaction.
#
# Outside a transaction the command is left untouched. Inside one, :readConcern is removed
# unless the transaction is just starting, and :writeConcern is removed unless the command
# is a commitTransaction or abortTransaction.
#
# @param [ Hash, BSON::Document ] command The command document (modified in place).
# @return [ Hash, BSON::Document ] The same command document.
def suppress_read_write_concern!(command)
  return command unless in_transaction?

  command.delete(:readConcern) unless starting_transaction?

  finishing_command = command[:commitTransaction] || command[:abortTransaction]
  command.delete(:writeConcern) unless finishing_command

  command
end
Get the current transaction number.
@example Get the current transaction number.
session.txn_num
@return [ Integer ] The current transaction number.
@since 2.6.0
# File lib/mongo/session.rb, line 438
# Get the current transaction number from the server session.
#
# @return [ Integer, nil ] The current transaction number, or the (falsy) value of
#   @server_session when the session holds no server session.
def txn_num
  held_session = @server_session
  return held_session unless held_session

  held_session.txn_num
end
Get the read preference the session will use in the currently active transaction.
This is a driver style hash with underscore keys.
@example Get the transaction's read preference
session.txn_read_preference
@return [ Hash ] The read preference of the transaction.
@since 2.6.0
# File lib/mongo/session.rb, line 636
# Get the read preference the session will use in the currently active transaction.
#
# The :read_preference entry of the transaction options is used when present; otherwise
# the client's read preference is used. The value is passed through
# Mongo::Lint.validate_underscore_read_preference before being returned.
#
# @return [ Hash ] The read preference of the transaction.
def txn_read_preference
  from_transaction = txn_options && txn_options[:read_preference]

  (from_transaction || @client.read_preference).tap do |read_pref|
    Mongo::Lint.validate_underscore_read_preference(read_pref)
  end
end
Update the state of the session due to a (non-commit and non-abort) operation being run.
@since 2.6.0
# File lib/mongo/session.rb, line 309
# Update the session state after an operation other than a commit or abort has been run.
#
# "starting" becomes "in progress"; "committed" and "aborted" become "no transaction".
# Any other state is left unchanged.
def update_state!
  following_state =
    case @state
    when STARTING_TRANSACTION_STATE
      TRANSACTION_IN_PROGRESS_STATE
    when TRANSACTION_COMMITTED_STATE, TRANSACTION_ABORTED_STATE
      NO_TRANSACTION_STATE
    end

  @state = following_state if following_state
end
Validate the session.
@example
session.validate!(cluster)
@param [ Cluster ] cluster The cluster the session is attempted to be used with.
@return [ Session ] self if the session is valid.
@raise [ Mongo::Error::InvalidSession ] Raise error if the session is not valid.
@since 2.5.0
# File lib/mongo/session.rb, line 330
# Validate the session: it must belong to the given cluster and must not have ended.
#
# @param [ Cluster ] cluster The cluster the session is attempted to be used with.
# @return [ Session ] self, when both checks pass.
def validate!(cluster)
  tap do
    check_matching_cluster!(cluster)
    check_if_ended!
  end
end
Ensure that the read preference of a command is primary.
@example
session.validate_read_preference!(command)
@raise [ Mongo::Error::InvalidTransactionOperation ] If the read preference of the command is not primary.
@since 2.6.0
# File lib/mongo/session.rb, line 299
# Raise if a command run inside a transaction carries a non-primary read preference.
#
# @param [ Hash, BSON::Document ] command The command document.
# @raise [ Mongo::Error::InvalidTransactionOperation ] If the session is in a transaction
#   and the command's read preference mode is not primary.
def validate_read_preference!(command)
  if in_transaction? && non_primary_read_preference_mode?(command)
    raise Mongo::Error::InvalidTransactionOperation,
          Mongo::Error::InvalidTransactionOperation::INVALID_READ_PREFERENCE
  end
end
Private Instance Methods
# File lib/mongo/session.rb, line 688
# Whether causal consistency is enabled for this session.
#
# Enabled when the :causal_consistency option is absent, or present and exactly true.
# A truthy result is cached in @causal_consistency; a false result is recomputed.
def causal_consistency?
  return @causal_consistency if @causal_consistency

  @causal_consistency =
    !@options.key?(:causal_consistency) || @options[:causal_consistency] == true
end
# File lib/mongo/session.rb, line 680
# Build the read concern document to send, adding :afterClusterTime when the session has
# an operation time and causal consistency is enabled.
#
# Returns the given read concern unchanged otherwise. The input is not mutated: merge
# produces a new document.
def causal_consistency_doc(read_concern)
  return read_concern unless operation_time && causal_consistency?

  base_doc = read_concern || {}
  base_doc.merge(:afterClusterTime => operation_time)
end
# File lib/mongo/session.rb, line 712
# Raise Mongo::Error::InvalidSession (with SESSION_ENDED_ERROR_MSG) if the session has ended.
def check_if_ended!
  return unless ended?

  raise Mongo::Error::InvalidSession, SESSION_ENDED_ERROR_MSG
end
# File lib/mongo/session.rb, line 657
# Raise InvalidTransactionOperation (NO_TRANSACTION_STARTED) if the session is in the
# "no transaction" state.
def check_if_no_transaction!
  if within_states?(NO_TRANSACTION_STATE)
    raise Mongo::Error::InvalidTransactionOperation,
          Mongo::Error::InvalidTransactionOperation::NO_TRANSACTION_STARTED
  end
end
# File lib/mongo/session.rb, line 716
# Raise Mongo::Error::InvalidSession (with MISMATCHED_CLUSTER_ERROR_MSG) if the given
# cluster differs from the cluster of this session's client.
def check_matching_cluster!(cluster)
  return unless @client.cluster != cluster

  raise Mongo::Error::InvalidSession, MISMATCHED_CLUSTER_ERROR_MSG
end
# File lib/mongo/session.rb, line 673
# Whether the command carries a read preference whose mode is something other than primary.
#
# The mode is looked up under both the 'mode' and :mode keys of command['$readPreference']
# and compared as a string, so a Symbol mode (:primary) is treated the same as 'primary'.
#
# @param [ Hash, BSON::Document ] command The command document.
# @return [ true, false, nil ] false when the command has no '$readPreference', nil when
#   the read preference has no mode, otherwise whether the mode differs from 'primary'.
def non_primary_read_preference_mode?(command)
  read_preference = command['$readPreference']
  return false unless read_preference

  mode = read_preference['mode'] || read_preference[:mode]
  mode && mode.to_s != 'primary'
end
# File lib/mongo/session.rb, line 702
# Update the cached cluster time from a result.
#
# Nothing happens when the result has no cluster time. Otherwise the result's document is
# stored when nothing is cached yet, or when its Cluster::CLUSTER_TIME value is greater
# than the cached one.
def set_cluster_time(result)
  incoming_doc = result.cluster_time
  return unless incoming_doc

  newer = @cluster_time.nil? ||
          incoming_doc[Cluster::CLUSTER_TIME] > @cluster_time[Cluster::CLUSTER_TIME]
  @cluster_time = incoming_doc if newer
end
# File lib/mongo/session.rb, line 696
# Store the operation time from a result, when the result is present and has one.
#
# The stored value is overwritten unconditionally; it is not compared with the previous one.
def set_operation_time(result)
  return unless result && result.operation_time

  @operation_time = result.operation_time
end
# File lib/mongo/session.rb, line 653
# Whether the session is in the "starting transaction" state.
def starting_transaction?
  expected_state = STARTING_TRANSACTION_STATE
  within_states?(expected_state)
end
# File lib/mongo/session.rb, line 664
# The read concern for the current transaction: the :read_concern entry of the transaction
# options when present, otherwise the client's read concern.
def txn_read_concern
  from_transaction = txn_options && txn_options[:read_concern]
  from_transaction || @client.read_concern
end
# File lib/mongo/session.rb, line 668
# The write concern options for the current transaction: the :write_concern entry of the
# transaction options when present, otherwise the options of the client's write concern
# (when the client has one).
def txn_write_concern
  from_transaction = txn_options && txn_options[:write_concern]
  return from_transaction if from_transaction

  client_write_concern = @client.write_concern
  client_write_concern && client_write_concern.options
end
# File lib/mongo/session.rb, line 649
# Whether the session's current state equals any of the given states.
#
# @param [ Array ] states The candidate states.
# @return [ true, false ] true when @state is among them.
def within_states?(*states)
  states.any? { |candidate| candidate == @state }
end