File Coverage

blib/lib/Combine/SD_SQL.pm
Criterion Covered Total %
statement 11 189 5.8
branch 0 34 0.0
condition 0 12 0.0
subroutine 4 26 15.3
pod 0 23 0.0
total 15 284 5.2


line stmt bran cond sub pod time code
1             ## $Id: SD_SQL.pm 325 2011-05-26 14:26:00Z it-aar $
2              
3             # 2002-2006 Anders Ardö
4             #
5             # See the file LICENCE included in the distribution.
6              
7             package Combine::SD_SQL;
8              
9 1     1   1038 use Combine::Config;
  1         3  
  1         42  
10 1     1   870 use Combine::selurl;
  1         3  
  1         35  
11 1     1   9 use DBI;
  1         3  
  1         4714  
12              
13             sub new {
14 1     1 0 18 my ($class) = @_;
15 1         8 my $sv = Combine::Config::Get('MySQLhandle');
16 0           my $recyclelinks = Combine::Config::Get('AutoRecycleLinks');
17 0           my $waitIntervalHost = Combine::Config::Get('WaitIntervalHost');
18 0 0 0       if (!defined($waitIntervalHost) || $waitIntervalHost < 0) {
19 0           $waitIntervalHost=60;
20             }
21 0           my $self = {
22             dbcon => $sv,
23             recyclelinks => $recyclelinks,
24             waitIntervalHost => $waitIntervalHost,
25             };
26              
27             # Prepare handles for all SQL statements and save them in %{$self}
28              
29             #Statement handles for lock
30 0           $self->{updateUrlsLock} = $sv->prepare(qq{UPDATE urldb SET urllock=UNIX_TIMESTAMP()+?, retries=LAST_INSERT_ID(retries) WHERE urlid=?;});
31 0           $self->{updateHostLock} = $sv->prepare(qq{UPDATE urldb SET netloclock=UNIX_TIMESTAMP()+? WHERE netlocid=?;});
32 0           $self->{updateHostUrlLock} = $sv->prepare(qq{UPDATE urldb SET urllock=GREATEST(urllock,UNIX_TIMESTAMP())+? WHERE netlocid=?;});
33              
34 0           $self->{updateRetries} = $sv->prepare(qq{UPDATE netlocs SET retries=? WHERE netlocid=?;});
35              
36             #Statement handles for get
37 0           $self->{getStatus} = $self->{dbcon}->prepare(qq{SELECT status,schedulealgorithm FROM admin;});
38 0           $self->{updateAlg} = $self->{dbcon}->prepare(qq{UPDATE admin SET schedulealgorithm=?});
39              
40             # $self->{updateHosts} = $sv->prepare(qq{UPDATE urldb SET netloclock=UNIX_TIMESTAMP()+$waitIntervalHost WHERE netlocid=?;});
41 0           $self->{updateUrls} = $sv->prepare(qq{UPDATE urldb SET urllock=UNIX_TIMESTAMP()+?, harvest=0 WHERE urlid=?;});
42 0           $self->{setQueId} = $sv->prepare(qq{UPDATE admin SET queid=LAST_INSERT_ID(queid+1);});
43             # $self->{getUrl} = $sv->prepare(qq{SELECT netlocid,urlid FROM que WHERE queid=LAST_INSERT_ID();});
44             # ($hostid,$urlid,$url_str, $netlocStr, $urlPath)=$self->{getUrl}->fetchrow_array;
45 0           $self->{getUrl} = $sv->prepare(qq{SELECT que.netlocid,que.urlid,urlstr,netlocstr,path FROM que,urls,netlocs WHERE queid=LAST_INSERT_ID() AND netlocs.netlocid=que.netlocid AND urls.urlid=que.urlid;});
46             # $self->{getUrlStr} = $sv->prepare(qq{SELECT urlstr FROM urls WHERE urlid=?;});
47 0           $self->{getUrlId} = $sv->prepare(qq{SELECT urlid FROM urls where url=?;});
48 0           $self->{getCheckedDate} = $sv->prepare(qq{SELECT UNIX_TIMESTAMP(lastchecked) FROM recordurl WHERE recordurl.urlid=?;});
49 0           $self->{lockTables} = $sv->prepare(qq{LOCK TABLES admin WRITE, que WRITE, urldb READ LOCAL, urls READ LOCAL, netlocs READ LOCAL;});
50 0           $self->{unlockTables} = $sv->prepare(qq{UNLOCK TABLES;});
51 0           $self->{deleteQue} = $sv->prepare(qq{DELETE FROM que;});
52 0           $self->{resetQueId} = $sv->prepare(qq{UPDATE admin SET queid=LAST_INSERT_ID(0);});
53 0           $self->{resetId} = $sv->prepare(qq{ALTER TABLE que AUTO_INCREMENT=1;});
54              
55             #fill que in URL scheduling order
56 0           $self->{fillQue} = $sv->prepare(qq{INSERT INTO que SELECT netlocid,urlid,NULL
57             FROM urldb WHERE netloclock < UNIX_TIMESTAMP() AND
58             urllock < UNIX_TIMESTAMP() AND
59             harvest=1 GROUP BY netlocid;});
60             #SELECT host,hostlock,sum(1) as nbrhost FROM urldb WHERE hostlock < UNIX_TIMESTAMP() AND urllock < UNIX_TIMESTAMP() AND harvest=1 GROUP BY host ORDER BY nbrhost DESC;
61             #Ger en lista sorterad med den host som har flest URLer først
62              
63 0           $self->{fillBigQue} = $sv->prepare(qq{INSERT INTO que SELECT netlocid,urlid,NULL
64             FROM urldb WHERE urllock < UNIX_TIMESTAMP() AND harvest=1 GROUP BY netlocid;});
65              
66             #Statement handles for put
67             # $self->{insertUrls} = $sv->prepare(qq{INSERT IGNORE INTO urldb SET netlocid=?, urlid=?, urllock=UNIX_TIMESTAMP(), netloclock=UNIX_TIMESTAMP();});# OK to fail!
68 0           $self->{insertUrls} = $sv->prepare(qq{INSERT IGNORE INTO urldb SET netlocid=?, urlid=?;});# OK to fail!
69 0           $self->{setHarvest} = $sv->prepare(qq{UPDATE urldb SET harvest=1 WHERE urlid=?;});
70              
71             #print "INIT SD\n";
72 0           bless $self, $class;
73 0           return $self;
74             }
75              
76             sub putNorm {
77 0     0 0   my ($self, $urlstr, $doget) = @_;
78             #Makes a URL normalized and eligeble for harvest, inserted into table urldb if needed.
79 0           my $u = new Combine::selurl($urlstr, undef, 'sloppy' => 1);
80 0 0 0       if ( $u && $u->validate() ) {
81 0           $urlstr = $u->normalise();
82 0           $netlocstr = $u->authority;
83 0           $path_query = $u->path_query;
84              
85 0           my $lsth = $self->{dbcon}->prepare(qq{SELECT netlocid,urlid FROM urls WHERE urlstr=?;});
86 0           $lsth->execute($urlstr);
87 0           my ($netlocid,$urlid) = $lsth->fetchrow_array;
88 0 0         if ( !defined($urlid) ) {
89 0           $self->{dbcon}->prepare(qq{INSERT IGNORE INTO netlocs SET netlocstr=?;})->execute($netlocstr);
90 0           ($netlocid) = $self->{dbcon}->selectrow_array(qq{SELECT netlocid FROM netlocs WHERE netlocstr='$netlocstr';});
91 0           $self->{dbcon}->prepare(qq{INSERT IGNORE INTO urls SET urlstr=?, netlocid=?, path=?;})->execute($urlstr,$netlocid,$path_query);
92 0           $lsth->execute($urlstr);
93 0           ($netlocid,$urlid) = $lsth->fetchrow_array;
94             }
95              
96 0           $self->{insertUrls}->execute($netlocid,$urlid);
97 0           $self->{setHarvest}->execute($urlid);
98 0 0         if ($doget) {return ($netlocid,$urlid,$urlstr, $netlocstr, $path_query, 0); }
  0            
99             }
100 0           return 1; #Evt urlid?
101             }
102              
103             sub get_url {
104 0     0 0   my ($self) = @_;
105             #Extracts the next URL from the queue of ready URLs (table que)
106             #If no URLs in the queue, try to fill queue from table urls
107 0           my $hostid=0;
108 0           my $urlid=0;
109 0           my ($url_str, $netlocStr, $urlPath);
110 0           my $InProgress=60; # Needs to be parametrized??
111              
112 0           $self->{getStatus}->execute;
113 0           my ($status,$schedAlg)=$self->{getStatus}->fetchrow_array;
114             # print "In GetUrl ...";
115             ##Combine getStatus and setQueId in one query??
116 0 0         if ( $status eq 'open' ) {
117 0           $self->{setQueId}->execute;
118 0           $self->{getUrl}->execute;
119 0           ($hostid,$urlid,$url_str, $netlocStr, $urlPath)=$self->{getUrl}->fetchrow_array;
120             # print "Got: ($hostid,$urlid,$url_str, $netlocStr, $urlPath)\n";
121 0 0         if (!defined($hostid)) { ($hostid,$urlid,$url_str, $netlocStr, $urlPath)=generateQue($self,$schedAlg); }
  0            
122              
123 0           $self->{updateHostLock}->execute($self->{waitIntervalHost},$hostid);
124 0           $self->{updateUrls}->execute($InProgress,$urlid);
125             }
126 0 0         if ( !defined($urlid) ) {
127             # print "getUrl returns fail \n";
128 0           return (0,0,'','','');
129             } else {
130             # my $url_str = $self->{getUrlStr}->execute($urlid);
131             # print "getUrl returns OK ($hostid,$urlid,$url_str, $netlocStr, $urlPath)\n";
132 0           $self->{getCheckedDate}->execute($urlid);
133 0           my $checkedDate = $self->{getCheckedDate}->fetchrow_array;
134 0           return ($hostid,$urlid,$url_str, $netlocStr, $urlPath, $checkedDate);
135             }
136             }
137              
138             sub generateQue {
139 0     0 0   my ($self,$alg) = @_;
140             # Fills the queue of ready URLs (table que) from the table urls.
141             # Table que must be cleared first. queid in table admin must be reset.
142             # It should return (hostid,url) if possible
143              
144             # It must be executed in mutual exclusion which is done by first
145             # locking tables, and when the lock is obtained checking that the
146             # queue still is empty. If empty try to fill it, otherwise just
147             # return the first url from the queue.
148              
149 0           my ($hostid,$urlid,$url_str, $netlocStr, $urlPath, $r);
150 0 0         if ($self->{recyclelinks}) { RecycleNew($self); }
  0            
151 0           $self->{lockTables}->execute;
152 0           $self->{setQueId}->execute;
153 0           $self->{getUrl}->execute;
154 0           ($hostid,$urlid,$url_str, $netlocStr, $urlPath)=$self->{getUrl}->fetchrow_array;
155 0 0         if ( !defined($hostid) ) { #still no URLs in que => OK to update it
156 0           $self->{deleteQue}->execute;
157 0           $self->{resetQueId}->execute;
158 0           $self->{resetId}->execute;
159 0           $self->{fillQue}->execute; #ORIG
160             #FIX SQL query dependent on configVar ScheduleAlgorithm!!!!!!!!!!
161              
162             # extract URL from que to return
163 0           $self->{setQueId}->execute;
164 0           $self->{getUrl}->execute;
165 0           ($hostid,$urlid,$url_str, $netlocStr, $urlPath)=$self->{getUrl}->fetchrow_array;
166             }
167 0           $self->{unlockTables}->execute;
168 0           return ($hostid,$urlid,$url_str, $netlocStr, $urlPath);
169             }
170              
171             sub UpdateLastCheckTime {
172             # do this in lock by checking the code (304)??
173 0     0 0   my ($self,$urlid) = @_;
174 0           $self->{dbcon}->do(qq{UPDATE recordurl SET lastchecked=NOW() WHERE urlid='$urlid';});
175             }
176              
177             sub lock {
178 0     0 0   my ($self,$netlocid,$urlid,$time,$code) = @_;
179             # my $sdqRetries = 10;
180             # lock $url for $time seconds
181 0           $self->{updateUrlsLock}->execute($time,$urlid);
182              
183             # Compatibility functions
184             # handle deletions when to many retries (nrt=1000)???
185             # handle $code ...
186 0 0 0       if ( ($code eq '408') || &HTTP::Status::is_server_error($code) ) {
187 0           my $RetryDelay=18000;
188             #$self->{updateRetries}->execute($failcnt+1, $netlocid);
189             # lock $netlocid for $RetryDelay seconds
190 0           $self->{updateHostLock}->execute($RetryDelay, $netlocid);
191              
192             # increase failcnt
193             # if ( $failcnt > $sqdRetries ) { #delete host
194             #?????????
195             # }
196             }
197              
198 0           return;
199             }
200              
201             sub hostlock {
202 0     0 0   my ($self,$netlocid,$time) = @_;
203             # lock $netlocid for $time seconds
204 0           $self->{updateHostLock}->execute($time, $netlocid);
205             #? $self->{updateRetries}->execute($failcnt+1, $host);
206 0           return;
207             }
208              
209             #Recycling functions
210             sub RecycleNew {
211             #adds all valid entries in table newlinks to the harvest-database urldb
212 0     0 0   my ($self) = @_;
213 0           my ($netlocid,$urlid,$urlstr);
214 0           my $sth = $self->{dbcon}->prepare(qq{SELECT newlinks.netlocid,newlinks.urlid,urlstr FROM newlinks,urls WHERE newlinks.urlid=urls.urlid;});
215 0           $self->{dbcon}->prepare(qq{LOCK TABLES newlinks WRITE, urls READ LOCAL, urldb WRITE;})->execute;
216 0           $sth->execute;
217 0           my $ant=0; my $tot=0;
  0            
218 0           while ( ($netlocid,$urlid,$urlstr)=$sth->fetchrow_array ) {
219 0           $tot++;
220 0           my $u = new Combine::selurl($urlstr);
221 0 0 0       if ( $u && $u->validate() ) {
222 0           $self->{insertUrls}->execute($netlocid,$urlid);
223 0           $self->{setHarvest}->execute($urlid);
224 0           $ant++;
225             }
226             }
227 0           $self->{dbcon}->prepare(qq{DELETE FROM newlinks;})->execute;
228 0           $self->{dbcon}->prepare(qq{UNLOCK TABLES;})->execute;
229 0           return "$ant links (out of $tot) recycled\n";
230             }
231              
232             sub RecycleOld {
233             #marks all existing records for harvest
234             #use selurl to remove existing records not passing rules ??
235 0     0 0   my ($self) = @_;
236 0           my $sth = $self->{dbcon}->prepare(qq{UPDATE urldb,recordurl SET harvest=1 WHERE urldb.urlid=recordurl.urlid;})->execute();
237 0           return "$sth old records marked for harvesting\n";
238             }
239             #End; Recycling functions
240              
241             #Init (if needed) MEMORY SQL tables. Called from bin/start.pl
242             sub initMemoryTables {
243 0     0 0   my ($self) = @_;
244 0           $self->{getStatus}->execute;
245 0           my ($status,$tmp)=$self->{getStatus}->fetchrow_array;
246 0 0         if ( $status eq '' ) {
247 0           $self->{dbcon}->do(qq{LOCK TABLES admin WRITE;});
248 0           $self->{getStatus}->execute;
249 0           ($status,$tmp)=$self->{getStatus}->fetchrow_array;
250 0 0         if ( $status eq '' ) {
251             #!#Use value from ConfigVar SchedulingAlgorithm
252 0           $self->{dbcon}->do(qq{INSERT INTO admin VALUES ('open','default',0);});
253 0           warn("Memory table 'admin' initialised to ('open','default',0)");
254             }
255 0           $self->{dbcon}->do(qq{UNLOCK TABLES;});
256             }
257             }
258             #
259              
260             sub hosts {
261 0     0 0   my ($self) = @_;
262 0           my $sth = $self->{dbcon}->prepare(qq{SELECT urldb.netlocid,netlocstr,netloclock-UNIX_TIMESTAMP(),sum(1) as ant FROM urldb,netlocs WHERE harvest=1 AND urllock
263              
264 0           $sth->execute;
265 0           while ( ($netlocid,$netlocstr,$tid,$ant)=$sth->fetchrow_array ) {
266 0 0         if ( $tid<0 ) { $t = 'READY'; } else { $t = "WAITING ($tid s)"; }
  0            
  0            
267 0           $res .= "$netlocstr (ID=$netlocid) $ant urls; $t\n";
268             }
269 0           return $res;
270             }
271              
272             #Remove?
273             sub recordsNo {
274 0     0 0   my ($self) = @_;
275 0           my $sth = $self->{dbcon}->prepare(qq{SELECT count(*) FROM hdb;});
276              
277 0           $sth->execute;
278 0           while ( ($ant)=$sth->fetchrow_array ) {
279 0           $res = "There are $ant records in the database\n";
280             }
281 0           return $res;
282             }
283              
284             sub howmany {
285 0     0 0   my ($self) = @_;
286 0           my ($tot) = $self->{dbcon}->selectrow_array(qq{SELECT count(urlid) FROM urldb WHERE harvest=1;});
287 0           my ($ant) = $self->{dbcon}->selectrow_array(qq{SELECT max(queid) FROM que;});
288 0 0         if (!defined($ant)) { $ant=0; }
  0            
289 0           return "$tot waiting for harvest and $ant in ready queue\n";
290             }
291             sub algorithm {
292 0     0 0   my ($self,$which) = @_;
293 0           return "Not implemented yet.\n";
294             }
295             sub sort {
296 0     0 0   my ($self) = @_;
297 0           return "Not implemented yet.\n";
298             }
299             sub stat {
300 0     0 0   my ($self) = @_;
301 0           my ($stat,$present) = $self->{dbcon}->selectrow_array(qq{SELECT status,queid FROM admin;});
302 0           my ($ant) = $self->{dbcon}->selectrow_array(qq{SELECT max(queid) FROM que;});
303 0 0         if (!defined($ant)) { $ant=0; }
  0            
304 0 0         if ($stat eq 'open') { $compat = "Stat: OPENED\n"; }
  0            
305 0           return "Status: $stat; At $present of $ant in ready queue\n$compat";
306             }
307             sub reSchedule {
308 0     0 0   my ($self) = @_;
309 0           $self->{dbcon}->do(qq{UPDATE admin SET queid=99999999;});
310             }
311             sub open {
312 0     0 0   my ($self) = @_;
313 0           $self->{dbcon}->do(qq{UPDATE admin SET status='open';});
314 0           return &stat($self);
315             }
316             sub stop {
317 0     0 0   my ($self) = @_;
318 0           $self->{dbcon}->do(qq{UPDATE admin SET status='stopped';});
319 0           return &stat($self);
320             }
321             sub pause {
322 0     0 0   my ($self) = @_;
323 0           $self->{dbcon}->do(qq{UPDATE admin SET status='paused' WHERE status='open';});
324 0           return &stat($self);
325             }
326             sub continue {
327 0     0 0   my ($self) = @_;
328 0           $self->{dbcon}->do(qq{UPDATE admin SET status='open' WHERE status='paused';});
329 0           return &stat($self);
330             }
331              
332             sub sd_close {
333 0     0 0   my ($self) = @_;
334 0           return undef;
335             }
336              
337             sub destroy {
338 0     0 0   my ($self) = @_;
339 0           return undef;
340             }
341              
342             1;
343              
344             __END__