Wednesday, October 30, 2013

Distributed Computing, Socket API Prime Number identification from a list of numbers

In the previous example, we looked into a connection less java Socket API Program. Now, assume that there are 4 nodes in the distributed system and 100 numbers given as input are to be processed (for example, checking if a given number is a prime or not). Now we need a master node to accept this input and proceed with assigning of the work and again displaying the final output. For simplicity, I am giving input as the first 100 natural numbers.
The Setup is assumed to consist of one 5 nodes. One master and others as slaves.
Code: Master.java
import java.net.*;
import java.io.*;
/*
*   ************* SOCKET PROGRAMMING (Master) **************
*Author    : @NaveenKumar
*Program   :To print prime numbers in [1,100]
*Date      : 29/10/2013
*File Name : Master.java
*/
public class Master
{
     public static void main(String args[])
     {
           int num;
           try{
                InetAddress[] receiverHost = {
                             InetAddress.getByName("127.0.0.1"),
                             InetAddress.getByName("127.0.0.1"),
                             InetAddress.getByName("127.0.0.1"),
                             InetAddress.getByName("127.0.0.1")};
                int[] receiverPort = {3000,3010,3020,3030};
                String message;
             //instantiate a datagram socket for sending the data
                DatagramSocket s = new DatagramSocket();
                DatagramSocket s1 = new DatagramSocket(3500);
                byte[] buffer;
                for(int i=1;i<=100;i++)
                {
                     message = Integer.toString(i);
                     buffer = message.getBytes();
                     DatagramPacket p = new             
                     DatagramPacket(buffer,buffer.length,
                            receiverHost[i%4],receiverPort[i%4]);
                     s.send(p);
                }
                s.close();
                byte[] buffer1;
                String msg1;
                for(int i = 0;i<100 i="" o:p="">
                {
                     buffer1 = new byte[10];
                     DatagramPacket p1 = new DatagramPacket
                                                    (buffer1,10);
                     s1.receive(p1);
                     msg1 = new String(buffer1);
                     //num = Integer.parseInt(msg1);
                     if(msg1.charAt(0)!='-')
                           System.out.println("Received: "+msg1);
                }
                s1.close();
           }catch(Exception ex) 
           {
                ex.printStackTrace();
           }//end of catch
     }//end of main
}//end of class

In the above program for Master node,  a list of nodes in the network are provided with the port numbers of respective node on which they are listening. Once the processing of data is done on the other nodes, then the results are then processed on the Master node and displayed. If the number -1 is received from any slave, it is ignored but if a positive integer is received, then it is printed.

Code: Slave.java
import java.net.*;
import java.io.*;

/*
*   ******************** SOCKET PROGRAMMING (SLAVE) **************
*    Author          : @NaveenKumar
*    Date      : 29/10/2013
*    File Name       : Receiver.java
*/

public class Slave
{
     public static void main(String args[])
     {
           System.out.println("Starting Receiver");
           if(args.length!=1)
           {
                System.out.println("Invalid no. of args");
           }
           else{
           int receive_port = Integer.parseInt(args[0]);
           try
           {
                DatagramSocket s = new 
                                   DatagramSocket(receive_port);
                DatagramSocket soc = new DatagramSocket();
                System.out.println("Socket created");
                byte[] buffer = new byte[10];  
                System.out.println("buffer created");
                DatagramPacket p = new DatagramPacket(buffer,10);
                System.out.println("packet created");
                int i,j;
                i=10;
                j=10;
                String msg;
                Slave sl = new Slave();
                while(i==j)
                {
                     s.receive(p);
                     msg = new String(buffer);
                     // process the message
                     sl.process(msg,soc);
                }
                s.close();
           }catch(Exception ex) // end of try and start of catch
           {
                ex.printStackTrace();
           }// end of catch
      }//end of else
     }// end of main
    
     public Slave()
     {
          
     }
     public void process(String msg, DatagramSocket s1)
     {
           String msg1="";
           String msg2 = "";
           String message = msg;
           int res = -1;
           byte[] buffer;
           for(int k=0;k
           {
                if((int)message.charAt(k)<58 amp="" nbsp="" span="">
                                  (int)message.charAt(k)>47) 
                        msg2 += message.charAt(k);
                else 
                        break;
           }
           int num = Integer.parseInt(msg2);
           int temp = 0;
           for(int i=1;i
           {
                if(num%i == 0)
                           temp++;
                if(temp==2)
                     break;
           }
           if(temp==1)
                res = num;
           else
                res = -1;
           try{
                InetAddress receiverHost = 
                             InetAddress.getByName("127.0.0.1");
                int receiverPort = 3500;
                msg1 = Integer.toString(res);
                System.out.println("Msg: "+msg1);
                buffer = msg1.getBytes();
                DatagramPacket p1 = new DatagramPacket(buffer,
                       buffer.length,receiverHost,receiverPort);
                s1.send(p1);
           }catch(Exception ex)
           {
                ex.printStackTrace();
           }// end of catch
     }//end of process method
}// end of class Receiver


In the above code for Slave process, slave process waits for receiving a message from Master process. Once a number is received, it calls process() method with the received message as a parameter to this method. Here in this method, the received data is converted into integer and then checked if it is a prime number of not. If it is a prime number, the number itself is sent back to the Master. If it is not a prime number, then -1 is sent back.

Tuesday, October 29, 2013

Distributed Computing, Socket API

This post is to help my students, in IV year B.Tech (IT), understand a programming facility for IPC, that is, socket API in Distributed Computing. Socket API provides a low level of abstraction for IPC. This API is appropriate if the resources are limited in a Distributed Environment.
In this post I am going to implement a basic Java Socket program first and then I consider a problem that is solved in a distributed environment with IPC programmed with sockets in java.

In the first application, the job here is to send a message to a receiver node from a sender node where, the message is displayed at the receiver to show that the received message is processed. 
The Socket programming implementation of IPC is Asynchronous send and Synchronous receive, because the sender doesn't enter blocking state while sending the message but the receiver will enter into blocking state till the message is received.
The scenario here is, a process Sender is running on a Host A and the other process Receiver is running on Host B.

Code : Sender
---- - ------
import java.net.*;
import java.io.*;
/* 
*   *************** SOCKET PROGRAMMING (SENDER) **************
* Author : @NaveenKumar
* Date : 29/10/2013
* File Name   : Sender.java
*/
public class Sender
{
  public static void main(String args[])
  {
    if(args.length!=3)
    {
      System.out.println("Insufficient parameters");
    }// end of if
    else
    {
      System.out.println("Parameters read");
      try{
       InetAddress receiverHost=InetAddress.getByName(args[0]);
       int receiverPort = Integer.parseInt(args[1]);
       String message = args[2];
  System.out.println("Arguments copied into variables");
  //instantiate a datagram socket for sending the data
       DatagramSocket s = new DatagramSocket();
  System.out.println("Datagram Socket instantiated");
  byte[] buffer = message.getBytes();
       DatagramPacket p = new                  
       DatagramPacket(buffer,buffer.length,
                                  receiverHost,receiverPort);                      System.out.println("Datagram Packet instantiated"); s.send(p);
       s.close();
       }catch(Exception ex) //end of try and beginning of catch
       {
ex.printStackTrace();
       }//end of catch
     }// end of else
  }//end of main
}//end of class


In the above program, the Receiver IP address, Receiver Port number and message to be sent are taken as arguments to main function. Required parameters like Receiver's IP address, Port number, message are set before instantiating the Datagram socket.
After the socket is instantiated (in this case the socket is 's'), a Datagram Packet is instantiated (in this case it is 'p') with the IP address of its receiver, port number and message are bundled into the packet. 
Now, the packet is sent through the socket 's', to the receiver socket.

Code : Receiver
---- - --------
import java.net.*;
import java.io.*;

/* 
* **************** SOCKET PROGRAMMING (RECEIVER) **************
* Author : @NaveenKumar
* Date : 29/10/2013
* File Name   : Receiver.java
*/

public class Receiver
{
  public static void main(String args[])
  {
        System.out.println("Starting Receiver");
   if(args.length!=1)
   {
     System.out.println("Insufficient arguments");
   }// end of it
   else
   {
     System.out.println("Success on number of arguments");
     int receive_port = Integer.parseInt(args[0]);
     try
     {
DatagramSocket s = new DatagramSocket(receive_port);
        System.out.println("Socket created");
byte[] buffer = new byte[10];
System.out.println("buffer created");
DatagramPacket p = new DatagramPacket(buffer,10);
System.out.println("packet created");
s.receive(p);
System.out.println("Received message successfully");
String msg = new String (buffer);
System.out.println(msg);
s.close();
      }catch(Exception ex) // end of try and start of catch
      {
  ex.printStackTrace();
      }// end of catch
    }// end of else
  }// end of main
}// end of class Receiver

In this case of the Receiver process running on Host B, a Datagram Socket is instantiated to facilitate the receive operation, a Datagram Packet is instatitated and when the receive operation is reached, the process is sent to blocked state and set to wait for the message to be received from the Sender.
To show the blocking, checksums are provided at each stage. Once the receive operation is completed, the process is unblocked and continued with the processing of data.

The communication in the above scenario is between two processes Sender and Receiver performing either send or receive operations respectively but any process can send as well as receive data to/from other processes running on same or different hosts. That may be illustrated in the next post.

Hope this is helpful.