public abstract class AbstractQTPComposer extends AbstractMessageVisitor implements DataVisitor, SubscriptionVisitor, RecordSink
setOutput(BufferedOutput)
method
immediately after construction.
Note, that all methods declared to throw IOException
actually never do so if the underlying
BufferedOutput
is memory-only. They are declared to throw IOException
only in order not to process it in every subclass but to process it in the outermost methods only,
where they are wrapped in RuntimeQTPException
if they happen.
composeXXX methods are entry-level methods. The main method is compose(MessageProvider)
.
All compose methods update the number of written bytes in stats
.
visitXXX and writeXXX methods are helper methods that are invoked during composing. However,
visitXXXData/Subscription methods can be invoked directly when stats are not needed.
They update only per-record stats
when appropriate.
This class is not thread-safe.
It can be reused to compose unrelated messages record descriptions are not being written
(see AbstractQTPComposer(DataScheme, boolean)
or if resetSession()
is called
to clean up its state.
AbstractQTPParser
Modifier and Type | Field and Description |
---|---|
protected MessageType |
currentMessageType |
protected com.devexperts.io.ChunkedOutput |
msg
This is temporary buffer where message is composed into.
|
protected ProtocolOption.Set |
optSet |
protected DataScheme |
scheme |
protected QDStats |
stats |
protected boolean |
writeEventTimeSequence |
protected boolean |
writeHeartbeat |
VOID
Modifier | Constructor and Description |
---|---|
protected |
AbstractQTPComposer(DataScheme scheme,
boolean describeRecords)
Constructs composer.
|
Modifier and Type | Method and Description |
---|---|
protected void |
abortMessageAndRethrow(Throwable t) |
void |
append(RecordCursor cursor)
Adds a record to this sink from the specified
RecordCursor . |
void |
beginMessage(MessageType messageType)
Begins new message, must be followed by
endMessage() . |
protected void |
beginRecord(DataRecord record) |
boolean |
compose(MessageProvider provider)
|
void |
composeDescribeProtocol(ProtocolDescriptor descriptor)
|
void |
composeEmptyHeartbeat()
|
void |
composeHeartbeatMessage(HeartbeatPayload heartbeatPayload)
|
void |
composeTimeProgressReport(long timeMillis)
|
protected void |
describeRecord(DataRecord record) |
void |
endMessage()
End current message that was started with
beginMessage(MessageType) and copies composed
message to output . |
protected void |
finishComposingMessage(com.devexperts.io.BufferedOutput out)
Performs actions necessary to finish composing a message
(for example, inserts message size, inserts records descriptions if necessary).
|
void |
flush()
Signal that it is safe to flush
appended records to disk,
e.g. |
protected long |
getEventTimeSequence(RecordCursor cursor) |
protected long |
getMessagePayloadSize() |
boolean |
hasCapacity()
Returns
true when this message's capacity had not exceeded threshold yet. |
protected boolean |
inMessage()
Returns true if we are within a message and false if not.
|
void |
resetSession()
Resets session state for composer with describe records mode.
|
void |
setOptSet(ProtocolOption.Set optSet) |
void |
setOutput(com.devexperts.io.BufferedOutput output)
Changes output for the composed messages.
|
void |
setStats(QDStats stats)
Changes stats to gather composer statistics.
|
void |
setWriteEventTimeSequence(boolean writeEventTimeSequence) |
void |
setWriteHeartbeat(boolean writeHeartbeat) |
protected void |
undoWriteMessageHeaderStateChange() |
boolean |
visitData(DataProvider provider,
MessageType type)
This method consumes available data for data message from the given data provider.
|
void |
visitDescribeProtocol(ProtocolDescriptor descriptor) |
void |
visitHeartbeat(HeartbeatPayload heartbeatPayload) |
void |
visitIntField(DataIntField field,
int value)
Visits next Int-field within current record.
|
void |
visitObjField(DataObjField field,
Object value)
Visits next Obj-field within current record.
|
boolean |
visitOtherMessage(int messageType,
byte[] messageBytes,
int offset,
int length)
This method consumes other message type.
|
void |
visitRecord(DataRecord record,
int cipher,
String symbol)
Deprecated.
|
void |
visitRecord(DataRecord record,
int cipher,
String symbol,
long time)
Deprecated.
|
boolean |
visitSubscription(SubscriptionProvider provider,
MessageType type)
This method consumes available subscription for subscription message from the given subscription provider.
|
protected void |
writeDescribeProtocolMessage(com.devexperts.io.BufferedOutput out,
ProtocolDescriptor descriptor) |
protected void |
writeEmptyHeartbeatMessage(com.devexperts.io.BufferedOutput out) |
protected void |
writeEventTimeSequence(long eventTimeSequence) |
protected abstract void |
writeField(DataField field,
RecordCursor cursor) |
protected void |
writeHeartbeatMessage(com.devexperts.io.BufferedOutput out,
HeartbeatPayload heartbeatPayload) |
protected abstract void |
writeHistorySubscriptionTime(DataRecord record,
long time) |
protected abstract void |
writeIntField(DataIntField field,
int value) |
protected abstract void |
writeMessageHeader(MessageType messageType)
Composes header for a message of a specific type.
|
protected abstract void |
writeObjField(DataObjField field,
Object value) |
protected void |
writeOtherMessageBody(byte[] messageBytes,
int offset,
int length) |
protected abstract int |
writeRecordHeader(DataRecord record,
int cipher,
String symbol,
int eventFlags) |
protected void |
writeRecordPayload(RecordCursor cursor,
int eventFlags) |
visitHistoryAddSubscription, visitHistoryData, visitHistoryRemoveSubscription, visitStreamAddSubscription, visitStreamData, visitStreamRemoveSubscription, visitTickerAddSubscription, visitTickerData, visitTickerRemoveSubscription
protected final DataScheme scheme
protected QDStats stats
protected MessageType currentMessageType
protected boolean writeEventTimeSequence
protected boolean writeHeartbeat
protected ProtocolOption.Set optSet
protected final com.devexperts.io.ChunkedOutput msg
default
chunk pool that is aligned in chunk size with QTPConstants.COMPOSER_THRESHOLD
.protected AbstractQTPComposer(DataScheme scheme, boolean describeRecords)
setOutput
before using this composer.scheme
- the data scheme.describeRecords
- if true
, then describe messages are composed right before
records are used for the first time and this instance keeps its state and shall not be
reused for different communication sessions. See resetSession()
.public void setOutput(com.devexperts.io.BufferedOutput output)
output
- output for the composed messages.public void setStats(QDStats stats)
stats
- stats to gather composer statistics.public void setWriteEventTimeSequence(boolean writeEventTimeSequence)
public void setWriteHeartbeat(boolean writeHeartbeat)
public void setOptSet(ProtocolOption.Set optSet)
public void resetSession()
UnsupportedOperationException
- when composer was constructed without describe records mode.AbstractQTPComposer(DataScheme, boolean)
public final boolean compose(MessageProvider provider)
output
and updates number of written bytes in stats
.
This is the outer method from which all the other visitXXX and writeXXX should be written in order to properly record written bytes statistics if needed. However, there are also other composeXXX methods that must be invoked directly (not from inside of this compose invocation).
Inside of this method endMessage()
and beginMessage(MessageType)
can be used to
to arbitrarily delimit messages.
false
if it had composed all available messages and no messages left to compose,
true
if more message remain to be composedpublic final void composeDescribeProtocol(ProtocolDescriptor descriptor)
public final void composeEmptyHeartbeat()
public final void composeHeartbeatMessage(HeartbeatPayload heartbeatPayload)
public final void composeTimeProgressReport(long timeMillis)
public boolean hasCapacity()
true
when this message's capacity had not exceeded threshold yet.
This implementation always returns true
when invoked not in message,
e.g. the overall number of bytes composed is not checked by this implementation
(only the message size for data and subscription messages is limited).
Each call to beginMessage(MessageType)
resets capacity threshold for a new message.hasCapacity
in interface DataVisitor
hasCapacity
in interface RecordSink
hasCapacity
in interface SubscriptionVisitor
protected long getMessagePayloadSize()
public void visitDescribeProtocol(ProtocolDescriptor descriptor)
visitDescribeProtocol
in interface MessageVisitor
visitDescribeProtocol
in class AbstractMessageVisitor
public void visitHeartbeat(HeartbeatPayload heartbeatPayload)
visitHeartbeat
in interface MessageVisitor
visitHeartbeat
in class AbstractMessageVisitor
public boolean visitData(DataProvider provider, MessageType type)
visitData
in interface MessageVisitor
visitData
in class AbstractMessageVisitor
false
if it had retrieved all available data and no data left,
true
if more data remains to be consumed.public boolean visitSubscription(SubscriptionProvider provider, MessageType type)
visitSubscription
in interface MessageVisitor
visitSubscription
in class AbstractMessageVisitor
false
if it had retrieved all available subscription and nothing left,
true
if more subscription remains to be consumed.public boolean visitOtherMessage(int messageType, byte[] messageBytes, int offset, int length)
visitOtherMessage
in interface MessageVisitor
visitOtherMessage
in class AbstractMessageVisitor
messageType
- integer number representing a type of the message.messageBytes
- array containing message data.offset
- position of the first byte of message data in messageBytes
array.length
- number of bytes starting from offset
in messageBytes
related to this message.public void append(RecordCursor cursor)
RecordSink
RecordCursor
.append
in interface RecordSink
public void flush()
appended
records to disk,
e.g. no locks are currently held.
This implementation does nothing.
flush
in interface RecordSink
public void visitRecord(DataRecord record, int cipher, String symbol)
append(RecordCursor)
DataVisitor
visitRecord
in interface DataVisitor
visitRecord
in interface RecordSink
public void visitRecord(DataRecord record, int cipher, String symbol, long time)
append(RecordCursor)
SubscriptionVisitor
visitRecord
in interface RecordSink
visitRecord
in interface SubscriptionVisitor
public void visitIntField(DataIntField field, int value)
DataVisitor
visitIntField
in interface DataVisitor
visitIntField
in interface RecordSink
public void visitObjField(DataObjField field, Object value)
DataVisitor
visitObjField
in interface DataVisitor
visitObjField
in interface RecordSink
protected final boolean inMessage()
public final void beginMessage(MessageType messageType)
endMessage()
.public final void endMessage()
beginMessage(MessageType)
and copies composed
message to output
.
This method can be invoked while composing message with
visitData(DataProvider, MessageType)
or visitSubscription(SubscriptionProvider, MessageType)
.
It can be followed by resetSession()
.
If invoked while composing, the message should be resumed by beginMessage(MessageType)
invocation before returning control to the composing method.
protected void abortMessageAndRethrow(Throwable t)
protected void writeDescribeProtocolMessage(com.devexperts.io.BufferedOutput out, ProtocolDescriptor descriptor) throws IOException
IOException
protected void writeEmptyHeartbeatMessage(com.devexperts.io.BufferedOutput out) throws IOException
IOException
protected void writeHeartbeatMessage(com.devexperts.io.BufferedOutput out, HeartbeatPayload heartbeatPayload) throws IOException
IOException
protected abstract void writeMessageHeader(MessageType messageType) throws IOException
messageType
- the message type.IOException
- never. If it has been thrown then it means an internal error.protected void undoWriteMessageHeaderStateChange()
protected abstract int writeRecordHeader(DataRecord record, int cipher, String symbol, int eventFlags) throws IOException
IOException
protected void writeRecordPayload(RecordCursor cursor, int eventFlags) throws IOException
IOException
protected void writeEventTimeSequence(long eventTimeSequence) throws IOException
IOException
protected abstract void writeHistorySubscriptionTime(DataRecord record, long time) throws IOException
IOException
protected abstract void writeIntField(DataIntField field, int value) throws IOException
IOException
protected abstract void writeObjField(DataObjField field, Object value) throws IOException
IOException
protected abstract void writeField(DataField field, RecordCursor cursor) throws IOException
IOException
protected void writeOtherMessageBody(byte[] messageBytes, int offset, int length) throws IOException
IOException
protected void finishComposingMessage(com.devexperts.io.BufferedOutput out) throws IOException
BinaryQTPComposer
. This implementation just copies message
from msg
to out
.IOException
protected long getEventTimeSequence(RecordCursor cursor)
protected void describeRecord(DataRecord record) throws IOException
IOException
protected final void beginRecord(DataRecord record)
Copyright © 2002–2025 Devexperts LLC. All rights reserved.