ccRTP
iqueue.h
Go to the documentation of this file.
1 // Copyright (C) 2001,2002,2004 Federico Montesino Pouzols <fedemp@altern.org>.
2 //
3 // This program is free software; you can redistribute it and/or modify
4 // it under the terms of the GNU General Public License as published by
5 // the Free Software Foundation; either version 2 of the License, or
6 // (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 //
17 // As a special exception, you may use this file as part of a free software
18 // library without restriction. Specifically, if other files instantiate
19 // templates or use macros or inline functions from this file, or you compile
20 // this file and link it with other files to produce an executable, this
21 // file does not by itself cause the resulting executable to be covered by
22 // the GNU General Public License. This exception does not however
23 // invalidate any other reasons why the executable file might be covered by
24 // the GNU General Public License.
25 //
26 // This exception applies only to the code released under the name GNU
27 // ccRTP. If you copy code from other releases into a copy of GNU
28 // ccRTP, as the General Public License permits, the exception does
29 // not apply to the code that you add in this way. To avoid misleading
30 // anyone as to the status of such modified files, you must delete
31 // this exception notice from them.
32 //
33 // If you write modifications of your own for GNU ccRTP, it is your choice
34 // whether to permit this exception to apply to your modifications.
35 // If you do not wish that, delete this exception notice.
36 //
37 
44 #ifndef CCXX_RTP_IQUEUE_H_
45 #define CCXX_RTP_IQUEUE_H_
46 
47 #include <ccrtp/queuebase.h>
48 #include <ccrtp/CryptoContext.h>
49 
50 #include <list>
51 
52 #ifdef CCXX_NAMESPACES
53 namespace ost {
54 #endif
55 
70 class __EXPORT Members
71 {
72 public:
73  inline void
74  setMembersCount(uint32 n)
75  { members = n; }
76 
77  inline void
78  increaseMembersCount()
79  { members++; }
80 
81  inline void
82  decreaseMembersCount()
83  { members--; }
84 
85  inline uint32
86  getMembersCount() const
87  { return members; }
88 
89  inline void
90  setSendersCount(uint32 n)
91  { activeSenders = n; }
92 
93  inline void
94  increaseSendersCount()
95  { activeSenders++; }
96 
97  inline void
98  decreaseSendersCount()
99  { activeSenders--; }
100 
101  inline uint32
102  getSendersCount() const
103  { return activeSenders; }
104 
105 protected:
107  members(0),
108  activeSenders(0)
109  { }
110 
111  inline virtual ~Members()
112  { }
113 
114 private:
116  uint32 members;
118  uint32 activeSenders;
119 };
120 
127 class __EXPORT SyncSourceHandler
128 {
129 public:
136  inline void*
137  getLink(const SyncSource& source) const
138  { return source.getLink(); }
139 
140  inline void
141  setLink(SyncSource& source, void* link)
142  { source.setLink(link); }
143 
144  inline void
145  setParticipant(SyncSource& source, Participant& p)
146  { source.setParticipant(p); }
147 
148  inline void
149  setState(SyncSource& source, SyncSource::State ns)
150  { source.setState(ns); }
151 
152  inline void
153  setSender(SyncSource& source, bool active)
154  { source.setSender(active); }
155 
156  inline void
157  setDataTransportPort(SyncSource& source, tpport_t p)
158  { source.setDataTransportPort(p); }
159 
160  inline void
161  setControlTransportPort(SyncSource& source, tpport_t p)
162  { source.setControlTransportPort(p); }
163 
164  inline void
165  setNetworkAddress(SyncSource& source, InetAddress addr)
166  { source.setNetworkAddress(addr); }
167 
168 protected:
170  { }
171 
172  inline virtual ~SyncSourceHandler()
173  { }
174 };
175 
182 class __EXPORT ParticipantHandler
183 {
184 public:
185  inline void
186  setSDESItem(Participant* part, SDESItemType item,
187  const std::string& val)
188  { part->setSDESItem(item,val); }
189 
190  inline void
191  setPRIVPrefix(Participant* part, const std::string val)
192  { part->setPRIVPrefix(val); }
193 
194 protected:
196  { }
197 
198  inline virtual ~ParticipantHandler()
199  { }
200 };
201 
208 class __EXPORT ApplicationHandler
209 {
210 public:
211  inline void
212  addParticipant(RTPApplication& app, Participant& part)
213  { app.addParticipant(part); }
214 
215  inline void
216  removeParticipant(RTPApplication& app,
217  RTPApplication::ParticipantLink* pl)
218  { app.removeParticipant(pl); }
219 
220 protected:
222  { }
223 
224  inline virtual ~ApplicationHandler()
225  { }
226 };
227 
235 class __EXPORT ConflictHandler
236 {
237 public:
239  {
240  ConflictingTransportAddress(InetAddress na,
241  tpport_t dtp, tpport_t ctp);
242 
243  void setNext(ConflictingTransportAddress* nc)
244  { next = nc; }
245 
246  inline const InetAddress& getNetworkAddress( ) const
247  { return networkAddress; }
248 
249  inline tpport_t getDataTransportPort() const
250  { return dataTransportPort; }
251 
252  inline tpport_t getControlTransportPort() const
253  { return controlTransportPort; }
254 
255  InetAddress networkAddress;
259  // arrival time of last data or control packet.
260  timeval lastPacketTime;
261  };
262 
267  ConflictingTransportAddress* searchDataConflict(InetAddress na,
268  tpport_t dtp);
273  ConflictingTransportAddress* searchControlConflict(InetAddress na,
274  tpport_t ctp);
275 
276  void updateConflict(ConflictingTransportAddress& ca)
277  { gettimeofday(&(ca.lastPacketTime),NULL); }
278 
279  void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp);
280 
281 protected:
283  { firstConflict = lastConflict = NULL; }
284 
285  inline virtual ~ConflictHandler()
286  { }
287 
289 };
290 
301 class __EXPORT MembershipBookkeeping :
302  public SyncSourceHandler,
303  public ParticipantHandler,
304  public ApplicationHandler,
305  public ConflictHandler,
306  private Members
307 {
308 public:
309  inline size_t getDefaultMembersHashSize()
310  { return defaultMembersHashSize; }
311 
312 protected:
313 
327  MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize);
328 
333  inline virtual
335  { endMembers(); }
336 
337  struct SyncSourceLink;
338 
339  inline SyncSourceLink* getLink(const SyncSource& source) const
340  { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); }
345  inline bool isMine(const SyncSource& source) const
346  { return getLink(source)->getMembership() == this; }
347 
355  {
357  struct timeval& recv_ts,
358  uint32 shifted_ts,
359  IncomingRTPPktLink* sp,
360  IncomingRTPPktLink* sn,
362  IncomingRTPPktLink* n) :
363  packet(pkt),
364  sourceLink(sLink),
365  prev(p), next(n),
366  srcPrev(sp), srcNext(sn),
367  receptionTime(recv_ts),
368  shiftedTimestamp(shifted_ts)
369  { }
370 
372  { }
373 
374  inline SyncSourceLink* getSourceLink() const
375  { return sourceLink; }
376 
377  inline void setSourceLink(SyncSourceLink* src)
378  { sourceLink = src; }
379 
380  inline IncomingRTPPktLink* getNext() const
381  { return next; }
382 
383  inline void setNext(IncomingRTPPktLink* nl)
384  { next = nl; }
385 
386  inline IncomingRTPPktLink* getPrev() const
387  { return prev; }
388 
389  inline void setPrev(IncomingRTPPktLink* pl)
390  { prev = pl; }
391 
392  inline IncomingRTPPktLink* getSrcNext() const
393  { return srcNext; }
394 
395  inline void setSrcNext(IncomingRTPPktLink* sn)
396  { srcNext = sn; }
397 
398  inline IncomingRTPPktLink* getSrcPrev() const
399  { return srcPrev; }
400 
401  inline void setSrcPrev(IncomingRTPPktLink* sp)
402  { srcPrev = sp; }
403 
404  inline IncomingRTPPkt* getPacket() const
405  { return packet; }
406 
407  inline void setPacket(IncomingRTPPkt* pkt)
408  { packet = pkt; }
409 
417  inline void setRecvTime(const timeval &t)
418  { receptionTime = t; }
419 
423  inline timeval getRecvTime() const
424  { return receptionTime; }
425 
434  inline uint32 getTimestamp() const
435  { return shiftedTimestamp; }
436 
437  inline void setTimestamp(uint32 ts)
438  { shiftedTimestamp = ts;}
439 
440  // the packet this link refers to.
442  // the synchronization source this packet comes from.
444  // global incoming packet queue links.
446  // source specific incoming packet queue links.
448  // time this packet was received at
449  struct timeval receptionTime;
450  // timestamp of the packet in host order and after
451  // substracting the initial timestamp for its source
452  // (it is an increment from the initial timestamp).
454  };
455 
473  {
474  // 2^16
475  static const uint32 SEQNUMMOD;
476 
478  SyncSource* s,
479  IncomingRTPPktLink* fp = NULL,
480  IncomingRTPPktLink* lp = NULL,
481  SyncSourceLink* ps = NULL,
482  SyncSourceLink* ns = NULL,
483  SyncSourceLink* ncollis = NULL) :
484  membership(m), source(s), first(fp), last(lp),
485  prev(ps), next(ns), nextCollis(ncollis),
486  prevConflict(NULL)
487  { m->setLink(*s,this); // record that the source is associated
488  initStats(); // to this link.
489  }
490 
494  ~SyncSourceLink();
495 
496  inline MembershipBookkeeping* getMembership()
497  { return membership; }
498 
503  inline SyncSource* getSource() { return source; }
504 
509  inline IncomingRTPPktLink* getFirst()
510  { return first; }
511 
512  inline void setFirst(IncomingRTPPktLink* fp)
513  { first = fp; }
514 
519  inline IncomingRTPPktLink* getLast()
520  { return last; }
521 
522  inline void setLast(IncomingRTPPktLink* lp)
523  { last = lp; }
524 
528  inline SyncSourceLink* getPrev()
529  { return prev; }
530 
531  inline void setPrev(SyncSourceLink* ps)
532  { prev = ps; }
533 
537  inline SyncSourceLink* getNext()
538  { return next; }
539 
540  inline void setNext(SyncSourceLink *ns)
541  { next = ns; }
542 
549  inline SyncSourceLink* getNextCollis()
550  { return nextCollis; }
551 
552  inline void setNextCollis(SyncSourceLink* ns)
553  { nextCollis = ns; }
554 
555  inline ConflictingTransportAddress* getPrevConflict() const
556  { return prevConflict; }
557 
561  void setPrevConflict(InetAddress& addr, tpport_t dataPort,
562  tpport_t controlPort);
563 
564  unsigned char* getSenderInfo()
565  { return senderInfo; }
566 
567  void setSenderInfo(unsigned char* si);
568 
569  unsigned char* getReceiverInfo()
570  { return receiverInfo; }
571 
572  void setReceiverInfo(unsigned char* ri);
573 
574  inline timeval getLastPacketTime() const
575  { return lastPacketTime; }
576 
577  inline timeval getLastRTCPPacketTime() const
578  { return lastRTCPPacketTime; }
579 
580  inline timeval getLastRTCPSRTime() const
581  { return lastRTCPSRTime; }
582 
587  inline uint32 getObservedPacketCount() const
588  { return obsPacketCount; }
589 
590  inline void incObservedPacketCount()
591  { obsPacketCount++; }
592 
597  inline uint32 getObservedOctetCount() const
598  { return obsOctetCount; }
599 
600  inline void incObservedOctetCount(uint32 n)
601  { obsOctetCount += n; }
602 
606  uint16
607  getMaxSeqNum() const
608  { return maxSeqNum; }
609 
614  void
615  setMaxSeqNum(uint16 max)
616  { maxSeqNum = max; }
617 
618  inline uint32
619  getExtendedMaxSeqNum() const
620  { return extendedMaxSeqNum; }
621 
622  inline void
623  setExtendedMaxSeqNum(uint32 seq)
624  { extendedMaxSeqNum = seq; }
625 
626  inline uint32 getCumulativePacketLost() const
627  { return cumulativePacketLost; }
628 
629  inline void setCumulativePacketLost(uint32 pl)
630  { cumulativePacketLost = pl; }
631 
632  inline uint8 getFractionLost() const
633  { return fractionLost; }
634 
635  inline void setFractionLost(uint8 fl)
636  { fractionLost = fl; }
637 
638  inline uint32 getLastPacketTransitTime()
639  { return lastPacketTransitTime; }
640 
641  inline void setLastPacketTransitTime(uint32 time)
642  { lastPacketTransitTime = time; }
643 
644  inline float getJitter() const
645  { return jitter; }
646 
647  inline void setJitter(float j)
648  { jitter = j; }
649 
650  inline uint32 getInitialDataTimestamp() const
651  { return initialDataTimestamp; }
652 
653  inline void setInitialDataTimestamp(uint32 ts)
654  { initialDataTimestamp = ts; }
655 
656  inline timeval getInitialDataTime() const
657  { return initialDataTime; }
658 
659  inline void setInitialDataTime(timeval it)
660  { initialDataTime = it; }
661 
669  bool getGoodbye()
670  {
671  if(!flag)
672  return false;
673  flag = false;
674  return true;
675  }
676 
683  bool getHello() {
684  if(flag)
685  return false;
686  flag = true;
687  return true;
688  }
689 
690  inline uint32 getBadSeqNum() const
691  { return badSeqNum; }
692 
693  inline void setBadSeqNum(uint32 seq)
694  { badSeqNum = seq; }
695 
696  uint8 getProbation() const
697  { return probation; }
698 
699  inline void setProbation(uint8 p)
700  { probation = p; }
701 
702  inline void decProbation()
703  { --probation; }
704 
705  bool isValid() const
706  { return 0 == probation; }
707 
708  inline uint16 getBaseSeqNum() const
709  { return baseSeqNum; }
710 
711  inline uint32 getSeqNumAccum() const
712  { return seqNumAccum; }
713 
714  inline void incSeqNumAccum()
715  { seqNumAccum += SEQNUMMOD; }
716 
720  inline void initSequence(uint16 seqnum)
721  { maxSeqNum = seqNumAccum = seqnum; }
722 
733  void recordInsertion(const IncomingRTPPktLink& pl);
734 
735  void initStats();
736 
741  void computeStats();
742 
744  // The source this link object refers to.
746  // first/last packets from this source in the queue.
748  // Links for synchronization sources located before
749  // and after this one in the list of sources.
751  // Prev and next inside the hash table collision list.
753  ConflictingTransportAddress* prevConflict;
754  unsigned char* senderInfo;
755  unsigned char* receiverInfo;
756  // time the last RTP packet from this source was
757  // received at.
758  timeval lastPacketTime;
759  // time the last RTCP packet was received.
761  // time the lasrt RTCP SR was received. Required for
762  // DLSR computation.
763  timeval lastRTCPSRTime;
764 
765  // for outgoing RR reports.
766  // number of packets received from this source.
768  // number of octets received from this source.
770  // the higher sequence number seen from this source
771  uint16 maxSeqNum;
775  // for interarrivel jitter computation
777  // interarrival jitter of packets from this source.
778  float jitter;
781 
782  // this flag assures we only call one gotHello and one
783  // gotGoodbye for this src.
784  bool flag;
785 
786  // for source validation:
787  uint32 badSeqNum;
788  uint8 probation; // packets in sequence before valid.
789  uint16 baseSeqNum;
792  uint32 seqNumAccum;
793  };
794 
799  bool
800  isRegistered(uint32 ssrc);
801 
811  getSourceBySSRC(uint32 ssrc, bool& created);
812 
823  bool
824  BYESource(uint32 ssrc);
825 
833  bool
834  removeSource(uint32 ssrc);
835 
836  inline SyncSourceLink* getFirst()
837  { return first; }
838 
839  inline SyncSourceLink* getLast()
840  { return last; }
841 
842  inline uint32
843  getMembersCount()
844  { return Members::getMembersCount(); }
845 
846  inline void
847  setMembersCount(uint32 n)
849 
850  inline uint32
851  getSendersCount()
852  { return Members::getSendersCount(); }
853 
854  static const size_t defaultMembersHashSize;
855  static const uint32 SEQNUMMOD;
856 
857 private:
859 
861  operator=(const MembershipBookkeeping &o);
862 
867  void
868  endMembers();
869 
870  // Hash table with sources of RTP and RTCP packets
871  uint32 sourceBucketsNum;
872  SyncSourceLink** sourceLinks;
873  // List of sources, ordered from older to newer
874  SyncSourceLink* first, * last;
875 };
876 
884  protected MembershipBookkeeping
885 {
886 public:
893  {
894  public:
895  typedef std::forward_iterator_tag iterator_category;
897  typedef ptrdiff_t difference_type;
898  typedef const SyncSource* pointer;
899  typedef const SyncSource& reference;
900 
901  SyncSourcesIterator(SyncSourceLink* l = NULL) :
902  link(l)
903  { }
904 
906  link(si.link)
907  { }
908 
909  reference operator*() const
910  { return *(link->getSource()); }
911 
912  pointer operator->() const
913  { return link->getSource(); }
914 
916  link = link->getNext();
917  return *this;
918  }
919 
921  SyncSourcesIterator result(*this);
922  ++(*this);
923  return result;
924  }
925 
926  friend bool operator==(const SyncSourcesIterator& l,
927  const SyncSourcesIterator& r)
928  { return l.link == r.link; }
929 
930  friend bool operator!=(const SyncSourcesIterator& l,
931  const SyncSourcesIterator& r)
932  { return l.link != r.link; }
933 
934  private:
935  SyncSourceLink *link;
936  };
937 
940 
942  { return SyncSourcesIterator(NULL); }
943 
953  const AppDataUnit*
954  getData(uint32 stamp, const SyncSource* src = NULL);
955 
956 
963  bool
964  isWaiting(const SyncSource* src = NULL) const;
965 
972  uint32
973  getFirstTimestamp(const SyncSource* src = NULL) const;
974 
997  void
998  setMinValidPacketSequence(uint8 packets)
999  { minValidPacketSequence = packets; }
1000 
1001  uint8
1002  getDefaultMinValidPacketSequence() const
1003  { return defaultMinValidPacketSequence; }
1004 
1009  uint8
1010  getMinValidPacketSequence() const
1011  { return minValidPacketSequence; }
1012 
1013  void
1014  setMaxPacketMisorder(uint16 packets)
1015  { maxPacketMisorder = packets; }
1016 
1017  uint16
1018  getDefaultMaxPacketMisorder() const
1019  { return defaultMaxPacketMisorder; }
1020 
1021  uint16
1022  getMaxPacketMisorder() const
1023  { return maxPacketMisorder; }
1024 
1030  void
1031  setMaxPacketDropout(uint16 packets) // default: 3000.
1032  { maxPacketDropout = packets; }
1033 
1034  uint16
1035  getDefaultMaxPacketDropout() const
1036  { return defaultMaxPacketDropout; }
1037 
1038  uint16
1039  getMaxPacketDropout() const
1040  { return maxPacketDropout; }
1041 
1042  // default value for constructors that allow to specify
1043  // members table s\ize
1044  inline static size_t
1045  getDefaultMembersSize()
1046  { return defaultMembersSize; }
1047 
1056  void
1057  setInQueueCryptoContext(CryptoContext* cc);
1058 
1069  void
1070  removeInQueueCryptoContext(CryptoContext* cc);
1071 
1079  CryptoContext*
1080  getInQueueCryptoContext(uint32 ssrc);
1081 
1082 protected:
1086  IncomingDataQueue(uint32 size);
1087 
1089  { }
1090 
1103  bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
1104  bool is_new, InetAddress& na,
1105  tpport_t tp);
1106 
1122  void setSourceExpirationPeriod(uint8 intervals)
1123  { sourceExpirationPeriod = intervals; }
1124 
1131  virtual size_t
1132  takeInDataPacket();
1133 
1134  void renewLocalSSRC();
1135 
1146  getWaiting(uint32 timestamp, const SyncSource *src = NULL);
1147 
1163  bool
1164  recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt,
1165  const timeval recvtime);
1166 
1173  void
1174  recordExtraction(const IncomingRTPPkt& pkt);
1175 
1176  void purgeIncomingQueue();
1177 
1184  inline virtual void
1185  onNewSyncSource(const SyncSource&)
1186  { }
1187 
1188 protected:
1205  inline virtual bool
1206  onRTPPacketRecv(IncomingRTPPkt&)
1207  { return true; }
1208 
1217  inline virtual void onExpireRecv(IncomingRTPPkt&)
1218  { return; }
1219 
1233  inline virtual bool
1234  onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode)
1235  { return false; }
1236 
1237  inline virtual bool
1238  end2EndDelayed(IncomingRTPPktLink&)
1239  { return false; }
1240 
1256  bool
1257  insertRecvPacket(IncomingRTPPktLink* packetLink);
1258 
1270  virtual size_t
1271  recvData(unsigned char* buffer, size_t length,
1272  InetHostAddress& host, tpport_t& port) = 0;
1273 
1274  virtual size_t
1275  getNextDataPacketSize() const = 0;
1276 
1277  mutable ThreadLock recvLock;
1278  // reception queue
1280  // values for packet validation.
1281  static const uint8 defaultMinValidPacketSequence;
1282  static const uint16 defaultMaxPacketMisorder;
1283  static const uint16 defaultMaxPacketDropout;
1287  static const size_t defaultMembersSize;
1289  mutable Mutex cryptoMutex;
1290  std::list<CryptoContext *> cryptoContexts;
1291 };
1292  // iqueue
1294 
1295 #ifdef CCXX_NAMESPACES
1296 }
1297 #endif
1298 
1299 #endif //CCXX_RTP_IQUEUE_H_
1300