ONLamp.com


Developing High Performance Asynchronous IO Applications

Implementing Flow Control with Event Loops

While we had experience with event-based GUI applications, that background wasn't very useful when it came to implementing flow control in our application. In a typical GUI application, a user-generated event (a mouse click, for example) either calls a blocking callback, during which the user loses control of the application, or gets handled asynchronously, with the user regaining control immediately. In an asynchronous IO networking application, by contrast, blocking is not acceptable, and asynchronous operations may trigger further asynchronous operations. That makes flow control quite tricky to implement.



Consider a simple task: read from a file handle and check whether the data read matches a certain pattern.

In the blocking IO synchronous flow this is easy:

  my $read = sysread $rfh, my $data, 16;
  die "failed to read: $!" unless defined $read;
  warn "got OK\n" if $read && $data =~ /OK/;

It's easy to see the sequence of operations here.

This sequence doesn't work in the async IO world. You can't do anything with the data immediately after calling an asynchronous read, because asynchronous operations return before the IO has even started. An asynchronous version of this code might be:

  my $ctx = { 
      data   => "",
      len    => 16,
      offset =>  0,
  }; # context
  read_then_run($rfh, \&check_OK, $ctx);
  sub check_OK { 
      my ($ctx) = @_;
      warn "got OK\n" if $ctx->{data} && $ctx->{data} =~ /OK/;
  }

This code encapsulates the data in a context object that gets passed around to the callbacks. The check_OK() function will be executed when reading completes. Here's the read_then_run() function:

  sub read_then_run {
      my ($rfh, $cb, $ctx) = @_;
  
      my $read = sysread $rfh, $ctx->{data}, $ctx->{len}, $ctx->{offset};
      die "failed to read: $!" unless defined $read;
  
      $cb->($ctx);      
  }

This is still a blocking synchronous operation.

Now assume that the $rfh file handle is non-blocking. Here's an async IO read equivalent:

  sub read_then_run {
      my ($rfh, $cb, $ctx) = @_;
  
      $ctx->{cb} = $cb;
      my $e = event_new($rfh, EV_READ, \&_handle_read_event, $ctx);
      $e->add($timeout);
  }

First, the code stashes the check_OK() function reference in the context object, because it needs to remember which callback to run once the read operation completes. Then it creates a read event, passing it the handler to call when the event is triggered (_handle_read_event()) together with the context data, and adds that event to the event loop with a predefined $timeout value. read_then_run() then returns immediately, before any data has been read.
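To make the "returns immediately" behavior concrete without depending on a particular event library, here is a minimal sketch in core Perl. The helpers watch_read() and run_loop() are hypothetical stand-ins for event_new(..., EV_READ, ...), $e->add() and the library's event loop; they are not Event::Lib's API. The sketch assumes a Unix-like system, where a non-blocking pipe can play the role of $rfh.

```perl
use strict;
use warnings;
use IO::Handle;   # for ->blocking
use IO::Select;

# Hypothetical stand-ins for event_new(..., EV_READ, ...) + $e->add and
# the library's loop. Watchers are one-shot, like a non-persistent event:
# a handler that needs more data must register itself again.
my %watchers;    # fileno => [ $fh, $handler, $ctx ]

sub watch_read {
    my ($fh, $handler, $ctx) = @_;
    $watchers{ fileno($fh) } = [ $fh, $handler, $ctx ];
    return;
}

sub run_loop {
    my ($timeout) = @_;
    while (%watchers) {
        my $sel   = IO::Select->new(map { $_->[0] } values %watchers);
        my @ready = $sel->can_read($timeout) or die "timeout\n";
        for my $fh (@ready) {
            my $w = delete $watchers{ fileno($fh) } or next;
            $w->[1]->($w->[0], $w->[2]);    # handler($fh, $ctx)
        }
    }
    return;
}

# read_then_run(): stash the final callback, register interest in the
# handle becoming readable, and return at once.
sub read_then_run {
    my ($fh, $cb, $ctx) = @_;
    $ctx->{cb} = $cb;
    watch_read($fh, \&_handle_readable, $ctx);
    return;
}

sub _handle_readable {
    my ($fh, $ctx) = @_;
    my $read = sysread $fh, $ctx->{data}, $ctx->{len}, $ctx->{offset};
    unless (defined $read) {
        die "failed to read: $!" unless $!{EAGAIN};
        return watch_read($fh, \&_handle_readable, $ctx);   # try later
    }
    die "unexpected EOF\n" if $read == 0;
    $ctx->{len}    -= $read;
    $ctx->{offset} += $read;
    return watch_read($fh, \&_handle_readable, $ctx) if $ctx->{len};
    $ctx->{cb}->($ctx);                     # all requested bytes are in
}

my @log;
pipe(my $rfh, my $wfh) or die "pipe: $!";
$rfh->blocking(0);

my $ctx = { data => "", len => 16, offset => 0 };
read_then_run($rfh, sub { push @log, "got OK" if $_[0]{data} =~ /OK/ }, $ctx);
push @log, "read_then_run returned";       # logged before any data exists
syswrite $wfh, "status: OK......";         # 16 bytes for the reader
run_loop(5);
print "$_\n" for @log;
```

The order of the log is the point: read_then_run() returns before any data exists, and the check only runs once the loop sees the handle become readable.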

Now, when data becomes available for reading on the $rfh file handle (because someone wrote to its other end), the event loop calls the _handle_read_event() callback:

  sub _handle_read_event {
      my ($e, $e_type, $ctx) = @_;
  
      die "timeout?" if $e_type == EV_TIMEOUT;
  
      my $read = sysread $e->fh, $ctx->{data}, $ctx->{len}, $ctx->{offset};
  
      # Error occurred
      unless (defined $read) {
          if ($!{EAGAIN}) {   # no data yet: wait for the next event
              $e->add($timeout);
              return;
          }
          die "failed to read: $!";
      }
  
      # EOF before all the requested data arrived
      die "unexpected EOF" if $read == 0;
  
      if ($read < $ctx->{len}) { # under-read: record progress, wait for more
          $ctx->{len}    -= $read;
          $ctx->{offset} += $read;
          $e->add($timeout);
      }
      else {                     # read everything: run the stashed callback
          $ctx->{cb}->($ctx);
      }
  }

The registered event handler is triggered either when there is data available to read or when the timeout expires, so _handle_read_event() first checks whether a timeout occurred.

If it wasn't a timeout, there is data to read, so the handler attempts to read it.

If sysread() returns undef, an error occurred. If the error is EAGAIN (no data is available on the non-blocking handle yet, so the system asks you to retry), the handler re-adds the same event to the event loop and returns. Any other error is fatal, so it dies. A return value of 0 means end of file.

If the read was fully successful, the check_OK() callback stashed in the context gets called, and the task is complete. If the handler hasn't yet read all of the requested data, it records its progress in the context and re-adds the same event, continuing until it has read all the data or an error or EOF occurs.
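The three outcomes (retry on EAGAIN, re-add after an under-read, run the callback when complete) can be observed without an event loop by performing the read step by hand on a non-blocking pipe. try_read() is a hypothetical helper that makes the same checks as _handle_read_event() and returns the name of the branch it would take. The byte counts assume that each small write arrives in one piece, as it does on a typical Unix pipe.

```perl
use strict;
use warnings;
use IO::Handle;   # for ->blocking

# One pass of the read handler's decision logic, minus the event loop.
# try_read() is hypothetical; its return value names the branch
# _handle_read_event would take.
sub try_read {
    my ($fh, $ctx) = @_;
    my $read = sysread $fh, $ctx->{data}, $ctx->{len}, $ctx->{offset};
    unless (defined $read) {
        return 'retry' if $!{EAGAIN};   # nothing yet: re-add the event
        die "failed to read: $!";
    }
    return 'eof' if $read == 0;         # writer closed its end
    if ($read < $ctx->{len}) {          # under-read: record progress
        $ctx->{len}    -= $read;
        $ctx->{offset} += $read;
        return 'partial';               # re-add the event
    }
    return 'done';                      # call the stashed callback
}

pipe(my $rfh, my $wfh) or die "pipe: $!";
$rfh->blocking(0);

my $ctx = { data => "", len => 16, offset => 0 };
my @steps;
push @steps, try_read($rfh, $ctx);      # pipe still empty
syswrite $wfh, "got O";                 # 5 bytes arrive
push @steps, try_read($rfh, $ctx);
syswrite $wfh, "K, 11 more.";           # the remaining 11 bytes
push @steps, try_read($rfh, $ctx);
close $wfh;
print "@steps: $ctx->{data}\n";
```

On Linux this prints `retry partial done: got OK, 11 more.` The offset passed to sysread() is what lets the second read append to the first instead of overwriting it.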

Readline and write operations follow much the same logic. You would write similar async IO readline_then_run() and write_then_run() functions that internally handle read and write events and call the supplied callback once the operation finishes.
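On the write side, the equivalent bookkeeping is an offset into the output buffer. A non-blocking syswrite() may accept only part of the data, or fail with EAGAIN when the pipe or socket buffer is full. The sketch below shows one hypothetical way a write_then_run() handler could track that progress. try_write() and drain() are illustrative names, and draining the reader stands in for the event loop reporting the handle as writable again.

```perl
use strict;
use warnings;
use IO::Handle;   # for ->blocking

# Hypothetical write-side counterpart of the read handler: one
# non-blocking syswrite that advances $ctx->{offset} through $ctx->{buf}.
sub try_write {
    my ($fh, $ctx) = @_;
    my $left    = length($ctx->{buf}) - $ctx->{offset};
    my $written = syswrite $fh, $ctx->{buf}, $left, $ctx->{offset};
    unless (defined $written) {
        return 'retry' if $!{EAGAIN};   # buffer full: wait for EV_WRITE
        die "failed to write: $!";
    }
    $ctx->{offset} += $written;
    return $ctx->{offset} < length($ctx->{buf}) ? 'partial' : 'done';
}

pipe(my $rfh, my $wfh) or die "pipe: $!";
$_->blocking(0) for $rfh, $wfh;

my $received = 0;
sub drain {                             # stand-in for the peer reading
    while (my $n = sysread $rfh, my $chunk, 65536) { $received += $n }
}

# Far larger than a typical pipe buffer, so one syswrite can't finish.
my $ctx = { buf => "x" x 1_000_000, offset => 0 };
my %seen;
while (1) {
    my $step = try_write($wfh, $ctx);
    $seen{$step}++;
    last if $step eq 'done';
    drain() if $step eq 'retry';        # simulated "writable again" event
}
close $wfh;
drain();
printf "partial=%d retry=%d received=%d\n",
    $seen{partial} // 0, $seen{retry} // 0, $received;
```

The exact partial and retry counts depend on the kernel's pipe buffer size, but every byte arrives exactly once because each syswrite() resumes at the recorded offset.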

A function that starts async IO internally usually returns immediately, and the caller normally does nothing after that call, because control must pass to the async IO handler.

Suppose you have a logical sequence of writing to the file handle, then reading from it and doing something on read completion:

  write($fh, ...);
  read_then_run( $fh, \&run_on_read_completed, ...);

These two calls can't simply appear one after the other in the code, because write() would return before the write completes and the read would start too early. Instead, you need some notion of a callback stack, where code can push steps to execute and pop them off when it's time to run them. To make the previous example work, you must first build the stack:

  push run_on_read_completed (last to run)
  push run_read
  run run_write

When the sequence runs, the last thing to execute is the run_on_read_completed() callback. Before it, the read event must be created (run_read), and before that the write must complete, so the very first task is to create the write event (or write directly). As each phase completes, the engine pops the next step off the stack and executes it, until the stack is empty.
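The callback stack can be sketched in plain Perl by replacing real IO with a toy event queue, an assumption made purely for illustration. Here run_write() and run_read() only schedule an "event" and return, and each event pops the next step off the context's stack when its phase completes. next_step() and the queue are hypothetical helpers.

```perl
use strict;
use warnings;

# Toy event queue (hypothetical): async steps never call the next step
# directly; they schedule an "event" and return, as real IO would.
my @event_queue;
my @trace;

sub run_loop { while (my $event = shift @event_queue) { $event->() } }

# Pop the next step off the context's callback stack and run it.
sub next_step {
    my ($ctx) = @_;
    my $cb = pop @{ $ctx->{cb} };
    $cb->($ctx);
}

sub run_on_read_completed { push @trace, "read completed: $_[0]{data}" }

sub run_read {
    my ($ctx) = @_;
    push @trace, "read event created";
    push @event_queue, sub { $ctx->{data} = "reply"; next_step($ctx) };
}

sub run_write {
    my ($ctx) = @_;
    push @trace, "write event created";
    push @event_queue, sub { push @trace, "write completed"; next_step($ctx) };
}

my $ctx = { cb => [] };
push @{ $ctx->{cb} }, \&run_on_read_completed;   # last to run
push @{ $ctx->{cb} }, \&run_read;
run_write($ctx);                                  # first to run
push @trace, "run_write returned";
run_loop();
print "$_\n" for @trace;
```

The trace shows run_write() returning before the write completes, after which the remaining steps run in exactly the order the stack dictates.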

The code may look like:

  write_then_read($ctx, \&run_on_read_completed);
  
  sub write_then_read {
      my ($ctx, $cb) = @_;
  
      push @{ $ctx->{cb} }, $cb; # pushes run_on_read_completed
  
      run_write($ctx, \&_then_read);
  }
  
  sub run_write {
      my ($ctx, $cb) = @_;
  
      push @{ $ctx->{cb} }, $cb; # pushes _then_read
  
      # create write event
      my $e = event_new($ctx->{fh}, EV_WRITE, \&_handle_write_event, $ctx);
      $e->add($timeout);
  }
  
  sub _handle_write_event {
      my ($e, $e_type, $ctx) = @_;
  
      # do all the async IO write logic, which may take more than
      # one event (re-add $e and return until everything is written);
      # $writing_completed stands for that "all written" check
  
      if ($writing_completed) {
          my $cb = pop @{ $ctx->{cb} };
          $cb->($ctx); # calls _then_read
      }
  }
  
  sub _then_read {
      my ($ctx) = @_;
  
      my $e = event_new($ctx->{fh}, EV_READ, \&_handle_read_event, $ctx);
      $e->add($timeout);
  }
  
  sub _handle_read_event {
      my ($e, $e_type, $ctx) = @_;
  
      # do all the async IO read logic, which may take more than
      # one event (re-add $e and return until everything is read);
      # $reading_completed stands for that "all read" check
  
      if ($reading_completed) {
          my $cb = pop @{ $ctx->{cb} };
          $cb->($ctx); # calls run_on_read_completed
      }
  }

This example omits the actual read and write operations and only marks where they happen, to make it easier to see how the callbacks get passed around. Read the code top-down, because that's the order in which it executes.

As you can see, async IO flow requires writing lots of small functions that call each other as callbacks, with a context object passed along to maintain state.
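For a runnable end-to-end version of the write-then-read chain, the sketch below keeps the same function names and callback stack but swaps the event library for a small select()-based loop built on hypothetical watch() and run_loop() helpers (not a real library API). As a simplifying assumption, it writes into one end of a non-blocking pipe and reads the same bytes back from the other end, so the read is complete once as many bytes have arrived as were written.

```perl
use strict;
use warnings;
use IO::Handle;   # for ->blocking

# Hypothetical one-shot watchers and a select()-based loop standing in
# for the event library (not Event::Lib's API). Keys are "r:FD"/"w:FD".
my %watchers;

sub watch {
    my ($type, $fh, $handler, $ctx) = @_;    # $type is 'r' or 'w'
    $watchers{ "$type:" . fileno($fh) } = [ $fh, $handler, $ctx ];
}

sub run_loop {
    my ($timeout) = @_;
    while (%watchers) {
        my %bits = (r => '', w => '');
        for my $key (keys %watchers) {
            my ($type, $fd) = split /:/, $key;
            vec($bits{$type}, $fd, 1) = 1;
        }
        my ($rout, $wout) = @bits{qw(r w)};
        my $n = select($rout, $wout, undef, $timeout);
        die "select: $!" if $n < 0;
        die "timeout\n"  if $n == 0;
        for my $key (keys %watchers) {
            my ($type, $fd) = split /:/, $key;
            next unless vec($type eq 'r' ? $rout : $wout, $fd, 1);
            my $w = delete $watchers{$key} or next;
            $w->[1]->($w->[0], $w->[2]);     # handler($fh, $ctx)
        }
    }
}

# The chain from the text: write_then_read -> run_write -> _then_read.
sub write_then_read {
    my ($ctx, $cb) = @_;
    push @{ $ctx->{cb} }, $cb;               # run_on_read_completed
    run_write($ctx, \&_then_read);
}

sub run_write {
    my ($ctx, $cb) = @_;
    push @{ $ctx->{cb} }, $cb;               # _then_read
    watch(w => $ctx->{wfh}, \&_handle_write_event, $ctx);
}

sub _handle_write_event {
    my ($fh, $ctx) = @_;
    my $left = length($ctx->{out}) - $ctx->{woff};
    my $n    = syswrite $fh, $ctx->{out}, $left, $ctx->{woff};
    unless (defined $n) {
        die "failed to write: $!" unless $!{EAGAIN};
        $n = 0;                              # not writable after all
    }
    $ctx->{woff} += $n;
    return watch(w => $fh, \&_handle_write_event, $ctx)
        if $ctx->{woff} < length($ctx->{out});      # more to write
    my $cb = pop @{ $ctx->{cb} };
    $cb->($ctx);                             # calls _then_read
}

sub _then_read {
    my ($ctx) = @_;
    watch(r => $ctx->{rfh}, \&_handle_read_event, $ctx);
}

sub _handle_read_event {
    my ($fh, $ctx) = @_;
    my $n = sysread $fh, $ctx->{in}, 4096, length($ctx->{in});
    unless (defined $n) {
        die "failed to read: $!" unless $!{EAGAIN};
        return watch(r => $fh, \&_handle_read_event, $ctx);
    }
    die "unexpected EOF\n" if $n == 0;
    return watch(r => $fh, \&_handle_read_event, $ctx)
        if length($ctx->{in}) < length($ctx->{out});  # wait for the rest
    my $cb = pop @{ $ctx->{cb} };
    $cb->($ctx);                             # calls run_on_read_completed
}

# Loopback demo: write to one end of a pipe, read it back from the other.
pipe(my $rfh, my $wfh) or die "pipe: $!";
$_->blocking(0) for $rfh, $wfh;

my $result;
my $ctx = { rfh => $rfh, wfh => $wfh, out => "ping OK\n",
            woff => 0, in => '', cb => [] };
write_then_read($ctx, sub { $result = $_[0]{in} });
run_loop(5);
print "read back: $result";
```

Each handler either re-registers itself and returns, or pops the next callback off the stack. No function ever waits for IO.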

There may be many other similar sequences that you can abstract into wrappers. With the help of those wrappers, implementing more complicated logic becomes much easier.
