From d586881f81e74f7c22a2d11094f38634933a558a Mon Sep 17 00:00:00 2001 From: Guilhem Moulin Date: Mon, 14 Sep 2015 01:20:53 +0200 Subject: Pass literals by reference to save memory. --- interimap | 4 +- lib/Net/IMAP/InterIMAP.pm | 313 +++++++++++++++++++++++++++++----------------- 2 files changed, 198 insertions(+), 119 deletions(-) diff --git a/interimap b/interimap index 1535739..e7cd6d5 100755 --- a/interimap +++ b/interimap @@ -971,8 +971,8 @@ sub sync_known_messages($$) { sub callback_new_message($$$$;$$$) { my ($idx, $mailbox, $name, $mail, $UIDs, $buff, $bufflen) = @_; - my $length = defined $mail->{RFC822} ? length($mail->{RFC822}) - : defined $mail->{BINARY} ? length($mail->{BINARY}) + my $length = defined $mail->{RFC822} ? length(${$mail->{RFC822}}) + : defined $mail->{BINARY} ? length(${$mail->{BINARY}}) : return; # not for us if ($length == 0) { msg("$name($mailbox)", "WARNING: Ignoring new 0-length message (UID $mail->{UID})"); diff --git a/lib/Net/IMAP/InterIMAP.pm b/lib/Net/IMAP/InterIMAP.pm index c26d102..5cd0061 100644 --- a/lib/Net/IMAP/InterIMAP.pm +++ b/lib/Net/IMAP/InterIMAP.pm @@ -65,6 +65,7 @@ my %OPTIONS = ( # Use the same buffer size as Net::SSLeay::read(), to ensure there is # never any pending data left in the current TLS record my $BUFSIZE = 32768; +my $CRLF = "\x0D\x0A"; ############################################################################# # Utilities @@ -187,7 +188,9 @@ sub quote($) { return "\"$str\""; } else { - return "{".length($str)."}\r\n".$str; + # we'll later replace the non-synchronizing literal with a + # synchronizing one if need be + return "{".length($str)."+}$CRLF".$str; } } @@ -236,7 +239,8 @@ sub new($%) { # in/out buffer counts and output stream $self->{_INCOUNT} = $self->{_INRAWCOUNT} = 0; $self->{_OUTCOUNT} = $self->{_OUTRAWCOUNT} = 0; - $self->{_OUTBUF} = undef; + $self->{_OUTBUF} = $self->{_INBUF} = undef; + $self->{_LITPLUS} = ''; if ($self->{type} eq 'tunnel') { my $command = $self->{command} // $self->fail("Missing tunnel command"); @@ -286,6 +290,7 @@ sub new($%) { my $socket = IO::Socket::INET->new(%args) or $self->fail("Cannot bind: $@"); $socket->setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) or $self->fail("Can't setsockopt SO_KEEPALIVE: $!"); + $self->_start_ssl($socket) if $self->{type} eq 'imaps'; $self->{$_} = $socket for qw/STDOUT STDIN/; } @@ -776,21 +781,8 @@ sub append($$@) { return unless @_; $self->fail("Server did not advertise UIDPLUS (RFC 4315) capability.") unless $self->_capable('UIDPLUS'); - - my @appends; - foreach my $mail (@_) { - my $append = ''; - $append .= '('.join(' ', grep {lc $_ ne '\recent'} @{$mail->{FLAGS}}).') ' - if defined $mail->{FLAGS}; - $append .= '"'.$mail->{INTERNALDATE}.'" ' if defined $mail->{INTERNALDATE}; - my ($body, $t) = defined $mail->{RFC822} ? ($mail->{RFC822}, '') - : defined $mail->{BINARY} ? ($mail->{BINARY}, '~') - : $self->panic("Missing message body in APPEND"); - $append .= "$t\{".length($body)."\}\r\n".$body; - push @appends, $append; - } $self->fail("Server did not advertise MULTIAPPEND (RFC 3502) capability.") - unless $#appends == 0 or $self->_capable('MULTIAPPEND'); + unless $#_ == 0 or $self->_capable('MULTIAPPEND'); # dump the cache before issuing the command if we're appending to the current mailbox my ($UIDNEXT, $EXISTS, $cache, %vanished); @@ -801,7 +793,21 @@ sub append($$@) { %vanished = map {$_ => 1} @{$self->{_VANISHED}}; } - $self->_send('APPEND '.quote($mailbox).' '.join(' ',@appends)); + my $tag = $self->_cmd_init('APPEND '.quote($mailbox)); + foreach my $mail (@_) { + my $str = ' '; + $str .= '('.join(' ', grep {lc $_ ne '\recent'} @{$mail->{FLAGS}}).') ' if defined $mail->{FLAGS}; + $str .= '"'.$mail->{INTERNALDATE}.'" ' if defined $mail->{INTERNALDATE}; + my ($body, $t) = defined $mail->{RFC822} ? ($mail->{RFC822}, 0) + : defined $mail->{BINARY} ? ($mail->{BINARY}, 1) + : $self->panic("Missing message body in APPEND"); + $self->_cmd_extend(\$str); + $self->_cmd_extend_lit($body, $t); + } + + $self->_cmd_flush(); + $self->_recv($tag); + $IMAP_text =~ /\A\Q$IMAP_cond\E \[APPENDUID ([0-9]+) ([0-9:,]+)\] / or $self->panic($IMAP_text); my ($uidvalidity, $uidset) = ($1, $2); $self->_update_cache_for($mailbox, UIDVALIDITY => $uidvalidity); @@ -819,9 +825,8 @@ sub append($$@) { $self->panic($_); } } - $self->fail("$uidset contains ".scalar(@uids)." elements while " - .scalar(@appends)." messages were appended.") - unless $#uids == $#appends; + $self->fail("$uidset contains ".scalar(@uids)." elements while ".($#_+1)." messages were appended.") + unless $#uids == $#_; # if $mailbox is the current mailbox we need to update the cache if (defined $self->{_SELECTED} and $mailbox eq $self->{_SELECTED}) { @@ -829,12 +834,16 @@ sub append($$@) { my %vanished2 = map {$_ => 1} @{$self->{_VANISHED}}; delete $vanished2{$_} foreach keys %vanished; my $VANISHED = scalar(keys %vanished2); # number of messages VANISHED meanwhile - $cache->{EXISTS} += $#appends+1 if defined $cache->{EXISTS} and $cache->{EXISTS} + $VANISHED == $EXISTS; + $cache->{EXISTS} += $#_+1 if defined $cache->{EXISTS} and $cache->{EXISTS} + $VANISHED == $EXISTS; $cache->{UIDNEXT} = $UIDNEXT if ($cache->{UIDNEXT} // 1) < $UIDNEXT; } - $self->log("Added ".($#appends+1)." message(s) to $mailbox, got new UID ".compact_set(@uids)) - unless $self->{quiet}; + unless ($self->{quiet}) { + my $msg = "Added ".($#_+1)." message(s)"; + $msg .= " to $mailbox" unless defined $self->{_SELECTED} and $mailbox eq $self->{_SELECTED}; + $msg .= ", got new UID ".compact_set(@uids); + $self->log($msg); + } return @uids; } @@ -1190,8 +1199,9 @@ sub push_flag_updates($$@) { # $self->_ssl_error($error, [...]) # Log an SSL $error and exit with return value 1. -sub _ssl_error($$) { +sub _ssl_error($$@) { my $self = shift; + $self->fail(@_) unless defined $self->{_SSL}; $self->log('SSL ERROR: ', @_); if ($self->{debug}) { while (my $err = Net::SSLeay::ERR_get_error()) { @@ -1289,8 +1299,8 @@ sub _getline($;$) { $n = $stdout->sysread($buf, $BUFSIZE, 0); } - $self->panic("Can't read: $!") unless defined $n; - $self->fail("0 bytes read (got EOF)") unless $n > 0; # EOF + $self->_ssl_error("Can't read: $!") unless defined $n; + $self->_ssl_error("0 bytes read (got EOF)") unless $n > 0; # EOF $self->{_OUTRAWCOUNT} += $n; if (defined (my $i = $self->{_Z_INFLATE})) { @@ -1309,10 +1319,10 @@ sub _getline($;$) { $self->{_OUTBUF} = substr($self->{_OUTBUF}, $idx); $self->{_OUTCOUNT} += length($lit) + length($line); - $line =~ s/\r\n\z// or $self->panic($line); + $line =~ s/$CRLF\z// or $self->panic($line); $self->logger('S: '.(@lit ? '[...]' : ''), $line) if $self->{debug}; - return (wantarray ? ($lit, $line) : $line); + return (wantarray ? (\$lit, $line) : $line); } else { push @line, $self->{_OUTBUF}; @@ -1365,96 +1375,150 @@ sub _update_cache_for($$%) { } -# $self->_write(@data) -# Send the given @data to the IMAP server. -# Update the interal raw byte count, but the regular byte count must -# have been updated earlier (eg, by _send_cmd). -sub _write($$) { - my ($self, $data) = @_; - my ($stdin, $ssl) = @$self{qw/STDIN _SSL/}; - - my ($offset, $length) = (0, length($$data)); - while ($length > 0) { - my $written = defined $ssl ? - Net::SSLeay::write_partial($ssl, $offset, $length, $$data) : - $stdin->syswrite($$data, $length, $offset); - $offset += $written; - $length -= $written; - $self->{_INRAWCOUNT} += $written; - } +# $self->_cmd_init($command) +# Generate a new tag for the given $command, push both the +# concatenation to the command buffer. $command can be a scalar or a +# scalar reference. +# Use the _cmd_extend and/or _cmd_extend_lit methods to extend the +# command, and _cmd_flush to send it to the server. +sub _cmd_init($$) { + my $self = shift; + my $tag = sprintf '%06d', $self->{_TAG}++; + my $command = (defined $self->{_INBUF} ? $CRLF : '').$tag.' '.(ref $_[0] ? ${$_[0]} : $_[0]); + $self->_cmd_extend(\$command); + return $tag; } -# $self->_z_flush([$type]) -# Flush the deflation stream, and write the compressed data. -# This method is a noop if no compression layer is active. -sub _z_flush($$;$) { - my ($self, $buf, $t) = @_; - my $d = $self->{_Z_DEFLATE}; - $d->flush($buf, $t) == Z_OK or - $self->panic("Can't flush deflation stream: ", $d->msg()); +# $self->_cmd_extend($args) +# Append $args to the command buffer. $args can be a scalar or a +# scalar reference. If $args contains some literal(s) and the server +# doesn't support LITERAL+, flush the command and wait for an answer +# before each literal +sub _cmd_extend($$) { + my $self = shift; + my $args = ref $_[0] ? $_[0] : \$_[0]; + + if ($self->{_LITPLUS} ne '') { + # server supports LITERAL+: use $args as is + $self->_cmd_extend_($args); + } + else { + # server supports LITERAL+: flush the command before each + # literal + my ($offset, $litlen) = (0, 0); + while ( (my $idx = index($$args, "\n", $offset+$litlen)) >= 0 ) { + my $line = substr($$args, $offset, $idx+1-$offset); + $line =~ s/\{([0-9]+)\+\}$CRLF\z/{$1}$CRLF/ or $self->panic(); + $litlen = $1; + $self->_cmd_flush(\$line); + + my $x = $self->_getline(); + $x =~ /\A\+ / or $self->panic($x); + $offset = $idx+1; + } + my $line = substr($$args, $offset); + $self->_cmd_extend_(\$line); + } } -# $self->_send_cmd($tag, $command) -# Send the given $command to the IMAP server. -# If $command contains literals and the server supportes LITERAL+, -# non-synchronizing literals are sent instead. -# If a compression layer is active, $command is compressed before -# being send. -sub _send_cmd($) { - my ($self, $tag, $command) = @_; - my $litplus = $self->_capable('LITERAL+') ? 1 : 0; +# $self->_cmd_extend_lit($lit, [$lit8]) +# Append the literal $lit to the command buffer. $lit must be a +# scalar reference. If $lit8 is true, a literal8 is sent instead [RFC +# 3516]. +sub _cmd_extend_lit($$;$) { + my ($self, $lit, $lit8) = @_; + my $len = length($$lit); my $d = $self->{_Z_DEFLATE}; - my ($offset, $litlen) = (0, 0); - my $z_flush = 0; # whether to flush the dictionary after processing the next literal + # create a full flush point for long binary literals + my $z_flush = ($len > 4096 and !($self->{'use-binary'} // 1 and !$lit8)) ? 1 : 0; + $lit8 = $lit8 ? '~' : ''; # literal8, RFC 3516 BINARY - my $buf; - while(1) { - my $lit = substr($command, $offset, $litlen) if $litlen > 0; - $offset += $litlen; + my $strlen = $lit8.'{'.$len.$self->{_LITPLUS}.'}'.$CRLF; - my ($line, $z_flush2); - my $idx = index($command, "\n", $offset); - if ($idx < 0) { - $line = substr($command, $offset); + if ($self->{_LITPLUS} ne '') { + $self->_cmd_extend_(\$strlen); + if ($z_flush and defined $d) { + $d->flush(\$self->{_INBUF}, Z_FULL_FLUSH) == Z_OK + or $self->panic("Can't flush deflation stream: ", $d->msg()); } - else { - $line = substr($command, $offset, $idx-1-$offset); - $litlen = $litplus ? ($line =~ s/\{([0-9]+)\}\z/{$1+}/ ? $1 : $self->panic()) - : ($line =~ /\{([0-9]+)\}\z/ ? $1 : $self->panic()); - $z_flush2 = ($litlen > 4096 and # large literal - ($self->{'use-binary'} // 1 or $line =~ /~\{[0-9]+\}\z/) # literal8, RFC 3516 BINARY - ) ? 1 : 0; - } - $self->logger('C: ', ($offset == 0 ? "$tag " : '[...]'), $line) if $self->{debug}; + } + else { + # server doesn't supports LITERAL+ + $self->_cmd_flush(\$strlen, ($z_flush ? Z_FULL_FLUSH : ())); + my $x = $self->_getline(); + $x =~ /\A\+ / or $self->panic($x); + } - my @data = (($offset == 0 ? "$tag " : $lit), $line, "\r\n"); - $self->{_INCOUNT} += length($_) foreach @data; - if (!defined $d) { - $buf .= join '', @data; - } - else { - for (my $i = 0; $i <= $#data; $i++) { - $self->_z_flush(\$buf, Z_FULL_FLUSH) if $i == 0 and $z_flush; - $d->deflate($data[$i], \$buf) == Z_OK or $self->panic("Deflation failed: ", $d->msg()); - $self->_z_flush(\$buf, Z_FULL_FLUSH) if $i == 0 and $z_flush; - } - } + $self->_cmd_extend_($lit); + if ($z_flush and defined $d) { + $d->flush(\$self->{_INBUF}, Z_FULL_FLUSH) == Z_OK + or $self->panic("Can't flush deflation stream: ", $d->msg()); + } +} - if (!$litplus or $idx < 0) { - $self->_z_flush(\$buf, Z_SYNC_FLUSH) if defined $d; - $self->_write(\$buf); - undef $buf; - last if $idx < 0; - my $x = $self->_getline(); - $x =~ /\A\+ / or $self->panic($x); +# $self->_cmd_flush([$crlf], [$z_flush]) +# Append $crlf (default: $CRLF) to the command buffer, flush the +# deflation stream by creating a flush point of type $z_flush +# (default: Z_SYNC_FLUSH) if there is a compression layer, and finally +# send the command to the server. +sub _cmd_flush($;$$) { + my $self = shift; + $self->_cmd_extend_( $_[0] // \$CRLF ); + my $z_flush = $_[1] // Z_SYNC_FLUSH; # the flush point type to use + my ($stdin, $ssl) = @$self{qw/STDIN _SSL/}; + + if ($self->{debug}) { + # remove $CRLF and literals + my ($offset, $litlen) = (0, $self->{_INBUFDBGLEN} // 0); + while ( (my $idx = index($self->{_INBUFDBG}, "\n", $offset+$litlen)) >= 0) { + my $line = substr($self->{_INBUFDBG}, $offset+$litlen, $idx+1-$offset-$litlen); + $line =~ s/$CRLF\z// or $self->panic(); + $self->logger('C: ', ($litlen > 0) ? '[...]' : '', $line); + $litlen = $line =~ /\{([0-9]+)(\+)?\}\z/ ? $1 : 0; + $offset = $idx+1; } + $self->panic() if $offset+$litlen < length($self->{_INBUFDBG}); + undef $self->{_INBUFDBG}; + $self->{_INBUFDBGLEN} = $litlen; + } + + if (defined (my $d = $self->{_Z_DEFLATE})) { + $d->flush(\$self->{_INBUF}, $z_flush) == Z_OK + or $self->panic("Can't flush deflation stream: ", $d->msg()); + } + + my ($offset, $length) = (0, length($self->{_INBUF})); + while ($length > 0) { + my $written = defined $ssl ? + Net::SSLeay::write_partial($ssl, $offset, $length, $self->{_INBUF}) : + $stdin->syswrite($self->{_INBUF}, $length, $offset); + $self->_ssl_error("Can't write: $!") unless defined $written and $written > 0; + + $offset += $written; + $length -= $written; + $self->{_INRAWCOUNT} += $written; + } + undef $self->{_INBUF}; +} + - $z_flush = $z_flush2; - $offset = $idx+1; +# $self->_cmd_extend_($args) +# Append the scalar reference $args to the command buffer. Usually +# one should use the higher-level method _cmd_extend as it takes care +# of literals if the server doesn't support LITERAL+. +sub _cmd_extend_($$) { + my ($self, $args) = @_; + $self->{_INCOUNT} += length($$args); # count IMAP traffic + $self->{_INBUFDBG} .= $$args if $self->{debug}; + if (defined (my $d = $self->{_Z_DEFLATE})) { + $d->deflate($args, \$self->{_INBUF}) == Z_OK or $self->panic("Deflation failed: ", $d->msg()); + } + else { + $self->{_INBUF} .= $$args; } } @@ -1469,15 +1533,31 @@ sub _send_cmd($) { # In void context, croak unless the server answers with a tagged 'OK' # response. Otherwise, return the condition status ('OK'/'NO'/'BAD'). sub _send($$;&) { - my ($self, $command, $callback) = @_; - my $cmd = $command =~ /\AUID ($RE_ATOM_CHAR+) / ? $1 : $command =~ /\A($RE_ATOM_CHAR+) / ? $1 : $command; - my $set = $command =~ /\AUID (?:FETCH|STORE) ([0-9:,*]+)/ ? $1 : undef; + my $self = shift; + my $command = \$_[0]; + my $callback = $_[1]; - # send the command; for servers supporting non-synchronizing - # literals, mark literals as such and then the whole command in one - # go, otherwise send literals one at a time - my $tag = sprintf '%06d', $self->{_TAG}++; - $self->_send_cmd($tag, $command); + my $tag = $self->_cmd_init($command); + $self->_cmd_flush(); + + if (!defined $callback) { + $self->_recv($tag); + } + else { + my $cmd = $$command =~ /\AUID ($RE_ATOM_CHAR+) / ? $1 : $$command =~ /\A($RE_ATOM_CHAR+) / ? $1 : $$command; + my $set = $$command =~ /\AUID (?:FETCH|STORE) ([0-9:,*]+)/ ? $1 : undef; + $self->_recv($tag, $callback, $cmd, $set); + } +} + + +# $self->_recv($tag, [$callback, $command, $set]) +# Wait for a tagged response with the given $tag. The $callback, if +# provided, is used to process each untagged response. $command and +# $set can further limit the set of responses to apply the callback +# to. +sub _recv($$;$&$) { + my ($self, $tag, $callback, $cmd, $set) = @_; my $r; # wait for the answer @@ -1630,6 +1710,7 @@ sub _resp_text($$) { } elsif (/\A\[CAPABILITY((?: $RE_ATOM_CHAR+)+)\] $RE_TEXT_CHAR+\z/) { $self->{_CAPABILITIES} = [ split / /, ($1 =~ s/^ //r) ]; + $self->{_LITPLUS} = (grep { uc $_ eq 'LITERAL+' } @{$self->{_CAPABILITIES}}) ? '+' : ''; } elsif (/\A\[PERMANENTFLAGS \(((?:(?:\\?$RE_ATOM_CHAR+|\\\*)(?: (?:\\?$RE_ATOM_CHAR+|\\\*))*))\)\] $RE_TEXT_CHAR+\z/) { $self->_update_cache( PERMANENTFLAGS => [ split / /, $1 ] ); @@ -1690,7 +1771,7 @@ sub _string($$) { elsif ($$stream =~ s/\A\{([0-9]+)\}\z//) { # literal (my $lit, $$stream) = $self->_getline($1); - return $lit; + return $$lit; } else { $self->panic($$stream); @@ -1844,14 +1925,14 @@ sub _resp($$;$$$) { $mail{INTERNALDATE} = $1; } elsif (s/\A(?:RFC822|BODY\[\]) //) { - $mail{RFC822} = $self->_nstring(\$_); + $mail{RFC822} = \$self->_nstring(\$_); } elsif (s/\ABINARY\[\] //) { if (s/\A~\{([0-9]+)\}\z//) { # literal8, RFC 3516 BINARY (my $lit, $_) = $self->_getline($1); $mail{BINARY} = $lit; } else { - $mail{RFC822} = $self->_nstring(\$_); + $mail{RFC822} = \$self->_nstring(\$_); } } elsif (s/\AFLAGS \((\\?$RE_ATOM_CHAR+(?: \\?$RE_ATOM_CHAR+)*)?\)//) { @@ -1898,10 +1979,8 @@ sub _resp($$;$$$) { elsif (s/\A\+ //) { if (defined $callback and $cmd eq 'AUTHENTICATE') { my $x = $callback->($_); - $self->logger("C: ", $x) if $self->{debug}; - $x .= "\r\n"; - $self->{_INCOUNT} += length($x); - $self->_write($x); + $self->_cmd_extend(\$x); + $self->_cmd_flush(); } } else { -- cgit v1.2.3