001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.command; 019 020import java.io.BufferedInputStream; 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.EOFException; 024import java.io.IOException; 025import java.io.InputStream; 026import java.io.OutputStream; 027import java.util.zip.DeflaterOutputStream; 028import java.util.zip.InflaterInputStream; 029 030import javax.jms.JMSException; 031import javax.jms.MessageEOFException; 032import javax.jms.MessageFormatException; 033import javax.jms.MessageNotReadableException; 034import javax.jms.MessageNotWriteableException; 035import javax.jms.StreamMessage; 036 037import org.apache.activemq.ActiveMQConnection; 038import org.apache.activemq.util.ByteArrayInputStream; 039import org.apache.activemq.util.ByteArrayOutputStream; 040import org.apache.activemq.util.ByteSequence; 041import org.apache.activemq.util.JMSExceptionSupport; 042import org.apache.activemq.util.MarshallingSupport; 043 044/** 045 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive 046 * types in the Java programming language. It is filled and read sequentially. 
047 * It inherits from the <CODE>Message</CODE> interface and adds a stream 048 * message body. Its methods are based largely on those found in 049 * <CODE>java.io.DataInputStream</CODE> and 050 * <CODE>java.io.DataOutputStream</CODE>. <p/> 051 * <P> 052 * The primitive types can be read or written explicitly using methods for each 053 * type. They may also be read or written generically as objects. For instance, 054 * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to 055 * <CODE>StreamMessage.writeObject(new 056 * Integer(6))</CODE>. Both forms are 057 * provided, because the explicit form is convenient for static programming, and 058 * the object form is needed when types are not known at compile time. <p/> 059 * <P> 060 * When the message is first created, and when <CODE>clearBody</CODE> is 061 * called, the body of the message is in write-only mode. After the first call 062 * to <CODE>reset</CODE> has been made, the message body is in read-only mode. 063 * After a message has been sent, the client that sent it can retain and modify 064 * it without affecting the message that has been sent. The same message object 065 * can be sent multiple times. When a message has been received, the provider 066 * has called <CODE>reset</CODE> so that the message body is in read-only mode 067 * for the client. <p/> 068 * <P> 069 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the 070 * message body is cleared and the message body is in write-only mode. <p/> 071 * <P> 072 * If a client attempts to read a message in write-only mode, a 073 * <CODE>MessageNotReadableException</CODE> is thrown. <p/> 074 * <P> 075 * If a client attempts to write a message in read-only mode, a 076 * <CODE>MessageNotWriteableException</CODE> is thrown. <p/> 077 * <P> 078 * <CODE>StreamMessage</CODE> objects support the following conversion table. 079 * The marked cases must be supported. The unmarked cases must throw a 080 * <CODE>JMSException</CODE>. 
 * The <CODE>String</CODE>-to-primitive conversions may throw a runtime
 * exception if the primitive's <CODE>valueOf()</CODE> method does not accept
 * it as a valid <CODE>String</CODE> representation of the primitive.
 * <P>
 * A value written as the row type can be read as the column type.
 *
 * <PRE>
 * |        | boolean byte short char int long float double String byte[]
 * |----------------------------------------------------------------------
 * |boolean |    X                                            X
 * |byte    |          X     X         X   X                  X
 * |short   |                X         X   X                  X
 * |char    |                     X                           X
 * |int     |                          X   X                  X
 * |long    |                              X                  X
 * |float   |                                    X     X      X
 * |double  |                                          X      X
 * |String  |    X     X     X         X   X     X     X      X
 * |byte[]  |                                                        X
 * |----------------------------------------------------------------------
 * </PRE>
 *
 * <P>
 * Attempting to read a null value as a primitive type must be treated as
 * calling the primitive's corresponding <code>valueOf(String)</code>
 * conversion method with a null value. Since <code>char</code> does not
 * support a <code>String</code> conversion, attempting to read a null value
 * as a <code>char</code> must throw a <code>NullPointerException</code>.
103 * 104 * @openwire:marshaller code="27" 105 * @see javax.jms.Session#createStreamMessage() 106 * @see javax.jms.BytesMessage 107 * @see javax.jms.MapMessage 108 * @see javax.jms.Message 109 * @see javax.jms.ObjectMessage 110 * @see javax.jms.TextMessage 111 */ 112public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage { 113 114 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE; 115 116 protected transient DataOutputStream dataOut; 117 protected transient ByteArrayOutputStream bytesOut; 118 protected transient DataInputStream dataIn; 119 protected transient int remainingBytes = -1; 120 121 public Message copy() { 122 ActiveMQStreamMessage copy = new ActiveMQStreamMessage(); 123 copy(copy); 124 return copy; 125 } 126 127 private void copy(ActiveMQStreamMessage copy) { 128 storeContent(); 129 super.copy(copy); 130 copy.dataOut = null; 131 copy.bytesOut = null; 132 copy.dataIn = null; 133 } 134 135 public void onSend() throws JMSException { 136 super.onSend(); 137 storeContent(); 138 } 139 140 private void storeContent() { 141 if (dataOut != null) { 142 try { 143 dataOut.close(); 144 setContent(bytesOut.toByteSequence()); 145 bytesOut = null; 146 dataOut = null; 147 } catch (IOException ioe) { 148 throw new RuntimeException(ioe); 149 } 150 } 151 } 152 153 public byte getDataStructureType() { 154 return DATA_STRUCTURE_TYPE; 155 } 156 157 public String getJMSXMimeType() { 158 return "jms/stream-message"; 159 } 160 161 /** 162 * Clears out the message body. Clearing a message's body does not clear its 163 * header values or property entries. <p/> 164 * <P> 165 * If this message body was read-only, calling this method leaves the 166 * message body in the same state as an empty body in a newly created 167 * message. 168 * 169 * @throws JMSException if the JMS provider fails to clear the message body 170 * due to some internal error. 
171 */ 172 173 public void clearBody() throws JMSException { 174 super.clearBody(); 175 this.dataOut = null; 176 this.dataIn = null; 177 this.bytesOut = null; 178 this.remainingBytes = -1; 179 } 180 181 /** 182 * Reads a <code>boolean</code> from the stream message. 183 * 184 * @return the <code>boolean</code> value read 185 * @throws JMSException if the JMS provider fails to read the message due to 186 * some internal error. 187 * @throws MessageEOFException if unexpected end of message stream has been 188 * reached. 189 * @throws MessageFormatException if this type conversion is invalid. 190 * @throws MessageNotReadableException if the message is in write-only mode. 191 */ 192 193 public boolean readBoolean() throws JMSException { 194 initializeReading(); 195 try { 196 197 this.dataIn.mark(10); 198 int type = this.dataIn.read(); 199 if (type == -1) { 200 throw new MessageEOFException("reached end of data"); 201 } 202 if (type == MarshallingSupport.BOOLEAN_TYPE) { 203 return this.dataIn.readBoolean(); 204 } 205 if (type == MarshallingSupport.STRING_TYPE) { 206 return Boolean.valueOf(this.dataIn.readUTF()).booleanValue(); 207 } 208 if (type == MarshallingSupport.NULL) { 209 this.dataIn.reset(); 210 throw new NullPointerException("Cannot convert NULL value to boolean."); 211 } else { 212 this.dataIn.reset(); 213 throw new MessageFormatException(" not a boolean type"); 214 } 215 } catch (EOFException e) { 216 throw JMSExceptionSupport.createMessageEOFException(e); 217 } catch (IOException e) { 218 throw JMSExceptionSupport.createMessageFormatException(e); 219 } 220 } 221 222 /** 223 * Reads a <code>byte</code> value from the stream message. 224 * 225 * @return the next byte from the stream message as a 8-bit 226 * <code>byte</code> 227 * @throws JMSException if the JMS provider fails to read the message due to 228 * some internal error. 229 * @throws MessageEOFException if unexpected end of message stream has been 230 * reached. 
231 * @throws MessageFormatException if this type conversion is invalid. 232 * @throws MessageNotReadableException if the message is in write-only mode. 233 */ 234 235 public byte readByte() throws JMSException { 236 initializeReading(); 237 try { 238 239 this.dataIn.mark(10); 240 int type = this.dataIn.read(); 241 if (type == -1) { 242 throw new MessageEOFException("reached end of data"); 243 } 244 if (type == MarshallingSupport.BYTE_TYPE) { 245 return this.dataIn.readByte(); 246 } 247 if (type == MarshallingSupport.STRING_TYPE) { 248 return Byte.valueOf(this.dataIn.readUTF()).byteValue(); 249 } 250 if (type == MarshallingSupport.NULL) { 251 this.dataIn.reset(); 252 throw new NullPointerException("Cannot convert NULL value to byte."); 253 } else { 254 this.dataIn.reset(); 255 throw new MessageFormatException(" not a byte type"); 256 } 257 } catch (NumberFormatException mfe) { 258 try { 259 this.dataIn.reset(); 260 } catch (IOException ioe) { 261 throw JMSExceptionSupport.create(ioe); 262 } 263 throw mfe; 264 265 } catch (EOFException e) { 266 throw JMSExceptionSupport.createMessageEOFException(e); 267 } catch (IOException e) { 268 throw JMSExceptionSupport.createMessageFormatException(e); 269 } 270 } 271 272 /** 273 * Reads a 16-bit integer from the stream message. 274 * 275 * @return a 16-bit integer from the stream message 276 * @throws JMSException if the JMS provider fails to read the message due to 277 * some internal error. 278 * @throws MessageEOFException if unexpected end of message stream has been 279 * reached. 280 * @throws MessageFormatException if this type conversion is invalid. 281 * @throws MessageNotReadableException if the message is in write-only mode. 
282 */ 283 284 public short readShort() throws JMSException { 285 initializeReading(); 286 try { 287 288 this.dataIn.mark(17); 289 int type = this.dataIn.read(); 290 if (type == -1) { 291 throw new MessageEOFException("reached end of data"); 292 } 293 if (type == MarshallingSupport.SHORT_TYPE) { 294 return this.dataIn.readShort(); 295 } 296 if (type == MarshallingSupport.BYTE_TYPE) { 297 return this.dataIn.readByte(); 298 } 299 if (type == MarshallingSupport.STRING_TYPE) { 300 return Short.valueOf(this.dataIn.readUTF()).shortValue(); 301 } 302 if (type == MarshallingSupport.NULL) { 303 this.dataIn.reset(); 304 throw new NullPointerException("Cannot convert NULL value to short."); 305 } else { 306 this.dataIn.reset(); 307 throw new MessageFormatException(" not a short type"); 308 } 309 } catch (NumberFormatException mfe) { 310 try { 311 this.dataIn.reset(); 312 } catch (IOException ioe) { 313 throw JMSExceptionSupport.create(ioe); 314 } 315 throw mfe; 316 317 } catch (EOFException e) { 318 throw JMSExceptionSupport.createMessageEOFException(e); 319 } catch (IOException e) { 320 throw JMSExceptionSupport.createMessageFormatException(e); 321 } 322 323 } 324 325 /** 326 * Reads a Unicode character value from the stream message. 327 * 328 * @return a Unicode character from the stream message 329 * @throws JMSException if the JMS provider fails to read the message due to 330 * some internal error. 331 * @throws MessageEOFException if unexpected end of message stream has been 332 * reached. 333 * @throws MessageFormatException if this type conversion is invalid 334 * @throws MessageNotReadableException if the message is in write-only mode. 
335 */ 336 337 public char readChar() throws JMSException { 338 initializeReading(); 339 try { 340 341 this.dataIn.mark(17); 342 int type = this.dataIn.read(); 343 if (type == -1) { 344 throw new MessageEOFException("reached end of data"); 345 } 346 if (type == MarshallingSupport.CHAR_TYPE) { 347 return this.dataIn.readChar(); 348 } 349 if (type == MarshallingSupport.NULL) { 350 this.dataIn.reset(); 351 throw new NullPointerException("Cannot convert NULL value to char."); 352 } else { 353 this.dataIn.reset(); 354 throw new MessageFormatException(" not a char type"); 355 } 356 } catch (NumberFormatException mfe) { 357 try { 358 this.dataIn.reset(); 359 } catch (IOException ioe) { 360 throw JMSExceptionSupport.create(ioe); 361 } 362 throw mfe; 363 364 } catch (EOFException e) { 365 throw JMSExceptionSupport.createMessageEOFException(e); 366 } catch (IOException e) { 367 throw JMSExceptionSupport.createMessageFormatException(e); 368 } 369 } 370 371 /** 372 * Reads a 32-bit integer from the stream message. 373 * 374 * @return a 32-bit integer value from the stream message, interpreted as an 375 * <code>int</code> 376 * @throws JMSException if the JMS provider fails to read the message due to 377 * some internal error. 378 * @throws MessageEOFException if unexpected end of message stream has been 379 * reached. 380 * @throws MessageFormatException if this type conversion is invalid. 381 * @throws MessageNotReadableException if the message is in write-only mode. 
382 */ 383 384 public int readInt() throws JMSException { 385 initializeReading(); 386 try { 387 388 this.dataIn.mark(33); 389 int type = this.dataIn.read(); 390 if (type == -1) { 391 throw new MessageEOFException("reached end of data"); 392 } 393 if (type == MarshallingSupport.INTEGER_TYPE) { 394 return this.dataIn.readInt(); 395 } 396 if (type == MarshallingSupport.SHORT_TYPE) { 397 return this.dataIn.readShort(); 398 } 399 if (type == MarshallingSupport.BYTE_TYPE) { 400 return this.dataIn.readByte(); 401 } 402 if (type == MarshallingSupport.STRING_TYPE) { 403 return Integer.valueOf(this.dataIn.readUTF()).intValue(); 404 } 405 if (type == MarshallingSupport.NULL) { 406 this.dataIn.reset(); 407 throw new NullPointerException("Cannot convert NULL value to int."); 408 } else { 409 this.dataIn.reset(); 410 throw new MessageFormatException(" not an int type"); 411 } 412 } catch (NumberFormatException mfe) { 413 try { 414 this.dataIn.reset(); 415 } catch (IOException ioe) { 416 throw JMSExceptionSupport.create(ioe); 417 } 418 throw mfe; 419 420 } catch (EOFException e) { 421 throw JMSExceptionSupport.createMessageEOFException(e); 422 } catch (IOException e) { 423 throw JMSExceptionSupport.createMessageFormatException(e); 424 } 425 } 426 427 /** 428 * Reads a 64-bit integer from the stream message. 429 * 430 * @return a 64-bit integer value from the stream message, interpreted as a 431 * <code>long</code> 432 * @throws JMSException if the JMS provider fails to read the message due to 433 * some internal error. 434 * @throws MessageEOFException if unexpected end of message stream has been 435 * reached. 436 * @throws MessageFormatException if this type conversion is invalid. 437 * @throws MessageNotReadableException if the message is in write-only mode. 
438 */ 439 440 public long readLong() throws JMSException { 441 initializeReading(); 442 try { 443 444 this.dataIn.mark(65); 445 int type = this.dataIn.read(); 446 if (type == -1) { 447 throw new MessageEOFException("reached end of data"); 448 } 449 if (type == MarshallingSupport.LONG_TYPE) { 450 return this.dataIn.readLong(); 451 } 452 if (type == MarshallingSupport.INTEGER_TYPE) { 453 return this.dataIn.readInt(); 454 } 455 if (type == MarshallingSupport.SHORT_TYPE) { 456 return this.dataIn.readShort(); 457 } 458 if (type == MarshallingSupport.BYTE_TYPE) { 459 return this.dataIn.readByte(); 460 } 461 if (type == MarshallingSupport.STRING_TYPE) { 462 return Long.valueOf(this.dataIn.readUTF()).longValue(); 463 } 464 if (type == MarshallingSupport.NULL) { 465 this.dataIn.reset(); 466 throw new NullPointerException("Cannot convert NULL value to long."); 467 } else { 468 this.dataIn.reset(); 469 throw new MessageFormatException(" not a long type"); 470 } 471 } catch (NumberFormatException mfe) { 472 try { 473 this.dataIn.reset(); 474 } catch (IOException ioe) { 475 throw JMSExceptionSupport.create(ioe); 476 } 477 throw mfe; 478 479 } catch (EOFException e) { 480 throw JMSExceptionSupport.createMessageEOFException(e); 481 } catch (IOException e) { 482 throw JMSExceptionSupport.createMessageFormatException(e); 483 } 484 } 485 486 /** 487 * Reads a <code>float</code> from the stream message. 488 * 489 * @return a <code>float</code> value from the stream message 490 * @throws JMSException if the JMS provider fails to read the message due to 491 * some internal error. 492 * @throws MessageEOFException if unexpected end of message stream has been 493 * reached. 494 * @throws MessageFormatException if this type conversion is invalid. 495 * @throws MessageNotReadableException if the message is in write-only mode. 
496 */ 497 498 public float readFloat() throws JMSException { 499 initializeReading(); 500 try { 501 this.dataIn.mark(33); 502 int type = this.dataIn.read(); 503 if (type == -1) { 504 throw new MessageEOFException("reached end of data"); 505 } 506 if (type == MarshallingSupport.FLOAT_TYPE) { 507 return this.dataIn.readFloat(); 508 } 509 if (type == MarshallingSupport.STRING_TYPE) { 510 return Float.valueOf(this.dataIn.readUTF()).floatValue(); 511 } 512 if (type == MarshallingSupport.NULL) { 513 this.dataIn.reset(); 514 throw new NullPointerException("Cannot convert NULL value to float."); 515 } else { 516 this.dataIn.reset(); 517 throw new MessageFormatException(" not a float type"); 518 } 519 } catch (NumberFormatException mfe) { 520 try { 521 this.dataIn.reset(); 522 } catch (IOException ioe) { 523 throw JMSExceptionSupport.create(ioe); 524 } 525 throw mfe; 526 527 } catch (EOFException e) { 528 throw JMSExceptionSupport.createMessageEOFException(e); 529 } catch (IOException e) { 530 throw JMSExceptionSupport.createMessageFormatException(e); 531 } 532 } 533 534 /** 535 * Reads a <code>double</code> from the stream message. 536 * 537 * @return a <code>double</code> value from the stream message 538 * @throws JMSException if the JMS provider fails to read the message due to 539 * some internal error. 540 * @throws MessageEOFException if unexpected end of message stream has been 541 * reached. 542 * @throws MessageFormatException if this type conversion is invalid. 543 * @throws MessageNotReadableException if the message is in write-only mode. 
544 */ 545 546 public double readDouble() throws JMSException { 547 initializeReading(); 548 try { 549 550 this.dataIn.mark(65); 551 int type = this.dataIn.read(); 552 if (type == -1) { 553 throw new MessageEOFException("reached end of data"); 554 } 555 if (type == MarshallingSupport.DOUBLE_TYPE) { 556 return this.dataIn.readDouble(); 557 } 558 if (type == MarshallingSupport.FLOAT_TYPE) { 559 return this.dataIn.readFloat(); 560 } 561 if (type == MarshallingSupport.STRING_TYPE) { 562 return Double.valueOf(this.dataIn.readUTF()).doubleValue(); 563 } 564 if (type == MarshallingSupport.NULL) { 565 this.dataIn.reset(); 566 throw new NullPointerException("Cannot convert NULL value to double."); 567 } else { 568 this.dataIn.reset(); 569 throw new MessageFormatException(" not a double type"); 570 } 571 } catch (NumberFormatException mfe) { 572 try { 573 this.dataIn.reset(); 574 } catch (IOException ioe) { 575 throw JMSExceptionSupport.create(ioe); 576 } 577 throw mfe; 578 579 } catch (EOFException e) { 580 throw JMSExceptionSupport.createMessageEOFException(e); 581 } catch (IOException e) { 582 throw JMSExceptionSupport.createMessageFormatException(e); 583 } 584 } 585 586 /** 587 * Reads a <CODE>String</CODE> from the stream message. 588 * 589 * @return a Unicode string from the stream message 590 * @throws JMSException if the JMS provider fails to read the message due to 591 * some internal error. 592 * @throws MessageEOFException if unexpected end of message stream has been 593 * reached. 594 * @throws MessageFormatException if this type conversion is invalid. 595 * @throws MessageNotReadableException if the message is in write-only mode. 
596 */ 597 598 public String readString() throws JMSException { 599 initializeReading(); 600 try { 601 602 this.dataIn.mark(65); 603 int type = this.dataIn.read(); 604 if (type == -1) { 605 throw new MessageEOFException("reached end of data"); 606 } 607 if (type == MarshallingSupport.NULL) { 608 return null; 609 } 610 if (type == MarshallingSupport.BIG_STRING_TYPE) { 611 return MarshallingSupport.readUTF8(dataIn); 612 } 613 if (type == MarshallingSupport.STRING_TYPE) { 614 return this.dataIn.readUTF(); 615 } 616 if (type == MarshallingSupport.LONG_TYPE) { 617 return new Long(this.dataIn.readLong()).toString(); 618 } 619 if (type == MarshallingSupport.INTEGER_TYPE) { 620 return new Integer(this.dataIn.readInt()).toString(); 621 } 622 if (type == MarshallingSupport.SHORT_TYPE) { 623 return new Short(this.dataIn.readShort()).toString(); 624 } 625 if (type == MarshallingSupport.BYTE_TYPE) { 626 return new Byte(this.dataIn.readByte()).toString(); 627 } 628 if (type == MarshallingSupport.FLOAT_TYPE) { 629 return new Float(this.dataIn.readFloat()).toString(); 630 } 631 if (type == MarshallingSupport.DOUBLE_TYPE) { 632 return new Double(this.dataIn.readDouble()).toString(); 633 } 634 if (type == MarshallingSupport.BOOLEAN_TYPE) { 635 return (this.dataIn.readBoolean() ? 
Boolean.TRUE : Boolean.FALSE).toString(); 636 } 637 if (type == MarshallingSupport.CHAR_TYPE) { 638 return new Character(this.dataIn.readChar()).toString(); 639 } else { 640 this.dataIn.reset(); 641 throw new MessageFormatException(" not a String type"); 642 } 643 } catch (NumberFormatException mfe) { 644 try { 645 this.dataIn.reset(); 646 } catch (IOException ioe) { 647 throw JMSExceptionSupport.create(ioe); 648 } 649 throw mfe; 650 651 } catch (EOFException e) { 652 throw JMSExceptionSupport.createMessageEOFException(e); 653 } catch (IOException e) { 654 throw JMSExceptionSupport.createMessageFormatException(e); 655 } 656 } 657 658 /** 659 * Reads a byte array field from the stream message into the specified 660 * <CODE>byte[]</CODE> object (the read buffer). <p/> 661 * <P> 662 * To read the field value, <CODE>readBytes</CODE> should be successively 663 * called until it returns a value less than the length of the read buffer. 664 * The value of the bytes in the buffer following the last byte read is 665 * undefined. <p/> 666 * <P> 667 * If <CODE>readBytes</CODE> returns a value equal to the length of the 668 * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there 669 * are no more bytes to be read, this call returns -1. <p/> 670 * <P> 671 * If the byte array field value is null, <CODE>readBytes</CODE> returns 672 * -1. <p/> 673 * <P> 674 * If the byte array field value is empty, <CODE>readBytes</CODE> returns 675 * 0. <p/> 676 * <P> 677 * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> 678 * field value has been made, the full value of the field must be read 679 * before it is valid to read the next field. An attempt to read the next 680 * field before that has been done will throw a 681 * <CODE>MessageFormatException</CODE>. <p/> 682 * <P> 683 * To read the byte field value into a new <CODE>byte[]</CODE> object, use 684 * the <CODE>readObject</CODE> method. 
685 * 686 * @param value the buffer into which the data is read 687 * @return the total number of bytes read into the buffer, or -1 if there is 688 * no more data because the end of the byte field has been reached 689 * @throws JMSException if the JMS provider fails to read the message due to 690 * some internal error. 691 * @throws MessageEOFException if unexpected end of message stream has been 692 * reached. 693 * @throws MessageFormatException if this type conversion is invalid. 694 * @throws MessageNotReadableException if the message is in write-only mode. 695 * @see #readObject() 696 */ 697 698 public int readBytes(byte[] value) throws JMSException { 699 700 initializeReading(); 701 try { 702 if (value == null) { 703 throw new NullPointerException(); 704 } 705 706 if (remainingBytes == -1) { 707 this.dataIn.mark(value.length + 1); 708 int type = this.dataIn.read(); 709 if (type == -1) { 710 throw new MessageEOFException("reached end of data"); 711 } 712 if (type != MarshallingSupport.BYTE_ARRAY_TYPE) { 713 throw new MessageFormatException("Not a byte array"); 714 } 715 remainingBytes = this.dataIn.readInt(); 716 } else if (remainingBytes == 0) { 717 remainingBytes = -1; 718 return -1; 719 } 720 721 if (value.length <= remainingBytes) { 722 // small buffer 723 remainingBytes -= value.length; 724 this.dataIn.readFully(value); 725 return value.length; 726 } else { 727 // big buffer 728 int rc = this.dataIn.read(value, 0, remainingBytes); 729 remainingBytes = 0; 730 return rc; 731 } 732 733 } catch (EOFException e) { 734 JMSException jmsEx = new MessageEOFException(e.getMessage()); 735 jmsEx.setLinkedException(e); 736 throw jmsEx; 737 } catch (IOException e) { 738 JMSException jmsEx = new MessageFormatException(e.getMessage()); 739 jmsEx.setLinkedException(e); 740 throw jmsEx; 741 } 742 } 743 744 /** 745 * Reads an object from the stream message. 
<p/> 746 * <P> 747 * This method can be used to return, in objectified format, an object in 748 * the Java programming language ("Java object") that has been written to 749 * the stream with the equivalent <CODE>writeObject</CODE> method call, or 750 * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/> 751 * <P> 752 * Note that byte values are returned as <CODE>byte[]</CODE>, not 753 * <CODE>Byte[]</CODE>. <p/> 754 * <P> 755 * An attempt to call <CODE>readObject</CODE> to read a byte field value 756 * into a new <CODE>byte[]</CODE> object before the full value of the byte 757 * field has been read will throw a <CODE>MessageFormatException</CODE>. 758 * 759 * @return a Java object from the stream message, in objectified format (for 760 * example, if the object was written as an <CODE>int</CODE>, an 761 * <CODE>Integer</CODE> is returned) 762 * @throws JMSException if the JMS provider fails to read the message due to 763 * some internal error. 764 * @throws MessageEOFException if unexpected end of message stream has been 765 * reached. 766 * @throws MessageFormatException if this type conversion is invalid. 767 * @throws MessageNotReadableException if the message is in write-only mode. 
768 * @see #readBytes(byte[] value) 769 */ 770 771 public Object readObject() throws JMSException { 772 initializeReading(); 773 try { 774 this.dataIn.mark(65); 775 int type = this.dataIn.read(); 776 if (type == -1) { 777 throw new MessageEOFException("reached end of data"); 778 } 779 if (type == MarshallingSupport.NULL) { 780 return null; 781 } 782 if (type == MarshallingSupport.BIG_STRING_TYPE) { 783 return MarshallingSupport.readUTF8(dataIn); 784 } 785 if (type == MarshallingSupport.STRING_TYPE) { 786 return this.dataIn.readUTF(); 787 } 788 if (type == MarshallingSupport.LONG_TYPE) { 789 return Long.valueOf(this.dataIn.readLong()); 790 } 791 if (type == MarshallingSupport.INTEGER_TYPE) { 792 return Integer.valueOf(this.dataIn.readInt()); 793 } 794 if (type == MarshallingSupport.SHORT_TYPE) { 795 return Short.valueOf(this.dataIn.readShort()); 796 } 797 if (type == MarshallingSupport.BYTE_TYPE) { 798 return Byte.valueOf(this.dataIn.readByte()); 799 } 800 if (type == MarshallingSupport.FLOAT_TYPE) { 801 return new Float(this.dataIn.readFloat()); 802 } 803 if (type == MarshallingSupport.DOUBLE_TYPE) { 804 return new Double(this.dataIn.readDouble()); 805 } 806 if (type == MarshallingSupport.BOOLEAN_TYPE) { 807 return this.dataIn.readBoolean() ? 
Boolean.TRUE : Boolean.FALSE; 808 } 809 if (type == MarshallingSupport.CHAR_TYPE) { 810 return Character.valueOf(this.dataIn.readChar()); 811 } 812 if (type == MarshallingSupport.BYTE_ARRAY_TYPE) { 813 int len = this.dataIn.readInt(); 814 byte[] value = new byte[len]; 815 this.dataIn.readFully(value); 816 return value; 817 } else { 818 this.dataIn.reset(); 819 throw new MessageFormatException("unknown type"); 820 } 821 } catch (NumberFormatException mfe) { 822 try { 823 this.dataIn.reset(); 824 } catch (IOException ioe) { 825 throw JMSExceptionSupport.create(ioe); 826 } 827 throw mfe; 828 829 } catch (EOFException e) { 830 JMSException jmsEx = new MessageEOFException(e.getMessage()); 831 jmsEx.setLinkedException(e); 832 throw jmsEx; 833 } catch (IOException e) { 834 JMSException jmsEx = new MessageFormatException(e.getMessage()); 835 jmsEx.setLinkedException(e); 836 throw jmsEx; 837 } 838 } 839 840 /** 841 * Writes a <code>boolean</code> to the stream message. The value 842 * <code>true</code> is written as the value <code>(byte)1</code>; the 843 * value <code>false</code> is written as the value <code>(byte)0</code>. 844 * 845 * @param value the <code>boolean</code> value to be written 846 * @throws JMSException if the JMS provider fails to write the message due 847 * to some internal error. 848 * @throws MessageNotWriteableException if the message is in read-only mode. 849 */ 850 851 public void writeBoolean(boolean value) throws JMSException { 852 initializeWriting(); 853 try { 854 MarshallingSupport.marshalBoolean(dataOut, value); 855 } catch (IOException ioe) { 856 throw JMSExceptionSupport.create(ioe); 857 } 858 } 859 860 /** 861 * Writes a <code>byte</code> to the stream message. 862 * 863 * @param value the <code>byte</code> value to be written 864 * @throws JMSException if the JMS provider fails to write the message due 865 * to some internal error. 866 * @throws MessageNotWriteableException if the message is in read-only mode. 
867 */ 868 869 public void writeByte(byte value) throws JMSException { 870 initializeWriting(); 871 try { 872 MarshallingSupport.marshalByte(dataOut, value); 873 } catch (IOException ioe) { 874 throw JMSExceptionSupport.create(ioe); 875 } 876 } 877 878 /** 879 * Writes a <code>short</code> to the stream message. 880 * 881 * @param value the <code>short</code> value to be written 882 * @throws JMSException if the JMS provider fails to write the message due 883 * to some internal error. 884 * @throws MessageNotWriteableException if the message is in read-only mode. 885 */ 886 887 public void writeShort(short value) throws JMSException { 888 initializeWriting(); 889 try { 890 MarshallingSupport.marshalShort(dataOut, value); 891 } catch (IOException ioe) { 892 throw JMSExceptionSupport.create(ioe); 893 } 894 } 895 896 /** 897 * Writes a <code>char</code> to the stream message. 898 * 899 * @param value the <code>char</code> value to be written 900 * @throws JMSException if the JMS provider fails to write the message due 901 * to some internal error. 902 * @throws MessageNotWriteableException if the message is in read-only mode. 903 */ 904 905 public void writeChar(char value) throws JMSException { 906 initializeWriting(); 907 try { 908 MarshallingSupport.marshalChar(dataOut, value); 909 } catch (IOException ioe) { 910 throw JMSExceptionSupport.create(ioe); 911 } 912 } 913 914 /** 915 * Writes an <code>int</code> to the stream message. 916 * 917 * @param value the <code>int</code> value to be written 918 * @throws JMSException if the JMS provider fails to write the message due 919 * to some internal error. 920 * @throws MessageNotWriteableException if the message is in read-only mode. 
921 */ 922 923 public void writeInt(int value) throws JMSException { 924 initializeWriting(); 925 try { 926 MarshallingSupport.marshalInt(dataOut, value); 927 } catch (IOException ioe) { 928 throw JMSExceptionSupport.create(ioe); 929 } 930 } 931 932 /** 933 * Writes a <code>long</code> to the stream message. 934 * 935 * @param value the <code>long</code> value to be written 936 * @throws JMSException if the JMS provider fails to write the message due 937 * to some internal error. 938 * @throws MessageNotWriteableException if the message is in read-only mode. 939 */ 940 941 public void writeLong(long value) throws JMSException { 942 initializeWriting(); 943 try { 944 MarshallingSupport.marshalLong(dataOut, value); 945 } catch (IOException ioe) { 946 throw JMSExceptionSupport.create(ioe); 947 } 948 } 949 950 /** 951 * Writes a <code>float</code> to the stream message. 952 * 953 * @param value the <code>float</code> value to be written 954 * @throws JMSException if the JMS provider fails to write the message due 955 * to some internal error. 956 * @throws MessageNotWriteableException if the message is in read-only mode. 957 */ 958 959 public void writeFloat(float value) throws JMSException { 960 initializeWriting(); 961 try { 962 MarshallingSupport.marshalFloat(dataOut, value); 963 } catch (IOException ioe) { 964 throw JMSExceptionSupport.create(ioe); 965 } 966 } 967 968 /** 969 * Writes a <code>double</code> to the stream message. 970 * 971 * @param value the <code>double</code> value to be written 972 * @throws JMSException if the JMS provider fails to write the message due 973 * to some internal error. 974 * @throws MessageNotWriteableException if the message is in read-only mode. 
975 */ 976 977 public void writeDouble(double value) throws JMSException { 978 initializeWriting(); 979 try { 980 MarshallingSupport.marshalDouble(dataOut, value); 981 } catch (IOException ioe) { 982 throw JMSExceptionSupport.create(ioe); 983 } 984 } 985 986 /** 987 * Writes a <code>String</code> to the stream message. 988 * 989 * @param value the <code>String</code> value to be written 990 * @throws JMSException if the JMS provider fails to write the message due 991 * to some internal error. 992 * @throws MessageNotWriteableException if the message is in read-only mode. 993 */ 994 995 public void writeString(String value) throws JMSException { 996 initializeWriting(); 997 try { 998 if (value == null) { 999 MarshallingSupport.marshalNull(dataOut); 1000 } else { 1001 MarshallingSupport.marshalString(dataOut, value); 1002 } 1003 } catch (IOException ioe) { 1004 throw JMSExceptionSupport.create(ioe); 1005 } 1006 } 1007 1008 /** 1009 * Writes a byte array field to the stream message. <p/> 1010 * <P> 1011 * The byte array <code>value</code> is written to the message as a byte 1012 * array field. Consecutively written byte array fields are treated as two 1013 * distinct fields when the fields are read. 1014 * 1015 * @param value the byte array value to be written 1016 * @throws JMSException if the JMS provider fails to write the message due 1017 * to some internal error. 1018 * @throws MessageNotWriteableException if the message is in read-only mode. 1019 */ 1020 1021 public void writeBytes(byte[] value) throws JMSException { 1022 writeBytes(value, 0, value.length); 1023 } 1024 1025 /** 1026 * Writes a portion of a byte array as a byte array field to the stream 1027 * message. <p/> 1028 * <P> 1029 * The a portion of the byte array <code>value</code> is written to the 1030 * message as a byte array field. Consecutively written byte array fields 1031 * are treated as two distinct fields when the fields are read. 
1032 * 1033 * @param value the byte array value to be written 1034 * @param offset the initial offset within the byte array 1035 * @param length the number of bytes to use 1036 * @throws JMSException if the JMS provider fails to write the message due 1037 * to some internal error. 1038 * @throws MessageNotWriteableException if the message is in read-only mode. 1039 */ 1040 1041 public void writeBytes(byte[] value, int offset, int length) throws JMSException { 1042 initializeWriting(); 1043 try { 1044 MarshallingSupport.marshalByteArray(dataOut, value, offset, length); 1045 } catch (IOException ioe) { 1046 throw JMSExceptionSupport.create(ioe); 1047 } 1048 } 1049 1050 /** 1051 * Writes an object to the stream message. <p/> 1052 * <P> 1053 * This method works only for the objectified primitive object types (<code>Integer</code>, 1054 * <code>Double</code>, <code>Long</code> ...), 1055 * <code>String</code> objects, and byte arrays. 1056 * 1057 * @param value the Java object to be written 1058 * @throws JMSException if the JMS provider fails to write the message due 1059 * to some internal error. 1060 * @throws MessageFormatException if the object is invalid. 1061 * @throws MessageNotWriteableException if the message is in read-only mode. 
1062 */ 1063 1064 public void writeObject(Object value) throws JMSException { 1065 initializeWriting(); 1066 if (value == null) { 1067 try { 1068 MarshallingSupport.marshalNull(dataOut); 1069 } catch (IOException ioe) { 1070 throw JMSExceptionSupport.create(ioe); 1071 } 1072 } else if (value instanceof String) { 1073 writeString(value.toString()); 1074 } else if (value instanceof Character) { 1075 writeChar(((Character)value).charValue()); 1076 } else if (value instanceof Boolean) { 1077 writeBoolean(((Boolean)value).booleanValue()); 1078 } else if (value instanceof Byte) { 1079 writeByte(((Byte)value).byteValue()); 1080 } else if (value instanceof Short) { 1081 writeShort(((Short)value).shortValue()); 1082 } else if (value instanceof Integer) { 1083 writeInt(((Integer)value).intValue()); 1084 } else if (value instanceof Float) { 1085 writeFloat(((Float)value).floatValue()); 1086 } else if (value instanceof Double) { 1087 writeDouble(((Double)value).doubleValue()); 1088 } else if (value instanceof byte[]) { 1089 writeBytes((byte[])value); 1090 }else if (value instanceof Long) { 1091 writeLong(((Long)value).longValue()); 1092 }else { 1093 throw new MessageFormatException("Unsupported Object type: " + value.getClass()); 1094 } 1095 } 1096 1097 /** 1098 * Puts the message body in read-only mode and repositions the stream of 1099 * bytes to the beginning. 
1100 * 1101 * @throws JMSException if an internal error occurs 1102 */ 1103 1104 public void reset() throws JMSException { 1105 storeContent(); 1106 this.bytesOut = null; 1107 this.dataIn = null; 1108 this.dataOut = null; 1109 this.remainingBytes = -1; 1110 setReadOnlyBody(true); 1111 } 1112 1113 private void initializeWriting() throws MessageNotWriteableException { 1114 checkReadOnlyBody(); 1115 if (this.dataOut == null) { 1116 this.bytesOut = new ByteArrayOutputStream(); 1117 OutputStream os = bytesOut; 1118 ActiveMQConnection connection = getConnection(); 1119 if (connection != null && connection.isUseCompression()) { 1120 compressed = true; 1121 os = new DeflaterOutputStream(os); 1122 } 1123 this.dataOut = new DataOutputStream(os); 1124 } 1125 } 1126 1127 protected void checkWriteOnlyBody() throws MessageNotReadableException { 1128 if (!readOnlyBody) { 1129 throw new MessageNotReadableException("Message body is write-only"); 1130 } 1131 } 1132 1133 private void initializeReading() throws MessageNotReadableException { 1134 checkWriteOnlyBody(); 1135 if (this.dataIn == null) { 1136 ByteSequence data = getContent(); 1137 if (data == null) { 1138 data = new ByteSequence(new byte[] {}, 0, 0); 1139 } 1140 InputStream is = new ByteArrayInputStream(data); 1141 if (isCompressed()) { 1142 is = new InflaterInputStream(is); 1143 is = new BufferedInputStream(is); 1144 } 1145 this.dataIn = new DataInputStream(is); 1146 } 1147 } 1148 1149 public String toString() { 1150 return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; 1151 } 1152}