End-to-end type safety with Remix and Rust - Part 4: The trade service

Implementing a trade service in Rust to send orders and list positions.

Blog post author avatar.
David Steiner

· 10 min read

Previously, we implemented a simple reference data service to add and list stocks. In this post, we’ll implement a trade service analogous in structure to the reference data service.

We’ll pick up the pace here, focusing less on the repetitive boilerplate code we’ve seen in earlier posts and more on new concepts.

If you ever get stuck, please refer to the finished branch for this post.

Defining the gRPC service

Let’s start by creating the interface we’d like to implement in a new proto package.

protos/trade.proto
syntax = "proto3";
import "google/protobuf/empty.proto";
import "refdata.proto";
package trade;
enum Side {
SELL = 0;
BUY = 1;
}
message MarketOrder {
int32 stock_id = 1;
uint32 quantity = 2;
Side side = 3;
}
message Position {
refdata.Stock stock = 1;
uint32 quantity = 2;
}
message Positions {
repeated Position positions = 1;
}
service Trade {
rpc SendOrder(MarketOrder) returns (google.protobuf.Empty);
rpc AllPositions(google.protobuf.Empty) returns (Positions);
}

The interface consists of two functions.

The SendOrder function takes a market order. In this dummy application, the order is immediately executed for both BUY and SELL actions, provided there are sufficient shares.

Positions provide a summarized view of all orders, updating automatically with each new order sent. AllPositions returns the position for every stock.

There is no need to change build.rs, as it picks up new files in the protos directory. We need to register the Rust module, though.

src/services.rs
pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("trading_descriptor");
pub mod proto {
pub mod refdata {
tonic::include_proto!("refdata");
}
pub mod trade {
tonic::include_proto!("trade");
}
}

New modules for trade service

Add a new trade.rs module with 3 sub-modules:

  • models.rs
  • repository.rs
  • service.rs

Let’s create our types to model positions and the side of an order:

src/trade/models.rs
pub struct Position {
pub stock_id: i32,
pub stock_symbol: String,
pub stock_name: String,
pub quantity: i32,
}
#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "side", rename_all = "lowercase")]
pub enum Side {
Buy,
Sell,
}

Just like for the reference data service, we’ll create an abstract repository trait and a concrete Postgres implementation.

src/trade/repository.rs
use sqlx::PgPool;
use crate::error::Result;
use crate::trade::models::{Position, Side};
#[tonic::async_trait]
pub trait TradeRepository {
async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()>;
async fn get_all_positions(&self) -> Result<Vec<Position>>;
}
pub struct PostgresTradeRepository {
pool: PgPool,
}
impl PostgresTradeRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[tonic::async_trait]
impl TradeRepository for PostgresTradeRepository {
async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()> {
todo!()
}
async fn get_all_positions(&self) -> Result<Vec<Position>> {
todo!()
}
}

We’ll leave the implementation of add_trade and get_all_positions as TODOs for now, focusing on these interesting parts later.

Finally, let’s implement the auto-generated service trait using the repository.

src/trade/service.rs
use tonic::{Request, Response, Status};
use crate::services::proto::refdata::Stock;
use crate::services::proto::trade::trade_server::Trade;
use crate::services::proto::trade::{MarketOrder, Position, Positions, Side};
use crate::trade::models::{Position as PositionModel, Side as SideModel};
use crate::trade::repository::TradeRepository;
pub struct TradeService<R> {
repository: R,
}
impl<R> TradeService<R> {
pub fn new(repository: R) -> Self {
Self { repository }
}
}
#[tonic::async_trait]
impl<R> Trade for TradeService<R>
where
R: TradeRepository + Send + Sync + 'static,
{
async fn send_order(&self, request: Request<MarketOrder>) -> Result<Response<()>, Status> {
let order = request.get_ref();
let (side, quantity) = match order.side() {
Side::Sell => (SideModel::Sell, -(order.quantity as i32)),
Side::Buy => (SideModel::Buy, order.quantity as i32),
};
self.repository
.add_trade(order.stock_id, side, quantity)
.await?;
Ok(Response::new(()))
}
async fn all_positions(&self, _request: Request<()>) -> Result<Response<Positions>, Status> {
let positions = self
.repository
.get_all_positions()
.await?
.into_iter()
.map(Into::into)
.collect();
Ok(Response::new(Positions { positions }))
}
}
impl From<PositionModel> for Position {
fn from(value: PositionModel) -> Self {
let stock = Stock {
id: value.stock_id,
symbol: value.stock_symbol,
name: value.stock_name,
};
Self {
stock: Some(stock),
quantity: value.quantity as u32,
}
}
}

Nothing interesting to see here, we’re just massaging the types to satisfy the desired types.

Registering the new service

Let’s register the trade module in lib.rs as a public module and expose the types we’ll need to register it with the Tonic service.

src/trade.rs
mod models;
mod repository;
mod service;
pub use repository::{PostgresTradeRepository, TradeRepository};
pub use service::TradeService;

With the required types exposed, we can add the trade service to the Tonic service.

src/main.rs
7 collapsed lines
use anyhow::Result;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use tonic::transport::Server;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use trading_api::refdata::{PostgresRefDataRepository, RefDataService};
use trading_api::services::proto::refdata::ref_data_server::RefDataServer;
use trading_api::services::proto::trade::trade_server::TradeServer;
use trading_api::services::FILE_DESCRIPTOR_SET;
use trading_api::trade::{PostgresTradeRepository, TradeService};
#[tokio::main]
async fn main() -> Result<()> {
dotenvy::dotenv()?;
setup_tracing_registry();
let pool = pg_pool().await?;
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
.build()?;
let refdata_service = RefDataServer::new(build_refdata_service(pool.clone()));
let trade_service = TradeServer::new(build_trade_service(pool.clone()));
let address = "0.0.0.0:8080".parse()?;
info!("Service is listening on {address}");
Server::builder()
.add_service(reflection_service)
.add_service(refdata_service)
.add_service(trade_service)
.serve(address)
.await?;
Ok(())
}
5 collapsed lines
fn build_refdata_service(pool: PgPool) -> RefDataService<PostgresRefDataRepository> {
let repository = PostgresRefDataRepository::new(pool);
RefDataService::new(repository)
}
fn build_trade_service(pool: PgPool) -> TradeService<PostgresTradeRepository> {
let repository = PostgresTradeRepository::new(pool);
TradeService::new(repository)
}
18 collapsed lines
pub async fn pg_pool() -> Result<PgPool> {
let url = std::env::var("DATABASE_URL")?;
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(url.as_str())
.await?;
Ok(pool)
}
fn setup_tracing_registry() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "info,trading_api=debug".into()),
))
.with(tracing_subscriber::fmt::layer().pretty())
.init();
}

You can explore the new endpoints using grpcui, but of course they will fail for now due to the TODOs we have yet to complete.

Adding tests

At this point, it’s probably worth stopping to consider what behaviour we expect from our endpoints, and set our expectations in a few simple integration tests.

A comprehensive test suite is beyond the scope of this article, but here are a few test cases to serve as examples.

tests/trade.rs
97 collapsed lines
use sqlx::PgPool;
use tonic::{Code, Request};
use trading_api::services::proto::trade::trade_server::Trade;
use trading_api::services::proto::trade::{MarketOrder, Positions};
use trading_api::trade::{PostgresTradeRepository, TradeService};
#[sqlx::test(fixtures("stocks"))]
async fn test_simple_order(pool: PgPool) {
let service = create_service(pool);
let order = MarketOrder {
stock_id: 1,
quantity: 100,
side: 1,
};
service.send_order(Request::new(order)).await.unwrap();
let response = service.all_positions(Request::new(())).await.unwrap();
assert_eq!(response.get_ref().positions.len(), 1);
assert_position(response.get_ref(), "AAPL", 100);
}
#[sqlx::test(fixtures("stocks"))]
async fn test_orders_with_different_stocks(pool: PgPool) {
let service = create_service(pool);
let order = MarketOrder {
stock_id: 1,
quantity: 100,
side: 1,
};
service.send_order(Request::new(order)).await.unwrap();
let order = MarketOrder {
stock_id: 2,
quantity: 200,
side: 1,
};
service.send_order(Request::new(order)).await.unwrap();
let response = service.all_positions(Request::new(())).await.unwrap();
assert_eq!(response.get_ref().positions.len(), 2);
}
#[sqlx::test(fixtures("stocks"))]
async fn test_selling_stock(pool: PgPool) {
let service = create_service(pool);
let order = MarketOrder {
stock_id: 1,
quantity: 100,
side: 1,
};
service.send_order(Request::new(order)).await.unwrap();
let response = service.all_positions(Request::new(())).await.unwrap();
assert_position(response.get_ref(), "AAPL", 100);
let order = MarketOrder {
stock_id: 1,
quantity: 60,
side: 2,
};
service.send_order(Request::new(order)).await.unwrap();
let response = service.all_positions(Request::new(())).await.unwrap();
assert_position(response.get_ref(), "AAPL", 40);
let order = MarketOrder {
stock_id: 1,
quantity: 60,
side: 2,
};
let response = service.send_order(Request::new(order)).await;
if let Err(status) = response {
assert_eq!(status.code(), Code::InvalidArgument);
} else {
panic!("expected an error status to be returned");
}
}
fn create_service(pool: PgPool) -> TradeService<PostgresTradeRepository> {
let repository = PostgresTradeRepository::new(pool);
TradeService::new(repository)
}
fn assert_position(positions: &Positions, symbol: &str, quantity: u32) {
match positions
.positions
.iter()
.find(|p| p.stock.as_ref().unwrap().symbol == symbol)
{
None => panic!("position for {symbol} is missing"),
Some(p) => {
assert_eq!(p.quantity, quantity);
}
}
}

By the time we finish the TODO implementations, these tests should pass.

Adding a new migration

We need two new tables for trades (executed orders) and overall positions. Let’s create these in a new migration.

Terminal window
sqlx migrate add trades

The up migration just creates the tables. We can also add any new indexes as required.

migrations/{timestamp}_trades.up.sql
CREATE TYPE side AS ENUM ('buy', 'sell');
CREATE TABLE trades
(
id SERIAL PRIMARY KEY,
stock_id INT NOT NULL,
quantity INT NOT NULL,
side SIDE,
executed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (stock_id) REFERENCES stocks (id)
);
CREATE TABLE positions
(
stock_id INT PRIMARY KEY,
quantity INT NOT NULL CHECK (quantity >= 0),
FOREIGN KEY (stock_id) REFERENCES stocks (id)
);
CREATE INDEX idx_trades_stock_id ON trades (stock_id);
CREATE INDEX idx_positions_stock_id ON positions(stock_id);

Next, we add the down script:

migrations/{timestamp}_trades.down.sql
DROP TABLE positions;
DROP TABLE trades;
DROP TYPE side;

You can verify the migration is working by repeatedly applying and reverting it. Make sure the migration is applied before we proceed.

Implementing the repository

The SQLx implementation of the repository is where we can apply some new concepts. The SQL queries required for the trade service are a little more involved than the ones we had for the reference data service. For more complex cases, SQLx supports defining your queries in dedicated SQL files.

This allows us to separate SQL logic from Rust code, enhancing readability and maintainability.

Handling orders

Create a new src/queries directory for our SQL files. We’ll have two queries for new orders - one to insert the trade and one to upsert positions.

The insert statement is straightforward:

src/queries/insert_trade.sql
INSERT INTO trades (stock_id, quantity, side)
VALUES ($1, $2, $3);

The statement to upsert positions is a little more involved:

src/queries/upsert_position.sql
WITH trade (stock_id, quantity) as (VALUES ($1::INT, $2::INT)),
upserted as (
UPDATE positions p
SET quantity = p.quantity + trade.quantity
FROM trade
WHERE trade.stock_id = p.stock_id
RETURNING p.*)
INSERT
INTO positions (stock_id, quantity)
SELECT stock_id, quantity
FROM trade
JOIN
stocks
ON trade.stock_id = stocks.id
WHERE NOT EXISTS (SELECT 1
FROM upserted
WHERE upserted.stock_id = trade.stock_id)
ON CONFLICT (stock_id) DO NOTHING;

Having this inline in the Rust code would have been quite distracting! Separate SQL files also play nicely with IDEs with database support - you get proper syntax highlighting, auto-complete, and you can run the SQL code to test it out.

We are now equipped to tackle the implementation of add_trade in the repository.

src/trade/repository.rs
23 collapsed lines
use sqlx::PgPool;
use crate::error::Result;
use crate::trade::models::{Position, Side};
#[tonic::async_trait]
pub trait TradeRepository {
async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()>;
async fn get_all_positions(&self) -> Result<Vec<Position>>;
}
pub struct PostgresTradeRepository {
pool: PgPool,
}
impl PostgresTradeRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[tonic::async_trait]
impl TradeRepository for PostgresTradeRepository {
async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query_file!(
"src/queries/insert_trade.sql",
stock_id,
quantity.abs(),
side as _
)
.execute(&mut *tx)
.await?;
sqlx::query_file_as!(
Position,
"src/queries/upsert_position.sql",
stock_id,
quantity,
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
5 collapsed lines
async fn get_all_positions(&self) -> Result<Vec<Position>> {
todo!()
}
}

Note, we use the query_file_as! macro to source the SQL queries from the files we have defined. The other difference from the reference data service is that we use a transaction to ensure consistency, rather than passing the connection pool directly to execute.

Getting all positions

Thankfully, the query to get the positions is much simpler:

src/queries/all_positions.sql
SELECT quantity, stock_id, s.symbol as stock_symbol, s.name as stock_name
FROM positions
JOIN stocks s
ON positions.stock_id = s.id;

Let’s quickly fill in the implementation of get_all_positions:

src/trade/repository.rs
48 collapsed lines
use sqlx::PgPool;
use crate::error::Result;
use crate::trade::models::{Position, Side};
#[tonic::async_trait]
pub trait TradeRepository {
async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()>;
async fn get_all_positions(&self) -> Result<Vec<Position>>;
}
pub struct PostgresTradeRepository {
pool: PgPool,
}
impl PostgresTradeRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[tonic::async_trait]
impl TradeRepository for PostgresTradeRepository {
async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query_file!(
"src/queries/insert_trade.sql",
stock_id,
quantity.abs(),
side as _
)
.execute(&mut *tx)
.await?;
sqlx::query_file_as!(
Position,
"src/queries/upsert_position.sql",
stock_id,
quantity,
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
async fn get_all_positions(&self) -> Result<Vec<Position>> {
let positions = sqlx::query_file_as!(Position, "src/queries/all_positions.sql")
.fetch_all(&self.pool)
.await?;
Ok(positions)
}
1 collapsed line
}

Preparing for merge

As we have added new SQL queries, let’s prepare SQLx for offline mode again so that our pipeline can build the code.

Terminal window
cargo sqlx prepare

Commit the new files into git.

Conclusion

If you have followed the steps up to this point, you should have a working trade service that can receive orders and list all positions. The integration tests should pass, and Clippy should be happy.

If that’s not the case, check out the completed branch for this section.

Through implementing the trade service, we’ve seen how Rust’s robust type system and SQLx’s compile-time checks can work together to ensure data integrity and enhance our application’s reliability. In the next installment, we’ll work on the Remix application and demonstrate how this type safety can be extended to the frontend.

Next post - Part 5: The Remix app

David Steiner

I'm a software engineer and architect focusing on performant cloud-native distributed systems.

About me

Back to Blog