Web3的Filter使用入门介绍

说明

最近的项目涉及到对链上数据的监控。针对链上数据监控最有效的方式就是通过对event事件的监控来获取想要的信息。本篇文章就是借介绍通过web3py不同的监控event的方法。

监控必然会涉及到过滤,web3py通过filter实现。

介绍

Event的过滤用法是:web3.eth.Eth.filter().例如:

  • web3.eth.filter('pending') ,监控所有的pending交易
  • web3.eth.filter('latest'),监控最新块

监控特定合约上的event方法:

  1. event_filter = mycontract.events.myEvent.createFilter(fromBlock='latest', argument_filters={'arg1':10})
  2. event_filter = w3.eth.filter({"address": contract_address})

以上两种方式都是可以的。第一种方法是通过mycontract.events.myEvent.createFilter方式,第二种是通过w3.eth.filter

需要注意的是,事件过滤器需要以太坊节点的支持。Infrua节点就不支持pending的时间过滤器。

常见函数

Filter

classweb3.utils.filters.Filter(web3, filter_id)

  • Filter.filter_id,通过eth_newFilter rpc 调用返回的;
  • Filter.get_new_entries(),返回filter最新的过滤事件信息,内部是通过web3.eth.Eth.get_filter_changes()轮询的方式获取新的事件信息
  • Filter.get_all_entries(),返回filter所有的过滤事件信息,内部是通过web3.eth.Eth.get_filter_logs()返回所有的事件信息
  • Filter.format_entry(entry),修改返回的event事件格式
  • Filter.is_valid_entry(entry),对event事件进行过滤,默认情况下返回True

Block and Transaction Filter Classes

classweb3.utils.filters.BlockFilter(...)

BlockFilterFilter的子集。

通过web3.eth.filter('latest')创建一个获取最新块的filter。如下所示:

1
2
new_block_filter = w3.eth.filter('latest')
new_block_filter.get_new_entries()

classweb3.utils.filters.TransactionFilter(...)

TransactionFilter是Filter的子集。

通过web3.eth.filter('pending')创建一个TransactionFilter对象。用法如下:

1
2
new_transaction_filter = w3.eth.filter('pending')
new_transaction_filter.get_new_entries()

Event Log Filters

event时间过滤器是最常用的过滤器。比如我们可以使用如下的命令对特定的合约的特定event时间进行监控。

1
2
event_filter = myContract.events.<event_name>.createFilter(fromBlock="latest", argument_filters={'arg1':10})
event_filter.get_new_entries()

最常用的方法还是web3.eth.filter。通过提供不同的参数,我们就可以对不同的合约,不同的event事件,参数要求都可以进行设置,从而完成我们的监控需求。如下所示:

1
2
3
4
5
6
7
event_signature_hash = web3.keccak(text="Transfer(address, address, uint)").hex()
event_filter = web3.eth.filter({
"address": myContract_address,
"topics": [event_signature_hash,
"0x0000000000000000000000000000000000000000000000000000000000000000"],
})

这个表示就是对Transfer(address, address, uint)的event事件的监控,对应的合约是myContract_address。topic对应的就是监控的event。

topic的指,是有顺序关系的。第一个topic表示的是对应event的hash(计算方法就是web3.keccak),后面表示的就是event时间对应的参数。所以在本例中,需要监控的就是Transfer(address, address, uint)这个event事件,对应Transfer的地址就是地址0。

有关说明topic的使用方法,具体以下面的几个例子来作为说明:

对于一个交易hash,有[A,B]这样的topic,这个topic的含义是event的事件函数是A,事件函数的第一个参数值是B,那么下面这些都将可以命中。

  • [],表示可以匹配对应的contract中所有的topic。换句话说,就是可以监控到指定合约的所有的event事件
  • [A],表示匹配的topic是A,后面的参数都可以随意匹配。所以对于[A,B]这种情况,必然是可以匹配的
  • [None, B] ,表示匹配的是任意event事件,只要求第一个参数是B。所以对于[A,B]这种情况,也是可以匹配的
  • [A, B],完全命中
  • [[A,B],[A,C]]这种是存在两个匹配模式,两个匹配是OR的关系,只要其中有一个匹配了就算是满足要求。所以[A,B]满足了要求,自然就会命中了

有关eth filter更多的用法,可以参考官方文档eth_newfilter

通过web3.eth.filter的方式创建就可以获得一个LogFilter的实例对象。

用法展示

同步用法

通过w3.eth.filter('latest')的方式监控最新的快:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from web3.auto import w3
import time

def handle_event(event):
print(event)

def log_loop(event_filter, poll_interval):
while True:
for event in event_filter.get_new_entries():
handle_event(event)
time.sleep(poll_interval)

def main():
block_filter = w3.eth.filter('latest')
log_loop(block_filter, 2)

if __name__ == '__main__':
main()

通过while True配合for event in event_filter.get_new_entries()就可以持续性获得最新块信息。

异步用法

通过python中的asyncawait就可以实现异步的事件。下面就是使用asyncio进行异步调用的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from web3.auto import w3
import asyncio


def handle_event(event):
print(event)
# and whatever

async def log_loop(event_filter, poll_interval):
while True:
for event in event_filter.get_new_entries():
handle_event(event)
await asyncio.sleep(poll_interval)

def main():
block_filter = w3.eth.filter('latest')
tx_filter = w3.eth.filter('pending')
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
asyncio.gather(
log_loop(block_filter, 2),
log_loop(tx_filter, 2)))
finally:
loop.close()

if __name__ == '__main__':
main()

除了使用 asyncio的方式实现异步之外,本例中还展示了另一种event监控的用法.asyncio.gather(log_loop(block_filter, 2), log_loop(tx_filter, 2))) 同时对两个filter进行监控(block_filtertx_filter). 如果我们有这种不同的event监控需求同时也无法u将其放在一个filter中实现时,就可以考虑这种方案.

多线程实现

除了同步和异步的方法之外,还可以使用多线程的方法实现.示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from web3.auto import w3
from threading import Thread
import time


def handle_event(event):
print(event)
# and whatever


def log_loop(event_filter, poll_interval):
while True:
for event in event_filter.get_new_entries():
handle_event(event)
time.sleep(poll_interval)


def main():
block_filter = w3.eth.filter('latest')
worker = Thread(target=log_loop, args=(block_filter, 5), daemon=True)
worker.start()
# .. do some other stuff

if __name__ == '__main__':
main()

多线程的实现是通过Thread(target=log_loop, args=(block_filter, 5), daemon=True)的方式实现,就是利用Python代码库中的自带的多线程库来实现的,并没有任何特殊的地方.

既然可以使用多线程的方法,那么多进程的方式是不是也一样可以呢?至于这个问题,就不在本篇文章讨论了.

总结

总的来说,event监控是目前链下对于链上监控最常见的方案.本例中对event的中filter概念,以及各种用法进行了展示,并且对于异步和多线程的实现也进行了展示.希望对大家有所帮助.

参考

https://web3py.readthedocs.io/en/latest/filters.html

文章作者: Oooverflow
文章链接: https://www.oooverflow.com/2022/09/20/web3-filter-usage/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Oooverflow