pools.rs (9645B)
1 #![allow(dead_code)] 2 #![allow(unused_variables)] 3 #![allow(unused_imports)] 4 #![allow(unused_mut)] 5 6 use alloy::{ 7 primitives::{address, Address, FixedBytes}, 8 providers::{Provider, RootProvider}, 9 rpc::types::{BlockId, BlockTransactionsKind, Filter}, 10 sol_types::SolEvent, 11 transports::{http::{Client, Http}, BoxTransport} 12 }; 13 use std::{ 14 collections::{BTreeMap, HashMap}, 15 fs::OpenOptions, 16 path::Path, 17 str::FromStr, 18 sync::Arc 19 }; 20 use indicatif::{ProgressBar, ProgressStyle}; 21 use anyhow::{Result, anyhow}; 22 use csv::StringRecord; 23 use log::info; 24 25 use crate::interfaces::*; 26 27 28 #[derive(Debug, Clone)] 29 pub enum Version { 30 V2, 31 V3 32 } 33 34 #[derive(Debug, Clone)] 35 pub struct Pool { 36 pub id: i64, 37 pub address: Address, 38 pub version: Version, 39 pub token0: Address, 40 pub token1: Address, 41 pub fee: u32, 42 pub block_number: u64, 43 pub timestamp: u64, 44 pub tickspacing: i32, 45 } 46 47 impl From<StringRecord> for Pool { 48 fn from(record: StringRecord) -> Self { 49 let version = match record.get(2).unwrap().parse().unwrap() { 50 2 => Version::V2, 51 _ => Version::V3 52 }; 53 Self { 54 id: record.get(0).unwrap().parse().unwrap(), 55 address: Address::from_str(record.get(1).unwrap()).unwrap(), 56 version, 57 token0: Address::from_str(record.get(3).unwrap()).unwrap(), 58 token1: Address::from_str(record.get(4).unwrap()).unwrap(), 59 fee: record.get(5).unwrap().parse().unwrap(), 60 block_number: record.get(6).unwrap().parse().unwrap(), 61 timestamp: record.get(7).unwrap().parse().unwrap(), 62 tickspacing: record.get(8).unwrap().parse().unwrap(), 63 } 64 } 65 } 66 67 68 impl Pool { 69 pub fn cache_row(&self) -> (i64, String, i32, String, String, u32, u64, u64, i32) { 70 ( 71 self.id, 72 format!("{:?}", self.address), 73 match self.version { 74 Version::V2 => 2, 75 _ => 3, 76 }, 77 format!("{:?}", self.token0), 78 format!("{:?}", self.token1), 79 self.fee, 80 self.block_number, 81 self.timestamp, 82 self.tickspacing, 83 ) 84 } 85 86 pub fn has_token(&self, token: Address) -> bool { 87 self.token0 == token || self.token1 == token 88 } 89 } 90 91 pub async fn load_pools( 92 provider: RootProvider<BoxTransport>, 93 path: &Path, 94 from_block: u64, 95 chunk: u64, 96 ) -> Result<(BTreeMap<Address, Pool>, i64)> { 97 98 info!("Loading Pools..."); 99 100 let mut pools = BTreeMap::new(); 101 let mut blocks = vec![]; 102 103 let file = OpenOptions::new() 104 .write(true) 105 .append(true) 106 .create(true) 107 .open(path) 108 .unwrap(); 109 110 let mut writer = csv::Writer::from_writer(file); 111 112 if path.exists() { 113 let mut reader = csv::Reader::from_path(path)?; 114 for row in reader.records() { 115 let row = row.unwrap(); 116 let pool = Pool::from(row); 117 blocks.push(pool.block_number); 118 pools.insert(pool.address, pool); 119 } 120 } else { 121 writer.write_record(&[ 122 "id", 123 "address", 124 "version", 125 "token0", 126 "token1", 127 "fee", 128 "block_number", 129 "timestamp", 130 "tickspacing", 131 ])?; 132 } 133 134 let last_id = match pools.len() > 0{ 135 true => pools.last_key_value().unwrap().1.id, 136 false => -1 137 }; 138 139 let from_block = match last_id != -1 { 140 true => { 141 match blocks.iter().max() { 142 Some(b) => *b, 143 None => { return Err(anyhow!("load_pools could not find last processed block")); } 144 } 145 } 146 false => from_block 147 }; 148 149 150 let to_block = provider.get_block_number().await.unwrap(); 151 // let from_block = to_block; 152 let mut processed_blocks = 0u64; 153 let mut block_range: Vec<(u64, u64)> = vec![]; 154 155 info!("From block {:?} -> To block {:?}", from_block, to_block); 156 157 loop { 158 let start_idx = from_block + processed_blocks; 159 let mut end_idx = start_idx + chunk - 1; 160 if end_idx > to_block { 161 end_idx = to_block; 162 block_range.push((start_idx, end_idx)); 163 break; 164 } 165 block_range.push((start_idx, end_idx)); 166 processed_blocks += chunk; 167 } 168 169 let sigs = vec![ 170 PoolCreated::SIGNATURE_HASH, // v3 171 PairCreated::SIGNATURE_HASH, // v3 172 ]; 173 174 let factories = vec![ 175 address!("0x1F98431c8aD98523631AE4a59f267346ea31F984"), 176 address!("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"), 177 ]; 178 179 180 let pb = ProgressBar::new(to_block-from_block); 181 pb.set_style( 182 ProgressStyle::with_template( 183 "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} current pools: {msg}", 184 ) 185 .unwrap() 186 .progress_chars("##-"), 187 ); 188 pb.inc(0); 189 190 for range in block_range { 191 match get_pool_data( 192 provider.clone(), 193 range.0, 194 range.1, 195 sigs.clone(), 196 factories.clone(), 197 ).await { 198 Ok(r) => { 199 for p in r { pools.insert(p.address, p); } 200 } 201 Err(e) => { 202 info!("get_pool_data call error {:?}", e); 203 continue; 204 } 205 }; 206 pb.inc(chunk); 207 pb.set_message(format!("{:?} block range {:?}-{:?}", pools.len(), range.0, range.1)); 208 } 209 210 let mut id = 0; 211 let mut added = 0; 212 213 for (_, pool) in pools.iter_mut() { 214 if pool.id == -1 { 215 id += 1; 216 pool.id = id; 217 } 218 if (pool.id as i64) > last_id { 219 writer.serialize(pool.cache_row())?; 220 added += 1; 221 } 222 } 223 writer.flush()?; 224 225 Ok((pools, last_id)) 226 } 227 228 229 async fn get_pool_data( 230 provider: RootProvider<BoxTransport>, 231 from_block: u64, 232 to_block: u64, 233 sig_hash: Vec<FixedBytes<32>>, 234 address: Vec<Address>, 235 ) -> Result<Vec<Pool>> { 236 let mut pools = Vec::new(); 237 let mut timestamp_map: HashMap<u64, u64> = HashMap::new(); 238 239 let filter = Filter::new() 240 .from_block(from_block) 241 .to_block(to_block) 242 .event_signature(sig_hash) 243 .address(address); 244 245 let logs = match provider.get_logs(&filter).await { 246 Ok(r) => r, 247 Err(e) => { 248 info!("Error getting logs {:?}", e); 249 return Ok(pools); 250 }, 251 }; 252 253 for log in logs { 254 let (version, address, token0, token1, fee, tickspacing) = match log.topic0().unwrap() { 255 &PairCreated::SIGNATURE_HASH => { 256 let event = match PairCreated::decode_log_data( 257 log.data(), true 258 ) { 259 Ok(r) => r, 260 Err(e) => { 261 info!("UniswapV2Factory decoding error {:?}", e); 262 continue; 263 } 264 }; 265 let tickspacing: i32 = 0; 266 let fee: u32 = 3000; 267 (Version::V2, event.pair, event.token0, event.token1, fee, tickspacing) 268 }, 269 &PoolCreated::SIGNATURE_HASH => { 270 let event = match PoolCreated::decode_log_data( 271 log.data(), true 272 ) { 273 Ok(r) => r, 274 Err(e) => { 275 info!("UniswapV3Factory decoding error {:?}", e); 276 continue; 277 } 278 }; 279 (Version::V3, event.pool, event.token0, event.token1, event.fee.to::<u32>(), event.tickSpacing.as_i32()) 280 }, 281 t => { 282 info!("Counld not match topic {:?}", t); 283 continue; 284 } 285 }; 286 287 let block_number = match log.block_number { 288 Some(r) => r, 289 None => { 290 info!("log does not contain block_number"); 291 0u64 292 } 293 }; 294 295 let timestamp = if !timestamp_map.contains_key(&block_number) { 296 let block = match provider.get_block( 297 BlockId::from(block_number), 298 BlockTransactionsKind::default() 299 ).await { 300 Ok(r) => { 301 match r { 302 Some(v) => v, 303 None => { 304 info!("No block returned"); 305 continue; 306 } 307 } 308 }, 309 Err(e) => { 310 info!("Could not get block {:?}", e); 311 continue; 312 } 313 }; 314 let timestamp = block.header.timestamp; 315 timestamp 316 } else { 317 let timestamp = *timestamp_map.get(&block_number).unwrap(); 318 timestamp 319 }; 320 321 let pool_data = Pool { 322 id: -1, 323 address, 324 version, 325 token0, 326 token1, 327 fee, 328 block_number, 329 timestamp, 330 tickspacing 331 }; 332 333 pools.push(pool_data) 334 } 335 Ok(pools) 336 } 337 338 pub fn load_pools_from_file( 339 path: &Path, 340 ) -> Result<BTreeMap<Address, Pool>> { 341 let mut pools = BTreeMap::new(); 342 343 if path.exists() { 344 let mut reader = csv::Reader::from_path(path)?; 345 for row in reader.records() { 346 let row = row.unwrap(); 347 let pool = Pool::from(row); 348 pools.insert(pool.address, pool); 349 } 350 } else { 351 return Err(anyhow!("File path does not exist")); 352 } 353 354 Ok(pools) 355 }