master

Master Thesis code
git clone git://popovic.xyz/master.git
Log | Files | Refs | README | LICENSE

prices.rs (7730B)


      1 use alloy::{
      2     eips::BlockId,
      3     primitives::{Address, U256},
      4     providers::RootProvider,
      5     pubsub::PubSubFrontend, transports::BoxTransport,
      6 };
      7 use arrow::{
      8     array::ArrayRef, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch
      9 };
     10 use parquet::{
     11     arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter}, basic::Compression, file::properties::{EnabledStatistics, WriterProperties}
     12 };
     13 use indicatif::{ProgressBar, ProgressStyle};
     14 use std::{
     15     collections::BTreeMap,
     16     fs::{OpenOptions, File},
     17     path::Path,
     18     sync::Arc
     19 };
     20 use log::info;
     21 use anyhow::{anyhow, Result};
     22 
     23 use crate::{interfaces::*, pools::{Pool, Version}};
     24 
     25 pub struct Price {
     26     pool: Address,
     27     block: u64,
     28     r_t0: Option<U256>,
     29     r_t1: Option<U256>,
     30     spx96: Option<U256>,
     31 }
     32 
     33 
     34 pub async fn load_prices(
     35     provider: RootProvider<BoxTransport>,
     36     pools: &BTreeMap<Address, Pool>,
     37     from_block: u64,
     38     to_block: u64,
     39     block_gap: u64,
     40     path : &Path,
     41 ) -> Result<Vec<Price>> {
     42 
     43     let mut prices = Vec::new();
     44 
     45 
     46     let mut blocks = Vec::new();
     47     blocks.push(from_block);
     48     let mut cur = from_block;
     49     loop {
     50         cur += block_gap;
     51         if cur > to_block {
     52             blocks.push(to_block);
     53             break
     54         }
     55         blocks.push(cur);
     56     }
     57 
     58     let pb = ProgressBar::new(blocks.len() as u64);
     59     pb.set_style(
     60         ProgressStyle::with_template(
     61             "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
     62         )
     63         .unwrap()
     64         .progress_chars("##-"),
     65     );
     66     pb.set_message(format!("From block {:?} - To Block {:?}", from_block, to_block));
     67     pb.inc(0);
     68 
     69     for block in blocks {
     70         'pool_loop: for (_, pool) in pools.into_iter() {
     71             match pool.version {
     72                 Version::V2 => {
     73                     match get_v2_price(
     74                         provider.clone(),
     75                         block,
     76                         pool,
     77                     ).await {
     78                         Ok(price) => {
     79                             prices.push(
     80                                 Price {
     81                                     pool: pool.address,
     82                                     block,
     83                                     r_t0: Some(price.0),
     84                                     r_t1: Some(price.1),
     85                                     spx96: None,
     86                                 }
     87                             );
     88                         }
     89                         Err(e) => {
     90                             info!("Error getting price {:?}", e);
     91                             continue 'pool_loop;
     92                         }
     93                     };
     94                 }
     95                 Version::V3 => {
     96                     match get_v3_price(
     97                         provider.clone(),
     98                         block,
     99                         pool
    100                     ).await {
    101                         Ok(price) => {
    102                             prices.push(
    103                                 Price {
    104                                     pool: pool.address,
    105                                     block,
    106                                     r_t0: Some(price.0),
    107                                     r_t1: Some(price.1),
    108                                     spx96: Some(price.2),
    109                                 }
    110                             );
    111                         }
    112                         Err(e) => {
    113                             info!("Error getting price {:?}", e);
    114                             continue 'pool_loop;
    115                         }
    116                     };
    117                 }
    118             }
    119         }
    120         pb.inc(1)
    121     }
    122 
    123     let file = OpenOptions::new()
    124         .write(true)
    125         .truncate(true)
    126         .create(true)
    127         .open(path)
    128         .unwrap();
    129 
    130     let props = WriterProperties::builder()
    131         .set_compression(Compression::SNAPPY)
    132         .set_statistics_enabled(EnabledStatistics::None)
    133         .build();
    134 
    135     let batch = create_record_batch(&prices).unwrap();
    136     println!("{:?}", batch.schema());
    137 
    138     let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
    139     writer.write(&batch).unwrap();
    140     writer.close().unwrap();
    141 
    142     Ok(prices)
    143 }
    144 
    145 fn create_record_batch(
    146     prices: &Vec<Price>,
    147 ) -> Result<RecordBatch> {
    148 
    149     let pools = prices.iter()
    150         .map(|p| format!("{:?}", p.pool))
    151         .collect::<Vec<String>>();
    152 
    153     let blocks = prices.iter()
    154         .map(|p| p.block as i64)
    155         .collect::<Vec<i64>>();
    156 
    157     let r_t0s = prices.iter()
    158         .map(|p| match p.r_t0{
    159             Some(r) => Some(format!("{:?}", r)),
    160             None => None,
    161         }).collect::<Vec<Option<String>>>();
    162 
    163     let r_t1s = prices.iter()
    164         .map(|p| match p.r_t1{
    165             Some(r) => Some(format!("{:?}", r)),
    166             None => None,
    167         })
    168         .collect::<Vec<Option<String>>>();
    169 
    170     let spx96s = prices.iter()
    171         .map(|p| match p.spx96 {
    172             Some(r) => Some(format!("{:?}", r)),
    173             None => None,
    174         })
    175         .collect::<Vec<Option<String>>>();
    176 
    177     let batch = RecordBatch::try_from_iter(
    178         vec![
    179             ("pool_address", Arc::new(arrow::array::StringArray::from(pools)) as ArrayRef),
    180             ("block_number", Arc::new(arrow::array::Int64Array::from(blocks)) as ArrayRef),
    181             ("reserve_t0", Arc::new(arrow::array::StringArray::from(r_t0s)) as ArrayRef),
    182             ("reserve_t1", Arc::new(arrow::array::StringArray::from(r_t1s)) as ArrayRef),
    183             ("sqrt_price_x96", Arc::new(arrow::array::StringArray::from(spx96s)) as ArrayRef),
    184         ]
    185     ).unwrap();
    186 
    187     Ok(batch)
    188 }
    189 
    190 async fn get_v2_price(
    191     provider: RootProvider<BoxTransport>,
    192     block_number: u64,
    193     pool: &Pool,
    194 ) -> Result<(U256, U256)> {
    195 
    196     let block = BlockId::from(block_number);
    197 
    198     let token0_ierc20 = IERC20::new(pool.token0, &provider); // token1
    199     let token1_ierc20 = IERC20::new(pool.token1, &provider); // token1
    200 
    201     let balance_token0 = match token0_ierc20
    202         .balanceOf(pool.address)
    203         .block(block)
    204         .call()
    205         .await {
    206             Ok(r) => r.balance,
    207             Err(e) => { return Err(anyhow!("Error getting balance_token0 {:?}", e)); }
    208     };
    209 
    210     let balance_token1 = match token1_ierc20
    211         .balanceOf(pool.address)
    212         .block(block)
    213         .call()
    214         .await {
    215             Ok(r) => r.balance,
    216             Err(e) => { return Err(anyhow!("Error getting balance_token1 {:?}", e)); }
    217     };
    218 
    219     return Ok((balance_token0, balance_token1));
    220 }
    221 
    222 async fn get_v3_price(
    223     provider: RootProvider<BoxTransport>,
    224     block_number: u64,
    225     pool: &Pool,
    226 ) -> Result<(U256, U256, U256)> {
    227 
    228     let block = BlockId::from(block_number);
    229 
    230     let token0_ierc20 = IERC20::new(pool.token0, &provider); // token1
    231     let token1_ierc20 = IERC20::new(pool.token1, &provider); // token1
    232     let pool_int = IUniswapV3Pool::new(pool.address, &provider); // token1
    233 
    234 
    235     let balance_token0 = match token0_ierc20
    236         .balanceOf(pool.address)
    237         .block(block)
    238         .call()
    239         .await {
    240             Ok(r) => r.balance,
    241             Err(e) => { return Err(anyhow!("Error getting balance_token0 {:?}", e)); }
    242     };
    243     let balance_token1 = match token1_ierc20
    244         .balanceOf(pool.address)
    245         .block(block)
    246         .call()
    247         .await {
    248             Ok(r) => r.balance,
    249             Err(e) => { return Err(anyhow!("Error getting balance_token1 {:?}", e)); }
    250     };
    251 
    252     let sqrt_price_x96 = match pool_int
    253         .slot0()
    254         .block(block)
    255         .call()
    256         .await {
    257         Ok(r) => U256::from(r.sqrtPriceX96),
    258         Err(e) => { return Err(anyhow!("Error returning sqrt_price_x96 {:?}", e)); }
    259     };
    260 
    261     return Ok((balance_token0, balance_token1, sqrt_price_x96));
    262 }