public abstract class AbstractQTPComposer extends AbstractMessageVisitor implements DataVisitor, SubscriptionVisitor, RecordSink
setOutput(BufferedOutput) method
immediately after construction.
Note, that all methods declared to throw IOException actually never do so if the underlying
BufferedOutput is memory-only. They are declared to throw IOException
only in order not to process it in every subclass but to process it in the outermost methods only,
where they are wrapped in RuntimeQTPException if they happen.
composeXXX methods are entry-level methods. The main method is compose(MessageProvider).
All compose methods update the number of written bytes in stats.
visitXXX and writeXXX methods are helper methods that are invoked during composing. However,
visitXXXData/Subscription methods can be invoked directly when stats are not needed.
They update only per-record stats when appropriate.
This class is not thread-safe.
It can be reused to compose unrelated messages record descriptions are not being written
(see AbstractQTPComposer(DataScheme, boolean) or if resetSession() is called
to clean up its state.
AbstractQTPParser| Modifier and Type | Field and Description |
|---|---|
protected MessageType |
currentMessageType |
protected com.devexperts.io.ChunkedOutput |
msg
This is temporary buffer where message is composed into.
|
protected ProtocolOption.Set |
optSet |
protected DataScheme |
scheme |
protected QDStats |
stats |
protected boolean |
writeEventTimeSequence |
protected boolean |
writeHeartbeat |
VOID| Modifier | Constructor and Description |
|---|---|
protected |
AbstractQTPComposer(DataScheme scheme,
boolean describeRecords)
Constructs composer.
|
| Modifier and Type | Method and Description |
|---|---|
protected void |
abortMessageAndRethrow(Throwable t) |
void |
append(RecordCursor cursor)
Adds a record to this sink from the specified
RecordCursor. |
void |
beginMessage(MessageType messageType)
Begins new message, must be followed by
endMessage(). |
protected void |
beginRecord(DataRecord record) |
boolean |
compose(MessageProvider provider)
|
void |
composeDescribeProtocol(ProtocolDescriptor descriptor)
|
void |
composeEmptyHeartbeat()
|
void |
composeHeartbeatMessage(HeartbeatPayload heartbeatPayload)
|
void |
composeTimeProgressReport(long timeMillis)
|
protected void |
describeRecord(DataRecord record) |
void |
endMessage()
End current message that was started with
beginMessage(MessageType) and copies composed
message to output. |
protected void |
finishComposingMessage(com.devexperts.io.BufferedOutput out)
Performs actions necessary to finish composing a message
(for example, inserts message size, inserts records descriptions if necessary).
|
void |
flush()
Signal that it is safe to flush
appended records to disk,
e.g. |
protected long |
getEventTimeSequence(RecordCursor cursor) |
protected long |
getMessagePayloadSize() |
boolean |
hasCapacity()
Returns
true when this message's capacity had not exceeded threshold yet. |
protected boolean |
inMessage()
Returns true if we are within a message and false if not.
|
void |
resetSession()
Resets session state for composer with describe records mode.
|
void |
setOptSet(ProtocolOption.Set optSet) |
void |
setOutput(com.devexperts.io.BufferedOutput output)
Changes output for the composed messages.
|
void |
setStats(QDStats stats)
Changes stats to gather composer statistics.
|
void |
setWriteEventTimeSequence(boolean writeEventTimeSequence) |
void |
setWriteHeartbeat(boolean writeHeartbeat) |
protected void |
undoWriteMessageHeaderStateChange() |
boolean |
visitData(DataProvider provider,
MessageType type)
This method consumes available data for data message from the given data provider.
|
void |
visitDescribeProtocol(ProtocolDescriptor descriptor) |
void |
visitHeartbeat(HeartbeatPayload heartbeatPayload) |
void |
visitIntField(DataIntField field,
int value)
Visits next Int-field within current record.
|
void |
visitObjField(DataObjField field,
Object value)
Visits next Obj-field within current record.
|
boolean |
visitOtherMessage(int messageType,
byte[] messageBytes,
int offset,
int length)
This method consumes other message type.
|
void |
visitRecord(DataRecord record,
int cipher,
String symbol)
Deprecated.
|
void |
visitRecord(DataRecord record,
int cipher,
String symbol,
long time)
Deprecated.
|
boolean |
visitSubscription(SubscriptionProvider provider,
MessageType type)
This method consumes available subscription for subscription message from the given subscription provider.
|
protected void |
writeDescribeProtocolMessage(com.devexperts.io.BufferedOutput out,
ProtocolDescriptor descriptor) |
protected void |
writeEmptyHeartbeatMessage(com.devexperts.io.BufferedOutput out) |
protected void |
writeEventTimeSequence(long eventTimeSequence) |
protected abstract void |
writeField(DataField field,
RecordCursor cursor) |
protected void |
writeHeartbeatMessage(com.devexperts.io.BufferedOutput out,
HeartbeatPayload heartbeatPayload) |
protected abstract void |
writeHistorySubscriptionTime(DataRecord record,
long time) |
protected abstract void |
writeIntField(DataIntField field,
int value) |
protected abstract void |
writeMessageHeader(MessageType messageType)
Composes header for a message of a specific type.
|
protected abstract void |
writeObjField(DataObjField field,
Object value) |
protected void |
writeOtherMessageBody(byte[] messageBytes,
int offset,
int length) |
protected abstract int |
writeRecordHeader(DataRecord record,
int cipher,
String symbol,
int eventFlags) |
protected void |
writeRecordPayload(RecordCursor cursor,
int eventFlags) |
visitHistoryAddSubscription, visitHistoryData, visitHistoryRemoveSubscription, visitStreamAddSubscription, visitStreamData, visitStreamRemoveSubscription, visitTickerAddSubscription, visitTickerData, visitTickerRemoveSubscriptionprotected final DataScheme scheme
protected QDStats stats
protected MessageType currentMessageType
protected boolean writeEventTimeSequence
protected boolean writeHeartbeat
protected ProtocolOption.Set optSet
protected final com.devexperts.io.ChunkedOutput msg
default
chunk pool that is aligned in chunk size with QTPConstants.COMPOSER_THRESHOLD.protected AbstractQTPComposer(DataScheme scheme, boolean describeRecords)
setOutput before using this composer.scheme - the data scheme.describeRecords - if true, then describe messages are composed right before
records are used for the first time and this instance keeps its state and shall not be
reused for different communication sessions. See resetSession().public void setOutput(com.devexperts.io.BufferedOutput output)
output - output for the composed messages.public void setStats(QDStats stats)
stats - stats to gather composer statistics.public void setWriteEventTimeSequence(boolean writeEventTimeSequence)
public void setWriteHeartbeat(boolean writeHeartbeat)
public void setOptSet(ProtocolOption.Set optSet)
public void resetSession()
UnsupportedOperationException - when composer was constructed without describe records mode.AbstractQTPComposer(DataScheme, boolean)public final boolean compose(MessageProvider provider)
output
and updates number of written bytes in stats.
This is the outer method from which all the other visitXXX and writeXXX should be written in order to properly record written bytes statistics if needed. However, there are also other composeXXX methods that must be invoked directly (not from inside of this compose invocation).
Inside of this method endMessage() and beginMessage(MessageType) can be used to
to arbitrarily delimit messages.
false if it had composed all available messages and no messages left to compose,
true if more message remain to be composedpublic final void composeDescribeProtocol(ProtocolDescriptor descriptor)
public final void composeEmptyHeartbeat()
public final void composeHeartbeatMessage(HeartbeatPayload heartbeatPayload)
public final void composeTimeProgressReport(long timeMillis)
public boolean hasCapacity()
true when this message's capacity had not exceeded threshold yet.
This implementation always returns true when invoked not in message,
e.g. the overall number of bytes composed is not checked by this implementation
(only the message size for data and subscription messages is limited).
Each call to beginMessage(MessageType) resets capacity threshold for a new message.hasCapacity in interface DataVisitorhasCapacity in interface RecordSinkhasCapacity in interface SubscriptionVisitorprotected long getMessagePayloadSize()
public void visitDescribeProtocol(ProtocolDescriptor descriptor)
visitDescribeProtocol in interface MessageVisitorvisitDescribeProtocol in class AbstractMessageVisitorpublic void visitHeartbeat(HeartbeatPayload heartbeatPayload)
visitHeartbeat in interface MessageVisitorvisitHeartbeat in class AbstractMessageVisitorpublic boolean visitData(DataProvider provider, MessageType type)
visitData in interface MessageVisitorvisitData in class AbstractMessageVisitorfalse if it had retrieved all available data and no data left,
true if more data remains to be consumed.public boolean visitSubscription(SubscriptionProvider provider, MessageType type)
visitSubscription in interface MessageVisitorvisitSubscription in class AbstractMessageVisitorfalse if it had retrieved all available subscription and nothing left,
true if more subscription remains to be consumed.public boolean visitOtherMessage(int messageType,
byte[] messageBytes,
int offset,
int length)
visitOtherMessage in interface MessageVisitorvisitOtherMessage in class AbstractMessageVisitormessageType - integer number representing a type of the message.messageBytes - array containing message data.offset - position of the first byte of message data in messageBytes array.length - number of bytes starting from offset in messageBytes related to this message.public void append(RecordCursor cursor)
RecordSinkRecordCursor.append in interface RecordSinkpublic void flush()
appended records to disk,
e.g. no locks are currently held.
This implementation does nothing.
flush in interface RecordSinkpublic void visitRecord(DataRecord record, int cipher, String symbol)
append(RecordCursor)DataVisitorvisitRecord in interface DataVisitorvisitRecord in interface RecordSinkpublic void visitRecord(DataRecord record, int cipher, String symbol, long time)
append(RecordCursor)SubscriptionVisitorvisitRecord in interface RecordSinkvisitRecord in interface SubscriptionVisitorpublic void visitIntField(DataIntField field, int value)
DataVisitorvisitIntField in interface DataVisitorvisitIntField in interface RecordSinkpublic void visitObjField(DataObjField field, Object value)
DataVisitorvisitObjField in interface DataVisitorvisitObjField in interface RecordSinkprotected final boolean inMessage()
public final void beginMessage(MessageType messageType)
endMessage().public final void endMessage()
beginMessage(MessageType) and copies composed
message to output.
This method can be invoked while composing message with
visitData(DataProvider, MessageType) or visitSubscription(SubscriptionProvider, MessageType).
It can be followed by resetSession().
If invoked while composing, the message should be resumed by beginMessage(MessageType)
invocation before returning control to the composing method.
protected void abortMessageAndRethrow(Throwable t)
protected void writeDescribeProtocolMessage(com.devexperts.io.BufferedOutput out,
ProtocolDescriptor descriptor)
throws IOException
IOExceptionprotected void writeEmptyHeartbeatMessage(com.devexperts.io.BufferedOutput out)
throws IOException
IOExceptionprotected void writeHeartbeatMessage(com.devexperts.io.BufferedOutput out,
HeartbeatPayload heartbeatPayload)
throws IOException
IOExceptionprotected abstract void writeMessageHeader(MessageType messageType) throws IOException
messageType - the message type.IOException - never. If it has been thrown then it means an internal error.protected void undoWriteMessageHeaderStateChange()
protected abstract int writeRecordHeader(DataRecord record, int cipher, String symbol, int eventFlags) throws IOException
IOExceptionprotected void writeRecordPayload(RecordCursor cursor, int eventFlags) throws IOException
IOExceptionprotected void writeEventTimeSequence(long eventTimeSequence)
throws IOException
IOExceptionprotected abstract void writeHistorySubscriptionTime(DataRecord record, long time) throws IOException
IOExceptionprotected abstract void writeIntField(DataIntField field, int value) throws IOException
IOExceptionprotected abstract void writeObjField(DataObjField field, Object value) throws IOException
IOExceptionprotected abstract void writeField(DataField field, RecordCursor cursor) throws IOException
IOExceptionprotected void writeOtherMessageBody(byte[] messageBytes,
int offset,
int length)
throws IOException
IOExceptionprotected void finishComposingMessage(com.devexperts.io.BufferedOutput out)
throws IOException
BinaryQTPComposer. This implementation just copies message
from msg to out.IOExceptionprotected long getEventTimeSequence(RecordCursor cursor)
protected void describeRecord(DataRecord record) throws IOException
IOExceptionprotected final void beginRecord(DataRecord record)
Copyright © 2002–2025 Devexperts LLC. All rights reserved.