From 744be0ed1453ac9017909fdfafb7f4eddd785812 Mon Sep 17 00:00:00 2001
From: "nova @ broiler" <dstndstn@gmail.com>
Date: Thu, 16 Apr 2020 13:41:52 +0000
Subject: [PATCH] misc process_submissions fixes

---
 __init__.py                |   2 +-
 blind/engine-main.c        |   4 ++
 net/find.py                |  64 ++++++++++++++++------
 net/log.py                 |   3 --
 net/process_submissions.py | 106 ++++++++++++++++++++++++-------------
 net/views/image.py         |   3 +-
 6 files changed, 123 insertions(+), 59 deletions(-)

diff --git a/__init__.py b/__init__.py
index 4581cc12a..070cd64f8 100644
--- a/__init__.py
+++ b/__init__.py
@@ -1 +1 @@
-__version__ = '0.78-33-gffadbe61'
+__version__ = '0.79-26-g3e790b81'
diff --git a/blind/engine-main.c b/blind/engine-main.c
index 96ce400ca..49a10623d 100644
--- a/blind/engine-main.c
+++ b/blind/engine-main.c
@@ -72,6 +72,8 @@ static an_option_t myopts[] = {
      "run the index files in parallel"},
     {'D', "data-log file", required_argument, "file",
      "log data to the given filename"},
+    {'j', "job-id", required_argument, "jobid",
+     "IGNORED; purely to allow process to contain the job id!"},
 };
 
 static void print_help(const char* progname, bl* opts) {
@@ -122,6 +124,8 @@ int main(int argc, char** args) {
         if (c == -1)
             break;
         switch (c) {
+	case 'j':
+	    break;
         case 'D':
             datalog = optarg;
             break;
diff --git a/net/find.py b/net/find.py
index c71497d5f..2bd0f10db 100644
--- a/net/find.py
+++ b/net/find.py
@@ -21,13 +21,21 @@ import logging
 logging.basicConfig(format='%(message)s',
                     level=logging.DEBUG)
 
-def bounce_try_dojob(jobid):
-    print('Trying Job ID', jobid)
-    job = Job.objects.filter(id=jobid)[0]
-    print('Found Job', job)
-    return try_dojob(job, job.user_image)
-
-if __name__ == '__main__':
+def bounce_try_dojob(X):  # X is one (jobid, solve_command, solve_locally) tuple, as queued for mp.map()
+    jobid, solve_command, solve_locally = X
+    try:
+        from process_submissions import try_dojob  # imported at call time, inside the try
+        print('Trying Job ID', jobid)
+        job = Job.objects.filter(id=jobid)[0]  # raises if no Job has this id; handled by the except below
+        print('Found Job', job)
+        r = try_dojob(job, job.user_image, solve_command, solve_locally)
+        print('Job result for', job, ':', r)
+        return r
+    except:  # NOTE(review): bare except also traps SystemExit from try_dojob's sys.exit(); returns None
+        import traceback
+        traceback.print_exc()
+
+def main():
     import optparse
     parser = optparse.OptionParser('%(prog)')
     parser.add_option('-s', '--sub', type=int, dest='sub', help='Submission ID')
@@ -57,6 +65,9 @@ if __name__ == '__main__':
     parser.add_option('--delete', action='store_true', default=False,
               help='Delete everything associated with the given image')
 
+    parser.add_option('--delextra', action='store_true', default=False,
+                      help='Delete extraneous duplicate jobs?')
+    
     parser.add_option('--hide', action='store_true', default=False,
                       help='For a UserImage, set publicly_visible=False')
     
@@ -67,7 +78,12 @@ if __name__ == '__main__':
         parser.print_help()
         sys.exit(-1)
 
-    if opt.ssh or opt.empty:
+    if opt.threads is not None:
+        mp = multiproc(opt.threads)
+    else:
+        mp = None
+        
+    if opt.ssh or opt.empty or opt.delextra:
         subs = Submission.objects.all()
         if opt.minsub:
             subs = subs.filter(id__gt=opt.minsub)
@@ -109,28 +125,41 @@ if __name__ == '__main__':
                         allfailed = False
                         break
 
+                if opt.delextra:
+                    print('Delextra:', len(jobs), 'jobs', len(uis), 'uis; failedjob:', failedjob)
+                    if len(jobs) > 1 and failedjob is not None:
+                        print('Delextra: delete', failedjob)
+
             if not allfailed:
                 continue
             print('All jobs failed for sub', sub.id) #, 'via ssh failure')
-            failedsubs.append(sub)
+            #failedsubs.append(sub)
             failedjobs.append(failedjob)
 
-        print('Found total of', len(failedsubs), 'failed Submissions')
+        print('Found total of', len(failedsubs), 'failed Submissions and', len(failedjobs), 'failed Jobs')
         if opt.rerun:
             from process_submissions import try_dosub, try_dojob
+
             if opt.threads is not None:
-                mp = multiproc(opt.threads)
                 args = []
                 for j in failedjobs:
                     if j is None:
                         continue
-                    args.append(j.id) #, j.user_image))
+                    args.append((j.id, opt.solve_command, opt.solve_locally))
                 mp.map(bounce_try_dojob, args)
-
             else:
-                for sub in failedsubs:
-                    print('Re-trying sub', sub.id)
-                    try_dosub(sub, 1)
+                for job in failedjobs:
+                    if job is None:
+                        continue
+                    print('Re-trying job', job.id)
+                    try_dojob(job, job.user_image, opt.solve_command, opt.solve_locally)
+
+            # FIXME -- failed subs??
+            # 
+            # else:
+            #     for sub in failedsubs:
+            #         print('Re-trying sub', sub.id)
+            #         try_dosub(sub, 1)
             
 
     if opt.sub:
@@ -232,3 +261,6 @@ if __name__ == '__main__':
             if im.display_image:
                 im.display_image.delete()
                 
+if __name__ == '__main__':
+    main()
+    
diff --git a/net/log.py b/net/log.py
index bb5a9ff3b..7c75f3cbb 100644
--- a/net/log.py
+++ b/net/log.py
@@ -1,8 +1,5 @@
 import logging
 logger = logging.getLogger(__name__)
-#debug = logger.debug
-#loginfo = logger.info
-#logmsg = logger.info
 
 def _getstr(args):
     try:
diff --git a/net/process_submissions.py b/net/process_submissions.py
index 71c41298d..e85ea7ecd 100644
--- a/net/process_submissions.py
+++ b/net/process_submissions.py
@@ -1,49 +1,26 @@
 #! /usr/bin/env python3
 
-
-
 import os
 import sys
 from subprocess import check_output #nosec
 
 # add .. to PYTHONPATH
 path = os.path.realpath(__file__)
-#print('My path', path)
 basedir = os.path.dirname(os.path.dirname(path))
-
-#print('PYTHONPATH is', os.environ['PYTHONPATH'])
-
-#print('Adding basedir', basedir, 'to PYTHONPATH')
 sys.path.append(basedir)
 
 # add ../blind and ../util to PATH
 os.environ['PATH'] += ':' + os.path.join(basedir, 'blind')
 os.environ['PATH'] += ':' + os.path.join(basedir, 'util')
 
-# print('sys.path is:')
-# for x in sys.path:
-#     print('  ', x)
-#
-# print('PATH is:', os.environ['PATH'])
-
 os.environ['DJANGO_SETTINGS_MODULE'] = 'astrometry.net.settings'
 
 import django
 django.setup()
 
-try:
-    import pyfits
-except ImportError:
-    try:
-        from astropy.io import fits as pyfits
-    except ImportError:
-        raise ImportError("Cannot import either pyfits or astropy.io.fits")
-
-
 import tempfile
 import traceback
 from urllib.parse import urlparse
-import logging
 import urllib.request, urllib.parse, urllib.error
 import shutil
 import multiprocessing
@@ -54,10 +31,6 @@ import gzip
 import zipfile
 import math
 
-import logging
-logging.basicConfig(format='%(message)s',
-                    level=logging.DEBUG)
-
 from astrometry.util import image2pnm
 from astrometry.util.filetype import filetype_short
 from astrometry.util.run_command import run_command
@@ -66,7 +39,6 @@ from astrometry.util.util import Tan
 from astrometry.util import util as anutil
 from astrometry.util.fits import *
 
-#import astrometry.net.settings as settings
 import settings
 settings.LOGGING['loggers'][''] = {
     'handlers': ['console'],
@@ -81,10 +53,10 @@ from django.db import DatabaseError
 from django.db.models import Q
 
 from logging.config import dictConfig
-
 dictConfig(settings.LOGGING)
 
-
+import logging
+logging.basicConfig(format='%(message)s', level=logging.DEBUG)
 
 def is_tarball(fn):
     logmsg('is_tarball: %s' % fn)
@@ -181,18 +153,24 @@ def create_job_logger(job):
     return MyLogger(logger)
 
 def try_dojob(job, userimage, solve_command, solve_locally):
-    print('try_dojob', job)
+    print('try_dojob', job, '(sub', job.user_image.submission.id, ')')
     try:
-        return dojob(job, userimage, solve_command=solve_command,
+        r = dojob(job, userimage, solve_command=solve_command,
                      solve_locally=solve_locally)
+        print('try_dojob', job, 'completed:', r)
+        return r
     except OSError as e:
-        import os.errno
+        print('OSError processing job', job)
+        print(e)
+        import errno
         # Too many open files
-        if e.errno == os.errno.EMFILE:
+        print('e.errno:', e.errno)
+        print('errno.EMFILE:', errno.EMFILE)
+        if e.errno == errno.EMFILE:
+            print('Too many open files -- exiting!')
             sys.exit(-1)
     except IOError as e:
         import errno
-        # Too many open files
         print('Caught IOError')
         print('Errno:', e.errno)
         if e.errno == errno.EMFILE:
@@ -623,6 +601,10 @@ def create_source_list(df):
     img = None
     fits = None
     source_type = None
+
+    path = df.get_path()
+    print('path:', path, type(path))
+    
     try:
         # see if disk file is a fits list
         fits = fits_table(str(df.get_path()))
@@ -648,7 +630,7 @@ def create_source_list(df):
             fits = fits_table(fitsfn)
             source_type = 'text'
         except Exception as e:
-            logmsg('Traceback:\n' + traceback.format_exc())
+            #logmsg('Traceback:\n' + traceback.format_exc())
             logmsg('fitsfn: %s' % fitsfn)
             logmsg(e)
             logmsg('file is not a text list')
@@ -741,7 +723,9 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
         all_user_images = UserImage.objects.annotate(job_count=Count('jobs'))
         newuis = all_user_images.filter(job_count=0)
         if newuis.count():
-            print('Found', len(newuis), 'UserImages without Jobs:', [u.id for u in newuis])
+            #print('Found', len(newuis), 'UserImages without Jobs:', [u.id for u in newuis])
+            #print('Found', len(newuis), 'UserImages without Jobs.')
+            print('Jobs queued:', len(newuis))
 
         runsubs = me.subs.filter(finished=False)
         if subresults != lastsubs:
@@ -777,6 +761,18 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
                 qj.success = res.successful()
                 qj.save()
 
+                # Print the job's status and log tail; a failure here is reported, not fatal.
+                try:
+                    job = Job.objects.get(id=jid)
+                    print('Job:', job)
+                    print('  status:', job.status)
+                    print('  error message:', job.error_message)
+                    print('  log file tail:')
+                    print(job.get_log_tail(nlines=10))
+                except Exception:
+                    print('exception getting job')
+                    traceback.print_exc()
+                
                 if res.successful():
                     print('result:', res.get(),)
             print()
@@ -789,7 +785,6 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
 
         # FIXME -- order by user, etc
 
-
         ## HACK -- order 'newuis' to do the newest ones first... helpful when there
         # is a big backlog.
         newuis = newuis.order_by('-submission__submitted_on')
@@ -813,6 +808,41 @@ def main(dojob_nthreads, dosub_nthreads, refresh_rate, max_sub_retries,
                 try_dosub(sub, max_sub_retries)
 
 
+        if dojob_pool:
+            n_add = dojob_nthreads - len(jobresults)
+            if n_add <= 0:
+                # Already full!
+                continue
+            # Queue some new ones -- pick a waiting user at random, then that user's first image
+            newuis = list(newuis)
+            start_newuis = []
+            import numpy as np
+            from collections import Counter
+            print('Need to start', n_add, 'jobs;', len(newuis), 'eligible uis')
+            while n_add > 0:
+                cusers = Counter([u.user for u in newuis])
+                print('Jobs queued:', len(newuis), 'by', len(cusers), 'users; top:')
+                # most_common() yields (element, count) pairs: the user comes first, then the count
+                for user,k in cusers.most_common(5):
+                    try:
+                        print('  ', k, user, user.get_profile().display_name)
+                    except Exception:
+                        print('  ', k, user)
+                users = list(cusers.keys())
+                print(len(users), 'eligible users')
+                if len(users) == 0:
+                    break
+                user = users[np.random.randint(len(users))]
+                print('Selected user', user)
+                for ui in newuis:
+                    if ui.user == user:
+                        print('Selected ui', ui)
+                        newuis.remove(ui)
+                        start_newuis.append(ui)
+                        n_add -= 1
+                        break
+            newuis = start_newuis
+
         for userimage in newuis:
             job = Job(user_image=userimage)
             job.set_queued_time()
diff --git a/net/views/image.py b/net/views/image.py
index 48341f332..f8dfa930e 100644
--- a/net/views/image.py
+++ b/net/views/image.py
@@ -1062,7 +1062,8 @@ if __name__ == '__main__':
 
     from django.test import Client
     c = Client()
-    r = c.get('/user_images/2676353')
+    #r = c.get('/user_images/2676353')
+    r = c.get('/extraction_image_full/4005556')
     #print(r)
     with open('out.html', 'wb') as f:
         for x in r:
-- 
GitLab