Thursday, August 13, 2015

Let's code Mainline DHT (5): Implementing FindNodes

Implementing FindNodes

(1) Add a findNode function to the routing table

First, I add a findNode function to the KRootingTable class. It collects every known peer, sorts the peers by XOR distance from the target KId, and returns the closest ones (at most one bucket's worth).

class KRootingTable {
  ...
  ...
  ...
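  // Returns up to _kBucketSize peers, closest to id first (XOR distance).
  // It returns a Future because the callers later in this post chain
  // .then() on the result.
  Future<List<KPeerInfo>> findNode(KId id) async {
    // Collect every peer from every bucket.
    List<KPeerInfo> ids = [];
    for (KBucket b in _kBuckets) {
      for (KPeerInfo i in b.iterable) {
        ids.add(i);
      }
    }
    // Sort by XOR distance to the target ID.
    ids.sort((KPeerInfo a, KPeerInfo b) {
      return a.id.xor(id).compareTo(b.id.xor(id));
    });
    // Keep only the k closest peers.
    return ids.take(_kBucketSize).toList();
  }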
  ...
  ...
}
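
The XOR ordering used by findNode can be tried in isolation. The following is only a minimal sketch, not the KRootingTable code: it assumes node IDs are plain equal-length byte lists instead of KId objects, and the names compareByXorDistance and closestIds are hypothetical. It is written for a current Dart SDK.

```dart
/// Compares [a] and [b] by their XOR distance to [target].
/// All three are assumed to be byte lists of the same length.
int compareByXorDistance(List<int> a, List<int> b, List<int> target) {
  for (var i = 0; i < target.length; i++) {
    final da = a[i] ^ target[i];
    final db = b[i] ^ target[i];
    // The first differing byte decides, because it is the most significant.
    if (da != db) return da.compareTo(db);
  }
  return 0;
}

/// Returns at most [k] IDs from [ids], closest to [target] first.
List<List<int>> closestIds(List<List<int>> ids, List<int> target, int k) {
  final sorted = List<List<int>>.of(ids)
    ..sort((a, b) => compareByXorDistance(a, b, target));
  return sorted.take(k).toList();
}
```

Sorting the whole table on every lookup is simple but costs O(n log n); that is probably fine for a routing table of a few hundred peers.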

(1) KNode has a UDP server function

KNode uses a UDP socket to communicate with other nodes, and the messages are bencoded. In this step, KNode binds a UDP socket as a server and parses the messages it receives.

class KNode {
  bool _isStart = false;
  bool get isStart => _isStart;
  HetiSocketBuilder _socketBuilder = null;
  HetiUdpSocket _udpSocket = null;

  KNode(HetiSocketBuilder socketBuilder) {
    this._socketBuilder = socketBuilder;
  }
 
  Future start({String ip: "0.0.0.0", int port: 28080}) async {
    if (_isStart) {
      throw "already started";
    }
    _udpSocket = this._socketBuilder.createUdpClient();
    return _udpSocket.bind(ip, port, multicast: true).then((int v) {
      // Decode each received datagram as a KRPC message and dispatch it.
      _udpSocket.onReceive().listen((HetiReceiveUdpInfo info) {
        KrpcMessage.decode(info.data).then((KrpcMessage message) {
          onReceiveMessage(info, message);
        });
      });
      _isStart = true;
    });
  }

  Future stop() async {
    if (_isStart == false || _udpSocket == null) {
      return null;
    }
    return _udpSocket.close().whenComplete(() {
      _isStart = false;
      _ai.stop(this);
    });
  }
}

(2) KNode has a KRPC message parser


Mainline DHT uses the KRPC protocol, whose messages are bencoded. In this step I write the code that parses a KRPC message. Since we have already written a bencode parser, this is easy.


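class KrpcMessage {
  // The decoded bencode dictionary that backs this message.
  Map _messageAsMap = {};

  KrpcMessage.fromMap(Map map) {
    _messageAsMap = map;
  }

  // Decodes a bencoded datagram into a KrpcMessage.
  static Future<KrpcMessage> decode(List<int> data) async {
    Map<String, Object> messageAsMap = null;
    try {
      Object v = Bencode.decode(data);
      messageAsMap = v;
    } catch (e) {
      // Malformed datagram: complete the returned Future with an error.
      throw new FormatException("invalid KRPC message");
    }
    return new KrpcMessage.fromMap(messageAsMap);
  }
}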


class KrpcMessage {
...
...
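  // "t": transaction ID, echoed back in the response.
  List<int> get transactionId => _messageAsMap["t"];
  String get transactionIdAsString => UTF8.decode(transactionId);

  // "y": message type ("q" = query, "r" = response, "e" = error).
  List<int> get messageType => _messageAsMap["y"];
  String get messageTypeAsString => UTF8.decode(messageType);

  // "q": query method name, for example "find_node".
  List<int> get query => _messageAsMap["q"];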
...
...
}
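
To make the parsing step concrete, here is a small self-contained bencode decoder that reads the "y" field of a KRPC message. It is only a sketch: MiniBencodeDecoder and krpcMessageType are hypothetical names, not the Bencode class used in this post, and malformed input simply surfaces as a RangeError or FormatException. It is written for a current Dart SDK, so it uses utf8 rather than the older UTF8 constant.

```dart
import 'dart:convert';

/// Minimal bencode decoder (illustrative only).
/// Byte strings decode to List<int>, integers to int, lists to List,
/// and dictionaries to Map<String, Object> with UTF-8 decoded keys.
class MiniBencodeDecoder {
  final List<int> _data;
  int _pos = 0;
  MiniBencodeDecoder(this._data);

  Object decode() {
    final c = _data[_pos];
    if (c == 0x69) { // 'i' <digits> 'e'
      _pos++;
      return _readInt(0x65);
    }
    if (c == 0x6c) { // 'l' <items> 'e'
      _pos++;
      final list = <Object>[];
      while (_data[_pos] != 0x65) {
        list.add(decode());
      }
      _pos++;
      return list;
    }
    if (c == 0x64) { // 'd' <key value pairs> 'e'
      _pos++;
      final map = <String, Object>{};
      while (_data[_pos] != 0x65) {
        final key = utf8.decode(_readBytes());
        map[key] = decode();
      }
      _pos++;
      return map;
    }
    return _readBytes(); // <length> ':' <bytes>
  }

  // Reads ASCII digits up to [terminator] and consumes the terminator.
  int _readInt(int terminator) {
    final start = _pos;
    while (_data[_pos] != terminator) {
      _pos++;
    }
    final value = int.parse(ascii.decode(_data.sublist(start, _pos)));
    _pos++;
    return value;
  }

  // Reads a length-prefixed byte string.
  List<int> _readBytes() {
    final length = _readInt(0x3a); // ':'
    final bytes = _data.sublist(_pos, _pos + length);
    _pos += length;
    return bytes;
  }
}

/// Returns the "y" (message type) field of a bencoded KRPC message as text.
String krpcMessageType(List<int> datagram) {
  final message = MiniBencodeDecoder(datagram).decode() as Map<String, Object>;
  return utf8.decode(message['y'] as List<int>);
}
```

Values stay as raw bytes because fields such as node IDs are binary and are not valid UTF-8 in general; only fields known to be text are decoded to strings.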


(3) KNode has a function for sending messages


```

class KNode {
 ..
 ..
  sendMessage(KrpcMessage message, String ip, int port) {
      return _udpSocket.send(message.messageAsBencode, ip, port);
  }
 ..
}

class FindNode {

  static int queryID = 0;

  // Builds a find_node query: "a" holds the arguments, "q" the method name,
  // "t" the transaction ID and "y" the message type ("q" = query).
  static KrpcMessage createQuery(List<int> queryingNodesId, List<int> targetNodeId) {
    List<int> transactionId = UTF8.encode("fi${queryID++}");
    return new KrpcMessage.fromMap({
      "a": {"id": queryingNodesId, "target": targetNodeId},
      "q": "find_node",
      "t": transactionId,
      "y": "q"
    });
  }

  // Builds a find_node response: "r" holds the return values ("y" = "r").
  static KrpcMessage createResponse(List<int> compactNodeInfo, List<int> queryingNodesId, List<int> transactionId) {
    return new KrpcMessage.fromMap({
      "r": {"id": queryingNodesId, "nodes": compactNodeInfo},
      "t": transactionId,
      "y": "r"
    });
  }
}
```
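
To show what a find_node query looks like on the wire, the sketch below bencodes the same dictionary shape that createQuery builds. The functions bencode and encodeFindNodeQuery are hypothetical helpers, not part of the library used in this post. The sketch assumes byte strings are passed as List<int> and that dictionary keys are ASCII, so sorting the Dart strings gives the byte order bencode requires. It is written for a current Dart SDK.

```dart
import 'dart:convert';

/// Bencodes [value]: an int, a String (as UTF-8), a List<int> (as a byte
/// string), any other List, or a Map with String keys (sorted by key).
List<int> bencode(Object value) {
  final out = <int>[];

  void writeBytes(List<int> bytes) {
    out..addAll(ascii.encode('${bytes.length}:'))..addAll(bytes);
  }

  void write(Object v) {
    if (v is int) {
      out.addAll(ascii.encode('i${v}e'));
    } else if (v is String) {
      writeBytes(utf8.encode(v));
    } else if (v is List<int>) {
      writeBytes(v); // treated as a byte string, not as a list of integers
    } else if (v is List) {
      out.add(0x6c); // 'l'
      for (final item in v) {
        write(item as Object);
      }
      out.add(0x65); // 'e'
    } else if (v is Map) {
      out.add(0x64); // 'd'
      final keys = v.keys.cast<String>().toList()..sort();
      for (final key in keys) {
        writeBytes(utf8.encode(key));
        write(v[key] as Object);
      }
      out.add(0x65); // 'e'
    } else {
      throw ArgumentError('cannot bencode ${v.runtimeType}');
    }
  }

  write(value);
  return out;
}

/// Builds the bencoded bytes of a find_node query.
List<int> encodeFindNodeQuery(
    List<int> nodeId, List<int> target, String transactionId) {
  return bencode({
    'a': {'id': nodeId, 'target': target},
    'q': 'find_node',
    't': transactionId,
    'y': 'q',
  });
}
```

With the node ID "abcdefghij0123456789", the target "mnopqrstuvwxyz123456" and the transaction ID "aa", this produces the example find_node query given in BEP 5.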

KNode sends messages through the bound UDP socket. Because the same socket is used for sending and receiving, the node that receives a message can read the sender's IP address and port from the datagram and reply to them.

(4) KNode has a function for joining the network


To join the network, KNode sends a find_node query to the k closest peers in its routing table.

class KNodeWorkFindNode {
  ...
  ...
  updateP2PNetworkWithoutClear(KNode node) {
    node.rootingtable.findNode(node.nodeId).then((List<KPeerInfo> infos) {
      for (KPeerInfo info in infos) {
        if (!_findNodesInfo.rawsequential.contains(info)) {
          _findNodesInfo.addLast(info);
          node.sendFindNodeQuery(info.ipAsString, info.port, node.nodeId.value).catchError((_) {});
        }
      }
    });
  }
  ...
  ...
}

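When KNode receives a query, it answers a find_node query with the closest peers it knows. For any other query, it records the sender in the routing table and sends find_node queries again.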

class KNodeWorkFindNode {
  ...
  ...
  onReceiveQuery(KNode node, HetiReceiveUdpInfo info, KrpcMessage query) {
    if (query.queryAsString == KrpcMessage.QUERY_FIND_NODE) {
      KrpcFindNode findNode = query.toFindNode();
      return node.rootingtable.findNode(findNode.targetAsKId).then((List<KPeerInfo> infos) {
        return node.sendFindNodeResponse(info.remoteAddress, info.remotePort, query.transactionId, KPeerInfo.toCompactNodeInfos(infos)).catchError((_) {});
      });
    }
    node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, query.nodeIdAsKId));
    updateP2PNetworkWithoutClear(node);
  }
  ...
  ..
}
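
The response above carries peers as "compact node info". As described in BEP 5, each node is packed into 26 bytes: a 20-byte node ID, a 4-byte IPv4 address and a 2-byte port in network byte order. The sketch below shows that packing for a single node; toCompactNodeInfo is a hypothetical helper and not the KPeerInfo.toCompactNodeInfos method from this post. It is written for a current Dart SDK.

```dart
import 'dart:typed_data';

/// Packs one node as 26 bytes: 20-byte ID, 4-byte IPv4, 2-byte big-endian port.
Uint8List toCompactNodeInfo(List<int> nodeId, String ipv4, int port) {
  if (nodeId.length != 20) {
    throw ArgumentError('node ID must be 20 bytes');
  }
  final ip = ipv4.split('.').map(int.parse).toList();
  if (ip.length != 4) {
    throw ArgumentError('expected a dotted IPv4 address');
  }
  final out = Uint8List(26);
  out.setRange(0, 20, nodeId);
  out.setRange(20, 24, ip);
  out[24] = (port >> 8) & 0xff; // high byte first (network byte order)
  out[25] = port & 0xff;
  return out;
}
```

A response with several nodes is simply the concatenation of these 26-byte records in one byte string.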

After a certain period of time has passed, KNode clears its list of already-queried peers and queries them again.


class KNodeWorkFindNode {
  ...
  ...

  // Called periodically: forget who was queried and start a fresh round.
  onTicket(KNode node) {
    _findNodesInfo.clear();
    updateP2PNetworkWithoutClear(node);
  }
  ...
}


(5) When a find_node response arrives, KNode updates the routing table and queries again


class KNodeWorkFindNode {
  ...
  ...
  onReceiveResponse(KNode node, HetiReceiveUdpInfo info, KrpcMessage response) {
    if (response.queryFromTransactionId == KrpcMessage.QUERY_FIND_NODE) {
      KrpcFindNode findNode = response.toFindNode();
      // Add every peer listed in the response to the routing table.
      // The loop variable is named peer so it does not shadow the info parameter.
      for (KPeerInfo peer in findNode.compactNodeInfoAsKPeerInfo) {
        node.rootingtable.update(peer);
      }
    }
    // Record the responder itself, then continue the lookup.
    node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, response.nodeIdAsKId));
    updateP2PNetworkWithoutClear(node);
  }
  ..
}
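
The "nodes" value of a find_node response is a byte string of 26-byte compact node info records (20-byte node ID, 4-byte IPv4 address, 2-byte big-endian port, as described in BEP 5). The sketch below splits such a byte string back into nodes. CompactNode and parseCompactNodeInfos are hypothetical names and not the compactNodeInfoAsKPeerInfo getter from this post. It is written for a current Dart SDK.

```dart
/// One node taken from a compact node info record (illustrative type).
class CompactNode {
  final List<int> id;
  final String ip;
  final int port;
  CompactNode(this.id, this.ip, this.port);
}

/// Splits the "nodes" byte string of a find_node response into nodes.
List<CompactNode> parseCompactNodeInfos(List<int> nodes) {
  if (nodes.length % 26 != 0) {
    throw FormatException('length must be a multiple of 26');
  }
  final result = <CompactNode>[];
  for (var i = 0; i < nodes.length; i += 26) {
    final id = nodes.sublist(i, i + 20);
    final ip = nodes.sublist(i + 20, i + 24).join('.');
    final port = (nodes[i + 24] << 8) | nodes[i + 25]; // big-endian
    result.add(CompactNode(id, ip, port));
  }
  return result;
}
```

Rejecting a length that is not a multiple of 26 keeps a truncated datagram from being read as a partial node.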


Next

About GetPeers

Ref

http://www.bittorrent.org/beps/bep_0005.html
http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf

PS

- GitBook: Nazenani Torrent (in Japanese)
  https://www.gitbook.com/book/kyorohiro/doc_hetimatorrent/details

- Source
  https://github.com/kyorohiro/dart_hetimatorrent
  https://github.com/kyorohiro/dart_hetimatorrent/tree/master/example/TorrentDHT

-------
Kyorohiro work


