Previously, we implemented a simple reference data service to add and list stocks. In this post, we’ll implement a trade service analogous in structure to the reference data service.
We’ll pick up the pace here, focusing less on the repetitive boilerplate code we’ve seen in earlier posts and more on new concepts.
If you ever get stuck, please refer to the finished branch for this post.
Defining the gRPC service
Let’s start by creating the interface we’d like to implement in a new proto package.
```proto
syntax = "proto3";

import "google/protobuf/empty.proto";
import "refdata.proto";

package trade;

enum Side {
  SELL = 0;
  BUY = 1;
}

message MarketOrder {
  int32 stock_id = 1;
  uint32 quantity = 2;
  Side side = 3;
}

message Position {
  refdata.Stock stock = 1;
  uint32 quantity = 2;
}

message Positions {
  repeated Position positions = 1;
}

service Trade {
  rpc SendOrder(MarketOrder) returns (google.protobuf.Empty);
  rpc AllPositions(google.protobuf.Empty) returns (Positions);
}
```
The interface consists of two functions. `SendOrder` takes a market order. In this dummy application, the order is immediately executed for both BUY and SELL actions, provided there are sufficient shares. Positions provide a summarized view of all orders, updating automatically with each new order sent. `AllPositions` returns the position for every stock.
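Before wiring anything up, it may help to see the intended behaviour in miniature. The sketch below is a self-contained toy (plain Rust, no database; the types are local stand-ins, not the generated ones) showing how a stream of executed orders folds into per-stock positions, with a sell rejected when shares are insufficient:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy)]
enum Side {
    Buy,
    Sell,
}

struct MarketOrder {
    stock_id: i32,
    quantity: u32,
    side: Side,
}

// Fold orders into positions: BUY adds to a stock's position, SELL subtracts,
// and a SELL that exceeds the current position is rejected.
fn positions(orders: &[MarketOrder]) -> Result<HashMap<i32, u32>, String> {
    let mut positions: HashMap<i32, u32> = HashMap::new();
    for order in orders {
        let entry = positions.entry(order.stock_id).or_insert(0);
        match order.side {
            Side::Buy => *entry += order.quantity,
            Side::Sell if *entry >= order.quantity => *entry -= order.quantity,
            Side::Sell => {
                return Err(format!("insufficient shares of stock {}", order.stock_id))
            }
        }
    }
    Ok(positions)
}
```

The real service applies the same rules, but persists trades and positions in Postgres instead of a map.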
There is no need to change `build.rs`, as it picks up new files in the `protos` directory. We need to register the Rust module, though.
```rust
pub const FILE_DESCRIPTOR_SET: &[u8] =
    tonic::include_file_descriptor_set!("trading_descriptor");

pub mod proto {
    pub mod refdata {
        tonic::include_proto!("refdata");
    }

    pub mod trade {
        tonic::include_proto!("trade");
    }
}
```
New modules for trade service
Add a new `trade.rs` module with three sub-modules:

- `models.rs`
- `repository.rs`
- `service.rs`
Let’s create our types to model positions and the side of an order:
```rust
pub struct Position {
    pub stock_id: i32,
    pub stock_symbol: String,
    pub stock_name: String,
    pub quantity: i32,
}

#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "side", rename_all = "lowercase")]
pub enum Side {
    Buy,
    Sell,
}
```
Just like for the reference data service, we’ll create an abstract repository trait and a concrete Postgres implementation.
```rust
use sqlx::PgPool;

use crate::error::Result;
use crate::trade::models::{Position, Side};

#[tonic::async_trait]
pub trait TradeRepository {
    async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()>;
    async fn get_all_positions(&self) -> Result<Vec<Position>>;
}

pub struct PostgresTradeRepository {
    pool: PgPool,
}

impl PostgresTradeRepository {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

#[tonic::async_trait]
impl TradeRepository for PostgresTradeRepository {
    async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()> {
        todo!()
    }

    async fn get_all_positions(&self) -> Result<Vec<Position>> {
        todo!()
    }
}
```
We’ll leave the implementations of `add_trade` and `get_all_positions` as TODOs for now and return to these more interesting parts later.
Finally, let’s implement the auto-generated service trait using the repository.
```rust
use tonic::{Request, Response, Status};

use crate::services::proto::refdata::Stock;
use crate::services::proto::trade::trade_server::Trade;
use crate::services::proto::trade::{MarketOrder, Position, Positions, Side};
use crate::trade::models::{Position as PositionModel, Side as SideModel};
use crate::trade::repository::TradeRepository;

pub struct TradeService<R> {
    repository: R,
}

impl<R> TradeService<R> {
    pub fn new(repository: R) -> Self {
        Self { repository }
    }
}

#[tonic::async_trait]
impl<R> Trade for TradeService<R>
where
    R: TradeRepository + Send + Sync + 'static,
{
    async fn send_order(&self, request: Request<MarketOrder>) -> Result<Response<()>, Status> {
        let order = request.get_ref();
        let (side, quantity) = match order.side() {
            Side::Sell => (SideModel::Sell, -(order.quantity as i32)),
            Side::Buy => (SideModel::Buy, order.quantity as i32),
        };
        self.repository
            .add_trade(order.stock_id, side, quantity)
            .await?;

        Ok(Response::new(()))
    }

    async fn all_positions(&self, _request: Request<()>) -> Result<Response<Positions>, Status> {
        let positions = self
            .repository
            .get_all_positions()
            .await?
            .into_iter()
            .map(Into::into)
            .collect();

        Ok(Response::new(Positions { positions }))
    }
}

impl From<PositionModel> for Position {
    fn from(value: PositionModel) -> Self {
        let stock = Stock {
            id: value.stock_id,
            symbol: value.stock_symbol,
            name: value.stock_name,
        };
        Self {
            stock: Some(stock),
            quantity: value.quantity as u32,
        }
    }
}
```
Nothing interesting to see here; we’re just massaging our domain models into the generated gRPC types.
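One small Rust detail worth calling out: the `.map(Into::into)` in `all_positions` works because implementing `From<PositionModel>` for the generated `Position` gives us `Into` for free via the standard library's blanket impl. A minimal illustration with toy types (local to this example, not the crate's real ones):

```rust
struct Model {
    quantity: i32,
}

struct Dto {
    quantity: u32,
}

// Implementing From<Model> for Dto automatically provides Model: Into<Dto>
// through the standard library's blanket `impl<T, U: From<T>> Into<U> for T`.
impl From<Model> for Dto {
    fn from(value: Model) -> Self {
        Dto {
            quantity: value.quantity as u32,
        }
    }
}

// Convert a whole collection with the same `.map(Into::into)` pattern
// used in `all_positions`.
fn convert_all(models: Vec<Model>) -> Vec<Dto> {
    models.into_iter().map(Into::into).collect()
}
```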
Registering the new service
Let’s register the trade module in `lib.rs` as a public module and expose the types we’ll need to register it with the Tonic service.
```rust
mod models;
mod repository;
mod service;

pub use repository::{PostgresTradeRepository, TradeRepository};
pub use service::TradeService;
```
With the required types exposed, we can add the trade service to the Tonic service.
```rust
use anyhow::Result;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use tonic::transport::Server;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

use trading_api::refdata::{PostgresRefDataRepository, RefDataService};
use trading_api::services::proto::refdata::ref_data_server::RefDataServer;
use trading_api::services::proto::trade::trade_server::TradeServer;
use trading_api::services::FILE_DESCRIPTOR_SET;
use trading_api::trade::{PostgresTradeRepository, TradeService};

#[tokio::main]
async fn main() -> Result<()> {
    dotenvy::dotenv()?;
    setup_tracing_registry();
    let pool = pg_pool().await?;

    let reflection_service = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
        .build()?;
    let refdata_service = RefDataServer::new(build_refdata_service(pool.clone()));
    let trade_service = TradeServer::new(build_trade_service(pool.clone()));

    let address = "0.0.0.0:8080".parse()?;
    info!("Service is listening on {address}");

    Server::builder()
        .add_service(reflection_service)
        .add_service(refdata_service)
        .add_service(trade_service)
        .serve(address)
        .await?;

    Ok(())
}

fn build_refdata_service(pool: PgPool) -> RefDataService<PostgresRefDataRepository> {
    let repository = PostgresRefDataRepository::new(pool);
    RefDataService::new(repository)
}

fn build_trade_service(pool: PgPool) -> TradeService<PostgresTradeRepository> {
    let repository = PostgresTradeRepository::new(pool);
    TradeService::new(repository)
}

pub async fn pg_pool() -> Result<PgPool> {
    let url = std::env::var("DATABASE_URL")?;
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(url.as_str())
        .await?;

    Ok(pool)
}

fn setup_tracing_registry() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new(
            std::env::var("RUST_LOG").unwrap_or_else(|_| "info,trading_api=debug".into()),
        ))
        .with(tracing_subscriber::fmt::layer().pretty())
        .init();
}
```
You can explore the new endpoints using `grpcui`, but of course they will fail for now due to the TODOs we have yet to complete.
Adding tests
At this point, it’s probably worth stopping to consider what behaviour we expect from our endpoints, and set our expectations in a few simple integration tests.
A comprehensive test suite is beyond the scope of this article, but here are a few test cases to serve as examples.
```rust
use sqlx::PgPool;
use tonic::{Code, Request};
use trading_api::services::proto::trade::trade_server::Trade;
use trading_api::services::proto::trade::{MarketOrder, Positions};
use trading_api::trade::{PostgresTradeRepository, TradeService};

#[sqlx::test(fixtures("stocks"))]
async fn test_simple_order(pool: PgPool) {
    let service = create_service(pool);

    let order = MarketOrder {
        stock_id: 1,
        quantity: 100,
        side: 1, // BUY
    };
    service.send_order(Request::new(order)).await.unwrap();

    let response = service.all_positions(Request::new(())).await.unwrap();
    assert_eq!(response.get_ref().positions.len(), 1);
    assert_position(response.get_ref(), "AAPL", 100);
}

#[sqlx::test(fixtures("stocks"))]
async fn test_orders_with_different_stocks(pool: PgPool) {
    let service = create_service(pool);

    let order = MarketOrder {
        stock_id: 1,
        quantity: 100,
        side: 1, // BUY
    };
    service.send_order(Request::new(order)).await.unwrap();

    let order = MarketOrder {
        stock_id: 2,
        quantity: 200,
        side: 1, // BUY
    };
    service.send_order(Request::new(order)).await.unwrap();

    let response = service.all_positions(Request::new(())).await.unwrap();
    assert_eq!(response.get_ref().positions.len(), 2);
}

#[sqlx::test(fixtures("stocks"))]
async fn test_selling_stock(pool: PgPool) {
    let service = create_service(pool);

    let order = MarketOrder {
        stock_id: 1,
        quantity: 100,
        side: 1, // BUY
    };
    service.send_order(Request::new(order)).await.unwrap();

    let response = service.all_positions(Request::new(())).await.unwrap();
    assert_position(response.get_ref(), "AAPL", 100);

    let order = MarketOrder {
        stock_id: 1,
        quantity: 60,
        side: 0, // SELL
    };
    service.send_order(Request::new(order)).await.unwrap();
    let response = service.all_positions(Request::new(())).await.unwrap();
    assert_position(response.get_ref(), "AAPL", 40);

    let order = MarketOrder {
        stock_id: 1,
        quantity: 60,
        side: 0, // SELL - only 40 shares left, so this must fail
    };
    let response = service.send_order(Request::new(order)).await;
    if let Err(status) = response {
        assert_eq!(status.code(), Code::InvalidArgument);
    } else {
        panic!("expected an error status to be returned");
    }
}

fn create_service(pool: PgPool) -> TradeService<PostgresTradeRepository> {
    let repository = PostgresTradeRepository::new(pool);
    TradeService::new(repository)
}

fn assert_position(positions: &Positions, symbol: &str, quantity: u32) {
    match positions
        .positions
        .iter()
        .find(|p| p.stock.as_ref().unwrap().symbol == symbol)
    {
        None => panic!("position for {symbol} is missing"),
        Some(p) => {
            assert_eq!(p.quantity, quantity);
        }
    }
}
```
By the time we finish the TODO implementations, these tests should pass.
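One expectation in the last test is worth unpacking: the oversized sell should surface as `Code::InvalidArgument`. That relies on the crate's error module (built in an earlier post and not repeated here) translating the database's `CHECK` constraint violation into a gRPC status. A rough, stdlib-only sketch of the idea, assuming the mapping keys off Postgres SQLSTATE codes (the names here are illustrative, not the crate's actual API):

```rust
#[derive(Debug, PartialEq)]
enum GrpcCode {
    InvalidArgument,
    Internal,
}

// Hypothetical mapping: Postgres reports a violated CHECK constraint with
// SQLSTATE 23514 (check_violation). We surface that as InvalidArgument,
// since the client sent an order the data model cannot accept; any other
// database error is an unexpected server-side failure.
fn grpc_code_for_sqlstate(sqlstate: Option<&str>) -> GrpcCode {
    match sqlstate {
        Some("23514") => GrpcCode::InvalidArgument,
        _ => GrpcCode::Internal,
    }
}
```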
Adding a new migration
We need two new tables for trades (executed orders) and overall positions. Let’s create these in a new migration.
```shell
sqlx migrate add trades
```
The `up` migration just creates the tables. We can also add any new indexes as required.
```sql
CREATE TYPE side AS ENUM ('buy', 'sell');

CREATE TABLE trades
(
    id          SERIAL PRIMARY KEY,
    stock_id    INT NOT NULL,
    quantity    INT NOT NULL,
    side        SIDE,
    executed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (stock_id) REFERENCES stocks (id)
);

CREATE TABLE positions
(
    stock_id INT PRIMARY KEY,
    quantity INT NOT NULL CHECK (quantity >= 0),
    FOREIGN KEY (stock_id) REFERENCES stocks (id)
);

CREATE INDEX idx_trades_stock_id ON trades (stock_id);
CREATE INDEX idx_positions_stock_id ON positions (stock_id);
```
Next, we add the down script:
```sql
DROP TABLE positions;
DROP TABLE trades;
DROP TYPE side;
```
You can verify the migration is working by repeatedly applying and reverting it. Make sure the migration is applied before we proceed.
Implementing the repository
The SQLx implementation of the repository is where we can apply some new concepts. The SQL queries required for the trade service are a little more involved than the ones we had for the reference data service. For more complex cases, SQLx supports defining your queries in dedicated SQL files.
This allows us to separate SQL logic from Rust code, enhancing readability and maintainability.
Handling orders
Create a new `src/queries` directory for our SQL files. We’ll have two queries for new orders: one to insert the trade and one to upsert positions.

The insert statement is straightforward:
```sql
INSERT INTO trades (stock_id, quantity, side)
VALUES ($1, $2, $3);
```
The statement to upsert positions is a little more involved:
```sql
WITH trade (stock_id, quantity) AS (VALUES ($1::INT, $2::INT)),
     upserted AS (
         UPDATE positions p
         SET quantity = p.quantity + trade.quantity
         FROM trade
         WHERE trade.stock_id = p.stock_id
         RETURNING p.*
     )
INSERT
INTO positions (stock_id, quantity)
SELECT stock_id, quantity
FROM trade
         JOIN stocks ON trade.stock_id = stocks.id
WHERE NOT EXISTS (SELECT 1 FROM upserted WHERE upserted.stock_id = trade.stock_id)
ON CONFLICT (stock_id) DO NOTHING;
```
Having this inline in the Rust code would have been quite distracting! Separate SQL files also play nicely with IDEs with database support - you get proper syntax highlighting, auto-complete, and you can run the SQL code to test it out.
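If the CTE takes a moment to parse, its net effect is an atomic "update the position if it exists, insert it otherwise", keyed on `stock_id`. As a sketch for intuition only (the real work happens in Postgres), an in-memory analogue in Rust looks like this:

```rust
use std::collections::HashMap;

// In-memory analogue of upsert_position.sql: add the (signed) trade
// quantity to an existing position, or create the position if the stock
// has never been traded before.
fn upsert_position(positions: &mut HashMap<i32, i32>, stock_id: i32, quantity: i32) {
    positions
        .entry(stock_id)
        .and_modify(|q| *q += quantity)
        .or_insert(quantity);
}
```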
We are now equipped to tackle the implementation of `add_trade` in the repository.
```rust
use sqlx::PgPool;

use crate::error::Result;
use crate::trade::models::{Position, Side};

#[tonic::async_trait]
pub trait TradeRepository {
    async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()>;
    async fn get_all_positions(&self) -> Result<Vec<Position>>;
}

pub struct PostgresTradeRepository {
    pool: PgPool,
}

impl PostgresTradeRepository {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

#[tonic::async_trait]
impl TradeRepository for PostgresTradeRepository {
    async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        sqlx::query_file!(
            "src/queries/insert_trade.sql",
            stock_id,
            quantity.abs(),
            side as _
        )
        .execute(&mut *tx)
        .await?;
        sqlx::query_file!("src/queries/upsert_position.sql", stock_id, quantity)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;

        Ok(())
    }

    async fn get_all_positions(&self) -> Result<Vec<Position>> {
        todo!()
    }
}
```
Note that we use the `query_file!` family of macros to source the SQL queries from the files we have defined. The other difference from the reference data service is that we use a transaction to ensure the trade and the position update are applied consistently, rather than passing the connection pool directly to `execute`.
Getting all positions
Thankfully, the query to get the positions is much simpler:
```sql
SELECT quantity,
       stock_id,
       s.symbol AS stock_symbol,
       s.name   AS stock_name
FROM positions
         JOIN stocks s
              ON positions.stock_id = s.id;
```
Let’s quickly fill in the implementation of `get_all_positions`:
```rust
use sqlx::PgPool;

use crate::error::Result;
use crate::trade::models::{Position, Side};

#[tonic::async_trait]
pub trait TradeRepository {
    async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()>;
    async fn get_all_positions(&self) -> Result<Vec<Position>>;
}

pub struct PostgresTradeRepository {
    pool: PgPool,
}

impl PostgresTradeRepository {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

#[tonic::async_trait]
impl TradeRepository for PostgresTradeRepository {
    async fn add_trade(&self, stock_id: i32, side: Side, quantity: i32) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        sqlx::query_file!(
            "src/queries/insert_trade.sql",
            stock_id,
            quantity.abs(),
            side as _
        )
        .execute(&mut *tx)
        .await?;
        sqlx::query_file!("src/queries/upsert_position.sql", stock_id, quantity)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;

        Ok(())
    }

    async fn get_all_positions(&self) -> Result<Vec<Position>> {
        let positions = sqlx::query_file_as!(Position, "src/queries/all_positions.sql")
            .fetch_all(&self.pool)
            .await?;

        Ok(positions)
    }
}
```
Preparing for merge
As we have added new SQL queries, let’s prepare SQLx for offline mode again so that our pipeline can build the code.
```shell
cargo sqlx prepare
```
Commit the new files into git.
Conclusion
If you have followed the steps up to this point, you should have a working trade service that can receive orders and list all positions. The integration tests should pass, and Clippy should be happy.
If that’s not the case, check out the completed branch for this section.
Through implementing the trade service, we’ve seen how Rust’s robust type system and SQLx’s compile-time checks can work together to ensure data integrity and enhance our application’s reliability. In the next installment, we’ll work on the Remix application and demonstrate how this type safety can be extended to the frontend.
Next post - Part 5: The Remix app
David Steiner
I'm a software engineer and architect focusing on performant cloud-native distributed systems.