分布式系统
Reading time: 7 min
Graph 是分布式系统实现的协议。
连接可能失败。 请求无序到达。 具有不同步时钟和状态的很多计算机在处理相关请求。 服务器重新启动。 重组发生在请求之间。以上这些问题是所有分布式系统所固有的,但在全球范围内运行的系统中会更加严重。
考虑这个例子,如果客户端在重组期间轮询索引人以获取最新数据,可能会发生什么。
- 索引人获取区块 8
- 区块 8 的数据传送给客户端
- 索引人获取区块 9
- 索引人获取区块 10A
- 区块 10A 的数据传送给客户端
- 索引人检测到区块重组到 10B 并回转到 10A
- 区块 9 的数据传送给客户端
- 索引人获取区块 10B
- 索引人获取区块 11
- 区块 11 的数据传送给客户端
从索引人的角度来看,事情进展得完全合乎逻辑。 时间不断前行,尽管我们确实必须回滚一个相邻区块,并在共识下处理该区块。 在此过程中,索引人使用它当时知道的最新状态为请求提供服务。
然而,从客户的角度来看,事情却显得很混乱。 客户端观察到响应依次是区块 8、10、9 和 11。 我们称之为“区块摆动”问题。 当客户端遇到区块摆动时,数据可能会随着时间的推移而自相矛盾。 当考虑到索引人不会同时获取最新区块,并且您的请求可能会被路由到多个索引人的时候,情况会变得更糟。
客户端和服务器有责任协同工作,以便向用户提供一致的数据。 根据所需的一致性目标的不同,我们必须使用不同的方法,因为没有一个适合所有问题的完美解决方案。
通过分布式系统的影响进行推理是困难的,但修复可能不是! 我们已经建立了一些 API 和模式来帮助您解决一些常见的用例。 以下示例说明了这些模式,但仍然省略了生产代码所需的细节(如错误处理和消除),以免混淆主要思想。
Graph 提供 block: { number_gte: $minBlock }
API,确保响应是针对等于或高于 $minBlock
的单个区块。 如果向 graph-node
实例发出请求并且最小区块尚未同步,则 graph-node
将返回错误。 如果 graph-node
已同步最小区块,它将返回最新区块的响应。 如果请求是发给 Edge & Node 网关的,网关将过滤掉任何尚未同步最小区块的索引人,并请求索引人已同步的最新区块。
我们可以使用 number_gte
, 从而确保在循环中轮询数据时,时间不会倒流。 这是一个例子:
/// Updates the protocol.paused variable to the latest/// known value in a loop by fetching it using The Graph.async function updateProtocolPaused() {// It's ok to start with minBlock at 0. The query will be served// using the latest block available. Setting minBlock to 0 is the// same as leaving out that argument.let minBlock = 0for (;;) {// Schedule a promise that will be ready once// the next Ethereum block will likely be available.const nextBlock = new Promise((f) => {setTimeout(f, 14000)})const query = `query GetProtocol($minBlock: Int!) {protocol(block: { number_gte: $minBlock } id: "0") {paused}_meta {block {number}}}`const variables = { minBlock }const response = await graphql(query, variables)minBlock = response._meta.block.number// TODO: Do something with the response data here instead of logging it.console.log(response.protocol.paused)// Sleep to wait for the next blockawait nextBlock}}
另一个用例是检索一个更大的集合,或者更一般地说,跨多个请求检索相关项目。 与轮询案例(所需的一致性是及时向前进行)不同,此用例所需的一致性是针对单个时间点的。
在这里,我们将使用 block: { hash: $blockHash }
参数将我们所有的结果锚定到同一个区块。
/// Gets a list of domain names from a single block using paginationasync function getDomainNames() {// Set a cap on the maximum number of items to pull.let pages = 5const perPage = 1000// The first query will get the first page of results and also get the block// hash so that the remainder of the queries are consistent with the first.const listDomainsQuery = `query ListDomains($perPage: Int!) {domains(first: $perPage) {nameid}_meta {block {hash}}}`let data = await graphql(listDomainsQuery, { perPage })let result = data.domains.map((d) => d.name)let blockHash = data._meta.block.hashlet query// Continue fetching additional pages until either we run into the limit of// 5 pages total (specified above) or we know we have reached the last page// because the page has fewer entities than a full page.while (data.domains.length == perPage && --pages) {let lastID = data.domains[data.domains.length - 1].idquery = `query ListDomains($perPage: Int!, $lastID: ID!, $blockHash: Bytes!) {domains(first: $perPage, where: { id_gt: $lastID }, block: { hash: $blockHash }) {nameid}}`data = await graphql(query, { perPage, lastID, blockHash })// Accumulate domain names into the resultfor (domain of data.domains) {result.push(domain.name)}}return result}
请注意,在重组的情况下,客户端将需要从第一个请求重试,以将区块hash更新为非相邻区块。