/* * lftp and utils * * Copyright (c) 2009-2010 by Alexander V. Lukyanov (lav@yars.free.net) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ /* $Id: Torrent.h,v 1.21 2010/06/09 12:42:26 lav Exp $ */ #ifndef TORRENT_H #define TORRENT_H #include "FileAccess.h" #include "Bencode.h" #include "Error.h" #include "ProtoLog.h" #include "network.h" #include "RateLimit.h" class BitField : public xarray { int bit_length; public: BitField() { bit_length=0; } BitField(int bits); bool valid_index(int i) const { return i>=0 && i downloader; // which peers download the blocks TorrentPiece(unsigned b) : sources_count(0), block_map(b) { downloader.allocate(b,0); } bool has_a_downloader() const; }; class TorrentListener; class FDCache; class TorrentBlackList; class Torrent : public SMTask, protected ProtoLog, public ResClient { friend class TorrentPeer; bool started; bool shutting_down; bool complete; bool end_game; bool validating; bool force_valid; unsigned validate_index; Ref invalid_cause; static const unsigned PEER_ID_LEN = 20; static xstring my_peer_id; static xstring my_key; static Ref listener; static Ref fd_cache; static Ref black_list; xstring_c metainfo_url; FileAccessRef metainfo_fa; SMTaskRef metainfo_data; Ref metainfo_tree; BeNode *info; xstring info_hash; const xstring *pieces; const xstring *name; Ref recv_translate; void InitTranslation(); void TranslateString(BeNode *node) const; xstring tracker_url; FileAccessRef t_session; Timer tracker_timer; SMTaskRef tracker_reply; xstring tracker_id; bool single_file; unsigned piece_length; unsigned last_piece_length; unsigned long long total_length; unsigned total_pieces; Ref my_bitfield; unsigned long long total_left; unsigned complete_pieces; static const unsigned BLOCK_SIZE = 0x4000; unsigned long long total_recv; unsigned long long total_sent; void SetError(Error *); void SetError(const char *); BeNode *Lookup(xmap_p& d,const char *name,BeNode::be_type_t type); BeNode *Lookup(BeNode *d,const char *name,BeNode::be_type_t type) { return Lookup(d->dict,name,type); } BeNode *Lookup(Ref& d,const char *name,BeNode::be_type_t type) { return Lookup(d->dict,name,type); } void SendTrackerRequest(const char *event); TaskRefArray peers; RefArray piece_info; static int PeersCompareActivity(const SMTaskRef *p1,const SMTaskRef *p2); static int PeersCompareRecvRate(const SMTaskRef *p1,const SMTaskRef *p2); static int PeersCompareSendRate(const SMTaskRef *p1,const SMTaskRef *p2); Timer pieces_needed_rebuild_timer; xarray pieces_needed; static int PiecesNeededCmp(const unsigned *a,const unsigned *b); unsigned last_piece; void SetPieceNotWanted(unsigned piece); void SetDownloader(unsigned piece,unsigned block,const TorrentPeer *o,const TorrentPeer *n); xstring_c cwd; xstring_c output_dir; const char *FindFileByPosition(unsigned piece,unsigned begin,off_t *f_pos,off_t *f_tail) const; const char *MakePath(BeNode *p) const; int OpenFile(const char *f,int m); void StoreBlock(unsigned piece,unsigned begin,unsigned len,const char *buf,TorrentPeer *src_peer); const xstring& RetrieveBlock(unsigned piece,unsigned begin,unsigned len); Speedometer recv_rate; Speedometer send_rate; RateLimit rate_limit; bool RateLow(RateLimit::dir_t dir) { return rate_limit.Relaxed(dir); } int active_peers_count; int complete_peers_count; int am_interested_peers_count; int am_not_choking_peers_count; int max_peers; int seed_min_peers; bool SeededEnough() const; float stop_on_ratio; Timer seed_timer; Timer decline_timer; Timer optimistic_unchoke_timer; Timer peers_scan_timer; Timer am_interested_timer; static const int max_uploaders = 20; static const int min_uploaders = 1; static const int max_downloaders = 20; static const int min_downloaders = 4; bool NeedMoreUploaders(); bool AllowMoreDownloaders(); void UnchokeBestUploaders(); void ScanPeers(); void OptimisticUnchoke(); void ReducePeers(); void ReduceUploaders(); void ReduceDownloaders(); int PeerBytesAllowed(const TorrentPeer *peer,RateLimit::dir_t dir); void PeerBytesUsed(int b,RateLimit::dir_t dir); void PeerBytesGot(int b) { PeerBytesUsed(b,RateLimit::GET); } static void BlackListPeer(const TorrentPeer *peer,const char *timeout); public: Torrent(const char *mf,const char *cwd,const char *output_dir); int Do(); int Done(); const char *Status(); const Error *GetInvalidCause() const { return invalid_cause; } void Shutdown(); bool ShuttingDown() { return shutting_down; } void PrepareToDie(); static const Ref& GetListener() { return listener; } void Accept(int s,const sockaddr_u *a,IOBuffer *rb); static void SHA1(const xstring& str,xstring& buf); void ValidatePiece(unsigned p); unsigned PieceLength(unsigned p) const { return p==total_pieces-1 ? last_piece_length : piece_length; } unsigned BlocksInPiece(unsigned p) const { return (PieceLength(p)+BLOCK_SIZE-1)/BLOCK_SIZE; } const TaskRefArray& GetPeers() const { return peers; } void AddPeer(TorrentPeer *); const xstring& GetInfoHash() const { return info_hash; } int GetPeersCount() const { return peers.count(); } int GetActivePeersCount() const { return active_peers_count; } int GetCompletePeersCount() const { return complete_peers_count; } bool Complete() const { return complete; } double GetRatio() const; unsigned long long TotalLength() const { return total_length; } unsigned PieceLength() const { return piece_length; } const char *GetName() const { return name?name->get():0; } const char *TrackerTimerTimeLeft() { return tracker_timer.TimeLeft().toString( TimeInterval::TO_STR_TRANSLATE|TimeInterval::TO_STR_TERSE); } void Reconfig(const char *name); const char *GetLogContext() { return GetName(); } void ForceValid() { force_valid=true; } }; class FDCache : public SMTask, public ResClient { struct FD { int fd; int saved_errno; time_t last_used; }; int max_count; int max_time; xmap cache[3]; Timer clean_timer; public: int OpenFile(const char *name,int mode); void Close(const char *name); int Count() const; void Clean(); bool CloseOne(); void CloseAll(); FDCache(); ~FDCache(); int Do(); }; class TorrentPeer : public SMTask, protected ProtoLog, public Networker { friend class Torrent; Ref error; Torrent *parent; sockaddr_u addr; int sock; bool connected; bool passive; Timer timeout_timer; Timer retry_timer; Timer keepalive_timer; Timer choke_timer; Timer interest_timer; Timer activity_timer; Ref recv_buf; Ref send_buf; unsigned long long peer_recv; unsigned long long peer_sent; Speedometer peer_recv_rate; Speedometer peer_send_rate; xstring peer_id; bool myself; bool am_choking; bool am_interested; bool peer_choking; bool peer_interested; Ref peer_bitfield; unsigned peer_complete_pieces; enum packet_type { MSG_KEEPALIVE=-1, MSG_CHOKE=0, MSG_UNCHOKE=1, MSG_INTERESTED=2, MSG_UNINTERESTED=3, MSG_HAVE=4, MSG_BITFIELD=5, MSG_REQUEST=6, MSG_PIECE=7, MSG_CANCEL=8, MSG_PORT=9 }; public: enum unpack_status_t { UNPACK_SUCCESS=0, UNPACK_WRONG_FORMAT=-1, UNPACK_PREMATURE_EOF=-2, UNPACK_NO_DATA_YET=1 }; class Packet { static bool is_valid_reply(int p) { return p>=0 && p<=MSG_PORT; } protected: int length; int unpacked; packet_type type; public: Packet(packet_type t); Packet() { length=0; } virtual void ComputeLength() { length=(type>=0); } virtual void Pack(Ref& b); virtual unpack_status_t Unpack(const Buffer *b); virtual ~Packet() {} int GetLength() const { return length; } packet_type GetPacketType() const { return type; } const char *GetPacketTypeText() const; void DropData(Ref& b) { b->Skip(4+length); } bool TypeIs(packet_type t) const { return type==t; } }; class PacketHave : public Packet { public: unsigned piece; PacketHave(unsigned p=0) : Packet(MSG_HAVE), piece(p) { length+=4; } unpack_status_t Unpack(const Buffer *b) { unpack_status_t res; res=Packet::Unpack(b); if(res!=UNPACK_SUCCESS) return res; piece=b->UnpackUINT32BE(unpacked); unpacked+=4; return UNPACK_SUCCESS; } void ComputeLength() { Packet::ComputeLength(); length+=4; } void Pack(Ref& b) { Packet::Pack(b); b->PackUINT32BE(piece); } }; class PacketBitField : public Packet { public: Ref bitfield; PacketBitField() : Packet(MSG_BITFIELD) {} PacketBitField(const BitField *bf); ~PacketBitField(); unpack_status_t Unpack(const Buffer *b); void ComputeLength(); void Pack(Ref& b); }; class PacketRequest : public Packet { public: Timer expire; unsigned index,begin,req_length; PacketRequest(unsigned i=0,unsigned b=0,unsigned l=0); unpack_status_t Unpack(const Buffer *b); void ComputeLength(); void Pack(Ref& b); }; class PacketCancel : public PacketRequest { public: PacketCancel(unsigned i=0,unsigned b=0,unsigned l=0) : PacketRequest(i,b,l) { type=MSG_CANCEL; } }; class PacketPiece : public Packet { public: unsigned index,begin; xstring data; PacketPiece() : Packet(MSG_PIECE), index(0), begin(0) {} PacketPiece(unsigned i,unsigned b,const xstring &s) : Packet(MSG_PIECE), index(i), begin(b) { data.set(s); length+=8+data.length(); } unpack_status_t Unpack(const Buffer *b) { unpack_status_t res; res=Packet::Unpack(b); if(res!=UNPACK_SUCCESS) return res; index=b->UnpackUINT32BE(unpacked);unpacked+=4; begin=b->UnpackUINT32BE(unpacked);unpacked+=4; unsigned bytes=length+4-unpacked; data.nset(b->Get()+unpacked,bytes); unpacked+=bytes; return UNPACK_SUCCESS; } void ComputeLength() { Packet::ComputeLength(); length+=8+data.length(); } void Pack(Ref& b) { Packet::Pack(b); b->PackUINT32BE(index); b->PackUINT32BE(begin); b->Put(data); } }; class PacketPort : public Packet { public: unsigned port; PacketPort(unsigned p=0) : Packet(MSG_PORT), port(p) { length+=2; } unpack_status_t Unpack(const Buffer *b) { unpack_status_t res; res=Packet::Unpack(b); if(res!=UNPACK_SUCCESS) return res; port=b->UnpackUINT16BE(unpacked); unpacked+=2; return UNPACK_SUCCESS; } void ComputeLength() { Packet::ComputeLength(); length+=2; } void Pack(Ref& b) { Packet::Pack(b); b->PackUINT16BE(port); } }; private: unpack_status_t UnpackPacket(Ref& ,Packet **); void HandlePacket(Packet *); static const int MAX_QUEUE_LEN = 16; RefQueue recv_queue; RefQueue sent_queue; unsigned last_piece; static const unsigned NO_PIECE = ~0U; void SetLastPiece(unsigned p); unsigned GetLastPiece() const; bool HasNeededPieces(); void SetPieceHaving(unsigned p,bool have); void SetAmInterested(bool); void SetAmChoking(bool); void ClearSentQueue(int i); void ClearSentQueue() { ClearSentQueue(sent_queue.count()-1); } int FindRequest(unsigned piece,unsigned begin) const; void SetError(const char *); void SendHandshake(); unpack_status_t RecvHandshake(); void Disconnect(); int SendDataRequests(unsigned p); void SendDataRequests(); void Have(unsigned p); void SendDataReply(); void CancelBlock(unsigned p,unsigned b); void MarkPieceInvalid(unsigned p); unsigned invalid_piece_count; int peer_bytes_pool[2]; int BytesAllowed(RateLimit::dir_t dir); bool BytesAllowed(RateLimit::dir_t dir,unsigned bytes); bool BytesAllowedToGet(unsigned b) { return BytesAllowed(RateLimit::GET,b); } bool BytesAllowedToPut(unsigned b) { return BytesAllowed(RateLimit::PUT,b); } void BytesUsed(int bytes,RateLimit::dir_t dir); void BytesGot(int b) { BytesUsed(b,RateLimit::GET); } void BytesPut(int b) { BytesUsed(b,RateLimit::PUT); } public: int Do(); TorrentPeer(Torrent *p,const sockaddr_u *a); ~TorrentPeer(); void PrepareToDie(); void Connect(int s,IOBuffer *rb); bool Failed() const { return error!=0; } const char *ErrorText() const { return error->Text(); } const char *GetName() const; const char *GetLogContext() { return GetName(); } bool ActivityTimedOut() const { return activity_timer.Stopped(); } bool NotConnected() const { return sock==-1; } bool Disconnected() const { return passive && NotConnected(); } bool Connected() const { return peer_id && send_buf && recv_buf; } bool Active() const { return Connected() && (am_interested || peer_interested); } bool Complete() const { return peer_complete_pieces==parent->total_pieces; } bool AddressEq(const TorrentPeer *o) const; bool IsPassive() const { return passive; } const sockaddr_u& GetAddress() const { return addr; } const char *Status(); }; class TorrentBlackList { xmap bl; void check_expire(); public: bool Listed(const sockaddr_u &a); void Add(const sockaddr_u &a,const char *t="1h"); }; class TorrentDispatcher : public SMTask, protected ProtoLog { int sock; const sockaddr_u addr; Ref recv_buf; Timer timeout_timer; public: TorrentDispatcher(int s,const sockaddr_u *a); ~TorrentDispatcher(); int Do(); }; class TorrentListener : public SMTask, protected ProtoLog, protected Networker { Ref error; int sock; sockaddr_u addr; xmap torrents; Speedometer rate; public: TorrentListener(); ~TorrentListener(); int Do(); Torrent *FindTorrent(const xstring& info_hash) const { return torrents.lookup(info_hash); } void AddTorrent(Torrent *); void RemoveTorrent(Torrent *); int GetPort() const { return addr.port(); } int GetTorrentsCount() const { return torrents.count(); } void Dispatch(const xstring& info_hash,int s,const sockaddr_u *remote_addr,IOBuffer *recv_buf); Torrent *Lookup(const xstring& info_hash) { return torrents.lookup(info_hash); } }; #include "Job.h" class TorrentJob : public Job { SMTaskRef torrent; bool completed; bool done; public: TorrentJob(Torrent *); ~TorrentJob(); int Do(); int Done() { return done; } void PrintStatus(int v,const char *tab); void ShowRunStatus(const SMTaskRef& s); int AcceptSig(int); void PrepareToDie(); }; #endif//TORRENT_H