Skip to content

Instantly share code, notes, and snippets.

@furyutei
Last active April 10, 2024 02:10
Show Gist options
  • Save furyutei/9c68ce0d1f2e2308c418d75d17f2d0da to your computer and use it in GitHub Desktop.
Save furyutei/9c68ce0d1f2e2308c418d75d17f2d0da to your computer and use it in GitHub Desktop.
[Python][Windows] Outlookのフォルダの更新を行なう実装例

[Python][Windows] Outlookのフォルダの更新を行なう実装例

PythonからOutlookに「すべてのフォルダーの送受信」(っぽいもの)を指示し、完了するまで待つような実装例です。
※元ネタ:かづき@kadukijobさんのツイート(ポスト)より

使い方

その1

を呼び出し側のソースコードと同じフォルダにおいて、

import OutlookSyncObject
result = OutlookSyncObject.sync_all()

のようにして使う。
【注意】呼び出し側でpywin32関連機能(pythoncom、win32com.client、win32api等)を使用する場合、OutlookSyncObjectよりも「後に」importすること。

その2

Pythonからみてパスが通ったフォルダ、もしくは呼び出し側のソースコードと同じフォルダに「olsync」というフォルダを作って、その中に

の2ファイルを配置しておき、

import olsync
result = olsync.sync_all()

のようにして使う。
【注意】呼び出し側でpywin32関連機能(pythoncom、win32com.client、win32api等)を使用する場合、olsyncよりも「後に」importすること。

その3

を呼び出し側のソースコードと同じフォルダにおいて、

import OutlookSimpleSyncAll
result = OutlookSimpleSyncAll.simple_sync_all()

のようにして使う。
※OutlookSimpleSyncAll.pyの中身はpywin32 - Sync all folders outlook win32 Python - Stack Overflowと同様のものを関数(simple_sync_all())化し、若干の手直し(handleのclose処理追加、print→loggingに置換)を行ったもの。
 主にその1やその2でうまく動かない場合の動作確認用。

免責事項・注意事項

  • お約束ですが、無保証です。自己責任においてご利用ください。
  • 現状では、Outlookのファイル>オプション>詳細設定>送受信
    [送受信(D)...]
    に登録されている全送受信グループのコレクションを問答無用で同期しますので、実用の際にはグループ指定ができるようにするなど適宜ソースコードを改修してください。
    →sync_all()の引数にtarget_syncobject_name_listオプションを追加したので同期対象となるグループの指定も一応可能となりました。
  • 自分の環境(Windows 10 Pro+MS365(32bit)のOutlook、Python 3.11)でちょっとしか試していないので、うまく動かなくてもご容赦を……(不具合報告や、こうすればうまくいくなどのご教示は歓迎)。

参考

ライセンス

The MIT License
Copyright (c) 風柳(furyu)@furyutei

# -*- coding: utf-8 -*-
""" """
__author__ = 'furyu'
__version__ = '0.1.0'
__copyright__ = 'Copyright (c) 風柳(furyu)@furyutei'
__license__ = 'The MIT License'
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from OutlookSyncObject import sync_all, trigger_send_and_receive_all, get_sync_thread, KeyboadInterruptOperation, SyncResult
__all__ = [
'sync_all',
'trigger_send_and_receive_all',
'get_sync_thread',
'KeyboadInterruptOperation',
'SyncResult'
]
# -*- coding: utf-8 -*-
# [pywin32 - Sync all folders outlook win32 Python - Stack Overflow](https://stackoverflow.com/questions/69786402/sync-all-folders-outlook-win32-python/69789358#69789358)
# のコードを関数化(simple_sync_all)し、handlerを閉じる後処理を追加したもの
import pythoncom
import win32com.client as wc
import win32api
import win32con
import logging
def simple_sync_all():
logger = logging.getLogger('simple_sync_all')
nSync = 0
#Class to handle the SyncObject events from Outlook
class SyncHandler(object):
#Save the dispatch interface to identify the SyncObject if needed
def set(self,disp):
self._disp = disp
def _process(self):
#Decrement sync counter
nonlocal nSync
nSync -= 1
logger.debug(f'{nSync} Sync remaining to complete')
#If nothing left to sync, then send WM_QUIT to thread message loop
if nSync <= 0:
logger.debug('Closing message loop')
win32api.PostThreadMessage(win32api.GetCurrentThreadId(), win32con.WM_QUIT, 0, 0)
def OnSyncStart(self):
logger.debug(f'Starting sync on: {self._disp.Name}')
def OnSyncEnd(self):
logger.debug(f'Sync complete on: {self._disp.Name}')
self._process()
def OnProgress(self,state,description,value,max):
logger.debug(f'Sync progress: {self._disp.Name} {description} {100 * value/max}%')
def OnError(self,code,description):
logger.debug(f'Sync Error: {description}')
self._process()
#Get the application Dispatch interface
ol = wc.Dispatch('Outlook.Application')
syncObjects = ol.GetNamespace('MAPI').SyncObjects
#Find out how many SyncObjects we have
nSync = syncObjects.Count
logger.debug(f'Number of Sync objects: {nSync}')
handler_list = []
for syncObj in syncObjects:
#Set up an event handler and specify the event handler class
handler = wc.WithEvents(syncObj,SyncHandler)
handler.set(syncObj)
syncObj.Start()
handler_list.append(handler)
#This will block until a WM_QUIT message is sent to the message queue
pythoncom.PumpMessages()
for handler in handler_list:
handler.close()
logger.debug('Sync completed')
if __name__ == '__main__':
# [Test]
log_level = logging.DEBUG # logging.(NOTSET|DEBUG|INFO|WARNING|ERROR|CRITICAL)
logging.basicConfig(level=log_level, format='{asctime} [{levelname}] {name}: {message}', style='{')
simple_sync_all()
# -*- coding: utf-8 -*-
"""
References
----------
- [pywin32 - Sync all folders outlook win32 Python - Stack Overflow](https://stackoverflow.com/a/69789358)
"""
import sys
sys.coinit_flags = 0 # 0: Multi-Threaded Apartment model (MTA), 2: Single-Threaded Apartment model (STA)
# [備忘] pythoncomの機能をマルチスレッドで利用しようとする場合、pythoncomをimportする前にsys.coinit_flags = 0を設定しておく必要あり
import pythoncom
import win32com.client
import win32api
import win32con
import threading
import signal
import logging
from enum import Enum
from types import SimpleNamespace
KeyboadInterruptOperation = Enum('KeyboadInterruptOperation', [
'NotSet',
'Ignore',
'StopSync',
], start=0)
class SyncResult(SimpleNamespace):
def __init__(self,
sync_object_name = '(unknown)'
):
self.sync_object_name = sync_object_name
self.is_complete = False
self.is_timeout = False
self.is_interrupt = False
self.error_code = 0
self.error_description = ''
self.interrupt = None
def __dir__(self):
return ['sync_object_name', 'is_complete', 'is_timeout', 'is_interrupt', 'error_code', 'error_description', 'interrupt',]
def get_sync_thread(
sync_object,
timeout = None,
start_individually = True
):
class SyncHandler(object):
@property
def sync_result(self):
return self.__sync_result
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__logger = logging.getLogger(f'SyncHandler')
self.__stop_requested = False
self.__sync_result = SyncResult()
def init(self, parameters):
self.__parameters = parameters
self.__sync_result.sync_object_name = sync_object_name = parameters.sync_object.Name
self.__parent_thread_id = parent_thread_id = parameters.parent_thread_id if hasattr(parameters, 'parent_thread_id') else win32api.GetCurrentThreadId()
self.__logger = logging.getLogger(f'SyncHandler({sync_object_name})[{parent_thread_id}]')
def OnSyncStart(self):
self.__debug_print('OnSyncStart')
def OnProgress(self, _State_, _Description_, _Value_, _Max_):
self.__debug_print(f'OnProgress: {_State_}.{_Description_} ({100 * _Value_ / _Max_}%)')
def OnSyncEnd(self):
self.__debug_print(f'OnSyncEnd')
if self.__sync_result.error_code == 0:
self.__sync_result.is_complete = True
self.__stop_request()
def OnError(self, _Code_, _Description_):
self.__debug_print(f'OnError: {_Code_}.{_Description_}')
self.__sync_result.error_code = _Code_
self.__sync_result.error_description = _Description_
self.__stop_request()
def __stop_request(self):
if not self.__stop_requested:
self.__post_quit()
self.__stop_requested = True
def __post_quit(self):
#parent_thread_id = win32api.GetCurrentThreadId() # [備忘] この時点で取得したスレッドIDは期待したものにならないことに注意
parent_thread_id = self.__parent_thread_id
win32api.PostThreadMessage(parent_thread_id, win32con.WM_QUIT, 0, 0) # WM_QUIT送信によりSyncThread中のPumpMessages()を中断
def __debug_print(self, *args, **kwargs):
self.__logger.debug(*args, **kwargs)
class SyncThread(threading.Thread):
@property
def sync_result(self):
return self.__sync_result
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__sync_object = sync_object = self._kwargs['sync_object']
self.__logger = logging.getLogger(f'SyncThread({sync_object.Name})')
self.__timeout = self._kwargs.get('timeout')
self.__start_individually = self._kwargs.get('start_individually', True)
self.__is_timeout = False
self.__timer = None
self.__is_interrupt = False
self.__stop_requested = False
self.__sync_result = SyncResult(sync_object_name=sync_object.Name)
def run(self):
sync_object = self.__sync_object
self.__logger = logging.getLogger(f'SyncThread({sync_object.Name})[{self.ident}]')
timeout = self.__timeout
if timeout is not None:
def on_timeout():
self.__debug_print(f'Timeout')
self.__is_timeout = True
self.__timer = None
self.__stop_request()
self.__timer = timer = threading.Timer(timeout, on_timeout)
timer.start()
pythoncom.CoInitializeEx(pythoncom.COINIT_MULTITHREADED)
sync_handler = win32com.client.WithEvents(sync_object, SyncHandler)
sync_handler.init(SimpleNamespace(
sync_object = sync_object,
parent_thread_id = self.ident,
))
if self.__start_individually:
sync_object.Start()
pythoncom.PumpMessages() # WM_QUITを受信するまでブロック
sync_handler.close()
sync_result = sync_handler.sync_result
if self.__is_timeout:
sync_result.is_timeout = True
else:
if self.__timer is not None:
self.__timer.cancel()
self.__timer = None
sync_result.is_interrupt = self.__is_interrupt
self.__sync_result = sync_result
pythoncom.CoUninitialize()
def stop_request(self):
self.__debug_print('stop_request() called')
self.__is_interrupt = True
self.__stop_request()
def __stop_request(self):
if not self.__stop_requested:
self.__sync_object.Stop()
self.__stop_requested = True
def __debug_print(self, *args, **kwargs):
self.__logger.debug(*args, **kwargs)
return SyncThread(kwargs=dict(sync_object=sync_object, timeout=timeout, start_individually=start_individually))
def trigger_send_and_receive_all(
outlook_application = None
):
logger = logging.getLogger(f'trigger_send_and_receive_all()')
if outlook_application is None:
outlook_application = win32com.client.Dispatch('Outlook.Application')
namespace = outlook_application.GetNamespace('MAPI')
namespace.SendAndReceive(showProgressDialog = False)
logger.debug('done')
def sync_all(
timeout = 600,
outlook_application = None,
keyboard_interrupt_operation = None,
target_syncobject_name_list = None,
use_trigger_send_and_receive_all = False
):
logger = logging.getLogger(f'sync_all()')
if outlook_application is None:
outlook_application = win32com.client.Dispatch('Outlook.Application')
if keyboard_interrupt_operation is None:
keyboard_interrupt_operation = KeyboadInterruptOperation.NotSet
try:
target_syncobject_name_list = list(target_syncobject_name_list)
except Exception as error:
target_syncobject_name_list = []
target_specified = (0 < len(target_syncobject_name_list))
if target_specified:
use_trigger_send_and_receive_all = False
interrupt = None
if keyboard_interrupt_operation != KeyboadInterruptOperation.NotSet:
def signal_handler(signum, frame):
nonlocal interrupt
interrupt = KeyboardInterrupt()
signal.signal(signal.SIGINT, signal_handler)
namespace = outlook_application.GetNamespace('MAPI')
sync_info_list = []
for sync_object in namespace.SyncObjects:
if target_specified and (not sync_object.Name in target_syncobject_name_list):
continue
sync_thread = get_sync_thread(sync_object, timeout=timeout, start_individually=(not use_trigger_send_and_receive_all))
sync_info_list.append(SimpleNamespace(
sync_object = sync_object,
sync_thread = sync_thread,
))
sync_thread.start()
if len(sync_info_list) < 1:
return []
if use_trigger_send_and_receive_all:
trigger_send_and_receive_all(outlook_application)
sync_result_list = []
for sync_info in sync_info_list:
sync_thread = sync_info.sync_thread
'''
#sync_thread.join() # [備忘] 単にjoin()とするとキーボード割込(Ctrl+C)が効かない
'''
'''
#while sync_thread.is_alive():
# sync_thread.join(timeout=0.1) # [備忘] join()でtimeoutを設定することでキーボード割込(Ctrl+C)が効くようになる
'''
'''
#while sync_thread.is_alive():
# if interrupt is not None:
# sync_thread.stop_request()
# sync_thread.join()
# break
#
# sync_thread.join(timeout=0.1)
# try:
# sync_thread.join(timeout=1)
# except KeyboardInterrupt as error:
# # [備忘]
# # なぜかここにきた段階で該当スレッドが終了してしまっている(sync_thread.is_alive()がFalseになる)
# # →やむを得ず、interruptの書き換えはsignal.signal(signal.SIGINT, signal_handler)で登録したsignal_handler()内で実施
# interrupt = error
'''
while sync_thread.is_alive():
if (keyboard_interrupt_operation == KeyboadInterruptOperation.StopSync) and (interrupt is not None):
sync_thread.stop_request()
sync_thread.join()
break
sync_thread.join(timeout=0.1)
sync_result = sync_thread.sync_result
sync_result.interrupt = interrupt
logger.debug(sync_result)
sync_result_list.append(sync_result)
return sync_result_list
if __name__ == '__main__':
# [Test]
log_level = logging.DEBUG # logging.(NOTSET|DEBUG|INFO|WARNING|ERROR|CRITICAL)
logging.basicConfig(level=log_level, format='{asctime} [{levelname}] {name}: {message}', style='{')
for timeout in (None, 1.0,):
logging.info(f'(*) timeout={timeout}')
sync_result_list = sync_all(timeout=timeout)
#sync_result_list = sync_all(timeout=timeout, keyboard_interrupt_operation=KeyboadInterruptOperation.StopSync)
#sync_result_list = sync_all(timeout=timeout, use_trigger_send_and_receive_all=True)
#sync_result_list = sync_all(timeout=timeout, target_syncobject_name_list=['新しい送受信グループ #1',])
#sync_result_list = sync_all(timeout=timeout, target_syncobject_name_list=['新しい送受信グループ #1',], use_trigger_send_and_receive_all=True,)
for sync_result in sync_result_list:
logging.info('-'*80)
for name in dir(sync_result):
value = getattr(sync_result, name)
if (value is not None) and (not (type(value) in (bool, int, float, str,))):
value = '<object>'
logging.info(f'{name}: {value}')
logging.info('='*80)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment