Revised Pipeline Code

We now try to reconstruct our basic pipeline model using the generalised roles presented above. We will need to give these agents different names, so let's prefix each agent class with the word Pipeline. What we will see here is that the code that was not part of the behaviours simply moves into the new classes. The benefit of this is that we have now separated the behaviour logic from the coordination logic. This means we can test the behaviour on its own and then use it in the knowledge that it is correct.

The PipelineGenerator

  package assembly.pipeline;
  
  agent PipelineGenerator extends Generator {
      module System system;
      module Prelude prelude;
  
      rule @message(request, string F, lookup( int Id, list Codes)) {
          send (agree, F, lookup( Id, Codes) );
          try {
              !generateCharacters(Codes, list Letters);
              send( inform, F, codes( Id, Letters ) );
  
              int Ind = 0;
              forall (char Letter : Letters) {
                  +assemble( Id, Ind, Letter );
                  Ind = Ind + 1;	      
              }
              list assemblers = system.getAgentsOfType( "assembly.pipeline.PipelineAssembler" );
              send( request, prelude.valueAsString(assemblers, 0), assemble( Id, F ) );
          } recover {
              send( failure, F, lookup( Id, Codes ) );
          }
      }
  
      rule @message( agree, string From, assemble( int Id ) ) {
          +assembling( From, Id ); 
          foreach( assemble( Id, int Index, char Letter ) ) {
              send( inform, From, character( Id, Index, Letter ) );
          }
          send( inform, From, end( Id ) );
      }

      rule @message( inform, string From, assembled( int Id, string String ) ) {
          foreach( assemble( Id, int Index, char Letter ) ) {
              -assemble(Id, Index, Letter);
          }
      }
  }

So, this program contains everything that was left after we removed the Generator behaviour from the original generator agent. That said, we have been able to remove one of the rules, the one for the !requestAssemble(…) goal, by moving its body into the first rule. We could also have done this in the initial program, but we didn’t because it would have made the code unnecessarily complex.

The PipelineAssembler

Again, all that we expect to happen here is that the previous code for the Assembler will remain the same, minus the rule that actually implements the behaviour.

  package assembly.pipeline;
  
  agent PipelineAssembler extends Assembler {
      module Prelude prelude;
      module System system;

      rule @message(request, string F, assemble(int Id, string R)) {
          +assembling( Id, F, R );
          send( agree, F, assemble( Id ) );
      }

      rule @message(inform, string F, character(int Id,int I,char L)) : assembling( Id, F, string R ) {
          +assemble( Id, I, L );
      }
  
      rule @message( inform, string From, end( int Id ) ) : assembling( Id, From, string R ) {
          !assemble( Id, 0, "" );
      }
		
      rule +done(int Id, string String) : assembling(Id, string F, string R) {
          list printers = system.getAgentsOfType( "assembly.pipeline.PipelinePrinter" );
          send( request, prelude.valueAsString(printers, 0), print( Id, String, R) );

          send( inform, F, assembled( Id, String ) );
      }
      
      rule @message(inform, string F, printed(int Id)) : assembling(Id, string S, string R) {
          // Clean up all beliefs held for this job.
          -assembling(Id, S, R);
          foreach (done(Id, string Str)) {
              -done(Id, Str);
          }
          foreach (assemble(Id, int I, char L)) {
              -assemble(Id, I, L);
          }
      }
  }

Again, we now have a clear separation between the coordination logic and the core behaviour.

The PipelinePrinter

Last, but not least, we have the printer agent:

  package assembly.pipeline;
  
  agent PipelinePrinter extends Printer {
      rule @message(request, string F, print(int Id, string S, string R)) {
          send (agree, F, print(Id, S));
          !printString(Id, S);
          send( inform, F, printed( Id ) );
          send (inform, R, printed( Id ));
      }
  }

Running the Code

To run this new version of the basic pipeline, all we need to do is to change the agents that are created by the Launcher class:

  package assembly.pipeline;
  
  agent Launcher {
      module System system;
      module Prelude prelude;
      module Console console;

      initial needCode( 1, [ 104, 101, 108, 108, 111 ] );
  
      rule +!main(list args) {
          system.createAgent("p_agent", "assembly.pipeline.PipelinePrinter" );
          system.createAgent("g_agent", "assembly.pipeline.PipelineGenerator" );
          system.createAgent("a_agent", "assembly.pipeline.PipelineAssembler");
          !requestLookup( 1 );		
      }
  
      rule +!requestLookup( int Id ) : needCode( Id, list Codes ) {
          list generators = system.getAgentsOfType( "assembly.pipeline.PipelineGenerator" );
          send( request, prelude.valueAsString(generators, 0), lookup( Id, Codes ) );
      }
  
      rule @message(inform, string F, printed( int Id )) : "assembly.pipeline.PipelinePrinter" == system.getType(F) {
          console.println("Job Complete: [" + Id + "]");
          system.exit();
      }
  }

A Managed Pipeline

As a next step, let's explore how we can change the organisational structure of the pipeline from the basic structure above to a managed structure, such as the one outlined in the diagram below.

This organisational structure is more complex, but it offers greater flexibility. Specifically, it decouples the agents performing the behaviours from each other. This makes it possible to, for example, have multiple generators, assemblers and printers (this may be necessary for systems where a single agent is not able to complete all the tasks alone). To implement this model, we need to remove the direct lines of communication between the worker agents. This can be done by adapting the basic pipeline agents to move the code that invokes the next task into a new manager agent. Specifically, we will create three new worker agents: ManagedGenerator, ManagedAssembler, and ManagedPrinter. Let's start with the new manager and then look at the revised worker agents.

The Manager Agent

So, the Manager is the new agent in the system, and its job is to coordinate the process flow between the workers. Much of the code for this has already been written – it is part of the PipelineXXX agents. The code below is the full Manager implementation:

  package assembly.hierarchy;
  
  agent Manager {
      module System system;
      module Prelude prelude;
  
      rule @message(request, string F, print( int Id, list Codes ) ) {
          +processing(Id, F);
	
          list generators = system.getAgentsOfType( "assembly.hierarchy.ManagedGenerator" );
          send( request, prelude.valueAsString(generators, 0), lookup( Id, Codes ) );
      }

      rule @message(inform, string F, codes(int Id, list Letters ) ) {
          int Ind = 0;
          forall (char Letter : Letters) {
              +assemble( Id, Ind, Letter );
              Ind = Ind + 1;	      
          }
      
          list assemblers = system.getAgentsOfType( "assembly.hierarchy.ManagedAssembler" );
          send( request, prelude.valueAsString(assemblers, 0), assemble( Id ) );
      }

      rule @message( agree, string From, assemble( int Id ) ) {
          +assembling( From, Id ); 
          foreach( assemble( Id, int Index, char Letter ) ) {
              send( inform, From, character( Id, Index, Letter ) );
          }
          send( inform, From, end( Id ) );
      }

      rule @message( inform, string From, assembled( int Id, string String ) ) {
          foreach( assemble( Id, int Index, char Letter ) ) {
              -assemble(Id, Index, Letter);
          }
	
          list printers = system.getAgentsOfType( "assembly.hierarchy.ManagedPrinter" );
          send( request, prelude.valueAsString(printers, 0), print( Id, String) );
      }
  
      rule @message(inform, string F, printed(int Id)) : processing(Id, string R) {
          send(inform, R, printed(Id));
      }
  }

The main difference between this code and the previous solution is the events that are used to link the workers. In the previous example, the completion of a task would drive the passing of that task to the next agent in the pipeline. Here, it is the notification of the completion of each task that drives the behaviour. It is interesting to note that much of the code is the same, and in fact the protocols used are exactly the same; the only change is that in this solution the protocols run between different agents. Unfortunately, because the protocols are embedded within the worker implementations, this has meant that we must copy and paste…

The Managed Generator

Without the need to entwine the control flow into this agent, the result is a simple agent class:

  package assembly.hierarchy;
  
  agent ManagedGenerator extends assembly.pipeline.Generator {
      rule @message(request, string F, lookup( int Id, list Codes ) ) {
          send (agree, F, lookup( Id, Codes) );
          try {
              !generateCharacters(Codes, list Letters);
              send( inform, F, codes( Id, Letters ) );
          } recover {
              send( failure, F, lookup( Id, Codes ) );
          }
      }
  }

This class simply implements the Participant role in the FIPA Request Protocol.
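One consequence of this is worth spelling out: the generator may answer with a failure message, but the Manager listing above has no rule that reacts to it. The following is only a minimal sketch of how the initiator side could deal with that case, using nothing beyond the constructs already seen in this section. The class name FailureAwareManager is hypothetical, and relaying the failure to the original requester as print( Id, Codes ) is an assumption that mirrors the content of the original request.

```
package assembly.hierarchy;

// Hypothetical extension of the Manager shown earlier; the name
// FailureAwareManager is not part of the original code.
agent FailureAwareManager extends Manager {
    // If the generator reports a failure, forget the job and relay the
    // failure to the agent that originally asked for it to be printed.
    rule @message(failure, string F, lookup( int Id, list Codes )) : processing( Id, string R ) {
        -processing( Id, R );
        send( failure, R, print( Id, Codes ) );
    }
}
```

To try this out, the Launcher would presumably need to create and look up this agent type instead of Manager, and it would need its own rule for the relayed failure message.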

The Managed Assembler

The Assembler code is still more complex, but this reflects the complex protocol that was used to implement the assembly behaviour. The code for this agent is below:

  package assembly.hierarchy;
  
  agent ManagedAssembler 
          extends assembly.pipeline.Assembler {
      rule @message(request, string F,
              assemble(int Id)) {
          +assembling( Id, F );
          send( agree, F, assemble( Id ) );
      }

      rule @message(inform, string F, character(int Id,int I,char L)) : assembling( Id, F ) {
          +assemble( Id, I, L );
      }
  
      rule @message( inform, string From, 
                     end( int Id ) ) : 
              assembling( Id, From  ) {
          !assemble( Id, 0, "" );
      }

      rule +done(int Id, string String) :
              assembling(Id, string F ) {
          -assembling(Id, F);
          -done(Id, String);
          foreach (assemble(Id, int I, char L)) {
              -assemble(Id, I, L);
          }
          send( inform, F, assembled( Id, String ) );
      }
  }

As with the Generator, the code is simpler because the control flow is now handled by the Manager.

The Managed Printer

Again, this is a very simple implementation:

  package assembly.hierarchy;
  
  agent ManagedPrinter
          extends assembly.pipeline.Printer {
      rule @message(request, string F,
                    print(int Id, string S)) {
          send (agree, F, print(Id, S));
          !printString(Id, S);
          send( inform, F, printed( Id ) );
      }
  }

Running the Code

Finally, we need a modified Launcher to run the system.

  package assembly.hierarchy;
  
  agent Launcher {
      module System system;
      module Prelude prelude;
      module Console console;

      initial
          needCode( 1, [ 104, 101, 108, 108, 111 ] );

      rule +!main(list args) {
          system.createAgent("a_agent", 
               "assembly.hierarchy.ManagedAssembler");
          system.createAgent("g_agent", 
               "assembly.hierarchy.ManagedGenerator");
          system.createAgent("p_agent", 
               "assembly.hierarchy.ManagedPrinter");
          system.createAgent("m_agent",
              "assembly.hierarchy.Manager");

          !requestLookup( 1 );		
      }

      rule +!requestLookup( int Id ) : 
              needCode( Id, list Codes ) {
          list managers =
              system.getAgentsOfType(
                  "assembly.hierarchy.Manager" );
          send( request, 
                prelude.valueAsString(managers, 0),
                print( Id, Codes ) );
      }

      rule @message(inform, string F,
                    printed( int Id )) {
          console.println("Job Complete: [" + 
                          Id + "]");
          system.exit();
      }
  }

This code is almost identical to the last Launcher, with the exception that different agents are created, and the initial request goes to the Manager instead of the Generator.

Final Remarks

While the two designs are remarkably similar, the protocols are, unfortunately, different. The Pipeline protocols include the name of the original requestor, while the Managed protocols do not. That said, it would be quite possible to further reduce the amount of duplication by, for example, moving the shared part of the Assembler protocol into a separate class, such as an AssemblerCoordinator class.
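As a rough illustration of that idea, the sketch below pulls the two rules that both assemblers duplicate (storing characters and reacting to the end message) into one class. It is a sketch under stated assumptions rather than a tested design: it assumes both subclasses record the two-argument belief assembling( Id, F ), and that the PipelineAssembler keeps the original requestor in a separate, hypothetical belief such as requestor( Id, R ). Placing the class in the assembly.pipeline package next to Assembler is also an assumption.

```
package assembly.pipeline;

// Hypothetical shared class holding the part of the assembly protocol
// that PipelineAssembler and ManagedAssembler currently duplicate.
agent AssemblerCoordinator extends Assembler {
    // Store each character sent by the agent we agreed to assemble for.
    rule @message(inform, string F, character(int Id, int I, char L)) : assembling( Id, F ) {
        +assemble( Id, I, L );
    }

    // The end message triggers the inherited assembly behaviour.
    rule @message( inform, string From, end( int Id ) ) : assembling( Id, From ) {
        !assemble( Id, 0, "" );
    }
}
```

Each concrete assembler would then extend AssemblerCoordinator and supply only its own request rule and its own +done rule, which is where the two designs actually differ.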

If we were to explore further to look at, for example, introducing a broker, or using a more complex organisational structure like a Network or Market pattern, then much of the code from the Managed solution could be reused.