-
Notifications
You must be signed in to change notification settings - Fork 1
Policy API
OpenPit exposes custom policy hooks for two stages:
- Start stage: cheap checks that must run for every request
- Main stage: deeper checks that can emit one or more rejects and register reversible mutations
This page describes behavior first, then shows language-specific examples for the SDKs that expose the relevant custom-policy surface directly.
- Start stage returns one reject outcome or pass-through.
- Main stage can collect multiple rejects and register reversible mutations.
- Main-stage context provides read-only access to request data.
- Main-stage mutations are committed only when the full `execute request` step succeeds.
For account-adjustment batch policy hooks, see Account Adjustments.
Custom policy state must not be read or mutated in parallel with engine calls on the same engine instance.
- Unsafe pattern: one thread executes `start stage` or `execute request` while another thread reads or mutates fields that the same policy callbacks use.
- If shared access is unavoidable, synchronization is fully owned by the host application (locks, serialized access, actor loop, etc.).
- Preferred pattern: keep policy state mutations inside engine calls and feed external corrections through `apply account adjustments` only.
Go SDK keeps policy integration idiomatic and thin:
- unified interface: `pretrade.Policy` — all stage hooks and account-adjustment callback in one interface
- for custom order/report types:
  - `pretrade.ClientPreTradePolicy[Order, Report]` — all four callbacks receive the typed project struct; account-adjustment uses `model.AccountAdjustment` regardless of client type
- adapters with payload validation: `pretrade.NewSafeClientPreTradePolicy`
- adapters without validation (for SDK-controlled paths): `pretrade.NewUnsafeFastClientPreTradePolicy`
- built-in native policies are registered via the `Builtin` builder method
Python exposes a unified policy class over record-style openpit.Order and
openpit.ExecutionReport:
- unified class: `openpit.pretrade.Policy` — all stage hooks and account-adjustment callback with default no-op implementations
Business outcomes are returned, not raised:
- start stage returns `Iterable[PolicyReject]`
- main stage returns `PolicyDecision`
- account adjustment returns `PolicyDecision | Iterable[PolicyReject] | tuple[Mutation, ...] | None`
Policies can register:
Mutation(commit=callable, rollback=callable)
Rust exposes a unified trait for custom policies and caller-defined order contracts:
- unified trait: `PreTradePolicy<Order, ExecutionReport, AccountAdjustment = ()>` — all stage hooks and account-adjustment callback with default no-op implementations (only `name` is required)
- start-stage callback receives: `&PreTradeContext`, `&Order`
- main-stage callback receives: `&PreTradeContext`, `&Order`, `&mut Mutations`
- account-adjustment callback receives: `&AccountAdjustmentContext`, `AccountId`, `&A`, `&mut Mutations`
Go
package main
import (
"fmt"
"go.openpit.dev/openpit/accountadjustment"
"go.openpit.dev/openpit/model"
"go.openpit.dev/openpit/param"
"go.openpit.dev/openpit/pretrade"
"go.openpit.dev/openpit/reject"
"go.openpit.dev/openpit/tx"
)
// NotionalCapPolicy is a validation-only policy: it rejects any order whose
// requested notional is above a configured cap and keeps no mutable state.
type NotionalCapPolicy struct {
	// Policy-local config: reject any order above this absolute notional.
	MaxAbsNotional param.Volume
}
// Close is a no-op: the policy only holds its configured cap.
func (p *NotionalCapPolicy) Close() {}
// Name returns the policy name; PerformPreTradeCheck attaches it to every
// reject it builds.
func (p *NotionalCapPolicy) Name() string { return "NotionalCapPolicy" }
// CheckPreTradeStart is the start-stage hook. This policy has no start-stage
// checks, so it ignores both arguments and returns no rejects.
func (p *NotionalCapPolicy) CheckPreTradeStart(
	pretrade.Context,
	model.Order,
) []reject.Reject {
	return nil
}
// PerformPreTradeCheck is the main-stage hook. It derives the requested
// notional from the order and returns a single reject when that notional is
// above MaxAbsNotional, or when the order lacks the fields needed to compute
// it. It returns nil when the order passes.
//
// The context and the mutation registry are unused: this policy only
// validates and does not reserve mutable state.
func (p *NotionalCapPolicy) PerformPreTradeCheck(
	_ pretrade.Context,
	order model.Order,
	_ tx.Mutations,
) []reject.Reject {
	// The two "missing field" rejects differ only in their details text.
	missingField := func(details string) []reject.Reject {
		return reject.NewSingleItemList(
			reject.CodeMissingRequiredField,
			p.Name(),
			"required order field missing",
			details,
			reject.ScopeOrder,
		)
	}
	// Likewise for the two "cannot compute notional" rejects.
	calculationFailed := func(details string) []reject.Reject {
		return reject.NewSingleItemList(
			reject.CodeOrderValueCalculationFailed,
			p.Name(),
			"order value calculation failed",
			details,
			reject.ScopeOrder,
		)
	}

	op, hasOp := order.Operation().Get()
	if !hasOp {
		return missingField("operation is not set")
	}

	amount, hasAmount := op.TradeAmount().Get()
	if !hasAmount {
		return missingField("trade_amount is not set")
	}

	// Reduce the public order surface to the one number this policy reasons
	// about: the requested notional.
	var requested param.Volume
	switch {
	case amount.IsVolume():
		// The amount is already expressed as a volume; use it directly.
		requested = amount.MustVolume()
	default:
		// Otherwise the amount is a quantity and needs a price to convert.
		price, hasPrice := op.Price().Get()
		if !hasPrice {
			return calculationFailed(
				"price not provided for evaluating notional",
			)
		}
		volume, err := price.CalculateVolume(amount.MustQuantity())
		if err != nil {
			return calculationFailed(
				"price and quantity could not be used to evaluate notional",
			)
		}
		requested = volume
	}

	if requested.Compare(p.MaxAbsNotional) <= 0 {
		// Within the cap: nothing to reject and no state to reserve.
		return nil
	}

	// Business validation failures become explicit rejects.
	return reject.NewSingleItemList(
		reject.CodeRiskLimitExceeded,
		p.Name(),
		"strategy cap exceeded",
		fmt.Sprintf(
			"requested notional %v, max allowed: %v",
			requested, p.MaxAbsNotional,
		),
		reject.ScopeOrder,
	)
}
// ApplyExecutionReport ignores the report and always returns false.
// NOTE(review): the meaning of the boolean result is defined by the policy
// interface, not by this example — confirm against the SDK reference.
func (p *NotionalCapPolicy) ApplyExecutionReport(model.ExecutionReport) bool {
	return false
}
// ApplyAccountAdjustment is the account-adjustment callback. This policy keeps
// no per-account state, so it ignores every argument and returns no rejects.
func (p *NotionalCapPolicy) ApplyAccountAdjustment(
	accountadjustment.Context,
	param.AccountID,
	model.AccountAdjustment,
	tx.Mutations,
) []reject.Reject {
	return nil
}
Python
import typing
import openpit
class NotionalCapPolicy(openpit.pretrade.Policy):
    """Validation-only policy that rejects orders above a notional cap."""

    def __init__(self, max_abs_notional: openpit.param.Volume) -> None:
        # Policy-local config: reject any order above this absolute notional.
        self._max_abs_notional = max_abs_notional

    @property
    @typing.override
    def name(self) -> str:
        """Policy name."""
        return "NotionalCapPolicy"

    @typing.override
    def perform_pre_trade_check(
        self,
        ctx: openpit.pretrade.Context,
        order: openpit.Order,
    ) -> openpit.pretrade.PolicyDecision:
        """Main-stage hook: reject the order when its notional exceeds the cap.

        The requested notional is the trade amount itself when it is a volume;
        otherwise it is computed from the order price and the quantity.
        """
        operation = order.operation
        assert operation is not None

        # Reduce the public order surface to the one number this policy
        # reasons about: the requested notional.
        amount = operation.trade_amount
        if amount.is_volume:
            requested = amount.as_volume
        else:
            assert amount.is_quantity
            assert operation.price is not None
            requested = operation.price.calculate_volume(amount.as_quantity)

        if requested > self._max_abs_notional:
            # Business validation failures become explicit rejects,
            # not exceptions.
            cap_reject = openpit.pretrade.PolicyReject(
                code=openpit.pretrade.RejectCode.RISK_LIMIT_EXCEEDED,
                reason="strategy cap exceeded",
                details=(
                    f"requested notional {requested}, "
                    f"max allowed: {self._max_abs_notional}"
                ),
                scope=openpit.pretrade.RejectScope.ORDER,
            )
            return openpit.pretrade.PolicyDecision.reject(rejects=[cap_reject])

        # This policy only validates. It does not reserve mutable state.
        return openpit.pretrade.PolicyDecision.accept()

    @typing.override
    def apply_execution_report(
        self,
        report: openpit.ExecutionReport,
    ) -> bool:
        """Ignore the report and return False."""
        _ = report
        return False
Rust
use openpit::param::{TradeAmount, Volume};
use openpit::pretrade::{
PreTradeContext, PreTradePolicy, Reject, RejectCode, RejectScope, Rejects,
};
use openpit::Mutations;
use openpit::{HasOrderPrice, HasTradeAmount};
/// Validation-only policy: rejects any order whose requested notional is
/// above `max_abs_notional`.
struct NotionalCapPolicy {
    // Policy-local config: reject any order above this absolute notional.
    max_abs_notional: Volume,
}
// Policy implementation for any order type that exposes a trade amount and an
// order price. Only the main-stage check carries logic.
impl<Order, ExecutionReport> PreTradePolicy<Order, ExecutionReport>
    for NotionalCapPolicy
where
    Order: HasTradeAmount + HasOrderPrice,
{
    // Policy name; also passed to every reject built below.
    fn name(&self) -> &str {
        "NotionalCapPolicy"
    }

    // Main-stage hook. Returns `Ok(())` when the requested notional is within
    // the cap and `Err(Rejects)` holding one reject otherwise. The context and
    // the mutation registry are unused.
    fn perform_pre_trade_check(
        &self,
        _ctx: &PreTradeContext,
        order: &Order,
        _mutations: &mut Mutations,
    ) -> Result<(), Rejects> {
        // Translate the public order surface into one number that this policy
        // can reason about: requested notional.
        let trade_amount = match order.trade_amount() {
            Ok(trade_amount) => trade_amount,
            Err(error) => {
                return Err(Rejects::from(Reject::new(
                    self.name(),
                    RejectScope::Order,
                    RejectCode::MissingRequiredField,
                    "required order field missing",
                    error.to_string(),
                )));
            }
        };
        // The price is read up front, before the amount kind is known; an
        // accessor error is reported as a missing field. A successful read
        // may still yield `None`, which is handled in the match below.
        let price = match order.price() {
            Ok(price) => price,
            Err(error) => {
                return Err(Rejects::from(Reject::new(
                    self.name(),
                    RejectScope::Order,
                    RejectCode::MissingRequiredField,
                    "required order field missing",
                    error.to_string(),
                )));
            }
        };
        let requested_notional = match (trade_amount, price) {
            // A volume amount is already the notional; the price is ignored.
            (TradeAmount::Volume(volume), _) => volume,
            // A quantity amount needs a price to be converted into a volume.
            (TradeAmount::Quantity(quantity), Some(price)) => {
                match price.calculate_volume(quantity) {
                    Ok(v) => v,
                    Err(_) => {
                        return Err(Rejects::from(Reject::new(
                            self.name(),
                            RejectScope::Order,
                            RejectCode::OrderValueCalculationFailed,
                            "order value calculation failed",
                            "price and quantity could not be used \
                            to evaluate notional",
                        )));
                    }
                }
            }
            (TradeAmount::Quantity(_), None) => {
                return Err(Rejects::from(Reject::new(
                    self.name(),
                    RejectScope::Order,
                    RejectCode::OrderValueCalculationFailed,
                    "order value calculation failed",
                    "price not provided for evaluating notional",
                )));
            }
            // Any other trade amount variant is rejected as unsupported.
            _ => {
                return Err(Rejects::from(Reject::new(
                    self.name(),
                    RejectScope::Order,
                    RejectCode::UnsupportedOrderType,
                    "unsupported order type",
                    "custom trade amount variant is not supported",
                )));
            }
        };
        if requested_notional > self.max_abs_notional {
            // Business validation failures should become explicit rejects.
            return Err(Rejects::from(Reject::new(
                self.name(),
                RejectScope::Order,
                RejectCode::RiskLimitExceeded,
                "strategy cap exceeded",
                format!(
                    "requested notional {}, max allowed: {}",
                    requested_notional, self.max_abs_notional
                ),
            )));
        }
        Ok(())
    }

    // Ignores the report and always returns `false`.
    fn apply_execution_report(&self, _report: &ExecutionReport) -> bool {
        false
    }
}
If at least one main-stage policy rejects, the engine does not return a reservation and rolls back all registered mutations in reverse order.
Rollback order is deterministic:
- registration order for commit
- reverse registration order for rollback
This pattern is useful when one policy updates intermediate in-memory state and the same policy decides that the request must be rejected.
Go
package main
import (
"fmt"
"go.openpit.dev/openpit/accountadjustment"
"go.openpit.dev/openpit/model"
"go.openpit.dev/openpit/param"
"go.openpit.dev/openpit/pretrade"
"go.openpit.dev/openpit/reject"
"go.openpit.dev/openpit/tx"
)
// ReserveThenValidatePolicy demonstrates eager state updates paired with a
// registered rollback: the main-stage check updates reserved first and
// validates it against limit afterwards.
type ReserveThenValidatePolicy struct {
	// reserved is the in-memory reservation that the main-stage check
	// overwrites and that the rollback mutation restores.
	reserved param.Volume
	// limit is the value that reserved must not exceed.
	limit param.Volume
}
// Close is a no-op for this policy.
func (p *ReserveThenValidatePolicy) Close() {}
// Name returns the policy name; PerformPreTradeCheck attaches it to the reject
// it builds.
func (p *ReserveThenValidatePolicy) Name() string {
	return "ReserveThenValidatePolicy"
}
// CheckPreTradeStart is the start-stage hook. This policy has no start-stage
// checks, so it ignores both arguments and returns no rejects.
func (p *ReserveThenValidatePolicy) CheckPreTradeStart(
	pretrade.Context,
	model.Order,
) []reject.Reject {
	return nil
}
// PerformPreTradeCheck is the main-stage hook. It overwrites the reservation
// eagerly, registers a mutation whose rollback restores the previous value,
// and only then validates the new reservation against the limit. It returns
// one reject when the limit is exceeded and nil otherwise.
func (p *ReserveThenValidatePolicy) PerformPreTradeCheck(
	_ pretrade.Context,
	_ model.Order,
	mutations tx.Mutations,
) []reject.Reject {
	// Pretend that this request needs a temporary reservation of 100.
	// It is applied eagerly because downstream logic wants to observe the
	// tentative state immediately.
	before := p.reserved
	// NOTE(review): the second result of NewVolumeFromString is discarded
	// here, as in the published example; the input is a fixed literal.
	tentative, _ := param.NewVolumeFromString("100")
	p.reserved = tentative

	// Commit has nothing left to do because the state was applied eagerly.
	commit := func() {}
	// Rollback puts back the value captured before the eager update.
	rollback := func() { p.reserved = before }
	// NOTE(review): the result of Push is discarded. If Push can fail, the
	// eager update above would not be undone — confirm against the tx API.
	_ = mutations.Push(commit, rollback)

	if p.reserved.Compare(p.limit) <= 0 {
		return nil
	}

	// The reject is returned after the rollback mutation is registered, so
	// the engine can restore the previous state automatically.
	return reject.NewSingleItemList(
		reject.CodeRiskLimitExceeded,
		p.Name(),
		"temporary reservation exceeds limit",
		fmt.Sprintf("reserved %v, limit: %v", tentative, p.limit),
		reject.ScopeOrder,
	)
}
// ApplyExecutionReport ignores the report and always returns false.
// NOTE(review): the meaning of the boolean result is defined by the policy
// interface, not by this example — confirm against the SDK reference.
func (p *ReserveThenValidatePolicy) ApplyExecutionReport(
	model.ExecutionReport,
) bool {
	return false
}
// ApplyAccountAdjustment is the account-adjustment callback. This example does
// not react to adjustments: it ignores every argument and returns no rejects.
func (p *ReserveThenValidatePolicy) ApplyAccountAdjustment(
	accountadjustment.Context,
	param.AccountID,
	model.AccountAdjustment,
	tx.Mutations,
) []reject.Reject {
	return nil
}
Python
import typing
import openpit
class ReserveThenValidatePolicy(openpit.pretrade.Policy):
    """Policy that reserves eagerly and hands the engine a rollback mutation."""

    def __init__(self) -> None:
        self._reserved = openpit.param.Volume(0.0)
        self._limit = openpit.param.Volume(50.0)

    @property
    @typing.override
    def name(self) -> str:
        """Policy name."""
        return "ReserveThenValidatePolicy"

    @typing.override
    def perform_pre_trade_check(
        self,
        ctx: openpit.pretrade.Context,
        order: openpit.Order,
    ) -> openpit.pretrade.PolicyDecision:
        """Main-stage hook: apply a tentative reservation, then validate it.

        The returned decision carries the rollback mutation on both the
        accept and the reject path.
        """
        assert order.operation is not None

        # Pretend that this request needs a temporary reservation of 100.
        # It is applied eagerly because downstream logic wants to observe the
        # tentative state immediately.
        prev_reserved = self._reserved
        next_reserved = openpit.param.Volume(100.0)
        self._reserved = next_reserved

        def keep_tentative_state() -> None:
            # Commit is empty: state was applied eagerly.
            return None

        def restore_previous_state() -> None:
            self._reserved = prev_reserved

        rollback = openpit.Mutation(
            commit=keep_tentative_state,
            rollback=restore_previous_state,
        )

        if next_reserved > self._limit:
            # Return the reject together with the rollback mutation.
            # The engine will restore the previous state automatically.
            limit_reject = openpit.pretrade.PolicyReject(
                code=openpit.pretrade.RejectCode.RISK_LIMIT_EXCEEDED,
                reason="temporary reservation exceeds limit",
                details=f"reserved {next_reserved}, limit: {self._limit}",
                scope=openpit.pretrade.RejectScope.ORDER,
            )
            return openpit.pretrade.PolicyDecision.reject(
                rejects=[limit_reject],
                mutations=[rollback],
            )

        return openpit.pretrade.PolicyDecision.accept(mutations=[rollback])

    @typing.override
    def apply_execution_report(
        self,
        report: openpit.ExecutionReport,
    ) -> bool:
        """Ignore the report and return False."""
        _ = report
        return False
Rust
use std::sync::Arc;
use openpit::param::{AccountId, Volume};
use openpit::pretrade::{
PreTradeContext, PreTradePolicy, Reject, RejectCode, RejectScope, Rejects,
};
use openpit::storage::{
CreateStorageFor, LockingPolicyFactory, Storage, StorageBuilder,
};
use openpit::{HasAccountId, Mutation, Mutations};
/// Policy that applies a reservation eagerly and registers a rollback for it.
/// It is generic over the locking policy factory used by its storage.
struct ReserveThenValidatePolicy<StorageLockingPolicyFactory>
where
    StorageLockingPolicyFactory: LockingPolicyFactory,
{
    // Reserved volume keyed by account id. Held in an `Arc` so the rollback
    // closure can own a handle to the same storage.
    reserved: Arc<
        Storage<
            AccountId,
            Volume,
            StorageLockingPolicyFactory::Policy,
        >,
    >,
    // Reservation value written by the main-stage check.
    next: Volume,
    // Value that the reservation must not exceed.
    limit: Volume,
}
impl<StorageLockingPolicyFactory>
    ReserveThenValidatePolicy<StorageLockingPolicyFactory>
where
    StorageLockingPolicyFactory: LockingPolicyFactory
        + CreateStorageFor<AccountId>,
{
    /// Builds the policy with a storage created from `storage_builder`.
    ///
    /// `next` is the reservation the main-stage check writes and `limit` is
    /// the value that reservation is validated against.
    fn new(
        storage_builder: &StorageBuilder<StorageLockingPolicyFactory>,
        next: Volume,
        limit: Volume,
    ) -> Self {
        Self {
            reserved: Arc::new(storage_builder.create()),
            next,
            limit,
        }
    }
}
// Policy implementation for any order type that exposes an account id.
impl<Order, ExecutionReport, StorageLockingPolicyFactory>
    PreTradePolicy<Order, ExecutionReport>
    for ReserveThenValidatePolicy<StorageLockingPolicyFactory>
where
    Order: HasAccountId,
    StorageLockingPolicyFactory: LockingPolicyFactory
        + CreateStorageFor<AccountId>,
    StorageLockingPolicyFactory::Policy: 'static,
{
    // Policy name; also passed to every reject built below.
    fn name(&self) -> &str {
        "ReserveThenValidatePolicy"
    }

    // Main-stage hook. Overwrites the account's reservation eagerly, registers
    // a mutation whose rollback restores the previous value, then validates
    // the new reservation against the limit.
    fn perform_pre_trade_check(
        &self,
        _ctx: &PreTradeContext,
        order: &Order,
        mutations: &mut Mutations,
    ) -> Result<(), Rejects> {
        // An order without a readable account id is rejected before any
        // state is touched.
        let account_id = order.account_id().map_err(|error| {
            Rejects::from(Reject::new(
                self.name(),
                RejectScope::Order,
                RejectCode::MissingRequiredField,
                "missing account id",
                error.to_string(),
            ))
        })?;
        // Pretend that this request needs a temporary reservation of 100.
        // We apply it eagerly because downstream logic wants to observe the
        // tentative state immediately.
        // NOTE(review): `|| Volume::ZERO` presumably supplies the initial
        // value for an account with no entry yet — confirm against the
        // `Storage::with_mut` documentation.
        let (prev, next) = self.reserved.with_mut(
            account_id,
            || Volume::ZERO,
            |reserved, _is_new| {
                let prev = *reserved;
                let next = self.next;
                *reserved = next;
                (prev, next)
            },
        );
        // The rollback closure owns its own handle to the storage.
        let rollback_reserved = Arc::clone(&self.reserved);
        mutations.push(Mutation::new(
            || {
                // Commit is empty: state was applied eagerly.
            },
            move || {
                // Write back the value captured before the eager update.
                rollback_reserved.with_mut(
                    account_id,
                    || Volume::ZERO,
                    |reserved, _is_new| {
                        *reserved = prev;
                    },
                );
            },
        ));
        if next > self.limit {
            // Return the reject after the rollback mutation is registered.
            // The engine will restore the previous state automatically.
            return Err(Rejects::from(Reject::new(
                self.name(),
                RejectScope::Order,
                RejectCode::RiskLimitExceeded,
                "temporary reservation exceeds limit",
                format!("reserved {}, limit: {}", next, self.limit),
            )));
        }
        Ok(())
    }

    // Ignores the report and always returns `false`.
    fn apply_execution_report(&self, _report: &ExecutionReport) -> bool {
        false
    }
}
Go uses ClientEngine and typed policy interfaces to work with project-specific
order and report types:
- Embed `model.Order` into a custom struct to add project-specific fields.
- Embed `model.ExecutionReport` into a custom struct to add project-specific fields.
- Implement `pretrade.ClientPreTradePolicy[Order, Report]` — all four callbacks receive the typed project struct, not the generic `model.Order`; account adjustment uses `model.AccountAdjustment` regardless of client type.
- Build the engine with `NewClientPreTradeEngineBuilder[Order, Report]()`, which returns a `*ClientEngine[Order, Report, ...]`. The client engine wraps each submitted value in a cgo handle and routes it to the typed policy callbacks.
Go
package main
import (
"fmt"
"log"
"go.openpit.dev/openpit"
"go.openpit.dev/openpit/accountadjustment"
"go.openpit.dev/openpit/model"
"go.openpit.dev/openpit/param"
"go.openpit.dev/openpit/pretrade"
"go.openpit.dev/openpit/reject"
"go.openpit.dev/openpit/tx"
)
// StrategyOrder carries project-specific metadata alongside the standard order.
type StrategyOrder struct {
	// Embedded standard order.
	model.Order
	// StrategyTag is the project-specific field that StrategyTagPolicy
	// inspects in its start-stage check.
	StrategyTag string
}
// StrategyReport carries project-specific metadata alongside
// the standard report.
type StrategyReport struct {
	// Embedded standard execution report.
	model.ExecutionReport
	// VenueExecID is a project-specific field; nothing in this example
	// reads it.
	VenueExecID string
}
// StrategyTagPolicy rejects orders from blocked strategy tags.
// It is stateless: the check lives entirely in CheckPreTradeStart.
type StrategyTagPolicy struct{}
// Close is a no-op: the policy has no fields and holds nothing to release.
func (p *StrategyTagPolicy) Close() {}
// Name returns the policy name; CheckPreTradeStart attaches it to the reject
// it builds.
func (p *StrategyTagPolicy) Name() string { return "StrategyTagPolicy" }
// CheckPreTradeStart is the start-stage hook. It receives the typed
// StrategyOrder and returns one compliance reject when the order's strategy
// tag is "blocked"; any other tag passes with nil.
func (p *StrategyTagPolicy) CheckPreTradeStart(
	_ pretrade.Context,
	order StrategyOrder,
) []reject.Reject {
	tag := order.StrategyTag
	if tag != "blocked" {
		return nil
	}

	details := fmt.Sprintf("strategy tag %q is not allowed", tag)
	return reject.NewSingleItemList(
		reject.CodeComplianceRestriction,
		p.Name(),
		"strategy blocked",
		details,
		reject.ScopeOrder,
	)
}
// PerformPreTradeCheck is the main-stage hook. This policy does all of its
// work in the start stage, so it ignores every argument and returns no rejects.
func (p *StrategyTagPolicy) PerformPreTradeCheck(
	pretrade.Context,
	StrategyOrder,
	tx.Mutations,
) []reject.Reject {
	return nil
}
// ApplyExecutionReport receives the typed StrategyReport, ignores it, and
// always returns false.
// NOTE(review): the meaning of the boolean result is defined by the policy
// interface, not by this example — confirm against the SDK reference.
func (p *StrategyTagPolicy) ApplyExecutionReport(StrategyReport) bool {
	return false
}
// ApplyAccountAdjustment is the account-adjustment callback. It takes the
// standard model.AccountAdjustment even though the order and report types are
// project-specific, ignores every argument, and returns no rejects.
func (p *StrategyTagPolicy) ApplyAccountAdjustment(
	accountadjustment.Context,
	param.AccountID,
	model.AccountAdjustment,
	tx.Mutations,
) []reject.Reject {
	return nil
}
func main() {
engine, err := openpit.NewClientPreTradeEngineBuilder[
StrategyOrder, StrategyReport,
]().
FullSync().
PreTrade(&StrategyTagPolicy{}).
Build()
if err != nil {
log.Fatal(err)
}
defer engine.Stop()
order := StrategyOrder{Order: model.NewOrder(), StrategyTag: "alpha"}
request, rejects, err := engine.StartPreTrade(order)
if err != nil {
log.Fatal(err)
}
if rejects != nil {
for _, r := range rejects {
fmt.Printf("rejected by %s: %s\n", r.Policy, r.Reason)
}
return
}
defer request.Close()
reservation, rejects, err := request.Execute()
if err != nil {
log.Fatal(err)
}
if rejects != nil {
for _, r := range rejects {
fmt.Printf("rejected by %s: %s\n", r.Policy, r.Reason)
}
return
}
defer reservation.Close()
reservation.Commit()
}Python custom models inherit from openpit.Order or openpit.ExecutionReport.
The original subclass instance reaches policy callbacks unchanged. Policies
access project-specific attributes by casting the received base type.
Python
import typing
import openpit
class StrategyOrder(openpit.Order):
    """Order subclass that carries a project-specific strategy tag."""

    def __init__(
        self,
        *,
        operation: openpit.OrderOperation,
        strategy_tag: str,
    ) -> None:
        """Forward ``operation`` to the base order and store ``strategy_tag``."""
        super().__init__(operation=operation)
        # Project-specific metadata carried alongside the standard order fields.
        self.strategy_tag = strategy_tag
class StrategyReport(openpit.ExecutionReport):
    """Execution report subclass that carries a venue execution id."""

    def __init__(
        self,
        *,
        operation: openpit.ExecutionReportOperation,
        financial_impact: openpit.FinancialImpact,
        venue_exec_id: str,
    ) -> None:
        """Forward the standard fields to the base report and store the id."""
        super().__init__(operation=operation, financial_impact=financial_impact)
        # Project-specific metadata alongside the standard report fields.
        self.venue_exec_id = venue_exec_id
class StrategyTagPolicy(openpit.pretrade.Policy):
    """Start-stage policy that rejects orders tagged ``"blocked"``."""

    @property
    def name(self) -> str:
        """Policy name."""
        return "StrategyTagPolicy"

    def check_pre_trade_start(
        self,
        ctx: openpit.pretrade.Context,
        order: openpit.Order,
    ) -> typing.Iterable[openpit.pretrade.PolicyReject]:
        """Return one compliance reject for a blocked tag, else an empty list."""
        # The original subclass instance reaches the callback unchanged, so
        # the cast only informs the type checker.
        tag = typing.cast(StrategyOrder, order).strategy_tag
        if tag != "blocked":
            return []

        blocked = openpit.pretrade.PolicyReject(
            code=openpit.pretrade.RejectCode.COMPLIANCE_RESTRICTION,
            reason="strategy blocked",
            details=f"strategy tag {tag!r} is not allowed",
            scope=openpit.pretrade.RejectScope.ORDER,
        )
        return [blocked]
engine = (
openpit.Engine.builder()
.no_sync()
.pre_trade(StrategyTagPolicy())
.build()
)
order = StrategyOrder(
operation=openpit.OrderOperation(
instrument=openpit.Instrument("AAPL", "USD"),
account_id=openpit.param.AccountId.from_u64(99224416),
side=openpit.param.Side.BUY,
trade_amount=openpit.param.TradeAmount.quantity(10),
price=openpit.param.Price(25),
),
strategy_tag="alpha",
)
start_result = engine.start_pre_trade(order=order)
if not start_result:
messages = ", ".join(
f"{r.policy} [{r.code}]: {r.reason}: {r.details}"
for r in start_result.rejects
)
raise RuntimeError(messages)
execute_result = start_result.request.execute()
if not execute_result:
messages = ", ".join(
f"{r.policy} [{r.code}]: {r.reason}: {r.details}"
for r in execute_result.rejects
)
raise RuntimeError(messages)
execute_result.reservation.commit()Rust uses capability traits (Has*) and can compose OrderOperation with
project-only fields plus Deref to inherit required capabilities.
See Custom Rust Types for full derive setup, manual trait implementations, and wrapper composition patterns.
- Policies: built-in controls and policy catalog
- Pre-trade Pipeline: request and reservation semantics
- Account Adjustments: batch rollback semantics
- Custom Rust Types: Rust model composition patterns
- Custom Go Types: Go ClientEngine and typed model composition