A Basic Pipeline

To get started, we first examine a basic pipeline, illustrated in the figure below.

In this model, the three worker agents form a pipeline. The Generator agent receives the request. When it has completed its task, it immediately contacts the Assembler agent, which is the next agent in the pipeline. The Assembler performs its task and then immediately contacts the Printer agent, which is the third agent in the pipeline. When the Printer agent completes its task, it forwards the result to the agent that made the initial request (marked here as the Requestor).

In the example implementation below, we use several different approaches to implement these behaviours. We have not attempted to produce an “optimal” solution. Instead, we use this example to demonstrate the approaches.

Let's have a look at the code for each of the pipeline agents.

Generator

First, let's look at the Generator agent. This is the agent that is responsible for converting ASCII codes into characters.

  package assembly.pipeline;
  
  import assembly.LookupLibrary;
  
  agent Generator {
      module LookupLibrary library;
      module System system;
      module Prelude prelude;

      rule @message(request, string F, lookup( int Id, list Codes)) {
          send (agree, F, lookup( Id, Codes) ); 
          try {
              list Letters = [];
              forall (int val : Codes) {
                  if ((32 <= val) & (val <= 126)) {
                      Letters = Letters + [library.toCharacter(val)];
                  } else {
                      system.fail();
                  }
              }
      	
              !requestAssemble(Id, Letters, F);
              send( inform, F, codes( Id, Letters ) );
          } recover {
              send( failure, F, lookup( Id, Codes ) );
          }
      }
 	
      rule +!requestAssemble(int Id, list Letters, string Manager) {
          int Ind = 0;
          forall (char Letter : Letters) {
             +assemble( Id, Ind, Letter );
             Ind = Ind + 1;	      
          }
      
          list assemblers = system.getAgentsOfType( "assembly.pipeline.Assembler" );
          send( request, prelude.valueAsString(assemblers, 0), assemble( Id, Manager ) );
      }
  
      rule @message( agree, string From, assemble( int Id ) ) {
          +assembling( From, Id ); 
          foreach( assemble( Id, int Index, char Letter ) ) {
              send( inform, From, character( Id, Index, Letter ) );
          }
          send( inform, From, end( Id ) );
      }
      
      rule @message( inform, string From, assembled( int Id, string String ) ) {
          foreach( assemble( Id, int Index, char Letter ) ) {
              -assemble(Id, Index, Letter);
          }
      }
  }

The actual behaviour of the Generator agent is encoded in the first of its four rules. This rule implements the Participant role of the FIPA Request Protocol. Here, the Generator receives a request from another agent to start the process. The Generator agrees to this and performs the lookup. While converting the ASCII codes to characters, the Generator checks that each value is in the printable range (32 to 126) before converting it. If a value is outside this range, the intention fails. The failure is caught by the try…recover statement, and the requesting agent is informed of the failure.
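The validation performed by the first rule can be sketched outside the agent in plain Java. This is only an illustration, not part of the ASTRA solution: the class name LookupSketch and the method lookup are hypothetical, and an empty Optional stands in for the failure message sent from the recover block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class LookupSketch {
    // Printable ASCII range checked by the Generator's first rule.
    static final int MIN_CODE = 32;
    static final int MAX_CODE = 126;

    // Converts codes to characters. An empty Optional plays the part of
    // the failure branch: one bad code aborts the whole conversion.
    public static Optional<List<Character>> lookup(List<Integer> codes) {
        List<Character> letters = new ArrayList<>();
        for (int val : codes) {
            if (val < MIN_CODE || val > MAX_CODE) {
                return Optional.empty();
            }
            letters.add((char) val);
        }
        return Optional.of(letters);
    }

    public static void main(String[] args) {
        // The codes used by the Launcher's needCode belief.
        System.out.println(lookup(List.of(104, 101, 108, 108, 111))); // prints Optional[[h, e, l, l, o]]
        // 7 is outside the printable range, so the conversion fails.
        System.out.println(lookup(List.of(104, 7)));                  // prints Optional.empty
    }
}
```

As in the agent, no partial result is produced: the first out-of-range value abandons the whole job.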

The remaining three rules have nothing to do with the basic behaviour of the Generator. Instead, they support the Assembler agent. Specifically, the Assembler uses a customised FIPA Request Protocol in which the individual characters are streamed to it using inform messages, followed by an end-of-stream message. This streaming starts once the Assembler has agreed to perform the assembly operation. The second rule starts this behaviour by taking the Initiator role of the FIPA Request Protocol. The third rule then implements the streaming. Finally, the fourth rule handles the “request completed” message that the Assembler sends when it has finished its task. This message is used to remove the beliefs that were created at the start of the assembly behaviour.
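To make the shape of the stream concrete, the following Java sketch lists the inform messages that the third rule would send for a given job. It is a rough illustration only: the class StreamSketch and the method stream are hypothetical, and the messages are rendered as plain strings rather than real agent messages.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamSketch {
    // Builds the sequence sent after the Assembler agrees:
    // one character(Id, Index, Letter) per letter, then end(Id).
    public static List<String> stream(int id, List<Character> letters) {
        List<String> messages = new ArrayList<>();
        for (int index = 0; index < letters.size(); index++) {
            messages.add("character(" + id + ", " + index + ", " + letters.get(index) + ")");
        }
        // The end-of-stream marker tells the receiver that no more characters follow.
        messages.add("end(" + id + ")");
        return messages;
    }

    public static void main(String[] args) {
        for (String message : stream(1, List.of('h', 'i'))) {
            System.out.println(message);
        }
    }
}
```

Each character carries its own index, so the receiver does not need to rely on the order in which the messages arrive.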

Assembler Agent

The Assembler agent implements the second step in the pipeline. It is responsible for combining the individual characters into a single string. To achieve this, the Assembler requires that the agent that initiates the behaviour stream the characters that make up the string to it, followed by an end of stream message that it will use to trigger the actual assembly of the string.

  package assembly.pipeline;
  
  agent Assembler {
      module Prelude prelude;
      module System system;

      rule @message(request, string F, assemble(int Id, string R)) {
          +assembling( Id, F, R );
          send( agree, F, assemble( Id ) );
      }

      rule @message(inform, string F, character(int Id,int I,char L)) : assembling( Id, F, string R ) {
          +assemble( Id, I, L );
      }
  
      rule @message( inform, string From, end( int Id ) ) : assembling( Id, From, string R ) {
          !assemble( Id, 0, "" );
      }

      rule +!assemble( int Id, int Index, string String ) : assembling( Id, string F, string R ) {
          if ( assemble( Id, Index, char Letter ) ) {
              !assemble( Id, Index + 1, String + Letter );
          } else {
              +done( Id, String );     
          }
      }
	
      rule +done(int Id, string String) : assembling(Id, string F, string R) {
          list printers = system.getAgentsOfType( "assembly.pipeline.Printer" );
          send( request, prelude.valueAsString(printers, 0), print( Id, String, R) );

          send( inform, F, assembled( Id, String ) );
      }
      
      rule @message(inform, string F, printed(int Id)) : assembling(Id, string S, string R) {
          -assembling(Id, S, R);
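          // done holds the assembled string, not the sender name bound to S
          foreach (done(Id, string Str)) {
              -done(Id, Str);
          }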
          foreach (assemble(Id, int I, char L)) {
              -assemble(Id, I, L);
          }
      }
  }

For this agent, the core behaviour is spread over the first four rules. The first rule receives the request to perform the assembly operation. The second rule receives the streamed characters. The third rule receives the end-of-stream message and triggers the fourth rule, which recursively constructs the string from the information passed via the second rule. The final step of this fourth rule is to generate a belief indicating that the string has been constructed. This belief (or rather, the belief adoption event relating to it) triggers the fifth rule, which sends messages to the Printer agent (which performs the final task) and to the Generator (to indicate that the assembly has been completed). Finally, the sixth rule handles the message sent by the Printer agent when the printing task is completed. This message is used to clear up any beliefs that were generated during the assembly of the string.
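The recursion in the fourth rule can be sketched in plain Java, with a map from index to character standing in for the assemble beliefs. This is an illustration under that assumption, not the ASTRA implementation, and the names AssembleSketch and assemble are hypothetical.

```java
import java.util.Map;

public class AssembleSketch {
    // Mirrors the +!assemble rule: if a character is known for this index,
    // append it and recurse on the next index. Otherwise the string is done.
    public static String assemble(Map<Integer, Character> letters, int index, String soFar) {
        Character letter = letters.get(index);
        if (letter == null) {
            return soFar;
        }
        return assemble(letters, index + 1, soFar + letter);
    }

    public static void main(String[] args) {
        // Index 2 is missing, so assembly stops after "hi".
        Map<Integer, Character> letters = Map.of(0, 'h', 1, 'i', 3, '!');
        System.out.println(assemble(letters, 0, "")); // prints hi
    }
}
```

The sketch also shows a property of this design: assembly stops at the first missing index, so a gap in the received characters silently truncates the string.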

The Printer Agent

The final agent in the pipeline is the Printer agent. This agent is responsible for printing out the string and informing the agent that made the initial request that the task is complete.

  package assembly.pipeline;
  
  agent Printer {
      module Console console;
  
      initial manager("m_agent");

      rule @message(request, string F, print(int Id, string S, string R)) {
          send (agree, F, print(Id, S));
          console.println("Result of job "+Id+" is:[" + S + "]");
          send( inform, F, printed( Id ) );
          send (inform, R, printed( Id ));
      }
  }

This agent again implements the Participant role of the FIPA Request Protocol. When it receives the request to print the string, it agrees to do so, prints out the string, and then tells the Assembler that it has completed its task. It also informs the agent that initially requested the task that the string has been printed.

Running the Code

To run this code, we need an agent class with a main rule that creates the pipeline and sends the initial request:

  package assembly.pipeline;
  
  agent Launcher {
      module System system;
      module Prelude prelude;
      module Console console;

      initial needCode( 1, [ 104, 101, 108, 108, 111 ] );
  
      rule +!main(list args) {
          system.createAgent("p_agent", "assembly.pipeline.Printer" );
          system.createAgent("g_agent", "assembly.pipeline.Generator" );
          system.createAgent("a_agent", "assembly.pipeline.Assembler");
          !requestLookup( 1 );		
      }
  
      rule +!requestLookup( int Id ) : needCode( Id, list Codes ) {
          list generators = system.getAgentsOfType( "assembly.pipeline.Generator" );
          send( request, prelude.valueAsString(generators, 0), lookup( Id, Codes ) );
      }
  
      rule @message(inform, string F, printed( int Id )) : "assembly.pipeline.Printer" == system.getType(F) {
          console.println("Job Complete: [" + Id + "]");
          system.exit();
      }
  }

Comments

The above solution is interesting because of the diversity in the approach. It would be quite easy to create a simpler and more efficient solution, but this would make the solution less interesting.

The main reason for creating this example is to try to look at how an agent program changes as the organisational structure changes. This is interesting as it will help to illustrate how to create more reusable code.

It is clear from this example that the logic for performing the tasks is interwoven with the logic for coordinating them (the flow of control between the agents). It also appears that, in the case of the Assembler, the behaviour itself is split over two agent classes (the Generator and the Assembler). But is it?

This raises the question of what the core behaviour of this agent actually is, and what input it requires. In short, the core behaviour is the assembly of characters into a string. The input to this behaviour is a set of beliefs about those characters and the order in which they appear. To be clear, the input is not a set of messages. The messages are simply a way of transferring the information required for the behaviour to the agent (and the end-of-stream message is simply a way of indicating that the data has been transferred).

If we adopt this view of the system behaviours, then we can modify our solution to reflect this. The next example attempts to do this.

Insert: The LookupLibrary code

For reference, here is the LookupLibrary API.

  package assembly;
  
  import astra.core.Module;
  
  public class LookupLibrary extends Module {
      @TERM
      public char toCharacter(Integer val) {
          return (char) val.intValue();
      }
  }