prices.rs (7730B)
1 use alloy::{ 2 eips::BlockId, 3 primitives::{Address, U256}, 4 providers::RootProvider, 5 pubsub::PubSubFrontend, transports::BoxTransport, 6 }; 7 use arrow::{ 8 array::ArrayRef, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch 9 }; 10 use parquet::{ 11 arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter}, basic::Compression, file::properties::{EnabledStatistics, WriterProperties} 12 }; 13 use indicatif::{ProgressBar, ProgressStyle}; 14 use std::{ 15 collections::BTreeMap, 16 fs::{OpenOptions, File}, 17 path::Path, 18 sync::Arc 19 }; 20 use log::info; 21 use anyhow::{anyhow, Result}; 22 23 use crate::{interfaces::*, pools::{Pool, Version}}; 24 25 pub struct Price { 26 pool: Address, 27 block: u64, 28 r_t0: Option<U256>, 29 r_t1: Option<U256>, 30 spx96: Option<U256>, 31 } 32 33 34 pub async fn load_prices( 35 provider: RootProvider<BoxTransport>, 36 pools: &BTreeMap<Address, Pool>, 37 from_block: u64, 38 to_block: u64, 39 block_gap: u64, 40 path : &Path, 41 ) -> Result<Vec<Price>> { 42 43 let mut prices = Vec::new(); 44 45 46 let mut blocks = Vec::new(); 47 blocks.push(from_block); 48 let mut cur = from_block; 49 loop { 50 cur += block_gap; 51 if cur > to_block { 52 blocks.push(to_block); 53 break 54 } 55 blocks.push(cur); 56 } 57 58 let pb = ProgressBar::new(blocks.len() as u64); 59 pb.set_style( 60 ProgressStyle::with_template( 61 "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}", 62 ) 63 .unwrap() 64 .progress_chars("##-"), 65 ); 66 pb.set_message(format!("From block {:?} - To Block {:?}", from_block, to_block)); 67 pb.inc(0); 68 69 for block in blocks { 70 'pool_loop: for (_, pool) in pools.into_iter() { 71 match pool.version { 72 Version::V2 => { 73 match get_v2_price( 74 provider.clone(), 75 block, 76 pool, 77 ).await { 78 Ok(price) => { 79 prices.push( 80 Price { 81 pool: pool.address, 82 block, 83 r_t0: Some(price.0), 84 r_t1: Some(price.1), 85 spx96: None, 86 } 87 ); 88 } 89 Err(e) => { 90 info!("Error getting price {:?}", e); 91 continue 'pool_loop; 92 } 93 }; 94 } 95 Version::V3 => { 96 match get_v3_price( 97 provider.clone(), 98 block, 99 pool 100 ).await { 101 Ok(price) => { 102 prices.push( 103 Price { 104 pool: pool.address, 105 block, 106 r_t0: Some(price.0), 107 r_t1: Some(price.1), 108 spx96: Some(price.2), 109 } 110 ); 111 } 112 Err(e) => { 113 info!("Error getting price {:?}", e); 114 continue 'pool_loop; 115 } 116 }; 117 } 118 } 119 } 120 pb.inc(1) 121 } 122 123 let file = OpenOptions::new() 124 .write(true) 125 .truncate(true) 126 .create(true) 127 .open(path) 128 .unwrap(); 129 130 let props = WriterProperties::builder() 131 .set_compression(Compression::SNAPPY) 132 .set_statistics_enabled(EnabledStatistics::None) 133 .build(); 134 135 let batch = create_record_batch(&prices).unwrap(); 136 println!("{:?}", batch.schema()); 137 138 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap(); 139 writer.write(&batch).unwrap(); 140 writer.close().unwrap(); 141 142 Ok(prices) 143 } 144 145 fn create_record_batch( 146 prices: &Vec<Price>, 147 ) -> Result<RecordBatch> { 148 149 let pools = prices.iter() 150 .map(|p| format!("{:?}", p.pool)) 151 .collect::<Vec<String>>(); 152 153 let blocks = prices.iter() 154 .map(|p| p.block as i64) 155 .collect::<Vec<i64>>(); 156 157 let r_t0s = prices.iter() 158 .map(|p| match p.r_t0{ 159 Some(r) => Some(format!("{:?}", r)), 160 None => None, 161 }).collect::<Vec<Option<String>>>(); 162 163 let r_t1s = prices.iter() 164 .map(|p| match p.r_t1{ 165 Some(r) => Some(format!("{:?}", r)), 166 None => None, 167 }) 168 .collect::<Vec<Option<String>>>(); 169 170 let spx96s = prices.iter() 171 .map(|p| match p.spx96 { 172 Some(r) => Some(format!("{:?}", r)), 173 None => None, 174 }) 175 .collect::<Vec<Option<String>>>(); 176 177 let batch = RecordBatch::try_from_iter( 178 vec![ 179 ("pool_address", Arc::new(arrow::array::StringArray::from(pools)) as ArrayRef), 180 ("block_number", Arc::new(arrow::array::Int64Array::from(blocks)) as ArrayRef), 181 ("reserve_t0", Arc::new(arrow::array::StringArray::from(r_t0s)) as ArrayRef), 182 ("reserve_t1", Arc::new(arrow::array::StringArray::from(r_t1s)) as ArrayRef), 183 ("sqrt_price_x96", Arc::new(arrow::array::StringArray::from(spx96s)) as ArrayRef), 184 ] 185 ).unwrap(); 186 187 Ok(batch) 188 } 189 190 async fn get_v2_price( 191 provider: RootProvider<BoxTransport>, 192 block_number: u64, 193 pool: &Pool, 194 ) -> Result<(U256, U256)> { 195 196 let block = BlockId::from(block_number); 197 198 let token0_ierc20 = IERC20::new(pool.token0, &provider); // token1 199 let token1_ierc20 = IERC20::new(pool.token1, &provider); // token1 200 201 let balance_token0 = match token0_ierc20 202 .balanceOf(pool.address) 203 .block(block) 204 .call() 205 .await { 206 Ok(r) => r.balance, 207 Err(e) => { return Err(anyhow!("Error getting balance_token0 {:?}", e)); } 208 }; 209 210 let balance_token1 = match token1_ierc20 211 .balanceOf(pool.address) 212 .block(block) 213 .call() 214 .await { 215 Ok(r) => r.balance, 216 Err(e) => { return Err(anyhow!("Error getting balance_token1 {:?}", e)); } 217 }; 218 219 return Ok((balance_token0, balance_token1)); 220 } 221 222 async fn get_v3_price( 223 provider: RootProvider<BoxTransport>, 224 block_number: u64, 225 pool: &Pool, 226 ) -> Result<(U256, U256, U256)> { 227 228 let block = BlockId::from(block_number); 229 230 let token0_ierc20 = IERC20::new(pool.token0, &provider); // token1 231 let token1_ierc20 = IERC20::new(pool.token1, &provider); // token1 232 let pool_int = IUniswapV3Pool::new(pool.address, &provider); // token1 233 234 235 let balance_token0 = match token0_ierc20 236 .balanceOf(pool.address) 237 .block(block) 238 .call() 239 .await { 240 Ok(r) => r.balance, 241 Err(e) => { return Err(anyhow!("Error getting balance_token0 {:?}", e)); } 242 }; 243 let balance_token1 = match token1_ierc20 244 .balanceOf(pool.address) 245 .block(block) 246 .call() 247 .await { 248 Ok(r) => r.balance, 249 Err(e) => { return Err(anyhow!("Error getting balance_token1 {:?}", e)); } 250 }; 251 252 let sqrt_price_x96 = match pool_int 253 .slot0() 254 .block(block) 255 .call() 256 .await { 257 Ok(r) => U256::from(r.sqrtPriceX96), 258 Err(e) => { return Err(anyhow!("Error returning sqrt_price_x96 {:?}", e)); } 259 }; 260 261 return Ok((balance_token0, balance_token1, sqrt_price_x96)); 262 }