master

Master Thesis code
git clone git://popovic.xyz/master.git
Log | Files | Refs | README | LICENSE

pools.rs (9645B)


      1 #![allow(dead_code)]
      2 #![allow(unused_variables)]
      3 #![allow(unused_imports)]
      4 #![allow(unused_mut)]
      5 
      6 use alloy::{
      7     primitives::{address, Address, FixedBytes},
      8     providers::{Provider, RootProvider},
      9     rpc::types::{BlockId, BlockTransactionsKind, Filter},
     10     sol_types::SolEvent,
     11     transports::{http::{Client, Http}, BoxTransport}
     12 };
     13 use std::{
     14     collections::{BTreeMap, HashMap},
     15     fs::OpenOptions,
     16     path::Path,
     17     str::FromStr,
     18     sync::Arc
     19 };
     20 use indicatif::{ProgressBar, ProgressStyle};
     21 use anyhow::{Result, anyhow};
     22 use csv::StringRecord;
     23 use log::info;
     24 
     25 use crate::interfaces::*;
     26 
     27 
     28 #[derive(Debug, Clone)]
     29 pub enum Version {
     30     V2,
     31     V3
     32 }
     33 
     34 #[derive(Debug, Clone)]
     35 pub struct Pool {
     36     pub id: i64,
     37     pub address: Address,
     38     pub version: Version,
     39     pub token0: Address,
     40     pub token1: Address,
     41     pub fee: u32,
     42     pub block_number: u64,
     43     pub timestamp: u64,
     44     pub tickspacing: i32,
     45 }
     46 
     47 impl From<StringRecord> for Pool {
     48     fn from(record: StringRecord) -> Self {
     49         let version = match record.get(2).unwrap().parse().unwrap() {
     50             2 => Version::V2,
     51             _ => Version::V3
     52         };
     53         Self {
     54             id: record.get(0).unwrap().parse().unwrap(),
     55             address: Address::from_str(record.get(1).unwrap()).unwrap(),
     56             version,
     57             token0: Address::from_str(record.get(3).unwrap()).unwrap(),
     58             token1: Address::from_str(record.get(4).unwrap()).unwrap(),
     59             fee: record.get(5).unwrap().parse().unwrap(),
     60             block_number: record.get(6).unwrap().parse().unwrap(),
     61             timestamp: record.get(7).unwrap().parse().unwrap(),
     62             tickspacing: record.get(8).unwrap().parse().unwrap(),
     63         }
     64     }
     65 }
     66 
     67 
     68 impl Pool {
     69     pub fn cache_row(&self) -> (i64, String, i32, String, String, u32, u64, u64, i32) {
     70         (
     71             self.id,
     72             format!("{:?}", self.address),
     73             match self.version {
     74                 Version::V2 => 2,
     75                 _ => 3,
     76             },
     77             format!("{:?}", self.token0),
     78             format!("{:?}", self.token1),
     79             self.fee,
     80             self.block_number,
     81             self.timestamp,
     82             self.tickspacing,
     83         )
     84     }
     85 
     86     pub fn has_token(&self, token: Address) -> bool {
     87         self.token0 == token || self.token1 == token
     88     }
     89 }
     90 
     91 pub async fn load_pools(
     92     provider: RootProvider<BoxTransport>,
     93     path: &Path,
     94     from_block: u64,
     95     chunk: u64,
     96 ) -> Result<(BTreeMap<Address, Pool>, i64)> {
     97 
     98     info!("Loading Pools...");
     99 
    100     let mut pools = BTreeMap::new();
    101     let mut blocks = vec![];
    102 
    103     let file = OpenOptions::new()
    104         .write(true)
    105         .append(true)
    106         .create(true)
    107         .open(path)
    108         .unwrap();
    109 
    110     let mut writer = csv::Writer::from_writer(file);
    111 
    112     if path.exists() {
    113         let mut reader = csv::Reader::from_path(path)?;
    114         for row in reader.records() {
    115             let row = row.unwrap();
    116             let pool = Pool::from(row);
    117             blocks.push(pool.block_number);
    118             pools.insert(pool.address, pool);
    119         }
    120     } else {
    121         writer.write_record(&[
    122             "id",
    123             "address",
    124             "version",
    125             "token0",
    126             "token1",
    127             "fee",
    128             "block_number",
    129             "timestamp",
    130             "tickspacing",
    131         ])?;
    132     }
    133 
    134     let last_id = match pools.len() > 0{
    135         true => pools.last_key_value().unwrap().1.id,
    136         false => -1
    137     };
    138 
    139     let from_block = match last_id != -1 {
    140         true => {
    141             match blocks.iter().max() {
    142                 Some(b) => *b,
    143                 None => { return Err(anyhow!("load_pools could not find last processed block")); }
    144             }
    145         }
    146         false => from_block
    147     };
    148 
    149 
    150     let to_block = provider.get_block_number().await.unwrap();
    151 //    let from_block = to_block;
    152     let mut processed_blocks = 0u64;
    153     let mut block_range: Vec<(u64, u64)> = vec![];
    154 
    155     info!("From block {:?} -> To block {:?}", from_block, to_block);
    156 
    157     loop {
    158         let start_idx = from_block + processed_blocks;
    159         let mut end_idx = start_idx + chunk - 1;
    160         if end_idx > to_block {
    161             end_idx = to_block;
    162             block_range.push((start_idx, end_idx));
    163             break;
    164         }
    165         block_range.push((start_idx, end_idx));
    166         processed_blocks += chunk;
    167     }
    168 
    169     let sigs = vec![
    170         PoolCreated::SIGNATURE_HASH, // v3
    171         PairCreated::SIGNATURE_HASH, // v3
    172     ];
    173 
    174     let factories = vec![
    175         address!("0x1F98431c8aD98523631AE4a59f267346ea31F984"),
    176         address!("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"),
    177     ];
    178 
    179 
    180     let pb = ProgressBar::new(to_block-from_block);
    181     pb.set_style(
    182         ProgressStyle::with_template(
    183             "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} current pools: {msg}",
    184         )
    185         .unwrap()
    186         .progress_chars("##-"),
    187     );
    188     pb.inc(0);
    189 
    190     for range in block_range {
    191         match get_pool_data(
    192             provider.clone(),
    193             range.0,
    194             range.1,
    195             sigs.clone(),
    196             factories.clone(),
    197         ).await {
    198             Ok(r) => {
    199                 for p in r { pools.insert(p.address, p); }
    200             }
    201             Err(e) => {
    202                 info!("get_pool_data call error {:?}", e);
    203                 continue;
    204             }
    205         };
    206         pb.inc(chunk);
    207         pb.set_message(format!("{:?} block range {:?}-{:?}", pools.len(), range.0, range.1));
    208     }
    209 
    210     let mut id = 0;
    211     let mut added = 0;
    212 
    213     for (_, pool) in pools.iter_mut() {
    214         if pool.id == -1 {
    215             id += 1;
    216             pool.id = id;
    217         }
    218         if (pool.id as i64) > last_id {
    219             writer.serialize(pool.cache_row())?;
    220             added += 1;
    221         }
    222     }
    223     writer.flush()?;
    224 
    225     Ok((pools, last_id))
    226 }
    227 
    228 
    229 async fn get_pool_data(
    230     provider: RootProvider<BoxTransport>,
    231     from_block: u64,
    232     to_block: u64,
    233     sig_hash: Vec<FixedBytes<32>>,
    234     address: Vec<Address>,
    235 ) -> Result<Vec<Pool>> {
    236     let mut pools = Vec::new();
    237     let mut timestamp_map: HashMap<u64, u64> = HashMap::new();
    238 
    239     let filter = Filter::new()
    240         .from_block(from_block)
    241         .to_block(to_block)
    242         .event_signature(sig_hash)
    243         .address(address);
    244 
    245     let logs = match provider.get_logs(&filter).await {
    246         Ok(r) => r,
    247         Err(e) => {
    248             info!("Error getting logs {:?}", e);
    249             return Ok(pools);
    250         },
    251     };
    252 
    253     for log in logs {
    254         let (version, address, token0, token1, fee, tickspacing) = match log.topic0().unwrap() {
    255             &PairCreated::SIGNATURE_HASH => {
    256                 let event = match PairCreated::decode_log_data(
    257                     log.data(), true
    258                 ) {
    259                     Ok(r) => r,
    260                     Err(e) => {
    261                         info!("UniswapV2Factory decoding error {:?}", e);
    262                         continue;
    263                     }
    264                 };
    265                 let tickspacing: i32 = 0;
    266                 let fee: u32 = 3000;
    267                 (Version::V2, event.pair, event.token0, event.token1, fee, tickspacing)
    268             },
    269             &PoolCreated::SIGNATURE_HASH => {
    270                 let event = match PoolCreated::decode_log_data(
    271                     log.data(), true
    272                 ) {
    273                     Ok(r) => r,
    274                     Err(e) => {
    275                         info!("UniswapV3Factory decoding error {:?}", e);
    276                         continue;
    277                     }
    278                 };
    279                 (Version::V3, event.pool, event.token0, event.token1, event.fee.to::<u32>(), event.tickSpacing.as_i32())
    280             },
    281             t => {
    282                 info!("Counld not match topic {:?}", t);
    283                 continue;
    284             }
    285         };
    286 
    287         let block_number = match log.block_number {
    288             Some(r) => r,
    289             None => {
    290                 info!("log does not contain block_number");
    291                 0u64
    292             }
    293         };
    294 
    295         let timestamp = if !timestamp_map.contains_key(&block_number) {
    296             let block = match provider.get_block(
    297                 BlockId::from(block_number),
    298                 BlockTransactionsKind::default()
    299             ).await {
    300                 Ok(r) => {
    301                     match r {
    302                         Some(v) => v,
    303                         None => {
    304                             info!("No block returned");
    305                             continue;
    306                         }
    307                     }
    308                 },
    309                 Err(e) => {
    310                     info!("Could not get block {:?}", e);
    311                     continue;
    312                 }
    313             };
    314             let timestamp = block.header.timestamp;
    315             timestamp
    316         } else {
    317             let timestamp  = *timestamp_map.get(&block_number).unwrap();
    318             timestamp
    319         };
    320 
    321         let pool_data = Pool {
    322             id: -1,
    323             address,
    324             version,
    325             token0,
    326             token1,
    327             fee,
    328             block_number,
    329             timestamp,
    330             tickspacing
    331         };
    332 
    333         pools.push(pool_data)
    334     }
    335     Ok(pools)
    336 }
    337 
    338 pub fn load_pools_from_file(
    339     path: &Path,
    340 ) -> Result<BTreeMap<Address, Pool>> {
    341     let mut pools = BTreeMap::new();
    342 
    343     if path.exists() {
    344         let mut reader = csv::Reader::from_path(path)?;
    345         for row in reader.records() {
    346             let row = row.unwrap();
    347             let pool = Pool::from(row);
    348             pools.insert(pool.address, pool);
    349         }
    350     } else {
    351         return Err(anyhow!("File path does not exist"));
    352     }
    353 
    354     Ok(pools)
    355 }