Skip to content

Commit

Permalink
f9fix: FIX log: R TimeStamp=盡量靠近收到資料的時間;
Browse files Browse the repository at this point in the history
  • Loading branch information
fonwin committed Jul 25, 2023
1 parent 9946fe7 commit ff682e2
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 43 deletions.
34 changes: 27 additions & 7 deletions fon9/fix/FixConfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 23,7 @@ enum class FixSeqSt {
/// 呼叫 FixMsgHandler 之前, 完全不寫任何記錄(即使序號符合規範).
/// 如果 FixMsgTypeConfig::FixSeqAllow_ 設定了此旗標,
/// 則在 FixMsgHandler 處理 FixSeqSt::Conform 通知時,
/// 應呼叫 `fixRecorder.WriteInputConform(args.MsgStr_);` 才能正確處理接收序號.
/// 應呼叫 `fixRecorder.WriteInputConform(rxargs);` 才能正確處理接收序號.
/// 一般而言只有 SequenceReset 才會用到.
NoPreRecord = 0x10,
};
Expand All @@ -34,22 34,41 @@ inline FixSeqSt CompareFixSeqNum(FixSeqNum rxMsgSeqNum, FixSeqNum expectSeqNum)
: FixSeqSt::TooHigh;
}

fon9_WARN_DISABLE_PADDING;
struct FixRecvEvArgs {
class FixRecvEvArgs {
fon9_NON_COPY_NON_MOVE(FixRecvEvArgs);
/// 原始 FIX Message.
StrView OrigMsgStr_;
/// 收到訊息的時間.
TimeStamp RxTime_;

public:
FixRecvEvArgs(FixParser& fixParser)
: Msg_(fixParser) {
}
/// 已解析完畢的 FIX Message.
FixParser& Msg_;
/// FIX Message.
StrView MsgStr_;

/// 在呼叫 FixMsgHandler 時, 這裡會提供:
/// - FixSeqSt::Conform 序號符合規範
/// - FixSeqSt::TooHigh 如果序號高於預期, 且 FixMsgTypeConfig::FixSeqAllow_ 有設定 FixSeqSt::TooHigh
/// - FixSeqSt::TooLow 如果序號高於預期, 且 FixMsgTypeConfig::FixSeqAllow_ 有設定 FixSeqSt::TooLow
FixSeqSt SeqSt_;
char Padding____[4];

/// 原始 FIX Message.
const StrView& OrigMsgStr() const {
return this->OrigMsgStr_;
}
void SetOrigMsgStr(const StrView& origMsgStr) {
this->OrigMsgStr_ = origMsgStr;
}
/// 收到訊息的時間.
const TimeStamp RxTime() const {
return this->RxTime_;
}
void SetRxTime(TimeStamp now) {
this->RxTime_ = now;
}

// this->SeqSt_ = CompareFixSeqNum(this->Msg_.GetMsgSeqNum(), this->FixSender_->GetFixRecorder().GetNextRecvSeq());
void ResetSeqSt();
Expand Down Expand Up @@ -90,9 109,10 @@ struct FixMsgTypeConfig {
}

/// 當訊息不連續時, 是否可以呼叫 FixMsgHandler_.
/// - 當序號連續時, 呼叫前必定會先寫 fixRecorder.WriteInputConform(args.MsgStr_);
/// - 當序號連續時, 呼叫前必定會先寫 fixRecorder.WriteInputConform(rxargs);
/// - 不連續時, 則必須由 FixMsgHandler_ 自行記錄必要訊息.
FixSeqSt FixSeqAllow_{FixSeqSt::Conform};
char Padding____[4];
/// 當 FixReceiver 收到一筆可用訊息時, 透過這裡通知.
FixMsgHandler FixMsgHandler_;
/// 當 FixReceiver 收到 SessionReject or BusinessReject 時.
Expand Down Expand Up @@ -150,8 170,8 @@ class fon9_API FixConfig {
/// 當有 Replay 的需求時(FixSender::Replay), 一律使用 FixSender::GapFill.
/// 例如: 券商與交易所之間的連線, 券商端斷線後重連, 可能全都不重送.
bool IsNoReplay_{false};
char Padding____[7];
};
fon9_WARN_POP;

} } // namespaces
#endif//__fon9_fix_FixConfig_hpp__
35 changes: 18 additions & 17 deletions fon9/fix/FixReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 32,9 @@ void FixReceiver::OnBadCompID(const FixRecvEvArgs& rxargs, FixTag errTag, const
// 造成訊息無法繼續下去.
// 所以如果此筆的序號正確, 則使用 WriteInputConform(), 讓下一筆正確序號的訊息能進入系統.
if (rxargs.SeqSt_ == FixSeqSt::Conform)
fixRecorder.WriteInputConform(rxargs.MsgStr_);
fixRecorder.WriteInputConform(rxargs);
else
fixRecorder.Write(f9fix_kCSTR_HdrError, rxargs.MsgStr_);
fixRecorder.Write(f9fix_kCSTR_HdrError, rxargs.OrigMsgStr());
FixBuilder fixb;
RevPrint(fixb.GetBuffer(), f9fix_kFLD_SessionRejectReason_CompIDProblem f9fix_SPLTAGEQ(Text) "CompID problem");
SendSessionReject(*rxargs.FixSender_, rxargs.Msg_.GetMsgSeqNum(), errTag, msgType, std::move(fixb));
Expand All @@ -54,7 54,7 @@ void FixReceiver::OnRecvResendRequest(const FixRecvEvArgs& rxargs) {
// if (rxargs.SeqSt_ != FixSeqSt::Conform)
// 因為即使序號正確, 也會因 IsEnumContains(mcfg->FixSeqAllow_, FixSeqSt::NoPreRecord) 而沒有寫入紀錄,
// 所以這裡離律強制寫入記錄.
rxargs.FixSender_->GetFixRecorder().Write(f9fix_kCSTR_HdrReplay, rxargs.MsgStr_);
rxargs.FixSender_->GetFixRecorder().Write(f9fix_kCSTR_HdrReplay, rxargs.OrigMsgStr());
// -----]
FixTag errTag;
if (const FixParser::FixField* fldBeginSeqNo = rxargs.Msg_.GetField(errTag = f9fix_kTAG_BeginSeqNo)) {
Expand Down Expand Up @@ -86,23 86,23 @@ void FixReceiver::OnRecvSequenceReset(const FixRecvEvArgs& rxargs) {
const FixSeqNum msgSeqNum = rxargs.Msg_.GetMsgSeqNum();
const FixParser::FixField* fldNewSeqNo = rxargs.Msg_.GetField(f9fix_kTAG_NewSeqNo);
if (fldNewSeqNo == nullptr) {
fixRecorder.Write(f9fix_kCSTR_HdrError, rxargs.MsgStr_);
fixRecorder.Write(f9fix_kCSTR_HdrError, rxargs.OrigMsgStr());
SendRequiredTagMissing(*rxargs.FixSender_, msgSeqNum, f9fix_kTAG_NewSeqNo, f9fix_kMSGTYPE_SequenceReset);
return;
}
FixSeqNum newSeqNo = StrTo(fldNewSeqNo->Value_, FixSeqNum{0});

if (isGapFill) {
if (fon9_UNLIKELY(newSeqNo < msgSeqNum)) {
fixRecorder.WriteInputConform(rxargs.MsgStr_);
fixRecorder.WriteInputConform(rxargs);
FixBuilder fixb;
RevPrint(fixb.GetBuffer(),
f9fix_kFLD_SessionRejectReason_ValueIsIncorrect
f9fix_SPLTAGEQ(Text) "NewSeqNo(", newSeqNo, ") < Expection(", msgSeqNum, ')');
SendSessionReject(*rxargs.FixSender_, msgSeqNum, f9fix_kTAG_NewSeqNo, f9fix_kMSGTYPE_SequenceReset, std::move(fixb));
return;
}
fixRecorder.WriteInputSeqReset(rxargs.MsgStr_, newSeqNo, isGapFill);
fixRecorder.WriteInputSeqReset(rxargs.OrigMsgStr(), newSeqNo, isGapFill);
if (rxargs.FixReceiver_->ResendRequestEndSeqNo_ < newSeqNo) {
// 上次的 ResendRequest 補齊了! 檢查是否有保留訊息, 檢查是否已全部補齊.
rxargs.FixReceiver_->ResendRequestEndSeqNo_ = 0;
Expand All @@ -111,7 111,7 @@ void FixReceiver::OnRecvSequenceReset(const FixRecvEvArgs& rxargs) {
}
else { // SequenceReset: Reset mode.
rxargs.FixReceiver_->ClearRecvKeeper();
fixRecorder.WriteInputSeqReset(rxargs.MsgStr_, newSeqNo, isGapFill);
fixRecorder.WriteInputSeqReset(rxargs.OrigMsgStr(), newSeqNo, isGapFill);
rxargs.FixReceiver_->OnRecoverDone(rxargs);
}
}
Expand All @@ -127,7 127,8 @@ void FixReceiver::CheckMsgKeeper(const FixRecvEvArgs& rxargs, FixSeqNum newSeqNo
if (newSeqNo == msg.SeqNum_) {
FixRecvEvArgs& args = const_cast<FixRecvEvArgs&>(rxargs);
StrView fixmsg{&msg.MsgStr_};
args.MsgStr_ = fixmsg;
args.SetOrigMsgStr(fixmsg);
args.SetRxTime(UtcNow()); // 是否要使用 [當時] 的時間?
args.Msg_.Clear();
args.Msg_.ParseFields(fixmsg, FixParser::Until::FullMessage);
this->DispatchFixMessage(args);
Expand Down Expand Up @@ -167,7 168,7 @@ void FixReceiver::DispatchFixMessage(FixRecvEvArgs& rxargs) {
const FixParser::FixField* fldMsgType = rxargs.Msg_.GetField(f9fix_kTAG_MsgType);
const FixSeqNum msgSeqNum = rxargs.Msg_.GetMsgSeqNum();
if (fon9_UNLIKELY(!fldMsgType)) {
fixRecorder.Write(f9fix_kCSTR_HdrError, rxargs.MsgStr_);
fixRecorder.Write(f9fix_kCSTR_HdrError, rxargs.OrigMsgStr());
SendRequiredTagMissing(*rxargs.FixSender_, msgSeqNum, f9fix_kTAG_MsgType, nullptr);
return;
}
Expand All @@ -182,12 183,12 @@ void FixReceiver::DispatchFixMessage(FixRecvEvArgs& rxargs) {
if (fon9_LIKELY(rxargs.SeqSt_ == FixSeqSt::Conform)) {
if (fon9_LIKELY(cfg && cfg->FixMsgHandler_)) {
if (fon9_LIKELY(!IsEnumContains(cfg->FixSeqAllow_, FixSeqSt::NoPreRecord)))
fixRecorder.WriteInputConform(rxargs.MsgStr_);
fixRecorder.WriteInputConform(rxargs);
// 一般而言, 正常的連續訊息會來到這兒, 丟給 FixMsgHandler 處理!
cfg->FixMsgHandler_(rxargs);
}
else {
fixRecorder.WriteInputConform(rxargs.MsgStr_);
fixRecorder.WriteInputConform(rxargs);
this->OnFixMsgHandlerNotFound(rxargs, msgType);
}
this->OnMessageProcessed(rxargs, msgSeqNum);
Expand Down Expand Up @@ -215,18 216,18 @@ void FixReceiver::OnMsgSeqNumNotExpected(const FixRecvEvArgs& rxargs, GapFlags f
if (msgSeqNum <= this->LastGapSeqNo_) {
// [回補範圍內] 的 [不連續訊息] => 不理會, 等候回補
if (!(flags & GapSkipRecord))
fixRecorder.Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.MsgStr_);
fixRecorder.Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.OrigMsgStr());
return;
}
this->LastGapSeqNo_ = msgSeqNum;
if (!(flags & GapKeepRequired)) {
if (!(flags & GapSkipRecord))
fixRecorder.Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.MsgStr_);
fixRecorder.Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.OrigMsgStr());
}
else {
if (!(flags & GapSkipRecord))
fixRecorder.Write(f9fix_kCSTR_HdrGapRecv, rxargs.MsgStr_);
this->MsgKeeper_.emplace_back(rxargs.MsgStr_.ToString(), msgSeqNum);
fixRecorder.Write(f9fix_kCSTR_HdrGapRecv, rxargs.OrigMsgStr());
this->MsgKeeper_.emplace_back(rxargs.OrigMsgStr().ToString(), msgSeqNum);
--msgSeqNum;
}
if (this->ResendRequestEndSeqNo_ > 0) // 回補中,等補到指定的序號時,再提出後續的回補要求.
Expand All @@ -238,11 239,11 @@ void FixReceiver::OnMsgSeqNumTooLow(const FixRecvEvArgs& rxargs) {
FixRecorder& fixRecorder = rxargs.FixSender_->GetFixRecorder();
if (const FixParser::FixField* fldPossDup = rxargs.Msg_.GetField(f9fix_kTAG_PossDupFlag))
if (fldPossDup->Value_.Get1st() == 'Y') {
fixRecorder.Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.MsgStr_);
fixRecorder.Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.OrigMsgStr());
this->OnRecvPossDup(rxargs);
return;
}
fixRecorder.Write(f9fix_kCSTR_HdrError, rxargs.MsgStr_);
fixRecorder.Write(f9fix_kCSTR_HdrError, rxargs.OrigMsgStr());
FixBuilder fixb;
RevPrint(fixb.GetBuffer(),
f9fix_SPLTAGEQ(Text) // "|58="
Expand Down
13 changes: 6 additions & 7 deletions fon9/fix/FixReceiver_UT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 78,16 @@ int main(int argc, char** argv) {
rxargs.FixConfig_ = &fixc;

while (fgets(strbuf, sizeof(strbuf), infd)) {
rxargs.MsgStr_ = fon9::StrView_cstr(strbuf);
fon9::StrTrim(&rxargs.MsgStr_);
if (rxargs.MsgStr_.empty()
|| rxargs.MsgStr_.Get1st() == '#')
fon9::StrView fixmsg(fon9::StrView_cstr(strbuf));
fon9::StrTrim(&fixmsg);
if (fixmsg.empty() || fixmsg.Get1st() == '#')
continue;
fon9::StrView fixmsg{rxargs.MsgStr_};
auto pres = fixParser.Parse(fixmsg);
rxargs.SetOrigMsgStr(fixmsg);
auto pres = fixParser.Parse(fixmsg);
if (pres > f9fix::FixParser::NeedsMore)
fixr.DispatchFixMessage(rxargs);
else
fixo->GetFixRecorder().Write(f9fix_kCSTR_HdrError, rxargs.MsgStr_, "\n"
fixo->GetFixRecorder().Write(f9fix_kCSTR_HdrError, rxargs.OrigMsgStr(), "\n"
f9fix_kCSTR_HdrError "ParseError|err=", pres);
}
return 0;
Expand Down
7 changes: 4 additions & 3 deletions fon9/fix/FixRecorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 4,7 @@
#define __fon9_fix_FixRecorder_hpp__
#include "fon9/fix/FixParser.hpp"
#include "fon9/fix/FixCompID.hpp"
#include "fon9/fix/FixConfig.hpp"
#include "fon9/buffer/RevBufferList.hpp"
#include "fon9/FileAppender.hpp"

Expand Down Expand Up @@ -133,9 134,9 @@ class fon9_API FixRecorder : protected AsyncFileAppender {

/// 寫入依正常順序收到的 FIX Message.
/// 返回前 this->NextRecvSeq_;
void WriteInputConform(const StrView& fixmsg) {
RevBufferList rbuf{static_cast<BufferNodeSize>(fixmsg.size() 64)};
RevPrint(rbuf, f9fix_kCSTR_HdrRecv, UtcNow(), ' ', fixmsg, '\n');
void WriteInputConform(const FixRecvEvArgs& rxargs) {
RevBufferList rbuf{static_cast<BufferNodeSize>(rxargs.OrigMsgStr().size() 64)};
RevPrint(rbuf, f9fix_kCSTR_HdrRecv, rxargs.RxTime(), ' ', rxargs.OrigMsgStr(), '\n');
auto lk{this->Worker_.Lock()};
this->NextRecvSeq_;
this->WriteBuffer(std::move(lk), std::move(rbuf));
Expand Down
12 changes: 10 additions & 2 deletions fon9/fix/FixRecorder_UT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 17,20 @@ void BuildTestMessage(f9fix::FixBuilder& fixb, fon9::StrView headerCompIds, unsi
fon9::RevPrint(fixb.GetBuffer(), f9fix_SPLTAGEQ(SendingTime));
fon9::RevPrint(fixb.GetBuffer(), f9fix_SPLFLDMSGTYPE(NewOrderSingle) f9fix_SPLTAGEQ(MsgSeqNum), seqNum, headerCompIds);
}
void WriteFixBuilderFinalMsg(f9fix::FixRecorder& fixr, f9fix::FixBuilder& fixb) {
f9fix::FixParser fixpar;
f9fix::FixRecvEvArgs rxargs{fixpar};
std::string origFixMsg = fon9::BufferTo<std::string>(fixb.Final(ToStrView(fixr.BeginHeader_)));
rxargs.SetOrigMsgStr(&origFixMsg);
rxargs.SetRxTime(fon9::UtcNow());
fixr.WriteInputConform(rxargs);
}
void TestFixRecorder(f9fix::FixRecorder& fixr, const unsigned kTimes) {
for (unsigned L = 0; L < kTimes; L) {
f9fix::FixBuilder fixb;
// Test: record recv message.
BuildTestMessage(fixb, ToStrView(fixr.CompIDs_.Header_), L 1, fixr.GetNextRecvSeq());
fixr.WriteInputConform(fon9::ToStrView(fon9::BufferTo<std::string>(fixb.Final(ToStrView(fixr.BeginHeader_)))));
WriteFixBuilderFinalMsg(fixr, fixb);

// Test: record send message.
fixb.Restart();
Expand Down Expand Up @@ -110,7 118,7 @@ int main(int argc, char** argv) {
// 再寫入一筆 recv, 讓 NextRecvSeq != NextSendSeq
f9fix::FixBuilder fixb;
BuildTestMessage(fixb, ToStrView(fixr->CompIDs_.Header_), kTimes 1, fixr->GetNextRecvSeq());
fixr->WriteInputConform(fon9::ToStrView(fon9::BufferTo<std::string>(fixb.Final(ToStrView(fixr->BeginHeader_)))));
WriteFixBuilderFinalMsg(*fixr, fixb);
fixr->WaitFlushed();
fixr.reset(new f9fix::FixRecorder(f9fix_BEGIN_HEADER_V42, f9fix::CompIDs{compIds}));
int count = 100;
Expand Down
4 changes: 2 additions & 2 deletions fon9/fix/FixSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 37,7 @@ void FixSession::OnFixSessionConnected() {
}
void FixSession::OnFixMessageParsed(StrView fixmsg) {
this->MsgReceivedCount_;
this->RxArgs_.MsgStr_ = fixmsg;
this->RxArgs_.SetOrigMsgStr(fixmsg);
if (fon9_LIKELY(this->FixSt_ == FixSessionSt::ApReady)) {
// 為了加快 Ap 層的處以速度, ApReady 狀態下的訊息, 直接處理.
this->DispatchFixMessage(this->RxArgs_);
Expand Down Expand Up @@ -341,7 341,7 @@ void FixSession::OnRecvLogout(const FixRecvEvArgs& rxargs) {
return;
}
if (rxargs.SeqSt_ != FixSeqSt::Conform)
rxargs.FixSender_->GetFixRecorder().Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.MsgStr_);
rxargs.FixSender_->GetFixRecorder().Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.OrigMsgStr());
if (fixses->FixSt_ == FixSessionSt::ApReady)
fixses->SendLogout(StrView{"Logout response"});
if (rxargs.SeqSt_ == FixSeqSt::TooHigh)
Expand Down
7 changes: 6 additions & 1 deletion fon9/fix/FixSession.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 45,10 @@ fon9_WARN_DISABLE_PADDING;
/// - 可參考 IoFixSession: 使用 fon9::io 機制的 FixSession.
/// - 負責協調一組 FixReceiver, FixSender, FixConfig 共同合作
/// - 負責掌握狀態變化.
class fon9_API FixSession : protected FixFeeder, public FixReceiver {
class fon9_API FixSession : private FixFeeder, public FixReceiver {
fon9_NON_COPY_NON_MOVE(FixSession);
using baseFixReceiver = FixReceiver;
using baseFixFeeder = FixFeeder;
FixSessionSt FixSt_{FixSessionSt::Disconnected};
uint32_t HeartBtInt_;
uint32_t HbTestCount_;
Expand Down Expand Up @@ -76,6 77,10 @@ class fon9_API FixSession : protected FixFeeder, public FixReceiver {
// override FixFeeder.
void OnFixMessageParsed(StrView fixmsg) override;
FixParser::Result OnFixMessageError(FixParser::Result res, StrView& fixmsgStream, const char* perr) override;
FixParser::Result FeedBuffer(DcQueue& rxbuf) {
this->RxArgs_.SetRxTime(UtcNow());
return baseFixFeeder::FeedBuffer(rxbuf);
}

// override FixReceiver.
void OnRecoverDone(const FixRecvEvArgs& rxargs) override;
Expand Down
8 changes: 4 additions & 4 deletions fon9/fix/IoFixSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 36,7 @@ bool IoFixManager::OnLogonAccepted(FixRecvEvArgs& rxargs, FixSenderSP fixout) {
recorder.Write(f9fix_kCSTR_HdrInfo, "OnLogon.Accepted|from=", static_cast<IoFixSession*>(rxargs.FixSession_)->GetDeviceId());
FixBuilder fixb;
if (rxargs.SeqSt_ == FixSeqSt::TooLow) {
recorder.Write(f9fix_kCSTR_HdrError, rxargs.MsgStr_);
recorder.Write(f9fix_kCSTR_HdrError, rxargs.OrigMsgStr());
RevPrint(fixb.GetBuffer(), f9fix_SPLTAGEQ(Text)
"Bad Logon, MsgSeqNum too low, expecting ", recorder.GetNextRecvSeq(),
" but received ", rxargs.Msg_.GetMsgSeqNum());
Expand All @@ -49,17 49,17 @@ bool IoFixManager::OnLogonAccepted(FixRecvEvArgs& rxargs, FixSenderSP fixout) {
if (const FixParser::FixField* fldEncryptMethod = rxargs.Msg_.GetField(errTag = f9fix_kTAG_EncryptMethod)) {
if (fldEncryptMethod->Value_.Get1st() == *f9fix_kCSTR_EncryptMethod_None) { // EncryptMethod 必須為 None
if (rxargs.SeqSt_ == FixSeqSt::Conform)
recorder.WriteInputConform(rxargs.MsgStr_);
recorder.WriteInputConform(rxargs);
else
recorder.Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.MsgStr_);
recorder.Write(f9fix_kCSTR_HdrIgnoreRecv, rxargs.OrigMsgStr());
RevPrint(fixb.GetBuffer(), f9fix_kFLD_EncryptMethod_None);
rxargs.FixSession_->SendLogonResponse(fixout, hbInt, std::move(fixb), rxargs);
return true;
}
}
}
}
recorder.Write(f9fix_kCSTR_HdrError, rxargs.MsgStr_);
recorder.Write(f9fix_kCSTR_HdrError, rxargs.OrigMsgStr());
RevPrint(fixb.GetBuffer(), f9fix_SPLTAGEQ(Text) "Bad Logon, Tag#", errTag);
rxargs.FixSession_->SendLogout(std::move(fixb), &recorder);
return false;
Expand Down

0 comments on commit ff682e2

Please sign in to comment.