In part four in this series of book excerpts from JXTA in a Nutshell, learn about JXTA pipes.
One of the most powerful of JXTA's services is the pipe service. The idea of a pipe is familiar to users of Unix systems: a pipe is used to connect the output from one command to the input of another command. On a Unix system, if you want to count the number of unique exceptions that occur in a log file, you might use this command:
piccolo% cat log | grep "Exception" | sort | uniq | wc -l
cat command prints the log file to its standard output. The
grep command reads this file from its standard input and searches for lines containing the string "Exception"; it prints matching lines to its standard output. The
sort command reads these lines from its standard input and sends the sorted list to its standard output, where it is read by the
uniq command, which removes duplicate lines. The unduplicated lines are sent to its standard output, where they are read by the
wc command, which counts the number of lines and finally prints that number.
Pipes are quite useful in that they allow you to build complex functionality from a number of simple building blocks. JXTA takes the familiar idea of pipes and extends their functionality to the network.
In This Series
Getting Started with JXTA, Part 2
JXTA pipes are defined in terms of the endpoints available to a peer. A peer endpoint is a logical abstraction of an address on a network transport that is capable of sending and receiving network messages. In the examples we've seen so far, the network transport has always been IP-based: the shell peer we've looked at has a TCP endpoint (port 9701 by default) and can have an HTTP endpoint. However, JXTA does not make this assumption and allows an endpoint to be an address on any network transport as long as the network is capable of sending and receiving datagram-style (i.e., unreliable packet-based) messages.
In the shell, the available endpoints are established when the configurator runs. The TCP endpoint is based on the network address selected from the pull-down menu and the port number entered into the TCP port text box; the HTTP endpoint is enabled by selecting support for HTTP.
In JXTA, pipes provide a unidirectional, virtual connection between two pipe endpoints: input pipe (receiving end) and output pipe (sending end). Pipe connections are established independently of the pipe endpoints peer location. For example, the input pipe endpoint can be located behind a firewall or NAT while the output endpoint can be located on a peer on the Internet. The endpoints may even be on physically different networks: the input pipe endpoint could be on a TCP network while the output pipe endpoint is on a token ring network. As long as there are available JXTA relay peers between the two endpoints, a logical pipe between them may be defined.
Therefore, pipes enable connections without any consideration of connectivity. Two peers may require an intermediary routing peer to communicate between each other. Pipes virtualize peer connections to homogenize and provide an abstraction of the full connectivity available within the JXTA network.
Pipe connections are layered on top of the peer endpoint connections, as we show in Figure 2-4. This figure shows a set of peer endpoint connections: Peer B, for example, has two HTTP endpoint connections while Peer D has two HTTP endpoint connections and one TCP/IP endpoint connection.
Figure 2-4. Pipe connections over peer endpoints
Note that some peers sit behind a firewall. HTTP proxy connections are used to connect to a peer outside the firewall (a TCP/IP Socks connection could also have been used). Peer C and Peer A can act as router peers for peers behind firewalls.
A pipe connection may involve multiple peer endpoint transport connections (multi-hops). The pipe connection between Peer B and Peer E involves an HTTP connection between Peer B and Peer C, an HTTP connection between Peer C and Peer D, and a TCP/IP connection between Peer D and Peer E. On the other hand, if a pipe connection is established between Peer A and Peer C, a single peer endpoint connection will implement the pipe, as Peer A and Peer C have a direct TCP/IP connection.
Pipes may send messages in different ways, each of which may provide a different quality of service. Some examples of this include:
Unidirectional, asynchronous: The pipe endpoint sends a message, and no guarantee of delivery is made.
Synchronous request/response (RPC): The pipe endpoint sends a message and receives a correlated answer.
Publish/subscribe: A pipe endpoint subscribes to messages sent from a publisher endpoint.
Bulk data transfer: The pipe provides reliable data transfer of binary data.
Streaming: The pipe provides efficient data transfer over a flow-controlled channel.
The default service provided by a JXTA pipe is unidirectional, asynchronous communication. All other pipe services can be implemented by the developer either directly on top of the peer endpoints or over the default pipe abstraction. The unidirectional and asynchronous pipe service was selected as the minimum common denominator service easily implementable on a wide variety of non-IP transports. The asynchronous model was also selected since it fits well with the unreliable nature of a P2P network such as JXTA, in which peers or connections may disappear at any moment. Asynchronous models are also known to be more scalable (as the number of peers increase) than synchronous models. Finally, an asynchronous model provides an easier fault recovery and garbage collection mechanism when network connections fail.
Pipes provide a nonlocalized communication abstraction. Applications and services create pipe endpoints (an input pipe and an output pipe) to communicate independently of the pipe endpoints' peer location. Due to the unreliability and the interchangeable nature of peers in the JXTA network, pipes provide a powerful mechanism to build fault-tolerant applications. When creating a connection to a service, the pipe abstraction permits the peer to bind the pipe endpoints dynamically to the best or more appropriate instance of the service independent of the peer location. This hides the location of a service peer from the application, which is extremely important on an unreliable network.
The indirection introduced by pipes also allows applications to bind to the most appropriate instance of a service: perhaps the closest service or the best performing service. JXTA presents a novel approach to address application reliability. By building and proliferating interchangeable services on peers, applications can adapt to the unreliable network environment by connecting to or changing to the most available or efficient service; the application can do this at any time and with no regard to the location of the service.
Pipes are an essential tool to build such services and applications. JXTA pipes introduce a fundamental network programming shift in which developers should not write applications that connect to a specific peer to access a unique service, but should write applications that discover the closest available service regardless of which peer is running the service.
Pipe endpoints are dynamically bounded to a peer endpoint at runtime via the Pipe Binding Protocol (PBP). The pipe-binding process consists of searching for and connecting two or more pipe endpoints. When a message is sent over a pipe, it is sent by the local output pipe to the destination input pipe endpoint that is currently listening to the pipe. The set of currently listening peers (that is, the location of the input pipe endpoint) is resolved using the PBP.
Pipes are uniquely identified by a pipe advertisement. The pipe advertisement contains a unique pipe ID and an optional pipe name. Pipe advertisements are a resource within the JXTA network; they may be discovered just as we discovered peers and peergroups. Each pipe advertisement is associated with a unique pipe.
Applications use a pipe service to create pipe endpoints (both input and output) associated with a particular pipe advertisement. The pipe service uses pipe advertisements to identify the pipe and resolve the input pipe and output pipe endpoints.
Pipes support two modes of communication (see Figure 2-5):
Point-to-point: A point-to-point pipe connects exactly two pipe endpoints: an input pipe receives messages sent from an output pipe. No reply or acknowledgment is supported. Additional information in the message payload (such as a unique ID) is required to determine the sequence of messages sent over the pipe. The message payload may also contain a pipe advertisement that can be used to open a pipe to reply to the sender.
Propagate pipe: A propagate pipe connects one output pipe to multiple input pipes. Messages flow into the input pipes from the output pipe (propagation source). A message sent over a propagate pipe is sent to all listening input pipes; this process may create multiple copies of the message. On a TCP/IP network, IP multicasting is used as an implementation for propagate pipes when the propagate scope maps to an underlying physical subnet in a one-to-one fashion. Propagate pipes can also be implemented using point-to-point communication on transports that do not support multicasting.
Figure 2-5. Pipe communication modes
Pipe connectivity is related to the concept of peergroups: only pipe endpoints that are located in the same peergroup can be mutually resolved by a pipe service. Each peergroup has its own pipe service, so to open a pipe connection between two peers, the two peers must have joined the same peergroup.
Of course, since all peers are part of the NetPeerGroup, any peer can open a pipe to any other peer. The difference is in which pipe service will resolve the pipe. In Figure 2-5, Peer 1 can use the pipe service of either the NetPeerGroup or of Peergroup A to resolve its pipe connections to Peers 2 and 3. Peers 2 and 4 can use only the pipe service of the NetPeerGroup to resolve their mutual pipe connection. The context of the pipe resolution is important because of the security context that may be enforced by the pipe service of a peergroup: Peer 1 may decide that it does not want to send data to any peer that has not been authenticated into Peergroup A. It then advertises only its pipe endpoint within that peergroup.
A peer can maintain different pipe connections to the same peer, holding each of them in a different peergroup context for security reasons. Messages can be sent to the peer with different security levels depending on the pipe used.
Pipes are used behind the scenes for many shell services. One such service is the talk service, which allows users in two different shells to send simple string messages to each other.
To use the talk service, you must register two users (either in the same or -- ideally -- different shells). One registration looks like this:
JXTA>talk -register sdo ...... User: sdo is now registered JXTA>talk -login sdo
Now repeat the process in a second shell with a different user:
JXTA>talk -register jra ...... User: jra is now registered JXTA>talk -login jra
In the first shell, you can then send a message from
JXTA>talk -u sdo jra found user's advertisement attempting to connect talk is connected to user jra Type your message. To exit, type "." at beginning of line Hello!
In the second shell, you'll see this message:
talk: from sdo to jra Message: Hello!
Behind the scenes, this example uses a number of JXTA services; the one that concerns us for now is the pipe service. The message from
jra is sent via a pipe. What happens is this:
A user is registered. This creates an advertisement that the peer will accept talk messages.
The user logs in. This creates the actual input pipe. A thread is set up to read continually from the input pipe, which is why the message from
jra appeared asynchronously in the second shell window. This is why logging in is a necessary step in this service; without it, there would be no input pipe to accept messages.
The user sends a message. This creates an output pipe; whatever data is written to this output pipe will be read on the input pipe of the user to whom the message is directed.
We can explore some other shell commands to understand a little more about how JXTA pipes work. To create a pipe, we must make a pipe advertisement and use it to create the pipes:
JXTA>pipeadv = mkadv -p JXTA>inpipe = mkpipe -i pipeadv JXTA>outpipe = mkpipe -o pipeadv
This creates both an input and output pipe. In order for the pipes to be connected to each other, they must be created with the same advertisement that we created here. In the talk service (and in other JXTA applications), you discover input pipe advertisements that are created by the shell in response to the
talk -register command. The shell uses this pipe advertisement to create the input pipe in response to the
talk -login command; it uses this pipe advertisement to create the output pipe in response to the
talk user command.
In this example, we've created the input and output pipes in the same peer. Later, we'll see how to create the pipes in different peers.
The information transmitted through pipes is messages. Messages define an envelope to transfer any kind of data: text, code, and so on. Furthermore, a message may contain an arbitrary number of uniquely named sections. Each section has an associated MIME type and can hold any form of data. Binary data may be encoded using the Base64 encoding scheme in the body of a section; a CDATA section may also be used. Some sections may contain XML-structured documents. Applications and services communicate by constructing messages and sending and/or receiving messages through the input and output pipe endpoints.
JXTA messages use a binary format to enable the efficient transfer of binary and XML data. A JXTA binary message format is composed of a sequence of elements. Each element has a name and a MIME type and can contain either binary or XML data. The form of a JXTA message is shown in Figure 2-6.Figure 2-6. A JXTA message
In order to send data over a pipe (or to any JXTA peer), the data must be encapsulated in a message. In the shell, this can be done by importing an existing file that contains the message body and using the
mkmsg command to convert the data into a message. When this is done, the imported data will be associated in the message with a tag of your choosing.
First, we need to create a file containing the message data. Such a file would contain a set of arbitrary XML tags; a simple example is the file containing the single line:
If this file is named data, then we can import it into the shell like this:
JXTA>importfile -f data mydata
mydata variable now contains the data read in from the file. In fact, it contains other information as well, since the shell has embedded some additional XML into it.
You can create the actual message like this:
JXTA>mymsg = mkmsg JXTA>put mymsg mytag mydata
The first command creates an empty message named
mymsg; the second populates this message with the data from the
mydata object, associating it with the
If you've made the pipes and message, you can then send and receive data like this:
JXTA>send outpipe mymsg JXTA>newmsg = recv inpipe recv has received a message
The message is sent on the output pipe. The input pipe then reads the message and stores it in a new message variable,
newmsg. The contents of
newmsg are the same as those of
mymsg; they have a tag of
mytag and an associated message body that contains the structured XML document created by reading the datafile. The message body can be extracted with the
JXTA>newdata = get newmsg mytag JXTA>cat newdata <?xml version="1.0"> <ShellDoc> <Item> <Data>Hello, world!</Data> </Item> </ShellDoc>
In the next and final installment, learn about advertisements.
Scott Oaks is a Java Technologist at Sun Microsystems, where he has worked since 1987. While at Sun, he has specialized in many disparate technologies, from the SunOS kernel to network programming and RPCs.
Bernard Traversat is a well-known developer in the Java Community and an active member of the Project JXTA. Bernard is the Engineering Manager for the JXTA CORE.
Li Gong is a well-known developer in the Java Community and an active member of the Project JXTA. Li is the JXTA Engineering Director for the JXTA CORE.
 In order to send a message to an output pipe endpoint, the corresponding input pipe endpoint must have been created first. This is a limitation of the current JXTA v1.0 implementation; it is expected to be removed in the future.
Return to ONJava.com.
Copyright © 2009 O'Reilly Media, Inc.