diff --git a/lib/App/SD/ForeignReplica.pm b/lib/App/SD/ForeignReplica.pm index 1020ad6..2d8fba1 100644 --- a/lib/App/SD/ForeignReplica.pm +++ b/lib/App/SD/ForeignReplica.pm @@ -61,7 +61,8 @@ sub integrate_changeset { ); my $changeset = $args{'changeset'}; - return if $self->last_changeset_from_source( $changeset->original_source_uuid) >= $changeset->original_sequence_no; + return if $self->last_changeset_from_source( + $changeset->original_source_uuid) >= $changeset->original_sequence_no; $self->SUPER::integrate_changeset(%args); } @@ -83,7 +84,7 @@ sub integrate_change { my ( $change, $changeset ) = validate_pos( @_, { isa => 'Prophet::Change' }, - { isa => 'Prophet::ChangeSet' } + { isa => 'Prophet::ChangeSet' }, ); # don't push internal records @@ -97,7 +98,10 @@ sub integrate_change { =head2 record_pushed_transactions -Walk the set of transactions on the ticket whose id you've passed in, looking for updates by the 'current user' which happened after start_time and before now. Then mark those transactions as ones that originated in SD, so we don't accidentally push them later. +Walk the set of transactions on the ticket whose id you've passed in, looking +for updates by the 'current user' which happened after start_time and before +now. Then mark those transactions as ones that originated in SD, so we don't +accidentally push them later. =over @@ -114,38 +118,38 @@ Walk the set of transactions on the ticket whose id you've passed in, looking fo sub record_pushed_transactions { my $self = shift; my %args = validate( @_, - { ticket => 1, changeset => { isa => 'Prophet::ChangeSet' }, start_time => 1} ); - + { ticket => 1, + changeset => { isa => 'Prophet::ChangeSet' }, start_time => 1} ); my $earliest_valid_txn_date; # walk through every transaction on the ticket, starting with the latest - + for my $txn ( $self->get_txn_list_by_date($args{ticket}) ) { - # walk backwards through all transactions on the ticket we just updated - # Skip any transaction where the remote user isn't me, this might include any transaction - # RT created with a scrip on your behalf - + # Skip any transaction where the remote user isn't me, this might + # include any transaction RT created with a scrip on your behalf + next unless $txn->{creator} eq $self->foreign_username; - # get the completion time _after_ we do our next round trip to rt to try to make sure - # a bit of lag doesn't skew us to the wrong side of a 1s boundary - - + # get the completion time _after_ we do our next round trip to rt to + # try to make sure a bit of lag doesn't skew us to the wrong side of a + # 1s boundary if (!$earliest_valid_txn_date){ my $change_window = time() - $args{start_time}; - # skip any transaction created more than 5 seconds before the push started. - # I can't think of any reason that number shouldn't be 1, but clocks are fickle - $earliest_valid_txn_date = $txn->{created} - ($change_window + 5); - } + # skip any transaction created more than 5 seconds before the push + # started. I can't think of any reason that number shouldn't be 1, + # but clocks are fickle + $earliest_valid_txn_date = $txn->{created} - ($change_window + 5); + } last if $txn->{created} < $earliest_valid_txn_date; # if the transaction id is older than the id of the last changeset # we got from the original source of this changeset, we're done - last if $txn->{id} <= $self->app_handle->handle->last_changeset_from_source($args{changeset}->original_source_uuid); - + last if $txn->{id} <= $self->app_handle->handle->last_changeset_from_source( + $args{changeset}->original_source_uuid); + # if the transaction from RT is more recent than the most recent # transaction we got from the original source of the changeset # then we should record that we sent that transaction upstream @@ -157,11 +161,11 @@ sub record_pushed_transactions { ); } } - + =head2 record_pushed_transaction $foreign_transaction_id, $changeset -Record that this replica was the original source of $foreign_transaction_id +Record that this replica was the original source of $foreign_transaction_id (with changeset $changeset) =cut @@ -169,10 +173,13 @@ Record that this replica was the original source of $foreign_transaction_id sub record_pushed_transaction { my $self = shift; my %args = validate( @_, - { transaction => 1, changeset => { isa => 'Prophet::ChangeSet' }, record => 1 } ); + { transaction => 1, changeset => { isa => 'Prophet::ChangeSet' }, + record => 1 } ); - my $key = join('-', "foreign-txn-from" , $self->uuid , 'record' , $args{record} , 'txn' , $args{transaction} ); - my $value = join(':', $args{changeset}->original_source_uuid, $args{changeset}->original_sequence_no ); + my $key = join('-', "foreign-txn-from" , $self->uuid , + 'record' , $args{record} , 'txn' , $args{transaction} ); + my $value = join(':', $args{changeset}->original_source_uuid, + $args{changeset}->original_sequence_no ); $self->store_local_metadata($key => $value); @@ -193,7 +200,7 @@ once we've done a pull from the remote replica, we can safely expire all records of this type for the remote replica (they'll be obsolete) -We use this cache to avoid integrating changesets we've pushed to the +We use this cache to avoid integrating changesets we've pushed to the remote replica when doing a subsequent pull =cut @@ -225,52 +232,48 @@ sub traverse_changesets { ); next unless $continue; - } - - $args{callback}->( changeset => $changeset, after_integrate_changeset => sub { $self->record_last_changeset_from_replica( $changeset->original_source_uuid => $changeset->original_sequence_no ); - # We're treating each individual ticket in the foreign system as its own 'replica' - # because of that, we need to hint to the push side of the system what the most recent - # txn on each ticket it has. + # We're treating each individual ticket in the foreign system + # as its own 'replica' because of that, we need to hint to the + # push side of the system what the most recent txn on each + # ticket it has. my $previously_modified - = App::SD::Util::string_to_datetime( $self->upstream_last_modified_date || ''); - my $created_datetime = App::SD::Util::string_to_datetime( $changeset->created ); + = App::SD::Util::string_to_datetime( + $self->upstream_last_modified_date || ''); + my $created_datetime = App::SD::Util::string_to_datetime( + $changeset->created ); $self->record_upstream_last_modified_date( $changeset->created ) if ( ( $created_datetime ? $created_datetime->epoch : 0 ) > ( $previously_modified ? $previously_modified->epoch : 0 ) ); - } ); $args{reporting_callback}->($changeset) if ($args{reporting_callback}); - } - } sub _construct_changeset_index_entry { my $self = shift; my $changeset = shift; - return [ $changeset->sequence_no, $changeset->original_source_uuid, $changeset->original_sequence_no, $changeset->calculate_sha1]; - + return [ $changeset->sequence_no, $changeset->original_source_uuid, + $changeset->original_sequence_no, $changeset->calculate_sha1 ]; } sub remote_uri_path_for_id { die "your subclass needs to implement this to be able to ". "map a remote id to /ticket/id or soemthing"; - } =head2 uuid_for_remote_id $id -lookup the uuid for the remote record id. If we don't find it, +lookup the uuid for the remote record id. If we don't find it, construct it out of the remote url and the remote uri path for the record id; =cut @@ -286,7 +289,8 @@ sub _lookup_uuid_for_remote_id { my $self = shift; my ($id) = validate_pos( @_, 1 ); - return $self->fetch_local_metadata('local_uuid_for_'. $self->_url_based_uuid_for_remote_ticket_id( $id)); + return $self->fetch_local_metadata( + 'local_uuid_for_'. $self->_url_based_uuid_for_remote_ticket_id($id)); } sub _set_uuid_for_remote_id { @@ -306,14 +310,12 @@ sub _url_based_uuid_for_remote_ticket_id { $self->remote_url . $self->remote_uri_path_for_id( $id ) ); - } - -# This mapping table stores uuids for tickets we've synced from a remote database -# Basically, if we created the ticket to begin with, then we'll know its uuid -# if we pulled the ticket from the foreign replica then its uuid will be generated -# based on a UUID-from-ticket-url scheme +# This mapping table stores uuids for tickets we've synced from a remote +# database Basically, if we created the ticket to begin with, then we'll know +# its uuid if we pulled the ticket from the foreign replica then its uuid will +# be generated based on a UUID-from-ticket-url scheme sub remote_id_for_uuid { my ( $self, $uuid_or_luid ) = @_; @@ -351,7 +353,6 @@ sub _set_remote_id_for_uuid { ); $ticket->load( uuid => $args{'uuid'} ); $ticket->set_props( props => { $self->uuid.'-id' => $args{'remote_id'} } ); - } =head2 record_remote_id_for_pushed_record diff --git a/lib/App/SD/Replica/trac.pm b/lib/App/SD/Replica/trac.pm index 9a2b9af..14063b0 100644 --- a/lib/App/SD/Replica/trac.pm +++ b/lib/App/SD/Replica/trac.pm @@ -10,12 +10,10 @@ use constant scheme => 'trac'; use constant pull_encoder => 'App::SD::Replica::trac::PullEncoder'; use constant push_encoder => 'App::SD::Replica::trac::PushEncoder'; - -use Prophet::ChangeSet; - has trac => ( isa => 'Net::Trac::Connection', is => 'rw'); has remote_url => ( isa => 'Str', is => 'rw'); has query => ( isa => 'Maybe[Str]', is => 'rw'); + sub foreign_username { return shift->trac->user(@_) } sub BUILD { @@ -57,16 +55,18 @@ sub BUILD { $self->trac->ensure_logged_in; } - - sub get_txn_list_by_date { my $self = shift; my $ticket = shift; my $ticket_obj = Net::Trac::Ticket->new( connection => $self->trac); $ticket_obj->load($ticket); - - my @txns = map { { id => $_->date->epoch, creator => $_->author, created => $_->date->epoch } } sort {$b->date <=> $a->date } @{$ticket_obj->history->entries}; + + my @txns = map { + { id => $_->date->epoch, creator => $_->author, + created => $_->date->epoch } + } sort {$b->date <=> $a->date } @{$ticket_obj->history->entries}; + return @txns; } @@ -119,7 +119,6 @@ sub database_settings { }; } - __PACKAGE__->meta->make_immutable; no Any::Moose; diff --git a/lib/App/SD/Replica/trac/PullEncoder.pm b/lib/App/SD/Replica/trac/PullEncoder.pm index 5d2aed8..c0fcb70 100644 --- a/lib/App/SD/Replica/trac/PullEncoder.pm +++ b/lib/App/SD/Replica/trac/PullEncoder.pm @@ -5,6 +5,9 @@ extends 'App::SD::ForeignReplica::PullEncoder'; use Params::Validate qw(:all); use Memoize; +use Prophet::ChangeSet; +use Prophet::Change; + has sync_source => ( isa => 'App::SD::Replica::trac', is => 'rw', @@ -26,7 +29,7 @@ sub ticket_last_modified { sub translate_ticket_state { my $self = shift; my $ticket_object = shift; - my $transactions = shift; + my $transactions = shift; my $content = $ticket_object->description; my $ticket_data = { @@ -47,9 +50,6 @@ sub translate_ticket_state { cc => ( $ticket_object->cc || undef ), }; - - - # delete undefined and empty fields delete $ticket_data->{$_} for grep !defined $ticket_data->{$_} || $ticket_data->{$_} eq '', keys %$ticket_data; @@ -66,27 +66,29 @@ Returns a array of all tickets found matching your QUERY hash. sub find_matching_tickets { my $self = shift; my %query = (@_); - my $last_changeset_seen_dt = $self->_only_pull_tickets_modified_after(); + my $last_changeset_seen_dt = $self->_only_pull_tickets_modified_after(); $self->sync_source->log("Searching for tickets"); - my $search = Net::Trac::TicketSearch->new( connection => $self->sync_source->trac, limit => 9999 ); + my $search = Net::Trac::TicketSearch->new( + connection => $self->sync_source->trac, limit => 9999 ); $search->query(%query); my @results = @{$search->results}; $self->sync_source->log("Trimming things after our last pull"); if ($last_changeset_seen_dt) { # >= is wasteful but may catch race conditions - @results = grep {$_->last_modified >= $last_changeset_seen_dt} @results; + @results = grep {$_->last_modified >= $last_changeset_seen_dt} @results; } return \@results; } =head2 find_matching_transactions { ticket => $id, starting_transaction => $num } -Returns a reference to an array of all transactions (as hashes) on ticket $id after transaction $num. +Returns a reference to an array of all transactions (as hashes) on ticket $id +after transaction $num. =cut -sub find_matching_transactions { +sub find_matching_transactions { my $self = shift; my %args = validate( @_, { ticket => 1, starting_transaction => 1 } ); my @raw_txns = @{$args{ticket}->history->entries}; @@ -139,10 +141,10 @@ sub transcode_create_txn { my $create_data = shift; my $final_data = shift; my $ticket = $txn->ticket; - # this sequence_no only works because trac tickets only allow one update - # per ticket per second. - # we decrement by 1 on the off chance that someone created and - # updated the ticket in the first second + # this sequence_no only works because trac tickets only allow one update + # per ticket per second. + # we decrement by 1 on the off chance that someone created and + # updated the ticket in the first second my $changeset = Prophet::ChangeSet->new( { original_source_uuid => $self->sync_source->uuid_for_remote_id( $ticket->id ), original_sequence_no => ( $ticket->created->epoch-1), diff --git a/lib/App/SD/Replica/trac/PushEncoder.pm b/lib/App/SD/Replica/trac/PushEncoder.pm index b8d2a9b..9bb5187 100644 --- a/lib/App/SD/Replica/trac/PushEncoder.pm +++ b/lib/App/SD/Replica/trac/PushEncoder.pm @@ -1,16 +1,16 @@ package App::SD::Replica::trac::PushEncoder; -use Any::Moose; +use Any::Moose; use Params::Validate; use Time::HiRes qw/usleep/; -has sync_source => + +has sync_source => ( isa => 'App::SD::Replica::trac', is => 'rw'); extends 'App::SD::ForeignReplica::PushEncoder'; - sub after_integrate_change { - usleep(1100); # trac only accepts one ticket update per second. Yes. + usleep(1100); # trac only accepts one ticket update per second. Yes. } sub integrate_ticket_update { @@ -25,8 +25,10 @@ sub integrate_ticket_update { my $remote_ticket_id = $self->sync_source->remote_id_for_uuid( $change->record_uuid ); my $ticket = Net::Trac::Ticket->new( connection => $self->sync_source->trac); - $ticket->load($remote_ticket_id); - $ticket->update( %{ $self->_recode_props_for_integrate($change) } ); + $ticket->load($remote_ticket_id) or + die "couldn't load remote track ticket $remote_ticket_id\n"; + $ticket->update( %{ $self->_recode_props_for_integrate($change) } ) or + die "couldn't update remote track ticket $remote_ticket_id\n"; return $remote_ticket_id; } @@ -48,7 +50,8 @@ sub integrate_ticket_create { sub integrate_comment { my $self = shift; - my ($change, $changeset) = validate_pos( @_, { isa => 'Prophet::Change' }, {isa => 'Prophet::ChangeSet'} ); + my ($change, $changeset) = validate_pos( @_, + { isa => 'Prophet::Change' }, {isa => 'Prophet::ChangeSet'} ); # Figure out the remote site's ticket ID for this change's record @@ -59,11 +62,13 @@ sub integrate_comment { $ticket->load($ticket_id); $ticket->comment( $props{content}); return $ticket_id; -} +} sub integrate_attachment { - my ($self, $change, $changeset ) = validate_pos( @_, { isa => 'App::SD::Replica::trac::PushEncoder'}, { isa => 'Prophet::Change' }, { isa => 'Prophet::ChangeSet' }); - + my ($self, $change, $changeset ) = validate_pos( @_, + { isa => 'App::SD::Replica::trac::PushEncoder'}, + { isa => 'Prophet::Change' }, + { isa => 'Prophet::ChangeSet' }); my %props = map { $_->name => $_->new_value } $change->prop_changes;